From 9416dd80fb0dba71ff73a8cb4d2b919f54651006 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Fri, 30 Aug 2024 14:03:50 +0530 Subject: [PATCH] Fix an assert being triggered when no metrics matched on the client side during send push telemetry call (#4826) --- CHANGELOG.md | 29 +++++++++++++++++++++++++++ src-cpp/rdkafkacpp.h | 2 +- src/rdkafka.h | 2 +- src/rdkafka_telemetry.c | 36 ++++++++++++++++++++-------------- src/rdkafka_telemetry_encode.c | 4 ++++ tests/0150-telemetry_mock.c | 12 ++++++++---- vcpkg.json | 2 +- 7 files changed, 65 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 68142d0d3c..9bacacb7e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,34 @@ +# librdkafka v2.5.3 + +librdkafka v2.5.3 is a feature release. + +* Fix an assert being triggered during push telemetry call when no metrics matched on the client side. (#4826) + +## Fixes + +### Telemetry fixes + +* Issue: #4833 +Fix a regression introduced with [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) support in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription. +Happening since 2.5.0 (#4826). + +*Note: there were no v2.5.1 and v2.5.2 librdkafka releases* + + # librdkafka v2.5.0 +> [!WARNING] +This version has introduced a regression in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription. +> +> You won't face any problem if: +> * Broker doesn't support [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability). +> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the broker side. 
+> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the client side. It is enabled by default; set the `enable.metrics.push` configuration to `false` to disable it. +> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side but no subscription is configured there. +> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side with subscriptions that match the [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) metrics defined on the client. +> +> That said, we strongly recommend using `v2.5.3` or later to avoid this regression entirely. + librdkafka v2.5.0 is a feature release. * [KIP-951](https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client) diff --git a/src-cpp/rdkafkacpp.h b/src-cpp/rdkafkacpp.h index eb04afa1f5..23741706f6 100644 --- a/src-cpp/rdkafkacpp.h +++ b/src-cpp/rdkafkacpp.h @@ -112,7 +112,7 @@ namespace RdKafka { * @remark This value should only be used during compile time, * for runtime checks of version use RdKafka::version() */ -#define RD_KAFKA_VERSION 0x020500ff +#define RD_KAFKA_VERSION 0x020503ff /** * @brief Returns the librdkafka version as integer. diff --git a/src/rdkafka.h b/src/rdkafka.h index 7d4ae8112b..b251e4c51a 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -167,7 +167,7 @@ typedef SSIZE_T ssize_t; * @remark This value should only be used during compile time, * for runtime checks of version use rd_kafka_version() */ -#define RD_KAFKA_VERSION 0x020500ff +#define RD_KAFKA_VERSION 0x020503ff /** * @brief Returns the librdkafka version as integer. 
diff --git a/src/rdkafka_telemetry.c b/src/rdkafka_telemetry.c index 3f2fece177..176a555e62 100644 --- a/src/rdkafka_telemetry.c +++ b/src/rdkafka_telemetry.c @@ -343,20 +343,25 @@ static void rd_kafka_send_push_telemetry(rd_kafka_t *rk, rd_bool_t terminating) { rd_buf_t *metrics_payload = rd_kafka_telemetry_encode_metrics(rk); - size_t compressed_metrics_payload_size = 0; - void *compressed_metrics_payload = NULL; - rd_kafka_compression_t compression_used = - rd_kafka_push_telemetry_payload_compress( - rk, rkb, metrics_payload, &compressed_metrics_payload, - &compressed_metrics_payload_size); - if (compressed_metrics_payload_size > - (size_t)rk->rk_telemetry.telemetry_max_bytes) { - rd_kafka_log(rk, LOG_WARNING, "TELEMETRY", - "Metrics payload size %" PRIusz - " exceeds telemetry_max_bytes %" PRId32 - "specified by the broker.", - compressed_metrics_payload_size, - rk->rk_telemetry.telemetry_max_bytes); + size_t compressed_metrics_payload_size = 0; + void *compressed_metrics_payload = NULL; + rd_kafka_compression_t compression_used = RD_KAFKA_COMPRESSION_NONE; + if (metrics_payload) { + compression_used = rd_kafka_push_telemetry_payload_compress( + rk, rkb, metrics_payload, &compressed_metrics_payload, + &compressed_metrics_payload_size); + if (compressed_metrics_payload_size > + (size_t)rk->rk_telemetry.telemetry_max_bytes) { + rd_kafka_log(rk, LOG_WARNING, "TELEMETRY", + "Metrics payload size %" PRIusz + " exceeds telemetry_max_bytes %" PRId32 + "specified by the broker.", + compressed_metrics_payload_size, + rk->rk_telemetry.telemetry_max_bytes); + } + } else { + rd_kafka_dbg(rk, TELEMETRY, "PUSH", + "No metrics to push. 
Sending empty payload."); } rd_kafka_dbg(rk, TELEMETRY, "PUSH", @@ -369,7 +374,8 @@ static void rd_kafka_send_push_telemetry(rd_kafka_t *rk, 0, RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_handle_PushTelemetry, NULL); - rd_buf_destroy_free(metrics_payload); + if (metrics_payload) + rd_buf_destroy_free(metrics_payload); if (compression_used != RD_KAFKA_COMPRESSION_NONE) rd_free(compressed_metrics_payload); diff --git a/src/rdkafka_telemetry_encode.c b/src/rdkafka_telemetry_encode.c index 5e5a5a3dc1..05a27562e1 100644 --- a/src/rdkafka_telemetry_encode.c +++ b/src/rdkafka_telemetry_encode.c @@ -609,6 +609,10 @@ rd_buf_t *rd_kafka_telemetry_encode_metrics(rd_kafka_t *rk) { RD_KAFKA_TELEMETRY_METRIC_INFO(rk); size_t total_metrics_count = metrics_to_encode_count; size_t i, metric_idx = 0; + + if (!metrics_to_encode_count) + return NULL; + opentelemetry_proto_metrics_v1_MetricsData metrics_data = opentelemetry_proto_metrics_v1_MetricsData_init_zero; diff --git a/tests/0150-telemetry_mock.c b/tests/0150-telemetry_mock.c index 52fb76032f..871e8c47ce 100644 --- a/tests/0150-telemetry_mock.c +++ b/tests/0150-telemetry_mock.c @@ -202,11 +202,11 @@ void do_test_telemetry_get_subscription_push_telemetry(void) { * resent after the push interval until there are subscriptions. * See `requests_expected` for detailed expected flow. 
*/ -void do_test_telemetry_empty_subscriptions_list(void) { +void do_test_telemetry_empty_subscriptions_list(char *subscription_regex) { rd_kafka_conf_t *conf; const char *bootstraps; rd_kafka_mock_cluster_t *mcluster; - char *expected_metrics[] = {"*"}; + char *expected_metrics[] = {subscription_regex}; rd_kafka_t *producer = NULL; rd_kafka_mock_request_t **requests = NULL; size_t request_cnt; @@ -234,7 +234,7 @@ void do_test_telemetry_empty_subscriptions_list(void) { }; - SUB_TEST(); + SUB_TEST("Test with subscription regex: %s", subscription_regex); mcluster = test_mock_cluster_new(1, &bootstraps); rd_kafka_mock_telemetry_set_requested_metrics(mcluster, NULL, 0); @@ -534,7 +534,11 @@ int main_0150_telemetry_mock(int argc, char **argv) { do_test_telemetry_get_subscription_push_telemetry(); - do_test_telemetry_empty_subscriptions_list(); + // All metrics are subscribed + do_test_telemetry_empty_subscriptions_list("*"); + + // No metrics are subscribed + do_test_telemetry_empty_subscriptions_list("non-existent-metric"); do_test_telemetry_terminating_push(); diff --git a/vcpkg.json b/vcpkg.json index 050d7094c7..15784811ca 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -1,6 +1,6 @@ { "name": "librdkafka", - "version": "2.5.0", + "version": "2.5.3", "dependencies": [ { "name": "zstd",