Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP 848: Added support for DescribeConsumerGroup for consumer protocol groups #2378

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions examples/AdminClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[]
Console.WriteLine($" IsSimpleConsumerGroup: {group.IsSimpleConsumerGroup}");
Console.WriteLine($" PartitionAssignor: {group.PartitionAssignor}");
Console.WriteLine($" State: {group.State}");
Console.WriteLine($" Type: {group.Type}");
Console.WriteLine($" Members:");
foreach (var m in group.Members)
{
Expand All @@ -801,6 +802,13 @@ static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[]
topicPartitions = String.Join(", ", m.Assignment.TopicPartitions.Select(tp => tp.ToString()));
}
Console.WriteLine($" TopicPartitions: [{topicPartitions}]");
Console.WriteLine($" TargetAssignment:");
var targetTopicPartitions = "";
if (m.TargetAssignment.TopicPartitions != null)
{
targetTopicPartitions = String.Join(", ", m.TargetAssignment.TopicPartitions.Select(tp => tp.ToString()));
}
Console.WriteLine($" TopicPartitions: [{targetTopicPartitions}]");
}
if (includeAuthorizedOperations)
{
Expand Down
6 changes: 6 additions & 0 deletions src/Confluent.Kafka/Admin/ConsumerGroupDescription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public class ConsumerGroupDescription
/// </summary>
public ConsumerGroupState State { get; set; }

/// <summary>
/// Consumer group type.
/// </summary>
public ConsumerGroupType Type { get; set;}

/// <summary>
/// Broker that acts as consumer group coordinator (null if not known).
/// </summary>
Expand Down Expand Up @@ -92,6 +97,7 @@ public override string ToString()
result.Append($"{{\"GroupId\": {GroupId.Quote()}");
result.Append($", \"Error\": \"{Error.Code}\", \"IsSimpleConsumerGroup\": {IsSimpleConsumerGroup.Quote()}");
result.Append($", \"PartitionAssignor\": {PartitionAssignor.Quote()}, \"State\": {State.ToString().Quote()}");
result.Append($", \"Type\": {Type.ToString().Quote()}");
result.Append($", \"Coordinator\": {Coordinator?.ToString() ?? "null"}, \"Members\": [{members}]");
result.Append($", \"AuthorizedOperations\": {authorizedOperations}}}");

Expand Down
11 changes: 10 additions & 1 deletion src/Confluent.Kafka/Admin/MemberDescription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public class MemberDescription
/// Member assignment.
/// </summary>
public MemberAssignment Assignment { get; set; }

/// <summary>
/// Target assignment.
/// </summary>
public MemberAssignment TargetAssignment { get; set; }

/// <summary>
/// Returns a JSON representation of this object.
Expand All @@ -63,10 +68,14 @@ public override string ToString()
Assignment.TopicPartitions.Select(topicPartition =>
$"{{\"Topic\": {topicPartition.Topic.Quote()}, \"Partition\": {topicPartition.Partition.Value}}}"
).ToList());
var targetAssignment = string.Join(",",
TargetAssignment.TopicPartitions.Select(topicPartition =>
$"{{\"Topic\": {topicPartition.Topic.Quote()}, \"Partition\": {topicPartition.Partition.Value}}}"
).ToList());

result.Append($"{{\"ClientId\": {ClientId.Quote()}");
result.Append($", \"GroupInstanceId\": {GroupInstanceId.Quote()}, \"ConsumerId\": {ConsumerId.Quote()}");
result.Append($", \"Host\": {Host.Quote()}, \"Assignment\": [{assignment}]}}");
result.Append($", \"Host\": {Host.Quote()}, \"Assignment\": [{assignment}], \"TargetAssignment\": [{targetAssignment}]}}");

return result.ToString();
}
Expand Down
9 changes: 9 additions & 0 deletions src/Confluent.Kafka/AdminClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,13 @@ private DescribeConsumerGroupsReport extractDescribeConsumerGroupsResults(IntPtr
{
member.Assignment.TopicPartitions = SafeKafkaHandle.GetTopicPartitionList(topicPartitionPtr);
}
var targetAssignmentPtr = Librdkafka.MemberDescription_target_assignment(memberPtr);
var targetTopicPartitionPtr = Librdkafka.MemberAssignment_target_partitions(targetAssignmentPtr);
member.TargetAssignment = new MemberAssignment();
if (targetTopicPartitionPtr != IntPtr.Zero)
{
member.TargetAssignment.TopicPartitions = SafeKafkaHandle.GetTopicPartitionList(targetTopicPartitionPtr);
}
members.Add(member);
}

