Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retries to Data.read #11269

Merged
merged 8 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@ import project.Data.Text.Text
import project.Data.Vector.Vector
import project.Enso_Cloud.Data_Link.Data_Link
import project.Enso_Cloud.Data_Link_Helpers
import project.Error.Error
import project.Errors.Deprecated.Deprecated
import project.Errors.Problem_Behavior.Problem_Behavior
import project.IO
import project.Math
import project.Metadata.Display
import project.Metadata.Widget
import project.Network.HTTP.HTTP
import project.Network.HTTP.HTTP_Error.HTTP_Error
import project.Network.HTTP.HTTP_Method.HTTP_Method
import project.Network.URI.URI
import project.Runtime.Thread
import project.Warning.Warning
from project.Data import Raw_Response
from project.Data.Boolean import Boolean, False, True
Expand All @@ -28,8 +33,22 @@ looks_like_uri path:Text -> Boolean =
A common implementation for fetching a resource and decoding it,
following encountered data links.
fetch_following_data_links (uri:URI) (method:HTTP_Method = HTTP_Method.Get) (headers:Vector = []) format =
response = HTTP.fetch uri method headers
decode_http_response_following_data_links response format
fetch_and_decode =
response = HTTP.fetch uri method headers
decode_http_response_following_data_links response format

error_handler attempt =
hubertp marked this conversation as resolved.
Show resolved Hide resolved
caught_error ->
case method of
HTTP_Method.Get ->
if attempt > 2 then Error.throw caught_error else
sleep_time = Math.min (100 * (2 ^ (attempt - 1))) 5000
hubertp marked this conversation as resolved.
Show resolved Hide resolved
Thread.sleep sleep_time
fetch_and_decode . catch HTTP_Error (error_handler (attempt + 1))
_ ->
Error.throw caught_error.payload

fetch_and_decode . catch HTTP_Error (error_handler 0)

## PRIVATE
Decodes a HTTP response, handling data link access.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Internal threading utilities used for working with threads.

import project.Any.Any
import project.Data.Numbers.Integer

## PRIVATE
ADVANCED
Expand All @@ -19,3 +20,18 @@ import project.Any.Any
Thread.with_interrupt_handler (1 + 1) <| IO.println "I died!"
with_interrupt_handler : Any -> Any -> Any
with_interrupt_handler ~action ~interrupt_handler = @Builtin_Method "Thread.with_interrupt_handler"

## PRIVATE
ADVANCED
Temporarily cease execution of the current thread.

Arguments:
- time: amount of milliseconds to sleep

> Example
Sleep the current thread for 1 second.

Thread.sleep 1000 <| IO.println "I continue!"
sleep : Integer
sleep time_in_milliseconds = @Builtin_Method "Thread.sleep"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice to get this as a builtin, I was always polyglot java import java.lang.Thread, but this seems so much nicer

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather limit builtins to a bare minimum. Especially something that doesn't have to be fast as sleep doesn't indicate any need to be a builtin.

hubertp marked this conversation as resolved.
Show resolved Hide resolved

Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.enso.interpreter.node.expression.builtin.thread;

import com.oracle.truffle.api.dsl.Fallback;
import com.oracle.truffle.api.dsl.Specialization;
import com.oracle.truffle.api.nodes.Node;
import org.enso.interpreter.dsl.BuiltinMethod;
import org.enso.interpreter.runtime.EnsoContext;
import org.enso.interpreter.runtime.control.ThreadInterruptedException;
import org.enso.interpreter.runtime.error.PanicException;

@BuiltinMethod(
type = "Thread",
name = "sleep",
description = "Sleep in the current thread.",
autoRegister = false,
inlineable = true)
public abstract class SleepNode extends Node {

public abstract Object execute(Object timeInMilliseconds);

public static SleepNode build() {
return SleepNodeGen.create();
}

@Specialization
Object doLong(long timeInMilliseconds) {
try {
Thread.sleep(timeInMilliseconds);
return EnsoContext.get(this).getBuiltins().nothing();
} catch (InterruptedException e) {
throw new ThreadInterruptedException();
}
}

@Specialization
Object doDouble(double timeInMilliseconds) {
try {
Thread.sleep(Double.valueOf(timeInMilliseconds).longValue());
return EnsoContext.get(this).getBuiltins().nothing();
} catch (InterruptedException e) {
throw new ThreadInterruptedException();
}
}
hubertp marked this conversation as resolved.
Show resolved Hide resolved

@Fallback
Object doOther(Object timeInMilliseconds) {
var builtins = EnsoContext.get(this).getBuiltins();
var intType = builtins.number().getInteger();
throw new PanicException(
builtins.error().makeTypeError(intType, timeInMilliseconds, "timeInMilliseconds"), this);
}
}
18 changes: 14 additions & 4 deletions test/Base_Tests/src/Network/Http_Spec.enso
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ add_specs suite_builder =

