Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18399: Remove ZooKeeper from KafkaApis (9/N): ALTER_PARTITION_REASSIGNMENTS, LIST_PARTITION_REASSIGNMENTS #18464

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 2 additions & 82 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package kafka.server

import kafka.controller.ReplicaAssignment
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
Expand All @@ -37,7 +36,6 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE
import org.apache.kafka.common.internals.{FatalExitError, Topic}
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection}
import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse}
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
Expand Down Expand Up @@ -2590,89 +2588,11 @@ class KafkaApis(val requestChannel: RequestChannel,
}

def handleAlterPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
authHelper.authorizeClusterOperation(request, ALTER)
val alterPartitionReassignmentsRequest = request.body[AlterPartitionReassignmentsRequest]

def sendResponseCallback(result: Either[Map[TopicPartition, ApiError], ApiError]): Unit = {
val responseData = result match {
case Right(topLevelError) =>
new AlterPartitionReassignmentsResponseData().setErrorMessage(topLevelError.message).setErrorCode(topLevelError.error.code)

case Left(assignments) =>
val topicResponses = assignments.groupBy(_._1.topic).map {
case (topic, reassignmentsByTp) =>
val partitionResponses = reassignmentsByTp.map {
case (topicPartition, error) =>
new ReassignablePartitionResponse().setPartitionIndex(topicPartition.partition)
.setErrorCode(error.error.code).setErrorMessage(error.message)
}
new ReassignableTopicResponse().setName(topic).setPartitions(partitionResponses.toList.asJava)
}
new AlterPartitionReassignmentsResponseData().setResponses(topicResponses.toList.asJava)
}

requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new AlterPartitionReassignmentsResponse(responseData.setThrottleTimeMs(requestThrottleMs))
)
}

val reassignments = alterPartitionReassignmentsRequest.data.topics.asScala.flatMap {
reassignableTopic => reassignableTopic.partitions.asScala.map {
reassignablePartition =>
val tp = new TopicPartition(reassignableTopic.name, reassignablePartition.partitionIndex)
if (reassignablePartition.replicas == null)
tp -> None // revert call
else
tp -> Some(reassignablePartition.replicas.asScala.map(_.toInt))
}
}.toMap

zkSupport.controller.alterPartitionReassignments(reassignments, sendResponseCallback)
throw KafkaApis.shouldNeverReceive(request)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

def handleListPartitionReassignmentsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
authHelper.authorizeClusterOperation(request, DESCRIBE)
val listPartitionReassignmentsRequest = request.body[ListPartitionReassignmentsRequest]

def sendResponseCallback(result: Either[Map[TopicPartition, ReplicaAssignment], ApiError]): Unit = {
val responseData = result match {
case Right(error) => new ListPartitionReassignmentsResponseData().setErrorMessage(error.message).setErrorCode(error.error.code)

case Left(assignments) =>
val topicReassignments = assignments.groupBy(_._1.topic).map {
case (topic, reassignmentsByTp) =>
val partitionReassignments = reassignmentsByTp.map {
case (topicPartition, assignment) =>
new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment()
.setPartitionIndex(topicPartition.partition)
.setAddingReplicas(assignment.addingReplicas.toList.asJava.asInstanceOf[java.util.List[java.lang.Integer]])
.setRemovingReplicas(assignment.removingReplicas.toList.asJava.asInstanceOf[java.util.List[java.lang.Integer]])
.setReplicas(assignment.replicas.toList.asJava.asInstanceOf[java.util.List[java.lang.Integer]])
}.toList

new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName(topic)
.setPartitions(partitionReassignments.asJava)
}.toList

new ListPartitionReassignmentsResponseData().setTopics(topicReassignments.asJava)
}

requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new ListPartitionReassignmentsResponse(responseData.setThrottleTimeMs(requestThrottleMs))
)
}

val partitionsOpt = Option(listPartitionReassignmentsRequest.data.topics).map { topics =>
topics.iterator().asScala.flatMap { topic =>
topic.partitionIndexes.iterator().asScala.map { partitionIndex =>
new TopicPartition(topic.name(), partitionIndex)
}
}.toSet
}

zkSupport.controller.listPartitionReassignments(partitionsOpt, sendResponseCallback)
throw KafkaApis.shouldNeverReceive(request)
}

private def configsAuthorizationApiError(resource: ConfigResource): ApiError = {
Expand Down
12 changes: 8 additions & 4 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10611,10 +10611,12 @@ class KafkaApisTest extends Logging {
}

@Test
def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = {
def testRaftShouldAlwaysFailAlterPartitionReassignmentsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterPartitionReassignmentsRequest)
val request = createMockRequest()
val e = assertThrows(classOf[UnsupportedVersionException], () => kafkaApis.handleAlterPartitionReassignmentsRequest(request))
assertEquals(s"Should never receive when using a Raft-based metadata quorum: ${request.header.apiKey()}", e.getMessage)
}

@Test
Expand Down Expand Up @@ -10711,10 +10713,12 @@ class KafkaApisTest extends Logging {
}

@Test
def testRaftShouldAlwaysForwardListPartitionReassignments(): Unit = {
def testRaftShouldFailListPartitionReassignments(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleListPartitionReassignmentsRequest)
val request = createMockRequest()
val e = assertThrows(classOf[UnsupportedVersionException], () => kafkaApis.handleListPartitionReassignmentsRequest(request))
assertEquals(s"Should never receive when using a Raft-based metadata quorum: ${request.header.apiKey()}", e.getMessage)
}

@Test
Expand Down
Loading