From e25537d74c43ba7542e5604c041632f21a5baf5c Mon Sep 17 00:00:00 2001 From: "oliver.ko" Date: Sat, 28 Dec 2024 11:07:18 +0900 Subject: [PATCH] Supports compression level setting fix: https://github.com/reactor/reactor-netty/issues/3244 --- .../Http2StreamBridgeServerHandler.java | 4 ++ .../reactor/netty/http/server/Http3Codec.java | 10 +++- .../http/server/Http3ServerOperations.java | 3 +- .../Http3StreamBridgeServerHandler.java | 4 ++ .../reactor/netty/http/server/HttpServer.java | 21 +++++++ .../netty/http/server/HttpServerConfig.java | 59 +++++++++++++++---- .../http/server/HttpServerOperations.java | 8 ++- .../netty/http/server/HttpTrafficHandler.java | 5 ++ .../http/server/SimpleCompressionHandler.java | 28 ++++++++- .../HttpCompressionClientServerTests.java | 43 ++++++++++++++ .../netty/http/server/HttpServerTests.java | 2 + 11 files changed, 166 insertions(+), 21 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java index 6adbb42d34..d143ce8347 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java @@ -62,6 +62,7 @@ final class Http2StreamBridgeServerHandler extends ChannelDuplexHandler { final BiPredicate compress; + final int compressionLevel; final ServerCookieDecoder cookieDecoder; final ServerCookieEncoder cookieEncoder; final HttpServerFormDecoderProvider formDecoderProvider; @@ -84,6 +85,7 @@ final class Http2StreamBridgeServerHandler extends ChannelDuplexHandler { Http2StreamBridgeServerHandler( @Nullable BiPredicate compress, + int compressionLevel, ServerCookieDecoder decoder, ServerCookieEncoder encoder, HttpServerFormDecoderProvider formDecoderProvider, @@ -94,6 +96,7 @@ final class Http2StreamBridgeServerHandler extends ChannelDuplexHandler { @Nullable Duration readTimeout, @Nullable Duration requestTimeout) { this.compress = compress; + this.compressionLevel = compressionLevel; this.cookieDecoder = decoder; this.cookieEncoder = encoder; this.formDecoderProvider = formDecoderProvider; @@ -140,6 +143,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { listener, request, compress, + compressionLevel, connectionInfo, cookieDecoder, cookieEncoder, diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/Http3Codec.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/Http3Codec.java index 5378628d01..e9b8a02500 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/Http3Codec.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/Http3Codec.java @@ -63,6 +63,7 @@ final class Http3Codec extends ChannelInitializer { final Function methodTagValue; final ChannelMetricsRecorder metricsRecorder; final int minCompressionSize; + final int compressionLevel; final ChannelOperations.OnSetup opsFactory; final Duration readTimeout; final Duration requestTimeout; @@ -83,6 +84,7 @@ final class Http3Codec extends ChannelInitializer { @Nullable Function methodTagValue, @Nullable ChannelMetricsRecorder metricsRecorder, int minCompressionSize, + int compressionLevel, ChannelOperations.OnSetup opsFactory, @Nullable Duration readTimeout, @Nullable Duration requestTimeout, @@ -101,6 +103,7 @@ final class Http3Codec extends ChannelInitializer { this.methodTagValue = methodTagValue; this.metricsRecorder = metricsRecorder; this.minCompressionSize = minCompressionSize; + this.compressionLevel = compressionLevel; this.opsFactory = opsFactory; this.readTimeout = readTimeout; this.requestTimeout = requestTimeout; @@ -118,13 +121,13 @@ protected void initChannel(QuicStreamChannel channel) { p.addLast(NettyPipeline.H3ToHttp11Codec, new Http3FrameToHttpObjectCodec(true, validate)) .addLast(NettyPipeline.HttpTrafficHandler, - new Http3StreamBridgeServerHandler(compressPredicate, cookieDecoder, cookieEncoder, formDecoderProvider, + new Http3StreamBridgeServerHandler(compressPredicate, compressionLevel, cookieDecoder, cookieEncoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle, readTimeout, requestTimeout)); boolean alwaysCompress = compressPredicate == null && minCompressionSize == 0; if (alwaysCompress) { - p.addLast(NettyPipeline.CompressionHandler, new SimpleCompressionHandler()); + p.addLast(NettyPipeline.CompressionHandler, SimpleCompressionHandler.create(compressionLevel)); } ChannelOperations.addReactiveBridge(channel, opsFactory, listener); @@ -166,6 +169,7 @@ static ChannelHandler newHttp3ServerConnectionHandler( @Nullable Function methodTagValue, @Nullable ChannelMetricsRecorder metricsRecorder, int minCompressionSize, + int compressionLevel, ChannelOperations.OnSetup opsFactory, @Nullable Duration readTimeout, @Nullable Duration requestTimeout, @@ -173,7 +177,7 @@ static ChannelHandler newHttp3ServerConnectionHandler( boolean validate) { return new Http3ServerConnectionHandler( new Http3Codec(accessLogEnabled, accessLog, compressPredicate, decoder, encoder, formDecoderProvider, forwardedHeaderHandler, - httpMessageLogFactory, listener, mapHandle, methodTagValue, metricsRecorder, minCompressionSize, + httpMessageLogFactory, listener, mapHandle, methodTagValue, metricsRecorder, minCompressionSize, compressionLevel, opsFactory, readTimeout, requestTimeout, uriTagValue, validate)); } } \ No newline at end of file diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/Http3ServerOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/Http3ServerOperations.java index deaa4ac6ee..8db8597a96 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/Http3ServerOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/Http3ServerOperations.java @@ -43,6 +43,7 @@ final class Http3ServerOperations extends HttpServerOperations { ConnectionObserver listener, HttpRequest nettyRequest, @Nullable BiPredicate compressionPredicate, + int compressionLevel, ConnectionInfo connectionInfo, ServerCookieDecoder decoder, ServerCookieEncoder encoder, @@ -54,7 +55,7 @@ final class Http3ServerOperations extends HttpServerOperations { @Nullable Duration requestTimeout, boolean secured, ZonedDateTime timestamp) { - super(c, listener, nettyRequest, compressionPredicate, connectionInfo, decoder, encoder, formDecoderProvider, + super(c, listener, nettyRequest, compressionPredicate, compressionLevel, connectionInfo, decoder, encoder, formDecoderProvider, httpMessageLogFactory, isHttp2, mapHandle, readTimeout, requestTimeout, secured, timestamp, true); } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/Http3StreamBridgeServerHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/Http3StreamBridgeServerHandler.java index ff0cb00a85..2e386f4dfe 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/Http3StreamBridgeServerHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/Http3StreamBridgeServerHandler.java @@ -54,6 +54,7 @@ final class Http3StreamBridgeServerHandler extends ChannelDuplexHandler { final BiPredicate compress; + final int compressionLevel; final ServerCookieDecoder cookieDecoder; final ServerCookieEncoder cookieEncoder; final HttpServerFormDecoderProvider formDecoderProvider; @@ -74,6 +75,7 @@ final class Http3StreamBridgeServerHandler extends ChannelDuplexHandler { Http3StreamBridgeServerHandler( @Nullable BiPredicate compress, + int compressionLevel, ServerCookieDecoder decoder, ServerCookieEncoder encoder, HttpServerFormDecoderProvider formDecoderProvider, @@ -84,6 +86,7 @@ final class Http3StreamBridgeServerHandler extends ChannelDuplexHandler { @Nullable Duration readTimeout, @Nullable Duration requestTimeout) { this.compress = compress; + this.compressionLevel = compressionLevel; this.cookieDecoder = decoder; this.cookieEncoder = encoder; this.formDecoderProvider = formDecoderProvider; @@ -131,6 +134,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { listener, request, compress, + compressionLevel, connectionInfo, cookieDecoder, cookieEncoder, diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServer.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServer.java index b8128dde7e..6e12527a4b 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServer.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServer.java @@ -35,6 +35,7 @@ import io.netty.handler.ssl.OpenSsl; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.util.SelfSignedCertificate; +import io.netty.util.internal.ObjectUtil; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import reactor.netty.Connection; @@ -301,15 +302,35 @@ public final HttpServer compress(BiPredicate uriTagValue() { int maxKeepAliveRequests; Function methodTagValue; int minCompressionSize; + int compressionLevel; HttpProtocol[] protocols; int _protocols; ProxyProtocolSupportType proxyProtocolSupportType; @@ -344,6 +354,7 @@ public Function uriTagValue() { this.httpMessageLogFactory = ReactorNettyHttpMessageLogFactory.INSTANCE; this.maxKeepAliveRequests = -1; this.minCompressionSize = -1; + this.compressionLevel = 6; this.protocols = new HttpProtocol[]{HttpProtocol.HTTP11}; this._protocols = h11; this.proxyProtocolSupportType = ProxyProtocolSupportType.OFF; @@ -368,6 +379,7 @@ public Function uriTagValue() { this.maxKeepAliveRequests = parent.maxKeepAliveRequests; this.methodTagValue = parent.methodTagValue; this.minCompressionSize = parent.minCompressionSize; + this.compressionLevel = parent.compressionLevel; this.protocols = parent.protocols; this._protocols = parent._protocols; this.proxyProtocolSupportType = parent.proxyProtocolSupportType; @@ -492,6 +504,7 @@ static void addStreamHandlers(Channel ch, @Nullable Function methodTagValue, @Nullable ChannelMetricsRecorder metricsRecorder, int minCompressionSize, + int compressionLevel, ChannelOperations.OnSetup opsFactory, @Nullable Duration readTimeout, @Nullable Duration requestTimeout, @@ -502,14 +515,14 @@ static void addStreamHandlers(Channel ch, } pipeline.addLast(NettyPipeline.H2ToHttp11Codec, HTTP2_STREAM_FRAME_TO_HTTP_OBJECT) .addLast(NettyPipeline.HttpTrafficHandler, - new Http2StreamBridgeServerHandler(compressPredicate, decoder, encoder, formDecoderProvider, + new Http2StreamBridgeServerHandler(compressPredicate, compressionLevel, decoder, encoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle, readTimeout, requestTimeout)); boolean alwaysCompress = compressPredicate == null && minCompressionSize == 0; if (alwaysCompress) { - pipeline.addLast(NettyPipeline.CompressionHandler, new SimpleCompressionHandler()); + pipeline.addLast(NettyPipeline.CompressionHandler, SimpleCompressionHandler.create(compressionLevel)); } ChannelOperations.addReactiveBridge(ch, opsFactory, listener); @@ -605,6 +618,7 @@ static void configureHttp3Pipeline( @Nullable Function methodTagValue, @Nullable ChannelMetricsRecorder metricsRecorder, int minCompressionSize, + int compressionLevel, ChannelOperations.OnSetup opsFactory, @Nullable Duration readTimeout, @Nullable Duration requestTimeout, @@ -614,7 +628,7 @@ static void configureHttp3Pipeline( p.addLast(NettyPipeline.HttpCodec, newHttp3ServerConnectionHandler(accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, - listener, mapHandle, methodTagValue, metricsRecorder, minCompressionSize, opsFactory, readTimeout, + listener, mapHandle, methodTagValue, metricsRecorder, minCompressionSize, compressionLevel, opsFactory, readTimeout, requestTimeout, uriTagValue, validate)); if (metricsRecorder != null) { @@ -640,6 +654,7 @@ static void configureH2Pipeline(ChannelPipeline p, @Nullable Function methodTagValue, @Nullable ChannelMetricsRecorder metricsRecorder, int minCompressionSize, + int compressionLevel, ChannelOperations.OnSetup opsFactory, @Nullable Duration readTimeout, @Nullable Duration requestTimeout, @@ -676,7 +691,7 @@ static void configureH2Pipeline(ChannelPipeline p, .addLast(NettyPipeline.H2MultiplexHandler, new Http2MultiplexHandler(new H2Codec(accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, - mapHandle, methodTagValue, metricsRecorder, minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue))); + mapHandle, methodTagValue, metricsRecorder, minCompressionSize, compressionLevel, opsFactory, readTimeout, requestTimeout, uriTagValue))); IdleTimeoutHandler.addIdleTimeoutHandler(p, idleTimeout); @@ -710,6 +725,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, @Nullable Function methodTagValue, @Nullable ChannelMetricsRecorder metricsRecorder, int minCompressionSize, + int compressionLevel, ChannelOperations.OnSetup opsFactory, @Nullable Duration readTimeout, @Nullable Duration requestTimeout, @@ -728,7 +744,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, Http11OrH2CleartextCodec upgrader = new Http11OrH2CleartextCodec(accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2SettingsSpec, httpMessageLogFactory, listener, mapHandle, methodTagValue, metricsRecorder, - minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue, decoder.validateHeaders()); + minCompressionSize, compressionLevel, opsFactory, readTimeout, requestTimeout, uriTagValue, decoder.validateHeaders()); ChannelHandler http2ServerHandler = new H2CleartextCodec(upgrader, http2SettingsSpec != null ? http2SettingsSpec.maxStreams() : null); @@ -743,7 +759,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, NettyPipeline.H2CUpgradeHandler, h2cUpgradeHandler) .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler, - new HttpTrafficHandler(compressPredicate, cookieDecoder, cookieEncoder, formDecoderProvider, + new HttpTrafficHandler(compressPredicate, compressionLevel, cookieDecoder, cookieEncoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener, mapHandle, maxKeepAliveRequests, readTimeout, requestTimeout, decoder.validateHeaders())); @@ -754,7 +770,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, boolean alwaysCompress = compressPredicate == null && minCompressionSize == 0; if (alwaysCompress) { - p.addBefore(NettyPipeline.HttpTrafficHandler, NettyPipeline.CompressionHandler, new SimpleCompressionHandler()); + p.addBefore(NettyPipeline.HttpTrafficHandler, NettyPipeline.CompressionHandler, SimpleCompressionHandler.create(compressionLevel)); } if (metricsRecorder != null) { @@ -798,6 +814,7 @@ static void configureHttp11Pipeline(ChannelPipeline p, @Nullable Function methodTagValue, @Nullable ChannelMetricsRecorder metricsRecorder, int minCompressionSize, + int compressionLevel, @Nullable Duration readTimeout, @Nullable Duration requestTimeout, @Nullable Function uriTagValue) { @@ -814,7 +831,7 @@ static void configureHttp11Pipeline(ChannelPipeline p, new HttpServerCodec(decoderConfig)) .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler, - new HttpTrafficHandler(compressPredicate, cookieDecoder, cookieEncoder, formDecoderProvider, + new HttpTrafficHandler(compressPredicate, compressionLevel, cookieDecoder, cookieEncoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener, mapHandle, maxKeepAliveRequests, readTimeout, requestTimeout, decoder.validateHeaders())); @@ -825,7 +842,7 @@ static void configureHttp11Pipeline(ChannelPipeline p, boolean alwaysCompress = compressPredicate == null && minCompressionSize == 0; if (alwaysCompress) { - p.addBefore(NettyPipeline.HttpTrafficHandler, NettyPipeline.CompressionHandler, new SimpleCompressionHandler()); + p.addBefore(NettyPipeline.HttpTrafficHandler, NettyPipeline.CompressionHandler, SimpleCompressionHandler.create(compressionLevel)); } if (metricsRecorder != null) { @@ -1008,6 +1025,7 @@ static final class H2Codec extends ChannelInitializer { final Function methodTagValue; final ChannelMetricsRecorder metricsRecorder; final int minCompressionSize; + final int compressionLevel; final ChannelOperations.OnSetup opsFactory; final Duration readTimeout; final Duration requestTimeout; @@ -1027,6 +1045,7 @@ static final class H2Codec extends ChannelInitializer { @Nullable Function methodTagValue, @Nullable ChannelMetricsRecorder metricsRecorder, int minCompressionSize, + int compressionLevel, ChannelOperations.OnSetup opsFactory, @Nullable Duration readTimeout, @Nullable Duration requestTimeout, @@ -1044,6 +1063,7 @@ static final class H2Codec extends ChannelInitializer { this.methodTagValue = methodTagValue; this.metricsRecorder = metricsRecorder; this.minCompressionSize = minCompressionSize; + this.compressionLevel = compressionLevel; this.opsFactory = opsFactory; this.readTimeout = readTimeout; this.requestTimeout = requestTimeout; @@ -1055,7 +1075,7 @@ protected void initChannel(Channel ch) { ch.pipeline().remove(this); addStreamHandlers(ch, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle, methodTagValue, metricsRecorder, - minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue); + minCompressionSize, compressionLevel, opsFactory, readTimeout, requestTimeout, uriTagValue); } } @@ -1078,6 +1098,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer final Function methodTagValue; final ChannelMetricsRecorder metricsRecorder; final int minCompressionSize; + final int compressionLevel; final ChannelOperations.OnSetup opsFactory; final Duration readTimeout; final Duration requestTimeout; @@ -1100,6 +1121,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer @Nullable Function methodTagValue, @Nullable ChannelMetricsRecorder metricsRecorder, int minCompressionSize, + int compressionLevel, ChannelOperations.OnSetup opsFactory, @Nullable Duration readTimeout, @Nullable Duration requestTimeout, @@ -1140,6 +1162,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer this.methodTagValue = methodTagValue; this.metricsRecorder = metricsRecorder; this.minCompressionSize = minCompressionSize; + this.compressionLevel = compressionLevel; this.opsFactory = opsFactory; this.readTimeout = readTimeout; this.requestTimeout = requestTimeout; @@ -1154,7 +1177,7 @@ protected void initChannel(Channel ch) { ch.pipeline().remove(this); addStreamHandlers(ch, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle, methodTagValue, - metricsRecorder, minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue); + metricsRecorder, minCompressionSize, compressionLevel, opsFactory, readTimeout, requestTimeout, uriTagValue); } @Override @@ -1213,6 +1236,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler final Function methodTagValue; final ChannelMetricsRecorder metricsRecorder; final int minCompressionSize; + final int compressionLevel; final ChannelOperations.OnSetup opsFactory; final Duration readTimeout; final Duration requestTimeout; @@ -1243,6 +1267,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler this.methodTagValue = initializer.methodTagValue; this.metricsRecorder = initializer.metricsRecorder; this.minCompressionSize = initializer.minCompressionSize; + this.compressionLevel = initializer.compressionLevel; this.opsFactory = initializer.opsFactory; this.readTimeout = initializer.readTimeout; this.requestTimeout = initializer.requestTimeout; @@ -1261,7 +1286,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { configureH2Pipeline(p, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2SettingsSpec, httpMessageLogFactory, idleTimeout, - listener, mapHandle, methodTagValue, metricsRecorder, minCompressionSize, opsFactory, readTimeout, requestTimeout, + listener, mapHandle, methodTagValue, metricsRecorder, minCompressionSize, compressionLevel, opsFactory, readTimeout, requestTimeout, uriTagValue, decoder.validateHeaders()); return; } @@ -1269,7 +1294,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { if (!supportOnlyHttp2 && ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { configureHttp11Pipeline(p, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, true, decoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener, - mapHandle, maxKeepAliveRequests, methodTagValue, metricsRecorder, minCompressionSize, readTimeout, requestTimeout, uriTagValue); + mapHandle, maxKeepAliveRequests, methodTagValue, metricsRecorder, minCompressionSize, compressionLevel, readTimeout, requestTimeout, uriTagValue); // When the server is configured with HTTP/1.1 and H2 and HTTP/1.1 is negotiated, // when channelActive event happens, this HttpTrafficHandler is still not in the pipeline, @@ -1302,6 +1327,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig final Function methodTagValue; final ChannelMetricsRecorder metricsRecorder; final int minCompressionSize; + final int compressionLevel; final ChannelOperations.OnSetup opsFactory; final int protocols; final ProxyProtocolSupportType proxyProtocolSupportType; @@ -1329,6 +1355,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig this.methodTagValue = config.methodTagValue; this.metricsRecorder = config.metricsRecorderInternal(); this.minCompressionSize = config.minCompressionSize; + this.compressionLevel = config.compressionLevel; this.opsFactory = config.channelOperationsProvider(); this.protocols = config._protocols; this.proxyProtocolSupportType = config.proxyProtocolSupportType; @@ -1383,6 +1410,7 @@ else if ((protocols & h11) == h11) { methodTagValue, metricsRecorder, minCompressionSize, + compressionLevel, readTimeout, requestTimeout, uriTagValue); @@ -1414,6 +1442,7 @@ else if ((protocols & h2) == h2) { methodTagValue, metricsRecorder, minCompressionSize, + compressionLevel, opsFactory, readTimeout, requestTimeout, @@ -1437,6 +1466,7 @@ else if ((protocols & h3) == h3) { methodTagValue, metricsRecorder, minCompressionSize, + compressionLevel, opsFactory, readTimeout, requestTimeout, @@ -1466,6 +1496,7 @@ else if ((protocols & h3) == h3) { methodTagValue, metricsRecorder, minCompressionSize, + compressionLevel, opsFactory, readTimeout, requestTimeout, @@ -1491,6 +1522,7 @@ else if ((protocols & h11) == h11) { methodTagValue, metricsRecorder, minCompressionSize, + compressionLevel, readTimeout, requestTimeout, uriTagValue); @@ -1514,6 +1546,7 @@ else if ((protocols & h2c) == h2c) { methodTagValue, metricsRecorder, minCompressionSize, + compressionLevel, opsFactory, readTimeout, requestTimeout, diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java index a420862efa..8af626c6c6 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java @@ -138,6 +138,7 @@ class HttpServerOperations extends HttpOperations compressionPredicate; + int compressionLevel; boolean isWebsocket; Function> paramsResolver; String path; @@ -151,6 +152,7 @@ class HttpServerOperations extends HttpOperations compressionPredicate, + int compressionLevel, ConnectionInfo connectionInfo, ServerCookieDecoder decoder, ServerCookieEncoder encoder, @@ -192,6 +195,7 @@ class HttpServerOperations extends HttpOperations compress; + final int compressionLevel; final ServerCookieDecoder cookieDecoder; final ServerCookieEncoder cookieEncoder; final HttpServerFormDecoderProvider formDecoderProvider; @@ -112,6 +113,7 @@ final class HttpTrafficHandler extends ChannelDuplexHandler implements Runnable HttpTrafficHandler( @Nullable BiPredicate compress, + int compressionLevel, ServerCookieDecoder decoder, ServerCookieEncoder encoder, HttpServerFormDecoderProvider formDecoderProvider, @@ -128,6 +130,7 @@ final class HttpTrafficHandler extends ChannelDuplexHandler implements Runnable this.formDecoderProvider = formDecoderProvider; this.forwardedHeaderHandler = forwardedHeaderHandler; this.compress = compress; + this.compressionLevel = compressionLevel; this.cookieEncoder = encoder; this.cookieDecoder = decoder; this.httpMessageLogFactory = httpMessageLogFactory; @@ -243,6 +246,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { listener, request, compress, + compressionLevel, connectionInfo, cookieDecoder, cookieEncoder, @@ -595,6 +599,7 @@ public void run() { listener, nextRequest, compress, + compressionLevel, connectionInfo, cookieDecoder, cookieEncoder, diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/SimpleCompressionHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/SimpleCompressionHandler.java index ea52b3e054..3e552a1b27 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/SimpleCompressionHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/SimpleCompressionHandler.java @@ -19,7 +19,10 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.DecoderException; +import io.netty.handler.codec.compression.Brotli; import io.netty.handler.codec.compression.CompressionOptions; +import io.netty.handler.codec.compression.StandardCompressionOptions; +import io.netty.handler.codec.compression.Zstd; import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; @@ -28,6 +31,7 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.LastHttpContent; import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.ObjectUtil; import java.util.ArrayList; import java.util.List; @@ -42,8 +46,28 @@ final class SimpleCompressionHandler extends HttpContentCompressor { boolean decoded; HttpRequest request; - SimpleCompressionHandler() { - super((CompressionOptions[]) null); + private SimpleCompressionHandler(CompressionOptions ... options) { + super(options); + } + + static SimpleCompressionHandler create(int compressionLevel) { + ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel"); + + List options = new ArrayList<>(); + options.add(StandardCompressionOptions.gzip(compressionLevel, 15, 8)); + options.add(StandardCompressionOptions.deflate(compressionLevel, 15, 8)); + options.add(StandardCompressionOptions.snappy()); + + if(Zstd.isAvailable()) { + options.add(StandardCompressionOptions.zstd(compressionLevel, 15, 8)); + } + if(Brotli.isAvailable()) { + options.add(StandardCompressionOptions.brotli()); + } + + return new SimpleCompressionHandler( + options.toArray(new CompressionOptions[0]) + ); } @Override diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpCompressionClientServerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpCompressionClientServerTests.java index 89bbe61c96..4aa226fa06 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpCompressionClientServerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpCompressionClientServerTests.java @@ -35,7 +35,10 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.compression.Brotli; +import io.netty.handler.codec.compression.ZlibCodecFactory; +import io.netty.handler.codec.compression.ZlibWrapper; import io.netty.handler.codec.compression.Zstd; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaders; @@ -718,4 +721,44 @@ void serverCompressionEnabledResponseCompressionDisabled(HttpServer server, Http .expectComplete() .verify(Duration.ofSeconds(10)); } + + @ParameterizedCompressionTest + void serverCompressionEnabledByCompressionLevel(HttpServer server, HttpClient client) { + disposableServer = + server.compress(true, 4) + .handle((in, out) -> out.sendString(Mono.just("reply"))) + .bindNow(Duration.ofSeconds(10)); + + //don't activate compression on the client options to avoid auto-handling (which removes the header) + Tuple2 resp = + //edit the header manually to attempt to trigger compression on server side + client.port(disposableServer.port()) + .headers(h -> h.add("accept-encoding", "gzip")) + .get() + .uri("/test") + .responseSingle((res, buf) -> buf.asByteArray() + .zipWith(Mono.just(res.responseHeaders()))) + .block(Duration.ofSeconds(10)); + + EmbeddedChannel embeddedChannel = new EmbeddedChannel( + ZlibCodecFactory.newZlibEncoder( + ZlibWrapper.GZIP, + 4, + 15, + 8 + ) + ); + + ByteBuf byteBuf = Unpooled.directBuffer(32); + byteBuf.writeBytes("reply".getBytes()); + + embeddedChannel.writeOutbound(byteBuf); + ByteBuf encodedByteBuf = embeddedChannel.readOutbound(); + + byte[] result = new byte[encodedByteBuf.readableBytes()]; + encodedByteBuf.getBytes(encodedByteBuf.readerIndex(), result); + + assertThat(resp).isNotNull(); + assertThat(resp.getT1()).startsWith(result); // Ignore the original data size and crc checksum comparison + } } diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java index 7edbaaed3a..87e5e75c20 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java @@ -2214,6 +2214,7 @@ private void doTestStatus(HttpResponseStatus status) { ConnectionObserver.emptyListener(), new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"), null, + 6, new ConnectionInfo(localSocketAddress, DEFAULT_HOST_NAME, DEFAULT_HTTP_PORT, remoteSocketAddress, "http", true), ServerCookieDecoder.STRICT, ServerCookieEncoder.STRICT, @@ -3272,6 +3273,7 @@ private void doTestIsFormUrlencoded(String headerValue, boolean expectation) { ConnectionObserver.emptyListener(), request, null, + -1, new ConnectionInfo(localSocketAddress, DEFAULT_HOST_NAME, DEFAULT_HTTP_PORT, remoteSocketAddress, "http", true), ServerCookieDecoder.STRICT, ServerCookieEncoder.STRICT,