## Checking this error partially as a warning - I spent a lot of time debugging why I'm getting such an error.
Apparently it happens when the httpbin server was crashing without sending any response.
group_builder.specify "should be able to handle server crash resulting in no response" pending=pending_has_url <| Test.with_retries <|
group_builder.specify "should be able to handle server crash resulting in no response" pending=pending_has_url <|
err = Data.fetch (base_url_with_slash+"crash")
err.should_fail_with Request_Error
err.catch.error_type . should_equal "java.io.IOException"
Expand All @@ -600,10 +600,20 @@ add_specs suite_builder =
I think it may be worth adding, because it may be really quite confusing for end users who get that kind of error.
err.catch.message . should_equal "HTTP/1.1 header parser received no bytes"

group_builder.specify "should be able to handle IO errors" pending="TODO: Currently I was unable to figure out a way to test such errors" <|
# how to trigger this error???
err = Data.fetch "TODO"
group_builder.specify "should be able to handle occasional server crashes and retry" pending=pending_has_url <|
r1 = Data.fetch (base_url_with_slash+"crash?success_every=2")
r1.should_succeed
r1.should_be_a Response

group_builder.specify "should be able to handle server crash that closes stream abruptly" pending=pending_has_url <|
err = Data.fetch (base_url_with_slash+"crash?type=stream")
err.should_fail_with HTTP_Error
err.catch.message . should_equal "An IO error has occurred: java.io.IOException: closed"

group_builder.specify "should be able to handle occasional abrupt stream closures and retry" pending=pending_has_url <|
r1 = Data.fetch (base_url_with_slash+"crash?type=stream&success_every=2")
r1.should_succeed
r1.should_be_a Response

suite_builder.group "Http Auth" group_builder->
group_builder.specify "should support Basic user+password authentication" pending=pending_has_url <| Test.with_retries <|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,81 @@

import com.sun.net.httpserver.HttpExchange;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import org.apache.http.client.utils.URIBuilder;
import org.enso.shttp.SimpleHttpHandler;

public class CrashingTestHandler extends SimpleHttpHandler {
private int requests = 0;

@Override
protected void doHandle(HttpExchange exchange) throws IOException {
// This exception will be logged by SimpleHttpHandler, but that's OK - let's know that this
// Exceptions will be logged by SimpleHttpHandler, but that's OK - let's know that this
// crash is happening.
throw new RuntimeException("This handler crashes on purpose.");

URI uri = exchange.getRequestURI();
URIBuilder builder = new URIBuilder(uri);
final String successEveryParam = "success_every";
final String crashTypeParam = "type";

int successEvery = 0;
CrashType crashType = CrashType.RUNTIME;

for (var queryPair : builder.getQueryParams()) {
if (queryPair.getName().equals(successEveryParam)) {
successEvery = Integer.decode(queryPair.getValue());
} else if (queryPair.getName().equals(crashTypeParam)) {
crashType =
switch (queryPair.getValue()) {
case "stream" -> CrashType.STREAM;
default -> CrashType.RUNTIME;
};
}
}
if (successEvery == 0) {
// Reset counter
requests = 0;
}

switch (crashType) {
case RUNTIME:
if (successEvery == (requests + 1)) {
// return OK, reset
requests = 0;
exchange.sendResponseHeaders(200, -1);
exchange.close();
break;
} else {
requests += 1;
throw new RuntimeException("This handler crashes on purpose.");
}

case STREAM:
byte[] responseData = "Crash and Burn".getBytes();
exchange.sendResponseHeaders(200, responseData.length);
try {
if (successEvery == (requests + 1)) {
hubertp marked this conversation as resolved.
Show resolved Hide resolved
requests = 0;
// return OK, reset
try (OutputStream os = exchange.getResponseBody()) {
os.write(responseData, 0, responseData.length);
}
} else {
requests += 1;
try (OutputStream os = exchange.getResponseBody()) {
os.write(responseData, 0, responseData.length / 2);
}
}
} finally {
exchange.close();
}
break;
}
}

enum CrashType {
RUNTIME,
STREAM
hubertp marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading