-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathflushing_batch.go
67 lines (58 loc) · 1.71 KB
/
flushing_batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package rupture
import (
"github.com/blevesearch/bleve/v2"
)
// FlushingBatch is a batch of operations that automatically flushes to the
// underlying index once it reaches a certain size.
type FlushingBatch interface {
// Index adds the specified index operation to the batch, possibly
// triggering a flush.
Index(id string, data interface{}) error
// Delete adds the specified delete operation to the batch, possibly
// triggering a flush.
Delete(id string) error
// Flush flushes the batch's contents.
Flush() error
}
// singleIndexFlushingBatch is the FlushingBatch implementation returned by
// NewFlushingBatch. It collects operations in a bleve batch and submits that
// batch to one bleve index.
type singleIndexFlushingBatch struct {
// maxBatchSize is the batch size at which flushIfFull triggers a flush.
maxBatchSize int
// batch holds the operations that have been added but not yet flushed.
batch *bleve.Batch
// index is the index the batch is created from and flushed to.
index bleve.Index
}
// newFlushingBatch builds a singleIndexFlushingBatch bound to index. The
// working batch is obtained from index.NewBatch(), and maxBatchSize is stored
// as the batch size at which an automatic flush is triggered.
func newFlushingBatch(index bleve.Index, maxBatchSize int) *singleIndexFlushingBatch {
	fb := new(singleIndexFlushingBatch)
	fb.index = index
	fb.batch = index.NewBatch()
	fb.maxBatchSize = maxBatchSize
	return fb
}
// NewFlushingBatch returns a FlushingBatch that writes to the given index.
// Operations accumulate in the batch and are flushed to the index
// automatically as soon as the batch size reaches maxBatchSize.
func NewFlushingBatch(index bleve.Index, maxBatchSize int) FlushingBatch {
	var fb FlushingBatch = newFlushingBatch(index, maxBatchSize)
	return fb
}
// Index adds an index operation for the document id with the given data to
// the pending batch. If the batch rejects the operation, that error is
// returned and no flush is attempted; otherwise the batch is flushed when it
// has reached its size limit, and any flush error is returned.
func (b *singleIndexFlushingBatch) Index(id string, data interface{}) error {
	err := b.batch.Index(id, data)
	if err == nil {
		err = b.flushIfFull()
	}
	return err
}
// Delete adds a delete operation for the document id to the pending batch,
// then flushes the batch if it has reached its size limit. The returned error
// is the flush error, if any.
func (b *singleIndexFlushingBatch) Delete(id string) error {
	pending := b.batch
	pending.Delete(id)
	err := b.flushIfFull()
	return err
}
// flushIfFull flushes the pending batch when its size is at least
// maxBatchSize; below that threshold it does nothing and returns nil.
func (b *singleIndexFlushingBatch) flushIfFull() error {
	if full := b.batch.Size() >= b.maxBatchSize; full {
		return b.Flush()
	}
	return nil
}
// Flush submits the pending batch to the index. When the index reports an
// error, that error is returned and the batch is not reset; on success the
// batch is reset so it can be reused for subsequent operations.
func (b *singleIndexFlushingBatch) Flush() error {
	if err := b.index.Batch(b.batch); err != nil {
		return err
	}
	b.batch.Reset()
	return nil
}