Skip to content

Commit

Permalink
feat(topicdata): add option to send tombstones (#1075)
Browse files Browse the repository at this point in the history
close #1072
  • Loading branch information
neoscaler authored Apr 20, 2022
1 parent b732a16 commit 24231a9
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 22 deletions.
6 changes: 4 additions & 2 deletions client/src/components/Form/Form.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class Form extends Root {
);
};

renderJSONInput = (name, label, onChange, textMode, options) => {
renderJSONInput = (name, label, onChange, textMode, options, rest) => {
const { formData, errors } = this.state;
const inputMode = textMode ? "text" : (formData.schemaType === "PROTOBUF" ? "protobuf" : "json")
return (
Expand All @@ -137,6 +137,7 @@ class Form extends Root {
editorProps={{ $blockScrolling: true }}
setOptions={options}
style={{ width: '100%', minHeight: '25vh' }}
{...rest}
/>
{errors[name] && <div className="alert alert-danger mt-1 p-1">{errors[name]}</div>}
</div>
Expand Down Expand Up @@ -241,7 +242,7 @@ class Form extends Root {
);
};

renderCheckbox = (name, label, isChecked, onChange, isDefaultChecked) => {
renderCheckbox = (name, label, isChecked, onChange, isDefaultChecked, rest) => {
return (
<input
type="checkbox"
Expand All @@ -251,6 +252,7 @@ class Form extends Root {
checked={isChecked}
onChange={onChange}
defaultChecked={ isDefaultChecked ? isDefaultChecked : false}
{...rest}
/>
);
};
Expand Down
49 changes: 42 additions & 7 deletions client/src/containers/Topic/TopicProduce/TopicProduce.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class TopicProduce extends Form {
topics: [],
topicsSearchValue: '',
multiMessage: false,
tombstone: false,
valuePlaceholder: '{"param": "value"}'
};

Expand Down Expand Up @@ -157,20 +158,27 @@ class TopicProduce extends Form {
selectedValueSchema,
keySchema,
valueSchema,
multiMessage
multiMessage,
tombstone
} = this.state;
const { clusterId } = this.props.match.params;

let schemaKeyToSend = keySchema.find(key => key.subject === selectedKeySchema);
let schemaValueToSend = valueSchema.find(value => value.subject === selectedValueSchema);
let value;
if (tombstone) {
value = null;
} else {
value = multiMessage ? formData.value : JSON.parse(JSON.stringify(formData.value))
}
const topic = {
clusterId,
topicId,
topics: [topicId],
partition: formData.partition,
key: formData.key,
timestamp: datetime.toISOString(),
value: multiMessage ? formData.value : JSON.parse(JSON.stringify(formData.value)),
value: value,
keySchema: schemaKeyToSend ? schemaKeyToSend.id : '',
valueSchema: schemaValueToSend ? schemaValueToSend.id : '',
multiMessage: multiMessage,
Expand All @@ -196,7 +204,7 @@ class TopicProduce extends Form {
});
}

renderMultiMessage() {
renderMultiMessage(tombstone) {
const { formData, multiMessage } = this.state;

return (
Expand All @@ -211,7 +219,8 @@ class TopicProduce extends Form {
this.setState({multiMessage: !multiMessage,
valuePlaceholder: this.getPlaceholderValue(!multiMessage, formData.keyValueSeparator)})
},
false
false,
{ disabled: tombstone }
)}

<label className="col-auto col-form-label">Separator</label>
Expand All @@ -236,6 +245,28 @@ class TopicProduce extends Form {
);
}

renderTombstone(multiMessage) {
const { tombstone } = this.state;

return (
<div className="form-group row">
<label className="col-sm-2 col-form-label">Tombstone</label>
<div className="row khq-multiple col-sm-7">
{this.renderCheckbox(
'isTombstone',
'',
tombstone,
() => {
this.setState({tombstone: !tombstone} )
},
false,
{ disabled: multiMessage }
)}
</div>
</div>
);
}

getPlaceholderValue(isMultiMessage, keyValueSeparator) {
if(isMultiMessage) {
return 'key1' + keyValueSeparator + '{"param": "value1"}\n'
Expand Down Expand Up @@ -403,7 +434,8 @@ class TopicProduce extends Form {
valueSchema,
valueSchemaSearchValue,
selectedValueSchema,
multiMessage
multiMessage,
tombstone
} = this.state;
let date = moment(datetime);
return (
Expand Down Expand Up @@ -472,7 +504,9 @@ class TopicProduce extends Form {
)
)}

{this.renderMultiMessage()}
{this.renderMultiMessage(tombstone)}

{this.renderTombstone(multiMessage)}

{this.renderJSONInput('value', 'Value', value => {
this.setState({
Expand All @@ -482,7 +516,8 @@ class TopicProduce extends Form {
}
})},
multiMessage, // true -> 'text' mode; json, protobuff, ... mode otherwise
{ placeholder: this.getPlaceholderValue(multiMessage, formData.keyValueSeparator) }
{ placeholder: this.getPlaceholderValue(multiMessage, formData.keyValueSeparator) },
{ readOnly: tombstone }
)}
<div style={{ display: 'flex', flexDirection: 'row', width: '100%', padding: 0 }}>
<label
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/akhq/controllers/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public List<Record> produce(
HttpRequest<?> request,
String cluster,
String topicName,
String value,
Optional<String> value,
Optional<String> key,
Optional<Integer> partition,
Optional<String> timestamp,
Expand All @@ -167,7 +167,7 @@ public List<Record> produce(
.map(recordMetadata -> new Record(recordMetadata,
schemaRegistryRepository.getSchemaRegistryType(cluster),
key.map(String::getBytes).orElse(null),
value.getBytes(),
value.map(String::getBytes).orElse(null),
headers,
targetTopic))
.collect(Collectors.toList());
Expand Down
18 changes: 9 additions & 9 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions opti
public List<RecordMetadata> produce(
String clusterId,
String topic,
String value,
Optional<String> value,
Map<String, String> headers,
Optional<String> key,
Optional<Integer> partition,
Expand All @@ -492,10 +492,10 @@ public List<RecordMetadata> produce(
List<RecordMetadata> produceResults = new ArrayList<>();

// Distinguish between single record produce, and multiple messages
if (multiMessage.booleanValue()) {
if (Boolean.TRUE.equals(multiMessage) && value.isPresent()) {
// Split key-value pairs and produce them
for (KeyValue<String, String> kvPair : splitMultiMessage(value, keyValueSeparator.orElseThrow())) {
produceResults.add(produce(clusterId, topic, kvPair.getValue(), headers, Optional.of(kvPair.getKey()),
for (KeyValue<String, String> kvPair : splitMultiMessage(value.get(), keyValueSeparator.orElseThrow())) {
produceResults.add(produce(clusterId, topic, Optional.of(kvPair.getValue()), headers, Optional.of(kvPair.getKey()),
partition, timestamp, keySchemaId, valueSchemaId));
}
} else {
Expand Down Expand Up @@ -590,7 +590,7 @@ private void deleteRecords(String clusterId, Map<TopicPartition, RecordsToDelete
public RecordMetadata produce(
String clusterId,
String topic,
String value,
Optional<String> value,
Map<String, String> headers,
Optional<String> key,
Optional<Integer> partition,
Expand All @@ -610,19 +610,19 @@ public RecordMetadata produce(
}
} else {
try {
if (Topic.isCompacted(configRepository.findByTopic(clusterId, value))) {
if (Topic.isCompacted(configRepository.findByTopic(clusterId, value.isEmpty() ? null : value.get()))) {
throw new IllegalArgumentException("Key missing for produce onto compacted topic");
}
} catch (ExecutionException ex) {
log.debug("Failed to determine if {} topic {} is compacted", clusterId, topic, ex);
}
}

if (value != null && valueSchemaId.isPresent()) {
if (value.isPresent() && valueSchemaId.isPresent()) {
SchemaSerializer valueSerializer = serializerFactory.createSerializer(clusterId, valueSchemaId.get());
valueAsBytes = valueSerializer.serialize(value);
valueAsBytes = valueSerializer.serialize(value.get());
} else {
valueAsBytes = value != null ? value.getBytes() : null;
valueAsBytes = value.map(String::getBytes).orElse(null);
}

return produce(clusterId, topic, valueAsBytes, headers, keyAsBytes, partition, timestamp);
Expand Down
18 changes: 17 additions & 1 deletion src/test/java/org/akhq/controllers/TopicControllerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,22 @@ void produce() {
assertEquals("1", response.get(0).getHeaders().get("my-header-1"));
}

@Test
@Order(3)
void produceTombstone() {
Map<String, Object> paramMap = new HashMap<>();
paramMap.put("value", null);
paramMap.put("key", "my-key-tomb");
paramMap.put("multiMessage", false);
List<Record> response = this.retrieveList(HttpRequest.POST(
CREATE_TOPIC_URL + "/data", paramMap
), Record.class);

assertEquals(1, response.size());
assertEquals("my-key-tomb", response.get(0).getKey());
assertNull(response.get(0).getValue());
}

@Test
@Order(4)
void dataGet() {
Expand All @@ -214,7 +230,7 @@ void dataDelete() {
),
Record.class
);
assertEquals(1, retrieve.getOffset());
assertEquals(2, retrieve.getOffset());

// get data
// @TODO: Failed to see the message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ void produceAndConsumeRecordUsingJsonSchema() throws ExecutionException, Interru
RecordMetadata producedRecordMetadata = repository.produce(
KafkaTestCluster.CLUSTER_ID,
KafkaTestCluster.TOPIC_JSON_SCHEMA,
recordAsJsonString,
Optional.of(recordAsJsonString),
Collections.emptyMap(),
Optional.of(keyJsonString),
Optional.empty(),
Expand Down

0 comments on commit 24231a9

Please sign in to comment.