Skip to content

Commit

Permalink
Use Netty default allocator whenever it is pooled otherwise fallback …
Browse files Browse the repository at this point in the history
…to adaptive allocator.

Motivation:

Vert.x should use Netty's default allocator whenever possible in order to minimize the resources for pooled allocation (thread-local direct buffers, arenas).

Changes:

VertxByteBufAllocator.POOLED_ALLOCATOR reuses ByteBufAllocator.DEFAULT when it is pooled otherwise uses AdaptiveByteBufAllocator.DEFAULT.

TCP server/client should use VertxByteBufAllocator.POOLED_ALLOCATOR instead of PooledByteBufAllocator.DEFAULT.
  • Loading branch information
vietj committed Nov 5, 2024
1 parent cd5a3a2 commit 02316d5
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 22 deletions.
45 changes: 45 additions & 0 deletions vertx-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,51 @@
</includes>
</configuration>
</execution>
<execution>
<id>adaptive-allocator</id>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
<configuration>
<includes>
<include>io/vertx/it/buffer/TcpAllocationTest.java</include>
</includes>
<systemProperties>
<io.netty.allocator.type>adaptive</io.netty.allocator.type>
</systemProperties>
</configuration>
</execution>
<execution>
<id>pooled-allocator</id>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
<configuration>
<includes>
<include>io/vertx/it/buffer/TcpAllocationTest.java</include>
</includes>
<systemProperties>
<io.netty.allocator.type>pooled</io.netty.allocator.type>
</systemProperties>
</configuration>
</execution>
<execution>
<id>unpooled-allocator</id>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
<configuration>
<includes>
<include>io/vertx/it/buffer/TcpAllocationTest.java</include>
</includes>
<systemProperties>
<io.netty.allocator.type>unpooled</io.netty.allocator.type>
</systemProperties>
</configuration>
</execution>
</executions>
</plugin>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@
*/
package io.vertx.core.impl.buffer;

