Skip to content

Commit

Permalink
fix(webserver): added validations for the cluster name (#1104)
Browse files Browse the repository at this point in the history
close #1094

Co-authored-by: Ludovic DEHON <[email protected]>
  • Loading branch information
rdrck47 and tchiotludo authored May 26, 2022
1 parent 77233c5 commit 676d89a
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 30 deletions.
8 changes: 8 additions & 0 deletions src/main/java/org/akhq/controllers/ErrorController.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.micronaut.security.authentication.AuthorizationException;
import io.micronaut.security.rules.SecurityRule;
import lombok.extern.slf4j.Slf4j;
import org.akhq.modules.InvalidClusterException;
import org.apache.kafka.common.errors.ApiException;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.ConcurrentConfigModificationException;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.InvalidRequestException;
Expand Down Expand Up @@ -108,4 +109,11 @@ public HttpResponse<?> notFound(HttpRequest<?> request) throws URISyntaxExceptio
return HttpResponse.<JsonError>notFound()
.body(error);
}
@Error(global = true)
public HttpResponse<?> error(HttpRequest<?> request, InvalidClusterException e) {
JsonError error = new JsonError(e.getMessage())
.link(Link.SELF, Link.of(request.getUri()));

return HttpResponse.status(HttpStatus.CONFLICT).body(error);
}
}
7 changes: 7 additions & 0 deletions src/main/java/org/akhq/modules/InvalidClusterException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.akhq.modules;

public class InvalidClusterException extends RuntimeException {
public InvalidClusterException(String message) {
super(message);
}
}
125 changes: 95 additions & 30 deletions src/main/java/org/akhq/modules/KafkaModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import io.confluent.kafka.schemaregistry.client.security.basicauth.UserInfoCredentialProvider;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.exceptions.HttpStatusException;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.akhq.configs.AbstractProperties;
Expand Down Expand Up @@ -46,7 +48,15 @@ public List<String> getClustersList() {
.collect(Collectors.toList());
}

