fix(server): improving search and listing due to higher timeout and max.poll.records
jonasvoelcker committed Jun 19, 2024
1 parent ff3acaf commit 2069145
Showing 2 changed files with 33 additions and 32 deletions.
61 changes: 31 additions & 30 deletions src/main/java/org/akhq/repositories/RecordRepository.java
@@ -10,6 +10,7 @@
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.sse.Event;
import io.reactivex.Flowable;
+import java.util.stream.StreamSupport;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.SchemaRegistryType;
@@ -80,10 +81,10 @@ public class RecordRepository extends AbstractRepository {
@Inject
private MaskingUtils maskingUtils;

-@Value("${akhq.topic-data.poll-timeout:1000}")
+@Value("${akhq.topic-data.poll-timeout:10000}")
protected int pollTimeout;

-@Value("${akhq.clients-defaults.consumer.properties.max.poll.records:50}")
+@Value("${akhq.clients-defaults.consumer.properties.max.poll.records:25000}")
protected int maxPollRecords;

@Value("${akhq.topic-data.kafka-max-message-length:2147483647}")
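
Together these two defaults change the poll profile substantially: a single poll may now wait up to ten seconds and return up to 25,000 records instead of 50. A minimal standalone sketch of how such settings reach the Kafka client (broker address, group id, and topic name are placeholders; AKHQ itself builds consumers through its KafkaModule):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PollDefaultsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "poll-defaults-sketch");    // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // New default from this commit: up to 25,000 records per poll (was 50).
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 25000);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("some-topic")); // placeholder topic
            // New default from this commit: block up to 10 s per poll (was 1 s),
            // so scanning a topic needs fewer, larger round trips.
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10_000));
            System.out.printf("fetched %d records%n", records.count());
        }
    }
}
```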
@@ -99,9 +100,7 @@ public Map<String, Record> getLastRecord(String clusterId, List<String> topicsNa
.map(partition -> new TopicPartition(partition.getTopic(), partition.getId()))
.collect(Collectors.toList());

-KafkaConsumer<byte[], byte[]> consumer = kafkaModule.getConsumer(clusterId, new Properties() {{
-    put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, topicPartitions.size() * 3);
-}});
+KafkaConsumer<byte[], byte[]> consumer = kafkaModule.getConsumer(clusterId);
consumer.assign(topicPartitions);

consumer
@@ -174,9 +173,10 @@ private List<Record> consumeOldest(Topic topic, Options options) {

consumer.close();

-list.sort(Comparator.comparing(Record::getTimestamp));
-
-return list;
+return list.stream()
+    .sorted(Comparator.comparing(Record::getTimestamp))
+    .limit(options.size)
+    .toList();
}

public List<TimeOffset> getOffsetForTime(String clusterId, List<org.akhq.models.TopicPartition> partitions, Long timestamp) throws ExecutionException, InterruptedException {
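
The consumeOldest() change a few lines up is what makes the larger poll size safe for listing: records come back grouped by partition, not by time, so the method now sorts the whole batch by timestamp and trims it to the requested page. A small self-contained sketch of that sort-then-limit shape (the Rec record and the literal timestamps are stand-ins, not AKHQ types):

```java
import java.util.Comparator;
import java.util.List;

public class OldestPageSketch {
    // Stand-in for org.akhq.models.Record: just a timestamp and a key.
    record Rec(long timestamp, String key) {}

    public static void main(String[] args) {
        // Poll results arrive interleaved across partitions, and with
        // max.poll.records at 25000 there can be far more than one page.
        List<Rec> polled = List.of(
            new Rec(30, "c"), new Rec(10, "a"), new Rec(40, "d"), new Rec(20, "b"));
        int size = 2; // stands in for options.size

        // Same shape as the new consumeOldest() return: order everything by
        // timestamp, then cap the result at the requested page size.
        List<Rec> page = polled.stream()
            .sorted(Comparator.comparingLong(Rec::timestamp))
            .limit(size)
            .toList();

        System.out.println(page); // [Rec[timestamp=10, key=a], Rec[timestamp=20, key=b]]
    }
}
```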
@@ -264,12 +264,7 @@ private List<Record> consumeNewest(Topic topic, Options options) {
.getPartitions()
.parallelStream()
.map(partition -> {
-KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(
-    options.clusterId,
-    new Properties() {{
-        put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(options.size));
-    }}
-);
+KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(options.clusterId);

return getOffsetForSortNewest(consumer, partition, options)
.map(offset -> offset.withTopicPartition(
@@ -652,11 +647,8 @@ public RecordMetadata delete(String clusterId, String topic, Integer partition,
public Flowable<Event<SearchEvent>> search(Topic topic, Options options) throws ExecutionException, InterruptedException {
AtomicInteger matchesCount = new AtomicInteger();

-Properties properties = new Properties();
-properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, options.getSize());
-
return Flowable.generate(() -> {
-KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(options.clusterId, properties);
+KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(options.clusterId);
Map<TopicPartition, Long> partitions = getTopicPartitionForSortOldest(topic, options, consumer);

if (partitions.size() == 0) {
@@ -699,22 +691,32 @@ public Flowable<Event<SearchEvent>> search(Topic topic, Options options) throws
currentEvent.emptyPoll = 0;
}

+Comparator<Record> comparator = Comparator.comparing(Record::getTimestamp);
+
+List<Record> sortedRecords = StreamSupport.stream(records.spliterator(), false)
+    .map(record -> newRecord(record, options, topic))
+    .sorted(Options.Sort.NEWEST.equals(options.sort) ? comparator.reversed() : comparator)
+    .toList();

List<Record> list = new ArrayList<>();

-for (ConsumerRecord<byte[], byte[]> record : records) {
+for (Record record : sortedRecords) {
if (matchesCount.get() >= options.size) {
break;
}

currentEvent.updateProgress(record);

-Record current = newRecord(record, options, topic);
-if (matchFilters(options, current)) {
-    list.add(current);
+if (matchFilters(options, record)) {
+    list.add(record);
matchesCount.getAndIncrement();

log.trace(
"Record [topic: {}] [partition: {}] [offset: {}] [key: {}]",
-record.topic(),
-record.partition(),
-record.offset(),
-record.key()
+record.getTopic(),
+record.getPartition(),
+record.getOffset(),
+record.getKey()
);
}
}
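
The loop above now walks model Records that are already in the requested order, so stopping after options.size matches yields the correct page for either sort direction. A standalone sketch of the sort-before-filter idea (Rec and the contains() predicate are stand-ins for AKHQ's Record and matchFilters()):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SearchOrderSketch {
    record Rec(long timestamp, String value) {}

    public static void main(String[] args) {
        List<Rec> polled = List.of(
            new Rec(3, "needle"), new Rec(1, "needle"), new Rec(2, "hay"));
        int size = 1; // stands in for options.size

        // Sort the candidates before filtering, as the new search() does:
        // only then does "stop after `size` matches" yield the oldest match
        // rather than whichever match happened to be polled first.
        List<Rec> sorted = polled.stream()
            .sorted(Comparator.comparingLong(Rec::timestamp))
            .toList();

        List<Rec> matches = new ArrayList<>();
        for (Rec rec : sorted) {
            if (matches.size() >= size) {
                break;
            }
            if (rec.value().contains("needle")) { // stands in for matchFilters()
                matches.add(rec);
            }
        }
        System.out.println(matches); // [Rec[timestamp=1, value=needle]]
    }
}
```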
@@ -927,10 +929,9 @@ public Event<SearchEvent> progress(Options options) {
return Event.of(this).name("searchBody");
}


-private void updateProgress(ConsumerRecord<byte[], byte[]> record) {
-    Offset offset = this.offsets.get(record.partition());
-    offset.current = record.offset();
+private void updateProgress(Record record) {
+    Offset offset = this.offsets.get(record.getPartition());
+    offset.current = record.getOffset();
}

@AllArgsConstructor
4 changes: 2 additions & 2 deletions src/main/resources/application.yml
@@ -99,7 +99,7 @@ akhq:
clients-defaults:
consumer:
properties:
-max.poll.records: 50
+max.poll.records: 25000
isolation.level: read_committed
# group.id: Akhq
enable.auto.commit: "false"
@@ -130,7 +130,7 @@

topic-data:
size: 50
-poll-timeout: 1000
+poll-timeout: 10000
kafka-max-message-length: 1000000

audit:
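
Both new defaults can be tuned back down per deployment if a broker struggles with large fetches; an illustrative override in application.yml (the values shown are examples, not recommendations):

```yaml
akhq:
  clients-defaults:
    consumer:
      properties:
        max.poll.records: 500   # fewer records per poll than the new 25000 default
  topic-data:
    poll-timeout: 3000          # per-poll wait in milliseconds (new default: 10000)
```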
