Skip to content

Commit

Permalink
streamproducer: check the job type for replication stream
Browse files Browse the repository at this point in the history
Previously, we would panic if the job id corresponded to a job type
different from the replication stream job, and this is now fixed.

Release justification: bug fix.

Release note: None
  • Loading branch information
yuzefovich committed Aug 22, 2022
1 parent 31bd044 commit 89d6c55
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,6 @@ SELECT crdb_internal.stream_ingestion_stats_json(unique_rowid());

query error pq: crdb_internal\.stream_ingestion_stats_json\(\): job.*is not a stream ingestion job
SELECT crdb_internal.stream_ingestion_stats_json(id) FROM (SELECT id FROM system.jobs LIMIT 1);

query error pq: crdb_internal\.replication_stream_spec\(\): job.*is not a replication stream job
SELECT crdb_internal.replication_stream_spec(crdb_internal.create_sql_schema_telemetry_job())
6 changes: 5 additions & 1 deletion pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,11 @@ func getReplicationStreamSpec(
planCtx := dsp.NewPlanningCtx(evalCtx.Ctx(), jobExecCtx.ExtendedEvalContext(),
nil /* planner */, noTxn, sql.DistributionTypeSystemTenantOnly)

replicatedSpans := j.Details().(jobspb.StreamReplicationDetails).Spans
details, ok := j.Details().(jobspb.StreamReplicationDetails)
if !ok {
return nil, errors.Errorf("job with id %d is not a replication stream job", streamID)
}
replicatedSpans := details.Spans
spans := make([]roachpb.Span, 0, len(replicatedSpans))
for _, span := range replicatedSpans {
spans = append(spans, *span)
Expand Down

0 comments on commit 89d6c55

Please sign in to comment.