sql: remote flows do not cancel promptly upon query cancellation #64916

Closed
jordanlewis opened this issue May 8, 2021 · 12 comments · Fixed by #65073
Labels
C-bug Code not up to spec/doc, specs & docs deemed correct. Solution expected to change code/behavior.


@jordanlewis
Member

jordanlewis commented May 8, 2021

Setup:

  1. create 4-node roachprod cluster
  2. import tpch (workload init tpch --data-loader=import)
  3. run querybench with a simple distsql query (select count(*) from tpch.lineitem) at concurrency 1024 (workload run querybench --query-file=queries --tolerate-errors --verbose)
  4. cancel workload

Observe custom chart of queued and active flows:

[chart: queued and active flows]

In this chart, the load was cancelled between 14:53 and 14:54, but it took over 5 minutes for the cluster to stop processing remote flows.

I would expect that when the load is cancelled, the remote nodes should be able to drop their flows on the floor very quickly.

@jordanlewis added the C-bug label May 8, 2021
@jordanlewis
Member Author

The problem appears to be related to the problems we were looking at in ca6457e again. What happens in this situation is that flows are promptly cancelled via the "outbox watchdog" goroutine (the one in outbox.RunWithStream that polls the flow stream by calling Recv in a loop), which gets an io.EOF as soon as the load generator is turned off and the gateway node cancels its queries.

But we do not cancel the flow's context when we get io.EOF, only when we get a non-io.EOF error. What do you think of this problem @yuzefovich? If we get io.EOF, that means the stream has gracefully shut down, which means that we ought to also gracefully shut ourselves down... right?

With the following tiny patch, the graph is fixed:

diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go
index 48176767f5..d23de2316f 100644
--- a/pkg/sql/colflow/colrpc/outbox.go
+++ b/pkg/sql/colflow/colrpc/outbox.go
@@ -358,16 +358,18 @@ func (o *Outbox) runWithStream(
                for {
                        msg, err := stream.Recv()
                        if err != nil {
+                               log.VEventf(ctx, 1, "Outbox watchdog received err: %+v", err)
                                if err != io.EOF {
                                        log.Warningf(ctx, "Outbox calling flowCtxCancel after Recv connection error: %+v", err)
-                                       flowCtxCancel()
                                }
+                               flowCtxCancel()
                                break
                        }

[chart: queued and active flows after the patch]

@jordanlewis
Member Author

Here's another graph with 1024 concurrency; the last one was accidentally at 512 concurrency. So I think this patch is an improvement, but it's interesting that we still have to dequeue flows just to set them up and call FlowStream back to the gateway, which will check the map, discover that the flow is gone, and return an error. It feels like we could have a faster path here, but I don't know. I would expect that perhaps we could send a cancellation message from the gateway as soon as cancellation happens, so we can proactively clean up any queued flows before they even establish new FlowStream calls.

[chart: queued and active flows at concurrency 1024]

@jordanlewis
Member Author

We definitely should find a way to do fast cancellation. Look what happens when the gateway completely swamps the cluster with a heavier workload for a longer time (this is tpch query 1 at high concurrency).

The queue grows and grows until the load is turned off. At that point, the system has to dequeue a flow, send a FlowStream RPC, and wait for either a .Recv() on the watchdog goroutine to return an EOF, or a .Send() on the sendBatches() goroutine to fail. But! We start the watchdog goroutine and the sendBatches() goroutine concurrently, which means that there's a good chance we'll actually queue and do real work before noticing that the flow is dead. This wastes resources and takes significant time, which is why in the following picture it took a long time to drain the queue.

I think we need some way of detecting flow cancellation actively. Imagine an RPC from the gateway that informs remote nodes that flows xyz are cancelled. The remote nodes can then either filter the queue to remove dead flows or, at worst, keep the dead flow IDs in a map to consult when flows are dequeued, so they can at least be fast-path finished.

[chart: flow queue growing under heavy load and draining slowly after cancellation]
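To make the queue-filtering idea concrete, here is a rough Go sketch of what the remote-node side of such a cancellation RPC could look like. All of the names here (scheduler, scheduledFlow, CancelDeadFlows) are hypothetical simplifications, not the actual DistSQL flow scheduler API:

```go
package flowsched

import "sync"

// FlowID identifies a scheduled remote flow. In CockroachDB this is a UUID;
// a string stands in for it in this sketch.
type FlowID string

// scheduledFlow is a placeholder for a flow that is queued but not yet running.
type scheduledFlow struct {
	id  FlowID
	run func()
}

// scheduler is a simplified stand-in for the per-node flow scheduler.
type scheduler struct {
	mu struct {
		sync.Mutex
		queue    []scheduledFlow     // flows waiting for a slot
		canceled map[FlowID]struct{} // flows the gateway told us are dead
	}
}

// CancelDeadFlows is the handler for the hypothetical cancellation RPC: it
// drops matching flows from the queue and remembers the rest so that a flow
// dequeued later can be fast-path finished.
func (s *scheduler) CancelDeadFlows(ids []FlowID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.mu.canceled == nil {
		s.mu.canceled = make(map[FlowID]struct{})
	}
	dead := make(map[FlowID]struct{}, len(ids))
	for _, id := range ids {
		dead[id] = struct{}{}
		s.mu.canceled[id] = struct{}{}
	}
	// Filter the queue in place, keeping only flows that are still alive.
	live := s.mu.queue[:0]
	for _, f := range s.mu.queue {
		if _, ok := dead[f.id]; !ok {
			live = append(live, f)
		}
	}
	s.mu.queue = live
}

// dequeue pops the next flow, skipping (and forgetting) any flow that was
// canceled after it was queued.
func (s *scheduler) dequeue() (scheduledFlow, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for len(s.mu.queue) > 0 {
		f := s.mu.queue[0]
		s.mu.queue = s.mu.queue[1:]
		if _, ok := s.mu.canceled[f.id]; ok {
			delete(s.mu.canceled, f.id) // fast-path finish: never start the flow
			continue
		}
		return f, true
	}
	return scheduledFlow{}, false
}
```

The key property is that a canceled flow either never reaches the head of the queue or is fast-path finished the moment it is dequeued, without setting up a FlowStream call at all.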

@yuzefovich
Member

I think we need some way of detecting flow cancellation actively. Imagine an RPC from the gateway that informs remote nodes that flows xyz are cancelled. The remote nodes can then either filter the queue to remove dead flows or, at worst, keep the dead flow IDs in a map to consult when flows are dequeued, so they can at least be fast-path finished.

Yeah, something like this makes sense to me. We definitely need the participation of the gateway node (the one that called SetupFlow RPC) in order to kill all scheduled remote flows that are now dead.

@yuzefovich self-assigned this May 11, 2021
craig bot pushed a commit that referenced this issue May 11, 2021
64940: colrpc: shut down outbox tree on a graceful termination of a stream r=yuzefovich a=jordanlewis

Previously, if a query that had related remote flows was gracefully cancelled,
those remote flows would not be actively cancelled. They would cancel
themselves only when they next tried to write to the network and got an
io.EOF.

Now, the outbox "watchdog goroutine" will cancel the tree rooted in the
outbox actively once it sees an io.EOF. Note that such behavior is
reasonable because the inbox stream handler must have exited, so we
don't need to concern ourselves with doing any other work; however, the
flow context is not canceled since other trees planned on the same node
might still be doing useful work on behalf of other streams.

Addresses #64916.

Release note (sql change): improve cancellation behavior for DistSQL
flows.

Co-authored-by: Jordan Lewis <[email protected]>
@yuzefovich
Member

yuzefovich commented May 12, 2021

I have thought about implementing this eager cancellation mechanism for scheduled flows. We will need to introduce another RPC to DistSQL, and I think we can go one of two ways:

  1. the RPC takes a single FlowID to cancel on the receiving node
  2. the RPC takes a list of FlowIDs to cancel on the receiving node.

The first option is a lot simpler to implement, but it would obviously have higher overhead if multiple queries are canceled at once. I think I'll try implementing the second option. The complication there is splitting up the cancelFlowRequests on the gateway between the different remote nodes and batching the flowIDs together. Another question for the second option is when we actually perform the cancellation. I think I'll try an approach with a limited number of "canceling worker" goroutines, each issuing an RPC against a single node with all of the flowIDs accumulated for that node so far.
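A minimal sketch, with made-up names, of how that per-node batching plus a bounded pool of canceling workers could be wired up (cancelRPC stands in for the hypothetical new DistSQL RPC; none of these names are the real implementation):

```go
package gatewaycancel

import "sync"

// NodeID identifies a remote node and FlowID identifies a flow (simplified types).
type NodeID int
type FlowID string

// cancelRPC stands in for the new DistSQL RPC that cancels a batch of flows
// on a single node.
type cancelRPC func(node NodeID, flows []FlowID) error

// flowCanceler accumulates dead flow IDs per node and lets a small, fixed
// pool of worker goroutines drain those batches, one RPC per node at a time.
type flowCanceler struct {
	rpc  cancelRPC
	mu   sync.Mutex
	dead map[NodeID][]FlowID // flows to cancel, keyed by node
	work chan NodeID         // nodes that have pending cancellations
}

func newFlowCanceler(rpc cancelRPC, numWorkers int) *flowCanceler {
	c := &flowCanceler{
		rpc:  rpc,
		dead: make(map[NodeID][]FlowID),
		work: make(chan NodeID, 1024),
	}
	for i := 0; i < numWorkers; i++ {
		go c.worker()
	}
	return c
}

// enqueue records that the given flows on the given node are dead. Multiple
// canceled queries targeting the same node end up batched into a single RPC.
func (c *flowCanceler) enqueue(node NodeID, flows []FlowID) {
	c.mu.Lock()
	pending := len(c.dead[node]) > 0
	c.dead[node] = append(c.dead[node], flows...)
	c.mu.Unlock()
	if !pending {
		c.work <- node // a worker will pick this node up
	}
}

// worker drains one node's batch at a time and issues a single RPC with all
// flow IDs accumulated for that node so far.
func (c *flowCanceler) worker() {
	for node := range c.work {
		c.mu.Lock()
		batch := c.dead[node]
		delete(c.dead, node)
		c.mu.Unlock()
		if len(batch) > 0 {
			_ = c.rpc(node, batch) // best-effort: an error just means slower cleanup
		}
	}
}
```

The batching means that many queries canceled around the same time collapse into a handful of RPCs, at most one in flight per node per worker, rather than one RPC per dead flow.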

Another question is when and how we decide to try canceling a scheduled flow on the remote node. One alternative is to track which remote flows have initiated the FlowStream RPC and cancel those that haven't. (If the remote node initiated the FlowStream RPC, then that remote flow must have been started and is no longer in the queue, so we will let it run and be canceled via the FlowStream RPC's ungraceful shutdown.) A second alternative is to always try to cancel all remote flows if the DistSQLReceiver's resultWriter has an error set - the reasoning being that we don't expect errors to occur, so we will treat any error as if the remote flows were scheduled but were canceled before they had a chance to start. I guess these two alternatives could be complementary.
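For the first alternative, the gateway would need a little bookkeeping per query. A hedged sketch, again with hypothetical names, of what that tracking might look like:

```go
package gatewaytrack

import "sync"

// FlowID identifies a remote flow (a string stands in for a UUID here).
type FlowID string

// remoteFlowTracker lives on the gateway and records, per query, which remote
// flows have already dialed back via FlowStream. Flows that have connected are
// left to shut down through the stream; the rest are candidates for the
// eager-cancellation RPC.
type remoteFlowTracker struct {
	mu        sync.Mutex
	connected map[FlowID]bool // false (or absent) means FlowStream was never initiated
}

func newRemoteFlowTracker(scheduled []FlowID) *remoteFlowTracker {
	t := &remoteFlowTracker{connected: make(map[FlowID]bool, len(scheduled))}
	for _, id := range scheduled {
		t.connected[id] = false
	}
	return t
}

// markConnected is called when the inbox on the gateway sees the first
// FlowStream message for the given flow.
func (t *remoteFlowTracker) markConnected(id FlowID) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.connected[id] = true
}

// unconnected returns the flows that never initiated FlowStream; these are the
// ones worth canceling eagerly once the query's resultWriter has an error set.
func (t *remoteFlowTracker) unconnected() []FlowID {
	t.mu.Lock()
	defer t.mu.Unlock()
	var ids []FlowID
	for id, conn := range t.connected {
		if !conn {
			ids = append(ids, id)
		}
	}
	return ids
}
```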

@yuzefovich
Member

@jordanlewis I'm curious what the parameters of the setup were where you observed 13k flows queued up? I tried using a 3-node roachprod cluster with a fourth server issuing the queries, imported TPCH data sets at scale factors 10 and 1, and with concurrency=1024 on query 1 the cluster OOMs.

@yuzefovich
Member

I guess I'll just use machines with larger RAM, not the default ones.

@jordanlewis
Member Author

@yuzefovich I used a default setup, with init --data-loader=imports on the default SF 1 dataset.

I ran two experiments, one with query 1 and one with query 18. The experiment with the nice pyramid graph above was with query 1.

One thing I'll note is that I did have a non-default patch on the database that I assumed wasn't doing anything, but perhaps it did improve OOM behavior somewhat: #64906 (comment)

Once I applied that patch and this cancellation patch, I was unable to easily OOM the database with query 1 concurrency=1024. But it still OOMed with query 18.

@yuzefovich
Member

Hm, that patch does seem to improve things, but I still get OOMs before the queueing of flows occurs (without the patch it is OOMing sooner). It's possible that my WIP on cancellation is to blame, but for now I'll just use machines with more RAM.

@yuzefovich
Member

I'm having a hard time trying to manually reproduce the same scenario, even after moving to machines with 26GB of RAM and lowering workmem to 4MiB with Q1 (which forces the input row tracking to spill to disk) - the nodes become overwhelmed at 1024 concurrency. I'll cherry-pick the fetcher patch on top of master to make sure it's not my WIP making things worse.

@yuzefovich
Member

Ok, it is my patch that's to blame - I can repro easily on master with the fetcher patch but without my WIP.

@yuzefovich
Member

Alright, I'm not sure why I had a harder time reproducing this queueing behavior. Eventually I lowered the sql.distsql.max_running_flows cluster setting (default 500) to see how my WIP patch behaves, and I think it works quite well.

Here, I had the setting at 20 (meaning 60 active flows across the whole 3-node cluster), and with 128 concurrency for about 2 minutes we get this behavior (the first peak is with the patch):
[screenshot: queued and active flows with max_running_flows=20, concurrency=128]

Here, I had the setting at 100 and with 400 concurrency, also for about 2 minutes (first peak is without patch):
[screenshot: queued and active flows with max_running_flows=100, concurrency=400]

Will clean the patch up, add a unit test, and push up for review.
