Skip to content

Commit

Permalink
feat: Log Carver now appends direct source of a received event to age…
Browse files Browse the repository at this point in the history
…nt[source]
  • Loading branch information
driskell committed Apr 1, 2023
1 parent 022d836 commit 68b88c7
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 3 deletions.
19 changes: 16 additions & 3 deletions lc-lib/receiver/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,20 @@ ReceiverLoop:
r.connectionLock.Lock()
connection := eventImpl.Context().Value(transports.ContextConnection)
receiver := eventImpl.Context().Value(transports.ContextReceiver).(transports.Receiver)
connectionStatus := r.connectionStatus[connection]
// Schedule partial ack if this is first set of events
if len(r.connectionStatus[connection].progress) == 0 {
if len(connectionStatus.progress) == 0 {
r.scheduler.Set(connection, 5*time.Second)
}
r.connectionStatus[connection].progress = append(r.connectionStatus[connection].progress, &poolEventProgress{event: eventImpl, sequence: 0})
connectionStatus.progress = append(connectionStatus.progress, &poolEventProgress{event: eventImpl, sequence: 0})
r.connectionLock.Unlock()
// Build the events with our acknowledger and submit the bundle
var events = make([]*event.Event, len(eventImpl.Events()))
for idx, item := range eventImpl.Events() {
ctx := context.WithValue(eventImpl.Context(), poolContextEventPosition, &poolEventPosition{nonce: eventImpl.Nonce(), sequence: uint32(idx + 1)})
events[idx] = event.NewEvent(ctx, r, item)
item := event.NewEvent(ctx, r, item)
r.addEventSource(item, connectionStatus)
events[idx] = item
}
spool = append(spool, events)
spoolChan = r.output
Expand Down Expand Up @@ -395,3 +398,13 @@ func (r *Pool) shutdown() {
receiver.Shutdown()
}
}

func (r *Pool) addEventSource(item *event.Event, connectionStatus *poolConnectionStatus) {
source, ok := item.MustResolve("agent[source]", nil).([]string)
if !ok {
// Overwrite invalid / missing map
source = make([]string, 0, 1)
}
source = append(source, connectionStatus.label)
item.MustResolve("agent[source]", source)
}
4 changes: 4 additions & 0 deletions lc-lib/receiver/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package receiver

import (
"fmt"

"github.com/driskell/log-courier/lc-lib/admin/api"
"github.com/driskell/log-courier/lc-lib/transports"
)
Expand All @@ -36,6 +38,7 @@ type poolConnectionStatus struct {
listener string
remote string
desc string
label string
progress []*poolEventProgress
lines int64

Expand All @@ -48,6 +51,7 @@ func newPoolConnectionStatus(p *Pool, listener string, remote string, desc strin
listener: listener,
remote: remote,
desc: desc,
label: fmt.Sprintf("%s [%s]", remote, desc),
progress: make([]*poolEventProgress, 0),
}
}
Expand Down
1 change: 1 addition & 0 deletions testing/log-carver-stream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ general:
log syslog: false
log stdout: true
log level: debug
debug events: true
admin:
enabled: true
listen address: tcp:127.0.0.1:12341
Expand Down
1 change: 1 addition & 0 deletions testing/log-carver-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ general:
log syslog: false
log stdout: true
log level: debug
debug events: true
admin:
enabled: true
listen address: tcp:127.0.0.1:12341
Expand Down

0 comments on commit 68b88c7

Please sign in to comment.