Skip to content

Commit

Permalink
Migrating to mongo client 4.x (#2493)
Browse files Browse the repository at this point in the history
* Migrating to mongo client 4.x

* Events that were tracking the waitqueuesize directly were removed, we use `ConnectionCheckOutStartedEvent`, `ConnectionCheckedOutEvent` and `ConnectionCheckOutFailedEvent` to track it instead, see: https://developer.mongodb.com/community/forums/t/java-monitoring-waitqueue-size/98845
  • Loading branch information
jonatan-ivanov authored Mar 15, 2021
1 parent 584d384 commit 30b3a33
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 58 deletions.
2 changes: 1 addition & 1 deletion dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def VERSIONS = [
'org.latencyutils:LatencyUtils:latest.release',
'org.mockito:mockito-core:latest.release',
'org.mockito:mockito-inline:latest.release',
'org.mongodb:mongo-java-driver:latest.release',
'org.mongodb:mongodb-driver-sync:latest.release',
'org.slf4j:slf4j-api:1.7.+',
'org.springframework:spring-context:latest.release',
'org.testcontainers:junit-jupiter:latest.release',
Expand Down
2 changes: 1 addition & 1 deletion micrometer-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ dependencies {

optionalApi 'com.squareup.okhttp3:okhttp'

optionalApi 'org.mongodb:mongo-java-driver'
optionalApi 'org.mongodb:mongodb-driver-sync'

optionalApi 'org.jooq:jooq'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.micrometer.core.instrument.binder.mongodb;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoClient;
import com.mongodb.event.*;
import io.micrometer.core.annotation.Incubating;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -68,4 +68,3 @@ private void timeCommand(CommandEvent event, String status, long elapsedTimeInNa
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.micrometer.core.instrument.binder.mongodb;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoClient;
import com.mongodb.connection.ServerId;
import com.mongodb.event.*;
import io.micrometer.core.annotation.Incubating;
Expand All @@ -35,18 +35,19 @@
* {@link ConnectionPoolListener} for collecting connection pool metrics from {@link MongoClient}.
*
* @author Christophe Bornet
* @author Jonatan Ivanov
* @since 1.2.0
*/
@NonNullApi
@NonNullFields
@Incubating(since = "1.2.0")
public class MongoMetricsConnectionPoolListener extends ConnectionPoolListenerAdapter {
public class MongoMetricsConnectionPoolListener implements ConnectionPoolListener {

private static final String METRIC_PREFIX = "mongodb.driver.pool.";

private final Map<ServerId, AtomicInteger> poolSize = new ConcurrentHashMap<>();
private final Map<ServerId, AtomicInteger> checkedOutCount = new ConcurrentHashMap<>();
private final Map<ServerId, AtomicInteger> waitQueueSize = new ConcurrentHashMap<>();
private final Map<ServerId, AtomicInteger> poolSizes = new ConcurrentHashMap<>();
private final Map<ServerId, AtomicInteger> checkedOutCounts = new ConcurrentHashMap<>();
private final Map<ServerId, AtomicInteger> waitQueueSizes = new ConcurrentHashMap<>();
private final Map<ServerId, List<Meter>> meters = new ConcurrentHashMap<>();

private final MeterRegistry registry;
Expand All @@ -56,14 +57,14 @@ public MongoMetricsConnectionPoolListener(MeterRegistry registry) {
}

@Override
public void connectionPoolOpened(ConnectionPoolOpenedEvent event) {
public void connectionPoolCreated(ConnectionPoolCreatedEvent event) {
List<Meter> connectionMeters = new ArrayList<>();
connectionMeters.add(registerGauge(event.getServerId(), METRIC_PREFIX + "size",
"the current size of the connection pool, including idle and and in-use members", poolSize));
"the current size of the connection pool, including idle and and in-use members", poolSizes));
connectionMeters.add(registerGauge(event.getServerId(), METRIC_PREFIX + "checkedout",
"the count of connections that are currently in use", checkedOutCount));
"the count of connections that are currently in use", checkedOutCounts));
connectionMeters.add(registerGauge(event.getServerId(), METRIC_PREFIX + "waitqueuesize",
"the current size of the wait queue for a connection from the pool", waitQueueSize));
"the current size of the wait queue for a connection from the pool", waitQueueSizes));
meters.put(event.getServerId(), connectionMeters);
}

Expand All @@ -74,58 +75,61 @@ public void connectionPoolClosed(ConnectionPoolClosedEvent event) {
registry.remove(meter);
}
meters.remove(serverId);
poolSize.remove(serverId);
checkedOutCount.remove(serverId);
waitQueueSize.remove(serverId);
poolSizes.remove(serverId);
checkedOutCounts.remove(serverId);
waitQueueSizes.remove(serverId);
}

