Skip to content

Commit

Permalink
testing(bigquery/storage/managedwriter): add basic CDC example (#7516)
Browse files Browse the repository at this point in the history
  • Loading branch information
shollyman authored Mar 9, 2023
1 parent b2c40c3 commit 97337e9
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 14 deletions.
143 changes: 143 additions & 0 deletions bigquery/storage/managedwriter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ func TestIntegration_ManagedWriter(t *testing.T) {
t.Parallel()
testSchemaEvolution(ctx, t, mwClient, bqClient, dataset)
})
t.Run("SimpleCDC", func(t *testing.T) {
t.Parallel()
testSimpleCDC(ctx, t, mwClient, bqClient, dataset)
})
t.Run("Instrumentation", func(t *testing.T) {
// Don't run this in parallel, we only want to collect stats from this subtest.
testInstrumentation(ctx, t, mwClient, bqClient, dataset)
Expand All @@ -255,6 +259,7 @@ func TestIntegration_ManagedWriter(t *testing.T) {
t.Run("TestLargeInsertWithRetry", func(t *testing.T) {
testLargeInsertWithRetry(ctx, t, mwClient, bqClient, dataset)
})

})
}

Expand Down Expand Up @@ -504,6 +509,144 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq
withExactRowCount(int64(len(testSimpleData))))
}

// testSimpleCDC demonstrates basic Change Data Capture (CDC) functionality. We add an initial set of
// rows to a table, then use CDC to apply updates.
func testSimpleCDC(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())

if err := testTable.Create(ctx, &bigquery.TableMetadata{
Schema: testdata.ExampleEmployeeSchema,
Clustering: &bigquery.Clustering{
Fields: []string{"id"},
},
}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}

// Mark the primary key using an ALTER TABLE DDL.
tableIdentifier, _ := testTable.Identifier(bigquery.StandardSQLID)
sql := fmt.Sprintf("ALTER TABLE %s ADD PRIMARY KEY(id) NOT ENFORCED;", tableIdentifier)
if _, err := bqClient.Query(sql).Read(ctx); err != nil {
t.Fatalf("failed ALTER TABLE: %v", err)
}

m := &testdata.ExampleEmployeeCDC{}
descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
if err != nil {
t.Fatalf("NormalizeDescriptor: %v", err)
}

// Setup an initial writer for sending initial inserts.
writer, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(CommittedStream),
WithSchemaDescriptor(descriptorProto),
)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
defer writer.Close()
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

initialEmployees := []*testdata.ExampleEmployeeCDC{
{
Id: proto.Int64(1),
Username: proto.String("alice"),
GivenName: proto.String("Alice CEO"),
Departments: []string{"product", "support", "internal"},
Salary: proto.Int64(1),
XCHANGE_TYPE: proto.String("INSERT"),
},
{
Id: proto.Int64(2),
Username: proto.String("bob"),
GivenName: proto.String("Bob Bobberson"),
Departments: []string{"research"},
Salary: proto.Int64(100000),
XCHANGE_TYPE: proto.String("INSERT"),
},
{
Id: proto.Int64(3),
Username: proto.String("clarice"),
GivenName: proto.String("Clarice Clearwater"),
Departments: []string{"product"},
Salary: proto.Int64(100001),
XCHANGE_TYPE: proto.String("INSERT"),
},
}

// First append inserts all the initial employees.
data := make([][]byte, len(initialEmployees))
for k, mesg := range initialEmployees {
b, err := proto.Marshal(mesg)
if err != nil {
t.Fatalf("failed to marshal record %d: %v", k, err)
}
data[k] = b
}
result, err := writer.AppendRows(ctx, data)
if err != nil {
t.Errorf("initial insert failed (%s): %v", writer.StreamName(), err)
}
if _, err := result.GetResult(ctx); err != nil {
t.Errorf("result error for initial insert (%s): %v", writer.StreamName(), err)
}
validateTableConstraints(ctx, t, bqClient, testTable, "initial inserts",
withExactRowCount(int64(len(initialEmployees))))

// Create a second writer for applying modifications.
updateWriter, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(DefaultStream),
WithSchemaDescriptor(descriptorProto),
)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
defer updateWriter.Close()

// Change bob via an UPSERT CDC
newBob := proto.Clone(initialEmployees[1]).(*testdata.ExampleEmployeeCDC)
newBob.Salary = proto.Int64(105000)
newBob.Departments = []string{"research", "product"}
newBob.XCHANGE_TYPE = proto.String("UPSERT")
b, err := proto.Marshal(newBob)
if err != nil {
t.Fatalf("failed to marshal new bob: %v", err)
}
result, err = updateWriter.AppendRows(ctx, [][]byte{b})
if err != nil {
t.Fatalf("bob modification failed (%s): %v", updateWriter.StreamName(), err)
}
if _, err := result.GetResult(ctx); err != nil {
t.Fatalf("result error for bob modification (%s): %v", updateWriter.StreamName(), err)
}
validateTableConstraints(ctx, t, bqClient, testTable, "after bob modification",
withExactRowCount(int64(len(initialEmployees))),
withDistinctValues("id", int64(len(initialEmployees))))

// remote clarice via DELETE CDC
removeClarice := &testdata.ExampleEmployeeCDC{
Id: proto.Int64(3),
XCHANGE_TYPE: proto.String("DELETE"),
}
b, err = proto.Marshal(removeClarice)
if err != nil {
t.Fatalf("failed to marshal clarice removal: %v", err)
}
result, err = updateWriter.AppendRows(ctx, [][]byte{b})
if err != nil {
t.Fatalf("clarice removal failed (%s): %v", updateWriter.StreamName(), err)
}
if _, err := result.GetResult(ctx); err != nil {
t.Fatalf("result error for clarice removal (%s): %v", updateWriter.StreamName(), err)
}

validateTableConstraints(ctx, t, bqClient, testTable, "after clarice removal",
withExactRowCount(int64(len(initialEmployees))-1))
}

// testErrorBehaviors intentionally issues problematic requests to verify error behaviors.
func testErrorBehaviors(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
Expand Down
24 changes: 24 additions & 0 deletions bigquery/storage/managedwriter/testdata/schemas.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,28 @@ var (
Type: bigquery.StringFieldType,
},
}

ExampleEmployeeSchema bigquery.Schema = bigquery.Schema{
{
Name: "id",
Type: bigquery.IntegerFieldType,
},
{
Name: "username",
Type: bigquery.StringFieldType,
},
{
Name: "given_name",
Type: bigquery.StringFieldType,
},
{
Name: "departments",
Type: bigquery.StringFieldType,
Repeated: true,
},
{
Name: "salary",
Type: bigquery.IntegerFieldType,
},
}
)
140 changes: 126 additions & 14 deletions bigquery/storage/managedwriter/testdata/validation_proto2.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,13 @@ message ValidationP2ColumnAnnotations {
optional string first = 1;
optional string second = 2 [(google.cloud.bigquery.storage.v1.column_name) = "特別コラム"];
optional string third = 3 [(google.cloud.bigquery.storage.v1.column_name) = "second"];
}

message ExampleEmployeeCDC {
optional int64 id = 1; // Primary Key (not enforced)
optional string username = 2;
optional string given_name = 3;
repeated string departments = 4;
optional int64 salary = 5;
optional string _CHANGE_TYPE = 999; // We give it a high tag number to avoid conflicts with normal evolution of the record.
}

0 comments on commit 97337e9

Please sign in to comment.