diff --git a/application.example.yml b/application.example.yml index 5199640a5..e086b56bf 100644 --- a/application.example.yml +++ b/application.example.yml @@ -165,6 +165,7 @@ akhq: topic-data: size: 50 # max record per page (default: 50) poll-timeout: 1000 # The time, in milliseconds, spent waiting in poll if data is not available in the buffer. + kafka-max-message-length: 1000000 # Max message length allowed to send to UI when retrieving a list of records in bytes. # Ui Global Options (optional) ui-options: diff --git a/client/src/components/Table/Table.jsx b/client/src/components/Table/Table.jsx index 3de78864f..41e6c0d4d 100644 --- a/client/src/components/Table/Table.jsx +++ b/client/src/components/Table/Table.jsx @@ -288,7 +288,7 @@ class Table extends Component { } renderActions(row) { - const { actions, onAdd, onDetails, onConfig, onDelete, onEdit, onRestart, onShare, idCol } = this.props; + const { actions, onAdd, onDetails, onConfig, onDelete, onEdit, onRestart, onShare, onDownload, idCol } = this.props; let idColVal = idCol ? row[this.props.idCol] : row.id; @@ -374,6 +374,18 @@ class Table extends Component { )} + {actions.find(el => el === constants.TABLE_DOWNLOAD) && ( + + { + onDownload && onDownload(row); + }} + > + + + + )} ); } diff --git a/client/src/containers/Topic/Topic/TopicData/TopicData.jsx b/client/src/containers/Topic/Topic/TopicData/TopicData.jsx index 96491b6f2..9ce15f16a 100644 --- a/client/src/containers/Topic/Topic/TopicData/TopicData.jsx +++ b/client/src/containers/Topic/Topic/TopicData/TopicData.jsx @@ -64,7 +64,8 @@ class TopicData extends Root { roles: JSON.parse(sessionStorage.getItem('roles')), canDeleteRecords: false, percent: 0, - loading: true + loading: true, + canDownload: false }; searchFilterTypes = [ @@ -113,6 +114,7 @@ class TopicData extends Root { () => { if(query.get('single') !== null) { this._getSingleMessage(query.get('partition'), query.get('offset')); + this.setState({ canDownload: true }) } else { this._getMessages(); } @@ -346,10 +348,23 @@ class TopicData extends Root { console.error('Failed to copy: ', err); } + this.setState({ canDownload: true }) + this.props.history.push(pathToShare) this._getSingleMessage(row.partition, row.offset); } + _handleDownload({ key, value: data }) { + const hasKey = key && key !== null && key !== 'null'; + + const a = document.createElement('a'); + a.href = URL.createObjectURL( new Blob([data], { type:'text/json' }) ); + a.download = `${hasKey ? key : 'file'}.json`; + + a.click(); + a.remove(); + } + _showDeleteModal = deleteMessage => { this.setState({ showDeleteModal: true, deleteMessage }); }; @@ -383,7 +398,9 @@ class TopicData extends Root { messages.forEach(message => { let messageToPush = { key: message.key || 'null', - value: message.value || 'null', + value: message.truncated + ? message.value + '...\nToo large message. Full body in share button.' || 'null' + : message.value || 'null', timestamp: message.timestamp, partition: JSON.stringify(message.partition) || '', offset: JSON.stringify(message.offset) || '', @@ -642,9 +659,14 @@ class TopicData extends Root { datetime, isSearching, canDeleteRecords, + canDownload, percent, loading } = this.state; + + let actions = canDeleteRecords ? [constants.TABLE_DELETE, constants.TABLE_SHARE] : [constants.TABLE_SHARE] + if (canDownload) actions.push(constants.TABLE_DOWNLOAD) + let date = moment(datetime); const { history } = this.props; const firstColumns = [ @@ -965,7 +987,10 @@ class TopicData extends Root { onShare={row => { this._handleOnShare(row); }} - actions={canDeleteRecords ? [constants.TABLE_DELETE, constants.TABLE_SHARE] : [constants.TABLE_SHARE]} + onDownload={row => { + this._handleDownload(row); + }} + actions={actions} onExpand={obj => { return Object.keys(obj.headers).map(header => { return ( diff --git a/client/src/utils/constants.js b/client/src/utils/constants.js index 3b1be05ba..70c8f9eb3 100644 --- a/client/src/utils/constants.js +++ b/client/src/utils/constants.js @@ -24,6 +24,7 @@ export const TABLE_DETAILS = 'details'; export const TABLE_CONFIG = 'config'; export const TABLE_RESTART = 'restart'; export const TABLE_SHARE = 'share'; +export const TABLE_DOWNLOAD = 'download' // Tab names/route names export const CLUSTER = 'cluster'; @@ -65,6 +66,7 @@ export default { TABLE_CONFIG, TABLE_RESTART, TABLE_SHARE, + TABLE_DOWNLOAD, CLUSTER, NODE, TOPIC, diff --git a/docs/docs/configuration/akhq.md b/docs/docs/configuration/akhq.md index a17ae8b7b..5d8834041 100644 --- a/docs/docs/configuration/akhq.md +++ b/docs/docs/configuration/akhq.md @@ -22,6 +22,7 @@ These parameters are the default values used in the topic creation page. ## Topic Data * `akhq.topic-data.size`: max record per page (default: 50) * `akhq.topic-data.poll-timeout`: The time, in milliseconds, spent waiting in poll if data is not available in the buffer (default: 1000). +* `akhq.topic-data.kafka-max-message-length`: Max message length allowed to send to UI when retrieving a list of records (dafault: 1000000 bytes). ## Ui Settings ### Topics diff --git a/src/main/java/org/akhq/models/Record.java b/src/main/java/org/akhq/models/Record.java index 06018b3d4..53fd6fec6 100644 --- a/src/main/java/org/akhq/models/Record.java +++ b/src/main/java/org/akhq/models/Record.java @@ -8,6 +8,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.json.JsonSchema; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import io.micronaut.context.annotation.Value; import kafka.coordinator.group.GroupMetadataManager; import kafka.coordinator.transaction.TransactionLog; import kafka.coordinator.transaction.TxnKey; @@ -71,12 +72,15 @@ public class Record { private byte[] bytesValue; @Getter(AccessLevel.NONE) + @Setter(AccessLevel.NONE) private String value; private final List exceptions = new ArrayList<>(); private byte MAGIC_BYTE; + private Boolean truncated; + public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, Map headers, Topic topic) { this.MAGIC_BYTE = schemaRegistryType.getMagicByte(); this.topic = topic; @@ -88,6 +92,7 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte this.bytesValue = bytesValue; this.valueSchemaId = getAvroSchemaId(this.bytesValue); this.headers = headers; + this.truncated = false; } public Record(SchemaRegistryClient client, ConsumerRecord record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer, @@ -118,6 +123,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord record this.kafkaProtoDeserializer = kafkaProtoDeserializer; this.avroToJsonSerializer = avroToJsonSerializer; this.kafkaJsonDeserializer = kafkaJsonDeserializer; + this.truncated = false; } public String getKey() { @@ -145,6 +151,14 @@ public String getValue() { return this.value; } + public void setValue(String value) { + this.value = value; + } + + public void setTruncated(Boolean truncated) { + this.truncated = truncated; + } + private String convertToString(byte[] payload, Integer schemaId, boolean isKey) { if (payload == null) { return null; diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java index 9b3a90185..3c1887336 100644 --- a/src/main/java/org/akhq/repositories/RecordRepository.java +++ b/src/main/java/org/akhq/repositories/RecordRepository.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.header.internals.RecordHeader; import org.codehaus.httpcache4j.uri.URIBuilder; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -76,6 +77,9 @@ public class RecordRepository extends AbstractRepository { @Value("${akhq.clients-defaults.consumer.properties.max.poll.records:50}") protected int maxPollRecords; + @Value("${akhq.topic-data.kafka-max-message-length}") + private int maxKafkaMessageLength; + public Map getLastRecord(String clusterId, List topicsName) throws ExecutionException, InterruptedException { Map topics = topicRepository.findByName(clusterId, topicsName).stream() .collect(Collectors.toMap(Topic::getName, Function.identity())); @@ -153,6 +157,7 @@ private List consumeOldest(Topic topic, Options options) { for (ConsumerRecord record : records) { Record current = newRecord(record, options, topic); if (searchFilter(options, current)) { + filterMessageLength(current); list.add(current); } } @@ -311,6 +316,7 @@ private List consumeNewest(Topic topic, Options options) { } Record current = newRecord(record, options, topic); if (searchFilter(options, current)) { + filterMessageLength(current); list.add(current); } } @@ -1268,5 +1274,14 @@ private static class EndOffsetBound { private final long end; private final KafkaConsumer consumer; } + + private void filterMessageLength(Record record) { + int bytesLength = record.getValue().getBytes(StandardCharsets.UTF_8).length; + if (bytesLength > maxKafkaMessageLength) { + int substringChars = maxKafkaMessageLength / 1000; + record.setValue(record.getValue().substring(0, substringChars)); + record.setTruncated(true); + } + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index d88f8ee88..52d48a14e 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -125,6 +125,7 @@ akhq: topic-data: size: 50 poll-timeout: 1000 + kafka-max-message-length: 1000000 security: default-group: admin