From 05cb9d84e27789e710801e2206c1739c95a14ce7 Mon Sep 17 00:00:00 2001
From: apatra
Date: Fri, 12 Jan 2024 10:54:11 +0530
Subject: [PATCH 01/12] Add support for GLUE schema registry

---
 build.gradle                                  |  4 ++++
 .../java/org/akhq/configs/Connection.java     |  3 ++-
 .../org/akhq/configs/SchemaRegistryType.java  |  3 ++-
 .../org/akhq/controllers/TopicController.java |  5 ++--
 src/main/java/org/akhq/models/Record.java     | 12 ++++++++--
 .../akhq/repositories/RecordRepository.java   |  6 +++--
 .../SchemaRegistryRepository.java             | 24 +++++++++++++++++++
 7 files changed, 49 insertions(+), 8 deletions(-)

diff --git a/build.gradle b/build.gradle
index dd6d5ac37..2edc2dd5a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -152,6 +152,10 @@ dependencies {
     //AWS MSK IAM Auth
     implementation group: 'software.amazon.msk', name: 'aws-msk-iam-auth', version: '2.0.0'

+    // AWS Glue serde
+    implementation ("software.amazon.glue:schema-registry-serde:1.1.15")
+
+
     implementation group: 'io.projectreactor', name: 'reactor-core', version: '3.5.11'

     implementation 'io.jsonwebtoken:jjwt-impl:0.12.3'

diff --git a/src/main/java/org/akhq/configs/Connection.java b/src/main/java/org/akhq/configs/Connection.java
index 0b9ecb297..cf6109bc0 100644
--- a/src/main/java/org/akhq/configs/Connection.java
+++ b/src/main/java/org/akhq/configs/Connection.java
@@ -33,7 +33,8 @@ public static class SchemaRegistry {
         String basicAuthUsername;
         String basicAuthPassword;
         SchemaRegistryType type = SchemaRegistryType.CONFLUENT;
-
+        String glueSchemaRegistryName;
+        String awsRegion;
         @MapFormat(transformation = MapFormat.MapTransformation.FLAT)
         Map<String, String> properties;
     }

diff --git a/src/main/java/org/akhq/configs/SchemaRegistryType.java b/src/main/java/org/akhq/configs/SchemaRegistryType.java
index 3410c912a..d1d06556b 100644
--- a/src/main/java/org/akhq/configs/SchemaRegistryType.java
+++ b/src/main/java/org/akhq/configs/SchemaRegistryType.java
@@ -5,7 +5,8 @@
 @Getter
 public enum SchemaRegistryType {
     CONFLUENT((byte) 0x0),
-    TIBCO((byte) 0x80);
+    TIBCO((byte) 0x80),
+    GLUE((byte) 0x0);

     private byte magicByte;

diff --git a/src/main/java/org/akhq/controllers/TopicController.java b/src/main/java/org/akhq/controllers/TopicController.java
index 422f95e0d..025e67ae6 100644
--- a/src/main/java/org/akhq/controllers/TopicController.java
+++ b/src/main/java/org/akhq/controllers/TopicController.java
@@ -188,7 +188,7 @@ public List<Record> produce(
                 key.map(String::getBytes).orElse(null),
                 value.map(String::getBytes).orElse(null),
                 headers,
-                targetTopic))
+                targetTopic, null))
             .collect(Collectors.toList());
     }
@@ -365,7 +365,8 @@ public Record deleteRecordApi(String cluster, String topicName, Integer partition
             Base64.getDecoder().decode(key),
             null,
             new ArrayList<>(),
-            topicRepository.findByName(cluster, topicName)
+            topicRepository.findByName(cluster, topicName),
+            null
         );
     }

diff --git a/src/main/java/org/akhq/models/Record.java b/src/main/java/org/akhq/models/Record.java
index fc7884e0c..c66faf688 100644
--- a/src/main/java/org/akhq/models/Record.java
+++ b/src/main/java/org/akhq/models/Record.java
@@ -83,8 +83,10 @@ public class Record {
     private byte MAGIC_BYTE;

     private Boolean truncated;
+    private Deserializer awsKafkaDeserializer;

-    public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, List<KeyValue<String, String>> headers, Topic topic) {
+
+    public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, List<KeyValue<String, String>> headers, Topic topic, Deserializer awsKafkaDeserializer) {
         this.MAGIC_BYTE = schemaRegistryType.getMagicByte();
         this.topic = topic;
         this.partition = record.partition();
@@ -98,11 +100,12 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte
         this.valueSubject = getAvroSchemaSubject(this.valueSchemaId);
         this.headers = headers;
         this.truncated = false;
+        this.awsKafkaDeserializer = awsKafkaDeserializer;
     }

     public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
                   Deserializer kafkaJsonDeserializer, Deserializer kafkaProtoDeserializer, AvroToJsonSerializer avroToJsonSerializer,
-                  ProtobufToJsonDeserializer protobufToJsonDeserializer, AvroToJsonDeserializer avroToJsonDeserializer, byte[] bytesValue, Topic topic) {
+                  ProtobufToJsonDeserializer protobufToJsonDeserializer, AvroToJsonDeserializer avroToJsonDeserializer, byte[] bytesValue, Topic topic, Deserializer awsKafkaDeserializer) {
         if (schemaRegistryType == SchemaRegistryType.TIBCO) {
             this.MAGIC_BYTE = (byte) 0x80;
         } else {
@@ -132,6 +135,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord record
         this.avroToJsonSerializer = avroToJsonSerializer;
         this.kafkaJsonDeserializer = kafkaJsonDeserializer;
         this.truncated = false;
+        this.awsKafkaDeserializer = awsKafkaDeserializer;
     }

     public String getKey() {
@@ -174,6 +178,10 @@ public void setTruncated(Boolean truncated) {
     private String convertToString(byte[] payload, Integer schemaId, boolean isKey) {
         if (payload == null) {
             return null;
+        }
+        else if (this.awsKafkaDeserializer != null) {
+            return this.awsKafkaDeserializer.deserialize(this.topic.getName(), payload).toString();
+        }
         else if (schemaId != null) {
             try {

diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java
index 98f05ad94..277389c69 100644
--- a/src/main/java/org/akhq/repositories/RecordRepository.java
+++ b/src/main/java/org/akhq/repositories/RecordRepository.java
@@ -471,7 +471,8 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, String clusterId
             this.customDeserializerRepository.getAvroToJsonDeserializer(clusterId),
             avroWireFormatConverter.convertValueToWireFormat(record, client,
                 this.schemaRegistryRepository.getSchemaRegistryType(clusterId)),
-            topic
+            topic,
+            schemaRegistryType == SchemaRegistryType.GLUE ? schemaRegistryRepository.getAwsKafkaDeserializer(clusterId): null
         ));
     }
@@ -490,7 +491,8 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions options
             this.customDeserializerRepository.getAvroToJsonDeserializer(options.clusterId),
             avroWireFormatConverter.convertValueToWireFormat(record, client,
                 this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId)),
-            topic
+            topic,
+            schemaRegistryType == SchemaRegistryType.GLUE ? schemaRegistryRepository.getAwsKafkaDeserializer(options.getClusterId()): null
         ));
     }

diff --git a/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java b/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
index 1827bde42..688c42627 100644
--- a/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
+++ b/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
@@ -1,5 +1,8 @@
 package org.akhq.repositories;

+import com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer;
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.client.rest.RestService;
@@ -23,6 +26,9 @@
 import jakarta.inject.Inject;
 import jakarta.inject.Singleton;

+import org.apache.kafka.common.serialization.StringDeserializer;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.*;
@@ -38,6 +44,8 @@ public class SchemaRegistryRepository extends AbstractRepository {
     private final Map<String, Deserializer> kafkaAvroDeserializers = new HashMap<>();
     private final Map<String, Deserializer> kafkaJsonDeserializers = new HashMap<>();
     private final Map<String, Deserializer> kafkaProtoDeserializers = new HashMap<>();
+    private final Map<String, Deserializer> awsKafkaDeserializers = new HashMap<>();
+
     public PagedList<Schema> list(String clusterId, Pagination pagination, Optional<String> search, List<String> filters) throws IOException, RestClientException, ExecutionException, InterruptedException {
         return PagedList.of(all(clusterId, search, filters), pagination, list -> this.toSchemasLatestVersion(list, clusterId));
@@ -310,6 +318,22 @@ public SchemaRegistryType getSchemaRegistryType(String clusterId) {
         }
         return schemaRegistryType;
     }
+    public Deserializer getAwsKafkaDeserializer(String clusterId) {
+
+        if (!this.awsKafkaDeserializers.containsKey(clusterId)){
+            Map<String, Object> params = new HashMap<>();
+            params.put(AWSSchemaRegistryConstants.REGISTRY_NAME,"MetisSchemaRegistry" );
+            params.put(AWSSchemaRegistryConstants.AWS_REGION,"eu-west-2" );
+            params.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
+            params.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, StringDeserializer.class.getName());
+            Map<String, String> otherProps = kafkaModule.getConnection(clusterId).getProperties();
+            if (otherProps != null) {
+                params.putAll(otherProps);
+            }
+            this.awsKafkaDeserializers.put(clusterId, new AWSKafkaAvroDeserializer(DefaultCredentialsProvider.builder().build(), params));
+        }
+        return this.awsKafkaDeserializers.get(clusterId);
+    }

     static {
         JacksonMapper.INSTANCE.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
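A minimal consumer-side sketch of the decode path this patch wires up. The deserializer, constants, and constructor come from the `schema-registry-serde` dependency added above; the registry name, region, and topic are placeholder values, not part of the patch:

    import com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer;
    import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
    import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

    import java.util.HashMap;
    import java.util.Map;

    public class GlueDecodeSketch {
        /** Decodes a Glue-serialized payload; non-Glue payloads fall back to plain strings. */
        public static Object decode(String topic, byte[] payload) {
            Map<String, Object> params = new HashMap<>();
            params.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "example-registry"); // placeholder
            params.put(AWSSchemaRegistryConstants.AWS_REGION, "eu-west-2");           // placeholder
            params.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
            // Same fallback as the patch: unknown wire formats go through StringDeserializer
            params.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, StringDeserializer.class.getName());

            try (AWSKafkaAvroDeserializer deserializer =
                     new AWSKafkaAvroDeserializer(DefaultCredentialsProvider.builder().build(), params)) {
                return deserializer.deserialize(topic, payload);
            }
        }
    }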
From 39813c7ef90cd98c7a8b1c9c4d88404fe990d276 Mon Sep 17 00:00:00 2001
From: apatra
Date: Fri, 12 Jan 2024 11:28:13 +0530
Subject: [PATCH 02/12] refer properties from SchemaRegistry config

---
 .../java/org/akhq/repositories/SchemaRegistryRepository.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java b/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
index 688c42627..86015d00a 100644
--- a/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
+++ b/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
@@ -326,7 +326,7 @@ public Deserializer getAwsKafkaDeserializer(String clusterId) {
             params.put(AWSSchemaRegistryConstants.AWS_REGION,"eu-west-2" );
             params.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
             params.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, StringDeserializer.class.getName());
-            Map<String, String> otherProps = kafkaModule.getConnection(clusterId).getProperties();
+            Map<String, String> otherProps = kafkaModule.getConnection(clusterId).getSchemaRegistry().getProperties();
             if (otherProps != null) {
                 params.putAll(otherProps);
             }

From dc3de45d245bd555cc32f3459d54819110ceecf9 Mon Sep 17 00:00:00 2001
From: apatra
Date: Fri, 12 Jan 2024 11:38:00 +0530
Subject: [PATCH 03/12] remove hard-coded schema registry name

---
 .../akhq/repositories/SchemaRegistryRepository.java | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java b/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
index 86015d00a..767ea3416 100644
--- a/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
+++ b/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
@@ -321,14 +321,17 @@ public SchemaRegistryType getSchemaRegistryType(String clusterId) {
     public Deserializer getAwsKafkaDeserializer(String clusterId) {

         if (!this.awsKafkaDeserializers.containsKey(clusterId)){
+            Connection.SchemaRegistry schemaRegistry = kafkaModule.getConnection(clusterId).getSchemaRegistry();
             Map<String, Object> params = new HashMap<>();
-            params.put(AWSSchemaRegistryConstants.REGISTRY_NAME,"MetisSchemaRegistry" );
-            params.put(AWSSchemaRegistryConstants.AWS_REGION,"eu-west-2" );
+            params.put(AWSSchemaRegistryConstants.REGISTRY_NAME, schemaRegistry.getGlueSchemaRegistryName());
+            params.put(AWSSchemaRegistryConstants.AWS_REGION,schemaRegistry.getAwsRegion());
             params.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
+
+            // Adding secondary deserializer so that messages that aren't serialized using avro, proto or json are deserialized using StringDeserializer
             params.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, StringDeserializer.class.getName());
-            Map<String, String> otherProps = kafkaModule.getConnection(clusterId).getSchemaRegistry().getProperties();
-            if (otherProps != null) {
-                params.putAll(otherProps);
+            Map<String, String> otherParams = schemaRegistry.getProperties();
+            if (otherParams != null) {
+                params.putAll(otherParams);
             }
             this.awsKafkaDeserializers.put(clusterId, new AWSKafkaAvroDeserializer(DefaultCredentialsProvider.builder().build(), params));
         }
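Because the defaults are seeded first and the connection's schema-registry `properties` are applied afterwards with `putAll`, per-cluster config can override any of them. A self-contained sketch of that ordering — the `SPECIFIC_RECORD` override is an illustrative assumption, not part of the patch:

    import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
    import com.amazonaws.services.schemaregistry.utils.AvroRecordType;

    import java.util.HashMap;
    import java.util.Map;

    public class GlueParamsOverrideSketch {
        public static void main(String[] args) {
            Map<String, Object> params = new HashMap<>();
            // Default seeded by getAwsKafkaDeserializer(...)
            params.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

            // Entries coming from the connection's schema-registry properties (hypothetical override)
            Map<String, String> fromConfig = new HashMap<>();
            fromConfig.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());

            params.putAll(fromConfig); // later puts win, so user config overrides the default
            System.out.println(params.get(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE));
        }
    }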
topic, Deserializer awsKafkaDeserializer) { + public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, List> headers, Topic topic, Deserializer awsGlueKafkaDeserializer) { this.MAGIC_BYTE = schemaRegistryType.getMagicByte(); this.topic = topic; this.partition = record.partition(); @@ -100,12 +100,12 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte this.valueSubject = getAvroSchemaSubject(this.valueSchemaId); this.headers = headers; this.truncated = false; - this.awsKafkaDeserializer = awsKafkaDeserializer; + this.awsGlueKafkaDeserializer = awsGlueKafkaDeserializer; } public Record(SchemaRegistryClient client, ConsumerRecord record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer, Deserializer kafkaJsonDeserializer, Deserializer kafkaProtoDeserializer, AvroToJsonSerializer avroToJsonSerializer, - ProtobufToJsonDeserializer protobufToJsonDeserializer, AvroToJsonDeserializer avroToJsonDeserializer, byte[] bytesValue, Topic topic, Deserializer awsKafkaDeserializer) { + ProtobufToJsonDeserializer protobufToJsonDeserializer, AvroToJsonDeserializer avroToJsonDeserializer, byte[] bytesValue, Topic topic, Deserializer awsGlueKafkaDeserializer) { if (schemaRegistryType == SchemaRegistryType.TIBCO) { this.MAGIC_BYTE = (byte) 0x80; } else { @@ -135,7 +135,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord record this.avroToJsonSerializer = avroToJsonSerializer; this.kafkaJsonDeserializer = kafkaJsonDeserializer; this.truncated = false; - this.awsKafkaDeserializer = awsKafkaDeserializer; + this.awsGlueKafkaDeserializer = awsGlueKafkaDeserializer; } public String getKey() { @@ -179,8 +179,8 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey) if (payload == null) { return null; } - else if (this.awsKafkaDeserializer != null) { - return this.awsKafkaDeserializer.deserialize(this.topic.getName(), payload).toString(); + else if (this.awsGlueKafkaDeserializer != null) { + return this.awsGlueKafkaDeserializer.deserialize(this.topic.getName(), payload).toString(); } else if (schemaId != null) { try { diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java index 277389c69..511b8df2d 100644 --- a/src/main/java/org/akhq/repositories/RecordRepository.java +++ b/src/main/java/org/akhq/repositories/RecordRepository.java @@ -472,7 +472,7 @@ private Record newRecord(ConsumerRecord record, String clusterId avroWireFormatConverter.convertValueToWireFormat(record, client, this.schemaRegistryRepository.getSchemaRegistryType(clusterId)), topic, - schemaRegistryType == SchemaRegistryType.GLUE ? schemaRegistryRepository.getAwsKafkaDeserializer(clusterId): null + schemaRegistryType == SchemaRegistryType.GLUE ? schemaRegistryRepository.getAwsGlueKafkaDeserializer(clusterId): null )); } @@ -492,7 +492,7 @@ private Record newRecord(ConsumerRecord record, BaseOptions opti avroWireFormatConverter.convertValueToWireFormat(record, client, this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId)), topic, - schemaRegistryType == SchemaRegistryType.GLUE ? schemaRegistryRepository.getAwsKafkaDeserializer(options.getClusterId()): null + schemaRegistryType == SchemaRegistryType.GLUE ? 
schemaRegistryRepository.getAwsGlueKafkaDeserializer(options.getClusterId()): null )); } diff --git a/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java b/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java index 767ea3416..26b5e853e 100644 --- a/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java +++ b/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java @@ -1,6 +1,6 @@ package org.akhq.repositories; -import com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer; +import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer; import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; import com.amazonaws.services.schemaregistry.utils.AvroRecordType; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -44,7 +44,7 @@ public class SchemaRegistryRepository extends AbstractRepository { private final Map kafkaAvroDeserializers = new HashMap<>(); private final Map kafkaJsonDeserializers = new HashMap<>(); private final Map kafkaProtoDeserializers = new HashMap<>(); - private final Map awsKafkaDeserializers = new HashMap<>(); + private final Map awsGlueKafkaDeserializers = new HashMap<>(); public PagedList list(String clusterId, Pagination pagination, Optional search, List filters) throws IOException, RestClientException, ExecutionException, InterruptedException { @@ -318,9 +318,9 @@ public SchemaRegistryType getSchemaRegistryType(String clusterId) { } return schemaRegistryType; } - public Deserializer getAwsKafkaDeserializer(String clusterId) { + public Deserializer getAwsGlueKafkaDeserializer(String clusterId) { - if (!this.awsKafkaDeserializers.containsKey(clusterId)){ + if (!this.awsGlueKafkaDeserializers.containsKey(clusterId)){ Connection.SchemaRegistry schemaRegistry = kafkaModule.getConnection(clusterId).getSchemaRegistry(); Map params = new HashMap<>(); params.put(AWSSchemaRegistryConstants.REGISTRY_NAME, schemaRegistry.getGlueSchemaRegistryName()); @@ -333,9 +333,9 @@ public Deserializer getAwsKafkaDeserializer(String clusterId) { if (otherParams != null) { params.putAll(otherParams); } - this.awsKafkaDeserializers.put(clusterId, new AWSKafkaAvroDeserializer(DefaultCredentialsProvider.builder().build(), params)); + this.awsGlueKafkaDeserializers.put(clusterId, new GlueSchemaRegistryKafkaDeserializer(DefaultCredentialsProvider.builder().build(), params)); } - return this.awsKafkaDeserializers.get(clusterId); + return this.awsGlueKafkaDeserializers.get(clusterId); } static { From 7645bc3d0978d8cf8e71f222dcede6fa116fbfa7 Mon Sep 17 00:00:00 2001 From: arindampatra33 <65240174+arindampatra33@users.noreply.github.com> Date: Fri, 16 Feb 2024 09:31:42 +0530 Subject: [PATCH 05/12] Update Record.java remove @JsonIgnore for topic . 
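The per-cluster cache above uses a containsKey/put pair on a HashMap. A thread-safe equivalent of that lazy-init pattern, sketched here with stand-in types rather than the repository's actual fields, is computeIfAbsent on a ConcurrentHashMap:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class DeserializerCacheSketch {
        private final Map<String, Object> cache = new ConcurrentHashMap<>();

        /** One deserializer per cluster, created lazily on first use. */
        public Object forCluster(String clusterId) {
            // check-then-create happens in a single map operation
            return cache.computeIfAbsent(clusterId, this::createDeserializer);
        }

        private Object createDeserializer(String clusterId) {
            return new Object(); // stand-in for new GlueSchemaRegistryKafkaDeserializer(...)
        }
    }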
From 7645bc3d0978d8cf8e71f222dcede6fa116fbfa7 Mon Sep 17 00:00:00 2001
From: arindampatra33 <65240174+arindampatra33@users.noreply.github.com>
Date: Fri, 16 Feb 2024 09:31:42 +0530
Subject: [PATCH 05/12] Update Record.java: remove @JsonIgnore for topic; this
 is required to fix the tail functionality

---
 src/main/java/org/akhq/models/Record.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/main/java/org/akhq/models/Record.java b/src/main/java/org/akhq/models/Record.java
index a03dbfb82..c17f50e90 100644
--- a/src/main/java/org/akhq/models/Record.java
+++ b/src/main/java/org/akhq/models/Record.java
@@ -36,7 +36,6 @@
 @Getter
 @NoArgsConstructor
 public class Record {
-    @JsonIgnore
     private Topic topic;
     private int partition;
     private long offset;

From 15a480d4e4cf735b0acf77315ceaf7b7a1d706a7 Mon Sep 17 00:00:00 2001
From: apatra
Date: Sat, 17 Feb 2024 13:42:36 +0530
Subject: [PATCH 06/12] get key/value schema id for glue

---
 client/src/containers/SideBar/Sidebar.jsx      |  5 ++-
 client/src/containers/Topic/Topic/Topic.jsx    |  2 +
 .../Topic/Topic/TopicData/TopicData.jsx        | 13 +++---
 client/src/utils/Routes.js                     | 11 ++++-
 .../org/akhq/controllers/AkhqController.java   |  2 +
 src/main/java/org/akhq/models/Record.java     | 45 ++++++++++++------
 .../models/SchemaRegistryClientWrapper.java    |  2 +
 7 files changed, 55 insertions(+), 25 deletions(-)
 create mode 100644 src/main/java/org/akhq/models/SchemaRegistryClientWrapper.java

diff --git a/client/src/containers/SideBar/Sidebar.jsx b/client/src/containers/SideBar/Sidebar.jsx
index 0d8484370..907b49f43 100644
--- a/client/src/containers/SideBar/Sidebar.jsx
+++ b/client/src/containers/SideBar/Sidebar.jsx
@@ -21,6 +21,7 @@ class Sidebar extends Component {
     showConnects: false,
     showKsqlDBs: false,
     enableRegistry: false,
+    registryType: '',
     enableConnect: false,
     enableKsqlDB: false,
     roles: JSON.parse(sessionStorage.getItem('roles')),
@@ -95,6 +96,7 @@
     const enableKsqlDB = cluster.ksqldbs !== undefined;
     let newState = {
       enableRegistry: cluster.registry,
+      registryType: cluster.registryType,
       enableConnect: enableConnects,
       allConnects: [],
       selectedConnect: '',
@@ -284,6 +286,7 @@
       selectedTab,
      height,
       enableRegistry,
+      registryType,
       enableConnect,
       enableKsqlDB
     } = this.state;
@@ -354,7 +357,7 @@
           roles.ACL &&
           roles.ACL.includes('READ') &&
           this.renderMenuItem('fa fa-fw fa-key', constants.ACLS, 'ACLS')}
-        {enableRegistry &&
+        {enableRegistry && registryType !== 'GLUE' &&
           roles &&
           roles.SCHEMA &&
           roles.SCHEMA.includes('READ') &&

diff --git a/client/src/containers/Topic/Topic/Topic.jsx b/client/src/containers/Topic/Topic/Topic.jsx
index 1ff1dc2f2..4df489e94 100644
--- a/client/src/containers/Topic/Topic/Topic.jsx
+++ b/client/src/containers/Topic/Topic/Topic.jsx
@@ -18,6 +18,7 @@ class Topic extends Root {
   state = {
     clusterId: this.props.clusterId,
     topicId: this.props.topicId,
+    registryType: this.props.registryType,
     topic: {},
     selectedTab: '',
     showDeleteModal: false,
@@ -210,6 +211,7 @@
             location={location}
             isAllTopicDataSelected={this.state.isAllTopicDataSelected}
             onSelectAllCheckboxChange={this._handleSelectAllCheckboxChange}
+            registryType={this.state.registryType}
           />
         );
       case 'partitions':

diff --git a/client/src/containers/Topic/Topic/TopicData/TopicData.jsx b/client/src/containers/Topic/Topic/TopicData/TopicData.jsx
index a60f2123f..11c41bfb5 100644
--- a/client/src/containers/Topic/Topic/TopicData/TopicData.jsx
+++ b/client/src/containers/Topic/Topic/TopicData/TopicData.jsx
@@ -68,6 +68,7 @@ class TopicData extends Root {
     deleteMessage: '',
     compactMessageToDelete: '',
     selectedCluster: this.props.clusterId,
+    registryType: this.props.registryType,
     selectedTopic: this.props.topicId,
     cleanupPolicy: '',
     datetime: '',
@@ -115,11 +116,11 @@
     const { clusterId, topicId } = this.props.match.params;
     const query = new URLSearchParams(this.props.location.search);
     const uiOptions = await getClusterUIOptions(clusterId);
-
     this.setState(
       prevState => ({
         selectedCluster: clusterId,
         selectedTopic: topicId,
+        registryType: this.props.registryType,
         sortBy: query.get('sort')
           ? query.get('sort')
           : uiOptions && uiOptions.topicData && uiOptions.topicData.sort
@@ -517,7 +518,7 @@
       partition: JSON.stringify(message.partition) || '',
       offset: JSON.stringify(message.offset) || '',
       headers: message.headers || [],
-      schema: { key: message.keySchemaId, value: message.valueSchemaId },
+      schema: { key: message.keySchemaId, value: message.valueSchemaId, registryType: this.state.registryType},
       exceptions: message.exceptions || []
     };
@@ -1206,9 +1207,9 @@
           {obj[col.accessor].key !== undefined && (
             <span
-              onClick={() => {
+              onClick={ obj[col.accessor].registryType !=="GLUE" ? () => {
                 this._redirectToSchema(obj.schema.key);
-              }}
+              }: undefined}
             >
               Key: {obj[col.accessor].key}
@@ -1217,9 +1218,9 @@
           {obj[col.accessor].value !== undefined && (
             <span
-              onClick={() => {
+              onClick={ obj[col.accessor].registryType !=="GLUE" ? () => {
                 this._redirectToSchema(obj.schema.value);
-              }}
+              }: undefined}
             >
               Value: {obj[col.accessor].value}

diff --git a/client/src/utils/Routes.js b/client/src/utils/Routes.js
index bbecaa534..fdd3f4ce2 100644
--- a/client/src/utils/Routes.js
+++ b/client/src/utils/Routes.js
@@ -180,7 +180,16 @@ class Routes extends Root {
         )}
         {roles && roles.TOPIC && roles.TOPIC.includes('READ') && (
-          <Route exact path="/ui/:clusterId/topic/:topicId/:tab?" component={Topic} />
+          <Route
+            exact
+            path="/ui/:clusterId/topic/:topicId/:tab?"
+            render={props => (
+              <Topic
+                registryType={clusters.find(el => el.id === clusterId).registryType}
+                {...props}
+              />
+            )}
+          />
         )}
         {roles && roles.TOPIC && roles.TOPIC_DATA.includes('READ') && (

diff --git a/src/main/java/org/akhq/controllers/AkhqController.java b/src/main/java/org/akhq/controllers/AkhqController.java
index b716cc39a..3c94a4b67 100644
--- a/src/main/java/org/akhq/controllers/AkhqController.java
+++ b/src/main/java/org/akhq/controllers/AkhqController.java
@@ -73,6 +73,7 @@ public List<ClusterDefinition> list() {
             .map(connection -> new ClusterDefinition(
                 connection.getName(),
                 connection.getSchemaRegistry() != null,
+                connection.getSchemaRegistry() != null ? connection.getSchemaRegistry().getType().name() : null,
                 (connection.getConnect() != null ? connection.getConnect() : new ArrayList<Connect>())
                     .stream()
                     .map(Connect::getName)
@@ -279,6 +280,7 @@ public static class AuthPermissions {
     public static class ClusterDefinition {
         private String id;
         private boolean registry;
+        private String registryType;
         private List<String> connects;
         private List<String> ksqldbs;
     }

diff --git a/src/main/java/org/akhq/models/Record.java b/src/main/java/org/akhq/models/Record.java
index a03dbfb82..2c1fe7c0d 100644
--- a/src/main/java/org/akhq/models/Record.java
+++ b/src/main/java/org/akhq/models/Record.java
@@ -1,5 +1,7 @@
 package org.akhq.models;

+import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerDataParser;
+import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.protobuf.Message;
@@ -36,16 +38,15 @@
 @Getter
 @NoArgsConstructor
 public class Record {
-    @JsonIgnore
     private Topic topic;
     private int partition;
     private long offset;
     private ZonedDateTime timestamp;
     @JsonIgnore
     private TimestampType timestampType;
-    private Integer keySchemaId;
+    private String keySchemaId;
     private String keySubject;
-    private Integer valueSchemaId;
+    private String valueSchemaId;
     private String valueSubject;
     private List<KeyValue<String, String>> headers = new ArrayList<>();
     @JsonIgnore
@@ -90,7 +91,6 @@ public class Record {
     @JsonIgnore
     private Deserializer awsGlueKafkaDeserializer;

-
     public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, List<KeyValue<String, String>> headers, Topic topic, Deserializer awsGlueKafkaDeserializer) {
         this.MAGIC_BYTE = schemaRegistryType.getMagicByte();
         this.topic = topic;
@@ -98,6 +98,7 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte
         this.offset = record.offset();
         this.timestamp = ZonedDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneId.systemDefault());
         this.bytesKey = bytesKey;
+        this.awsGlueKafkaDeserializer = awsGlueKafkaDeserializer;
         this.keySchemaId = getAvroSchemaId(this.bytesKey);
         this.keySubject = getAvroSchemaSubject(this.keySchemaId);
         this.bytesValue = bytesValue;
@@ -105,7 +106,6 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte
         this.valueSubject = getAvroSchemaSubject(this.valueSchemaId);
         this.headers = headers;
         this.truncated = false;
-        this.awsGlueKafkaDeserializer = awsGlueKafkaDeserializer;
     }

     public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
@@ -123,6 +123,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord record
         this.timestamp = ZonedDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneId.systemDefault());
         this.timestampType = record.timestampType();
         this.bytesKey = record.key();
+        this.awsGlueKafkaDeserializer = awsGlueKafkaDeserializer;
         this.keySchemaId = getAvroSchemaId(this.bytesKey);
         this.keySubject = getAvroSchemaSubject(this.keySchemaId);
         this.bytesValue = bytesValue;
@@ -140,7 +141,6 @@ public Record(SchemaRegistryClient client, ConsumerRecord record
         this.avroToJsonSerializer = avroToJsonSerializer;
         this.kafkaJsonDeserializer = kafkaJsonDeserializer;
         this.truncated = false;
-        this.awsGlueKafkaDeserializer = awsGlueKafkaDeserializer;
     }

     public String getKey() {
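This patch widens the schema-id fields from Integer to String because Glue identifies a schema version by a UUID embedded in the payload header, not by a Confluent-style int id. A self-contained sketch of that extraction, using the same parser calls the diff below relies on (the payload is assumed to be Glue-encoded):

    import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerDataParser;

    import java.nio.ByteBuffer;
    import java.util.UUID;

    public class GlueSchemaIdSketch {
        /** Returns the Glue schema-version id of a Glue-encoded payload as a string. */
        public static String schemaIdOf(byte[] payload) {
            GlueSchemaRegistryDeserializerDataParser parser =
                GlueSchemaRegistryDeserializerDataParser.getInstance();
            UUID versionId = parser.getSchemaVersionId(ByteBuffer.wrap(payload));
            return versionId.toString(); // a UUID string, unlike Confluent's int id
        }
    }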
@@ -180,20 +180,18 @@ public void setTruncated(Boolean truncated) {
         this.truncated = truncated;
     }

-    private String convertToString(byte[] payload, Integer schemaId, boolean isKey) {
+    private String convertToString(byte[] payload, String schemaId, boolean isKey) {
         if (payload == null) {
             return null;
-        }
-        else if (this.awsGlueKafkaDeserializer != null) {
-            return this.awsGlueKafkaDeserializer.deserialize(this.topic.getName(), payload).toString();
         }
         else if (schemaId != null) {
             try {
                 Object toType = null;
-
+                if (this.awsGlueKafkaDeserializer != null) {
+                    return this.awsGlueKafkaDeserializer.deserialize(this.topic.getName(), payload).toString();
+                }
                 if (client != null) {
-                    ParsedSchema schema = client.getSchemaById(schemaId);
+                    ParsedSchema schema = client.getSchemaById(Integer.valueOf(schemaId));
                     if ( schema.schemaType().equals(ProtobufSchema.TYPE) ) {
                         toType = kafkaProtoDeserializer.deserialize(topic.getName(), payload);
                         if (!(toType instanceof Message)) {
@@ -305,17 +303,26 @@ public Collection getHeadersValues() {
             .collect(Collectors.toList());
     }

-    private Integer getAvroSchemaId(byte[] payload) {
+    private String getAvroSchemaId(byte[] payload) {
         if (topic.isInternalTopic()) {
             return null;
         }
         try {
+
+            if (awsGlueKafkaDeserializer!= null) {
+                ByteBuffer byteBuffer = ByteBuffer.wrap(payload);
+                GlueSchemaRegistryDeserializerDataParser dataParser = GlueSchemaRegistryDeserializerDataParser.getInstance();
+
+                UUID schemaVersionId = dataParser.getSchemaVersionId(byteBuffer);
+                return schemaVersionId.toString();
+            }
+
             ByteBuffer buffer = ByteBuffer.wrap(payload);
             byte magicBytes = buffer.get();
             int schemaId = buffer.getInt();

             if (magicBytes == MAGIC_BYTE && schemaId >= 0) {
-                return schemaId;
+                return String.valueOf(schemaId);
             }
         } catch (Exception ignore) {
         }

         return null;
     }

-    private String getAvroSchemaSubject(Integer schemaId) {
+    private String getAvroSchemaSubject(String schemaId) {
         if (schemaId == null || client == null) {
             return null;
         }
         try {
-            ParsedSchema schemaById = client.getSchemaById(schemaId);
+            if(awsGlueKafkaDeserializer!= null) {
+                return ( (GlueSchemaRegistryKafkaDeserializer) awsGlueKafkaDeserializer).getGlueSchemaRegistryDeserializationFacade().getSchemaRegistryClient().getSchemaVersionResponse(schemaId).schemaArn();
+            }
+
+            ParsedSchema schemaById = client.getSchemaById(Integer.valueOf(schemaId));
             if (schemaById == null) {
                 return null;
             }

diff --git a/src/main/java/org/akhq/models/SchemaRegistryClientWrapper.java b/src/main/java/org/akhq/models/SchemaRegistryClientWrapper.java
new file mode 100644
index 000000000..b5e3ce7a6
--- /dev/null
+++ b/src/main/java/org/akhq/models/SchemaRegistryClientWrapper.java
@@ -0,0 +1,2 @@
+package org.akhq.models;public class SchemaRegistryClientWrapper {
+}

From 6e0cc6925708e52820e91bd1826d9a7f99076373 Mon Sep 17 00:00:00 2001
From: apatra
Date: Sat, 17 Feb 2024 14:00:51 +0530
Subject: [PATCH 07/12] take schema name from ARN for glue

---
 src/main/java/org/akhq/models/Record.java | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/src/main/java/org/akhq/models/Record.java b/src/main/java/org/akhq/models/Record.java
index 2c1fe7c0d..95cba9151 100644
--- a/src/main/java/org/akhq/models/Record.java
+++ b/src/main/java/org/akhq/models/Record.java
@@ -191,7 +191,7 @@ private String convertToString(byte[] payload, String schemaId, boolean isKey) {
                     return this.awsGlueKafkaDeserializer.deserialize(this.topic.getName(), payload).toString();
                 }
                 if (client != null) {
-                    ParsedSchema schema = client.getSchemaById(Integer.valueOf(schemaId));
+                    ParsedSchema schema = client.getSchemaById(Integer.parseInt(schemaId));
                     if ( schema.schemaType().equals(ProtobufSchema.TYPE) ) {
                         toType = kafkaProtoDeserializer.deserialize(topic.getName(), payload);
                         if (!(toType instanceof Message)) {
@@ -336,10 +336,13 @@ private String getAvroSchemaSubject(String schemaId) {
         }
         try {
             if(awsGlueKafkaDeserializer!= null) {
-                return ( (GlueSchemaRegistryKafkaDeserializer) awsGlueKafkaDeserializer).getGlueSchemaRegistryDeserializationFacade().getSchemaRegistryClient().getSchemaVersionResponse(schemaId).schemaArn();
+                String[] schemaArnSplitted = ( (GlueSchemaRegistryKafkaDeserializer) awsGlueKafkaDeserializer)
+                    .getGlueSchemaRegistryDeserializationFacade()
+                    .getSchemaRegistryClient().getSchemaVersionResponse(schemaId).schemaArn().split("/");
+                return schemaArnSplitted[schemaArnSplitted.length-1];
             }

-            ParsedSchema schemaById = client.getSchemaById(Integer.valueOf(schemaId));
+            ParsedSchema schemaById = client.getSchemaById(Integer.parseInt(schemaId));
             if (schemaById == null) {
                 return null;
             }
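The schema name displayed in the UI is the last path segment of the schema version's ARN, which is what the split("/") above extracts. A worked example with a made-up ARN (real Glue schema ARNs follow arn:aws:glue:&lt;region&gt;:&lt;account&gt;:schema/&lt;registry&gt;/&lt;schema&gt;):

    public class SchemaNameFromArnSketch {
        public static void main(String[] args) {
            // Hypothetical ARN, for illustration only
            String schemaArn = "arn:aws:glue:eu-west-2:123456789012:schema/example-registry/example-schema";
            String[] parts = schemaArn.split("/");
            System.out.println(parts[parts.length - 1]); // prints "example-schema"
        }
    }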
From dba223db276fc680bc759ee93e7a8a077d09bc48 Mon Sep 17 00:00:00 2001
From: apatra
Date: Sat, 17 Feb 2024 15:18:05 +0530
Subject: [PATCH 08/12] lint fix and test case update

---
 client/public/index.html                      | 14 +++++-----
 client/src/containers/SideBar/Sidebar.jsx     |  3 ++-
 .../Topic/Topic/TopicData/TopicData.jsx       | 26 ++++++++++++++-----
 .../akhq/controllers/AkhqControllerTest.java  |  1 +
 4 files changed, 29 insertions(+), 15 deletions(-)

diff --git a/client/public/index.html b/client/public/index.html
index da468b57a..cd84d4d12 100644
--- a/client/public/index.html
+++ b/client/public/index.html
@@ -1,13 +1,13 @@
 [hunk body not recoverable: the HTML markup was stripped during extraction, leaving only bare +/- markers; the remainder of this patch is truncated here]