Skip to content

Commit

Permalink
Experiment work stealing pools
Browse files Browse the repository at this point in the history
  • Loading branch information
pderop committed Jul 12, 2024
1 parent dc1875d commit a048f99
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 38 deletions.
4 changes: 4 additions & 0 deletions reactor-netty-core/src/main/java/reactor/netty/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ public class Metrics {
*/
public static final String PENDING_STREAMS = ".pending.streams";

/**
* The number of HTTP/2 stream acquisitions steal count.
*/
public static final String STEAL_STREAMS = ".steal.streams";

// ByteBufAllocator Metrics
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,20 @@ public InstrumentedPool<T> newPool(
return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate).build(poolFactory);
}

public InstrumentedPool<T> newPool(
PoolBuilder<T, PoolConfig<T>> poolBuilder,
int maxConnections,
@Nullable AllocationStrategy<?> allocationStrategy,
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
Function<PoolConfig<T>, InstrumentedPool<T>> poolFactory) {
if (disposeTimeout != null) {
return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, null)
.build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown));
}
return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, null).build(poolFactory);
}

public InstrumentedPool<T> newPool(
Publisher<T> allocator,
Function<T, Publisher<Void>> destroyHandler,
Expand All @@ -540,6 +554,21 @@ public InstrumentedPool<T> newPool(
return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder).build(poolFactory);
}

public InstrumentedPool<T> newPool(
PoolBuilder<T, PoolConfig<T>> poolBuilder,
int maxConnections,
@Nullable AllocationStrategy<?> allocationStrategy,
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
PoolMetricsRecorder poolMetricsRecorder,
Function<PoolConfig<T>, InstrumentedPool<T>> poolFactory) {
if (disposeTimeout != null) {
return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder)
.build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown));
}
return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder).build(poolFactory);
}

