From b35c29401acb12c7a2b8a450c5164cddf82361c6 Mon Sep 17 00:00:00 2001
From: Thomas Thornton
Date: Thu, 9 Jan 2025 13:13:11 -0500
Subject: [PATCH] KAFKA-18073: Prevent dropped records from failed retriable exceptions (#18146)

Reviewers: Greg Harris
---
 .../errors/RetryWithToleranceOperator.java    |   3 +-
 .../runtime/AbstractWorkerSourceTaskTest.java | 144 +++++++++++++++++-
 .../ExactlyOnceWorkerSourceTaskTest.java      |   2 +-
 .../connect/runtime/WorkerSinkTaskTest.java   | 122 ++++++++++++++-
 .../runtime/WorkerSinkTaskThreadedTest.java   |   2 +-
 .../connect/runtime/WorkerSourceTaskTest.java |   4 +-
 .../connect/runtime/WorkerTestUtils.java      |  39 +++++
 .../RetryWithToleranceOperatorTest.java       |  55 ++++---
 8 files changed, 340 insertions(+), 31 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index 28886b3557c7a..2b9ba9fc5b7b9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -214,8 +214,7 @@ protected <V> V execAndRetry(ProcessingContext<T> context, Operation<V> operatio
                 errorHandlingMetrics.recordRetry();
             } else {
                 log.trace("Can't retry. start={}, attempt={}, deadline={}", startTime, attempt, deadline);
-                context.error(e);
-                return null;
+                throw e;
             }
             if (stopping) {
                 log.trace("Shutdown has been scheduled. Marking operation as failed.");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
index 0cb4db7064726..f33e9bc514b6c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
@@ -80,6 +80,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
@@ -264,7 +265,7 @@ public void testSendRecordsConvertsData() {
 
         assertArrayEquals(SERIALIZED_KEY, sent.getValue().key());
         assertArrayEquals(SERIALIZED_RECORD, sent.getValue().value());
-
+
         verifyTaskGetTopic();
         verifyTopicCreation();
     }
@@ -362,8 +363,8 @@ public void testHeadersWithCustomConverter() throws Exception {
         StringConverter stringConverter = new StringConverter();
         SampleConverterWithHeaders testConverter = new SampleConverterWithHeaders();
 
-        createWorkerTask(stringConverter, testConverter, stringConverter, RetryWithToleranceOperatorTest.noopOperator(),
-                Collections::emptyList);
+        createWorkerTask(stringConverter, testConverter, stringConverter, RetryWithToleranceOperatorTest.noneOperator(),
+                Collections::emptyList, transformationChain);
 
         expectSendRecord(null);
         expectApplyTransformationChain();
@@ -706,6 +707,118 @@ public void testSendRecordsRetriableException() {
         verify(transformationChain, times(2)).apply(any(), eq(record3));
     }
 
+    @Test
+    public void testSendRecordsFailedTransformationErrorToleranceNone() {
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator = RetryWithToleranceOperatorTest.noneOperator();
+        TransformationChain<SourceRecord, SourceRecord> transformationChainRetriableException =
+                WorkerTestUtils.getTransformationChain(retryWithToleranceOperator, List.of(new RetriableException("Test"), record1));
+        createWorkerTask(transformationChainRetriableException, retryWithToleranceOperator);
+
+        expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
+        TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));
+
+        workerTask.toSend = Arrays.asList(record1);
+
+        // The transformation errored out so the error should be re-raised by sendRecords with error tolerance None
+        Exception exception = assertThrows(ConnectException.class, workerTask::sendRecords);
+        assertTrue(exception.getMessage().contains("Tolerance exceeded"));
+
+        // Ensure the transformation was called
+        verify(transformationChainRetriableException, times(1)).apply(any(), eq(record1));
+
+        // The second transform call will succeed, batch should succeed at sending the one record (none were skipped)
+        assertTrue(workerTask.sendRecords());
+        verifySendRecord(1);
+    }
+
+    @Test
+    public void testSendRecordsFailedTransformationErrorToleranceAll() {
+        RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator = RetryWithToleranceOperatorTest.allOperator();
+        TransformationChain<SourceRecord, SourceRecord> transformationChainRetriableException = WorkerTestUtils.getTransformationChain(
+                retryWithToleranceOperator,
+                List.of(new RetriableException("Test")));
+
+        createWorkerTask(transformationChainRetriableException, retryWithToleranceOperator);
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
+        TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
+
+        workerTask.toSend = Arrays.asList(record1);
+
+        // The transformation errored out so the error should be ignored & the record skipped with error tolerance all
+        assertTrue(workerTask.sendRecords());
+
+        // Ensure the transformation was called
+        verify(transformationChainRetriableException, times(1)).apply(any(), eq(record1));
+    }
+
+    @Test
+    public void testSendRecordsConversionExceptionErrorToleranceNone() {
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator = RetryWithToleranceOperatorTest.noneOperator();
+        List<Object> results = Stream.of(record1, record2, record3)
+                .collect(Collectors.toList());
+        TransformationChain<SourceRecord, SourceRecord> chain = WorkerTestUtils.getTransformationChain(
+                retryWithToleranceOperator,
+                results);
+        createWorkerTask(chain, retryWithToleranceOperator);
+
+        // When we try to convert the key/value of each record, throw an exception
+        throwExceptionWhenConvertKey(emptyHeaders(), TOPIC);
+
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
+        TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));
+
+        workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+        // Send records should fail when errors.tolerance is none and the conversion call fails
+        Exception exception = assertThrows(ConnectException.class, workerTask::sendRecords);
+        assertTrue(exception.getMessage().contains("Tolerance exceeded"));
+        assertThrows(ConnectException.class, workerTask::sendRecords);
+        assertThrows(ConnectException.class, workerTask::sendRecords);
+
+        // Set the conversion call to succeed, batch should succeed at sending all three records (none were skipped)
+        expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+        assertTrue(workerTask.sendRecords());
+        verifySendRecord(3);
+    }
+
+    @Test
+    public void testSendRecordsConversionExceptionErrorToleranceAll() {
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator = RetryWithToleranceOperatorTest.allOperator();
+        List<Object> results = Stream.of(record1, record2, record3)
+                .collect(Collectors.toList());
+        TransformationChain<SourceRecord, SourceRecord> chain = WorkerTestUtils.getTransformationChain(
+                retryWithToleranceOperator,
+                results);
+        createWorkerTask(chain, retryWithToleranceOperator);
+
+        // When we try to convert the key/value of each record, throw an exception
+        throwExceptionWhenConvertKey(emptyHeaders(), TOPIC);
+
+        workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+        // With errors.tolerance set to all, the failed conversion should simply skip the record, and the batch should succeed
+        assertTrue(workerTask.sendRecords());
+    }
+
     private void expectSendRecord(Headers headers) {
         if (headers != null)
             expectConvertHeadersAndKeyValue(headers, TOPIC);
@@ -806,6 +919,20 @@ private void expectConvertHeadersAndKeyValue(Headers headers, String topic) {
         assertEquals(valueConverter.fromConnectData(topic, headers, RECORD_SCHEMA, RECORD), SERIALIZED_RECORD);
     }
 
+    private void throwExceptionWhenConvertKey(Headers headers, String topic) {
+        if (headers.iterator().hasNext()) {
+            when(headerConverter.fromConnectHeader(anyString(), anyString(), eq(Schema.STRING_SCHEMA),
+                    anyString()))
+                    .thenAnswer((Answer<byte[]>) invocation -> {
+                        String headerValue = invocation.getArgument(3, String.class);
+                        return headerValue.getBytes(StandardCharsets.UTF_8);
+                    });
+        }
+
+        when(keyConverter.fromConnectData(eq(topic), any(Headers.class), eq(KEY_SCHEMA), eq(KEY)))
+                .thenThrow(new RetriableException("Failed to convert key"));
+    }
+
     private void expectApplyTransformationChain() {
         when(transformationChain.apply(any(), any(SourceRecord.class)))
                 .thenAnswer(AdditionalAnswers.returnsSecondArg());
@@ -817,12 +944,19 @@ private RecordHeaders emptyHeaders() {
         return new RecordHeaders();
     }
 
+    private void createWorkerTask(TransformationChain<SourceRecord, SourceRecord> transformationChain, RetryWithToleranceOperator<SourceRecord> toleranceOperator) {
+        createWorkerTask(keyConverter, valueConverter, headerConverter, toleranceOperator, Collections::emptyList,
+                transformationChain);
+    }
+
     private void createWorkerTask() {
-        createWorkerTask(keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.noopOperator(), Collections::emptyList);
+        createWorkerTask(
+                keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.noneOperator(), Collections::emptyList, transformationChain);
     }
 
     private void createWorkerTask(Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter,
-                                  RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator, Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier) {
+                                  RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator, Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
+                                  TransformationChain<SourceRecord, SourceRecord> transformationChain) {
         workerTask = new AbstractWorkerSourceTask(
                 taskId, sourceTask, statusListener, TargetState.STARTED, keyConverter, valueConverter, headerConverter,
                 transformationChain, sourceTaskContext, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore,
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index be3dc2401ad69..4ee0f61572cdd 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -278,7 +278,7 @@ private void createWorkerTask(TargetState initialState) {
 
     private void createWorkerTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
         workerTask = new ExactlyOnceWorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
                 transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore,
-                config, clusterConfigState, metrics, errorHandlingMetrics, plugins.delegatingLoader(), time, RetryWithToleranceOperatorTest.noopOperator(), statusBackingStore,
+                config, clusterConfigState, metrics, errorHandlingMetrics, plugins.delegatingLoader(), time, RetryWithToleranceOperatorTest.noneOperator(), statusBackingStore,
                 sourceConfig, Runnable::run, preProducerCheck, postProducerCheck, Collections::emptyList);
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index b3e0189f987f6..4e91183fd3125 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -90,6 +90,7 @@
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
+import static org.apache.kafka.connect.runtime.WorkerTestUtils.getTransformationChain;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -192,13 +193,18 @@ private void createTask(TargetState initialState) {
         createTask(initialState, keyConverter, valueConverter, headerConverter);
     }
 
+    private void createTask(TargetState initialState, TransformationChain<ConsumerRecord<byte[], byte[]>, SinkRecord> transformationChain, RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> toleranceOperator) {
+        createTask(initialState, keyConverter, valueConverter, headerConverter, toleranceOperator, Collections::emptyList, transformationChain);
+    }
+
     private void createTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
-        createTask(initialState, keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.noopOperator(), Collections::emptyList);
+        createTask(initialState, keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.noneOperator(), Collections::emptyList, transformationChain);
     }
 
     private void createTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter,
                             RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> retryWithToleranceOperator,
-                            Supplier<List<ErrorReporter<ConsumerRecord<byte[], byte[]>>>> errorReportersSupplier) {
+                            Supplier<List<ErrorReporter<ConsumerRecord<byte[], byte[]>>>> errorReportersSupplier,
+                            TransformationChain<ConsumerRecord<byte[], byte[]>, SinkRecord> transformationChain) {
         workerTask = new WorkerSinkTask(
                 taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics,
                 keyConverter, valueConverter, errorHandlingMetrics, headerConverter,
@@ -854,6 +860,103 @@ public void testWakeupNotThrownDuringShutdown() {
         verify(sinkTask).close(any(Collection.class));
     }
 
+    @Test
+    public void testRaisesFailedRetriableExceptionFromConvert() {
+        createTask(initialState);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1))
+                .thenAnswer(invocation -> {
+                    // stop the task during its second iteration
+                    workerTask.stop();
+                    return new ConsumerRecords<>(Map.of(), Map.of());
+                });
+        throwExceptionOnConversion(null, new RecordHeaders());
+
+        workerTask.iteration();
+
+        assertThrows(ConnectException.class, workerTask::execute);
+    }
+
+    @Test
+    public void testSkipsFailedRetriableExceptionFromConvert() {
+        createTask(initialState, keyConverter, valueConverter, headerConverter,
+                RetryWithToleranceOperatorTest.allOperator(), Collections::emptyList, transformationChain);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1))
+                .thenAnswer(invocation -> {
+                    // stop the task during its second iteration
+                    workerTask.stop();
+                    return new ConsumerRecords<>(Map.of(), Map.of());
+                });
+        throwExceptionOnConversion(null, new RecordHeaders());
+
+        workerTask.iteration();
+        workerTask.execute();
+
+        verify(sinkTask, times(3)).put(Collections.emptyList());
+    }
+
+    @Test
+    public void testRaisesFailedRetriableExceptionFromTransform() {
+        RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> retryWithToleranceOperator = RetryWithToleranceOperatorTest.noneOperator();
+        TransformationChain<ConsumerRecord<byte[], byte[]>, SinkRecord> transformationChainRetriableException = getTransformationChain(
+                retryWithToleranceOperator, List.of(new RetriableException("Test")));
+        createTask(initialState, transformationChainRetriableException, retryWithToleranceOperator);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1))
+                .thenAnswer(invocation -> {
+                    // stop the task during its second iteration
+                    workerTask.stop();
+                    return new ConsumerRecords<>(Map.of(), Map.of());
+                });
+        expectConversion(null, new RecordHeaders());
+
+        workerTask.iteration();
+
+        assertThrows(ConnectException.class, workerTask::execute);
+    }
+
+    @Test
+    public void testSkipsFailedRetriableExceptionFromTransform() {
+        RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> retryWithToleranceOperator = RetryWithToleranceOperatorTest.allOperator();
+        TransformationChain<ConsumerRecord<byte[], byte[]>, SinkRecord> transformationChainRetriableException = getTransformationChain(
+                retryWithToleranceOperator, List.of(new RetriableException("Test")));
+        createTask(initialState, transformationChainRetriableException, retryWithToleranceOperator);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        expectPollInitialAssignment()
+                .thenAnswer(expectConsumerPoll(1))
+                .thenAnswer(invocation -> {
+                    // stop the task during its second iteration
+                    workerTask.stop();
+                    return new ConsumerRecords<>(Map.of(), Map.of());
+                });
+        expectConversion(null, new RecordHeaders());
+
+        workerTask.iteration();
+        workerTask.execute();
+
+        verify(sinkTask, times(3)).put(Collections.emptyList());
+    }
+
     @Test
     public void testRequestCommit() {
         createTask(initialState);
@@ -1758,7 +1861,7 @@ public void testPartitionCountInCaseOfPartitionRevocation() {
                 taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, ClusterConfigState.EMPTY, metrics,
                 keyConverter, valueConverter, errorHandlingMetrics, headerConverter,
                 transformationChain, mockConsumer, pluginLoader, time,
-                RetryWithToleranceOperatorTest.noopOperator(), null, statusBackingStore, Collections::emptyList);
+                RetryWithToleranceOperatorTest.noneOperator(), null, statusBackingStore, Collections::emptyList);
         mockConsumer.updateBeginningOffsets(
                 new HashMap<>() {{
                     put(TOPIC_PARTITION, 0L);
@@ -1852,6 +1955,19 @@ private void expectConversionAndTransformation(final String topicPrefix, final H
         expectTransformation(topicPrefix);
     }
 
+    private void expectConversion(final String topicPrefix, final Headers headers) {
+        when(keyConverter.toConnectData(TOPIC, headers, RAW_KEY)).thenReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
+        when(valueConverter.toConnectData(TOPIC, headers, RAW_VALUE)).thenReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
+
+        for (Header header : headers) {
+            when(headerConverter.toConnectHeader(TOPIC, header.key(), header.value())).thenReturn(new SchemaAndValue(VALUE_SCHEMA, new String(header.value())));
+        }
+    }
+
+    private void throwExceptionOnConversion(final String topicPrefix, final Headers headers) {
+        when(keyConverter.toConnectData(TOPIC, headers, RAW_KEY)).thenThrow(new RetriableException("Failed to convert"));
+    }
+
     @SuppressWarnings("unchecked")
     private void expectTransformation(final String topicPrefix) {
         when(transformationChain.apply(any(ProcessingContext.class), any(SinkRecord.class))).thenAnswer((Answer<SinkRecord>)
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 67978760e7bd3..2ed01a747a726 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -177,7 +177,7 @@ public void setup() {
         workerTask = new WorkerSinkTask(
                 taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics,
                 keyConverter, valueConverter, errorHandlingMetrics, headerConverter, transformationChain,
-                consumer, pluginLoader, time, RetryWithToleranceOperatorTest.noopOperator(), null, statusBackingStore,
+                consumer, pluginLoader, time, RetryWithToleranceOperatorTest.noneOperator(), null, statusBackingStore,
                 Collections::emptyList);
         recordsReturned = 0;
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 3ddbf164494c4..a04b3bc7caa56 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -231,7 +231,7 @@ public void tearDown() {
     }
 
     private void createWorkerTask() {
-        createWorkerTask(TargetState.STARTED, RetryWithToleranceOperatorTest.noopOperator());
+        createWorkerTask(TargetState.STARTED, RetryWithToleranceOperatorTest.noneOperator());
     }
 
     private void createWorkerTaskWithErrorToleration() {
@@ -239,7 +239,7 @@ private void createWorkerTaskWithErrorToleration() {
     }
 
     private void createWorkerTask(TargetState initialState) {
-        createWorkerTask(initialState, RetryWithToleranceOperatorTest.noopOperator());
+        createWorkerTask(initialState, RetryWithToleranceOperatorTest.noneOperator());
     }
 
     private void createWorkerTask(TargetState initialState, RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
index 06b0e3fb55cc4..06c3a42b64f8d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
@@ -16,11 +16,18 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.storage.AppliedConnectorConfig;
 import org.apache.kafka.connect.storage.ClusterConfigState;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
+import org.mockito.Mockito;
+import org.mockito.stubbing.OngoingStubbing;
+
 import java.util.AbstractMap.SimpleEntry;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,6 +38,9 @@
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class WorkerTestUtils {
 
@@ -155,4 +165,33 @@ public static void assertAssignment(boolean expectFailed,
         assertEquals(expectedDelay, assignment.delay(),
                 "Wrong rebalance delay in " + assignment);
     }
+
+    public static <T, R extends ConnectRecord<R>> TransformationChain<T, R> getTransformationChain(
+            RetryWithToleranceOperator<T> toleranceOperator,
+            List<Object> results) {
+        Transformation<R> transformation = mock(Transformation.class);
+        OngoingStubbing<R> stub = when(transformation.apply(any()));
+        for (Object result: results) {
+            if (result instanceof Exception) {
+                stub = stub.thenThrow((Exception) result);
+            } else {
+                stub = stub.thenReturn((R) result);
+            }
+        }
+        return buildTransformationChain(transformation, toleranceOperator);
+    }
+
+    public static <T, R extends ConnectRecord<R>> TransformationChain<T, R> buildTransformationChain(
+            Transformation<R> transformation,
+            RetryWithToleranceOperator<T> toleranceOperator) {
+        Predicate<R> predicate = mock(Predicate.class);
+        when(predicate.test(any())).thenReturn(true);
+        TransformationStage<R> stage = new TransformationStage<>(
+                predicate,
+                false,
+                transformation);
+        TransformationChain<T, R> realTransformationChainRetriableException = new TransformationChain<>(List.of(stage), toleranceOperator);
+        TransformationChain<T, R> transformationChainRetriableException = Mockito.spy(realTransformationChainRetriableException);
+        return transformationChainRetriableException;
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index dfa4aa353fe0c..23c4bc25553c6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -97,10 +97,10 @@ public class RetryWithToleranceOperatorTest {
         put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
     }};
 
-    public static <T> RetryWithToleranceOperator<T> noopOperator() {
+    public static <T> RetryWithToleranceOperator<T> noneOperator() {
         return genericOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, NONE, new ErrorHandlingMetrics(
-                new ConnectorTaskId("noop-connector", -1),
-                new ConnectMetrics("noop-worker", new TestableWorkerConfig(PROPERTIES),
+                new ConnectorTaskId("errors-none-tolerate-connector", -1),
+                new ConnectMetrics("errors-none-tolerate-worker", new TestableWorkerConfig(PROPERTIES),
                         Time.SYSTEM, "test-cluster")));
     }
 
@@ -147,56 +147,77 @@ public void testExecuteFailedNoTolerance() {
 
     @Test
     public void testHandleExceptionInTransformations() {
-        testHandleExceptionInStage(Stage.TRANSFORMATION, new Exception());
+        testHandleExceptionInStage(Stage.TRANSFORMATION, new Exception(), ALL);
     }
 
+    @Test
+    public void testHandleRetriableExceptionInTransformationsToleranceNone() {
+        assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.TRANSFORMATION, new RetriableException("Test"), NONE));
+    }
+
     @Test
     public void testHandleExceptionInHeaderConverter() {
-        testHandleExceptionInStage(Stage.HEADER_CONVERTER, new Exception());
+        testHandleExceptionInStage(Stage.HEADER_CONVERTER, new Exception(), ALL);
    }
 
+    @Test
+    public void testHandleRetriableExceptionInHeaderConverterToleranceNone() {
+        assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.HEADER_CONVERTER, new RetriableException("Test"), NONE));
+    }
+
     @Test
     public void testHandleExceptionInValueConverter() {
-        testHandleExceptionInStage(Stage.VALUE_CONVERTER, new Exception());
+        testHandleExceptionInStage(Stage.VALUE_CONVERTER, new Exception(), ALL);
     }
 
+    @Test
+    public void testHandleRetriableExceptionInValueConverterToleranceNone() {
+        assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.VALUE_CONVERTER, new RetriableException("Test"), NONE));
+    }
+
     @Test
     public void testHandleExceptionInKeyConverter() {
-        testHandleExceptionInStage(Stage.KEY_CONVERTER, new Exception());
+        testHandleExceptionInStage(Stage.KEY_CONVERTER, new Exception(), ALL);
     }
 
+    @Test
+    public void testHandleRetriableExceptionInKeyConverterToleranceNone() {
+        assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.KEY_CONVERTER, new RetriableException("Test"), NONE));
+    }
+
     @Test
     public void testHandleExceptionInTaskPut() {
-        testHandleExceptionInStage(Stage.TASK_PUT, new org.apache.kafka.connect.errors.RetriableException("Test"));
+        testHandleExceptionInStage(Stage.TASK_PUT, new org.apache.kafka.connect.errors.RetriableException("Test"), ALL);
     }
 
     @Test
     public void testHandleExceptionInTaskPoll() {
-        testHandleExceptionInStage(Stage.TASK_POLL, new org.apache.kafka.connect.errors.RetriableException("Test"));
+        testHandleExceptionInStage(Stage.TASK_POLL, new org.apache.kafka.connect.errors.RetriableException("Test"), ALL);
     }
 
     @Test
     public void testThrowExceptionInTaskPut() {
-        assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.TASK_PUT, new Exception()));
+        assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.TASK_PUT, new Exception(), ALL));
     }
 
     @Test
     public void testThrowExceptionInTaskPoll() {
-        assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.TASK_POLL, new Exception()));
+        assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.TASK_POLL, new Exception(), ALL));
    }
 
     @Test
     public void testThrowExceptionInKafkaConsume() {
-        assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.KAFKA_CONSUME, new Exception()));
+        assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.KAFKA_CONSUME, new Exception(), ALL));
     }
 
     @Test
     public void testThrowExceptionInKafkaProduce() {
-        assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.KAFKA_PRODUCE, new Exception()));
+        assertThrows(ConnectException.class, () -> testHandleExceptionInStage(Stage.KAFKA_PRODUCE, new Exception(), ALL));
     }
 
-    private void testHandleExceptionInStage(Stage type, Exception ex) {
-        RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> retryWithToleranceOperator = setupExecutor();
+    private void testHandleExceptionInStage(Stage type, Exception ex, ToleranceType toleranceType) {
+        RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> retryWithToleranceOperator = setupExecutor(toleranceType);
         ProcessingContext<ConsumerRecord<byte[], byte[]>> context = new ProcessingContext<>(consumerRecord);
         Operation<?> exceptionThrower = () -> {
             throw ex;
@@ -205,8 +226,8 @@ private void testHandleExceptionInStage(Stage type, Exception ex) {
         assertTrue(context.failed());
     }
 
-    private RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> setupExecutor() {
-        return genericOperator(0, ALL, errorHandlingMetrics);
+    private RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> setupExecutor(ToleranceType toleranceType) {
+        return genericOperator(0, toleranceType, errorHandlingMetrics);
     }
 
     @Test