Expand All @@ -362,6 +369,8 @@ private DescribeConsumerGroupsReport extractDescribeConsumerGroupsResults(IntPtr
PtrToStringUTF8(Librdkafka.ConsumerGroupDescription_partition_assignor(groupPtr)),
State =
Librdkafka.ConsumerGroupDescription_state(groupPtr),
Type =
Librdkafka.ConsumerGroupDescription_type(groupPtr),
Coordinator = coordinator,
Members = members,
AuthorizedOperations = authorizedOperations,
Expand Down
19 changes: 19 additions & 0 deletions src/Confluent.Kafka/Impl/LibRdKafka.cs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ static bool SetDelegates(Type nativeMethodsClass)
_ConsumerGroupDescription_is_simple_consumer_group = (_ConsumerGroupDescription_is_simple_consumer_group_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_is_simple_consumer_group").CreateDelegate(typeof (_ConsumerGroupDescription_is_simple_consumer_group_delegate));
_ConsumerGroupDescription_partition_assignor = (_ConsumerGroupDescription_partition_assignor_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_partition_assignor").CreateDelegate(typeof (_ConsumerGroupDescription_partition_assignor_delegate));
_ConsumerGroupDescription_state = (_ConsumerGroupDescription_state_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_state").CreateDelegate(typeof (_ConsumerGroupDescription_state_delegate));
_ConsumerGroupDescription_type = (_ConsumerGroupDescription_type_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_type").CreateDelegate(typeof (_ConsumerGroupDescription_type_delegate));
_ConsumerGroupDescription_coordinator = (_ConsumerGroupDescription_coordinator_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_coordinator").CreateDelegate(typeof (_ConsumerGroupDescription_coordinator_delegate));
_ConsumerGroupDescription_member_count = (_ConsumerGroupDescription_member_count_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_member_count").CreateDelegate(typeof (_ConsumerGroupDescription_member_count_delegate));
_ConsumerGroupDescription_authorized_operations = (_ConsumerGroupDescription_authorized_operations_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_authorized_operations").CreateDelegate(typeof (_ConsumerGroupDescription_authorized_operations_delegate));
Expand All @@ -434,6 +435,8 @@ static bool SetDelegates(Type nativeMethodsClass)
_MemberDescription_host = (_MemberDescription_host_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_host").CreateDelegate(typeof (_MemberDescription_host_delegate));
_MemberDescription_assignment = (_MemberDescription_assignment_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_assignment").CreateDelegate(typeof (_MemberDescription_assignment_delegate));
_MemberAssignment_partitions = (_MemberAssignment_partitions_delegate)methods.Single(m => m.Name == "rd_kafka_MemberAssignment_partitions").CreateDelegate(typeof (_MemberAssignment_partitions_delegate));
_MemberDescription_target_assignment = (_MemberDescription_target_assignment_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_target_assignment").CreateDelegate(typeof (_MemberDescription_target_assignment_delegate));
_MemberAssignment_target_partitions = (_MemberAssignment_target_partitions_delegate)methods.Single(m => m.Name == "rd_kafka_MemberAssignment_target_partitions").CreateDelegate(typeof (_MemberAssignment_target_partitions_delegate));
_Node_id = (_Node_id_delegate)methods.Single(m => m.Name == "rd_kafka_Node_id").CreateDelegate(typeof (_Node_id_delegate));
_Node_host = (_Node_host_delegate)methods.Single(m => m.Name == "rd_kafka_Node_host").CreateDelegate(typeof (_Node_host_delegate));
_Node_port = (_Node_port_delegate)methods.Single(m => m.Name == "rd_kafka_Node_port").CreateDelegate(typeof (_Node_port_delegate));
Expand Down Expand Up @@ -1953,6 +1956,12 @@ internal static ConsumerGroupState ConsumerGroupDescription_state(IntPtr grpdes
return _ConsumerGroupDescription_state(grpdesc);
}

private delegate ConsumerGroupType _ConsumerGroupDescription_type_delegate(IntPtr grpdesc);
private static _ConsumerGroupDescription_type_delegate _ConsumerGroupDescription_type;

internal static ConsumerGroupType ConsumerGroupDescription_type(IntPtr grpdesc)
=> _ConsumerGroupDescription_type(grpdesc);

private delegate IntPtr _ConsumerGroupDescription_coordinator_delegate(IntPtr grpdesc);
private static _ConsumerGroupDescription_coordinator_delegate _ConsumerGroupDescription_coordinator;
internal static IntPtr ConsumerGroupDescription_coordinator(IntPtr grpdesc)
Expand Down Expand Up @@ -2003,6 +2012,16 @@ internal static IntPtr MemberDescription_assignment(IntPtr member)
internal static IntPtr MemberAssignment_topic_partitions(IntPtr assignment)
=> _MemberAssignment_partitions(assignment);

private delegate IntPtr _MemberDescription_target_assignment_delegate(IntPtr member);
private static _MemberDescription_target_assignment_delegate _MemberDescription_target_assignment;
internal static IntPtr MemberDescription_target_assignment(IntPtr member)
=> _MemberDescription_target_assignment(member);

private delegate IntPtr _MemberAssignment_target_partitions_delegate(IntPtr assignment);
private static _MemberAssignment_target_partitions_delegate _MemberAssignment_target_partitions;
internal static IntPtr MemberAssignment_target_partitions(IntPtr assignment)
=> _MemberAssignment_target_partitions(assignment);

private delegate IntPtr _Node_id_delegate(IntPtr node);
private static _Node_id_delegate _Node_id;
internal static IntPtr Node_id(IntPtr node) => _Node_id(node);
Expand Down
9 changes: 9 additions & 0 deletions src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,9 @@ internal static extern void rd_kafka_DescribeConsumerGroups(
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ConsumerGroupState rd_kafka_ConsumerGroupDescription_state(IntPtr grpdesc);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ConsumerGroupType rd_kafka_ConsumerGroupDescription_type(IntPtr grpdesc);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_ConsumerGroupDescription_coordinator(IntPtr grpdesc);

Expand Down Expand Up @@ -1103,6 +1106,12 @@ internal static extern void rd_kafka_DescribeConsumerGroups(
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_MemberAssignment_partitions(IntPtr assignment);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_MemberDescription_target_assignment(IntPtr member);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_MemberAssignment_target_partitions(IntPtr assignment);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_Node_id(IntPtr node);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,9 @@ internal static extern void rd_kafka_DescribeConsumerGroups(
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ConsumerGroupState rd_kafka_ConsumerGroupDescription_state(IntPtr grpdesc);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ConsumerGroupType rd_kafka_ConsumerGroupDescription_type(IntPtr grpdesc);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_ConsumerGroupDescription_coordinator(IntPtr grpdesc);

Expand Down Expand Up @@ -1107,6 +1110,12 @@ internal static extern void rd_kafka_DescribeConsumerGroups(
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_MemberAssignment_partitions(IntPtr assignment);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_MemberDescription_target_assignment(IntPtr member);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_MemberAssignment_target_partitions(IntPtr assignment);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_Node_id(IntPtr node);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,9 @@ internal static extern void rd_kafka_DescribeConsumerGroups(
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ConsumerGroupState rd_kafka_ConsumerGroupDescription_state(IntPtr grpdesc);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern ConsumerGroupType rd_kafka_ConsumerGroupDescription_type(IntPtr grpdesc);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_ConsumerGroupDescription_coordinator(IntPtr grpdesc);

Expand Down Expand Up @@ -1107,6 +1110,12 @@ internal static extern void rd_kafka_DescribeConsumerGroups(
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_MemberAssignment_partitions(IntPtr assignment);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_MemberDescription_target_assignment(IntPtr member);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_MemberAssignment_target_partitions(IntPtr assignment);

[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
internal static extern IntPtr rd_kafka_Node_id(IntPtr node);

Expand Down
Loading