From ee5cee8c6cfbae098c1a808e7152c166867267fc Mon Sep 17 00:00:00 2001 From: Jeremy Kong Date: Fri, 11 Feb 2022 11:48:06 +0000 Subject: [PATCH] [PDS-243785] Count Attempted Hosts per DC (#5903) Cassandra host routing now distributes retry attempts between datacenters as evenly as possible. --- .../cassandra/pool/CassandraService.java | 18 ++++++++++++++---- .../cassandra/pool/CassandraServiceTest.java | 19 +++++++++++++++++++ changelog/@unreleased/pr-5903.v2.yml | 6 ++++++ 3 files changed, 39 insertions(+), 4 deletions(-) create mode 100644 changelog/@unreleased/pr-5903.v2.yml diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java index c72d99f9f73..68e32dfec95 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraService.java @@ -63,6 +63,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -306,19 +307,28 @@ public Optional getRandomGoodHostForPredicate( Set hostsMatchingPredicate = pools.keySet().stream().filter(predicate).collect(Collectors.toSet()); - Set triedDatacenters = triedHosts.stream() + Map triedDatacenters = triedHosts.stream() .map(hostToDatacenter::get) .filter(Objects::nonNull) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + Optional maximumAttemptsPerDatacenter = + triedDatacenters.values().stream().max(Long::compareTo); + Set maximallyAttemptedDatacenters = KeyedStream.stream(triedDatacenters) + .filter(attempts -> Objects.equals( + attempts, + maximumAttemptsPerDatacenter.orElseThrow(() -> new SafeIllegalStateException( + "Unexpectedly could not find the max attempts per datacenter")))) + .keys() .collect(Collectors.toSet()); - Set hostsInUntriedDatacenters = hostsMatchingPredicate.stream() + Set hostsInPermittedDatacenters = hostsMatchingPredicate.stream() .filter(pool -> { String datacenter = hostToDatacenter.get(pool); - return datacenter == null || !triedDatacenters.contains(datacenter); + return datacenter == null || !maximallyAttemptedDatacenters.contains(datacenter); }) .collect(Collectors.toSet()); Set filteredHosts = - hostsInUntriedDatacenters.isEmpty() ? hostsMatchingPredicate : hostsInUntriedDatacenters; + hostsInPermittedDatacenters.isEmpty() ? hostsMatchingPredicate : hostsInPermittedDatacenters; if (filteredHosts.isEmpty()) { log.info("No hosts match the provided predicate."); diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java index c97b47b8f62..c3bd847ab11 100644 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java +++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/pool/CassandraServiceTest.java @@ -47,6 +47,7 @@ public class CassandraServiceTest { private static final InetSocketAddress HOST_3 = InetSocketAddress.createUnresolved(HOSTNAME_3, DEFAULT_PORT); private static final String DC_1 = "london"; private static final String DC_2 = "singapore"; + private static final String DC_3 = "zurich"; private CassandraKeyValueServiceConfig config; private Blacklist blacklist; @@ -165,6 +166,24 @@ public void selectsHostsInAnotherDatacenter() { cassandra.getRandomGoodHostForPredicate(address -> true, ImmutableSet.of(HOST_1)), HOST_2); } + @Test + public void choosesTheHostInTheLeastAttemptedDatacenter() { + CassandraService cassandra = clientPoolWithServers(ImmutableSet.of(HOST_1, HOST_2, HOST_3)); + cassandra.overrideHostToDatacenterMapping(ImmutableMap.of(HOST_1, DC_1, HOST_2, DC_2, HOST_3, DC_1)); + assertContainerHasHost( + cassandra.getRandomGoodHostForPredicate(address -> true, ImmutableSet.of(HOST_1, HOST_2, HOST_3)), + HOST_2); + } + + @Test + public void distributesAttemptsWhenMultipleDatacentersAreLeastAttempted() { + CassandraService cassandra = clientPoolWithServers(ImmutableSet.of(HOST_1, HOST_2, HOST_3)); + cassandra.overrideHostToDatacenterMapping(ImmutableMap.of(HOST_1, DC_1, HOST_2, DC_2, HOST_3, DC_3)); + Set suggestedHosts = + getRecommendedHostsFromAThousandTrials(cassandra, ImmutableSet.of(HOST_1)); + assertThat(suggestedHosts).containsExactlyInAnyOrder(HOST_2, HOST_3); + } + @Test public void selectsAnyHostIfAllDatacentersAlreadyTried() { ImmutableSet allHosts = ImmutableSet.of(HOST_1, HOST_2); diff --git a/changelog/@unreleased/pr-5903.v2.yml b/changelog/@unreleased/pr-5903.v2.yml new file mode 100644 index 00000000000..9c8493bdc3e --- /dev/null +++ b/changelog/@unreleased/pr-5903.v2.yml @@ -0,0 +1,6 @@ +type: improvement +improvement: + description: Cassandra host routing now distributes retry attempts between datacenters + as evenly as possible. + links: + - https://github.com/palantir/atlasdb/pull/5903