Skip to content

Commit

Permalink
KAFKA-18311: Configuring repartition topics (3/N) (#18395)
Browse files Browse the repository at this point in the history
A simplified port of "RepartitionTopics" from the client-side to the group coordinator.

Compared to the client-side version, the implementation uses immutable data structures, and returns the computed number of partitions instead of modifying mutable data structures and calling the admin client.

Reviewers: Bruno Cadonna <[email protected]>
  • Loading branch information
lucasbru authored Jan 9, 2025
1 parent 11459ae commit 3bda9f8
Show file tree
Hide file tree
Showing 2 changed files with 384 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams.topics;

import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;

import org.slf4j.Logger;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Function;

/**
* Responsible for configuring the number of partitions in repartitioning topics. It computes a fix-point iteration, deriving the number of
* partitions for each repartition topic based on the number of partitions of the source topics of the topology, if the number of
* partitions is not explicitly set in the topology.
*/
public class RepartitionTopics {

private final Logger log;
private final Collection<Subtopology> subtopologies;
private final Function<String, OptionalInt> topicPartitionCountProvider;

/**
* The constructor for the class.
*
* @param logContext The context for emitting log messages.
* @param subtopologies The subtopologies for the requested topology.
* @param topicPartitionCountProvider Returns the number of partitions for a given topic, representing the current state of the
* broker.
*/
public RepartitionTopics(final LogContext logContext,
final Collection<Subtopology> subtopologies,
final Function<String, OptionalInt> topicPartitionCountProvider) {
this.log = logContext.logger(getClass());
this.subtopologies = subtopologies;
this.topicPartitionCountProvider = topicPartitionCountProvider;
}

/**
* Returns the set of the number of partitions for each repartition topic.
*
* @return the map of repartition topics for the requested topology to their required number of partitions.
*
* @throws TopicConfigurationException if no valid configuration can be found given the broker state, for example, if a source topic
* is missing.
* @throws StreamsInvalidTopologyException if the number of partitions for all repartition topics cannot be determined, e.g.
* because of loops, or if a repartition source topic is not a sink topic of any subtopology.
*/
public Map<String, Integer> setup() {
final Set<String> missingSourceTopicsForTopology = new HashSet<>();

for (final Subtopology subtopology : subtopologies) {
final Set<String> missingSourceTopicsForSubtopology = computeMissingExternalSourceTopics(subtopology);
missingSourceTopicsForTopology.addAll(missingSourceTopicsForSubtopology);
}

if (!missingSourceTopicsForTopology.isEmpty()) {
throw TopicConfigurationException.missingSourceTopics(String.format("Missing source topics: %s",
String.join(", ", missingSourceTopicsForTopology)));
}

final Map<String, Integer> repartitionTopicPartitionCount = computeRepartitionTopicPartitionCount();

for (final Subtopology subtopology : subtopologies) {
if (subtopology.repartitionSourceTopics().stream().anyMatch(repartitionTopic -> !repartitionTopicPartitionCount.containsKey(repartitionTopic.name()))) {
throw new StreamsInvalidTopologyException("Failed to compute number of partitions for all repartition topics, because "
+ "a repartition source topic is never used as a sink topic.");
}
}

return repartitionTopicPartitionCount;
}

private Set<String> computeMissingExternalSourceTopics(final Subtopology subtopology) {
final Set<String> missingExternalSourceTopics = new HashSet<>(subtopology.sourceTopics());
for (final TopicInfo topicInfo : subtopology.repartitionSourceTopics()) {
missingExternalSourceTopics.remove(topicInfo.name());
}
missingExternalSourceTopics.removeIf(x -> topicPartitionCountProvider.apply(x).isPresent());
return missingExternalSourceTopics;
}

/**
* Computes the number of partitions and returns it for each repartition topic.
*/
private Map<String, Integer> computeRepartitionTopicPartitionCount() {
boolean partitionCountNeeded;
Map<String, Integer> repartitionTopicPartitionCounts = new HashMap<>();

for (final Subtopology subtopology : subtopologies) {
for (final TopicInfo repartitionSourceTopic : subtopology.repartitionSourceTopics()) {
if (repartitionSourceTopic.partitions() != 0) {
repartitionTopicPartitionCounts.put(repartitionSourceTopic.name(), repartitionSourceTopic.partitions());
}
}
}

do {
partitionCountNeeded = false;
// avoid infinitely looping without making any progress on unknown repartitions
boolean progressMadeThisIteration = false;

for (final Subtopology subtopology : subtopologies) {
for (final String repartitionSinkTopic : subtopology.repartitionSinkTopics()) {
if (!repartitionTopicPartitionCounts.containsKey(repartitionSinkTopic)) {
final Integer numPartitions = computePartitionCount(
repartitionTopicPartitionCounts,
subtopology
);

if (numPartitions == null) {
partitionCountNeeded = true;
log.trace("Unable to determine number of partitions for {}, another iteration is needed",
repartitionSinkTopic);
} else {
log.trace("Determined number of partitions for {} to be {}",
repartitionSinkTopic,
numPartitions);
repartitionTopicPartitionCounts.put(repartitionSinkTopic, numPartitions);
progressMadeThisIteration = true;
}
}
}
}
if (!progressMadeThisIteration && partitionCountNeeded) {
throw new StreamsInvalidTopologyException("Failed to compute number of partitions for all " +
"repartition topics. There may be loops in the topology that cannot be resolved.");
}
} while (partitionCountNeeded);

return repartitionTopicPartitionCounts;
}

private Integer computePartitionCount(final Map<String, Integer> repartitionTopicPartitionCounts,
final Subtopology subtopology) {
Integer partitionCount = null;
// try set the number of partitions for this repartition topic if it is not set yet
// use the maximum of all its source topic partitions as the number of partitions

// It is possible that there is another internal topic, i.e,
// map().join().join(map())
for (final TopicInfo repartitionSourceTopic : subtopology.repartitionSourceTopics()) {
Integer numPartitionsCandidate = repartitionTopicPartitionCounts.get(repartitionSourceTopic.name());
if (numPartitionsCandidate != null && (partitionCount == null || numPartitionsCandidate > partitionCount)) {
partitionCount = numPartitionsCandidate;
}
}
for (final String externalSourceTopic : subtopology.sourceTopics()) {
final OptionalInt actualPartitionCount = topicPartitionCountProvider.apply(externalSourceTopic);
if (actualPartitionCount.isPresent() && (partitionCount == null || actualPartitionCount.getAsInt() > partitionCount)) {
partitionCount = actualPartitionCount.getAsInt();
}
}
return partitionCount;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams.topics;

import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.function.Function;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class RepartitionTopicsTest {

private static final LogContext LOG_CONTEXT = new LogContext();
private static final String SOURCE_TOPIC_NAME1 = "source1";
private static final String SOURCE_TOPIC_NAME2 = "source2";
private static final TopicInfo REPARTITION_TOPIC1 = new TopicInfo().setName("repartition1").setPartitions(4);
private static final TopicInfo REPARTITION_TOPIC2 = new TopicInfo().setName("repartition2").setPartitions(2);
private static final TopicInfo REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT = new TopicInfo().setName("repartitionWithoutPartitionCount");

private static OptionalInt sourceTopicPartitionCounts(final String topicName) {
return SOURCE_TOPIC_NAME1.equals(topicName) || SOURCE_TOPIC_NAME2.equals(topicName) ? OptionalInt.of(3) : OptionalInt.empty();
}

@Test
public void shouldSetupRepartitionTopics() {
final Subtopology subtopology1 = new Subtopology()
.setSubtopologyId("subtopology1")
.setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2))
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name()));
final Subtopology subtopology2 = new Subtopology()
.setSubtopologyId("subtopology2")
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1));
final List<Subtopology> subtopologies = List.of(subtopology1, subtopology2);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
LOG_CONTEXT,
subtopologies,
RepartitionTopicsTest::sourceTopicPartitionCounts
);

