Skip to content

Commit

Permalink
KAFKA-18135: ShareConsumer HB UnsupportedVersion msg mixed with Consu…
Browse files Browse the repository at this point in the history
…mer HB (#18101)

Add specific error handling for unsupported version in share consumer and consumer

Reviewers: Lianet Magrans <[email protected]>, Andrew Schofield <[email protected]>
  • Loading branch information
peterxcli authored Dec 28, 2024
1 parent bc7a1a8 commit be4d1a6
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.utils.LogContext;
Expand Down Expand Up @@ -317,31 +315,14 @@ private void onFailure(final Throwable exception, final long responseTimeMs) {
heartbeatRequestState.remainingBackoffMs(responseTimeMs),
exception.getMessage());
logger.debug(message);
} else {
} else if (!handleSpecificFailure(exception)) {
logger.error("{} failed due to fatal error: {}", heartbeatRequestName(), exception.getMessage());
if (isHBApiUnsupportedErrorMsg(exception)) {
// This is expected to be the case where building the request fails because the node does not support
// the API. Propagate custom message.
handleFatalFailure(new UnsupportedVersionException(CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG, exception));
} else {
// This is the case where building the request fails even though the node supports the API (ex.
// required version 1 not available when regex in use).
handleFatalFailure(exception);
}
handleFatalFailure(exception);
}
// Notify the group manager about the failure after all errors have been handled and propagated.
membershipManager().onHeartbeatFailure(exception instanceof RetriableException);
}

/***
* @return True if the exception is the UnsupportedVersion generated on the client, before sending the request,
* when checking if the API is available on the broker.
*/
private boolean isHBApiUnsupportedErrorMsg(Throwable exception) {
return exception instanceof UnsupportedVersionException &&
exception.getMessage().equals("The node does not support " + ApiKeys.CONSUMER_GROUP_HEARTBEAT);
}