import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.*;
import io.netty.util.internal.PlatformDependent;
import io.vertx.core.buffer.impl.VertxHeapByteBuf;
import io.vertx.core.buffer.impl.VertxUnsafeHeapByteBuf;
Expand All @@ -24,7 +20,15 @@ public abstract class VertxByteBufAllocator extends AbstractByteBufAllocator {
/**
* Vert.x pooled allocator.
*/
public static final ByteBufAllocator POOLED_ALLOCATOR = new PooledByteBufAllocator(true);
public static final ByteBufAllocator POOLED_ALLOCATOR;

static {
ByteBufAllocator pooledAllocator = ByteBufAllocator.DEFAULT;
if (pooledAllocator instanceof UnpooledByteBufAllocator) {
pooledAllocator = new AdaptiveByteBufAllocator();
}
POOLED_ALLOCATOR = pooledAllocator;
}

/**
* Vert.x shared un-pooled allocator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import io.netty.util.concurrent.GenericFutureListener;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.impl.PartialPooledByteBufAllocator;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.CloseSequence;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.impl.buffer.VertxByteBufAllocator;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.internal.net.NetClientInternal;
Expand Down Expand Up @@ -282,7 +282,7 @@ private void connectInternal2(ConnectOptions connectOptions,
Objects.requireNonNull(connectHandler, "No null connectHandler accepted");
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoop);
bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
bootstrap.option(ChannelOption.ALLOCATOR, VertxByteBufAllocator.POOLED_ALLOCATOR);

SocketAddress remoteAddress = connectOptions.getRemoteAddress();
if (remoteAddress == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
package io.vertx.core.net.impl;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
Expand All @@ -32,6 +31,7 @@
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.impl.buffer.VertxByteBufAllocator;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.internal.tls.SslContextManager;
Expand Down Expand Up @@ -510,7 +510,7 @@ private void bind(
if (options.isSsl()) {
bootstrap.childOption(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
} else {
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.childOption(ChannelOption.ALLOCATOR, VertxByteBufAllocator.POOLED_ALLOCATOR);
}

bootstrap.childHandler(channelBalancer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@

package io.vertx.core.net.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.*;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
Expand Down Expand Up @@ -49,7 +46,11 @@ private VertxHandler(Function<ChannelHandlerContext, C> connectionFactory) {
* @return a buffer safe
*/
public static ByteBuf safeBuffer(ByteBuf byteBuf) {
if (byteBuf != Unpooled.EMPTY_BUFFER && (byteBuf.alloc() instanceof PooledByteBufAllocator || byteBuf instanceof CompositeByteBuf)) {
Class<?> allocClass;
if (byteBuf != Unpooled.EMPTY_BUFFER &&
((allocClass = byteBuf.alloc().getClass()) == AdaptiveByteBufAllocator.class
|| allocClass == PooledByteBufAllocator.class
|| byteBuf instanceof CompositeByteBuf)) {
try {
if (byteBuf.isReadable()) {
ByteBuf buffer = VertxByteBufAllocator.DEFAULT.heapBuffer(byteBuf.readableBytes());
Expand Down
68 changes: 68 additions & 0 deletions vertx-core/src/test/java/io/vertx/it/buffer/TcpAllocationTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2011-2024 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.it.buffer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.impl.buffer.VertxByteBufAllocator;
import io.vertx.core.internal.net.NetSocketInternal;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetSocket;
import io.vertx.test.core.VertxTestBase;
import org.junit.Test;

public class TcpAllocationTest extends VertxTestBase {

@Test
public void testByteBufOriginateFromDefaultByteBufAllocator() {
NetServer server = vertx.createNetServer();
server.connectHandler(so -> {
NetSocketInternal soi = (NetSocketInternal) so;
soi.messageHandler(msg -> {
try {
ByteBuf bbuf = (ByteBuf) msg;
assertSame(VertxByteBufAllocator.POOLED_ALLOCATOR, bbuf.alloc());
} finally {
ReferenceCountUtil.release(msg);
}
testComplete();
});
});
server.listen(1234, "localhost").await();
NetClient client = vertx.createNetClient();
NetSocket so = client.connect(1234, "localhost").await();
so.write(Buffer.buffer("ping"));
await();
}

@Test
public void testByteBufCopyAndRelease() {
NetServer server = vertx.createNetServer();
server.connectHandler(so -> {
so.handler(buff -> {
ByteBuf byteBuf = ((BufferImpl)buff).byteBuf();
assertFalse(byteBuf.isDirect());
assertFalse(byteBuf.alloc().isDirectBufferPooled());
testComplete();
});
});
server.listen(1234, "localhost").await();
NetClient client = vertx.createNetClient();
NetSocket so = client.connect(1234, "localhost").await();
so.write(Buffer.buffer("ping"));
await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@

package io.vertx.tests.buffer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.*;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.buffer.impl.VertxHeapByteBuf;
import io.vertx.core.buffer.impl.VertxUnsafeHeapByteBuf;
import io.vertx.core.impl.buffer.VertxByteBufAllocator;
import io.vertx.core.internal.buffer.BufferInternal;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

public class VertxBufferTest {

Expand Down Expand Up @@ -62,4 +63,30 @@ public void testDuplicate() {
assertEquals(0, byteBuf.readerIndex());
}

@Test
public void testSafeBuffer() {
assertCopyAndRelease(AdaptiveByteBufAllocator.DEFAULT.heapBuffer().writeByte('A'));
assertCopyAndRelease(AdaptiveByteBufAllocator.DEFAULT.directBuffer().writeByte('A'));
assertCopyAndRelease(PooledByteBufAllocator.DEFAULT.heapBuffer().writeByte('A'));
assertCopyAndRelease(PooledByteBufAllocator.DEFAULT.directBuffer().writeByte('A'));
assertCopyAndRelease(new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, 10).writeByte('A'));
assertWrap(Unpooled.buffer().writeByte('A'));
assertWrap(VertxByteBufAllocator.DEFAULT.heapBuffer().writeByte('A'));
assertWrap(VertxByteBufAllocator.DEFAULT.directBuffer().writeByte('A'));
assertWrap(UnpooledByteBufAllocator.DEFAULT.heapBuffer().writeByte('A'));
assertWrap(UnpooledByteBufAllocator.DEFAULT.directBuffer().writeByte('A'));
}

private static void assertCopyAndRelease(ByteBuf bbuf) {
BufferImpl buffer = (BufferImpl) BufferInternal.safeBuffer(bbuf);
assertNotSame(bbuf, buffer.byteBuf());
assertEquals(0, bbuf.refCnt());
}

private static void assertWrap(ByteBuf bbuf) {
BufferImpl buffer = (BufferImpl) BufferInternal.safeBuffer(bbuf);
assertSame(bbuf, buffer.byteBuf());
assertEquals(1, bbuf.refCnt());
bbuf.release();
}
}
6 changes: 2 additions & 4 deletions vertx-core/src/test/java/io/vertx/tests/net/NetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@

package io.vertx.tests.net;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.*;
import io.netty.channel.*;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
Expand Down Expand Up @@ -45,7 +44,6 @@
import io.vertx.core.internal.net.NetSocketInternal;
import io.vertx.core.spi.tls.SslContextFactory;
import io.vertx.test.core.CheckingSender;
import io.vertx.test.core.Repeat;
import io.vertx.test.core.TestUtils;
import io.vertx.test.core.VertxTestBase;
import io.vertx.test.netty.TestLoggerFactory;
Expand Down Expand Up @@ -3637,7 +3635,7 @@ private void testNetClientInternal_(HttpServerOptions options, boolean expectSSL
case 1:
assertTrue(obj instanceof LastHttpContent);
ByteBuf content = ((LastHttpContent) obj).content();
assertEquals(!expectSSL, content.isDirect());
assertTrue(content.isDirect());
assertEquals(1, content.refCnt());
String val = content.toString(StandardCharsets.UTF_8);
assertTrue(content.release());
Expand Down

0 comments on commit 02316d5

Please sign in to comment.