final Map<String, Integer> setup = repartitionTopics.setup();

assertEquals(
Map.of(REPARTITION_TOPIC1.name(), REPARTITION_TOPIC1.partitions()),
setup
);
}

@Test
public void shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() {
final Subtopology subtopology1 = new Subtopology()
.setSubtopologyId("subtopology1")
.setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2))
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name()));
final Subtopology subtopology2 = new Subtopology()
.setSubtopologyId("subtopology2")
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1));
final Function<String, OptionalInt> topicPartitionCountProvider =
s -> Objects.equals(s, SOURCE_TOPIC_NAME1) ? OptionalInt.empty() : sourceTopicPartitionCounts(s);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
LOG_CONTEXT,
List.of(subtopology1, subtopology2),
topicPartitionCountProvider
);

final TopicConfigurationException exception = assertThrows(TopicConfigurationException.class,
repartitionTopics::setup);

assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status());
assertEquals("Missing source topics: source1", exception.getMessage());
}

@Test
public void shouldThrowStreamsInvalidTopologyExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopicsDueToLoops() {
final Subtopology subtopology1 = new Subtopology()
.setSubtopologyId("subtopology1")
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name()));
final RepartitionTopics repartitionTopics = new RepartitionTopics(
LOG_CONTEXT,
List.of(subtopology1),
RepartitionTopicsTest::sourceTopicPartitionCounts
);

