Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add string and byte serializer #34

Merged
merged 1 commit into from
Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@ import (
"github.com/linkedin/goavro/v2"
)

// SerializeAvro serializes a string value for the given topic, optionally
// encoding it as Avro binary when a non-empty schema is supplied, and then
// prepends the magic byte and schema-id prefix via
// addMagicByteAndSchemaIdPrefix.
//
// NOTE(review): data.(string) is an unchecked type assertion — a non-string
// value panics here instead of returning an error the way the other
// serializers (e.g. SerializeString) do. Fixing it needs an "errors" import
// in this file; flagged for follow-up rather than changed here.
func SerializeAvro(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error) {
	// With no schema, the raw string bytes are passed through unencoded.
	key := []byte(data.(string))
	if schema != "" {
		key = ToAvro(data.(string), schema)
	}

	byteData, err := addMagicByteAndSchemaIdPrefix(configuration, key, topic, keyOrValue, schema)
	if err != nil {
		return nil, err
	}

	return byteData, nil
}

func ToAvro(value string, schema string) []byte {
codec, err := goavro.NewCodec(schema)
if err != nil {
Expand All @@ -25,6 +39,16 @@ func ToAvro(value string, schema string) []byte {
return binary
}

// DeserializeAvro strips the magic byte and schema-id prefix from the raw
// message bytes and, when a non-empty schema is supplied, decodes the
// remainder from Avro binary. Without a schema the stripped bytes are
// returned as-is.
func DeserializeAvro(configuration Configuration, data []byte, keyOrValue string, schema string) interface{} {
	payload := removeMagicByteAndSchemaIdPrefix(configuration, data, keyOrValue)

	if schema == "" {
		return payload
	}

	return FromAvro(payload, schema)
}

func FromAvro(message []byte, schema string) interface{} {
codec, err := goavro.NewCodec(schema)
if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions bytearray.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package kafka

import "errors"

// SerializeByteArray converts a byte-array value into []byte. Byte arrays
// arrive from the JS runtime as []interface{} whose elements are int64 (see
// scripts/test_bytes.js, which builds them with charCodeAt); each element is
// truncated to a single byte. The configuration, topic, keyOrValue and schema
// arguments are unused; they exist to satisfy the Serializer signature.
//
// Fix: the per-element int64 assertion is now checked, so a slice containing
// a non-int64 element returns an error instead of panicking (previously only
// the outer []interface{} assertion was checked).
func SerializeByteArray(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error) {
	bArray, ok := data.([]interface{})
	if !ok {
		return nil, errors.New("Invalid data type provided for byte array serializer (requires []byte)")
	}

	arr := make([]byte, len(bArray))
	for i, u := range bArray {
		b, ok := u.(int64)
		if !ok {
			return nil, errors.New("Invalid data type provided for byte array serializer (requires []byte)")
		}
		arr[i] = byte(b)
	}
	return arr, nil
}

// DeserializeByteArray returns the raw message bytes unchanged. The
// configuration, keyOrValue and schema arguments are unused; they exist to
// satisfy the Deserializer function signature.
func DeserializeByteArray(configuration Configuration, data []byte, keyOrValue string, schema string) interface{} {
	return data
}
15 changes: 5 additions & 10 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func ConsumeInternal(
limit = 1
}

keyDeserializer := GetDeserializer(configuration.Consumer.KeyDeserializer, keySchema)
valueDeserializer := GetDeserializer(configuration.Consumer.ValueDeserializer, valueSchema)

messages := make([]map[string]interface{}, 0)

for i := int64(0); i < limit; i++ {
Expand All @@ -112,19 +115,11 @@ func ConsumeInternal(

message := make(map[string]interface{})
if len(msg.Key) > 0 {
keyWithoutPrefix := removeMagicByteAndSchemaIdPrefix(configuration, msg.Key, "key")
message["key"] = string(keyWithoutPrefix)
if keySchema != "" {
message["key"] = FromAvro(keyWithoutPrefix, keySchema)
}
message["key"] = keyDeserializer(configuration, msg.Key, "key", keySchema)
}

if len(msg.Value) > 0 {
valueWithoutPrefix := removeMagicByteAndSchemaIdPrefix(configuration, msg.Value, "value")
message["value"] = string(valueWithoutPrefix)
if valueSchema != "" {
message["value"] = FromAvro(valueWithoutPrefix, valueSchema)
}
message["value"] = valueDeserializer(configuration, msg.Value, "value", valueSchema)
}

messages = append(messages, message)
Expand Down
24 changes: 24 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,27 @@ require (
github.com/segmentio/kafka-go v0.4.28
go.k6.io/k6 v0.36.0
)

require (
github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91 // indirect
github.com/dop251/goja v0.0.0-20220110113543-261677941f3c // indirect
github.com/fatih/color v1.12.0 // indirect
github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mattn/go-isatty v0.0.13 // indirect
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/afero v1.1.2 // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20210503195802-e9a32991a82e // indirect
golang.org/x/sys v0.0.0-20210511113859-b0526f3d8744 // indirect
golang.org/x/text v0.3.7-0.20210503195748-5c7c50ebbd4f // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
gopkg.in/guregu/null.v3 v3.3.0 // indirect
)
23 changes: 8 additions & 15 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ func (*Kafka) Writer(brokers []string, topic string, auth string, compression st
}

func (*Kafka) Produce(
ctx context.Context, writer *kafkago.Writer, messages []map[string]string,
ctx context.Context, writer *kafkago.Writer, messages []map[string]interface{},
keySchema string, valueSchema string) error {
return ProduceInternal(ctx, writer, messages, Configuration{}, keySchema, valueSchema)
}

func (*Kafka) ProduceWithConfiguration(
ctx context.Context, writer *kafkago.Writer, messages []map[string]string,
ctx context.Context, writer *kafkago.Writer, messages []map[string]interface{},
configurationJson string, keySchema string, valueSchema string) error {
configuration, err := unmarshalConfiguration(configurationJson)
if err != nil {
Expand All @@ -72,7 +72,7 @@ func (*Kafka) ProduceWithConfiguration(
}

func ProduceInternal(
ctx context.Context, writer *kafkago.Writer, messages []map[string]string,
ctx context.Context, writer *kafkago.Writer, messages []map[string]interface{},
configuration Configuration, keySchema string, valueSchema string) error {
state := lib.GetState(ctx)
err := errors.New("state is nil")
Expand All @@ -88,18 +88,17 @@ func ProduceInternal(
return err
}

keySerializer := GetSerializer(configuration.Producer.KeySerializer, keySchema)
valueSerializer := GetSerializer(configuration.Producer.ValueSerializer, valueSchema)

kafkaMessages := make([]kafkago.Message, len(messages))
for i, message := range messages {

kafkaMessages[i] = kafkago.Message{}

// If a key was provided, add it to the message. Keys are optional.
if _, has_key := message["key"]; has_key {
key := []byte(message["key"])
if keySchema != "" {
key = ToAvro(message["key"], keySchema)
}
keyData, err := addMagicByteAndSchemaIdPrefix(configuration, key, writer.Stats().Topic, "key", keySchema)
keyData, err := keySerializer(configuration, writer.Stats().Topic, message["key"], "key", keySchema)
if err != nil {
ReportError(err, "Creation of key bytes failed.")
return err
Expand All @@ -109,19 +108,13 @@ func ProduceInternal(
}

// Then add then message
value := []byte(message["value"])
if valueSchema != "" {
value = ToAvro(message["value"], valueSchema)
}

valueData, err := addMagicByteAndSchemaIdPrefix(configuration, value, writer.Stats().Topic, "value", valueSchema)
valueData, err := valueSerializer(configuration, writer.Stats().Topic, message["value"], "value", valueSchema)
if err != nil {
ReportError(err, "Creation of message bytes failed.")
return err
}

kafkaMessages[i].Value = valueData

}

err = writer.WriteMessages(ctx, kafkaMessages...)
Expand Down
69 changes: 69 additions & 0 deletions scripts/test_bytes.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*

This is a k6 test script that imports the xk6-kafka and
tests Kafka with a 200 byte array messages per iteration.

*/

import { check } from "k6";
import {
writer,
produceWithConfiguration,
reader,
consumeWithConfiguration,
createTopic,
} from "k6/x/kafka"; // import kafka extension

const bootstrapServers = ["localhost:9092"];
const kafkaTopic = "xk6_kafka_byte_array_topic";

const producer = writer(bootstrapServers, kafkaTopic);
const consumer = reader(bootstrapServers, kafkaTopic);

createTopic(bootstrapServers[0], kafkaTopic);

var configuration = JSON.stringify({
producer: {
keySerializer: "org.apache.kafka.common.serialization.StringSerializer",
valueSerializer: "org.apache.kafka.common.serialization.ByteArraySerializer",
},
consumer: {
keyDeserializer: "org.apache.kafka.common.serialization.StringDeserializer",
valueDeserializer: "org.apache.kafka.common.serialization.ByteArrayDeserializer",
},
});

const payload = "byte array payload"

export default function () {
    // payload is a module-level const, so its byte-array form is loop-invariant;
    // compute it once instead of twice per iteration.
    const payloadBytes = Array.from(payload, (ch) => ch.charCodeAt(0));

    // Produce 100 pairs of byte-array messages.
    for (let i = 0; i < 100; i++) {
        const messages = [
            {
                key: "test-id-abc-" + i,
                value: payloadBytes,
            },
            {
                key: "test-id-def-" + i,
                value: payloadBytes,
            },
        ];

        const error = produceWithConfiguration(producer, messages, configuration);
        check(error, {
            "is sent": (err) => err == undefined,
        });
    }

    // Read 10 messages only
    const consumed = consumeWithConfiguration(consumer, 10, configuration);
    check(consumed, {
        "10 messages returned": (msgs) => msgs.length == 10,
        "key starts with 'test-id-' string": (msgs) => msgs[0].key.startsWith("test-id-"),
        "payload is correct": (msgs) => String.fromCharCode(...msgs[0].value) === payload,
    });
}

// Close the producer and consumer connections once the test run finishes.
export function teardown(data) {
    producer.close();
    consumer.close();
}
40 changes: 40 additions & 0 deletions serde.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package kafka

type Serializer func(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error)
type Deserializer func(configuration Configuration, data []byte, keyOrValue string, schema string) interface{}

// GetSerializer maps a Java-style serializer class name to its Serializer
// implementation. A non-empty schema always selects Avro serialization,
// regardless of the requested class name (Avro without a schema registry).
// Unrecognized class names fall back to the string serializer.
func GetSerializer(serializer string, schema string) Serializer {
	// if schema exists default to AVRO without schema registry
	if schema != "" {
		return SerializeAvro
	}

	byName := map[string]Serializer{
		"org.apache.kafka.common.serialization.ByteArraySerializer": SerializeByteArray,
		"org.apache.kafka.common.serialization.StringSerializer":    SerializeString,
		"io.confluent.kafka.serializers.KafkaAvroSerializer":        SerializeAvro,
	}
	if s, found := byName[serializer]; found {
		return s
	}

	// Unknown serializer names are treated as plain strings.
	return SerializeString
}

// GetDeserializer maps a Java-style deserializer class name to its
// Deserializer implementation. A non-empty schema always selects Avro
// deserialization, regardless of the requested class name (Avro without a
// schema registry). Unrecognized class names fall back to the string
// deserializer.
func GetDeserializer(deserializer string, schema string) Deserializer {
	// if schema exists default to AVRO without schema registry
	if schema != "" {
		return DeserializeAvro
	}

	byName := map[string]Deserializer{
		"org.apache.kafka.common.serialization.ByteArrayDeserializer": DeserializeByteArray,
		"org.apache.kafka.common.serialization.StringDeserializer":    DeserializeString,
		"io.confluent.kafka.serializers.KafkaAvroDeserializer":        DeserializeAvro,
	}
	if d, found := byName[deserializer]; found {
		return d
	}

	// Unknown deserializer names are treated as plain strings.
	return DeserializeString
}
16 changes: 16 additions & 0 deletions string.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package kafka

import "errors"

// SerializeString encodes a string value as its raw UTF-8 bytes. The
// configuration, topic, keyOrValue and schema arguments are unused; they
// exist to satisfy the Serializer function signature. Non-string data is
// rejected with an error.
//
// Fix: the type switch now binds the matched value (switch v := data.(type))
// instead of discarding it and re-asserting with data.(string) — the
// idiomatic form, avoiding a redundant second assertion.
func SerializeString(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error) {
	switch v := data.(type) {
	case string:
		return []byte(v), nil
	default:
		return nil, errors.New("Invalid data type provided for string serializer (requires string)")
	}
}

// DeserializeString interprets the raw message bytes as a UTF-8 string. The
// configuration, keyOrValue and schema arguments are unused; they exist to
// satisfy the Deserializer function signature.
func DeserializeString(configuration Configuration, data []byte, keyOrValue string, schema string) interface{} {
	return string(data)
}