@Override
public void connectionCheckedOut(ConnectionCheckedOutEvent event) {
AtomicInteger gauge = checkedOutCount.get(event.getConnectionId().getServerId());
if (gauge != null) {
gauge.incrementAndGet();
public void connectionCheckOutStarted(ConnectionCheckOutStartedEvent event) {
AtomicInteger waitQueueSize = waitQueueSizes.get(event.getServerId());
if (waitQueueSize != null) {
waitQueueSize.incrementAndGet();
}
}

@Override
public void connectionCheckedIn(ConnectionCheckedInEvent event) {
AtomicInteger gauge = checkedOutCount.get(event.getConnectionId().getServerId());
if (gauge != null) {
gauge.decrementAndGet();
public void connectionCheckedOut(ConnectionCheckedOutEvent event) {
AtomicInteger checkedOutCount = checkedOutCounts.get(event.getConnectionId().getServerId());
if (checkedOutCount != null) {
checkedOutCount.incrementAndGet();
}

AtomicInteger waitQueueSize = waitQueueSizes.get(event.getConnectionId().getServerId());
if (waitQueueSize != null) {
waitQueueSize.decrementAndGet();
}
}

@SuppressWarnings("deprecation")
@Override
public void waitQueueEntered(ConnectionPoolWaitQueueEnteredEvent event) {
AtomicInteger gauge = waitQueueSize.get(event.getServerId());
if (gauge != null) {
gauge.incrementAndGet();
public void connectionCheckOutFailed(ConnectionCheckOutFailedEvent event) {
AtomicInteger waitQueueSize = waitQueueSizes.get(event.getServerId());
if (waitQueueSize != null) {
waitQueueSize.decrementAndGet();
}
}

@SuppressWarnings("deprecation")
@Override
public void waitQueueExited(ConnectionPoolWaitQueueExitedEvent event) {
AtomicInteger gauge = waitQueueSize.get(event.getServerId());
if (gauge != null) {
gauge.decrementAndGet();
public void connectionCheckedIn(ConnectionCheckedInEvent event) {
AtomicInteger checkedOutCount = checkedOutCounts.get(event.getConnectionId().getServerId());
if (checkedOutCount != null) {
checkedOutCount.decrementAndGet();
}
}

@Override
public void connectionAdded(ConnectionAddedEvent event) {
AtomicInteger gauge = poolSize.get(event.getConnectionId().getServerId());
if (gauge != null) {
gauge.incrementAndGet();
public void connectionCreated(ConnectionCreatedEvent event) {
AtomicInteger poolSize = poolSizes.get(event.getConnectionId().getServerId());
if (poolSize != null) {
poolSize.incrementAndGet();
}
}

@Override
public void connectionRemoved(ConnectionRemovedEvent event) {
AtomicInteger gauge = poolSize.get(event.getConnectionId().getServerId());
if (gauge != null) {
gauge.decrementAndGet();
public void connectionClosed(ConnectionClosedEvent event) {
AtomicInteger poolSize = poolSizes.get(event.getConnectionId().getServerId());
if (poolSize != null) {
poolSize.decrementAndGet();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
*/
package io.micrometer.core.instrument.binder.mongodb;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientSettings;
import com.mongodb.ServerAddress;
import com.mongodb.event.ClusterListenerAdapter;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.event.ClusterListener;
import com.mongodb.event.ClusterOpeningEvent;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
Expand All @@ -34,6 +35,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;

/**
Expand All @@ -51,15 +53,16 @@ class MongoMetricsCommandListenerTest extends AbstractMongoDbTest {
void setup() {
registry = new SimpleMeterRegistry();
clusterId = new AtomicReference<>();
MongoClientOptions options = MongoClientOptions.builder()
MongoClientSettings settings = MongoClientSettings.builder()
.addCommandListener(new MongoMetricsCommandListener(registry))
.addClusterListener(new ClusterListenerAdapter() {
.applyToClusterSettings(builder -> builder.hosts(singletonList(new ServerAddress(HOST, port))))
.applyToClusterSettings(builder -> builder.addClusterListener(new ClusterListener() {
@Override
public void clusterOpening(ClusterOpeningEvent event) {
clusterId.set(event.getClusterId().getValue());
}
}).build();
mongo = new MongoClient(new ServerAddress(HOST, port), options);
})).build();
mongo = MongoClients.create(settings);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
*/
package io.micrometer.core.instrument.binder.mongodb;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientSettings;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.connection.ClusterId;
import com.mongodb.connection.ConnectionId;
import com.mongodb.connection.ConnectionPoolSettings;
Expand All @@ -31,13 +32,15 @@

import java.util.concurrent.atomic.AtomicReference;

import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;

/**
* Tests for {@link MongoMetricsConnectionPoolListener}.
*
* @author Christophe Bornet
* @author Jonatan Ivanov
*/
class MongoMetricsConnectionPoolListenerTest extends AbstractMongoDbTest {

Expand All @@ -46,17 +49,20 @@ class MongoMetricsConnectionPoolListenerTest extends AbstractMongoDbTest {
@Test
void shouldCreatePoolMetrics() {
AtomicReference<String> clusterId = new AtomicReference<>();
MongoClientOptions options = MongoClientOptions.builder()
.minConnectionsPerHost(2)
.addConnectionPoolListener(new MongoMetricsConnectionPoolListener(registry))
.addClusterListener(new ClusterListenerAdapter() {
MongoClientSettings settings = MongoClientSettings.builder()
.applyToConnectionPoolSettings(builder -> builder
.minSize(2)
.addConnectionPoolListener(new MongoMetricsConnectionPoolListener(registry))
)
.applyToClusterSettings(builder -> builder.hosts(singletonList(new ServerAddress(HOST, port))))
.applyToClusterSettings(builder -> builder.addClusterListener(new ClusterListener() {
@Override
public void clusterOpening(ClusterOpeningEvent event) {
clusterId.set(event.getClusterId().getValue());
}
})
}))
.build();
MongoClient mongo = new MongoClient(new ServerAddress(HOST, port), options);
MongoClient mongo = MongoClients.create(settings);

mongo.getDatabase("test")
.createCollection("testCol");
Expand All @@ -83,7 +89,7 @@ void whenConnectionCheckedInAfterPoolClose_thenNoExceptionThrown() {
ServerId serverId = new ServerId(new ClusterId(), new ServerAddress(HOST, port));
ConnectionId connectionId = new ConnectionId(serverId);
MongoMetricsConnectionPoolListener listener = new MongoMetricsConnectionPoolListener(registry);
listener.connectionPoolOpened(new ConnectionPoolOpenedEvent(serverId, ConnectionPoolSettings.builder().build()));
listener.connectionPoolCreated(new ConnectionPoolCreatedEvent(serverId, ConnectionPoolSettings.builder().build()));
listener.connectionCheckedOut(new ConnectionCheckedOutEvent(connectionId));
listener.connectionPoolClosed(new ConnectionPoolClosedEvent(serverId));
assertThatCode(() -> listener.connectionCheckedIn(new ConnectionCheckedInEvent(connectionId)))
Expand Down

0 comments on commit 30b3a33

Please sign in to comment.