Skip to content

Commit

Permalink
Change the filter API to avoid negative lookahead (closes #188)
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Oct 14, 2022
1 parent 8a7632c commit 358637f
Show file tree
Hide file tree
Showing 11 changed files with 344 additions and 137 deletions.
2 changes: 1 addition & 1 deletion config/test-fixtures/transform-mixed-filtered.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ transform {
use "spEnrichedFilter" {
atomic_field = "app_id"
regex = "wrong"
regex_timeout = 10
filter_action = "keep"
}
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ require (

require (
github.com/davecgh/go-spew v1.1.1
github.com/dlclark/regexp2 v1.7.0
github.com/dop251/goja v0.0.0-20220722151623-4765a9872229
github.com/hashicorp/hcl/v2 v2.13.0
github.com/json-iterator/go v1.1.12
Expand All @@ -72,6 +71,7 @@ require (
github.com/agext/levenshtein v1.2.3 // indirect
github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect
github.com/devigned/tab v0.1.1 // indirect
github.com/dlclark/regexp2 v1.7.0 // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion pkg/target/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ func (st *StdoutTarget) Close() {}
// per message for this target
//
// Note: Technically no limit but we are putting in a limit of 10 MiB here
// to avoid trying to print out huge payloads
//
// to avoid trying to print out huge payloads
func (st *StdoutTarget) MaximumAllowedMessageSizeBytes() int {
return 10485760
}
Expand Down
39 changes: 20 additions & 19 deletions pkg/transform/snowplow_enriched_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,14 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/dlclark/regexp2"

"github.com/pkg/errors"
"github.com/snowplow/snowplow-golang-analytics-sdk/analytics"

"github.com/snowplow-devops/stream-replicator/pkg/models"
)

func evaluateSpEnrichedFilter(re *regexp2.Regexp, valuesFound []interface{}) bool {
func evaluateSpEnrichedFilter(re *regexp.Regexp, valuesFound []interface{}) bool {
// if valuesFound is nil, we found no value.
// Because negative matches are a thing, we still want to match against an empty string
if valuesFound == nil {
Expand All @@ -32,22 +29,26 @@ func evaluateSpEnrichedFilter(re *regexp2.Regexp, valuesFound []interface{}) boo
v = "" // because nil gets cast to `<nil>`
}

if ok, _ := re.MatchString(fmt.Sprintf("%v", v)); ok {
if ok := re.MatchString(fmt.Sprintf("%v", v)); ok {
return true
}
}
return false
}

func createSpEnrichedFilterFunction(regex string, regexTimeout int, getFunc valueGetter) (TransformationFunction, error) {
if regexTimeout == 0 {
// default timeout for regex is 10 seconds
regexTimeout = 10
func createSpEnrichedFilterFunction(regex string, getFunc valueGetter, filterAction string) (TransformationFunction, error) {
var dropIfMatched bool
switch filterAction {
case "drop":
dropIfMatched = true
case "keep":
dropIfMatched = false
default:
return nil, fmt.Errorf("Invalid filter action found: %s - must be 'keep' or 'drop'", filterAction)
}

// regexToMatch is what we use to evaluate the actual filter, once we have the value.
regexToMatch, err := regexp2.Compile(regex, 0)
regexToMatch.MatchTimeout = time.Duration(regexTimeout) * time.Second
regexToMatch, err := regexp.Compile(regex)
if err != nil {
return nil, errors.Wrap(err, `error compiling regex for filter`)
}
Expand All @@ -69,10 +70,10 @@ func createSpEnrichedFilterFunction(regex string, regexTimeout int, getFunc valu
}

// evaluate whether the found value passes the filter, determining if the message should be kept
shouldKeepMessage := evaluateSpEnrichedFilter(regexToMatch, valueFound)
matchesRegex := evaluateSpEnrichedFilter(regexToMatch, valueFound)

// if message is not to be kept, return it as a filtered message to be acked in the main function
if !shouldKeepMessage {
if (!matchesRegex && !dropIfMatched) || (matchesRegex && dropIfMatched) {
return nil, message, nil, nil
}

Expand Down Expand Up @@ -103,12 +104,12 @@ func makeBaseValueGetter(field string) valueGetter {
}

// NewSpEnrichedFilterFunction returns a TransformationFunction which filters messages based on a field in the Snowplow enriched event.
func NewSpEnrichedFilterFunction(field, regex string, regexTimeout int) (TransformationFunction, error) {
func NewSpEnrichedFilterFunction(field, regex string, filterAction string) (TransformationFunction, error) {

// getBaseValueForMatch is responsible for retrieving data from the message for base fields
getBaseValueForMatch := makeBaseValueGetter(field)

return createSpEnrichedFilterFunction(regex, regexTimeout, getBaseValueForMatch)
return createSpEnrichedFilterFunction(regex, getBaseValueForMatch, filterAction)
}

// makeContextValueGetter creates a valueGetter for context data
Expand Down Expand Up @@ -137,7 +138,7 @@ func makeContextValueGetter(name string, path []interface{}) valueGetter {
}

// NewSpEnrichedFilterFunctionContext returns a TransformationFunction for filtering a context
func NewSpEnrichedFilterFunctionContext(contextFullName, pathToField, regex string, regexTimeout int) (TransformationFunction, error) {
func NewSpEnrichedFilterFunctionContext(contextFullName, pathToField, regex string, filterAction string) (TransformationFunction, error) {

path, err := parsePathToArguments(pathToField)
if err != nil {
Expand All @@ -147,7 +148,7 @@ func NewSpEnrichedFilterFunctionContext(contextFullName, pathToField, regex stri
// getContextValuesForMatch is responsible for retrieving data from the message for context fields
getContextValuesForMatch := makeContextValueGetter(contextFullName, path)

return createSpEnrichedFilterFunction(regex, regexTimeout, getContextValuesForMatch)
return createSpEnrichedFilterFunction(regex, getContextValuesForMatch, filterAction)
}

// makeUnstructValueGetter creates a valueGetter for unstruct data.
Expand Down Expand Up @@ -185,7 +186,7 @@ func makeUnstructValueGetter(eventName string, versionRegex *regexp.Regexp, path
}

// NewSpEnrichedFilterFunctionUnstructEvent returns a TransformationFunction for filtering an unstruct_event
func NewSpEnrichedFilterFunctionUnstructEvent(eventNameToMatch, eventVersionToMatch, pathToField, regex string, regexTimeout int) (TransformationFunction, error) {
func NewSpEnrichedFilterFunctionUnstructEvent(eventNameToMatch, eventVersionToMatch, pathToField, regex string, filterAction string) (TransformationFunction, error) {

path, err := parsePathToArguments(pathToField)
if err != nil {
Expand All @@ -201,7 +202,7 @@ func NewSpEnrichedFilterFunctionUnstructEvent(eventNameToMatch, eventVersionToMa
// It also checks that the correct event name and version are provided, and returns nil if not.
getUnstructValuesForMatch := makeUnstructValueGetter(eventNameToMatch, versionRegex, path)

return createSpEnrichedFilterFunction(regex, regexTimeout, getUnstructValuesForMatch)
return createSpEnrichedFilterFunction(regex, getUnstructValuesForMatch, filterAction)
}

// parsePathToArguments parses a string path to custom data (eg. `test1.test2[0].test3`)
Expand Down
Loading

0 comments on commit 358637f

Please sign in to comment.