Skip to content

Commit

Permalink
KAFKA-18315 Fix to Kraft or remove tests associate with Zk Broker con…
Browse files Browse the repository at this point in the history
…fig in DynamicBrokerConfigTest, ReplicaManagerTest, DescribeTopicPartitionsRequestHandlerTest, KafkaConfigTest (#18269)

Reviewers: PoAn Yang <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
m1a2st authored Dec 29, 2024
1 parent e47f698 commit 1156d5c
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.network.metrics.RequestChannelMetrics;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.authorizer.Action;
Expand Down Expand Up @@ -534,7 +535,7 @@ KafkaConfig createKafkaDefaultConfig() {
int brokerId = 1;
Properties properties = TestUtils.createBrokerConfig(
brokerId,
"",
null,
true,
true,
TestUtils.RandomPort(),
Expand All @@ -559,6 +560,7 @@ KafkaConfig createKafkaDefaultConfig() {
int voterId = brokerId + 1;
properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, voterId + "@localhost:9093");
properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL");
properties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,SSL:SSL");
TestUtils.setIbpVersion(properties, MetadataVersion.latestProduction());
return new KafkaConfig(properties);
}
Expand Down
53 changes: 16 additions & 37 deletions core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.utils.TestUtils
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.{Endpoint, Reconfigurable}
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
import org.apache.kafka.common.config.types.Password
Expand Down Expand Up @@ -203,7 +202,7 @@ class DynamicBrokerConfigTest {

@Test
def testUpdateRemoteLogManagerDynamicThreadPool(): Unit = {
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
val config = KafkaConfig(origProps)
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE, config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize())
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE, config.remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize())
Expand Down Expand Up @@ -242,7 +241,7 @@ class DynamicBrokerConfigTest {

@Test
def testRemoteLogDynamicThreadPoolWithInvalidValues(): Unit = {
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
val config = KafkaConfig(origProps)

val serverMock = mock(classOf[KafkaBroker])
Expand Down Expand Up @@ -450,54 +449,52 @@ class DynamicBrokerConfigTest {
}

@Test
def testPasswordConfigEncryption(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
def testPasswordConfigNotEncryption(): Unit = {
val props = TestUtils.createBrokerConfig(0, null, port = 8181)
val configWithoutSecret = KafkaConfig(props)
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret")
val configWithSecret = KafkaConfig(props)
val dynamicProps = new Properties
dynamicProps.put(SaslConfigs.SASL_JAAS_CONFIG, "myLoginModule required;")
val password = "myLoginModule required;"
dynamicProps.put(SaslConfigs.SASL_JAAS_CONFIG, password)

try {
configWithoutSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
} catch {
case _: ConfigException => // expected exception
}
val persistedProps = configWithSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
assertFalse(persistedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG).contains("myLoginModule"),
"Password not encoded")
val decodedProps = configWithSecret.dynamicConfig.fromPersistentProps(persistedProps, perBrokerConfig = true)
assertEquals("myLoginModule required;", decodedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG))
assertEquals(password, persistedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG))
}

@Test
def testPasswordConfigEncoderSecretChange(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val props = TestUtils.createBrokerConfig(0, null, port = 8181)
props.put(SaslConfigs.SASL_JAAS_CONFIG, "staticLoginModule required;")
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret")
val config = KafkaConfig(props)
config.dynamicConfig.initialize(None, None)
val dynamicProps = new Properties
dynamicProps.put(SaslConfigs.SASL_JAAS_CONFIG, "dynamicLoginModule required;")
val password = "dynamicLoginModule required;"
dynamicProps.put(SaslConfigs.SASL_JAAS_CONFIG, password)

val persistedProps = config.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
assertFalse(persistedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG).contains("LoginModule"),
"Password not encoded")
assertEquals(password, persistedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG))
config.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("dynamicLoginModule required;", config.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)
assertEquals(password, config.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)

// New config with same secret should use the dynamic password config
val newConfigWithSameSecret = KafkaConfig(props)
newConfigWithSameSecret.dynamicConfig.initialize(None, None)
newConfigWithSameSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)
assertEquals(password, newConfigWithSameSecret.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)

// New config with new secret should use the dynamic password config if new and old secrets are configured in KafkaConfig
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "new-encoder-secret")
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, "config-encoder-secret")
val newConfigWithNewAndOldSecret = KafkaConfig(props)
newConfigWithNewAndOldSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)
assertEquals(password, newConfigWithSameSecret.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)

// New config with new secret alone should revert to static password config since dynamic config cannot be decoded
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "another-new-encoder-secret")
Expand All @@ -508,12 +505,12 @@ class DynamicBrokerConfigTest {

@Test
def testDynamicListenerConfig(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
val props = TestUtils.createBrokerConfig(0, null, port = 9092)
val oldConfig = KafkaConfig.fromProps(props)
val kafkaServer: KafkaBroker = mock(classOf[kafka.server.KafkaBroker])
when(kafkaServer.config).thenReturn(oldConfig)

props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093")
props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://hostname:9092")
new DynamicListenerConfig(kafkaServer).validateReconfiguration(KafkaConfig(props))

// it is illegal to update non-reconfiguable configs of existent listeners
Expand Down Expand Up @@ -680,24 +677,6 @@ class DynamicBrokerConfigTest {
DynamicBrokerConfig.brokerConfigSynonyms(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, matchListenerOverride = true))
}

@Test
def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = {
val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new java.util.Properties())

val initialProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
initialProps.remove(ServerConfigs.BACKGROUND_THREADS_CONFIG)
val oldConfig = KafkaConfig.fromProps(initialProps)
val dynamicBrokerConfig = new DynamicBrokerConfig(oldConfig)
dynamicBrokerConfig.initialize(Some(zkClient), None)
dynamicBrokerConfig.addBrokerReconfigurable(new TestDynamicThreadPool)

val newprops = new Properties()
newprops.put(ServerConfigs.NUM_IO_THREADS_CONFIG, "10")
newprops.put(ServerConfigs.BACKGROUND_THREADS_CONFIG, "100")
dynamicBrokerConfig.updateBrokerConfig(0, newprops)
}

@Test
def testImproperConfigsAreRemoved(): Unit = {
val props = TestUtils.createBrokerConfig(0, null)
Expand Down
Loading

0 comments on commit 1156d5c

Please sign in to comment.