[BugFix][Kafka] fix
Carl-Zhou-CN committed Aug 8, 2024
1 parent 36ce947 commit c6f8cc2
Showing 10 changed files with 54 additions and 52 deletions.
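The hunks below replace the single ConsumerMetadata previously threaded through the reader stack with the full KafkaSourceConfig, which exposes a per-table map keyed by TablePath. A minimal sketch of the lookup pattern this enables, using only accessors that appear in the diff (getMapMetadata, getTablePath, getStartMode, getDeserializationSchema); the snippet is an illustrative condensation, not code from this commit:

// Sketch: each split carries the TablePath of its table/topic, so the reader and
// record emitter resolve per-table settings instead of assuming one global metadata.
ConsumerMetadata metadata =
        kafkaSourceConfig.getMapMetadata().get(split.getTablePath());
StartMode startMode = metadata.getStartMode();                 // drives offset initialization
DeserializationSchema<SeaTunnelRow> schema =
        metadata.getDeserializationSchema();                   // drives per-record decoding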
@@ -37,7 +37,6 @@ public class ConsumerMetadata implements Serializable {
private String topic;
private boolean isPattern = false;
private Properties properties;
private String consumerGroup;
private StartMode startMode = StartMode.GROUP_OFFSETS;
private Map<TopicPartition, Long> specificStartOffsets;
private Long startOffsetsTimestamp;
@@ -64,7 +64,7 @@ public class KafkaPartitionSplitReader

private static final long POLL_TIMEOUT = 10000L;
private static final String CLIENT_ID_PREFIX = "seatunnel";
private final ConsumerMetadata metadata;
private final KafkaSourceConfig kafkaSourceConfig;

private final KafkaConsumer<byte[], byte[]> consumer;

@@ -74,11 +74,13 @@ public class KafkaPartitionSplitReader

private final Set<String> emptySplits = new HashSet<>();

public KafkaPartitionSplitReader(ConsumerMetadata metadata, SourceReader.Context context) {
this.metadata = metadata;
this.consumer = initConsumer(metadata, context.getIndexOfSubtask());
public KafkaPartitionSplitReader(
KafkaSourceConfig kafkaSourceConfig, SourceReader.Context context) {
this.kafkaSourceConfig = kafkaSourceConfig;
this.consumer = initConsumer(kafkaSourceConfig, context.getIndexOfSubtask());
this.stoppingOffsets = new HashMap<>();
this.groupId = metadata.getProperties().getProperty(ConsumerConfig.GROUP_ID_CONFIG);
this.groupId =
kafkaSourceConfig.getProperties().getProperty(ConsumerConfig.GROUP_ID_CONFIG);
}

@Override
@@ -310,6 +312,7 @@ private void parseStartingOffsets(
Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
TopicPartition tp = split.getTopicPartition();
// Parse starting offsets.
ConsumerMetadata metadata = kafkaSourceConfig.getMapMetadata().get(split.getTablePath());
if (metadata.getStartMode() == StartMode.EARLIEST) {
partitionsStartingFromEarliest.add(tp);
} else if (metadata.getStartMode() == StartMode.LATEST) {
@@ -338,26 +341,28 @@ public void notifyCheckpointComplete(
consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
}

private KafkaConsumer<byte[], byte[]> initConsumer(ConsumerMetadata metadata, int subtaskId) {
private KafkaConsumer<byte[], byte[]> initConsumer(
KafkaSourceConfig kafkaSourceConfig, int subtaskId) {

try (TemporaryClassLoaderContext ignored =
TemporaryClassLoaderContext.of(metadata.getClass().getClassLoader())) {
TemporaryClassLoaderContext.of(kafkaSourceConfig.getClass().getClassLoader())) {
Properties props = new Properties();
metadata.getProperties()
kafkaSourceConfig
.getProperties()
.forEach(
(key, value) ->
props.setProperty(String.valueOf(key), String.valueOf(value)));
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, metadata.getConsumerGroup());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaSourceConfig.getConsumerGroup());
props.setProperty(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, metadata.getBootstrapServers());
if (this.metadata.getProperties().get("client.id") == null) {
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSourceConfig.getBootstrap());
if (this.kafkaSourceConfig.getProperties().get("client.id") == null) {
props.setProperty(
ConsumerConfig.CLIENT_ID_CONFIG,
CLIENT_ID_PREFIX + "-consumer-" + subtaskId);
} else {
props.setProperty(
ConsumerConfig.CLIENT_ID_CONFIG,
this.metadata.getProperties().get("client.id").toString()
this.kafkaSourceConfig.getProperties().get("client.id").toString()
+ "-"
+ subtaskId);
}
@@ -369,7 +374,7 @@ private KafkaConsumer<byte[], byte[]> initConsumer(ConsumerMetadata metadata, in
ByteArrayDeserializer.class.getName());
props.setProperty(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
String.valueOf(metadata.isCommitOnCheckpoint()));
String.valueOf(kafkaSourceConfig.isCommitOnCheckpoint()));

// Disable auto create topics feature
props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
@@ -19,6 +19,7 @@

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
@@ -31,20 +32,21 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;

public class KafkaRecordEmitter
implements RecordEmitter<
ConsumerRecord<byte[], byte[]>, SeaTunnelRow, KafkaSourceSplitState> {

private static final Logger logger = LoggerFactory.getLogger(KafkaRecordEmitter.class);
private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
private final Map<TablePath, ConsumerMetadata> mapMetadata;
private final OutputCollector<SeaTunnelRow> outputCollector;
private final MessageFormatErrorHandleWay messageFormatErrorHandleWay;

public KafkaRecordEmitter(
DeserializationSchema<SeaTunnelRow> deserializationSchema,
Map<TablePath, ConsumerMetadata> mapMetadata,
MessageFormatErrorHandleWay messageFormatErrorHandleWay) {
this.deserializationSchema = deserializationSchema;
this.mapMetadata = mapMetadata;
this.messageFormatErrorHandleWay = messageFormatErrorHandleWay;
this.outputCollector = new OutputCollector<>();
}
@@ -56,6 +58,9 @@ public void emitRecord(
KafkaSourceSplitState splitState)
throws Exception {
outputCollector.output = collector;
// TODO: for non-multi-table scenarios this per-record map lookup adds extra overhead
DeserializationSchema<SeaTunnelRow> deserializationSchema =
mapMetadata.get(splitState.getTablePath()).getDeserializationSchema();
try {
if (deserializationSchema instanceof CompatibleKafkaConnectDeserializationSchema) {
((CompatibleKafkaConnectDeserializationSchema) deserializationSchema)
@@ -35,11 +35,11 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;

import com.google.common.base.Supplier;
import com.google.common.collect.Lists;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

public class KafkaSource
implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState>,
@@ -69,7 +69,9 @@ public String getPluginName() {

@Override
public List<CatalogTable> getProducedCatalogTables() {
return Lists.newArrayList(kafkaSourceConfig.getCatalogTable());
return kafkaSourceConfig.getMapMetadata().values().stream()
.map(ConsumerMetadata::getCatalogTable)
.collect(Collectors.toList());
}

@Override
@@ -80,45 +82,37 @@ public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
new LinkedBlockingQueue<>();

Supplier<KafkaPartitionSplitReader> kafkaPartitionSplitReaderSupplier =
() -> new KafkaPartitionSplitReader(kafkaSourceConfig.getMetadata(), readerContext);
() -> new KafkaPartitionSplitReader(kafkaSourceConfig, readerContext);

KafkaSourceFetcherManager kafkaSourceFetcherManager =
new KafkaSourceFetcherManager(
elementsQueue, kafkaPartitionSplitReaderSupplier::get);
KafkaRecordEmitter kafkaRecordEmitter =
new KafkaRecordEmitter(
kafkaSourceConfig.getDeserializationSchema(),
kafkaSourceConfig.getMapMetadata(),
kafkaSourceConfig.getMessageFormatErrorHandleWay());

return new KafkaSourceReader(
elementsQueue,
kafkaSourceFetcherManager,
kafkaRecordEmitter,
new SourceReaderOptions(readonlyConfig),
kafkaSourceConfig.getMetadata(),
kafkaSourceConfig.getDeserializationSchema(),
readerContext,
kafkaSourceConfig.getMessageFormatErrorHandleWay());
kafkaSourceConfig,
readerContext);
}

@Override
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> createEnumerator(
SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext) {
return new KafkaSourceSplitEnumerator(
kafkaSourceConfig.getMetadata(),
enumeratorContext,
kafkaSourceConfig.getDiscoveryIntervalMillis());
return new KafkaSourceSplitEnumerator(kafkaSourceConfig, enumeratorContext, null);
}

@Override
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerator(
SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext,
KafkaSourceState checkpointState) {
return new KafkaSourceSplitEnumerator(
kafkaSourceConfig.getMetadata(),
enumeratorContext,
checkpointState,
kafkaSourceConfig.getDiscoveryIntervalMillis());
kafkaSourceConfig, enumeratorContext, checkpointState);
}

@Override
@@ -86,6 +86,7 @@ public class KafkaSourceConfig implements Serializable {
@Getter private final Properties properties;
@Getter private final long discoveryIntervalMillis;
@Getter private final MessageFormatErrorHandleWay messageFormatErrorHandleWay;
@Getter private final String consumerGroup;

public KafkaSourceConfig(ReadonlyConfig readonlyConfig) {
this.bootstrap = readonlyConfig.get(BOOTSTRAP_SERVERS);
@@ -95,6 +96,7 @@ public KafkaSourceConfig(ReadonlyConfig readonlyConfig) {
this.discoveryIntervalMillis = readonlyConfig.get(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS);
this.messageFormatErrorHandleWay =
readonlyConfig.get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
this.consumerGroup = readonlyConfig.get(CONSUMER_GROUP);
}

private Properties createKafkaProperties(ReadonlyConfig readonlyConfig) {
@@ -128,7 +130,6 @@ private ConsumerMetadata createConsumerMetadata(ReadonlyConfig readonlyConfig) {
ConsumerMetadata consumerMetadata = new ConsumerMetadata();
consumerMetadata.setTopic(readonlyConfig.get(TOPIC));
consumerMetadata.setPattern(readonlyConfig.get(PATTERN));
consumerMetadata.setConsumerGroup(readonlyConfig.get(CONSUMER_GROUP));
consumerMetadata.setProperties(new Properties());
// Create a catalog
CatalogTable catalogTable = createCatalogTable(readonlyConfig);
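KafkaSourceConfig now owns the consumer group and, elsewhere in this class (outside the visible hunk), builds the per-table map returned by getMapMetadata(). A rough sketch of what that assembly could look like; the loop variable, the TOPIC-based key derivation, and TablePath.of(...) are assumptions for illustration only:

// Hypothetical assembly of the map consumed by the reader and emitter.
Map<TablePath, ConsumerMetadata> mapMetadata = new LinkedHashMap<>();
for (ReadonlyConfig tableConfig : tableConfigs) {   // e.g. one entry per table_list item
    ConsumerMetadata metadata = createConsumerMetadata(tableConfig);
    // Keying by the topic's TablePath lets each split resolve its own metadata later.
    mapMetadata.put(TablePath.of(tableConfig.get(TOPIC)), metadata);
}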
@@ -17,15 +17,13 @@

package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.fetch.KafkaSourceFetcherManager;

import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -54,7 +52,8 @@ public class KafkaSourceReader

private static final Logger logger = LoggerFactory.getLogger(KafkaSourceReader.class);
private final SourceReader.Context context;
private final ConsumerMetadata metadata;

private final KafkaSourceConfig kafkaSourceConfig;
private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> checkpointOffsetMap;

private final ConcurrentMap<TopicPartition, OffsetAndMetadata> offsetsOfFinishedSplits;
@@ -66,12 +65,10 @@ public class KafkaSourceReader
RecordEmitter<ConsumerRecord<byte[], byte[]>, SeaTunnelRow, KafkaSourceSplitState>
recordEmitter,
SourceReaderOptions options,
ConsumerMetadata metadata,
DeserializationSchema<SeaTunnelRow> deserializationSchema,
Context context,
MessageFormatErrorHandleWay messageFormatErrorHandleWay) {
KafkaSourceConfig kafkaSourceConfig,
Context context) {
super(elementsQueue, splitFetcherManager, recordEmitter, options, context);
this.metadata = metadata;
this.kafkaSourceConfig = kafkaSourceConfig;
this.context = context;
this.checkpointOffsetMap = Collections.synchronizedSortedMap(new TreeMap<>());
this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
@@ -102,7 +99,7 @@ protected KafkaSourceSplit toSplitType(String splitId, KafkaSourceSplitState spl
@Override
public List<KafkaSourceSplit> snapshotState(long checkpointId) {
List<KafkaSourceSplit> sourceSplits = super.snapshotState(checkpointId);
if (!metadata.isCommitOnCheckpoint()) {
if (!kafkaSourceConfig.isCommitOnCheckpoint()) {
return sourceSplits;
}
if (sourceSplits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
@@ -127,7 +124,7 @@ public List<KafkaSourceSplit> snapshotState(long checkpointId) {
@Override
public void notifyCheckpointComplete(long checkpointId) {
logger.debug("Committing offsets for checkpoint {}", checkpointId);
if (!metadata.isCommitOnCheckpoint()) {
if (!kafkaSourceConfig.isCommitOnCheckpoint()) {
logger.debug("Submitting offsets after snapshot completion is prohibited");
return;
}
@@ -151,8 +151,7 @@ private void setPartitionStartOffset() throws ExecutionException, InterruptedExc
listOffsets(topicPartitions, OffsetSpec.earliest()));
break;
case GROUP_OFFSETS:
topicPartitionOffsets.putAll(
listConsumerGroupOffsets(topicPartitions, metadata));
topicPartitionOffsets.putAll(listConsumerGroupOffsets(topicPartitions));
break;
case LATEST:
topicPartitionOffsets.putAll(listOffsets(topicPartitions, OffsetSpec.latest()));
@@ -366,13 +365,12 @@ private Map<TopicPartition, Long> listOffsets(
.get();
}

public Map<TopicPartition, Long> listConsumerGroupOffsets(
Collection<TopicPartition> partitions, ConsumerMetadata metadata)
public Map<TopicPartition, Long> listConsumerGroupOffsets(Collection<TopicPartition> partitions)
throws ExecutionException, InterruptedException {
ListConsumerGroupOffsetsOptions options =
new ListConsumerGroupOffsetsOptions().topicPartitions(new ArrayList<>(partitions));
return adminClient
.listConsumerGroupOffsets(metadata.getConsumerGroup(), options)
.listConsumerGroupOffsets(kafkaSourceConfig.getConsumerGroup(), options)
.partitionsToOffsetAndMetadata()
.thenApply(
result -> {
@@ -23,6 +23,7 @@ public class KafkaSourceSplitState extends KafkaSourceSplit {

public KafkaSourceSplitState(KafkaSourceSplit sourceSplit) {
super(
sourceSplit.getTablePath(),
sourceSplit.getTopicPartition(),
sourceSplit.getStartOffset(),
sourceSplit.getEndOffset());
@@ -38,6 +39,7 @@ public void setCurrentOffset(long currentOffset) {
}

public KafkaSourceSplit toKafkaSourceSplit() {
return new KafkaSourceSplit(getTopicPartition(), getCurrentOffset(), getEndOffset());
return new KafkaSourceSplit(
getTablePath(), getTopicPartition(), getCurrentOffset(), getEndOffset());
}
}
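Since splits and their checkpoint state now carry a TablePath, here is a short sketch of the round trip. Only the constructors and accessors visible in this hunk are used; TablePath.of(...) and the example topic name are assumptions made for illustration:

// Hypothetical example values; the four-argument KafkaSourceSplit constructor and
// the state round trip mirror the hunk above.
TablePath tablePath = TablePath.of("default.test_topic");
TopicPartition partition = new TopicPartition("test_topic", 0);
KafkaSourceSplit split = new KafkaSourceSplit(tablePath, partition, 0L, 100L);

KafkaSourceSplitState state = new KafkaSourceSplitState(split);
state.setCurrentOffset(42L);
// The table path survives the round trip, so after restore the reader can still
// look up the right ConsumerMetadata for this split.
KafkaSourceSplit restored = state.toKafkaSourceSplit();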
@@ -26,11 +26,11 @@ env {
source {
Kafka {
bootstrap.servers = "kafka_e2e:9092"
consumer.group = "ogg_multi_group"
table_list = [
{
topic = "^test-ogg-sou.*"
pattern = "true"
consumer.group = "ogg_multi_group"
start_mode = earliest
schema = {
fields {
@@ -20,7 +20,8 @@

env {
parallelism = 1
job.mode = "BATCH"
job.mode = "STREAMING"
checkpoint.interval = 1000
}

source {
