Skip to content

Commit

Permalink
fix fabric8io#4788 moving retry logic into the standard layer
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed Feb 10, 2023
1 parent a5f327f commit cf761d2
Show file tree
Hide file tree
Showing 11 changed files with 274 additions and 209 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* Fix #4739: honor optimistic concurrency control semantics in the mock server for `PUT` and `PATCH` requests.
* Fix #4644: generate CRDs in parallel and optimize code
* Fix #4795: don't print warning message when service account token property is unset
* Fix #4788: moved retry logic into the standard client so that it applies to all requests, including websockets

#### Dependency Upgrade
* Fix #4655: Upgrade Fabric8 Kubernetes Model to Kubernetes v1.26.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,31 @@

package io.fabric8.kubernetes.client.http;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.http.AsyncBody.Consumer;
import io.fabric8.kubernetes.client.http.WebSocket.Listener;
import io.fabric8.kubernetes.client.utils.ExponentialBackoffIntervalCalculator;
import io.fabric8.kubernetes.client.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

public abstract class StandardHttpClient<C extends HttpClient, F extends HttpClient.Factory, T extends StandardHttpClientBuilder<C, F, ?>>
implements HttpClient {

private static final Logger LOG = LoggerFactory.getLogger(StandardHttpClient.class);

protected StandardHttpClientBuilder<C, F, T> builder;

protected StandardHttpClient(StandardHttpClientBuilder<C, F, T> builder) {
Expand All @@ -50,15 +62,27 @@ public DerivedClientBuilder newBuilder() {
@Override
public <V> CompletableFuture<HttpResponse<V>> sendAsync(HttpRequest request, Class<V> type) {
CompletableFuture<HttpResponse<V>> upstream = HttpResponse.SupportedResponses.from(type).sendAsync(request, this);
return withUpstreamCancellation(upstream, b -> {
if (b instanceof Closeable) {
Utils.closeQuietly((Closeable) b);
final CompletableFuture<HttpResponse<V>> result = new CompletableFuture<>();
upstream.whenComplete(completeOrCancel(r -> {
if (r.body() instanceof Closeable) {
Utils.closeQuietly((Closeable) r.body());
}
});
}, result));
return result;
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest request, Consumer<List<ByteBuffer>> consumer) {
CompletableFuture<HttpResponse<AsyncBody>> result = new CompletableFuture<>();

retryWithExponentialBackoff(result, () -> consumeBytesOnce(request, consumer), request.uri(), r -> r.code(),
r -> r.body().cancel(),
null);
return result;
}

private CompletableFuture<HttpResponse<AsyncBody>> consumeBytesOnce(HttpRequest request,
Consumer<List<ByteBuffer>> consumer) {
StandardHttpRequest standardHttpRequest = (StandardHttpRequest) request;
StandardHttpRequest.Builder copy = standardHttpRequest.newBuilder();
for (Interceptor interceptor : builder.getInterceptors().values()) {
Expand All @@ -85,24 +109,69 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest reque
return CompletableFuture.completedFuture(response);
});
}

return withUpstreamCancellation(cf, AsyncBody::cancel);
return cf;
}

static <V> CompletableFuture<HttpResponse<V>> withUpstreamCancellation(CompletableFuture<HttpResponse<V>> cf,
java.util.function.Consumer<V> cancel) {
final CompletableFuture<HttpResponse<V>> result = new CompletableFuture<>();
cf.whenComplete((r, t) -> {
private static <V> BiConsumer<? super V, ? super Throwable> completeOrCancel(java.util.function.Consumer<V> cancel,
final CompletableFuture<V> result) {
return (r, t) -> {
if (t != null) {
result.completeExceptionally(t);
} else {
// if already completed, take responsibility to proactively close
if (!result.complete(r)) {
cancel.accept(r.body());
cancel.accept(r);
}
}
});
return result;
};
}

/**
* Will retry the action if needed based upon the retry settings. A calculator will be created on the first
* call and passed to subsequent retries to keep track of the attempts.
*/
protected <V> void retryWithExponentialBackoff(CompletableFuture<V> result,
Supplier<CompletableFuture<V>> action, URI uri, Function<V, Integer> codeExtractor,
java.util.function.Consumer<V> cancel, ExponentialBackoffIntervalCalculator retryIntervalCalculator) {

if (retryIntervalCalculator == null) {
Config requestConfig = this.builder.getRequestConfig();
int requestRetryBackoffInterval = Config.DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL;
int requestRetryBackoffLimit = Config.DEFAULT_REQUEST_RETRY_BACKOFFLIMIT;
if (requestConfig != null) {
requestRetryBackoffInterval = requestConfig.getRequestRetryBackoffInterval();
requestRetryBackoffLimit = requestConfig.getRequestRetryBackoffLimit();
}
retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(requestRetryBackoffInterval, requestRetryBackoffLimit);
}

final ExponentialBackoffIntervalCalculator backoff = retryIntervalCalculator;
action.get()
.whenComplete((response, throwable) -> {
if (backoff.shouldRetry() && !result.isDone()) {
long retryInterval = backoff.nextReconnectInterval();
boolean retry = false;
if (response != null) {
Integer code = codeExtractor.apply(response);
if (code != null && code >= 500) {
LOG.debug("HTTP operation on url: {} should be retried as the response code was {}, retrying after {} millis",
uri, code, retryInterval);
retry = true;
}
} else if (throwable instanceof IOException) {
LOG.debug(String.format("HTTP operation on url: %s should be retried after %d millis because of IOException",
uri, retryInterval), throwable);
retry = true;
}
if (retry) {
Utils.schedule(Runnable::run,
() -> retryWithExponentialBackoff(result, action, uri, codeExtractor, cancel, backoff),
retryInterval,
TimeUnit.MILLISECONDS);
return;
}
}
completeOrCancel(cancel, result).accept(response, throwable);
});
}

@Override
Expand All @@ -119,6 +188,29 @@ public HttpRequest.Builder newHttpRequestBuilder() {
final CompletableFuture<WebSocket> buildWebSocket(StandardWebSocketBuilder standardWebSocketBuilder,
Listener listener) {

CompletableFuture<WebSocketResponse> intermediate = new CompletableFuture<>();

retryWithExponentialBackoff(intermediate, () -> buildWebSocketOnce(standardWebSocketBuilder, listener),
standardWebSocketBuilder.asHttpRequest().uri(),
r -> Optional.ofNullable(r.wshse).map(WebSocketHandshakeException::getResponse).map(HttpResponse::code).orElse(null),
r -> Optional.ofNullable(r.webSocket).ifPresent(w -> w.sendClose(1000, null)),
null);

CompletableFuture<WebSocket> result = new CompletableFuture<>();

// map to a websocket
intermediate.whenComplete((r, t) -> {
if (t != null) {
result.completeExceptionally(t);
} else {
completeOrCancel(w -> w.sendClose(1000, null), result).accept(r.webSocket, r.wshse);
}
});
return result;
}

private CompletableFuture<WebSocketResponse> buildWebSocketOnce(StandardWebSocketBuilder standardWebSocketBuilder,
Listener listener) {
final StandardWebSocketBuilder copy = standardWebSocketBuilder.newBuilder();
builder.getInterceptors().values().stream().map(Interceptor.useConfig(builder.requestConfig))
.forEach(i -> i.before(copy, copy.asHttpRequest()));
Expand All @@ -138,32 +230,7 @@ final CompletableFuture<WebSocket> buildWebSocket(StandardWebSocketBuilder stand
return CompletableFuture.completedFuture(response);
});
}

final CompletableFuture<WebSocket> result = new CompletableFuture<>();
// map back to the expected convention with the future completed by the response exception
cf.whenComplete(onWebSocketComplete(result));
return result;

}

private static BiConsumer<WebSocketResponse, Throwable> onWebSocketComplete(CompletableFuture<WebSocket> result) {
return (r, t) -> {
if (t != null) {
result.completeExceptionally(t);
} else if (r != null) {
if (r.wshse != null) {
result.completeExceptionally(r.wshse);
} else {
// if already completed, take responsibility to proactively close
if (!result.complete(r.webSocket)) {
r.webSocket.sendClose(1000, null);
}
}
} else {
// shouldn't happen
result.complete(null);
}
};
return cf;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.utils.internal;
package io.fabric8.kubernetes.client.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,13 +24,20 @@ public class ExponentialBackoffIntervalCalculator {

private static final Logger logger = LoggerFactory.getLogger(ExponentialBackoffIntervalCalculator.class);

private static final int MAX_RETRY_INTERVAL_EXPONENT = 5;

public static final int UNLIMITED_RETRIES = -1;

private final int initialInterval;
private final int maxRetryIntervalExponent;
// we were using the same default in multiple places, so it has been moved here for now
// other calculators express this as max wait
private final int maxRetryIntervalExponent = MAX_RETRY_INTERVAL_EXPONENT;
private final int maxRetries;
final AtomicInteger currentReconnectAttempt = new AtomicInteger();

public ExponentialBackoffIntervalCalculator(int initialInterval, int maxRetryIntervalExponent) {
public ExponentialBackoffIntervalCalculator(int initialInterval, int maxRetries) {
this.initialInterval = initialInterval;
this.maxRetryIntervalExponent = maxRetryIntervalExponent;
this.maxRetries = maxRetries;
}

public long getInterval(int retryIndex) {
Expand All @@ -56,4 +63,8 @@ public int getCurrentReconnectAttempt() {
return currentReconnectAttempt.get();
}

public boolean shouldRetry() {
return maxRetries < 0 || currentReconnectAttempt.get() < maxRetries;
}

}
Loading

0 comments on commit cf761d2

Please sign in to comment.