final StreamsInvalidTopologyException exception = assertThrows(StreamsInvalidTopologyException.class, repartitionTopics::setup);

assertEquals(
"Failed to compute number of partitions for all repartition topics. There may be loops in the topology that cannot be resolved.",
exception.getMessage()
);
}

@Test
public void shouldThrowStreamsInvalidTopologyExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopicsDueToMissingSinks() {
final Subtopology subtopology1 = new Subtopology()
.setSubtopologyId("subtopology1")
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT));
final RepartitionTopics repartitionTopics = new RepartitionTopics(
LOG_CONTEXT,
List.of(subtopology1),
RepartitionTopicsTest::sourceTopicPartitionCounts
);

final StreamsInvalidTopologyException exception = assertThrows(StreamsInvalidTopologyException.class, repartitionTopics::setup);

assertEquals(
"Failed to compute number of partitions for all repartition topics, because a repartition source topic is never used as a sink topic.",
exception.getMessage()
);
}

@Test
public void shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic() {
final Subtopology subtopology = new Subtopology()
.setSubtopologyId("subtopology0")
.setSourceTopics(List.of(SOURCE_TOPIC_NAME1))
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name(), REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name()))
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC2));
final Subtopology subtopologyWithoutPartitionCount = new Subtopology()
.setSubtopologyId("subtopologyWithoutPartitionCount")
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1, REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT));
final RepartitionTopics repartitionTopics = new RepartitionTopics(
LOG_CONTEXT,
List.of(subtopology, subtopologyWithoutPartitionCount),
RepartitionTopicsTest::sourceTopicPartitionCounts
);

final Map<String, Integer> setup = repartitionTopics.setup();

assertEquals(Map.of(
REPARTITION_TOPIC1.name(), REPARTITION_TOPIC1.partitions(),
REPARTITION_TOPIC2.name(), REPARTITION_TOPIC2.partitions(),
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name(), sourceTopicPartitionCounts(SOURCE_TOPIC_NAME1).getAsInt()
), setup);
}

@Test
public void shouldSetRepartitionTopicPartitionCountFromUpstreamInternalRepartitionSourceTopic() {
final Subtopology subtopology = new Subtopology()
.setSubtopologyId("subtopology0")
.setSourceTopics(List.of(SOURCE_TOPIC_NAME1))
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1))
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name()));
final Subtopology subtopologyWithoutPartitionCount = new Subtopology()
.setSubtopologyId("subtopologyWithoutPartitionCount")
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name()));
final RepartitionTopics repartitionTopics = new RepartitionTopics(
LOG_CONTEXT,
List.of(subtopology, subtopologyWithoutPartitionCount),
RepartitionTopicsTest::sourceTopicPartitionCounts
);

final Map<String, Integer> setup = repartitionTopics.setup();

assertEquals(
Map.of(
REPARTITION_TOPIC1.name(), REPARTITION_TOPIC1.partitions(),
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name(), REPARTITION_TOPIC1.partitions()
),
setup
);
}

@Test
public void shouldNotSetupRepartitionTopicsWhenTopologyDoesNotContainAnyRepartitionTopics() {
final Subtopology subtopology = new Subtopology()
.setSubtopologyId("subtopology0")
.setSourceTopics(List.of(SOURCE_TOPIC_NAME1));
final RepartitionTopics repartitionTopics = new RepartitionTopics(
LOG_CONTEXT,
List.of(subtopology),
RepartitionTopicsTest::sourceTopicPartitionCounts
);

final Map<String, Integer> setup = repartitionTopics.setup();

assertEquals(Collections.emptyMap(), setup);
}

}

0 comments on commit 3bda9f8

Please sign in to comment.