private void onResponse(final R response, final long currentTimeMs) {
if (errorForResponse(response) == Errors.NONE) {
heartbeatRequestState.updateHeartbeatIntervalMs(heartbeatIntervalForResponse(response));
Expand Down Expand Up @@ -404,14 +385,6 @@ private void onErrorResponse(final R response, final long currentTimeMs) {
handleFatalFailure(error.exception(errorMessage));
break;

case UNSUPPORTED_VERSION:
// Broker responded with HB not supported, meaning the new protocol is not enabled, so propagate
// custom message for it. Note that the case where the protocol is not supported at all should fail
// on the client side when building the request and checking supporting APIs (handled on onFailure).
logger.error("{} failed due to {}: {}", heartbeatRequestName(), error, errorMessage);
handleFatalFailure(error.exception(CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG));
break;

case FENCED_MEMBER_EPOCH:
message = String.format("%s failed for member %s because epoch %s is fenced.",
heartbeatRequestName(), membershipManager().memberId(), membershipManager().memberEpoch());
Expand All @@ -437,7 +410,7 @@ private void onErrorResponse(final R response, final long currentTimeMs) {
break;

default:
if (!handleSpecificError(response, currentTimeMs)) {
if (!handleSpecificExceptionInResponse(response, currentTimeMs)) {
// If the manager receives an unknown error - there could be a bug in the code or a new error code
logger.error("{} failed due to unexpected error {}: {}", heartbeatRequestName(), error, errorMessage);
handleFatalFailure(error.exception(errorMessage));
Expand All @@ -461,15 +434,25 @@ protected void handleFatalFailure(Throwable error) {
membershipManager().transitionToFatal();
}

/**
* Hook for error handling that is specific to a group type, used when a heartbeat request
* fails and no response has been received. This base implementation handles nothing and
* always returns false; group-specific managers may override it.
*
* @param exception The exception raised for the failed heartbeat request
* @return true if the error was handled, else false
*/
public boolean handleSpecificFailure(Throwable exception) {
return false;
}

/**
* Error handling specific to a group type.
* Error handling specific response exception to a group type.
*
* @param response The heartbeat response
* @param currentTimeMs Current time
* @return true if the error was handled, else false
*/
public boolean handleSpecificError(final R response, final long currentTimeMs) {
public boolean handleSpecificExceptionInResponse(final R response, final long currentTimeMs) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
Expand All @@ -38,6 +39,8 @@
import java.util.TreeSet;
import java.util.stream.Collectors;

import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.REGEX_RESOLUTION_NOT_SUPPORTED_MSG;

/**
* This is the heartbeat request manager for consumer groups.
*
Expand Down Expand Up @@ -91,12 +94,43 @@ public ConsumerHeartbeatRequestManager(
* {@inheritDoc}
*/
@Override
public boolean handleSpecificError(final ConsumerGroupHeartbeatResponse response, final long currentTimeMs) {
public boolean handleSpecificFailure(Throwable exception) {
    // Only UnsupportedVersionException failures get consumer-group-specific treatment here;
    // everything else is reported as unhandled so the caller can apply its own handling.
    if (!(exception instanceof UnsupportedVersionException)) {
        return false;
    }

    String errorMessage = exception.getMessage();
    String message;
    // Constant-first comparison: Throwable#getMessage() may return null, and error handling
    // must not itself fail with a NullPointerException in that case.
    if (REGEX_RESOLUTION_NOT_SUPPORTED_MSG.equals(errorMessage)) {
        message = REGEX_RESOLUTION_NOT_SUPPORTED_MSG;
        logger.error("{} regex resolution not supported: {}", heartbeatRequestName(), message);
    } else {
        message = CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG;
        logger.error("{} failed due to unsupported version while sending request: {}", heartbeatRequestName(), errorMessage);
    }

    // Wrap with the user-facing message while keeping the original exception as the cause.
    handleFatalFailure(new UnsupportedVersionException(message, exception));
    return true;
}

/**
* {@inheritDoc}
*/
@Override
public boolean handleSpecificExceptionInResponse(final ConsumerGroupHeartbeatResponse response, final long currentTimeMs) {
Errors error = errorForResponse(response);
String errorMessage = errorMessageForResponse(response);
boolean errorHandled;

switch (error) {
// Broker responded with HB not supported, meaning the new protocol is not enabled, so propagate
// custom message for it. Note that the case where the protocol is not supported at all should fail
// on the client side when building the request and checking supporting APIs (handled on onFailure).
case UNSUPPORTED_VERSION:
logger.error("{} failed due to unsupported version response on broker side: {}",
heartbeatRequestName(), CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG);
handleFatalFailure(error.exception(CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG));
errorHandled = true;
break;

case UNRELEASED_INSTANCE_ID:
logger.error("{} failed due to unreleased instance id {}: {}",
heartbeatRequestName(), membershipManager.groupInstanceId().orElse("null"), errorMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
Expand Down Expand Up @@ -50,6 +51,9 @@ public class ShareHeartbeatRequestManager extends AbstractHeartbeatRequestManage
*/
private final HeartbeatState heartbeatState;

public static final String SHARE_PROTOCOL_NOT_SUPPORTED_MSG = "The cluster does not support the share group protocol. " +
"To use share groups, the cluster must have the share group protocol enabled.";

public ShareHeartbeatRequestManager(
final LogContext logContext,
final Time time,
Expand Down Expand Up @@ -82,6 +86,45 @@ public ShareHeartbeatRequestManager(
this.heartbeatState = heartbeatState;
}

/**
 * Share-group handling for a heartbeat failure where no response was received.
 * An {@link UnsupportedVersionException} is logged and passed to
 * {@code handleFatalFailure} wrapped in a new exception that carries
 * {@link #SHARE_PROTOCOL_NOT_SUPPORTED_MSG} and keeps the original as its cause.
 *
 * @param exception The exception raised for the failed heartbeat request
 * @return true if the exception was an UnsupportedVersionException (and was handled), else false
 */
@Override
public boolean handleSpecificFailure(Throwable exception) {
    // Guard clause: anything other than an unsupported-version failure is not handled here.
    if (!(exception instanceof UnsupportedVersionException)) {
        return false;
    }

    logger.error("{} failed due to {}: {}", heartbeatRequestName(), exception.getMessage(), SHARE_PROTOCOL_NOT_SUPPORTED_MSG);
    handleFatalFailure(new UnsupportedVersionException(SHARE_PROTOCOL_NOT_SUPPORTED_MSG, exception));
    return true;
}

/**
 * Share-group handling for an error carried in a heartbeat response.
 * Only {@code UNSUPPORTED_VERSION} is handled: it is logged and passed to
 * {@code handleFatalFailure} with {@link #SHARE_PROTOCOL_NOT_SUPPORTED_MSG}.
 *
 * @param response      The heartbeat response
 * @param currentTimeMs Current time (not used by this implementation)
 * @return true if the error was handled, else false
 */
@Override
public boolean handleSpecificExceptionInResponse(final ShareGroupHeartbeatResponse response, final long currentTimeMs) {
    final Errors responseError = errorForResponse(response);

    switch (responseError) {
        case UNSUPPORTED_VERSION:
            // The broker answered that the heartbeat is not supported, i.e. the share group protocol
            // is not enabled, so a custom message is propagated. When the protocol is not supported
            // at all, the failure is expected on the client side while building the request instead.
            logger.error("{} failed due to unsupported version: {}",
                heartbeatRequestName(), SHARE_PROTOCOL_NOT_SUPPORTED_MSG);
            handleFatalFailure(responseError.exception(SHARE_PROTOCOL_NOT_SUPPORTED_MSG));
            return true;

        default:
            return false;
    }
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,9 +611,25 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole
* 2. Required HB API version is not available.
*/
@ParameterizedTest
@ValueSource(strings = {CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG})
public void testUnsupportedVersionFromBroker(String errorMsg) {
mockResponseWithException(new UnsupportedVersionException(errorMsg), true);
ArgumentCaptor<ErrorEvent> errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture());
ErrorEvent errorEvent = errorEventArgumentCaptor.getValue();
assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), errorEvent.error());
assertEquals(errorMsg, errorEvent.error().getMessage());
clearInvocations(backgroundEventHandler);
}

/**
* This validates the UnsupportedVersionException the client generates while building a HB.
* Note that REGEX_RESOLUTION_NOT_SUPPORTED_MSG is only generated on the client side.
*/
@ParameterizedTest
@ValueSource(strings = {CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG, REGEX_RESOLUTION_NOT_SUPPORTED_MSG})
public void testUnsupportedVersion(String errorMsg) {
mockResponseWithException(new UnsupportedVersionException(errorMsg));
public void testUnsupportedVersionFromClient(String errorMsg) {
mockResponseWithException(new UnsupportedVersionException(errorMsg), false);
ArgumentCaptor<ErrorEvent> errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture());
ErrorEvent errorEvent = errorEventArgumentCaptor.getValue();
Expand All @@ -633,14 +649,14 @@ private void mockErrorResponse(Errors error, String exceptionCustomMsg) {
result.unsentRequests.get(0).handler().onComplete(response);
}

private void mockResponseWithException(UnsupportedVersionException exception) {
private void mockResponseWithException(UnsupportedVersionException exception, boolean isFromBroker) {
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());

when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
ClientResponse response = createHeartbeatResponseWithException(
result.unsentRequests.get(0), exception);
result.unsentRequests.get(0), exception, isFromBroker);
result.unsentRequests.get(0).handler().onComplete(response);
}

Expand Down Expand Up @@ -1044,9 +1060,13 @@ private ClientResponse createHeartbeatResponse(

private ClientResponse createHeartbeatResponseWithException(
final NetworkClientDelegate.UnsentRequest request,
final UnsupportedVersionException exception
final UnsupportedVersionException exception,
final boolean isFromBroker
) {
ConsumerGroupHeartbeatResponse response = new ConsumerGroupHeartbeatResponse(null);
ConsumerGroupHeartbeatResponse response = null;
if (isFromBroker) {
response = new ConsumerGroupHeartbeatResponse(null);
}
return new ClientResponse(
new RequestHeader(ApiKeys.CONSUMER_GROUP_HEARTBEAT, ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(), "client-id", 1),
request.handler(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
Expand Down Expand Up @@ -58,6 +59,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager.SHARE_PROTOCOL_NOT_SUPPORTED_MSG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
Expand All @@ -67,6 +69,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -363,7 +366,7 @@ public void testNoCoordinator() {
@ParameterizedTest
@MethodSource("errorProvider")
public void testHeartbeatResponseOnErrorHandling(final Errors error, final boolean isFatal) {
// Handling errors on the second heartbeat
// Handling errors on the second heartbeat
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());
Expand Down Expand Up @@ -422,6 +425,46 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole
}
}

@ParameterizedTest
@ValueSource(strings = {SHARE_PROTOCOL_NOT_SUPPORTED_MSG})
public void testUnsupportedVersionGeneratedOnTheBroker(String errorMsg) {
    // Complete the pending heartbeat with an unsupported-version failure flagged as broker-originated.
    final UnsupportedVersionException unsupportedVersion = new UnsupportedVersionException(errorMsg);
    mockResponseWithException(unsupportedVersion, true);

    // Capture the error event handed to the background event handler.
    final ArgumentCaptor<ErrorEvent> eventCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
    verify(backgroundEventHandler).add(eventCaptor.capture());
    final ErrorEvent propagated = eventCaptor.getValue();

    // The propagated error must have the UNSUPPORTED_VERSION exception type and the expected message.
    assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), propagated.error());
    assertEquals(errorMsg, propagated.error().getMessage());
    clearInvocations(backgroundEventHandler);
}

@ParameterizedTest
@ValueSource(strings = {SHARE_PROTOCOL_NOT_SUPPORTED_MSG})
public void testUnsupportedVersionGeneratedOnTheClient(String errorMsg) {
    // Complete the pending heartbeat with an unsupported-version failure flagged as client-originated.
    final UnsupportedVersionException unsupportedVersion = new UnsupportedVersionException(errorMsg);
    mockResponseWithException(unsupportedVersion, false);

    // Capture the error event handed to the background event handler.
    final ArgumentCaptor<ErrorEvent> eventCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
    verify(backgroundEventHandler).add(eventCaptor.capture());
    final ErrorEvent propagated = eventCaptor.getValue();

    // The propagated error must have the UNSUPPORTED_VERSION exception type and the expected message.
    assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), propagated.error());
    assertEquals(errorMsg, propagated.error().getMessage());
    clearInvocations(backgroundEventHandler);
}

/**
 * Polls the heartbeat manager for its single unsent request and completes that request's
 * handler with a response built around the given exception.
 *
 * @param exception    The exception to attach to the completed response
 * @param isFromBroker Flag forwarded to {@code createHeartbeatResponseWithException}
 */
private void mockResponseWithException(UnsupportedVersionException exception, boolean isFromBroker) {
    // Advance the mock clock by one heartbeat interval before polling; exactly one request is expected.
    time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
    final NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds());
    assertEquals(1, pollResult.unsentRequests.size());

    // Manually completing the response to test error handling
    when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
    final NetworkClientDelegate.UnsentRequest heartbeat = pollResult.unsentRequests.get(0);
    final ClientResponse failedResponse = createHeartbeatResponseWithException(heartbeat, exception, isFromBroker);
    heartbeat.handler().onComplete(failedResponse);
}

@Test
public void testHeartbeatState() {
mockJoiningMemberData();
Expand Down Expand Up @@ -646,6 +689,27 @@ private ClientResponse createHeartbeatResponse(
response);
}

/**
 * Builds a {@link ClientResponse} for the given heartbeat request that carries the given exception.
 * A (data-less) response body is attached only when the failure is meant to come from the broker;
 * a client-side failure has no response body at all.
 *
 * The flag is named and interpreted as {@code isFromBroker} to match what the caller
 * ({@code mockResponseWithException}) passes; the previous {@code isFromClient} naming inverted it.
 *
 * @param request      The unsent heartbeat request whose handler receives the response
 * @param exception    The exception to attach (presumably the version-mismatch slot of
 *                     ClientResponse - confirm against its constructor)
 * @param isFromBroker true to simulate a failure reported by the broker, false for a client-side failure
 * @return the client response to complete the request handler with
 */
private ClientResponse createHeartbeatResponseWithException(
    final NetworkClientDelegate.UnsentRequest request,
    final UnsupportedVersionException exception,
    final boolean isFromBroker
) {
    ShareGroupHeartbeatResponse response = null;
    if (isFromBroker) {
        response = new ShareGroupHeartbeatResponse(null);
    }
    return new ClientResponse(
        new RequestHeader(ApiKeys.SHARE_GROUP_HEARTBEAT, ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion(), "client-id", 1),
        request.handler(),
        "0",
        time.milliseconds(),
        time.milliseconds(),
        false,
        exception,
        null,
        response);
}

private ConsumerConfig config() {
Properties prop = new Properties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
Expand Down

0 comments on commit be4d1a6

Please sign in to comment.