From 1a1af941190be51f662b4aec794c8f770ed832a3 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Thu, 2 Feb 2023 07:45:35 -0500 Subject: [PATCH] fix #4788 moving retry logic into the standard layer --- CHANGELOG.md | 1 + .../client/http/StandardHttpClient.java | 147 ++++++++++++----- .../ExponentialBackoffIntervalCalculator.java | 19 ++- .../client/http/StandardHttpClientTest.java | 148 ++++++++++++++++-- .../dsl/internal/AbstractWatchManager.java | 6 +- .../client/dsl/internal/OperationSupport.java | 51 +----- .../dsl/internal/WatchConnectionManager.java | 12 +- .../client/dsl/internal/WatchHTTPManager.java | 13 +- .../informers/impl/cache/Reflector.java | 5 +- .../internal/AbstractWatchManagerTest.java | 12 +- .../dsl/internal/BaseOperationTest.java | 69 -------- 11 files changed, 274 insertions(+), 209 deletions(-) rename {kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal => kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/utils}/ExponentialBackoffIntervalCalculator.java (75%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ffa113e93c..38eccbca821 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ * Fix #4739: honor optimistic concurrency control semantics in the mock server for `PUT` and `PATCH` requests. * Fix #4644: generate CRDs in parallel and optimize code * Fix #4795: don't print warning message when service account token property is unset +* Fix #4788: moved retry logic into the standard client so that it applies to all requests, including websockets #### Dependency Upgrade * Fix #4655: Upgrade Fabric8 Kubernetes Model to Kubernetes v1.26.0 diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java index b42d2ee35a3..64a635ac595 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java @@ -16,19 +16,31 @@ package io.fabric8.kubernetes.client.http; +import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.http.AsyncBody.Consumer; import io.fabric8.kubernetes.client.http.WebSocket.Listener; +import io.fabric8.kubernetes.client.utils.ExponentialBackoffIntervalCalculator; import io.fabric8.kubernetes.client.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.io.IOException; +import java.net.URI; import java.nio.ByteBuffer; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.Supplier; public abstract class StandardHttpClient> implements HttpClient { + private static final Logger LOG = LoggerFactory.getLogger(StandardHttpClient.class); + protected StandardHttpClientBuilder builder; protected StandardHttpClient(StandardHttpClientBuilder builder) { @@ -50,15 +62,27 @@ public DerivedClientBuilder newBuilder() { @Override public CompletableFuture> sendAsync(HttpRequest request, Class type) { CompletableFuture> upstream = HttpResponse.SupportedResponses.from(type).sendAsync(request, this); - return withUpstreamCancellation(upstream, b -> { - if (b instanceof Closeable) { - Utils.closeQuietly((Closeable) b); + final CompletableFuture> result = new CompletableFuture<>(); + upstream.whenComplete(completeOrCancel(r -> { + if (r.body() instanceof Closeable) { + Utils.closeQuietly((Closeable) r.body()); } - }); + }, result)); + return result; } @Override public CompletableFuture> consumeBytes(HttpRequest request, Consumer> consumer) { + CompletableFuture> result = new CompletableFuture<>(); + + retryWithExponentialBackoff(result, () -> consumeBytesOnce(request, consumer), request.uri(), r -> r.code(), + r -> r.body().cancel(), + null); + return result; + } + + private CompletableFuture> consumeBytesOnce(HttpRequest request, + Consumer> consumer) { StandardHttpRequest standardHttpRequest = (StandardHttpRequest) request; StandardHttpRequest.Builder copy = standardHttpRequest.newBuilder(); for (Interceptor interceptor : builder.getInterceptors().values()) { @@ -85,24 +109,69 @@ public CompletableFuture> consumeBytes(HttpRequest reque return CompletableFuture.completedFuture(response); }); } - - return withUpstreamCancellation(cf, AsyncBody::cancel); + return cf; } - static CompletableFuture> withUpstreamCancellation(CompletableFuture> cf, - java.util.function.Consumer cancel) { - final CompletableFuture> result = new CompletableFuture<>(); - cf.whenComplete((r, t) -> { + private static BiConsumer completeOrCancel(java.util.function.Consumer cancel, + final CompletableFuture result) { + return (r, t) -> { if (t != null) { result.completeExceptionally(t); } else { - // if already completed, take responsibility to proactively close if (!result.complete(r)) { - cancel.accept(r.body()); + cancel.accept(r); } } - }); - return result; + }; + } + + /** + * Will retry the action if needed based upon the retry settings. A calculator will be created on the first + * call and passed to subsequent retries to keep track of the attempts. + */ + protected void retryWithExponentialBackoff(CompletableFuture result, + Supplier> action, URI uri, Function codeExtractor, + java.util.function.Consumer cancel, ExponentialBackoffIntervalCalculator retryIntervalCalculator) { + + if (retryIntervalCalculator == null) { + Config requestConfig = this.builder.getRequestConfig(); + int requestRetryBackoffInterval = Config.DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL; + int requestRetryBackoffLimit = Config.DEFAULT_REQUEST_RETRY_BACKOFFLIMIT; + if (requestConfig != null) { + requestRetryBackoffInterval = requestConfig.getRequestRetryBackoffInterval(); + requestRetryBackoffLimit = requestConfig.getRequestRetryBackoffLimit(); + } + retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(requestRetryBackoffInterval, requestRetryBackoffLimit); + } + + final ExponentialBackoffIntervalCalculator backoff = retryIntervalCalculator; + action.get() + .whenComplete((response, throwable) -> { + if (backoff.shouldRetry() && !result.isDone()) { + long retryInterval = backoff.nextReconnectInterval(); + boolean retry = false; + if (response != null) { + Integer code = codeExtractor.apply(response); + if (code != null && code >= 500) { + LOG.debug("HTTP operation on url: {} should be retried as the response code was {}, retrying after {} millis", + uri, code, retryInterval); + retry = true; + } + } else if (throwable instanceof IOException) { + LOG.debug(String.format("HTTP operation on url: %s should be retried after %d millis because of IOException", + uri, retryInterval), throwable); + retry = true; + } + if (retry) { + Utils.schedule(Runnable::run, + () -> retryWithExponentialBackoff(result, action, uri, codeExtractor, cancel, backoff), + retryInterval, + TimeUnit.MILLISECONDS); + return; + } + } + completeOrCancel(cancel, result).accept(response, throwable); + }); } @Override @@ -119,6 +188,29 @@ public HttpRequest.Builder newHttpRequestBuilder() { final CompletableFuture buildWebSocket(StandardWebSocketBuilder standardWebSocketBuilder, Listener listener) { + CompletableFuture intermediate = new CompletableFuture<>(); + + retryWithExponentialBackoff(intermediate, () -> buildWebSocketOnce(standardWebSocketBuilder, listener), + standardWebSocketBuilder.asHttpRequest().uri(), + r -> Optional.ofNullable(r.wshse).map(WebSocketHandshakeException::getResponse).map(HttpResponse::code).orElse(null), + r -> Optional.ofNullable(r.webSocket).ifPresent(w -> w.sendClose(1000, null)), + null); + + CompletableFuture result = new CompletableFuture<>(); + + // map to a websocket + intermediate.whenComplete((r, t) -> { + if (t != null) { + result.completeExceptionally(t); + } else { + completeOrCancel(w -> w.sendClose(1000, null), result).accept(r.webSocket, r.wshse); + } + }); + return result; + } + + private CompletableFuture buildWebSocketOnce(StandardWebSocketBuilder standardWebSocketBuilder, + Listener listener) { final StandardWebSocketBuilder copy = standardWebSocketBuilder.newBuilder(); builder.getInterceptors().values().stream().map(Interceptor.useConfig(builder.requestConfig)) .forEach(i -> i.before(copy, copy.asHttpRequest())); @@ -138,32 +230,7 @@ final CompletableFuture buildWebSocket(StandardWebSocketBuilder stand return CompletableFuture.completedFuture(response); }); } - - final CompletableFuture result = new CompletableFuture<>(); - // map back to the expected convention with the future completed by the response exception - cf.whenComplete(onWebSocketComplete(result)); - return result; - - } - - private static BiConsumer onWebSocketComplete(CompletableFuture result) { - return (r, t) -> { - if (t != null) { - result.completeExceptionally(t); - } else if (r != null) { - if (r.wshse != null) { - result.completeExceptionally(r.wshse); - } else { - // if already completed, take responsibility to proactively close - if (!result.complete(r.webSocket)) { - r.webSocket.sendClose(1000, null); - } - } - } else { - // shouldn't happen - result.complete(null); - } - }; + return cf; } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/utils/ExponentialBackoffIntervalCalculator.java similarity index 75% rename from kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java rename to kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/utils/ExponentialBackoffIntervalCalculator.java index 4dc34a5a272..bbf1136b7e3 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/utils/ExponentialBackoffIntervalCalculator.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.fabric8.kubernetes.client.utils.internal; +package io.fabric8.kubernetes.client.utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,13 +24,20 @@ public class ExponentialBackoffIntervalCalculator { private static final Logger logger = LoggerFactory.getLogger(ExponentialBackoffIntervalCalculator.class); + private static final int MAX_RETRY_INTERVAL_EXPONENT = 5; + + public static final int UNLIMITED_RETRIES = -1; + private final int initialInterval; - private final int maxRetryIntervalExponent; + // we were using the same default in multiple places, so it has been moved here for now + // other calculators express this as max wait + private final int maxRetryIntervalExponent = MAX_RETRY_INTERVAL_EXPONENT; + private final int maxRetries; final AtomicInteger currentReconnectAttempt = new AtomicInteger(); - public ExponentialBackoffIntervalCalculator(int initialInterval, int maxRetryIntervalExponent) { + public ExponentialBackoffIntervalCalculator(int initialInterval, int maxRetries) { this.initialInterval = initialInterval; - this.maxRetryIntervalExponent = maxRetryIntervalExponent; + this.maxRetries = maxRetries; } public long getInterval(int retryIndex) { @@ -56,4 +63,8 @@ public int getCurrentReconnectAttempt() { return currentReconnectAttempt.get(); } + public boolean shouldRetry() { + return maxRetries < 0 || currentReconnectAttempt.get() < maxRetries; + } + } diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/StandardHttpClientTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/StandardHttpClientTest.java index 7787541540c..f29f088ee29 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/StandardHttpClientTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/StandardHttpClientTest.java @@ -15,24 +15,35 @@ */ package io.fabric8.kubernetes.client.http; +import io.fabric8.kubernetes.client.ConfigBuilder; import io.fabric8.kubernetes.client.http.AsyncBody.Consumer; import io.fabric8.kubernetes.client.http.HttpClient.Factory; import io.fabric8.kubernetes.client.http.WebSocket.Listener; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; class StandardHttpClientTest { private static final class TestableStandardHttpClient extends StandardHttpClient> { - CompletableFuture wsFuture; - CompletableFuture> respFuture; + List> wsFutures = new ArrayList<>(); + int wsIndex; + List>> respFutures = new ArrayList<>(); + int respIndex; private TestableStandardHttpClient() { super(Mockito.mock(StandardHttpClientBuilder.class)); @@ -44,21 +55,31 @@ public void close() { } @Override - public CompletableFuture buildWebSocketDirect(StandardWebSocketBuilder standardWebSocketBuilder, + public synchronized CompletableFuture buildWebSocketDirect( + StandardWebSocketBuilder standardWebSocketBuilder, Listener listener) { - this.wsFuture = new CompletableFuture<>(); - return wsFuture; + if (wsFutures.size() <= wsIndex) { + wsFutures.add(new CompletableFuture<>()); + } + return wsFutures.get(wsIndex++); } @Override - public CompletableFuture> consumeBytesDirect(StandardHttpRequest request, + public synchronized CompletableFuture> consumeBytesDirect(StandardHttpRequest request, Consumer> consumer) { - this.respFuture = new CompletableFuture<>(); - return respFuture; + if (respFutures.size() <= respIndex) { + respFutures.add(new CompletableFuture<>()); + } + return respFutures.get(respIndex++); } } - private TestableStandardHttpClient client = new TestableStandardHttpClient(); + private TestableStandardHttpClient client; + + @BeforeEach + void setup() { + client = new TestableStandardHttpClient(); + } @Test void webSocketFutureCancel() { @@ -70,7 +91,7 @@ void webSocketFutureCancel() { // cancel the future before the websocket response future.cancel(true); - client.wsFuture.complete(new WebSocketResponse(ws, null)); + client.wsFutures.get(0).complete(new WebSocketResponse(ws, null)); // ensure that the ws has been closed Mockito.verify(ws).sendClose(1000, null); @@ -91,7 +112,7 @@ public void consume(List value, AsyncBody asyncBody) throws Exceptio // cancel the future before the response consumeFuture.cancel(true); - client.respFuture.complete(asyncResp); + client.respFutures.get(0).complete(asyncResp); Mockito.verify(asyncResp.body()).cancel(); } @@ -106,8 +127,111 @@ void sendAsyncFutureCancel() { // cancel the future before the response sendAsyncFuture.cancel(true); - client.respFuture.complete(asyncResp); + client.respFutures.get(0).complete(asyncResp); Mockito.verify(asyncResp.body()).cancel(); } + @Test + void testNoHttpRetryWithDefaultConfig() throws InterruptedException { + CompletableFuture sendAsyncFuture = client.sendAsync(client.newHttpRequestBuilder().uri("http://localhost").build(), + InputStream.class); + + client.respFutures.get(0).completeExceptionally(new IOException()); + + try { + sendAsyncFuture.get(); + fail(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof IOException); + } + } + + @Test + void testHttpRetryWithMoreFailuresThanRetries() throws Exception { + Mockito.when(client.builder.getRequestConfig()) + .thenReturn(new ConfigBuilder().withMasterUrl("https://172.17.0.2:8443").withNamespace("default") + .withRequestRetryBackoffLimit(3).withRequestRetryBackoffInterval(50).build()); + + CompletableFuture> consumeFuture = client.consumeBytes( + client.newHttpRequestBuilder().uri("http://localhost").build(), + new Consumer>() { + @Override + public void consume(List value, AsyncBody asyncBody) throws Exception { + + } + }); + + HttpResponse error = Mockito.mock(HttpResponse.class); + Mockito.when(error.code()).thenReturn(500); + long start = System.currentTimeMillis(); + client.respFutures.get(0).completeExceptionally(new IOException()); + client.respFutures.add(client.respFutures.get(0)); + client.respFutures.add(client.respFutures.get(0)); + client.respFutures.add(CompletableFuture.completedFuture(error)); + + // should ultimately error with the final 500 + assertEquals(500, consumeFuture.get().code()); + long stop = System.currentTimeMillis(); + + // should take longer than the delay + assertTrue(stop - start >= 350); //50+100+200 + + // only 4 requests issued + assertEquals(4, client.respFutures.size()); + } + + @Test + void testHttpRetryWithLessFailuresThanRetries() throws Exception { + Mockito.when(client.builder.getRequestConfig()) + .thenReturn(new ConfigBuilder().withMasterUrl("https://172.17.0.2:8443").withNamespace("default") + .withRequestRetryBackoffLimit(3).withRequestRetryBackoffInterval(50).build()); + + HttpResponse error = Mockito.mock(HttpResponse.class); + Mockito.when(error.code()).thenReturn(500); + client.respFutures.add(CompletableFuture.completedFuture(error)); + client.respFutures.add(CompletableFuture.completedFuture(error)); + client.respFutures.add(CompletableFuture.completedFuture(error)); + HttpResponse success = Mockito.mock(HttpResponse.class); + Mockito.when(error.code()).thenReturn(200); + client.respFutures.add(CompletableFuture.completedFuture(success)); + + CompletableFuture> consumeFuture = client.consumeBytes( + client.newHttpRequestBuilder().uri("http://localhost").build(), + new Consumer>() { + @Override + public void consume(List value, AsyncBody asyncBody) throws Exception { + + } + }); + + // should ultimately succeed with the final 500 + assertEquals(200, consumeFuture.get().code()); + + // only 4 requests issued + assertEquals(4, client.respFutures.size()); + } + + @Test + void testWebSocketWithLessFailuresThanRetries() throws Exception { + Mockito.when(client.builder.getRequestConfig()) + .thenReturn(new ConfigBuilder().withMasterUrl("https://172.17.0.2:8443").withNamespace("default") + .withRequestRetryBackoffLimit(3).withRequestRetryBackoffInterval(50).build()); + + WebSocket ws = Mockito.mock(WebSocket.class); + + CompletableFuture future = client.newWebSocketBuilder().uri(URI.create("ws://localhost")) + .buildAsync(new Listener() { + }); + + HttpResponse error = Mockito.mock(HttpResponse.class); + Mockito.when(error.code()).thenReturn(500); + client.wsFutures.get(0).completeExceptionally(new WebSocketHandshakeException(error)); + client.wsFutures.add(client.wsFutures.get(0)); + client.wsFutures.add(CompletableFuture.completedFuture((new WebSocketResponse(ws, null)))); + + future.get(); + + assertEquals(3, client.wsFutures.size()); + } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index c923872640f..e806a06f5f3 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -29,9 +29,9 @@ import io.fabric8.kubernetes.client.Watcher.Action; import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.http.HttpClient; +import io.fabric8.kubernetes.client.utils.ExponentialBackoffIntervalCalculator; import io.fabric8.kubernetes.client.utils.Serialization; import io.fabric8.kubernetes.client.utils.Utils; -import io.fabric8.kubernetes.client.utils.internal.ExponentialBackoffIntervalCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,10 +68,10 @@ public abstract class AbstractWatchManager implements Wat AbstractWatchManager( Watcher watcher, BaseOperation baseOperation, ListOptions listOptions, int reconnectLimit, - int reconnectInterval, int maxIntervalExponent, Supplier clientSupplier) throws MalformedURLException { + int reconnectInterval, Supplier clientSupplier) throws MalformedURLException { this.watcher = watcher; this.reconnectLimit = reconnectLimit; - this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(reconnectInterval, maxIntervalExponent); + this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(reconnectInterval, reconnectLimit); this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion()); this.forceClosed = new AtomicBoolean(); this.receiveBookmarks = Boolean.TRUE.equals(listOptions.getAllowWatchBookmarks()); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java index 6e00568b9d4..36e34174a2f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java @@ -40,7 +40,6 @@ import io.fabric8.kubernetes.client.utils.Serialization; import io.fabric8.kubernetes.client.utils.URLUtils; import io.fabric8.kubernetes.client.utils.Utils; -import io.fabric8.kubernetes.client.utils.internal.ExponentialBackoffIntervalCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +73,6 @@ public class OperationSupport { protected static final ObjectMapper JSON_MAPPER = Serialization.jsonMapper(); private static final Logger LOG = LoggerFactory.getLogger(OperationSupport.class); private static final String CLIENT_STATUS_FLAG = "CLIENT_STATUS_FLAG"; - private static final int MAX_RETRY_INTERVAL_EXPONENT = 5; protected OperationContext context; protected final HttpClient httpClient; @@ -85,8 +83,6 @@ public class OperationSupport { protected String apiGroupName; protected String apiGroupVersion; protected boolean dryRun; - private final int requestRetryBackoffLimit; - private final int requestRetryBackoffInterval; public OperationSupport(Client client) { this(new OperationContext().withClient(client)); @@ -108,14 +104,6 @@ public OperationSupport(OperationContext ctx) { } else { this.apiGroupVersion = "v1"; } - - if (ctx.getConfig() != null) { - requestRetryBackoffInterval = ctx.getConfig().getRequestRetryBackoffInterval(); - this.requestRetryBackoffLimit = ctx.getConfig().getRequestRetryBackoffLimit(); - } else { - requestRetryBackoffInterval = Config.DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL; - this.requestRetryBackoffLimit = Config.DEFAULT_REQUEST_RETRY_BACKOFFLIMIT; - } } public String getAPIGroupName() { @@ -577,12 +565,8 @@ protected CompletableFuture handleResponse(HttpClient client, HttpRequest TypeReference type) { VersionUsageUtils.log(this.resourceT, this.apiGroupVersion); HttpRequest request = requestBuilder.build(); - CompletableFuture> futureResponse = new CompletableFuture<>(); - retryWithExponentialBackoff(futureResponse, - new ExponentialBackoffIntervalCalculator(requestRetryBackoffInterval, MAX_RETRY_INTERVAL_EXPONENT), - Utils.getNonNullOrElse(client, httpClient), request); - return futureResponse.thenApply(response -> { + return client.sendAsync(request, byte[].class).thenApply(response -> { try { assertResponseCode(request, response); if (type != null && type.getType() != null) { @@ -598,39 +582,6 @@ protected CompletableFuture handleResponse(HttpClient client, HttpRequest }); } - protected void retryWithExponentialBackoff(CompletableFuture> result, - ExponentialBackoffIntervalCalculator retryIntervalCalculator, - HttpClient client, HttpRequest request) { - client.sendAsync(request, byte[].class) - .whenComplete((response, throwable) -> { - int retries = retryIntervalCalculator.getCurrentReconnectAttempt(); - if (retries < requestRetryBackoffLimit) { - long retryInterval = retryIntervalCalculator.nextReconnectInterval(); - boolean retry = false; - if (response != null && response.code() >= 500) { - LOG.debug("HTTP operation on url: {} should be retried as the response code was {}, retrying after {} millis", - request.uri(), response.code(), retryInterval); - retry = true; - } else if (throwable instanceof IOException) { - LOG.debug(String.format("HTTP operation on url: %s should be retried after %d millis because of IOException", - request.uri(), retryInterval), throwable); - retry = true; - } - if (retry) { - Utils.schedule(context.getExecutor(), - () -> retryWithExponentialBackoff(result, retryIntervalCalculator, client, request), retryInterval, - TimeUnit.MILLISECONDS); - return; - } - } - if (throwable != null) { - result.completeExceptionally(throwable); - } else { - result.complete(response); - } - }); - } - /** * Checks if the response status code is the expected and throws the appropriate KubernetesClientException if not. * diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index 5f8416b12db..7fc97203402 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -70,20 +70,12 @@ static void closeWebSocket(WebSocket webSocket) { public WatchConnectionManager(final HttpClient client, final BaseOperation baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, - long websocketTimeout, int maxIntervalExponent) throws MalformedURLException { - super(watcher, baseOperation, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, () -> client.newBuilder() + long websocketTimeout) throws MalformedURLException { + super(watcher, baseOperation, listOptions, reconnectLimit, reconnectInterval, () -> client.newBuilder() .readTimeout(websocketTimeout, TimeUnit.MILLISECONDS) .build()); } - public WatchConnectionManager(final HttpClient client, final BaseOperation baseOperation, - final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, - long websocketTimeout) throws MalformedURLException { - // Default max 32x slowdown from base interval - this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, websocketTimeout, - BACKOFF_MAX_EXPONENT); - } - @Override protected void closeRequest() { if (this.listener != null) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 7d41dd739ce..5c240381aed 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -45,23 +45,12 @@ public WatchHTTPManager(final HttpClient client, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit) throws MalformedURLException { - // Default max 32x slowdown from base interval - this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, 5); - } - - public WatchHTTPManager(final HttpClient client, - final BaseOperation baseOperation, - final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, - final int reconnectLimit, int maxIntervalExponent) - throws MalformedURLException { - super( - watcher, baseOperation, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, + watcher, baseOperation, listOptions, reconnectLimit, reconnectInterval, () -> client.newBuilder() .readTimeout(0, TimeUnit.MILLISECONDS) .forStreaming() .build()); - } @Override diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index bc65b634421..7423c26468f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -22,11 +22,10 @@ import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; -import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager; import io.fabric8.kubernetes.client.informers.ExceptionHandler; import io.fabric8.kubernetes.client.informers.impl.ListerWatcher; +import io.fabric8.kubernetes.client.utils.ExponentialBackoffIntervalCalculator; import io.fabric8.kubernetes.client.utils.Utils; -import io.fabric8.kubernetes.client.utils.internal.ExponentialBackoffIntervalCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +58,7 @@ public Reflector(ListerWatcher listerWatcher, SyncableStore store) { this.store = store; this.watcher = new ReflectorWatcher(); this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(listerWatcher.getWatchReconnectInterval(), - WatchConnectionManager.BACKOFF_MAX_EXPONENT); + ExponentialBackoffIntervalCalculator.UNLIMITED_RETRIES); } public CompletableFuture start() { diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java index fc9b0aeb9b9..033e539ef6e 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java @@ -111,7 +111,7 @@ void closeWebSocket() { void nextReconnectInterval() throws MalformedURLException { // Given final WatchManager awm = new WatchManager<>( - null, mock(ListOptions.class), 0, 10, 5); + null, mock(ListOptions.class), 0, 10); // When-Then assertThat(awm.nextReconnectInterval()).isEqualTo(10); assertThat(awm.nextReconnectInterval()).isEqualTo(20); @@ -161,7 +161,7 @@ void reconnectRace() throws Exception { final WatcherAdapter watcher = new WatcherAdapter<>(); CompletableFuture done = new CompletableFuture(); final WatchManager awm = new WatchManager( - watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 1, 0, 0) { + watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 1, 0) { boolean first = true; @@ -217,7 +217,7 @@ void closeWithNonNullRunnerShouldCancelRunner() throws MalformedURLException { private static WatchManager withDefaultWatchManager(Watcher watcher) throws MalformedURLException { return new WatchManager<>( - watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 1, 0, 0); + watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 1, 0); } private static class WatcherAdapter implements Watcher { @@ -242,9 +242,9 @@ private static class WatchManager extends AbstractWatchMa private final AtomicInteger closeCount = new AtomicInteger(0); - public WatchManager(Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, - int maxIntervalExponent) throws MalformedURLException { - super(watcher, Mockito.mock(BaseOperation.class), listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, + public WatchManager(Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval) + throws MalformedURLException { + super(watcher, Mockito.mock(BaseOperation.class), listOptions, reconnectLimit, reconnectInterval, () -> null); } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationTest.java index dd604175bc9..0643cee23cf 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationTest.java @@ -270,75 +270,6 @@ private HttpClient newHttpClientWithSomeFailures(final AtomicInteger httpExecuti return mockClient; } - @Test - void testNoHttpRetryWithDefaultConfig() { - final AtomicInteger httpExecutionCounter = new AtomicInteger(0); - HttpClient mockClient = newHttpClientWithSomeFailures(httpExecutionCounter, 1000); - BaseOperation> baseOp = new BaseOperation(new OperationContext() - .withClient(mockClient(mockClient, - new ConfigBuilder().withMasterUrl("https://172.17.0.2:8443").withNamespace("default").build())) - .withPlural("pods") - .withName("test-pod")); - baseOp.setType(Pod.class); - - // When - Exception exception = assertThrows(KubernetesClientException.class, () -> { - Pod result = baseOp.get(); - }); - - // Then - assertTrue(exception.getCause().getMessage().contains("For example java.net.ConnectException"), - "As the first failure is an IOException the message of the causedBy expected to contain the given text: 'For example java.net.ConnectException'!"); - assertEquals(1, httpExecutionCounter.get()); - } - - @Test - void testHttpRetryWithMoreFailuresThanRetries() { - final AtomicInteger httpExecutionCounter = new AtomicInteger(0); - HttpClient mockClient = newHttpClientWithSomeFailures(httpExecutionCounter, 1000); - long start = System.currentTimeMillis(); - BaseOperation> baseOp = new BaseOperation(new OperationContext() - .withClient(mockClient(mockClient, - new ConfigBuilder().withMasterUrl("https://172.17.0.2:8443").withNamespace("default") - .withRequestRetryBackoffLimit(3).withRequestRetryBackoffInterval(100).build())) - .withPlural("pods") - .withName("test-pod")); - baseOp.setType(Pod.class); - - // When - Exception exception = assertThrows(KubernetesClientException.class, () -> { - Pod result = baseOp.get(); - }); - - long stop = System.currentTimeMillis(); - - // Then - assertTrue(stop - start >= 700); //100+200+400 - assertTrue(exception.getMessage().contains("Internal Server Error"), - "As the last failure, the 3rd one, is not an IOException the message expected to contain: 'Internal Server Error'!"); - assertEquals(4, httpExecutionCounter.get(), "Expected 4 calls: one normal try and 3 backoff retries!"); - } - - @Test - void testHttpRetryWithLessFailuresThanRetries() { - final AtomicInteger httpExecutionCounter = new AtomicInteger(0); - HttpClient mockClient = newHttpClientWithSomeFailures(httpExecutionCounter, 2); - BaseOperation> baseOp = new BaseOperation(new OperationContext() - .withClient(mockClient(mockClient, - new ConfigBuilder().withMasterUrl("https://172.17.0.2:8443").withNamespace("default") - .withRequestRetryBackoffLimit(3).build())) - .withPlural("pods") - .withName("test-pod")); - baseOp.setType(Pod.class); - - // When - Pod result = baseOp.get(); - - // Then - assertNotNull(result); - assertEquals(3, httpExecutionCounter.get(), "Expected 3 calls: 2 failures and 1 success!"); - } - @Test void testMissingNamespace() { BaseOperation> baseOp = new BaseOperation<>(new OperationContext()