Commit

83904: rfc: add user defined function RFC r=mgartner a=mgartner

Release note: None

84286: streamingccl: heartbeat persisted frontier r=samiskin a=samiskin

Resolves #84086 

Previously we would forward the ingestion frontier's resolved timestamp
at the point of heartbeat; however, this allowed the producer's
protected timestamp record to advance past the ingestion job's
persisted frontier.

This PR uses the last persisted frontier value instead.
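
For illustration, a minimal standalone sketch of the behavior (plain Go,
with int64 standing in for hlc.Timestamp; not the actual processor code):
the frontier may advance in memory, but heartbeats only report the value
last persisted to the job record.

```go
package main

import "fmt"

// frontier is an illustrative stand-in for the ingestion frontier processor:
// it tracks both the latest resolved timestamp seen in memory and the
// high-water mark last persisted to the job record.
type frontier struct {
	resolved  int64 // latest resolved timestamp (in memory only)
	persisted int64 // high-water mark durably written to the job record
}

func (f *frontier) noteResolved(ts int64) {
	if ts > f.resolved {
		f.resolved = ts
	}
}

// persistProgress simulates a successful job-record update.
func (f *frontier) persistProgress() {
	f.persisted = f.resolved
}

// heartbeatValue is what gets sent to the producer. Before this change it
// was the in-memory resolved timestamp, which could let the producer's
// protected timestamp run ahead of the persisted frontier.
func (f *frontier) heartbeatValue() int64 {
	return f.persisted
}

func main() {
	f := &frontier{}
	f.noteResolved(100)
	fmt.Println(f.heartbeatValue()) // 0: nothing has been persisted yet
	f.persistProgress()
	f.noteResolved(150)
	fmt.Println(f.heartbeatValue()) // 100: heartbeats lag the in-memory frontier
}
```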

Release note (bug fix): The protected timestamp of the producer job is
no longer able to exceed the persisted ingestion frontier.


85285: colexec: improve handling of the metadata r=yuzefovich a=yuzefovich

**colmem: introduce a helper to release all reservations from allocator**

Release note: None

**colexec: improve handling of the metadata**

This commit audits all of the places where we operate on the producer
metadata and improves things a bit. This was prompted by the fact that
some types of metadata (in particular, LeafTxnFinalState) can have a
non-trivial memory footprint, so the sooner we drop references to the
metadata objects, the more stable CRDB will be.
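
As an illustration of the "drop references sooner" point (a standalone
sketch, not the colexec code; producerMetadata here is a stand-in type):
once buffered metadata has been handed off, the buffer nils out its
entries so large objects become garbage-collectable immediately.

```go
package main

import "fmt"

// producerMetadata is a stand-in for execinfrapb.ProducerMetadata; only the
// reference-dropping pattern matters here.
type producerMetadata struct {
	payload []byte
}

// drainMeta hands off the buffered metadata and nils out the buffer's
// entries so the (potentially large) objects become garbage-collectable
// as soon as the consumer is done with them.
func drainMeta(buf []*producerMetadata) []*producerMetadata {
	out := make([]*producerMetadata, len(buf))
	copy(out, buf)
	for i := range buf {
		buf[i] = nil
	}
	return out
}

func main() {
	buf := []*producerMetadata{{payload: make([]byte, 1<<20)}}
	drained := drainMeta(buf)
	fmt.Println(len(drained), buf[0] == nil) // 1 true
}
```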

This commit adds memory accounting for the LeafTxnFinalState
metadata objects in most (all?) places that buffer metadata (inbox,
columnarizer, materializer, parallel unordered synchronizer). The
reservations are released whenever the buffered component is drained;
however, the drainer, which takes over the metadata, might not perform
the accounting. The difficulty is that the lifecycle of the metadata
objects is not entirely clear: it's possible that we're at the end of
the execution and the whole plan is being drained, in which case the
metadata is pushed into the DistSQLReceiver, imported into the root
txn, and discarded (modulo the increased root txn footprint); it's also
possible that the metadata from the drained component gets buffered
somewhere up the tree. Still, this commit adds the accounting in most
such places.
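
A standalone sketch of the accounting pattern described above (the names
are hypothetical, not the colmem/colexec API): a buffering component
registers the footprint of each metadata object with a memory account
when buffering it, and releases the whole reservation when drained.

```go
package main

import "fmt"

// memoryAccount is a stand-in for a SQL memory account: it only tracks a
// running total of reserved bytes.
type memoryAccount struct {
	used int64
}

func (a *memoryAccount) grow(n int64)   { a.used += n }
func (a *memoryAccount) shrink(n int64) { a.used -= n }

// metadata is a stand-in for producer metadata such as LeafTxnFinalState.
type metadata struct {
	payload []byte
}

func (m *metadata) footprint() int64 { return int64(len(m.payload)) }

// metadataBuffer accounts for every buffered object and releases the whole
// reservation when drained, mirroring the behavior described above.
type metadataBuffer struct {
	acc      *memoryAccount
	buffered []*metadata
	reserved int64
}

func (b *metadataBuffer) add(m *metadata) {
	b.acc.grow(m.footprint())
	b.reserved += m.footprint()
	b.buffered = append(b.buffered, m)
}

// drain hands the metadata to the caller and releases all reservations made
// by this buffer; the caller may or may not account for it further up the tree.
func (b *metadataBuffer) drain() []*metadata {
	out := b.buffered
	b.buffered = nil
	b.acc.shrink(b.reserved)
	b.reserved = 0
	return out
}

func main() {
	acc := &memoryAccount{}
	buf := &metadataBuffer{acc: acc}
	buf.add(&metadata{payload: make([]byte, 1<<20)})
	fmt.Println(acc.used) // 1048576 while the metadata is buffered
	_ = buf.drain()
	fmt.Println(acc.used) // 0 after the buffer is drained
}
```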

Addresses: #64906.
Addresses: #81451.

Release note: None

Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Shiranka Miskin <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
4 people committed Jul 29, 2022
4 parents c8b70fa + 7237d0d + 96149c2 + 0866ddc commit 7a3cf68
Showing 35 changed files with 1,497 additions and 223 deletions.
1,135 changes: 1,135 additions & 0 deletions docs/RFCS/20220706_user_defined_functions.md

Large diffs are not rendered by default.

45 changes: 40 additions & 5 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -95,6 +95,10 @@ type InterceptFn func(event streamingccl.Event, spec SubscriptionToken)
// InterceptableStreamClient
type DialInterceptFn func(streamURL *url.URL) error

// HeartbeatInterceptFn is a function that will intercept calls to a client's
// Heartbeat.
type HeartbeatInterceptFn func(timestamp hlc.Timestamp)

// InterceptableStreamClient wraps a Client, and provides a method to register
// interceptor methods that are run on every streamed Event.
type InterceptableStreamClient interface {
@@ -104,8 +108,15 @@ type InterceptableStreamClient interface {
// from an InterceptableStreamClient.
RegisterInterception(fn InterceptFn)

// from an InterceptableStreamClient.
// RegisterDialInterception registers an interceptor to be called
// whenever Dial is called on the client.
RegisterDialInterception(fn DialInterceptFn)
// RegisterHeartbeatInterception registers an interceptor to be called
// whenever Heartbeat is called on the client.
RegisterHeartbeatInterception(fn HeartbeatInterceptFn)

// ClearInterceptors clears all registered interceptors on the client.
ClearInterceptors()
}

// randomStreamConfig specifies the variables that controls the rate and type of
@@ -207,9 +218,10 @@ type randomStreamClient struct {

// interceptors can be registered to peek at every event generated by this
// client and which partition spec it was sent to.
interceptors []func(streamingccl.Event, SubscriptionToken)
dialInterceptors []DialInterceptFn
tableID int
interceptors []InterceptFn
dialInterceptors []DialInterceptFn
heartbeatInterceptors []HeartbeatInterceptFn
tableID int
}
}

@@ -313,8 +325,16 @@ func (m *randomStreamClient) Create(

// Heartbeat implements the Client interface.
func (m *randomStreamClient) Heartbeat(
ctx context.Context, _ streaming.StreamID, _ hlc.Timestamp,
ctx context.Context, _ streaming.StreamID, ts hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
m.mu.Lock()
defer m.mu.Unlock()
for _, interceptor := range m.mu.heartbeatInterceptors {
if interceptor != nil {
interceptor(ts)
}
}

return streampb.StreamReplicationStatus{}, nil
}

@@ -583,3 +603,18 @@ func (m *randomStreamClient) RegisterDialInterception(fn DialInterceptFn) {
defer m.mu.Unlock()
m.mu.dialInterceptors = append(m.mu.dialInterceptors, fn)
}

// RegisterHeartbeatInterception implements the InterceptableStreamClient interface.
func (m *randomStreamClient) RegisterHeartbeatInterception(fn HeartbeatInterceptFn) {
m.mu.Lock()
defer m.mu.Unlock()
m.mu.heartbeatInterceptors = append(m.mu.heartbeatInterceptors, fn)
}

// ClearInterceptors implements the InterceptableStreamClient interface.
func (m *randomStreamClient) ClearInterceptors() {
m.mu.Lock()
defer m.mu.Unlock()
m.mu.interceptors = make([]InterceptFn, 0)
m.mu.heartbeatInterceptors = make([]HeartbeatInterceptFn, 0)
}
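
For context, a standalone sketch of how the new interception hook might be
exercised by a test (the client below is a mock with int64 timestamps, not
the real randomStreamClient; only the method shapes mirror the interface
above): register a heartbeat interceptor to record every heartbeated
timestamp, then clear the interceptors when done.

```go
package main

import (
	"fmt"
	"sync"
)

// heartbeatInterceptFn mirrors the shape of the HeartbeatInterceptFn above,
// with int64 standing in for hlc.Timestamp.
type heartbeatInterceptFn func(ts int64)

// mockClient is a standalone stand-in for an interceptable stream client.
type mockClient struct {
	mu                    sync.Mutex
	heartbeatInterceptors []heartbeatInterceptFn
}

func (c *mockClient) RegisterHeartbeatInterception(fn heartbeatInterceptFn) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.heartbeatInterceptors = append(c.heartbeatInterceptors, fn)
}

func (c *mockClient) ClearInterceptors() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.heartbeatInterceptors = nil
}

// Heartbeat invokes every registered interceptor with the heartbeated
// timestamp, analogous to the randomStreamClient change above.
func (c *mockClient) Heartbeat(ts int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, fn := range c.heartbeatInterceptors {
		if fn != nil {
			fn(ts)
		}
	}
}

func main() {
	var seen []int64
	c := &mockClient{}
	c.RegisterHeartbeatInterception(func(ts int64) { seen = append(seen, ts) })
	c.Heartbeat(10)
	c.Heartbeat(20)
	c.ClearInterceptors()
	c.Heartbeat(30) // not recorded
	fmt.Println(seen) // [10 20]
}
```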
@@ -16,15 +16,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -72,6 +73,10 @@ type streamIngestionFrontier struct {
// stream alive.
heartbeatSender *heartbeatSender

// persistedHighWater stores the highwater mark of progress that is persisted
// in the job record.
persistedHighWater hlc.Timestamp

lastPartitionUpdate time.Time
partitionProgress map[string]jobspb.StreamIngestionProgress_PartitionProgress
}
@@ -112,14 +117,15 @@ func newStreamIngestionFrontierProcessor(
}
}
sf := &streamIngestionFrontier{
flowCtx: flowCtx,
spec: spec,
input: input,
highWaterAtStart: spec.HighWaterAtStart,
frontier: frontier,
partitionProgress: partitionProgress,
metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().StreamIngest.(*Metrics),
heartbeatSender: heartbeatSender,
flowCtx: flowCtx,
spec: spec,
input: input,
highWaterAtStart: spec.HighWaterAtStart,
frontier: frontier,
partitionProgress: partitionProgress,
metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().StreamIngest.(*Metrics),
heartbeatSender: heartbeatSender,
persistedHighWater: spec.HighWaterAtStart,
}
if err := sf.Init(
sf,
@@ -283,9 +289,8 @@ func (sf *streamIngestionFrontier) Next() (
break
}

var frontierChanged bool
var err error
if frontierChanged, err = sf.noteResolvedTimestamps(row[0]); err != nil {
if _, err = sf.noteResolvedTimestamps(row[0]); err != nil {
sf.MoveToDraining(err)
break
}
@@ -296,27 +301,14 @@ }
}

// Send back a row to the job so that it can update the progress.
newResolvedTS := sf.frontier.Frontier()
select {
case <-sf.Ctx.Done():
sf.MoveToDraining(sf.Ctx.Err())
return nil, sf.DrainHelper()
// Send the frontier update in the heartbeat to the source cluster.
case sf.heartbeatSender.frontierUpdates <- newResolvedTS:
if !frontierChanged {
break
}
progressBytes, err := protoutil.Marshal(&newResolvedTS)
if err != nil {
sf.MoveToDraining(err)
break
}
pushRow := rowenc.EncDatumRow{
rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(progressBytes))),
}
if outRow := sf.ProcessRowHelper(pushRow); outRow != nil {
return outRow, nil
}
// Send the latest persisted highwater in the heartbeat to the source cluster
// as even with retries we will never request an earlier row than it, and
// the source cluster is free to clean up earlier data.
case sf.heartbeatSender.frontierUpdates <- sf.persistedHighWater:
// If heartbeatSender has error, it means remote has error, we want to
// stop the processor.
case <-sf.heartbeatSender.stoppedChan:
@@ -404,36 +396,57 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
if updateFreq == 0 || timeutil.Since(sf.lastPartitionUpdate) < updateFreq {
return nil
}
f := sf.frontier
registry := sf.flowCtx.Cfg.JobRegistry
jobID := jobspb.JobID(sf.spec.JobID)
f := sf.frontier
job, err := registry.LoadJob(ctx, jobID)
if err != nil {
return err
}

frontierResolvedSpans := make([]jobspb.ResolvedSpan, 0)
f.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done span.OpResult) {
frontierResolvedSpans = append(frontierResolvedSpans, jobspb.ResolvedSpan{Span: sp, Timestamp: ts})
return span.ContinueMatch
})

highWatermark := f.Frontier()
partitionProgress := sf.partitionProgress

sf.lastPartitionUpdate = timeutil.Now()

sf.metrics.JobProgressUpdates.Inc(1)
sf.metrics.FrontierCheckpointSpanCount.Update(int64(len(frontierResolvedSpans)))
err := registry.UpdateJobWithTxn(ctx, jobID, nil, false, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
if err := md.CheckRunningOrReverting(); err != nil {
return err
}

// TODO(pbardea): Only update partitions that have changed.
return job.FractionProgressed(ctx, nil, /* txn */
func(ctx context.Context, details jobspb.ProgressDetails) float32 {
prog := details.(*jobspb.Progress_StreamIngest).StreamIngest
progress := md.Progress

prog.PartitionProgress = partitionProgress
prog.Checkpoint.ResolvedSpans = frontierResolvedSpans
// "FractionProgressed" isn't relevant on jobs that are streaming in changes.
return 0.0
},
)
// Keep the recorded highwater empty until some advancement has been made
if sf.highWaterAtStart.Less(highWatermark) {
progress.Progress = &jobspb.Progress_HighWater{
HighWater: &highWatermark,
}
}

streamProgress := progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest
streamProgress.PartitionProgress = partitionProgress
streamProgress.Checkpoint.ResolvedSpans = frontierResolvedSpans

ju.UpdateProgress(progress)

// Reset RunStats.NumRuns to 1 since the stream ingestion has returned to
// a steady state. By resetting NumRuns, we avoid future job system level
// retries from having a large backoff because of past failures.
if md.RunStats != nil && md.RunStats.NumRuns > 1 {
ju.UpdateRunStats(1, md.RunStats.LastRun)
}
return nil
})

if err == nil {
sf.metrics.JobProgressUpdates.Inc(1)
sf.persistedHighWater = f.Frontier()
sf.metrics.FrontierCheckpointSpanCount.Update(int64(len(frontierResolvedSpans)))
}

return err
}