forked from twitchscience/kinsumer
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathshard_consumer.go
299 lines (260 loc) · 9.76 KB
/
shard_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
// Copyright (c) 2016 Twitch Interactive
package kinsumer
import (
"fmt"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
)
const (
	// getRecordsLimit is the max number of records in a single request. This effectively limits the
	// total processing speed to getRecordsLimit*5/n where n is the number of parallel clients trying
	// to consume from the same kinesis stream
	getRecordsLimit = 10000 // 10,000 is the max according to the docs

	// maxErrorRetries is how many times we will retry on a shard error
	maxErrorRetries = 3

	// errorSleepDuration is how long we sleep when an error happens, this is multiplied by the number
	// of retries to give a minor backoff behavior
	errorSleepDuration = 1 * time.Second
)
// getShardIterator gets a shard iterator after the last sequence number we read
// or at the start of the stream.
//
// The iterator type is chosen from the supplied state:
//   - a real sequence number  -> AFTER_SEQUENCE_NUMBER (resume where we left off)
//   - ""  with a timestamp    -> AT_TIMESTAMP
//   - ""  without a timestamp -> TRIM_HORIZON (oldest available record)
//   - the sentinel "LATEST"   -> LATEST (tip of the stream)
func getShardIterator(k kinesisiface.KinesisAPI, streamName string, shardID string, sequenceNumber string, iteratorStartTimestamp *time.Time) (string, error) {
	shardIteratorType := kinesis.ShardIteratorTypeAfterSequenceNumber

	// If we do not have a sequenceNumber yet we need to get a shardIterator
	// from the horizon
	ps := aws.String(sequenceNumber)

	if sequenceNumber == "" && iteratorStartTimestamp != nil {
		shardIteratorType = kinesis.ShardIteratorTypeAtTimestamp
		ps = nil
	} else if sequenceNumber == "" {
		shardIteratorType = kinesis.ShardIteratorTypeTrimHorizon
		ps = nil
	} else if sequenceNumber == "LATEST" {
		shardIteratorType = kinesis.ShardIteratorTypeLatest
		ps = nil
	}

	resp, err := k.GetShardIterator(&kinesis.GetShardIteratorInput{
		ShardId:                aws.String(shardID),
		ShardIteratorType:      &shardIteratorType,
		StartingSequenceNumber: ps,
		StreamName:             aws.String(streamName),
		Timestamp:              iteratorStartTimestamp,
	})
	// Check the error before touching resp: the original dereferenced
	// resp.ShardIterator unconditionally, which panics if the SDK ever
	// returns a nil output alongside an error.
	if err != nil {
		return "", err
	}
	return aws.StringValue(resp.ShardIterator), nil
}
// getRecords returns the next batch of records plus the follow-on shard
// iterator for the given iterator. lag reports how far (in time) this batch
// trails the tip of the stream, derived from MillisBehindLatest.
func getRecords(k kinesisiface.KinesisAPI, iterator string) (records []*kinesis.Record, nextIterator string, lag time.Duration, err error) {
	output, err := k.GetRecords(&kinesis.GetRecordsInput{
		Limit:         aws.Int64(getRecordsLimit),
		ShardIterator: aws.String(iterator),
	})
	if err != nil {
		return nil, "", 0, err
	}

	millisBehind := aws.Int64Value(output.MillisBehindLatest)
	return output.Records, aws.StringValue(output.NextShardIterator), time.Duration(millisBehind) * time.Millisecond, nil
}
// captureShard blocks until we capture the given shardID, an error occurs,
// or the consumer is told to stop. A nil checkpointer with a nil error means
// we were stopped before the shard could be captured.
func (k *Kinsumer) captureShard(shardID string) (*checkpointer, error) {
	// Keep asking the checkpointer for ownership of the shard in dynamo.
	for {
		cp, err := capture(
			shardID,
			k.checkpointTableName,
			k.dynamodb,
			k.clientName,
			k.clientID,
			k.maxAgeForClientRecord,
			k.config.stats)
		switch {
		case err != nil:
			return nil, err
		case cp != nil:
			return cp, nil
		}

		// Shard is currently held by someone else; wait before retrying so we
		// don't hammer dynamo, and bail out promptly on a stop request.
		select {
		case <-k.stop:
			// If we are told to stop consuming we should stop attempting to capture
			return nil, nil
		case <-time.After(k.config.throttleDelay):
		}
	}
}
// consume is a blocking call that captures then consumes the given shard in a loop.
// It is also responsible for writing out the checkpoint updates to dynamo.
//
// Lifecycle:
//  1. capture the shard's checkpointer (blocks until owned or stopped)
//  2. loop: fetch records, hand them to k.records, commit periodically
//  3. on stop or shard end, flush any outstanding checkpoint before returning
//
// Errors are reported through k.shardErrors rather than returned.
// TODO: There are no tests for this file. Not sure how to even unit test this.
func (k *Kinsumer) consume(shardID string) {
	defer k.waitGroup.Done()

	// commitTicker is used to periodically commit, so that we don't hammer dynamo every time
	// a shard wants to be check pointed
	commitTicker := time.NewTicker(k.config.commitFrequency)
	defer commitTicker.Stop()

	// capture the checkpointer
	checkpointer, err := k.captureShard(shardID)
	if err != nil {
		k.shardErrors <- shardConsumerError{shardID: shardID, action: "captureShard", err: err}
		return
	}

	// if we failed to capture the checkpointer but there was no errors
	// we must have stopped, so don't process this shard at all
	if checkpointer == nil {
		return
	}

	sequenceNumber := checkpointer.sequenceNumber

	// finished means we have reached the end of the shard but haven't necessarily processed/committed everything
	finished := false

	// Make sure we release the shard when we are done.
	defer func() {
		innerErr := checkpointer.release()
		if innerErr != nil {
			k.shardErrors <- shardConsumerError{shardID: shardID, action: "checkpointer.release", err: innerErr}
			return
		}
	}()

	// Get the starting shard iterator
	iterator, err := getShardIterator(k.kinesis, k.streamName, shardID, sequenceNumber, k.config.iteratorStartTimestamp)
	if err != nil {
		k.shardErrors <- shardConsumerError{shardID: shardID, action: "getShardIterator", err: err}
		return
	}

	// no throttle on the first request.
	nextThrottle := time.After(0)

	retryCount := 0

	// lastSeqToCheckp is used to check if we have more data to checkpoint before we exit
	var lastSeqToCheckp string
	// lastSeqNum is used to check if a batch of data is the last in the stream
	var lastSeqNum string

mainloop:
	for {
		// We have reached the end of the shard's data. Set Finished in dynamo and stop processing.
		if iterator == "" && !finished {
			checkpointer.finish(lastSeqNum)
			finished = true
		}

		// Handle async actions, and throttle requests to keep kinesis happy
		select {
		case <-k.stop:
			break mainloop
		case <-commitTicker.C:
			finishCommitted, err := checkpointer.commit(k.config.commitFrequency)
			if err != nil {
				k.shardErrors <- shardConsumerError{shardID: shardID, action: "checkpointer.commit", err: err}
				return
			}
			if finishCommitted {
				return
			}
			// Go back to waiting for a throttle/stop.
			continue mainloop
		case <-nextThrottle:
		}

		// Reset the nextThrottle
		nextThrottle = time.After(k.config.throttleDelay)

		if finished {
			continue mainloop
		}

		// Get records from kinesis
		records, next, lag, err := getRecords(k.kinesis, iterator)
		if err != nil {
			if awsErr, ok := err.(awserr.Error); ok {
				origErrStr := ""
				if awsErr.OrigErr() != nil {
					origErrStr = fmt.Sprintf("(%s) ", awsErr.OrigErr())
				}
				k.config.logger.Log("Got error: %s %s %sretry count is %d / %d", awsErr.Code(), awsErr.Message(), origErrStr, retryCount, maxErrorRetries)
				// Only retry for errors that should be retried; notably, don't retry serialization errors because something bad is happening
				shouldRetry := request.IsErrorRetryable(err) || request.IsErrorThrottle(err)
				if shouldRetry && retryCount < maxErrorRetries {
					retryCount++
					// casting retryCount here to time.Duration purely for the multiplication, there is
					// no meaning to retryCount nanoseconds
					time.Sleep(errorSleepDuration * time.Duration(retryCount))
					continue mainloop
				}
			}
			k.shardErrors <- shardConsumerError{shardID: shardID, action: "getRecords", err: err}
			return
		}
		retryCount = 0

		// Put all the records we got onto the channel
		k.config.stats.EventsFromKinesis(len(records), shardID, lag)
		if len(records) > 0 {
			retrievedAt := time.Now()
			for _, record := range records {
			RecordLoop:
				// Loop until we stop or the record is consumed, checkpointing if necessary.
				for {
					select {
					case <-commitTicker.C:
						finishCommitted, err := checkpointer.commit(k.config.commitFrequency)
						if err != nil {
							k.shardErrors <- shardConsumerError{shardID: shardID, action: "checkpointer.commit", err: err}
							return
						}
						if finishCommitted {
							return
						}
					case <-k.stop:
						break mainloop
					case k.records <- &consumedRecord{
						record:       record,
						checkpointer: checkpointer,
						retrievedAt:  retrievedAt,
					}:
						checkpointer.lastRecordPassed = time.Now() // Mark the time so we don't retain shards when we're too slow to do so
						lastSeqToCheckp = aws.StringValue(record.SequenceNumber)
						break RecordLoop
					}
				}
			}

			// Update the last sequence number we saw, in case we reached the end of the stream.
			lastSeqNum = aws.StringValue(records[len(records)-1].SequenceNumber)
		}
		iterator = next
	}

	// Handle checkpointer updates which occur after a stop request comes in (whose originating records were before)
	// commit first in case the checkpointer has been updates since the last commit.
	checkpointer.commitIntervalCounter = 0 // Reset commitIntervalCounter to avoid retaining ownership if there's no new sequence number
	_, err1 := checkpointer.commit(0 * time.Millisecond)
	if err1 != nil {
		// BUG FIX: previously this sent the stale outer `err` (always nil here)
		// instead of `err1`, silently dropping the real commit failure.
		k.shardErrors <- shardConsumerError{shardID: shardID, action: "checkpointer.commit", err: err1}
		return
	}

	// Resume commit loop for some time, ensuring that we don't retain ownership unless there's a new sequence number.
	timeoutCounter := 0
	// If we have committed the last sequence number returned to the user, just return. Otherwise, keep committing until we reach that state
	if !checkpointer.dirty && checkpointer.sequenceNumber == lastSeqToCheckp {
		return
	}
	for {
		select {
		case <-commitTicker.C:
			timeoutCounter += int(k.config.commitFrequency)
			checkpointer.commitIntervalCounter = 0
			// passing 0 to commit ensures we no longer retain the shard.
			finishCommitted, err := checkpointer.commit(0 * time.Millisecond)
			if err != nil {
				k.shardErrors <- shardConsumerError{shardID: shardID, action: "checkpointer.commit", err: err}
				return
			}
			if finishCommitted {
				return
			}
			// Once we have committed the last sequence Number we passed to the user, return.
			if !checkpointer.dirty && checkpointer.sequenceNumber == lastSeqToCheckp {
				return
			}
			// Give up after roughly half the client-record max age so another
			// client can take over the shard.
			if timeoutCounter >= int(k.maxAgeForClientRecord/2) {
				return
			}
		}
	}
}