From f5fdeb8b77614cb58467eac06e07d0986fa95040 Mon Sep 17 00:00:00 2001 From: zhangdonghao Date: Mon, 6 Jan 2025 14:30:29 +0800 Subject: [PATCH] [Bug][Kafka] kafka reads repeatedly --- .../seatunnel/kafka/source/KafkaRecordEmitter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java index 7d4f38a3cdf..b3ca28ca034 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java @@ -67,9 +67,6 @@ public void emitRecord( } else { deserializationSchema.deserialize(consumerRecord.value(), outputCollector); } - // consumerRecord.offset + 1 is the offset commit to Kafka and also the start offset - // for the next run - splitState.setCurrentOffset(consumerRecord.offset() + 1); } catch (Exception e) { if (this.messageFormatErrorHandleWay == MessageFormatErrorHandleWay.SKIP) { logger.warn( @@ -79,6 +76,9 @@ public void emitRecord( throw e; } } + // consumerRecord.offset + 1 is the offset commit to Kafka and also the start offset + // for the next run + splitState.setCurrentOffset(consumerRecord.offset() + 1); } private static class OutputCollector implements Collector {