Skip to content

Commit

Permalink
fix: Fix recursive lock in harvester blocking admin socket
Browse files Browse the repository at this point in the history
Fixes #394
  • Loading branch information
driskell committed Oct 15, 2022
1 parent 78f945c commit dc03b40
Showing 1 changed file with 13 additions and 14 deletions.
27 changes: 13 additions & 14 deletions lc-lib/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type Harvester struct {
lastCheck time.Time

// Cross routine access (lock required)
completion float64
orphaned bool
orphanTime time.Time
lastLineCount uint64
Expand Down Expand Up @@ -326,6 +327,14 @@ func (h *Harvester) takeMeasurements(isPipelineBlocked bool) error {
if h.offset > h.lastSize {
h.lastSize = h.offset
}
if h.lastOffset >= h.lastSize {
h.completion = 100
} else {
h.completion = float64(h.lastOffset) * 100 / float64(h.lastSize)
if h.completion >= 100 {
h.completion = 99
}
}
if h.lastStaleOffset > h.offset {
h.staleBytes = h.lastStaleOffset - h.offset
} else {
Expand Down Expand Up @@ -371,7 +380,9 @@ func (h *Harvester) statCheck(info os.FileInfo, isPipelineBlocked bool) (err err
// This scenario is only reached if a file is held open by a blocked/slow pipeline
if h.streamConfig.HoldTime > 0 && h.isOrphaned() {
if age := time.Since(h.orphanTime); age > h.streamConfig.HoldTime {
completion := h.calculateCompletion()
h.mutex.RLock()
completion := h.completion
h.mutex.RUnlock()
if completion >= 100 {
readAge := time.Since(h.lastReadTime)
log.Infof("Stopping harvest of %s; file was deleted %v ago (\"hold time\"); all data was processed; last change was %v ago", h.path, age-(age%time.Second), readAge-(readAge%time.Second))
Expand Down Expand Up @@ -513,18 +524,6 @@ func (h *Harvester) SetOrphaned() {
h.mutex.Unlock()
}

// calculateCompletion returns completion percentage of the file
func (h *Harvester) calculateCompletion() float64 {
defer func() {
h.mutex.RUnlock()
}()
h.mutex.RLock()
if h.lastOffset >= h.lastSize {
return 100
}
return float64(h.lastOffset) * 100 / float64(h.lastSize)
}

// isOrphaned returns if orphaned
func (h *Harvester) isOrphaned() bool {
h.mutex.RLock()
Expand All @@ -550,7 +549,7 @@ func (h *Harvester) APIEncodable() api.Encodable {
apiEncodable.SetEntry("orphaned", api.Number(0))
}

apiEncodable.SetEntry("completion", api.Float(h.calculateCompletion()))
apiEncodable.SetEntry("completion", api.Float(h.completion))
if h.lastEOFOff == nil {
apiEncodable.SetEntry("last_eof_offset", api.Null)
} else {
Expand Down

0 comments on commit dc03b40

Please sign in to comment.