Skip to content

Commit

Permalink
[Bug][Kafka] kafka reads repeatedly
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 committed Jan 6, 2025
1 parent d72b3c8 commit f5fdeb8
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ public void emitRecord(
} else {
deserializationSchema.deserialize(consumerRecord.value(), outputCollector);
}
// consumerRecord.offset + 1 is the offset commit to Kafka and also the start offset
// for the next run
splitState.setCurrentOffset(consumerRecord.offset() + 1);
} catch (Exception e) {
if (this.messageFormatErrorHandleWay == MessageFormatErrorHandleWay.SKIP) {
logger.warn(
Expand All @@ -79,6 +76,9 @@ public void emitRecord(
throw e;
}
}
// consumerRecord.offset + 1 is the offset commit to Kafka and also the start offset
// for the next run
splitState.setCurrentOffset(consumerRecord.offset() + 1);
}

private static class OutputCollector<T> implements Collector<T> {
Expand Down

0 comments on commit f5fdeb8

Please sign in to comment.