Enhance support for Glue Schema Registry #1673

Merged · 15 commits · Feb 21, 2024
14 changes: 7 additions & 7 deletions client/public/index.html
@@ -1,13 +1,13 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<base href="%PUBLIC_URL%/">
<link rel="icon" href="%PUBLIC_URL%/favicon.ico">
<link rel="manifest" href="%PUBLIC_URL%/manifest.json">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="theme-color" content="#33b5e5">
<meta name="html-head" content="replace">
<meta charset="utf-8" />
<base href="%PUBLIC_URL%/" />
<link rel="icon" href="%PUBLIC_URL%/favicon.ico" />
<link rel="manifest" href="%PUBLIC_URL%/manifest.json" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="theme-color" content="#33b5e5" />
<meta name="html-head" content="replace" />

<!--
The AKHQ_PREFIX_PATH placeholder magic value is replaced during serving by server
4 changes: 4 additions & 0 deletions client/src/containers/SideBar/Sidebar.jsx
@@ -21,6 +21,7 @@ class Sidebar extends Component {
showConnects: false,
showKsqlDBs: false,
enableRegistry: false,
registryType: '',
enableConnect: false,
enableKsqlDB: false,
roles: JSON.parse(sessionStorage.getItem('roles')),
@@ -95,6 +96,7 @@ class Sidebar extends Component {
const enableKsqlDB = cluster.ksqldbs !== undefined;
let newState = {
enableRegistry: cluster.registry,
registryType: cluster.registryType,
enableConnect: enableConnects,
allConnects: [],
selectedConnect: '',
@@ -284,6 +286,7 @@ class Sidebar extends Component {
selectedTab,
height,
enableRegistry,
registryType,
enableConnect,
enableKsqlDB
} = this.state;
@@ -355,6 +358,7 @@
roles.ACL.includes('READ') &&
this.renderMenuItem('fa fa-fw fa-key', constants.ACLS, 'ACLS')}
{enableRegistry &&
registryType !== 'GLUE' &&
roles &&
roles.SCHEMA &&
roles.SCHEMA.includes('READ') &&
5 changes: 4 additions & 1 deletion client/src/containers/Topic/Topic/Topic.jsx
@@ -18,6 +18,7 @@ class Topic extends Root {
state = {
clusterId: this.props.clusterId,
topicId: this.props.topicId,
registryType: '',
topic: {},
selectedTab: '',
showDeleteModal: false,
@@ -50,11 +51,12 @@

const roles = this.state.roles || {};
const tabSelected = getSelectedTab(this.props, this.tabs);

const registryType = this.props.clusters.find(el => el.id === clusterId).registryType;
this.setState(
{
clusterId,
topicId,
registryType,
selectedTab:
roles.TOPIC_DATA && roles.TOPIC_DATA.includes('READ') ? tabSelected : 'configs',
topicInternal: this.props.location.internal
@@ -210,6 +212,7 @@
location={location}
isAllTopicDataSelected={this.state.isAllTopicDataSelected}
onSelectAllCheckboxChange={this._handleSelectAllCheckboxChange}
registryType={this.state.registryType}
/>
);
case 'partitions':
29 changes: 21 additions & 8 deletions client/src/containers/Topic/Topic/TopicData/TopicData.jsx
@@ -68,6 +68,7 @@ class TopicData extends Root {
deleteMessage: '',
compactMessageToDelete: '',
selectedCluster: this.props.clusterId,
registryType: this.props.registryType,
selectedTopic: this.props.topicId,
cleanupPolicy: '',
datetime: '',
@@ -115,11 +116,11 @@
const { clusterId, topicId } = this.props.match.params;
const query = new URLSearchParams(this.props.location.search);
const uiOptions = await getClusterUIOptions(clusterId);

this.setState(
prevState => ({
selectedCluster: clusterId,
selectedTopic: topicId,
registryType: this.props.registryType,
sortBy: query.get('sort')
? query.get('sort')
: uiOptions && uiOptions.topicData && uiOptions.topicData.sort
@@ -517,7 +518,11 @@
partition: JSON.stringify(message.partition) || '',
offset: JSON.stringify(message.offset) || '',
headers: message.headers || [],
schema: { key: message.keySchemaId, value: message.valueSchemaId },
schema: {
key: message.keySchemaId,
value: message.valueSchemaId,
registryType: this.state.registryType
},
exceptions: message.exceptions || []
};

@@ -1206,9 +1211,13 @@
{obj[col.accessor].key !== undefined && (
<span
className="badge badge-primary clickable"
onClick={() => {
this._redirectToSchema(obj.schema.key);
}}
onClick={
obj[col.accessor].registryType !== 'GLUE'
? () => {
this._redirectToSchema(obj.schema.key);
}
: undefined
}
>
Key: {obj[col.accessor].key}
</span>
@@ -1217,9 +1226,13 @@
{obj[col.accessor].value !== undefined && (
<span
className="badge badge-primary clickable schema-value"
onClick={() => {
this._redirectToSchema(obj.schema.value);
}}
onClick={
obj[col.accessor].registryType !== 'GLUE'
? () => {
this._redirectToSchema(obj.schema.value);
}
: undefined
}
>
Value: {obj[col.accessor].value}
</span>
6 changes: 5 additions & 1 deletion client/src/utils/Routes.js
@@ -180,7 +180,11 @@ class Routes extends Root {
)}

{roles && roles.TOPIC && roles.TOPIC.includes('READ') && (
<Route exact path="/ui/:clusterId/topic/:topicId/:tab?" component={Topic} />
<Route
exact
path="/ui/:clusterId/topic/:topicId/:tab?"
render={props => <Topic clusters={clusters} {...props} />}
/>
)}

{roles && roles.TOPIC && roles.TOPIC_DATA.includes('READ') && (
3 changes: 2 additions & 1 deletion docs/.vuepress/config.js
@@ -43,7 +43,8 @@ module.exports = {
text: 'Schema Registry',
children: [
'/docs/configuration/schema-registry/tibco.md',
'/docs/configuration/schema-registry/schema-references.md',
'/docs/configuration/schema-registry/glue.md',
'/docs/configuration/schema-registry/schema-references.md',
]
},
{
29 changes: 29 additions & 0 deletions docs/docs/configuration/schema-registry/glue.md
@@ -0,0 +1,29 @@
# Glue schema registry
Currently, Glue schema registry support is limited to deserialization of Avro/Protobuf/JSON serialized messages.
It can be configured as below:
```yaml
akhq:
  environment:
    AKHQ_CONFIGURATION: |
      akhq:
        connections:
          docker-kafka-server:
            properties:
              bootstrap.servers: "kafka:9092"
            schema-registry:
              url: "http://schema-registry:8085"
              type: "glue"
              glueSchemaRegistryName: "<name of the Glue schema registry>"
              awsRegion: "<AWS region>"
            connect:
              - name: "connect"
                url: "http://connect:8083"
  ports:
    - 8080:8080
  links:
    - kafka
    - repo
```
Please note that authentication is done using the AWS default credentials provider.

The `url` key is still required so that the existing flow does not break.
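Since authentication goes through the AWS default credentials provider chain, credentials can be supplied in any of the standard ways. Below is a minimal sketch for the Docker setup above, assuming static credentials via environment variables (the chain equally resolves instance profiles, `~/.aws/credentials`, and other standard sources); the values shown are placeholders:

```yaml
akhq:
  environment:
    # Standard AWS SDK variables, resolved by the default credentials
    # provider chain inside the container; replace with real values.
    AWS_ACCESS_KEY_ID: "<access key id>"
    AWS_SECRET_ACCESS_KEY: "<secret access key>"
    AWS_REGION: "<aws region>"
```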
2 changes: 2 additions & 0 deletions src/main/java/org/akhq/controllers/AkhqController.java
@@ -73,6 +73,7 @@ public List<ClusterDefinition> list() {
.map(connection -> new ClusterDefinition(
connection.getName(),
connection.getSchemaRegistry() != null,
connection.getSchemaRegistry() != null ? connection.getSchemaRegistry().getType().name() : null,
(connection.getConnect() != null ? connection.getConnect() : new ArrayList<Connect>())
.stream()
.map(Connect::getName)
@@ -279,6 +280,7 @@ public static class AuthPermissions {
public static class ClusterDefinition {
private String id;
private boolean registry;
private String registryType;
private List<String> connects;
private List<String> ksqldbs;
}
47 changes: 31 additions & 16 deletions src/main/java/org/akhq/models/Record.java
@@ -1,5 +1,7 @@
package org.akhq.models;

import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerDataParser;
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.protobuf.Message;
@@ -42,9 +44,9 @@ public class Record {
private ZonedDateTime timestamp;
@JsonIgnore
private TimestampType timestampType;
private Integer keySchemaId;
private String keySchemaId;
private String keySubject;
private Integer valueSchemaId;
private String valueSchemaId;
private String valueSubject;
private List<KeyValue<String, String>> headers = new ArrayList<>();
@JsonIgnore
@@ -89,22 +91,21 @@ public class Record {
@JsonIgnore
private Deserializer awsGlueKafkaDeserializer;


public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, List<KeyValue<String, String>> headers, Topic topic, Deserializer awsGlueKafkaDeserializer) {
this.MAGIC_BYTE = schemaRegistryType.getMagicByte();
this.topic = topic;
this.partition = record.partition();
this.offset = record.offset();
this.timestamp = ZonedDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneId.systemDefault());
this.bytesKey = bytesKey;
this.awsGlueKafkaDeserializer = awsGlueKafkaDeserializer;
this.keySchemaId = getAvroSchemaId(this.bytesKey);
this.keySubject = getAvroSchemaSubject(this.keySchemaId);
this.bytesValue = bytesValue;
this.valueSchemaId = getAvroSchemaId(this.bytesValue);
this.valueSubject = getAvroSchemaSubject(this.valueSchemaId);
this.headers = headers;
this.truncated = false;
this.awsGlueKafkaDeserializer = awsGlueKafkaDeserializer;
}

public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
@@ -122,6 +123,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record
this.timestamp = ZonedDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneId.systemDefault());
this.timestampType = record.timestampType();
this.bytesKey = record.key();
this.awsGlueKafkaDeserializer = awsGlueKafkaDeserializer;
this.keySchemaId = getAvroSchemaId(this.bytesKey);
this.keySubject = getAvroSchemaSubject(this.keySchemaId);
this.bytesValue = bytesValue;
@@ -139,7 +141,6 @@ public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record
this.avroToJsonSerializer = avroToJsonSerializer;
this.kafkaJsonDeserializer = kafkaJsonDeserializer;
this.truncated = false;
this.awsGlueKafkaDeserializer = awsGlueKafkaDeserializer;
}

public String getKey() {
@@ -179,20 +180,18 @@ public void setTruncated(Boolean truncated) {
this.truncated = truncated;
}

private String convertToString(byte[] payload, Integer schemaId, boolean isKey) {
private String convertToString(byte[] payload, String schemaId, boolean isKey) {
if (payload == null) {
return null;
}
else if (this.awsGlueKafkaDeserializer != null) {
return this.awsGlueKafkaDeserializer.deserialize(this.topic.getName(), payload).toString();

} else if (schemaId != null) {
try {

Object toType = null;

if (this.awsGlueKafkaDeserializer != null) {
return this.awsGlueKafkaDeserializer.deserialize(this.topic.getName(), payload).toString();
}
if (client != null) {
ParsedSchema schema = client.getSchemaById(schemaId);
ParsedSchema schema = client.getSchemaById(Integer.parseInt(schemaId));
if ( schema.schemaType().equals(ProtobufSchema.TYPE) ) {
toType = kafkaProtoDeserializer.deserialize(topic.getName(), payload);
if (!(toType instanceof Message)) {
@@ -304,30 +303,46 @@ public Collection<String> getHeadersValues() {
.collect(Collectors.toList());
}

private Integer getAvroSchemaId(byte[] payload) {
private String getAvroSchemaId(byte[] payload) {
if (topic.isInternalTopic()) {
return null;
}
try {

if (awsGlueKafkaDeserializer!= null) {
ByteBuffer byteBuffer = ByteBuffer.wrap(payload);
GlueSchemaRegistryDeserializerDataParser dataParser = GlueSchemaRegistryDeserializerDataParser.getInstance();

UUID schemaVersionId = dataParser.getSchemaVersionId(byteBuffer);
return schemaVersionId.toString();
}

ByteBuffer buffer = ByteBuffer.wrap(payload);
byte magicBytes = buffer.get();
int schemaId = buffer.getInt();

if (magicBytes == MAGIC_BYTE && schemaId >= 0) {
return schemaId;
return String.valueOf(schemaId);
}
} catch (Exception ignore) {

}
return null;
}

private String getAvroSchemaSubject(Integer schemaId) {
private String getAvroSchemaSubject(String schemaId) {
if (schemaId == null || client == null) {
return null;
}
try {
ParsedSchema schemaById = client.getSchemaById(schemaId);
if(awsGlueKafkaDeserializer!= null) {
String[] schemaArnSplitted = ( (GlueSchemaRegistryKafkaDeserializer) awsGlueKafkaDeserializer)
.getGlueSchemaRegistryDeserializationFacade()
.getSchemaRegistryClient().getSchemaVersionResponse(schemaId).schemaArn().split("/");
return schemaArnSplitted[schemaArnSplitted.length-1];
}

ParsedSchema schemaById = client.getSchemaById(Integer.parseInt(schemaId));
if (schemaById == null) {
return null;
}
1 change: 1 addition & 0 deletions src/test/java/org/akhq/controllers/AkhqControllerTest.java
@@ -29,6 +29,7 @@ void list() {
assertEquals(1, result.get(0).getKsqldbs().size());
assertEquals("ksqldb", result.get(0).getKsqldbs().get(0));
assertTrue(result.get(0).isRegistry());
assertEquals("CONFLUENT", result.get(0).getRegistryType());
}

@Test
@@ -278,7 +278,7 @@ void produceAndConsumeRecordUsingJsonSchema() throws ExecutionException, Interru
Record recordToAssert = consumedRecord.get();
assertEquals(recordToAssert.getKey(), keyJsonString);
assertEquals(recordToAssert.getValue(), recordAsJsonString);
assertEquals(recordToAssert.getValueSchemaId(), valueJsonSchema.getId());
assertEquals(recordToAssert.getValueSchemaId(), String.valueOf(valueJsonSchema.getId()));

// clear schema registry as it is shared between tests
schemaRegistryRepository.delete(KafkaTestCluster.CLUSTER_ID, keyJsonSchema.getSubject());