Skip to content

Commit

Permalink
KAFKA-18073: Prevent dropped records from failed retriable exceptions (
Browse files Browse the repository at this point in the history
…#18146)

Reviewers: Greg Harris <[email protected]>
  • Loading branch information
twthorn authored Jan 9, 2025
1 parent 5acbd42 commit b35c294
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ protected <V> V execAndRetry(ProcessingContext<T> context, Operation<V> operatio
errorHandlingMetrics.recordRetry();
} else {
log.trace("Can't retry. start={}, attempt={}, deadline={}", startTime, attempt, deadline);
context.error(e);
return null;
throw e;
}
if (stopping) {
log.trace("Shutdown has been scheduled. Marking operation as failed.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
Expand Down Expand Up @@ -264,7 +265,7 @@ public void testSendRecordsConvertsData() {

assertArrayEquals(SERIALIZED_KEY, sent.getValue().key());
assertArrayEquals(SERIALIZED_RECORD, sent.getValue().value());

verifyTaskGetTopic();
verifyTopicCreation();
}
Expand Down Expand Up @@ -362,8 +363,8 @@ public void testHeadersWithCustomConverter() throws Exception {
StringConverter stringConverter = new StringConverter();
SampleConverterWithHeaders testConverter = new SampleConverterWithHeaders();

createWorkerTask(stringConverter, testConverter, stringConverter, RetryWithToleranceOperatorTest.noopOperator(),
Collections::emptyList);
createWorkerTask(stringConverter, testConverter, stringConverter, RetryWithToleranceOperatorTest.noneOperator(),
Collections::emptyList, transformationChain);

expectSendRecord(null);
expectApplyTransformationChain();
Expand Down Expand Up @@ -706,6 +707,118 @@ public void testSendRecordsRetriableException() {
verify(transformationChain, times(2)).apply(any(), eq(record3));
}

@Test
public void testSendRecordsFailedTransformationErrorToleranceNone() {
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

RetryWithToleranceOperator<RetriableException> retryWithToleranceOperator = RetryWithToleranceOperatorTest.noneOperator();
TransformationChain<RetriableException, SourceRecord> transformationChainRetriableException =
WorkerTestUtils.getTransformationChain(retryWithToleranceOperator, List.of(new RetriableException("Test"), record1));
createWorkerTask(transformationChainRetriableException, retryWithToleranceOperator);

expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);

TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));

workerTask.toSend = Arrays.asList(record1);

// The transformation errored out so the error should be re-raised by sendRecords with error tolerance None
Exception exception = assertThrows(ConnectException.class, workerTask::sendRecords);
assertTrue(exception.getMessage().contains("Tolerance exceeded"));

// Ensure the transformation was called
verify(transformationChainRetriableException, times(1)).apply(any(), eq(record1));

// The second transform call will succeed, batch should succeed at sending the one record (none were skipped)
assertTrue(workerTask.sendRecords());
verifySendRecord(1);
}

@Test
public void testSendRecordsFailedTransformationErrorToleranceAll() {
RetryWithToleranceOperator<RetriableException> retryWithToleranceOperator = RetryWithToleranceOperatorTest.allOperator();
TransformationChain<RetriableException, SourceRecord> transformationChainRetriableException = WorkerTestUtils.getTransformationChain(
retryWithToleranceOperator,
List.of(new RetriableException("Test")));

createWorkerTask(transformationChainRetriableException, retryWithToleranceOperator);

SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);

TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));

workerTask.toSend = Arrays.asList(record1);

// The transformation errored out so the error should be ignored & the record skipped with error tolerance all
assertTrue(workerTask.sendRecords());

// Ensure the transformation was called
verify(transformationChainRetriableException, times(1)).apply(any(), eq(record1));
}

@Test
public void testSendRecordsConversionExceptionErrorToleranceNone() {
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

RetryWithToleranceOperator<RetriableException> retryWithToleranceOperator = RetryWithToleranceOperatorTest.noneOperator();
List<Object> results = Stream.of(record1, record2, record3)
.collect(Collectors.toList());
TransformationChain<RetriableException, SourceRecord> chain = WorkerTestUtils.getTransformationChain(
retryWithToleranceOperator,
results);
createWorkerTask(chain, retryWithToleranceOperator);

// When we try to convert the key/value of each record, throw an exception
throwExceptionWhenConvertKey(emptyHeaders(), TOPIC);

TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));

