From e33a8c2caee602783ab1358fbbba369373fbf3a3 Mon Sep 17 00:00:00 2001 From: Stefan Hildebrandt Date: Thu, 5 May 2022 01:13:45 +0200 Subject: [PATCH] Use ssl client certs for authentication. (#31) * Use ssl client certs for authentication. * Fix linting issue * Skip TLS config if the certificates and key files don't exist * Extract dialer creation into getAuthenticatedDialer function * ListTopics and CreateTopic now support authentication * Add DeleteTopic function * Fix issue with dialer being nil (return an unauthenticated dialer) * Rename dialer constructors * Document auth in topic handling functions * Add docs for deleteTopic * Create topic only on the first VU upon initialization * Delete topics upon teardown Co-authored-by: Stefan Hildebrandt Co-authored-by: Mostafa Moradian --- README.md | 19 ++- auth.go | 96 ++++++++++++++-- configuration.go | 17 +-- consumer.go | 18 +-- producer.go | 18 +-- schemaRegistry.go | 108 ++++++++++++------ scripts/test_avro.js | 15 ++- scripts/test_avro_no_key.js | 17 ++- scripts/test_avro_with_schema_registry.js | 26 +++-- .../test_avro_with_schema_registry_no_key.js | 36 +++--- scripts/test_bytes.js | 17 ++- scripts/test_json.js | 15 ++- scripts/test_json_with_snappy_compression.js | 24 ++-- scripts/test_sasl_auth.js | 32 ++++-- scripts/test_topics.js | 23 +++- topic.go | 65 ++++++++--- 16 files changed, 384 insertions(+), 162 deletions(-) diff --git a/README.md b/README.md index 4da0fed..5db2f6a 100644 --- a/README.md +++ b/README.md @@ -189,18 +189,31 @@ function consumeWithConfiguration(reader: object, limit: number, configurationJs * @param {number} partitions The number of partitions. * @param {number} replicationFactor The replication factor in a clustered setup. * @param {string} compression The compression algorithm. + * @param {string} auth Authentication credentials for SASL PLAIN/SCRAM. * @returns {string} A string containing the error. */ -function createTopic(address: string, topic: string, partitions: number, replicationFactor number, compression string) => string {} +function createTopic(address: string, topic: string, partitions: number, replicationFactor: number, compression: string, auth: string) => string {} /** - * List all topics in Kafka. + * Delete a topic from Kafka. It raises an error if the topic doesn't exist. * * @function * @param {string} address The broker address. + * @param {string} topic The topic name. + * @param {string} auth Authentication credentials for SASL PLAIN/SCRAM. + * @returns {string} A string containing the error. + */ +function deleteTopic(address: string, topic: string, auth: string) => string {} + +/** + * List all topics in Kafka. + * + * @function + * @param {string} address The broker address. + * @param {string} auth Authentication credentials for SASL PLAIN/SCRAM. * @returns {string} A nested list of strings containing a list of topics and the error (if any). */ -function listTopics(address: string) => [[string], string] {} +function listTopics(address: string, auth: string) => [[string], string] {} ``` diff --git a/auth.go b/auth.go index 5295b19..4384402 100644 --- a/auth.go +++ b/auth.go @@ -2,7 +2,11 @@ package kafka import ( "crypto/tls" + "crypto/x509" "encoding/json" + "io/ioutil" + "log" + "os" "time" kafkago "github.com/segmentio/kafka-go" @@ -11,20 +15,24 @@ import ( ) const ( + None = "none" Plain = "plain" SHA256 = "sha256" SHA512 = "sha512" ) type Credentials struct { - Username string `json:"username"` - Password string `json:"password"` - Algorithm string `json:"algorithm"` + Username string `json:"username"` + Password string `json:"password"` + Algorithm string `json:"algorithm"` + ClientCertPem string `json:"clientCertPem"` + ClientKeyPem string `json:"clientKeyPem"` + ServerCaPem string `json:"serverCaPem"` } func unmarshalCredentials(auth string) (creds *Credentials, err error) { creds = &Credentials{ - Algorithm: Plain, + Algorithm: None, } err = json.Unmarshal([]byte(auth), &creds) @@ -32,13 +40,11 @@ func unmarshalCredentials(auth string) (creds *Credentials, err error) { return } -func getDialer(creds *Credentials) (dialer *kafkago.Dialer) { +func getDialerFromCreds(creds *Credentials) (dialer *kafkago.Dialer) { dialer = &kafkago.Dialer{ Timeout: 10 * time.Second, DualStack: true, - TLS: &tls.Config{ - MinVersion: tls.VersionTLS12, - }, + TLS: tlsConfig(creds), } if creds.Algorithm == Plain { @@ -48,7 +54,7 @@ func getDialer(creds *Credentials) (dialer *kafkago.Dialer) { } dialer.SASLMechanism = mechanism return - } else { + } else if creds.Algorithm == SHA256 || creds.Algorithm == SHA512 { hashes := make(map[string]scram.Algorithm) hashes["sha256"] = scram.SHA256 hashes["sha512"] = scram.SHA512 @@ -65,4 +71,76 @@ func getDialer(creds *Credentials) (dialer *kafkago.Dialer) { dialer.SASLMechanism = mechanism return } + return +} + +func getDialerFromAuth(auth string) (dialer *kafkago.Dialer) { + if auth != "" { + // Parse the auth string + creds, err := unmarshalCredentials(auth) + if err != nil { + ReportError(err, "Unable to unmarshal credentials") + return nil + } + + // Try to create an authenticated dialer from the credentials + // with TLS enabled if the credentials specify a client cert + // and key. + dialer = getDialerFromCreds(creds) + if dialer == nil { + ReportError(nil, "Dialer cannot authenticate") + return nil + } + } else { + // Create a normal (unauthenticated) dialer + dialer = &kafkago.Dialer{ + Timeout: 10 * time.Second, + DualStack: false, + } + } + + return +} + +func fileExists(filename string) bool { + _, err := os.Stat(filename) + return err == nil +} + +func tlsConfig(creds *Credentials) *tls.Config { + var clientCertFile = &creds.ClientCertPem + if !fileExists(*clientCertFile) { + ReportError(nil, "client certificate file not found") + return nil + } + + var clientKeyFile = &creds.ClientKeyPem + if !fileExists(*clientKeyFile) { + ReportError(nil, "client key file not found") + return nil + } + + var cert, err = tls.LoadX509KeyPair(*clientCertFile, *clientKeyFile) + if err != nil { + log.Fatalf("Error creating x509 keypair from client cert file %s and client key file %s", *clientCertFile, *clientKeyFile) + } + + var caCertFile = &creds.ServerCaPem + if !fileExists(*caCertFile) { + ReportError(nil, "CA certificate file not found") + return nil + } + + caCert, err := ioutil.ReadFile(*caCertFile) + if err != nil { + log.Fatalf("Error opening cert file %s, Error: %s", *caCertFile, err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + MinVersion: tls.VersionTLS12, + } } diff --git a/configuration.go b/configuration.go index 0fecf77..d7fe749 100644 --- a/configuration.go +++ b/configuration.go @@ -6,13 +6,13 @@ import ( ) type ConsumerConfiguration struct { - KeyDeserializer string `json:"keyDeserializer"` - ValueDeserializer string `json:"valueDeserializer"` + KeyDeserializer string `json:"keyDeserializer"` + ValueDeserializer string `json:"valueDeserializer"` } type ProducerConfiguration struct { - KeySerializer string `json:"keySerializer"` - ValueSerializer string `json:"valueSerializer"` + KeySerializer string `json:"keySerializer"` + ValueSerializer string `json:"valueSerializer"` } type BasicAuth struct { @@ -23,11 +23,12 @@ type BasicAuth struct { type SchemaRegistryConfiguration struct { Url string `json:"url"` BasicAuth BasicAuth `json:"basicAuth"` + UseLatest bool `json:"useLatest"` } type Configuration struct { - Consumer ConsumerConfiguration `json:"consumer"` - Producer ProducerConfiguration `json:"producer"` + Consumer ConsumerConfiguration `json:"consumer"` + Producer ProducerConfiguration `json:"producer"` SchemaRegistry SchemaRegistryConfiguration `json:"schemaRegistry"` } @@ -43,7 +44,7 @@ func useKafkaAvroDeserializer(configuration Configuration, keyOrValue string) bo return false } if keyOrValue == "key" && configuration.Consumer.KeyDeserializer == "io.confluent.kafka.serializers.KafkaAvroDeserializer" || - keyOrValue == "value" && configuration.Consumer.ValueDeserializer == "io.confluent.kafka.serializers.KafkaAvroDeserializer" { + keyOrValue == "value" && configuration.Consumer.ValueDeserializer == "io.confluent.kafka.serializers.KafkaAvroDeserializer" { return true } return false @@ -55,7 +56,7 @@ func useKafkaAvroSerializer(configuration Configuration, keyOrValue string) bool return false } if keyOrValue == "key" && configuration.Producer.KeySerializer == "io.confluent.kafka.serializers.KafkaAvroSerializer" || - keyOrValue == "value" && configuration.Producer.ValueSerializer == "io.confluent.kafka.serializers.KafkaAvroSerializer" { + keyOrValue == "value" && configuration.Producer.ValueSerializer == "io.confluent.kafka.serializers.KafkaAvroSerializer" { return true } return false diff --git a/consumer.go b/consumer.go index 10d9e07..cc25634 100644 --- a/consumer.go +++ b/consumer.go @@ -12,22 +12,6 @@ import ( func (*Kafka) Reader( brokers []string, topic string, partition int, groupID string, offset int64, auth string) *kafkago.Reader { - var dialer *kafkago.Dialer - - if auth != "" { - creds, err := unmarshalCredentials(auth) - if err != nil { - ReportError(err, "Unable to unmarshal credentials") - return nil - } - - dialer = getDialer(creds) - if dialer == nil { - ReportError(nil, "Dialer cannot authenticate") - return nil - } - } - if groupID != "" { partition = 0 } @@ -40,7 +24,7 @@ func (*Kafka) Reader( MaxWait: time.Millisecond * 200, RebalanceTimeout: time.Second * 5, QueueCapacity: 1, - Dialer: dialer, + Dialer: getDialerFromAuth(auth), }) if offset > 0 { diff --git a/producer.go b/producer.go index 316569e..38d1be1 100644 --- a/producer.go +++ b/producer.go @@ -19,28 +19,12 @@ var ( ) func (*Kafka) Writer(brokers []string, topic string, auth string, compression string) *kafkago.Writer { - var dialer *kafkago.Dialer - - if auth != "" { - creds, err := unmarshalCredentials(auth) - if err != nil { - ReportError(err, "Unable to unmarshal credentials") - return nil - } - - dialer = getDialer(creds) - if dialer == nil { - ReportError(nil, "Dialer cannot authenticate") - return nil - } - } - writerConfig := kafkago.WriterConfig{ Brokers: brokers, Topic: topic, Balancer: &kafkago.LeastBytes{}, BatchSize: 1, - Dialer: dialer, + Dialer: getDialerFromAuth(auth), Async: false, } diff --git a/schemaRegistry.go b/schemaRegistry.go index b9eee49..8901ed4 100644 --- a/schemaRegistry.go +++ b/schemaRegistry.go @@ -45,48 +45,90 @@ func addMagicByteAndSchemaIdPrefix(configuration Configuration, avroData []byte, var schemaIdCache = make(map[string]uint32) +type SchemaInfo struct { + Id int32 `json:"id"` + Version int32 `json:"version"` +} + func getSchemaId(configuration Configuration, topic string, keyOrValue string, schema string) (uint32, error) { if schemaIdCache[schema] > 0 { return schemaIdCache[schema], nil } if useKafkaAvroSerializer(configuration, keyOrValue) { - url := configuration.SchemaRegistry.Url + "/subjects/" + topic + "-" + keyOrValue + "/versions" - codec, _ := goavro.NewCodec(schema) + if configuration.SchemaRegistry.UseLatest { + url := configuration.SchemaRegistry.Url + "/subjects/" + topic + "-" + keyOrValue + "/versions/latest" + client := &http.Client{} + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return 0, err + } + req.Header.Add("Content-Type", "application/vnd.schemaregistry.v1+json") + if useBasicAuthWithCredentialSourceUserInfo(configuration) { + username := strings.Split(configuration.SchemaRegistry.BasicAuth.UserInfo, ":")[0] + password := strings.Split(configuration.SchemaRegistry.BasicAuth.UserInfo, ":")[1] + req.SetBasicAuth(username, password) + } + resp, err := client.Do(req) + if err != nil { + return 0, err + } + if resp.StatusCode >= 400 { + return 0, errors.New(fmt.Sprintf("Retrieval of schema ids failed. Details: Url= %v, response=%v", url, resp)) + } + defer resp.Body.Close() + bodyBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return 0, err + } + var result SchemaInfo - body := "{\"schema\":\"" + strings.Replace(codec.CanonicalSchema(), "\"", "\\\"", -1) + "\"}" + err = json.Unmarshal(bodyBytes, &result) + if err != nil { + return 0, err + } + schemaId := uint32(result.Id) + schemaIdCache[schema] = schemaId + return schemaId, nil + } else { + url := configuration.SchemaRegistry.Url + "/subjects/" + topic + "-" + keyOrValue + "/versions" + codec, _ := goavro.NewCodec(schema) - client := &http.Client{} - req, err := http.NewRequest("POST", url, bytes.NewReader([]byte(body))) - if err != nil { - return 0, err - } - req.Header.Add("Content-Type", "application/vnd.schemaregistry.v1+json") - if useBasicAuthWithCredentialSourceUserInfo(configuration) { - username := strings.Split(configuration.SchemaRegistry.BasicAuth.UserInfo, ":")[0] - password := strings.Split(configuration.SchemaRegistry.BasicAuth.UserInfo, ":")[1] - req.SetBasicAuth(username, password) - } - resp, err := client.Do(req) - if err != nil { - return 0, err - } - if resp.StatusCode >= 400 { - return 0, errors.New(fmt.Sprintf("Retrieval of schema ids failed. Details: Url= %v, body=%v, response=%v", url, body, resp)) - } - defer resp.Body.Close() - bodyBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return 0, err - } + body := "{\"schema\":\"" + strings.Replace(codec.CanonicalSchema(), "\"", "\\\"", -1) + "\"}" + + client := &http.Client{} + req, err := http.NewRequest("POST", url, bytes.NewReader([]byte(body))) + if err != nil { + return 0, err + } + req.Header.Add("Content-Type", "application/vnd.schemaregistry.v1+json") + if useBasicAuthWithCredentialSourceUserInfo(configuration) { + username := strings.Split(configuration.SchemaRegistry.BasicAuth.UserInfo, ":")[0] + password := strings.Split(configuration.SchemaRegistry.BasicAuth.UserInfo, ":")[1] + req.SetBasicAuth(username, password) + } + resp, err := client.Do(req) + if err != nil { + return 0, err + } + if resp.StatusCode >= 400 { + return 0, errors.New(fmt.Sprintf("Retrieval of schema ids failed. Details: Url= %v, body=%v, response=%v", url, body, resp)) + } + defer resp.Body.Close() + bodyBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return 0, err + } + + var result map[string]int32 + err = json.Unmarshal(bodyBytes, &result) + if err != nil { + return 0, err + } + schemaId := uint32(result["id"]) + schemaIdCache[schema] = schemaId + return schemaId, nil - var result map[string]int32 - err = json.Unmarshal(bodyBytes, &result) - if err != nil { - return 0, err } - schemaId := uint32(result["id"]) - schemaIdCache[schema] = schemaId - return schemaId, nil } return 0, nil } diff --git a/scripts/test_avro.js b/scripts/test_avro.js index 205a98d..fa67a7c 100644 --- a/scripts/test_avro.js +++ b/scripts/test_avro.js @@ -58,7 +58,9 @@ const valueSchema = JSON.stringify({ ], }); -createTopic(bootstrapServers[0], kafkaTopic); +if (__VU == 1) { + createTopic(bootstrapServers[0], kafkaTopic); +} export default function () { for (let index = 0; index < 100; index++) { @@ -106,6 +108,17 @@ export default function () { } export function teardown(data) { + if (__VU == 1) { + // Delete the topic + const error = deleteTopic(bootstrapServers[0], kafkaTopic); + if (error === undefined) { + // If no error returns, it means that the topic + // is successfully deleted + console.log("Topic deleted successfully"); + } else { + console.log("Error while deleting topic: ", error); + } + } producer.close(); consumer.close(); } diff --git a/scripts/test_avro_no_key.js b/scripts/test_avro_no_key.js index d8bb4f2..2b44b52 100644 --- a/scripts/test_avro_no_key.js +++ b/scripts/test_avro_no_key.js @@ -46,7 +46,9 @@ const valueSchema = JSON.stringify({ ], }); -createTopic(bootstrapServers[0], kafkaTopic); +if (__VU == 1) { + createTopic(bootstrapServers[0], kafkaTopic); +} export default function () { for (let index = 0; index < 100; index++) { @@ -87,11 +89,22 @@ export default function () { }); for (let index = 0; index < rx_messages.length; index++) { - console.debug('Received Message: ' + JSON.stringify(rx_messages[index])); + console.debug("Received Message: " + JSON.stringify(rx_messages[index])); } } export function teardown(data) { + if (__VU == 1) { + // Delete the topic + const error = deleteTopic(bootstrapServers[0], kafkaTopic); + if (error === undefined) { + // If no error returns, it means that the topic + // is successfully deleted + console.log("Topic deleted successfully"); + } else { + console.log("Error while deleting topic: ", error); + } + } producer.close(); consumer.close(); } diff --git a/scripts/test_avro_with_schema_registry.js b/scripts/test_avro_with_schema_registry.js index cabefd7..392516d 100644 --- a/scripts/test_avro_with_schema_registry.js +++ b/scripts/test_avro_with_schema_registry.js @@ -55,8 +55,7 @@ const valueSchema = `{ var configuration = JSON.stringify({ consumer: { keyDeserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer", - valueDeserializer: - "io.confluent.kafka.serializers.KafkaAvroDeserializer", + valueDeserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer", }, producer: { keySerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer", @@ -71,7 +70,9 @@ var configuration = JSON.stringify({ }, }); -createTopic(bootstrapServers[0], topic); +if (__VU == 1) { + createTopic(bootstrapServers[0], kafkaTopic); +} export default function () { for (let index = 0; index < 100; index++) { @@ -98,19 +99,24 @@ export default function () { }); } - let messages = consumeWithConfiguration( - consumer, - 20, - configuration, - keySchema, - valueSchema - ); + let messages = consumeWithConfiguration(consumer, 20, configuration, keySchema, valueSchema); check(messages, { "20 message returned": (msgs) => msgs.length == 20, }); } export function teardown(data) { + if (__VU == 1) { + // Delete the topic + const error = deleteTopic(bootstrapServers[0], kafkaTopic); + if (error === undefined) { + // If no error returns, it means that the topic + // is successfully deleted + console.log("Topic deleted successfully"); + } else { + console.log("Error while deleting topic: ", error); + } + } producer.close(); consumer.close(); } diff --git a/scripts/test_avro_with_schema_registry_no_key.js b/scripts/test_avro_with_schema_registry_no_key.js index 8df9c59..6752a6e 100644 --- a/scripts/test_avro_with_schema_registry_no_key.js +++ b/scripts/test_avro_with_schema_registry_no_key.js @@ -37,8 +37,7 @@ const valueSchema = `{ var configuration = JSON.stringify({ consumer: { keyDeserializer: "", - valueDeserializer: - "io.confluent.kafka.serializers.KafkaAvroDeserializer", + valueDeserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer", }, producer: { keySerializer: "", @@ -49,7 +48,9 @@ var configuration = JSON.stringify({ }, }); -createTopic(bootstrapServers[0], topic); +if (__VU == 1) { + createTopic(bootstrapServers[0], kafkaTopic); +} export default function () { for (let index = 0; index < 100; index++) { @@ -61,35 +62,34 @@ export default function () { }), }, ]; - let error = produceWithConfiguration( - producer, - messages, - configuration, - null, - valueSchema - ); + let error = produceWithConfiguration(producer, messages, configuration, null, valueSchema); check(error, { "is sent": (err) => err == undefined, }); } - let rx_messages = consumeWithConfiguration( - consumer, - 20, - configuration, - null, - valueSchema - ); + let rx_messages = consumeWithConfiguration(consumer, 20, configuration, null, valueSchema); check(rx_messages, { "20 message returned": (msgs) => msgs.length == 20, }); for (let index = 0; index < rx_messages.length; index++) { - console.debug('Received Message: ' + JSON.stringify(rx_messages[index])); + console.debug("Received Message: " + JSON.stringify(rx_messages[index])); } } export function teardown(data) { + if (__VU == 1) { + // Delete the topic + const error = deleteTopic(bootstrapServers[0], kafkaTopic); + if (error === undefined) { + // If no error returns, it means that the topic + // is successfully deleted + console.log("Topic deleted successfully"); + } else { + console.log("Error while deleting topic: ", error); + } + } producer.close(); consumer.close(); } diff --git a/scripts/test_bytes.js b/scripts/test_bytes.js index c6126fa..8cd4c72 100644 --- a/scripts/test_bytes.js +++ b/scripts/test_bytes.js @@ -20,7 +20,9 @@ const kafkaTopic = "xk6_kafka_byte_array_topic"; const producer = writer(bootstrapServers, kafkaTopic); const consumer = reader(bootstrapServers, kafkaTopic); -createTopic(bootstrapServers[0], kafkaTopic); +if (__VU == 1) { + createTopic(bootstrapServers[0], kafkaTopic); +} var configuration = JSON.stringify({ producer: { @@ -33,7 +35,7 @@ var configuration = JSON.stringify({ }, }); -const payload = "byte array payload" +const payload = "byte array payload"; export default function () { for (let index = 0; index < 100; index++) { @@ -64,6 +66,17 @@ export default function () { } export function teardown(data) { + if (__VU == 1) { + // Delete the topic + const error = deleteTopic(bootstrapServers[0], kafkaTopic); + if (error === undefined) { + // If no error returns, it means that the topic + // is successfully deleted + console.log("Topic deleted successfully"); + } else { + console.log("Error while deleting topic: ", error); + } + } producer.close(); consumer.close(); } diff --git a/scripts/test_json.js b/scripts/test_json.js index fea6ff7..78f09dd 100644 --- a/scripts/test_json.js +++ b/scripts/test_json.js @@ -14,7 +14,9 @@ const kafkaTopic = "xk6_kafka_json_topic"; const producer = writer(bootstrapServers, kafkaTopic); const consumer = reader(bootstrapServers, kafkaTopic); -createTopic(bootstrapServers[0], kafkaTopic); +if (__VU == 1) { + createTopic(bootstrapServers[0], kafkaTopic); +} export default function () { for (let index = 0; index < 100; index++) { @@ -61,6 +63,17 @@ export default function () { } export function teardown(data) { + if (__VU == 1) { + // Delete the topic + const error = deleteTopic(bootstrapServers[0], kafkaTopic); + if (error === undefined) { + // If no error returns, it means that the topic + // is successfully deleted + console.log("Topic deleted successfully"); + } else { + console.log("Error while deleting topic: ", error); + } + } producer.close(); consumer.close(); } diff --git a/scripts/test_json_with_snappy_compression.js b/scripts/test_json_with_snappy_compression.js index e0cf295..5f7aa8f 100644 --- a/scripts/test_json_with_snappy_compression.js +++ b/scripts/test_json_with_snappy_compression.js @@ -26,14 +26,11 @@ const consumer = reader(bootstrapServers, kafkaTopic); const replicationFactor = 1; const partitions = 1; -// Create the topic or do nothing if the topic exists. -createTopic( - bootstrapServers[0], - kafkaTopic, - partitions, - replicationFactor, - compression -); + +if (__VU == 1) { + // Create the topic or do nothing if the topic exists. + createTopic(bootstrapServers[0], kafkaTopic, partitions, replicationFactor, compression); +} export default function () { for (let index = 0; index < 100; index++) { @@ -80,6 +77,17 @@ export default function () { } export function teardown(data) { + if (__VU == 1) { + // Delete the topic + const error = deleteTopic(bootstrapServers[0], kafkaTopic); + if (error === undefined) { + // If no error returns, it means that the topic + // is successfully deleted + console.log("Topic deleted successfully"); + } else { + console.log("Error while deleting topic: ", error); + } + } producer.close(); consumer.close(); } diff --git a/scripts/test_sasl_auth.js b/scripts/test_sasl_auth.js index ef0f5d7..c85a917 100644 --- a/scripts/test_sasl_auth.js +++ b/scripts/test_sasl_auth.js @@ -7,9 +7,9 @@ also uses SASL authentication. */ import { check } from "k6"; -import { writer, produce, reader, consume, createTopic } from "k6/x/kafka"; // import kafka extension +import { writer, produce, reader, consume, createTopic, deleteTopic, listTopics } from "k6/x/kafka"; // import kafka extension -const bootstrapServers = ["localhost:9092"]; +const bootstrapServers = ["localhost:9093"]; const kafkaTopic = "xk6_kafka_json_topic"; const auth = JSON.stringify({ username: "client", @@ -23,18 +23,17 @@ const offset = 0; // partition and groupID are mutually exclusive const partition = 1; const groupID = ""; +const partitions = 1; +const replicationFactor = 1; +const compression = ""; const producer = writer(bootstrapServers, kafkaTopic, auth); -const consumer = reader( - bootstrapServers, - kafkaTopic, - partition, - groupID, - offset, - auth -); +const consumer = reader(bootstrapServers, kafkaTopic, partition, groupID, offset, auth); -createTopic(bootstrapServers[0], kafkaTopic); +if (__VU == 1) { + createTopic(bootstrapServers[0], kafkaTopic, partitions, replicationFactor, compression, auth); + console.log("Existing topics: ", listTopics(bootstrapServers[0], auth)); +} export default function () { for (let index = 0; index < 100; index++) { @@ -81,6 +80,17 @@ export default function () { } export function teardown(data) { + if (__VU == 1) { + // Delete the topic + const error = deleteTopic(bootstrapServers[0], kafkaTopic); + if (error === undefined) { + // If no error returns, it means that the topic + // is successfully deleted + console.log("Topic deleted successfully"); + } else { + console.log("Error while deleting topic: ", error); + } + } producer.close(); consumer.close(); } diff --git a/scripts/test_topics.js b/scripts/test_topics.js index 4ea3349..30d9a8c 100644 --- a/scripts/test_topics.js +++ b/scripts/test_topics.js @@ -5,19 +5,16 @@ list topics on all Kafka partitions and creates a topic. */ -import { - createTopic, - listTopics -} from 'k6/x/kafka'; // import kafka extension +import { createTopic, listTopics } from "k6/x/kafka"; // import kafka extension const address = "localhost:9092"; const kafkaTopic = "xk6_kafka_test_topic"; -const results = listTopics(address) +const results = listTopics(address); const error = createTopic(address, kafkaTopic); export default function () { - results.forEach(topic => console.log(topic)); + results.forEach((topic) => console.log(topic)); if (error === undefined) { // If no error returns, it means that the topic @@ -27,3 +24,17 @@ export default function () { console.log("Error while creating topic: ", error); } } + +export function teardown() { + if (__VU == 1) { + // Delete the topic + const error = deleteTopic(bootstrapServers[0], kafkaTopic); + if (error === undefined) { + // If no error returns, it means that the topic + // is successfully deleted + console.log("Topic deleted successfully"); + } else { + console.log("Error while deleting topic: ", error); + } + } +} diff --git a/topic.go b/topic.go index 7b84169..54c666b 100644 --- a/topic.go +++ b/topic.go @@ -1,31 +1,29 @@ package kafka import ( - "net" - "strconv" + "errors" "strings" "github.com/segmentio/kafka-go" kafkago "github.com/segmentio/kafka-go" ) -func (*Kafka) CreateTopic(address, topic string, partitions, replicationFactor int, compression string) error { - conn, err := kafkago.Dial("tcp", address) - if err != nil { - return err - } - defer conn.Close() +func (k *Kafka) CreateTopic(address, topic string, partitions, replicationFactor int, compression string, auth string) error { + dialer := getDialerFromAuth(auth) - controller, err := conn.Controller() - if err != nil { + ctx := k.vu.Context() + err := errors.New("context is nil") + + if ctx == nil { + ReportError(err, "Cannot determine context") return err } - var controllerConn *kafkago.Conn - controllerConn, err = kafkago.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + + conn, err := dialer.DialContext(ctx, "tcp", address) if err != nil { return err } - defer controllerConn.Close() + defer conn.Close() if partitions <= 0 { partitions = 1 @@ -48,7 +46,32 @@ func (*Kafka) CreateTopic(address, topic string, partitions, replicationFactor i }) } - err = controllerConn.CreateTopics([]kafkago.TopicConfig{topicConfig}...) + err = conn.CreateTopics([]kafkago.TopicConfig{topicConfig}...) + if err != nil { + return err + } + + return nil +} + +func (k *Kafka) DeleteTopic(address, topic string, auth string) error { + dialer := getDialerFromAuth(auth) + + ctx := k.vu.Context() + err := errors.New("context is nil") + + if ctx == nil { + ReportError(err, "Cannot determine context") + return err + } + + conn, err := dialer.DialContext(ctx, "tcp", address) + if err != nil { + return err + } + defer conn.Close() + + err = conn.DeleteTopics([]string{topic}...) if err != nil { return err } @@ -56,8 +79,18 @@ func (*Kafka) CreateTopic(address, topic string, partitions, replicationFactor i return nil } -func (*Kafka) ListTopics(address string) ([]string, error) { - conn, err := kafkago.Dial("tcp", address) +func (k *Kafka) ListTopics(address string, auth string) ([]string, error) { + dialer := getDialerFromAuth(auth) + + ctx := k.vu.Context() + err := errors.New("context is nil") + + if ctx == nil { + ReportError(err, "Cannot determine context") + return nil, err + } + + conn, err := dialer.DialContext(ctx, "tcp", address) if err != nil { return nil, err }