From 65e229c41f920e8d71be626e087400a397612541 Mon Sep 17 00:00:00 2001 From: Carl-Zhou-CN <67902676+Carl-Zhou-CN@users.noreply.github.com> Date: Wed, 4 Sep 2024 14:08:37 +0800 Subject: [PATCH] fix --- .../connectors/seatunnel/kafka/source/KafkaRecordEmitter.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java index d8a1b96de83..6593137aff7 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java @@ -68,6 +68,8 @@ public void emitRecord( } else { deserializationSchema.deserialize(consumerRecord.value(), outputCollector); } + // consumerRecord.offset + 1 is the offset commit to Kafka and also the start offset + // for the next run splitState.setCurrentOffset(consumerRecord.offset() + 1); } catch (IOException e) { if (this.messageFormatErrorHandleWay == MessageFormatErrorHandleWay.SKIP) {