workerTask.toSend = Arrays.asList(record1, record2, record3);

// Send records should fail when errors.tolerance is none and the conversion call fails
Exception exception = assertThrows(ConnectException.class, workerTask::sendRecords);
assertTrue(exception.getMessage().contains("Tolerance exceeded"));
assertThrows(ConnectException.class, workerTask::sendRecords);
assertThrows(ConnectException.class, workerTask::sendRecords);

// Set the conversion call to succeed, batch should succeed at sending all three records (none were skipped)
expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
assertTrue(workerTask.sendRecords());
verifySendRecord(3);
}

@Test
public void testSendRecordsConversionExceptionErrorToleranceAll() {
SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

RetryWithToleranceOperator<RetriableException> retryWithToleranceOperator = RetryWithToleranceOperatorTest.allOperator();
List<Object> results = Stream.of(record1, record2, record3)
.collect(Collectors.toList());
TransformationChain<RetriableException, SourceRecord> chain = WorkerTestUtils.getTransformationChain(
retryWithToleranceOperator,
results);
createWorkerTask(chain, retryWithToleranceOperator);

// When we try to convert the key/value of each record, throw an exception
throwExceptionWhenConvertKey(emptyHeaders(), TOPIC);

workerTask.toSend = Arrays.asList(record1, record2, record3);

// With errors.tolerance to all, the faiiled conversion should simply skip the record, and record successful batch
assertTrue(workerTask.sendRecords());
}

private void expectSendRecord(Headers headers) {
if (headers != null)
expectConvertHeadersAndKeyValue(headers, TOPIC);
Expand Down Expand Up @@ -806,6 +919,20 @@ private void expectConvertHeadersAndKeyValue(Headers headers, String topic) {
assertEquals(valueConverter.fromConnectData(topic, headers, RECORD_SCHEMA, RECORD), SERIALIZED_RECORD);
}

private void throwExceptionWhenConvertKey(Headers headers, String topic) {
if (headers.iterator().hasNext()) {
when(headerConverter.fromConnectHeader(anyString(), anyString(), eq(Schema.STRING_SCHEMA),
anyString()))
.thenAnswer((Answer<byte[]>) invocation -> {
String headerValue = invocation.getArgument(3, String.class);
return headerValue.getBytes(StandardCharsets.UTF_8);
});
}

when(keyConverter.fromConnectData(eq(topic), any(Headers.class), eq(KEY_SCHEMA), eq(KEY)))
.thenThrow(new RetriableException("Failed to convert key"));
}

private void expectApplyTransformationChain() {
when(transformationChain.apply(any(), any(SourceRecord.class)))
.thenAnswer(AdditionalAnswers.returnsSecondArg());
Expand All @@ -817,12 +944,19 @@ private RecordHeaders emptyHeaders() {
return new RecordHeaders();
}

private void createWorkerTask(TransformationChain transformationChain, RetryWithToleranceOperator toleranceOperator) {
createWorkerTask(keyConverter, valueConverter, headerConverter, toleranceOperator, Collections::emptyList,
transformationChain);
}

private void createWorkerTask() {
createWorkerTask(keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.noopOperator(), Collections::emptyList);
createWorkerTask(
keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.noneOperator(), Collections::emptyList, transformationChain);
}

private void createWorkerTask(Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter,
RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator, Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier) {
RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator, Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
TransformationChain transformationChain) {
workerTask = new AbstractWorkerSourceTask(
taskId, sourceTask, statusListener, TargetState.STARTED, keyConverter, valueConverter, headerConverter, transformationChain,
sourceTaskContext, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ private void createWorkerTask(TargetState initialState) {
private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
workerTask = new ExactlyOnceWorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore,
config, clusterConfigState, metrics, errorHandlingMetrics, plugins.delegatingLoader(), time, RetryWithToleranceOperatorTest.noopOperator(), statusBackingStore,
config, clusterConfigState, metrics, errorHandlingMetrics, plugins.delegatingLoader(), time, RetryWithToleranceOperatorTest.noneOperator(), statusBackingStore,
sourceConfig, Runnable::run, preProducerCheck, postProducerCheck, Collections::emptyList);
}

Expand Down
Loading

0 comments on commit b35c294

Please sign in to comment.