public Connection getConnection(String cluster) {
public boolean clusterExists(String cluster){
return this.getClustersList().contains(cluster);
}

public Connection getConnection(String cluster) throws InvalidClusterException {
if (!this.clusterExists(cluster)) {
throw new InvalidClusterException("Invalid cluster '" + cluster + "'");
}

return this.connections
.stream()
.filter(r -> r.getName().equals(cluster))
Expand All @@ -60,30 +70,40 @@ private Properties getDefaultsProperties(List<? extends AbstractProperties> curr
current
.stream()
.filter(r -> r.getName().equals(type))
.forEach(r -> r.getProperties()
.forEach(properties::put)
);
.forEach(r -> properties.putAll(r.getProperties()));

return properties;
}

private Properties getConsumerProperties(String clusterId) {
private Properties getConsumerProperties(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

Properties props = new Properties();
props.putAll(this.getDefaultsProperties(this.defaults, "consumer"));
props.putAll(this.getDefaultsProperties(this.connections, clusterId));

return props;
}

private Properties getProducerProperties(String clusterId) {
private Properties getProducerProperties(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

Properties props = new Properties();
props.putAll(this.getDefaultsProperties(this.defaults, "producer"));
props.putAll(this.getDefaultsProperties(this.connections, clusterId));

return props;
}

private Properties getAdminProperties(String clusterId) {
private Properties getAdminProperties(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

Properties props = new Properties();
props.putAll(this.getDefaultsProperties(this.defaults, "admin"));
props.putAll(this.getDefaultsProperties(this.connections, clusterId));
Expand All @@ -93,23 +113,35 @@ private Properties getAdminProperties(String clusterId) {

private final Map<String, AdminClient> adminClient = new HashMap<>();

public AdminClient getAdminClient(String clusterId) {
public AdminClient getAdminClient(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

if (!this.adminClient.containsKey(clusterId)) {
this.adminClient.put(clusterId, AdminClient.create(this.getAdminProperties(clusterId)));
}

return this.adminClient.get(clusterId);
}

public KafkaConsumer<byte[], byte[]> getConsumer(String clusterId) {
public KafkaConsumer<byte[], byte[]> getConsumer(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

return new KafkaConsumer<>(
this.getConsumerProperties(clusterId),
new ByteArrayDeserializer(),
new ByteArrayDeserializer()
);
}

public KafkaConsumer<byte[], byte[]> getConsumer(String clusterId, Properties properties) {
public KafkaConsumer<byte[], byte[]> getConsumer(String clusterId, Properties properties) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

Properties props = this.getConsumerProperties(clusterId);
props.putAll(properties);

Expand All @@ -122,7 +154,11 @@ public KafkaConsumer<byte[], byte[]> getConsumer(String clusterId, Properties pr

private final Map<String, KafkaProducer<byte[], byte[]>> producers = new HashMap<>();

public KafkaProducer<byte[], byte[]> getProducer(String clusterId) {
public KafkaProducer<byte[], byte[]> getProducer(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

if (!this.producers.containsKey(clusterId)) {
this.producers.put(clusterId, new KafkaProducer<>(
this.getProducerProperties(clusterId),
Expand All @@ -134,7 +170,11 @@ public KafkaProducer<byte[], byte[]> getProducer(String clusterId) {
return this.producers.get(clusterId);
}

public AvroSchemaProvider getAvroSchemaProvider(String clusterId) {
public AvroSchemaProvider getAvroSchemaProvider(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

AvroSchemaProvider avroSchemaProvider = new AvroSchemaProvider();
avroSchemaProvider.configure(Collections.singletonMap(
"schemaVersionFetcher",
Expand All @@ -143,27 +183,39 @@ public AvroSchemaProvider getAvroSchemaProvider(String clusterId) {
return avroSchemaProvider;
}

public JsonSchemaProvider getJsonSchemaProvider(String clusterId) {
public JsonSchemaProvider getJsonSchemaProvider(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

JsonSchemaProvider jsonSchemaProvider = new JsonSchemaProvider();
jsonSchemaProvider.configure(Collections.singletonMap(
"schemaVersionFetcher",
new CachedSchemaRegistryClient(this.getRegistryRestClient(clusterId), 1000)
));

return jsonSchemaProvider;
return jsonSchemaProvider;
}

public ProtobufSchemaProvider getProtobufSchemaProvider(String clusterId) {
public ProtobufSchemaProvider getProtobufSchemaProvider(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

ProtobufSchemaProvider protobufSchemaProvider = new ProtobufSchemaProvider();
protobufSchemaProvider.configure(Collections.singletonMap(
"schemaVersionFetcher",
new CachedSchemaRegistryClient(this.getRegistryRestClient(clusterId), 1000)
));

return protobufSchemaProvider;
return protobufSchemaProvider;
}

public RestService getRegistryRestClient(String clusterId) {
public RestService getRegistryRestClient(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

Connection connection = this.getConnection(clusterId);

if (connection.getSchemaRegistry() != null) {
Expand All @@ -172,12 +224,16 @@ public RestService getRegistryRestClient(String clusterId) {
);

if (connection.getSchemaRegistry().getProperties() != null
&& !connection.getSchemaRegistry().getProperties().isEmpty()) {
&& !connection.getSchemaRegistry().getProperties().isEmpty()) {

Map<String, Object> sslConfigs =
connection.getSchemaRegistry().getProperties().entrySet().stream()
.filter(e -> e.getKey().startsWith("schema.registry."))
.collect(Collectors.toMap(e -> e.getKey().substring("schema.registry.".length()), Map.Entry::getValue));
connection
.getSchemaRegistry()
.getProperties()
.entrySet()
.stream()
.filter(e -> e.getKey().startsWith("schema.registry."))
.collect(Collectors.toMap(e -> e.getKey().substring("schema.registry.".length()), Map.Entry::getValue));

SslFactory sslFactory = new SslFactory(sslConfigs);
if (sslFactory != null && sslFactory.sslContext() != null) {
Expand Down Expand Up @@ -205,13 +261,18 @@ public RestService getRegistryRestClient(String clusterId) {
}
return restService;
}

return null;
}

private final Map<String, SchemaRegistryClient> registryClient = new HashMap<>();


public SchemaRegistryClient getRegistryClient(String clusterId) {
public SchemaRegistryClient getRegistryClient(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

if (!this.registryClient.containsKey(clusterId)) {
Connection connection = this.getConnection(clusterId);

Expand All @@ -237,36 +298,40 @@ public SchemaRegistryClient getRegistryClient(String clusterId) {

private final Map<String, Map<String, KafkaConnectClient>> connectRestClient = new HashMap<>();

public Map<String, KafkaConnectClient> getConnectRestClient(String clusterId) {
public Map<String, KafkaConnectClient> getConnectRestClient(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

if (!this.connectRestClient.containsKey(clusterId)) {
Connection connection = this.getConnection(clusterId);

if (connection.getConnect() != null && !connection.getConnect().isEmpty()) {

Map<String,KafkaConnectClient> mapConnects = new HashMap<>();
Map<String, KafkaConnectClient> mapConnects = new HashMap<>();
connection.getConnect().forEach(connect -> {

URIBuilder uri = URIBuilder.fromString(connect.getUrl().toString());
Configuration configuration = new Configuration(uri.toNormalizedURI(false).toString());

if (connect.getBasicAuthUsername() != null) {
configuration.useBasicAuth(
connect.getBasicAuthUsername(),
connect.getBasicAuthPassword()
connect.getBasicAuthUsername(),
connect.getBasicAuthPassword()
);
}

if (connect.getSslTrustStore() != null) {
configuration.useTrustStore(
new File(connect.getSslTrustStore()),
connect.getSslTrustStorePassword()
new File(connect.getSslTrustStore()),
connect.getSslTrustStorePassword()
);
}

if (connect.getSslKeyStore() != null) {
configuration.useKeyStore(
new File(connect.getSslKeyStore()),
connect.getSslKeyStorePassword()
new File(connect.getSslKeyStore()),
connect.getSslKeyStorePassword()
);
}
mapConnects.put(connect.getName(), new KafkaConnectClient(configuration));
Expand Down

0 comments on commit 676d89a

Please sign in to comment.