PoolBuilder<T, PoolConfig<T>> newPoolInternal(
Publisher<T> allocator,
Function<T, Publisher<Void>> destroyHandler,
Expand All @@ -552,11 +581,22 @@ PoolBuilder<T, PoolConfig<T>> newPoolInternal(
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
@Nullable PoolMetricsRecorder poolMetricsRecorder) {
PoolBuilder<T, PoolConfig<T>> poolBuilder =
PoolBuilder.from(allocator)
.destroyHandler(destroyHandler)
.maxPendingAcquire(pendingAcquireMaxCount)
.evictInBackground(evictionInterval);
return newPoolInternal(PoolBuilder.from(allocator), -1, null, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder);
}

PoolBuilder<T, PoolConfig<T>> newPoolInternal(
PoolBuilder<T, PoolConfig<T>> poolBuilder,
int maxConnections,
@Nullable AllocationStrategy<?> allocationStrategy,
Function<T, Publisher<Void>> destroyHandler,
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
@Nullable PoolMetricsRecorder poolMetricsRecorder) {
maxConnections = (maxConnections == -1) ? this.maxConnections : maxConnections;
allocationStrategy = (allocationStrategy == null) ? this.allocationStrategy : allocationStrategy;
poolBuilder = poolBuilder
.destroyHandler(destroyHandler)
.maxPendingAcquire(pendingAcquireMaxCount)
.evictInBackground(evictionInterval);

if (this.evictionPredicate != null) {
poolBuilder = poolBuilder.evictionPredicate(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -66,6 +66,23 @@ public interface Builder {
* @return {@code this}
*/
Builder minConnections(int minConnections);

/**
* Enables or disables work stealing mode for managing HTTP2 Connection Pools.
* <p>
* By default, a single Connection Pool is used by multiple Netty event loop threads.
* When work stealing is enabled, each Netty event loop will maintain its own
* HTTP2 Connection Pool, and HTTP2 streams allocation will be distributed over all available
* pools using a work stealing strategy. This approach maximizes throughput and
* resource utilization in a multithreaded environment.
*
* @param progressive true if the HTTP2 Connection pools should be enabled gradually (when the nth pool becomes
* is starting to get some pendingg acquisitions request, then enable one more
* pool until all available pools are enabled).
*
* @return {@code this}
*/
Builder enableWorkStealing(boolean progressive);
}

/**
Expand All @@ -77,6 +94,18 @@ public static Http2AllocationStrategy.Builder builder() {
return new Http2AllocationStrategy.Build();
}

/**
* Creates a builder for {@link Http2AllocationStrategy} and initialize it
* with an existing strategy. This method can be used to create a mutated version
* of an existing strategy.
*
* @return a new {@link Http2AllocationStrategy.Builder} initialized with an existing http2
* allocation strategy.
*/
public static Http2AllocationStrategy.Builder builder(Http2AllocationStrategy existing) {
return new Http2AllocationStrategy.Build(existing);
}

@Override
public Http2AllocationStrategy copy() {
return new Http2AllocationStrategy(this);
Expand Down Expand Up @@ -141,9 +170,14 @@ public void returnPermits(int returned) {
}
}

public boolean enableWorkStealing() {
return enableWorkStealing;
}

final long maxConcurrentStreams;
final int maxConnections;
final int minConnections;
final boolean enableWorkStealing;

volatile int permits;
static final AtomicIntegerFieldUpdater<Http2AllocationStrategy> PERMITS = AtomicIntegerFieldUpdater.newUpdater(Http2AllocationStrategy.class, "permits");
Expand All @@ -152,13 +186,15 @@ public void returnPermits(int returned) {
this.maxConcurrentStreams = build.maxConcurrentStreams;
this.maxConnections = build.maxConnections;
this.minConnections = build.minConnections;
this.enableWorkStealing = build.enableWorkStealing;
PERMITS.lazySet(this, this.maxConnections);
}

Http2AllocationStrategy(Http2AllocationStrategy copy) {
this.maxConcurrentStreams = copy.maxConcurrentStreams;
this.maxConnections = copy.maxConnections;
this.minConnections = copy.minConnections;
this.enableWorkStealing = copy.enableWorkStealing;
PERMITS.lazySet(this, this.maxConnections);
}

Expand All @@ -170,6 +206,17 @@ static final class Build implements Builder {
long maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
int maxConnections = DEFAULT_MAX_CONNECTIONS;
int minConnections = DEFAULT_MIN_CONNECTIONS;
boolean enableWorkStealing = Boolean.getBoolean("reactor.netty.pool.h2.enableworkstealing");

Build() {
}

Build(Http2AllocationStrategy existing) {
this.maxConcurrentStreams = existing.maxConcurrentStreams;
this.minConnections = existing.minConnections;
this.maxConnections = existing.maxConnections;
this.enableWorkStealing = existing.enableWorkStealing;
}

@Override
public Http2AllocationStrategy build() {
Expand Down Expand Up @@ -206,5 +253,11 @@ public Builder minConnections(int minConnections) {
this.minConnections = minConnections;
return this;
}

@Override
public Builder enableWorkStealing(boolean progressive) {
this.enableWorkStealing = true;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import reactor.netty.ConnectionObserver;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.internal.shaded.reactor.pool.PoolBuilder;
import reactor.netty.internal.shaded.reactor.pool.PoolConfig;
import reactor.netty.internal.shaded.reactor.pool.decorators.InstrumentedPoolDecorators;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.PooledConnectionProvider;
import reactor.netty.transport.ClientTransportConfig;
Expand All @@ -51,13 +54,20 @@
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
import reactor.util.function.Tuples;

import java.io.IOException;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static reactor.netty.ReactorNetty.format;
import static reactor.netty.ReactorNetty.getChannelContext;
Expand Down Expand Up @@ -565,12 +575,56 @@ static final class PooledConnectionAllocator {
this.config = (HttpClientConfig) config;
this.remoteAddress = remoteAddress;
this.resolver = resolver;
this.pool = id == null ?
poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) :
poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
new MicrometerPoolMetricsRecorder(id, name, remoteAddress),
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy()));

Http2AllocationStrategy http2Strategy = poolFactory.allocationStrategy() instanceof Http2AllocationStrategy ?
(Http2AllocationStrategy) poolFactory.allocationStrategy() : null;

if (http2Strategy == null || !http2Strategy.enableWorkStealing) {
this.pool = id == null ?
poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) :
poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
new MicrometerPoolMetricsRecorder(id, name, remoteAddress),
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy()));
}
else {
// Create one connection allocator (it will be shared by all Http2Pool instances)
Publisher<Connection> allocator = connectChannel();

List<Executor> execs = StreamSupport.stream(config.loopResources().onClient(true).spliterator(), false)
.limit(http2Strategy.maxConnections)
.collect(Collectors.toList());
Iterator<Executor> execsIter = execs.iterator();

MicrometerPoolMetricsRecorder micrometerRecorder = id == null ? null : new MicrometerPoolMetricsRecorder(id, name, remoteAddress);
AtomicInteger subPoolIndex = new AtomicInteger();

this.pool = InstrumentedPoolDecorators.concurrentPools(execs.size(), allocator,
(PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder) -> {
int index = subPoolIndex.getAndIncrement();
int minDiv = http2Strategy.minConnections / execs.size();
int minMod = http2Strategy.minConnections % execs.size();
int maxDiv = http2Strategy.maxConnections / execs.size();
int maxMod = http2Strategy.maxConnections % execs.size();

int minConn = index < minMod ? minDiv + 1 : minDiv;
int maxConn = index < maxMod ? maxDiv + 1 : maxDiv;

Http2AllocationStrategy adaptedH2Strategy = Http2AllocationStrategy.builder(http2Strategy)
.minConnections(minConn)
.maxConnections(maxConn)
.build();

InstrumentedPool<Connection> pool =
id == null ?
poolFactory.newPool(poolBuilder, maxConn, adaptedH2Strategy, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy)) :
poolFactory.newPool(poolBuilder, maxConn, adaptedH2Strategy, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
micrometerRecorder,
poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy));
return Tuples.of(pool, execsIter.next());
});
}
}

Publisher<Connection> connectChannel() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,6 +67,26 @@ public Meter.Type getType() {
}
},

/**
* The number of HTTP/2 stream acquisition steal count.
*/
STEAL_STREAMS {
@Override
public String getName() {
return "reactor.netty.connection.provider.steal.streams";
}

@Override
public KeyName[] getKeyNames() {
return Http2ConnectionProviderMetersTags.values();
}

@Override
public Meter.Type getType() {
return Meter.Type.COUNTER;
}
},

/**
* The number of the idle connections in the connection pool.
*/
Expand Down
Loading

0 comments on commit a048f99

Please sign in to comment.