Skip to content

Commit

Permalink
Update RxResultCursorImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
injectives committed Nov 9, 2024
1 parent d70759c commit d8aa4ae
Show file tree
Hide file tree
Showing 6 changed files with 641 additions and 369 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public CompletionStage<RxResultCursor> runRx(
apiTelemetryWork.setEnabled(!telemetryDisabled);
var runFailed = new AtomicBoolean(false);
var responseHandler =
new RunRxResponseHandler(connection, query, this::handleNewBookmark, runFailed);
new RunRxResponseHandler(logging, connection, query, this::handleNewBookmark, runFailed);
var cursorStage = apiTelemetryWork
.pipelineTelemetryIfEnabled(connection)
.thenCompose(conn -> conn.runInAutoCommitTransaction(
Expand Down Expand Up @@ -808,6 +808,7 @@ public AuthToken overrideAuthToken() {

public static class RunRxResponseHandler implements ResponseHandler {
final CompletableFuture<RxResultCursor> cursorFuture = new CompletableFuture<>();
private final Logging logging;
private final BoltConnection connection;
private final Query query;
private final Consumer<DatabaseBookmark> bookmarkConsumer;
Expand All @@ -817,10 +818,12 @@ public static class RunRxResponseHandler implements ResponseHandler {
private int ignoredCount;

public RunRxResponseHandler(
Logging logging,
BoltConnection connection,
Query query,
Consumer<DatabaseBookmark> bookmarkConsumer,
AtomicBoolean runFailed) {
this.logging = logging;
this.connection = connection;
this.query = query;
this.bookmarkConsumer = bookmarkConsumer;
Expand Down Expand Up @@ -867,11 +870,11 @@ public void onComplete() {
query,
runSummary,
error,
() -> null,
bookmarkConsumer,
(ignored) -> {},
true,
() -> null));
() -> null,
logging));
} else {
var message = ignoredCount > 0
? "Run exchange contains ignored messages."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.internal.FailableCursor;
import org.neo4j.driver.internal.util.Futures;

public class ResultCursorsHolder {
private final List<CompletionStage<? extends FailableCursor>> cursorStages = new ArrayList<>();
Expand All @@ -35,8 +36,11 @@ void add(CompletionStage<? extends FailableCursor> cursorStage) {
cursorStages.add(cursorStage);
}
cursorStage.thenCompose(FailableCursor::consumed).whenComplete((ignored, throwable) -> {
throwable = Futures.completionExceptionCause(throwable);
synchronized (this) {
cursorStages.remove(cursorStage);
if (throwable == null) {
cursorStages.remove(cursorStage);
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private enum State {
"Can't rollback, transaction has been requested to be committed";
private static final EnumSet<State> OPEN_STATES = EnumSet.of(State.ACTIVE, State.TERMINATED);

private final Logging logging;
private final TerminationAwareBoltConnection connection;
private final Consumer<DatabaseBookmark> bookmarkConsumer;
private final ResultCursorsHolder resultCursors;
Expand Down Expand Up @@ -153,6 +154,7 @@ protected UnmanagedTransaction(
NotificationConfig notificationConfig,
ApiTelemetryWork apiTelemetryWork,
Logging logging) {
this.logging = logging;
this.connection = new TerminationAwareBoltConnection(connection, this);
this.databaseName = databaseName;
this.accessMode = accessMode;
Expand Down Expand Up @@ -254,6 +256,7 @@ public CompletionStage<RxResultCursor> runRx(Query query) {
ensureCanRunQueries();
var parameters = query.parameters().asMap(Values::value);
var responseHandler = new RunRxResponseHandler(
logging,
apiTelemetryWork,
() -> executeWithLock(lock, () -> causeOfTermination),
this::markTerminated,
Expand Down Expand Up @@ -673,6 +676,7 @@ public void onComplete() {

private static class RunRxResponseHandler implements ResponseHandler {
final CompletableFuture<RxResultCursor> cursorFuture = new CompletableFuture<>();
private final Logging logging;
private final ApiTelemetryWork apiTelemetryWork;
private final Supplier<Throwable> termSupplier;
private final Consumer<Throwable> markTerminated;
Expand All @@ -685,13 +689,15 @@ private static class RunRxResponseHandler implements ResponseHandler {
private int ignoredCount;

private RunRxResponseHandler(
Logging logging,
ApiTelemetryWork apiTelemetryWork,
Supplier<Throwable> termSupplier,
Consumer<Throwable> markTerminated,
CompletableFuture<UnmanagedTransaction> beginFuture,
UnmanagedTransaction transaction,
BoltConnection connection,
Query query) {
this.logging = logging;
this.apiTelemetryWork = apiTelemetryWork;
this.termSupplier = termSupplier;
this.markTerminated = markTerminated;
Expand Down Expand Up @@ -747,11 +753,11 @@ public void onComplete() {
query,
null,
error,
termSupplier,
bookmark -> {},
transaction::markTerminated,
false,
termSupplier));
termSupplier,
logging));
}
} else {
if (runSummary != null) {
Expand All @@ -760,11 +766,11 @@ public void onComplete() {
query,
runSummary,
null,
termSupplier,
bookmark -> {},
transaction::markTerminated,
false,
termSupplier));
termSupplier,
logging));
} else {
var throwable = termSupplier.get();
if (throwable == null) {
Expand Down
Loading

0 comments on commit d8aa4ae

Please sign in to comment.