Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: byte index to read singularEvent, propagate payload func to processor pipeline #5314

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 83 additions & 51 deletions processor/processor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package processor

import (
"bytes"
"context"
"encoding/json"
"errors"
Expand All @@ -27,7 +28,6 @@
"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/ro"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/metric"
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
Expand Down Expand Up @@ -1109,7 +1109,7 @@
func (proc *Handle) getTransformerEvents(
response transformer.Response,
commonMetaData *transformer.Metadata,
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
eventsByMessageID map[string]types.SingularEventWithReceivedAtWithPayloadFunc,
destination *backendconfig.DestinationT,
connection backendconfig.Connection,
inPU, pu string,
Expand All @@ -1130,8 +1130,8 @@
userTransformedEvent := &response.Events[i]
messages := lo.Map(
userTransformedEvent.Metadata.GetMessagesIDs(),
func(msgID string, _ int) types.SingularEventT {
return eventsByMessageID[msgID].SingularEvent
func(msgID string, _ int) func() json.RawMessage {
return eventsByMessageID[msgID].PayloadFunc
},
)

Expand All @@ -1144,12 +1144,7 @@
return []byte(`{}`)
}

sampleEvent, err := jsonfast.Marshal(message)
if err != nil {
proc.logger.Errorf(`[Processor: getDestTransformerEvents] Failed to unmarshal first element in transformed events: %v`, err)
sampleEvent = []byte(`{}`)
}
return sampleEvent
return message()

Check warning on line 1147 in processor/processor.go

View check run for this annotation

Codecov / codecov/patch

processor/processor.go#L1147

Added line #L1147 was not covered by tests
},
nil)
}
Expand Down Expand Up @@ -1208,7 +1203,7 @@
event *transformer.TransformerResponse,
status, stage string,
payload func() json.RawMessage,
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
eventsByMessageID map[string]types.SingularEventWithReceivedAtWithPayloadFunc,
) {
if !proc.isReportingEnabled() {
return
Expand Down Expand Up @@ -1325,7 +1320,7 @@
func (proc *Handle) getNonSuccessfulMetrics(
response transformer.Response,
commonMetaData *transformer.Metadata,
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
eventsByMessageID map[string]types.SingularEventWithReceivedAtWithPayloadFunc,
inPU, pu string,
) *NonSuccessfulTransformationMetrics {
m := &NonSuccessfulTransformationMetrics{}
Expand Down Expand Up @@ -1387,7 +1382,7 @@
transformerResponses []transformer.TransformerResponse,
state string,
commonMetaData *transformer.Metadata,
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
eventsByMessageID map[string]types.SingularEventWithReceivedAtWithPayloadFunc,
inPU, pu string,
) ([]*jobsdb.JobT, []*types.PUReportedMetric, map[string]int64) {
metrics := make([]*types.PUReportedMetric, 0)
Expand All @@ -1403,15 +1398,18 @@
failedEvent := &transformerResponses[i]
messages := lo.Map(
failedEvent.Metadata.GetMessagesIDs(),
func(msgID string, _ int) types.SingularEventT {
return eventsByMessageID[msgID].SingularEvent
func(msgID string, _ int) func() json.RawMessage {
return eventsByMessageID[msgID].PayloadFunc
},
)
payload, err := jsonfast.Marshal(messages)
if err != nil {
proc.logger.Errorf(`[Processor: getTransformationMetrics] Failed to unmarshal list of failed events: %v`, err)
continue
payload := []byte(`[`)
for i := range messages {
payload = append(payload, messages[i]()...)
if i != len(messages)-1 {
payload = append(payload, []byte(`,`)...)
}
}
payload = append(payload, []byte(`]`)...)

for _, message := range messages {
proc.updateMetricMaps(
Expand All @@ -1426,12 +1424,7 @@
if proc.transientSources.Apply(commonMetaData.SourceID) {
return []byte(`{}`)
}
sampleEvent, err := jsonfast.Marshal(message)
if err != nil {
proc.logger.Errorf(`[Processor: getTransformationMetrics] Failed to unmarshal first element in failed events: %v`, err)
sampleEvent = []byte(`{}`)
}
return sampleEvent
return message()
},
eventsByMessageID)
}
Expand Down Expand Up @@ -1609,7 +1602,7 @@
totalEvents int
marshalStart time.Time
groupedEventsBySourceId map[SourceIDT][]transformer.TransformerEvent
eventsByMessageID map[string]types.SingularEventWithReceivedAt
eventsByMessageID map[string]types.SingularEventWithReceivedAtWithPayloadFunc
procErrorJobs []*jobsdb.JobT
jobIDToSpecificDestMapOnly map[int64]string
groupedEvents map[string][]transformer.TransformerEvent
Expand All @@ -1634,7 +1627,7 @@
var statusList []*jobsdb.JobStatusT
groupedEvents := make(map[string][]transformer.TransformerEvent)
groupedEventsBySourceId := make(map[SourceIDT][]transformer.TransformerEvent)
eventsByMessageID := make(map[string]types.SingularEventWithReceivedAt)
eventsByMessageID := make(map[string]types.SingularEventWithReceivedAtWithPayloadFunc)
var procErrorJobs []*jobsdb.JobT
eventSchemaJobs := make([]*jobsdb.JobT, 0)
archivalJobs := make([]*jobsdb.JobT, 0)
Expand Down Expand Up @@ -1761,12 +1754,8 @@

for _, singularEvent := range gatewayBatchEvent.Batch {
messageId := stringify.Any(singularEvent["messageId"])
payloadFunc := ro.Memoize(func() json.RawMessage {
payloadBytes, err := jsonfast.Marshal(singularEvent)
if err != nil {
return nil
}
return payloadBytes
payloadFunc := sync.OnceValue(func() json.RawMessage {
return getEventFromBatch(batchEvent.EventPayload, singularEvent)
})
dedupKey := dedupTypes.KeyValue{
Key: fmt.Sprintf("%v%v", messageId, eventParams.SourceJobRunId),
Expand Down Expand Up @@ -1797,11 +1786,15 @@

var keyMap map[dedupTypes.KeyValue]bool
var err error
dedupStart := time.Now()
if proc.config.enableDedup {
keyMap, err = proc.dedup.GetBatch(dedupKeysWithWorkspaceID)
if err != nil {
return nil, err
}
proc.statsFactory.NewTaggedStat("processor.dedup_get_batch", stats.TimerType, stats.Tags{
"method": "v2",
}).Since(dedupStart)
}
for _, event := range jobsWithMetaData {
sourceId := event.eventParams.SourceId
Expand All @@ -1822,10 +1815,6 @@

proc.updateSourceEventStatsDetailed(event.singularEvent, sourceId)
totalEvents++
eventsByMessageID[event.messageID] = types.SingularEventWithReceivedAt{
SingularEvent: event.singularEvent,
ReceivedAt: event.recievedAt,
}

commonMetadataFromSingularEvent := proc.makeCommonMetadataFromSingularEvent(
event.singularEvent,
Expand Down Expand Up @@ -1908,6 +1897,13 @@
if !proc.isDestinationAvailable(event.singularEvent, sourceId, event.eventParams.DestinationID) {
continue
}
eventsByMessageID[event.messageID] = types.SingularEventWithReceivedAtWithPayloadFunc{
SingularEventWithReceivedAt: types.SingularEventWithReceivedAt{
SingularEvent: event.singularEvent,
ReceivedAt: event.recievedAt,
},
PayloadFunc: event.payloadFunc,
}

if _, ok := groupedEventsBySourceId[SourceIDT(sourceId)]; !ok {
groupedEventsBySourceId[SourceIDT(sourceId)] = make([]transformer.TransformerEvent, 0)
Expand Down Expand Up @@ -2206,7 +2202,7 @@
var statusList []*jobsdb.JobStatusT
groupedEvents := make(map[string][]transformer.TransformerEvent)
groupedEventsBySourceId := make(map[SourceIDT][]transformer.TransformerEvent)
eventsByMessageID := make(map[string]types.SingularEventWithReceivedAt)
eventsByMessageID := make(map[string]types.SingularEventWithReceivedAtWithPayloadFunc)
var procErrorJobs []*jobsdb.JobT
eventSchemaJobs := make([]*jobsdb.JobT, 0)
archivalJobs := make([]*jobsdb.JobT, 0)
Expand Down Expand Up @@ -2320,12 +2316,8 @@
for _, singularEvent := range gatewayBatchEvent.Batch {
messageId := stringify.Any(singularEvent["messageId"])

payloadFunc := ro.Memoize(func() json.RawMessage {
payloadBytes, err := jsonfast.Marshal(singularEvent)
if err != nil {
return nil
}
return payloadBytes
payloadFunc := sync.OnceValue(func() json.RawMessage {
return getEventFromBatch(batchEvent.EventPayload, singularEvent)
})

if proc.config.enableDedup {
Expand All @@ -2346,10 +2338,6 @@

// We count this as one, not destination specific ones
totalEvents++
eventsByMessageID[messageId] = types.SingularEventWithReceivedAt{
SingularEvent: singularEvent,
ReceivedAt: receivedAt,
}

commonMetadataFromSingularEvent := proc.makeCommonMetadataFromSingularEvent(
singularEvent,
Expand Down Expand Up @@ -2435,6 +2423,13 @@
if !proc.isDestinationAvailable(singularEvent, sourceID, destinationID) {
continue
}
eventsByMessageID[messageId] = types.SingularEventWithReceivedAtWithPayloadFunc{
SingularEventWithReceivedAt: types.SingularEventWithReceivedAt{
SingularEvent: singularEvent,
ReceivedAt: receivedAt,
},
PayloadFunc: payloadFunc,
}

if _, ok := groupedEventsBySourceId[SourceIDT(sourceID)]; !ok {
groupedEventsBySourceId[SourceIDT(sourceID)] = make([]transformer.TransformerEvent, 0)
Expand Down Expand Up @@ -2508,7 +2503,7 @@
groupedEvents map[string][]transformer.TransformerEvent

trackingPlanEnabledMap map[SourceIDT]bool
eventsByMessageID map[string]types.SingularEventWithReceivedAt
eventsByMessageID map[string]types.SingularEventWithReceivedAtWithPayloadFunc
uniqueMessageIdsBySrcDestKey map[string]map[string]struct{}
reportMetrics []*types.PUReportedMetric
statusList []*jobsdb.JobStatusT
Expand Down Expand Up @@ -2921,7 +2916,7 @@

// helpers
trackingPlanEnabledMap map[SourceIDT]bool,
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
eventsByMessageID map[string]types.SingularEventWithReceivedAtWithPayloadFunc,
uniqueMessageIdsBySrcDestKey map[string]map[string]struct{},
) transformSrcDestOutput {
defer proc.stats.pipeProcessing(partition).Since(time.Now())
Expand Down Expand Up @@ -3036,7 +3031,26 @@
proc.logger.Debug("Custom Transform output size", len(eventsToTransform))
trace.Logf(ctx, "UserTransform", "User Transform output size: %d", len(eventsToTransform))

proc.transDebugger.UploadTransformationStatus(&transformationdebugger.TransformationStatusT{SourceID: sourceID, DestID: destID, Destination: destination, UserTransformedEvents: eventsToTransform, EventsByMessageID: eventsByMessageID, FailedEvents: response.FailedEvents, UniqueMessageIds: uniqueMessageIdsBySrcDestKey[srcAndDestKey]})
proc.transDebugger.UploadTransformationStatus(
&transformationdebugger.TransformationStatusT{
SourceID: sourceID,
DestID: destID,
Destination: destination,
UserTransformedEvents: eventsToTransform,
EventsByMessageID: lo.MapEntries(
eventsByMessageID,
func(
k string, v types.SingularEventWithReceivedAtWithPayloadFunc,
) (
string, types.SingularEventWithReceivedAt,
) {
return k, v.SingularEventWithReceivedAt
},
),
FailedEvents: response.FailedEvents,
UniqueMessageIds: uniqueMessageIdsBySrcDestKey[srcAndDestKey],
},
)

// REPORTING - START
if proc.isReportingEnabled() {
Expand Down Expand Up @@ -3735,3 +3749,21 @@
stats.Default.NewTaggedStat("jobsdb_query_timeout", stats.CountType, stats.Tags{"attempt": strconv.Itoa(attempt), "module": "pileup"}).Increment()
})
}

func getEventFromBatch(batch []byte, singularEvent types.SingularEventT) json.RawMessage {
end := bytes.LastIndex(batch, []byte(`}]`))
start := bytes.Index(batch, []byte(`[{`))
if end == -1 || start == -1 {
return getPayloadOld(singularEvent)
}

Check warning on line 3758 in processor/processor.go

View check run for this annotation

Codecov / codecov/patch

processor/processor.go#L3757-L3758

Added lines #L3757 - L3758 were not covered by tests
res := batch[start+1 : end+1]
return res
}

func getPayloadOld(event types.SingularEventT) []byte {
payloadBytes, err := jsonfast.Marshal(event)
if err != nil {
return nil
}
return payloadBytes

Check warning on line 3768 in processor/processor.go

View check run for this annotation

Codecov / codecov/patch

processor/processor.go#L3763-L3768

Added lines #L3763 - L3768 were not covered by tests
}
Loading
Loading