Skip to content

Commit

Permalink
chore: store anonymous ids in userID hll to optimise storage
Browse files Browse the repository at this point in the history
  • Loading branch information
mihir20 committed Aug 6, 2024
1 parent 29bccb9 commit e14e42b
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 57 deletions.
6 changes: 3 additions & 3 deletions enterprise/trackedusers/users_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ func (u *UniqueUsersReporter) GenerateReportsFromJobs(jobs []*jobsdb.JobT, sourc
workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], userID, idTypeUserID)
}

if anonymousID != "" {
workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], anonymousID, idTypeAnonymousID)
if anonymousID != "" && userID != anonymousID {
workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], anonymousID, idTypeUserID)
}

if userID != "" && anonymousID != "" {
if userID != "" && anonymousID != "" && userID != anonymousID {
combinedUserIDAnonymousID := combineUserIDAnonymousID(userID, anonymousID)
workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], combinedUserIDAnonymousID, idTypeIdentifiedAnonymousID)
}
Expand Down
99 changes: 48 additions & 51 deletions enterprise/trackedusers/users_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ var (
userIDHll.AddRaw(murmur3.Sum64WithSeed([]byte(uuid.NewString()), murmurSeed))
}
for i := 0; i < noOfAnnID; i++ {
annIDHll.AddRaw(murmur3.Sum64WithSeed([]byte(uuid.NewString()), murmurSeed))
userIDHll.AddRaw(murmur3.Sum64WithSeed([]byte(uuid.NewString()), murmurSeed))
}
for i := 0; i < noOfIdentifiedAnnID; i++ {
identifiedAnnIDHll.AddRaw(murmur3.Sum64WithSeed([]byte(uuid.NewString()), murmurSeed))
Expand Down Expand Up @@ -113,19 +113,15 @@ func TestUniqueUsersReporter(t *testing.T) {
WorkspaceID: sampleWorkspaceID,
SourceID: sampleSourceID,
UserIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id_1"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id_1"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id_1"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: nil,
IdentifiedAnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
Expand Down Expand Up @@ -158,16 +154,12 @@ func TestUniqueUsersReporter(t *testing.T) {
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id_1"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id_1"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: nil,
IdentifiedAnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
Expand All @@ -187,14 +179,10 @@ func TestUniqueUsersReporter(t *testing.T) {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: nil,
IdentifiedAnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
Expand Down Expand Up @@ -226,16 +214,12 @@ func TestUniqueUsersReporter(t *testing.T) {
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id_1"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id_1"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: nil,
IdentifiedAnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
Expand All @@ -256,14 +240,10 @@ func TestUniqueUsersReporter(t *testing.T) {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: nil,
IdentifiedAnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
Expand Down Expand Up @@ -295,16 +275,12 @@ func TestUniqueUsersReporter(t *testing.T) {
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id_1"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id_1"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: nil,
IdentifiedAnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
Expand All @@ -324,14 +300,10 @@ func TestUniqueUsersReporter(t *testing.T) {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: nil,
IdentifiedAnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
Expand Down Expand Up @@ -359,26 +331,59 @@ func TestUniqueUsersReporter(t *testing.T) {
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: func() *hll.Hll {
AnonymousIDHll: nil,
IdentifiedAnonymousIDHll: nil,
},
{
WorkspaceID: sampleWorkspaceID2,
SourceID: sampleSourceID,
UserIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: nil,
IdentifiedAnonymousIDHll: nil,
},
},
},
{
name: "happy case - same user and anonymous id",
jobs: []*jobsdb.JobT{
prepareJob(sampleSourceID, "anon_id", "anon_id", sampleWorkspaceID),
prepareJob(sampleSourceID, "user_id", "user_id", sampleWorkspaceID),
prepareJob(sampleSourceID, "user", "", sampleWorkspaceID),
prepareJob(sampleSourceID, "ann", "ann", sampleWorkspaceID2),
},
trackedUsers: []*UsersReport{
{
WorkspaceID: sampleWorkspaceID,
SourceID: sampleSourceID,
UserIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: nil,
IdentifiedAnonymousIDHll: nil,
},
{
WorkspaceID: sampleWorkspaceID2,
SourceID: sampleSourceID,
UserIDHll: nil,
AnonymousIDHll: func() *hll.Hll {
UserIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: nil,
IdentifiedAnonymousIDHll: nil,
},
},
Expand Down Expand Up @@ -407,16 +412,12 @@ func TestUniqueUsersReporter(t *testing.T) {
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id_1"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id_1"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: nil,
IdentifiedAnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
Expand All @@ -436,14 +437,10 @@ func TestUniqueUsersReporter(t *testing.T) {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: nil,
IdentifiedAnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ func TestTrackedUsersReporting(t *testing.T) {
{anonymousID: "anon-2"},
}, "identify", writeKey, url)
require.NoError(t, err)
err = sendAliasEvent("user-1", "user-2", writeKey, url)
require.NoError(t, err)
eventsCount++

require.Eventuallyf(t, func() bool {
return tc.webhook.RequestsCount() == eventsCount
Expand All @@ -244,9 +247,9 @@ func TestTrackedUsersReporting(t *testing.T) {
}, 1*time.Minute, 5*time.Second, "data not reported to reporting service")

cardinalityMap := tc.reportingServer.getCardinalityFromReportingServer()
require.Equal(t, 2, cardinalityMap[workspaceID][sourceID].userIDCount)
require.Equal(t, 2, cardinalityMap[workspaceID][sourceID].anonIDCount)
require.Equal(t, 2, cardinalityMap[workspaceID][sourceID].identifiedUsersCount)
require.Equal(t, 4, cardinalityMap[workspaceID][sourceID].userIDCount)
require.Equal(t, 0, cardinalityMap[workspaceID][sourceID].anonIDCount)
require.Equal(t, 3, cardinalityMap[workspaceID][sourceID].identifiedUsersCount)
cancel()
require.NoError(t, wg.Wait())
}
Expand Down Expand Up @@ -371,6 +374,52 @@ func runRudderServer(
return
}

// nolint: bodyclose
func sendAliasEvent(userID, previousID, writeKey,
url string) error {
payload := []byte(fmt.Sprintf(`
{
"batch": [
{
"userId": %[1]q,
"type": "alias",
"previousId": %[2]q,
"anonymousId": %[1]q,
"context": {
"traits": {
"trait1": "new-val"
},
"ip": "14.5.67.21",
"library": {
"name": "http"
}
},
"timestamp": "2020-02-02T00:23:09.544Z"
}
]
}`,
userID,
previousID,
))
req, err := http.NewRequest(http.MethodPost, url+"/v1/batch", bytes.NewReader(payload))
if err != nil {
return err
}
req.SetBasicAuth(writeKey, "password")

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}

if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(resp.Body)
return fmt.Errorf("failed to send event to rudder server, status code: %d: %s", resp.StatusCode, string(b))
}
kithttputil.CloseResponse(resp)
return nil
}

// nolint: bodyclose
func sendEvents(
identifiers []userIdentifier,
Expand Down

0 comments on commit e14e42b

Please sign in to comment.