Skip to content

Commit

Permalink
KAFKA-18321: Add StreamsGroupMember, MemberState and Assignment class…
Browse files Browse the repository at this point in the history
…es (#18276)

* KAFKA-18321: Add StreamsGroupMember, MemberState and Assignment classes

This commit adds the classes to represent a Streams group member in the
consumer coordinator.

Reviewers: Bill Bejeck <[email protected]>, Lucas Brutschy <[email protected]>
  • Loading branch information
cadonna authored Jan 8, 2025
1 parent aa22676 commit 624dd45
Show file tree
Hide file tree
Showing 6 changed files with 1,233 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;

import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
* An immutable assignment for a member.
*
* @param activeTasks Active tasks assigned to the member.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* @param standbyTasks Standby tasks assigned to the member.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
* @param warmupTasks Warm-up tasks assigned to the member.
* The key of the map is the subtopology ID and the value is the set of partition IDs.
*/
public record Assignment(Map<String, Set<Integer>> activeTasks,
Map<String, Set<Integer>> standbyTasks,
Map<String, Set<Integer>> warmupTasks) {

public Assignment {
activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks));
standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks));
warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks));
}

/**
* An empty assignment.
*/
public static final Assignment EMPTY = new Assignment(
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap()
);

/**
* Creates a {{@link org.apache.kafka.coordinator.group.streams.Assignment}} from a
* {{@link org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue}}.
*
* @param record The record.
* @return A {{@link org.apache.kafka.coordinator.group.streams.Assignment}}.
*/
public static Assignment fromRecord(
StreamsGroupTargetAssignmentMemberValue record
) {
return new Assignment(
record.activeTasks().stream()
.collect(Collectors.toMap(
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
taskId -> new HashSet<>(taskId.partitions())
)
),
record.standbyTasks().stream()
.collect(Collectors.toMap(
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
taskId -> new HashSet<>(taskId.partitions())
)
),
record.warmupTasks().stream()
.collect(Collectors.toMap(
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
taskId -> new HashSet<>(taskId.partitions())
)
)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;

import java.util.HashMap;
import java.util.Map;

/**
* The various states that a member can be in. For their definition, refer to the documentation of
* {{@link org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder}}.
*/
public enum MemberState {

/**
* The member is fully reconciled with the desired target assignment.
*/
STABLE((byte) 1),

/**
* The member must revoke some tasks in order to be able to transition to the next epoch.
*/
UNREVOKED_TASKS((byte) 2),

/**
* The member transitioned to the last epoch but waits on some tasks which have not been revoked by their previous owners yet.
*/
UNRELEASED_TASKS((byte) 3),

/**
* The member is in an unknown state. This can only happen if a future version of the software introduces a new state unknown by this
* version.
*/
UNKNOWN((byte) 127);

private static final Map<Byte, MemberState> VALUES_TO_ENUMS = new HashMap<>();

static {
for (MemberState state : MemberState.values()) {
VALUES_TO_ENUMS.put(state.value(), state);
}
}

private final byte value;

MemberState(byte value) {
this.value = value;
}

public byte value() {
return value;
}

public static MemberState fromValue(byte value) {
MemberState state = VALUES_TO_ENUMS.get(value);
if (state == null) {
return UNKNOWN;
}
return state;
}
}
Loading

0 comments on commit 624dd45

Please sign in to comment.