Consume records quickly
- Fetch in parallel for consume newest
- Reduce default poll timeout and add a configuration variable
- Relates to #23
tchiotludo committed Apr 13, 2019
1 parent 0044474 commit 44cdbc0
Showing 4 changed files with 34 additions and 25 deletions.
1 change: 1 addition & 0 deletions application.example.yml
@@ -47,6 +47,7 @@ kafkahq:
   topic-data:
     sort: OLDEST # default sort order (OLDEST, NEWEST) (default: OLDEST)
     size: 50 # max record per page (default: 50)
+    poll-timeout: 1000 # The time, in milliseconds, spent waiting in poll if data is not available in the buffer.

   # Auth & Roles (optional)
   security:
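A note on `poll-timeout`: it caps how long each `poll()` call blocks when no records are buffered for the assigned partitions, so a lower value mainly shortens the wait on empty or slow partitions. Below is a minimal sketch of that behaviour, assuming a local broker at localhost:9092 and a hypothetical demo-topic; newer clients take a Duration, while the code in this commit still uses the older poll(long) overload.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PollTimeoutSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "poll-timeout-demo");       // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("demo-topic")); // hypothetical topic
            long start = System.currentTimeMillis();
            // When nothing is buffered, poll() blocks for at most the timeout
            // below, then returns an empty batch instead of waiting 5000 ms.
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
            System.out.printf("%d record(s) after %d ms%n",
                records.count(), System.currentTimeMillis() - start);
        }
    }
}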
3 changes: 2 additions & 1 deletion src/main/java/org/kafkahq/models/LogDir.java
@@ -3,6 +3,7 @@
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;

 @ToString
@@ -17,7 +18,7 @@ public class LogDir {
     private final long offsetLag;
     private final boolean isFuture;

-    public LogDir(Integer brokerId, String path, org.apache.kafka.common.TopicPartition topicPartition, DescribeLogDirsResponse.ReplicaInfo replicaInfo) {
+    public LogDir(Integer brokerId, String path, TopicPartition topicPartition, DescribeLogDirsResponse.ReplicaInfo replicaInfo) {
         this.brokerId = brokerId;
         this.path = path;
         this.topic = topicPartition.topic();
54 changes: 30 additions & 24 deletions src/main/java/org/kafkahq/repositories/RecordRepository.java
@@ -3,6 +3,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
+import io.micronaut.context.annotation.Value;
 import io.micronaut.context.env.Environment;
 import io.micronaut.http.sse.Event;
 import io.reactivex.Flowable;
@@ -35,6 +36,9 @@ public class RecordRepository extends AbstractRepository {
     private final TopicRepository topicRepository;
     private final SchemaRegistryRepository schemaRegistryRepository;

+    @Value("${kafkahq.topic-data.poll-timeout}")
+    protected int pollTimeout;
+
     @Inject
     public RecordRepository(KafkaModule kafkaModule, TopicRepository topicRepository, SchemaRegistryRepository schemaRegistryRepository) {
         this.kafkaModule = kafkaModule;
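The `@Value` binding above fails fast at startup if the key is missing, which is presumably why the commit adds `poll-timeout` to both YAML files. A hedged sketch of the same binding with an inline fallback, using Micronaut's standard `${key:default}` placeholder syntax (the `:1000` default is an assumption, not something this commit uses):

import io.micronaut.context.annotation.Value;
import javax.inject.Singleton;

@Singleton
public class PollTimeoutSettings {
    // The ":1000" fallback keeps the context bootable when the key is absent;
    // the commit itself relies on both YAML files defining it instead.
    @Value("${kafkahq.topic-data.poll-timeout:1000}")
    protected int pollTimeout;

    public int getPollTimeout() {
        return pollTimeout;
    }
}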
@@ -145,37 +149,39 @@ private Map<TopicPartition, Long> getTopicPartitionForSortOldest(Topic topic, Op
 private List<Record> consumeNewest(Topic topic, Options options) {
     int pollSizePerPartition = pollSizePerPartition(topic, options);

-    KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(
-        options.clusterId,
-        new Properties() {{
-            put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(pollSizePerPartition));
-        }}
-    );
-
-    List<Record> collect = topic
+    return topic
         .getPartitions()
-        .stream()
-        .map(partition -> getOffsetForSortNewest(consumer, partition, options, pollSizePerPartition)
-            .map(offset -> offset.withTopicPartition(
-                new TopicPartition(
-                    partition.getTopic(),
-                    partition.getId()
-                )
-            ))
+        .parallelStream()
+        .map(partition -> {
+            KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(
+                options.clusterId,
+                new Properties() {{
+                    put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(pollSizePerPartition));
+                }}
+            );
+
+            return getOffsetForSortNewest(consumer, partition, options, pollSizePerPartition)
+                .map(offset -> offset.withTopicPartition(
+                    new TopicPartition(
+                        partition.getTopic(),
+                        partition.getId()
+                    )
+                ));
+        }
         )
         .filter(Optional::isPresent)
         .map(Optional::get)
         .flatMap(topicPartitionOffset -> {
-            consumer.assign(Collections.singleton(topicPartitionOffset.getTopicPartition()));
-            consumer.seek(topicPartitionOffset.getTopicPartition(), topicPartitionOffset.getBegin());
+            topicPartitionOffset.getConsumer().assign(Collections.singleton(topicPartitionOffset.getTopicPartition()));
+            topicPartitionOffset.getConsumer().seek(topicPartitionOffset.getTopicPartition(), topicPartitionOffset.getBegin());

             List<Record> list = new ArrayList<>();
             int emptyPoll = 0;

             do {
                 ConsumerRecords<byte[], byte[]> records;

-                records = this.poll(consumer);
+                records = this.poll(topicPartitionOffset.getConsumer());

                 if (records.isEmpty()) {
                     emptyPoll++;
@@ -196,14 +202,12 @@ private List<Record> consumeNewest(Topic topic, Options options) {

             Collections.reverse(list);

+            topicPartitionOffset.getConsumer().close();
+
             return Stream.of(list);
         })
         .flatMap(List::stream)
         .collect(Collectors.toList());
-
-    consumer.close();
-
-    return collect;
 }

 private int pollSizePerPartition(Topic topic, Options options) {
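The switch from `stream()` to `parallelStream()` is what forces each branch to build its own consumer: `KafkaConsumer` is not thread-safe, so fork-join workers polling a single shared instance would fail. A simplified sketch of the per-task pattern (types and names are illustrative, not the repository's API):

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ParallelFetchSketch {
    // base must carry bootstrap servers and byte-array deserializers.
    static List<String> describePartitions(List<Integer> partitions, Properties base) {
        return partitions.parallelStream()
            .map(partition -> {
                // One consumer per task: KafkaConsumer is not thread-safe,
                // so concurrent threads cannot share a single instance.
                try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(base)) {
                    // assign(), seek() and poll() for this partition would go here
                    return "fetched partition " + partition;
                }
            })
            .collect(Collectors.toList());
    }
}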
@@ -274,6 +278,7 @@ private Optional<EndOffsetBound> getOffsetForSortNewest(KafkaConsumer<byte[], by
 }

 return EndOffsetBound.builder()
+    .consumer(consumer)
     .begin(first)
     .end(last)
     .build();
@@ -289,7 +294,7 @@ private ConsumerRecords<byte[], byte[]> poll(KafkaConsumer<byte[], byte[]> consu
     // First one waits for metadata and sends records
     // Hack below can be used to wait for metadata
     */
-    return consumer.poll(5000);
+    return consumer.poll(this.pollTimeout);

     /*
     if (!records.isEmpty()) {
Expand Down Expand Up @@ -596,5 +601,6 @@ private static class EndOffsetBound {
private final TopicPartition topicPartition;
private final long begin;
private final long end;
private final KafkaConsumer<byte[], byte[]> consumer;
}
}
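Adding the consumer to `EndOffsetBound` (last hunk above) threads it from the offset lookup through to the `flatMap` branch that polls and finally closes it. A simplified stand-in for that holder, assuming Lombok generates the builder and getters as in the original:

import lombok.Builder;
import lombok.Getter;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Simplified stand-in for the repository's private EndOffsetBound holder.
@Builder
@Getter
class OffsetBoundSketch {
    private final TopicPartition topicPartition;
    private final long begin;
    private final long end;
    // Closed by the flatMap branch once the partition has been drained.
    private final KafkaConsumer<byte[], byte[]> consumer;
}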
1 change: 1 addition & 0 deletions src/main/resources/application.yml
@@ -49,6 +49,7 @@ kafkahq:
   topic-data:
     sort: OLDEST
     size: 50
+    poll-timeout: 1000

   security:
     default-roles:
