Skip to content

Commit

Permalink
Introduce a configuration option to specify the expected number of co…
Browse files Browse the repository at this point in the history
…nnection pools a ConnectionProvider should create. Once the expected number is exceeded, a warning message is logged.
  • Loading branch information
jchenga committed Dec 29, 2024
1 parent 3c9728c commit 11bb896
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -394,6 +394,9 @@ interface AllocationStrategy<A extends AllocationStrategy<A>> {
final class Builder extends ConnectionPoolSpec<Builder> {

static final Duration DISPOSE_INACTIVE_POOLS_IN_BACKGROUND_DISABLED = Duration.ZERO;
static final int EXPECTED_CONNECTION_POOLS_DISABLED = -1;

int expectedConnectionPools = EXPECTED_CONNECTION_POOLS_DISABLED;

String name;
Duration inactivePoolDisposeInterval = DISPOSE_INACTIVE_POOLS_IN_BACKGROUND_DISABLED;
Expand All @@ -417,6 +420,7 @@ private Builder(String name) {
this.inactivePoolDisposeInterval = copy.inactivePoolDisposeInterval;
this.poolInactivity = copy.poolInactivity;
this.disposeTimeout = copy.disposeTimeout;
this.expectedConnectionPools = copy.expectedConnectionPools;
copy.confPerRemoteHost.forEach((address, spec) -> this.confPerRemoteHost.put(address, new ConnectionPoolSpec<>(spec)));
}

Expand Down Expand Up @@ -488,6 +492,18 @@ public final Builder forRemoteHost(SocketAddress remoteHost, Consumer<HostSpecif
return this;
}

/**
* Specifies the expected number of connection pools that the provider can create.
* If the number of connection pools created exceeds this value, a warning message is logged.
* The value must be positive; otherwise, the connection pools check is ignored.
*
* @param expectedConnectionPools the number of connection pools expected to be created.
* @return the current {@link Builder} instance with the updated configuration.
*/
public Builder expectedConnectionPools(int expectedConnectionPools) {
this.expectedConnectionPools = expectedConnectionPools;
return this;
}
/**
* Builds new ConnectionProvider.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AddressResolverGroup;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
Expand Down Expand Up @@ -90,6 +91,8 @@ public abstract class PooledConnectionProvider<T extends Connection> implements
final Duration inactivePoolDisposeInterval;
final Duration poolInactivity;
final Duration disposeTimeout;
final int expectedConnectionPools;
final AtomicInteger connectionPoolCount = new AtomicInteger(0);
final Map<SocketAddress, Integer> maxConnections = new HashMap<>();
Mono<Void> onDispose;

Expand All @@ -104,6 +107,7 @@ protected PooledConnectionProvider(Builder builder) {
this.inactivePoolDisposeInterval = builder.inactivePoolDisposeInterval;
this.poolInactivity = builder.poolInactivity;
this.disposeTimeout = builder.disposeTimeout;
this.expectedConnectionPools = builder.expectedConnectionPools;
this.defaultPoolFactory = new PoolFactory<>(builder, builder.disposeTimeout, clock);
for (Map.Entry<SocketAddress, ConnectionPoolSpec<?>> entry : builder.confPerRemoteHost.entrySet()) {
poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout));
Expand Down Expand Up @@ -131,6 +135,13 @@ public final Mono<? extends Connection> acquire(
log.debug("Creating a new [{}] client pool [{}] for [{}]", name, poolFactory, remoteAddress);
}

if (expectedConnectionPools > Builder.EXPECTED_CONNECTION_POOLS_DISABLED && connectionPoolCount.incrementAndGet() > expectedConnectionPools) {
if (log.isWarnEnabled()) {
log.warn("Connection pool creation limit exceeded: {} pools created, maximum expected is {}", connectionPoolCount.get(),
expectedConnectionPools);
}
}

boolean metricsEnabled = poolFactory.metricsEnabled || config.metricsRecorder() != null;
String id = metricsEnabled ? poolKey.hashCode() + "" : null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -119,6 +122,7 @@
import reactor.netty.http.Http2SslContextSpec;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.HttpResources;
import reactor.netty.http.client.HttpClient.ResponseReceiver;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.ConnectionPoolMetrics;
import reactor.netty.resources.ConnectionProvider;
Expand All @@ -142,6 +146,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.Mockito.times;

/**
* This test class verifies {@link HttpClient}.
Expand Down Expand Up @@ -608,6 +613,97 @@ void sslExchangeRelativeGet() throws SSLException {
assertThat(responseString).isEqualTo("hello /foo");
}

@Test
void expectedConnectionPoolsEnabled() throws SSLException {
ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
Logger spyLogger = Mockito.spy(log);
Loggers.useCustomLoggers(s -> spyLogger);

SslContext sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
.build();


disposableServer =
createServer()
.secure(ssl -> ssl.sslContext(sslServer))
.handle((req, resp) -> resp.sendString(Flux.just("hello ", req.uri())))
.bindNow();

ConnectionProvider connectionProvider = ConnectionProvider.builder("expected-connection-pool").expectedConnectionPools(1).build();

StepVerifier.create(
Flux.range(1, 2)
.flatMap(i -> createClient(connectionProvider, disposableServer::address)
.secure(ssl -> ssl.sslContext(createClientSslContext()))
.get()
.uri("/foo")
.responseContent()
.aggregate()
.asString()))
.thenConsumeWhile(s -> true)
.verifyComplete();

Loggers.resetLoggerFactory();


Mockito.verify(spyLogger).warn(argumentCaptor.capture(), Mockito.eq(2), Mockito.eq(1));
assertThat(argumentCaptor.getValue()).isEqualTo("Connection pool creation limit exceeded: {} pools created, maximum expected is {}");

connectionProvider.dispose();
disposableServer.dispose();

}

@Test
void expectedConnectionPoolsNotEnabled() throws SSLException {
Logger spyLogger = Mockito.spy(log);
Loggers.useCustomLoggers(s -> spyLogger);

SslContext sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
.build();


disposableServer =
createServer()
.secure(ssl -> ssl.sslContext(sslServer))
.handle((req, resp) -> resp.sendString(Flux.just("hello ", req.uri())))
.bindNow();

ConnectionProvider connectionProvider = ConnectionProvider.builder("expected-connection-pool").build();

StepVerifier.create(
Flux.range(1, 2)
.flatMap(i -> createClient(connectionProvider, disposableServer::address)
.secure(ssl -> ssl.sslContext(createClientSslContext()))
.get()
.uri("/foo")
.responseContent()
.aggregate()
.asString()))
.thenConsumeWhile(s -> true)
.verifyComplete();

Loggers.resetLoggerFactory();


Mockito.verify(spyLogger, times(0)).warn(Mockito.eq("Connection pool creation limit exceeded: {} pools created, maximum expected is {}"), Mockito.eq(2), Mockito.eq(1));

connectionProvider.dispose();
disposableServer.dispose();

}

private SslContext createClientSslContext() {
try {
return SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
}
catch (SSLException e) {
throw new RuntimeException(e);
}
}

@Test
void sslExchangeAbsoluteGet() throws SSLException {
SslContext sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
Expand Down

0 comments on commit 11bb896

Please sign in to comment.