Skip to content

Commit

Permalink
KAFKA-18440: Convert AuthorizationException to fatal error in AdminCl…
Browse files Browse the repository at this point in the history
…ient (#18435)

Reviewers: Divij Vaidya <[email protected]>
  • Loading branch information
FrankYang0529 authored Jan 10, 2025
1 parent c6f2276 commit 2b7c039
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestUtils;
import org.apache.kafka.common.utils.LogContext;

import org.slf4j.Logger;
Expand Down Expand Up @@ -277,23 +276,21 @@ public void updateFailed(Throwable exception) {
// We depend on pending calls to request another metadata update
this.state = State.QUIESCENT;

if (exception instanceof AuthenticationException) {
log.warn("Metadata update failed due to authentication error", exception);
this.fatalException = (ApiException) exception;
} else if (exception instanceof MismatchedEndpointTypeException) {
log.warn("Metadata update failed due to mismatched endpoint type error", exception);
this.fatalException = (ApiException) exception;
} else if (exception instanceof UnsupportedEndpointTypeException) {
log.warn("Metadata update failed due to unsupported endpoint type error", exception);
this.fatalException = (ApiException) exception;
} else if (exception instanceof UnsupportedVersionException) {
if (usingBootstrapControllers) {
log.warn("The remote node is not a CONTROLLER that supports the KIP-919 " +
"DESCRIBE_CLUSTER api.", exception);
} else {
log.warn("The remote node is not a BROKER that supports the METADATA api.", exception);
if (RequestUtils.isFatalException(exception)) {
log.warn("Fatal error during metadata update", exception);
// avoid unchecked/unconfirmed cast to ApiException
if (exception instanceof ApiException) {
this.fatalException = (ApiException) exception;
}

if (exception instanceof UnsupportedVersionException) {
if (usingBootstrapControllers) {
log.warn("The remote node is not a CONTROLLER that supports the KIP-919 " +
"DESCRIBE_CLUSTER api.", exception);
} else {
log.warn("The remote node is not a BROKER that supports the METADATA api.", exception);
}
}
this.fatalException = (ApiException) exception;
} else {
log.info("Metadata update failed", exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Message;
Expand Down Expand Up @@ -77,4 +84,14 @@ public static ByteBuffer serialize(
writable.flip();
return writable.buffer();
}

public static boolean isFatalException(Throwable e) {
return e instanceof AuthenticationException ||
e instanceof AuthorizationException ||
e instanceof MismatchedEndpointTypeException ||
e instanceof SecurityDisabledException ||
e instanceof UnsupportedVersionException ||
e instanceof UnsupportedEndpointTypeException ||
e instanceof UnsupportedForMessageFormatException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;

Expand Down Expand Up @@ -98,6 +99,16 @@ public void testAuthenticationFailure() {
assertTrue(mgr.isReady());
}

@Test
public void testAuthorizationFailure() {
mgr.transitionToUpdatePending(time.milliseconds());
mgr.updateFailed(new AuthorizationException("Authorization failed"));
assertEquals(refreshBackoffMs, mgr.metadataFetchDelayMs(time.milliseconds()));
assertThrows(AuthorizationException.class, mgr::isReady);
mgr.update(mockCluster(), time.milliseconds());
assertTrue(mgr.isReady());
}

@Test
public void testNeedsRebootstrap() {
long rebootstrapTriggerMs = 1000;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class RequestUtilsTest {
@Test
public void testIsFatalException() {
assertTrue(RequestUtils.isFatalException(new AuthenticationException("")));
assertTrue(RequestUtils.isFatalException(new AuthorizationException("")));
assertTrue(RequestUtils.isFatalException(new MismatchedEndpointTypeException("")));
assertTrue(RequestUtils.isFatalException(new SecurityDisabledException("")));
assertTrue(RequestUtils.isFatalException(new UnsupportedEndpointTypeException("")));
assertTrue(RequestUtils.isFatalException(new UnsupportedForMessageFormatException("")));
assertTrue(RequestUtils.isFatalException(new UnsupportedVersionException("")));

// retriable exceptions
assertFalse(RequestUtils.isFatalException(new DisconnectException("")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeAclsOptions;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
Expand All @@ -35,7 +34,6 @@
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand Down Expand Up @@ -464,12 +462,12 @@ public void testSaslPlaintext(ClusterInstance clusterInstance) throws Cancellati
}
)
public void testSaslPlaintextWithController(ClusterInstance clusterInstance) throws CancellationException, ExecutionException, InterruptedException {
// test with admin
// default ClusterInstance#admin helper with admin credentials
try (Admin admin = clusterInstance.admin(Map.of(), true)) {
admin.describeAcls(AclBindingFilter.ANY).values().get();
}

// test with non-admin
// client with non-admin credentials
Map<String, Object> nonAdminConfig = Map.of(
SaslConfigs.SASL_JAAS_CONFIG,
String.format(
Expand All @@ -480,9 +478,25 @@ public void testSaslPlaintextWithController(ClusterInstance clusterInstance) thr
try (Admin admin = clusterInstance.admin(nonAdminConfig, true)) {
ExecutionException exception = assertThrows(
ExecutionException.class,
() -> admin.describeAcls(AclBindingFilter.ANY, new DescribeAclsOptions().timeoutMs(5000)).values().get()
() -> admin.describeAcls(AclBindingFilter.ANY).values().get()
);
assertInstanceOf(ClusterAuthorizationException.class, exception.getCause());
}

// client with unknown credentials
Map<String, Object> unknownUserConfig = Map.of(
SaslConfigs.SASL_JAAS_CONFIG,
String.format(
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
"unknown", "unknown"
)
);
try (Admin admin = clusterInstance.admin(unknownUserConfig)) {
ExecutionException exception = assertThrows(
ExecutionException.class,
() -> admin.describeAcls(AclBindingFilter.ANY).values().get()
);
assertInstanceOf(TimeoutException.class, exception.getCause());
assertInstanceOf(SaslAuthenticationException.class, exception.getCause());
}
}
}

0 comments on commit 2b7c039

Please sign in to comment.