diff --git a/pkg/storage/concurrency/concurrency_control.go b/pkg/storage/concurrency/concurrency_control.go index 0d0b08a07a27..7d1d6127def9 100644 --- a/pkg/storage/concurrency/concurrency_control.go +++ b/pkg/storage/concurrency/concurrency_control.go @@ -561,7 +561,7 @@ type lockTableGuard interface { // have an initial notification. Note that notifications are collapsed if // not retrieved, since it is not necessary for the waiter to see every // state transition. - NewStateChan() <-chan struct{} + NewStateChan() chan struct{} // CurState returns the latest waiting state. CurState() waitingState diff --git a/pkg/storage/concurrency/concurrency_manager_test.go b/pkg/storage/concurrency/concurrency_manager_test.go index 477fb423a083..fa13989568bf 100644 --- a/pkg/storage/concurrency/concurrency_manager_test.go +++ b/pkg/storage/concurrency/concurrency_manager_test.go @@ -20,7 +20,6 @@ import ( "sort" "strconv" "strings" - "sync" "sync/atomic" "testing" "time" @@ -188,7 +187,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { log.Event(ctx, "sequencing complete, returned no guard") } }) - return mon.waitAndCollect(t) + return c.waitAndCollect(t, mon) case "finish": var reqName string @@ -206,7 +205,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { delete(c.guardsByReqName, reqName) c.mu.Unlock() }) - return mon.waitAndCollect(t) + return c.waitAndCollect(t, mon) case "handle-write-intent-error": var reqName string @@ -234,7 +233,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { log.Eventf(ctx, "handling %v", err) guard = m.HandleWriterIntentError(ctx, guard, err) }) - return mon.waitAndCollect(t) + return c.waitAndCollect(t, mon) case "on-lock-acquired": var txnName string @@ -253,7 +252,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { up := roachpb.MakeLockUpdateWithDur(txn, span, lock.Unreplicated) m.OnLockAcquired(ctx, &up) }) - return mon.waitAndCollect(t) + return c.waitAndCollect(t, mon) case "on-txn-updated": var txnName string @@ -291,7 +290,7 @@ func TestConcurrencyManagerBasic(t *testing.T) { d.Fatalf(t, err.Error()) } }) - return mon.waitAndCollect(t) + return c.waitAndCollect(t, mon) case "debug-latch-manager": global, local := m.LatchMetrics() @@ -312,6 +311,10 @@ func TestConcurrencyManagerBasic(t *testing.T) { if err := c.reset(); err != nil { d.Fatalf(t, "could not reset cluster: %v", err) } + // Reset request and txn namespace? 
+ if d.HasArg("namespace") { + c.resetNamespace() + } return "" default: @@ -340,16 +343,22 @@ type cluster struct { mu syncutil.Mutex guardsByReqName map[string]*concurrency.Guard txnRecords map[uuid.UUID]*txnRecord + txnPushes map[uuid.UUID]*txnPush } type txnRecord struct { mu syncutil.Mutex - cond sync.Cond + sig chan struct{} txn *roachpb.Transaction // immutable, modify fields below updatedStatus roachpb.TransactionStatus updatedTimestamp hlc.Timestamp } +type txnPush struct { + ctx context.Context + pusher, pushee uuid.UUID +} + func newCluster() *cluster { return &cluster{ nodeDesc: &roachpb.NodeDescriptor{NodeID: 1}, @@ -359,13 +368,14 @@ func newCluster() *cluster { requestsByName: make(map[string]concurrency.Request), guardsByReqName: make(map[string]*concurrency.Guard), txnRecords: make(map[uuid.UUID]*txnRecord), + txnPushes: make(map[uuid.UUID]*txnPush), } } func (c *cluster) makeConfig() concurrency.Config { st := clustersettings.MakeTestingClusterSettings() - concurrency.LockTableLivenessPushDelay.Override(&st.SV, 1*time.Millisecond) - concurrency.LockTableDeadlockDetectionPushDelay.Override(&st.SV, 1*time.Millisecond) + concurrency.LockTableLivenessPushDelay.Override(&st.SV, 0*time.Millisecond) + concurrency.LockTableDeadlockDetectionPushDelay.Override(&st.SV, 0*time.Millisecond) return concurrency.Config{ NodeDesc: c.nodeDesc, RangeDesc: c.rangeDesc, @@ -384,11 +394,23 @@ func (c *cluster) PushTransaction( if err != nil { return roachpb.Transaction{}, roachpb.NewError(err) } - // Wait for the transaction to be pushed. - pusheeRecord.mu.Lock() - defer pusheeRecord.mu.Unlock() + var pusherRecord *txnRecord + if h.Txn != nil { + pusherID := h.Txn.ID + pusherRecord, err = c.getTxnRecord(pusherID) + if err != nil { + return roachpb.Transaction{}, roachpb.NewError(err) + } + + push, err := c.registerPush(ctx, pusherID, pushee.ID) + if err != nil { + return roachpb.Transaction{}, roachpb.NewError(err) + } + defer c.unregisterPush(push) + } for { - pusheeTxn := pusheeRecord.asTxnLocked() + // Is the pushee pushed? + pusheeTxn, pusheeRecordSig := pusheeRecord.asTxn() var pushed bool switch pushType { case roachpb.PUSH_TIMESTAMP: @@ -401,7 +423,24 @@ func (c *cluster) PushTransaction( if pushed { return pusheeTxn, nil } - pusheeRecord.cond.Wait() + // Or the pusher aborted? + var pusherRecordSig chan struct{} + if pusherRecord != nil { + var pusherTxn roachpb.Transaction + pusherTxn, pusherRecordSig = pusherRecord.asTxn() + if pusherTxn.Status == roachpb.ABORTED { + log.Eventf(ctx, "detected pusher aborted") + err := roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_PUSHER_ABORTED) + return roachpb.Transaction{}, roachpb.NewError(err) + } + } + // Wait until either record is updated. + select { + case <-pusheeRecordSig: + case <-pusherRecordSig: + case <-ctx.Done(): + return roachpb.Transaction{}, roachpb.NewError(ctx.Err()) + } } } @@ -424,8 +463,7 @@ func (c *cluster) registerTxn(name string, txn *roachpb.Transaction) { c.mu.Lock() defer c.mu.Unlock() c.txnsByName[name] = txn - r := &txnRecord{txn: txn} - r.cond.L = &r.mu + r := &txnRecord{txn: txn, sig: make(chan struct{})} c.txnRecords[txn.ID] = r } @@ -452,17 +490,89 @@ func (c *cluster) updateTxnRecord( defer r.mu.Unlock() r.updatedStatus = status r.updatedTimestamp = ts - r.cond.Broadcast() + // Notify all listeners. This is a poor man's composable cond var. 
+ close(r.sig) + r.sig = make(chan struct{}) return nil } -func (r *txnRecord) asTxnLocked() roachpb.Transaction { +func (r *txnRecord) asTxn() (roachpb.Transaction, chan struct{}) { + r.mu.Lock() + defer r.mu.Unlock() txn := *r.txn if r.updatedStatus > txn.Status { txn.Status = r.updatedStatus } txn.WriteTimestamp.Forward(r.updatedTimestamp) - return txn + return txn, r.sig +} + +func (c *cluster) registerPush(ctx context.Context, pusher, pushee uuid.UUID) (*txnPush, error) { + c.mu.Lock() + defer c.mu.Unlock() + if _, ok := c.txnPushes[pusher]; ok { + return nil, errors.Errorf("txn %v already pushing", pusher) + } + p := &txnPush{ + ctx: ctx, + pusher: pusher, + pushee: pushee, + } + c.txnPushes[pusher] = p + return p, nil +} + +func (c *cluster) unregisterPush(push *txnPush) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.txnPushes, push.pusher) +} + +// detectDeadlocks looks at all in-flight transaction pushes and determines +// whether any are blocked due to dependency cycles within transactions. If so, +// the method logs an event on the contexts of each of the members of the cycle. +func (c *cluster) detectDeadlocks() { + // This cycle detection algorithm it not particularly efficient - at worst + // it runs in O(n ^ 2) time. However, it's simple and effective at assigning + // each member of each cycle a unique view of the cycle that it's a part of. + // This works because we currently only allow a transaction to push a single + // other transaction at a time. + c.mu.Lock() + defer c.mu.Unlock() + var chain []uuid.UUID + seen := make(map[uuid.UUID]struct{}) + for orig, origPush := range c.txnPushes { + pusher := orig + chain = append(chain[:0], orig) + for id := range seen { + delete(seen, id) + } + seen[pusher] = struct{}{} + for { + push, ok := c.txnPushes[pusher] + if !ok { + break + } + pusher = push.pushee + chain = append(chain, pusher) + if _, ok := seen[pusher]; ok { + // Cycle detected! + if pusher == orig { + // The cycle we were looking for (i.e. starting at orig). + var chainBuf strings.Builder + for i, id := range chain { + if i > 0 { + chainBuf.WriteString("->") + } + chainBuf.WriteString(id.Short()) + } + log.Eventf(origPush.ctx, "dependency cycle detected %s", chainBuf.String()) + } + break + } + seen[pusher] = struct{}{} + } + } } // reset clears all request state in the cluster. This avoids portions of tests @@ -494,6 +604,17 @@ func (c *cluster) reset() error { return nil } +// resetNamespace resets the entire cluster namespace, clearing both request +// definitions and transaction definitions. +func (c *cluster) resetNamespace() { + c.mu.Lock() + defer c.mu.Unlock() + c.txnCounter = 0 + c.txnsByName = make(map[string]*roachpb.Transaction) + c.requestsByName = make(map[string]concurrency.Request) + c.txnRecords = make(map[uuid.UUID]*txnRecord) +} + // collectSpans collects the declared spans for a set of requests. // Its logic mirrors that in Replica.collectSpans. func (c *cluster) collectSpans( @@ -520,6 +641,12 @@ func (c *cluster) collectSpans( return latchSpans, lockSpans } +func (c *cluster) waitAndCollect(t *testing.T, m *monitor) string { + m.waitForAsyncGoroutinesToStall(t) + c.detectDeadlocks() + return m.collectRecordings() +} + // monitor tracks a set of running goroutines as they execute and captures // tracing recordings from them. It is capable of watching its set of goroutines // until they all mutually stall. 
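The txnRecord changes above replace sync.Cond with a signal channel that updateTxnRecord closes to broadcast and then immediately recreates, which is what lets cluster.PushTransaction select on a pushee update, a pusher update, and ctx.Done() at the same time. Below is a minimal sketch of that close-and-replace pattern; the names (record, waitUntil) are illustrative and not taken from the diff.

package main

import (
	"context"
	"fmt"
	"sync"
)

// record guards a value with a mutex and a signal channel. Closing the
// channel wakes every current waiter at once; replacing it right away gives
// future waiters a fresh channel. This mirrors the "poor man's composable
// cond var" used by txnRecord.sig, with illustrative names.
type record struct {
	mu  sync.Mutex
	sig chan struct{}
	val int
}

func newRecord() *record { return &record{sig: make(chan struct{})} }

// update mutates the value and broadcasts to all current waiters.
func (r *record) update(v int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.val = v
	close(r.sig)                // wake everyone selecting on the old channel
	r.sig = make(chan struct{}) // arm a new channel for future waiters
}

// get returns the current value together with the channel that will be
// closed on the next update, in the same shape as txnRecord.asTxn.
func (r *record) get() (int, chan struct{}) {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.val, r.sig
}

// waitUntil blocks until pred holds or ctx is canceled, the same loop shape
// as the select in cluster.PushTransaction.
func waitUntil(ctx context.Context, r *record, pred func(int) bool) error {
	for {
		v, sig := r.get()
		if pred(v) {
			return nil
		}
		select {
		case <-sig: // re-check after the next update
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	r := newRecord()
	go r.update(42)
	if err := waitUntil(context.Background(), r, func(v int) bool { return v == 42 }); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("observed update")
}

Compared with sync.Cond, the waiter loses nothing in wakeup semantics but gains the ability to compose the wait with other channels, which is what the pusher-aborted and context-cancellation checks in PushTransaction rely on.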
@@ -589,11 +716,6 @@ func (m *monitor) resetSeqNums() { m.seq = 0 } -func (m *monitor) waitAndCollect(t *testing.T) string { - m.waitForAsyncGoroutinesToStall(t) - return m.collectRecordings() -} - func (m *monitor) collectRecordings() string { // Collect trace recordings from each goroutine. type logRecord struct { diff --git a/pkg/storage/concurrency/lock_table.go b/pkg/storage/concurrency/lock_table.go index 2d36eb47c982..b7347da1d1cd 100644 --- a/pkg/storage/concurrency/lock_table.go +++ b/pkg/storage/concurrency/lock_table.go @@ -294,7 +294,7 @@ func (g *lockTableGuardImpl) ShouldWait() bool { return g.mu.startWait } -func (g *lockTableGuardImpl) NewStateChan() <-chan struct{} { +func (g *lockTableGuardImpl) NewStateChan() chan struct{} { g.mu.Lock() defer g.mu.Unlock() return g.mu.signal diff --git a/pkg/storage/concurrency/lock_table_waiter.go b/pkg/storage/concurrency/lock_table_waiter.go index 09f5e113e3b8..3ac39dcca2b0 100644 --- a/pkg/storage/concurrency/lock_table_waiter.go +++ b/pkg/storage/concurrency/lock_table_waiter.go @@ -12,6 +12,7 @@ package concurrency import ( "context" + "math" "time" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -82,7 +83,8 @@ type lockTableWaiterImpl struct { stopper *stop.Stopper ir IntentResolver - // When set, WriteIntentError are propagated instead of pushing. + // When set, WriteIntentError are propagated instead of pushing + // conflicting transactions. disableTxnPushing bool } @@ -109,6 +111,7 @@ func (w *lockTableWaiterImpl) WaitOn( newStateC := guard.NewStateChan() ctxDoneC := ctx.Done() shouldQuiesceC := w.stopper.ShouldQuiesce() + // Used to delay liveness and deadlock detection pushes. var timer *timeutil.Timer var timerC <-chan time.Time var timerWaitingState waitingState @@ -117,72 +120,13 @@ func (w *lockTableWaiterImpl) WaitOn( case <-newStateC: timerC = nil state := guard.CurState() - if !state.held { - // If the lock is not held and instead has a reservation, we don't - // want to push the reservation transaction. A transaction push will - // block until the pushee transaction has either committed, aborted, - // pushed, or rolled back savepoints, i.e., there is some state - // change that has happened to the transaction record that unblocks - // the pusher. It will not unblock merely because a request issued - // by the pushee transaction has completed and released a - // reservation. Note that: - // - reservations are not a guarantee that the lock will be acquired. - // - the following two reasons to push do not apply to requests - // holding reservations: - // 1. competing requests compete at exactly one lock table, so there - // is no possibility of distributed deadlock due to reservations. - // 2. the lock table can prioritize requests based on transaction - // priorities. - // - // TODO(sbhola): remove the need for this by only notifying waiters - // for held locks and never for reservations. - // TODO(sbhola): now that we never push reservation holders, we - // should stop special-casing non-transactional writes and let them - // acquire reservations. - switch state.stateKind { - case waitFor, waitForDistinguished: - continue - case waitElsewhere: - return nil - } - } - switch state.stateKind { - case waitFor: + case waitFor, waitForDistinguished: // waitFor indicates that the request is waiting on another // transaction. This transaction may be the lock holder of a // conflicting lock or the head of a lock-wait queue that the // request is a part of. 
- - // For non-transactional requests, there's no need to perform - // deadlock detection and the other "distinguished" (see below) - // pusher will already push to detect coordinator failures and - // abandoned locks, so there's no need to do anything in this - // state. - if req.Txn == nil { - continue - } - - // For transactional requests, the request should push to - // resolve this conflict and detect deadlocks, but only after - // delay. This delay avoids unnecessary push traffic when the - // conflicting transaction is continuing to make forward - // progress. - delay := LockTableDeadlockDetectionPushDelay.Get(&w.st.SV) - if hasMinPriority(state.txn) || hasMaxPriority(req.Txn) { - // However, if the pushee has the minimum priority or if the - // pusher has the maximum priority, push immediately. - delay = 0 - } - if timer == nil { - timer = timeutil.NewTimer() - defer timer.Stop() - } - timer.Reset(delay) - timerC = timer.C - timerWaitingState = state - - case waitForDistinguished: + // // waitForDistinguished is like waitFor, except it instructs the // waiter to quickly push the conflicting transaction after a short // liveness push delay instead of waiting out the full deadlock @@ -197,15 +141,52 @@ func (w *lockTableWaiterImpl) WaitOn( // had a cache of aborted transaction IDs that allowed us to notice // and quickly resolve abandoned intents then we might be able to // get rid of this state. - delay := minDuration( - LockTableLivenessPushDelay.Get(&w.st.SV), - LockTableDeadlockDetectionPushDelay.Get(&w.st.SV), - ) + livenessPush := state.stateKind == waitForDistinguished + deadlockPush := true + + // If the conflict is a reservation holder and not a held lock then + // there's no need to perform a liveness push - the request must be + // alive or its context would have been canceled and it would have + // exited its lock wait-queues. + if !state.held { + livenessPush = false + } + + // For non-transactional requests, there's no need to perform + // deadlock detection because a non-transactional request can + // not be part of a dependency cycle. Non-transactional requests + // cannot hold locks or reservations. + if req.Txn == nil { + deadlockPush = false + } + + // If the request doesn't want to perform a push for either + // reason, continue waiting. + if !livenessPush && !deadlockPush { + continue + } + + // The request should push to detect abandoned locks due to + // failed transaction coordinators, detect deadlocks between + // transactions, or both, but only after delay. This delay + // avoids unnecessary push traffic when the conflicting + // transaction is continuing to make forward progress. + delay := time.Duration(math.MaxInt64) + if livenessPush { + delay = minDuration(delay, LockTableLivenessPushDelay.Get(&w.st.SV)) + } + if deadlockPush { + delay = minDuration(delay, LockTableDeadlockDetectionPushDelay.Get(&w.st.SV)) + } + + // However, if the pushee has the minimum priority or if the + // pusher has the maximum priority, push immediately. + // TODO(nvanbenschoten): flesh these interactions out more and + // add some testing. if hasMinPriority(state.txn) || hasMaxPriority(req.Txn) { - // However, if the pushee has the minimum priority or if the - // pusher has the maximum priority, push immediately. delay = 0 } + if timer == nil { timer = timeutil.NewTimer() defer timer.Stop() @@ -216,14 +197,20 @@ func (w *lockTableWaiterImpl) WaitOn( case waitElsewhere: // The lockTable has hit a memory limit and is no longer maintaining - // proper lock wait-queues. 
However, the waiting request is still - // not safe to proceed with evaluation because there is still a - // transaction holding the lock. It should push the transaction it - // is blocked on immediately to wait in that transaction's - // txnWaitQueue. Once this completes, the request should stop - // waiting on this lockTableGuard, as it will no longer observe - // lock-table state transitions. - return w.pushTxn(ctx, req, state) + // proper lock wait-queues. + if !state.held { + // If the lock is not held, exit immediately. Requests will + // be ordered when acquiring latches. + return nil + } + // The waiting request is still not safe to proceed with + // evaluation because there is still a transaction holding the + // lock. It should push the transaction it is blocked on + // immediately to wait in that transaction's txnWaitQueue. Once + // this completes, the request should stop waiting on this + // lockTableGuard, as it will no longer observe lock-table state + // transitions. + return w.pushLockTxn(ctx, req, state) case waitSelf: // Another request from the same transaction is the reservation @@ -251,7 +238,43 @@ func (w *lockTableWaiterImpl) WaitOn( // has crashed. timerC = nil timer.Read = true - if err := w.pushTxn(ctx, req, timerWaitingState); err != nil { + + // If the request is conflicting with a held lock then it pushes its + // holder synchronously - there is no way it will be able to proceed + // until the lock's transaction undergoes a state transition (either + // completing or being pushed) and then updates the lock's state + // through intent resolution. The request has a dependency on the + // entire conflicting transaction. + // + // However, if the request is conflicting with another request (a + // reservation holder) then it pushes the reservation holder + // asynchronously while continuing to listen to state transition in + // the lockTable. This allows the request to cancel its push if the + // conflicting reservation exits the lock wait-queue without leaving + // behind a lock. In this case, the request has a dependency on the + // conflicting request but not necessarily the entire conflicting + // transaction. + var err *Error + if timerWaitingState.held { + err = w.pushLockTxn(ctx, req, timerWaitingState) + } else { + // It would be more natural to launch an async task for the push + // and continue listening on this goroutine for lockTable state + // transitions, but doing so is harder to test against. Instead, + // we launch an async task to listen to lockTable state and + // synchronously push. If the watcher goroutine detects a + // lockTable change, it cancels the context on the push. + pushCtx, pushCancel := context.WithCancel(ctx) + go w.watchForNotifications(pushCtx, pushCancel, newStateC) + err = w.pushRequestTxn(pushCtx, req, timerWaitingState) + pushCancel() + if pushCtx.Err() == context.Canceled { + // Ignore the context canceled error. If this was for the + // parent context then we'll notice on the next select. + err = nil + } + } + if err != nil { return err } @@ -264,13 +287,112 @@ func (w *lockTableWaiterImpl) WaitOn( } } -func (w *lockTableWaiterImpl) pushTxn(ctx context.Context, req Request, ws waitingState) *Error { +// pushLockTxn pushes the holder of the provided lock. +// +// The method blocks until the lock holder transaction experiences a state +// transition such that it no longer conflicts with the pusher's request. 
The +// method then synchronously updates the lock to trigger a state transition in +// the lockTable that will free up the request to proceed. If the method returns +// successfully then the caller can expect to have an updated waitingState. +func (w *lockTableWaiterImpl) pushLockTxn( + ctx context.Context, req Request, ws waitingState, +) *Error { if w.disableTxnPushing { return roachpb.NewError(&roachpb.WriteIntentError{ Intents: []roachpb.Intent{roachpb.MakeIntent(ws.txn, ws.key)}, }) } + // Determine which form of push to use. For read-write conflicts, try to + // push the lock holder's timestamp forward so the read request can read + // under the lock. For write-write conflicts, try to abort the lock holder + // entirely so the write request can revoke and replace the lock with its + // own lock. + var pushType roachpb.PushTxnType + switch ws.guardAccess { + case spanset.SpanReadOnly: + pushType = roachpb.PUSH_TIMESTAMP + case spanset.SpanReadWrite: + pushType = roachpb.PUSH_ABORT + } + + h := w.pushHeader(req) + pusheeTxn, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) + if err != nil { + return err + } + + // If the push succeeded then the lock holder transaction must have + // experienced a state transition such that it no longer conflicts with + // the pusher's request. This state transition could have been any of the + // following, each of which would be captured in the pusheeTxn proto: + // 1. the pushee was committed + // 2. the pushee was aborted + // 3. the pushee was pushed to a higher provisional commit timestamp such + // that once its locks are updated to reflect this, they will no longer + // conflict with the pusher request. This is only applicable if pushType + // is PUSH_TIMESTAMP. + // 4. the pushee rolled back all sequence numbers that it held the + // conflicting lock at. This allows the lock to be revoked entirely. + // TODO(nvanbenschoten): we do not currently detect this case. Doing so + // would not be useful until we begin eagerly updating a transaction's + // record upon rollbacks to savepoints. + // + // Update the conflicting lock to trigger the desired state transition in + // the lockTable itself, which will allow the request to proceed. + // + // We always poison due to limitations of the API: not poisoning equals + // clearing the AbortSpan, and if our pushee transaction first got pushed + // for timestamp (by us), then (by someone else) aborted and poisoned, and + // then we run the below code, we're clearing the AbortSpan illegaly. + // Furthermore, even if our pushType is not PUSH_ABORT, we may have ended up + // with the responsibility to abort the intents (for example if we find the + // transaction aborted). To do better here, we need per-intent information + // on whether we need to poison. + resolve := roachpb.MakeLockUpdateWithDur(&pusheeTxn, roachpb.Span{Key: ws.key}, ws.dur) + opts := intentresolver.ResolveOptions{Poison: true} + return w.ir.ResolveIntent(ctx, resolve, opts) +} + +// pushRequestTxn pushes the owner of the provided request. +// +// The method blocks until either the pusher's transaction is aborted or the +// pushee's transaction is finalized (committed or aborted). If the pusher's +// transaction is aborted then the method will send an error on the channel and +// the pusher should exit its lock wait-queues. If the pushee's transaction is +// finalized then the method will send no error on the channel. 
The pushee is +// expected to notice that it has been aborted during its next attempt to push +// another transaction and will exit its lock wait-queues. +// +// However, the method responds to context cancelation and will terminate the +// push attempt if its context is canceled. This allows the caller to revoke a +// push if it determines that the pushee is no longer blocking the request. The +// caller is expected to terminate the push if it observes any state transitions +// in the lockTable. As such, the push is only expected to be allowed to run to +// completion in cases where requests are truly deadlocked. +func (w *lockTableWaiterImpl) pushRequestTxn( + ctx context.Context, req Request, ws waitingState, +) *Error { + // Regardless of whether the waiting request is reading from or writing to a + // key, it always performs a PUSH_ABORT when pushing a conflicting request + // because it wants to block until either a) the pushee or the pusher is + // aborted due to a deadlock or b) the request exits the lock wait-queue and + // the caller of this function cancels the push. + pushType := roachpb.PUSH_ABORT + + h := w.pushHeader(req) + _, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) + // Even if the push succeeded and aborted the other transaction to break a + // deadlock, there's nothing for the pusher to clean up. The conflicting + // request will quickly exit the lock wait-queue and release its reservation + // once it notices that it is aborted and the pusher will be free to proceed + // because it was not waiting on any locks. If the pusher's request does end + // up hitting a lock which the pushee fails to clean up, it will perform the + // cleanup itself using pushLockTxn. + return err +} + +func (w *lockTableWaiterImpl) pushHeader(req Request) roachpb.Header { h := roachpb.Header{ Timestamp: req.Timestamp, UserPriority: req.Priority, @@ -293,36 +415,26 @@ func (w *lockTableWaiterImpl) pushTxn(ctx context.Context, req Request, ws waiti h.Timestamp.Forward(obsTS) } } + return h +} - var pushType roachpb.PushTxnType - switch ws.guardAccess { - case spanset.SpanReadOnly: - pushType = roachpb.PUSH_TIMESTAMP - case spanset.SpanReadWrite: - pushType = roachpb.PUSH_ABORT - } - - pusheeTxn, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType) - if err != nil { - return err - } - if !ws.held { - return nil +// watchForNotifications selects on the provided channel and watches for any +// updates. If the channel is ever notified, it calls the provided context +// cancelation function and exits. +func (w *lockTableWaiterImpl) watchForNotifications( + ctx context.Context, cancel func(), newStateC chan struct{}, +) { + select { + case <-newStateC: + // Re-signal the channel. + select { + case newStateC <- struct{}{}: + default: + } + // Cancel the context of the async task. + cancel() + case <-ctx.Done(): } - - // We always poison due to limitations of the API: not poisoning equals - // clearing the AbortSpan, and if our pushee transaction first got pushed - // for timestamp (by us), then (by someone else) aborted and poisoned, and - // then we run the below code, we're clearing the AbortSpan illegaly. - // Furthermore, even if our pushType is not PUSH_ABORT, we may have ended - // up with the responsibility to abort the intents (for example if we find - // the transaction aborted). - // - // To do better here, we need per-intent information on whether we need to - // poison. 
- resolve := roachpb.MakeLockUpdateWithDur(&pusheeTxn, roachpb.Span{Key: ws.key}, ws.dur) - opts := intentresolver.ResolveOptions{Poison: true} - return w.ir.ResolveIntent(ctx, resolve, opts) } func hasMinPriority(txn *enginepb.TxnMeta) bool { diff --git a/pkg/storage/concurrency/lock_table_waiter_test.go b/pkg/storage/concurrency/lock_table_waiter_test.go index 564315f869f2..204394da244b 100644 --- a/pkg/storage/concurrency/lock_table_waiter_test.go +++ b/pkg/storage/concurrency/lock_table_waiter_test.go @@ -49,8 +49,8 @@ type mockLockTableGuard struct { stateObserved chan struct{} } -func (g *mockLockTableGuard) ShouldWait() bool { return true } -func (g *mockLockTableGuard) NewStateChan() <-chan struct{} { return g.signal } +func (g *mockLockTableGuard) ShouldWait() bool { return true } +func (g *mockLockTableGuard) NewStateChan() chan struct{} { return g.signal } func (g *mockLockTableGuard) CurState() waitingState { s := g.state if g.stateObserved != nil { @@ -111,25 +111,7 @@ func TestLockTableWaiterWithTxn(t *testing.T) { }) t.Run("waitSelf", func(t *testing.T) { - w, _, g := setupLockTableWaiterTest() - defer w.stopper.Stop(ctx) - - // Set up an observer channel to detect when the current - // waiting state is observed. - g.state = waitingState{stateKind: waitSelf} - g.stateObserved = make(chan struct{}) - go func() { - g.notify() - <-g.stateObserved - g.notify() - <-g.stateObserved - g.state = waitingState{stateKind: doneWaiting} - g.notify() - <-g.stateObserved - }() - - err := w.WaitOn(ctx, makeReq(), g) - require.Nil(t, err) + testWaitNoopUntilDone(t, waitSelf, makeReq) }) t.Run("doneWaiting", func(t *testing.T) { @@ -187,26 +169,7 @@ func TestLockTableWaiterWithNonTxn(t *testing.T) { t.Run("state", func(t *testing.T) { t.Run("waitFor", func(t *testing.T) { t.Log("waitFor does not cause non-transactional requests to push") - - w, _, g := setupLockTableWaiterTest() - defer w.stopper.Stop(ctx) - - // Set up an observer channel to detect when the current - // waiting state is observed. - g.state = waitingState{stateKind: waitFor} - g.stateObserved = make(chan struct{}) - go func() { - g.notify() - <-g.stateObserved - g.notify() - <-g.stateObserved - g.state = waitingState{stateKind: doneWaiting} - g.notify() - <-g.stateObserved - }() - - err := w.WaitOn(ctx, makeReq(), g) - require.Nil(t, err) + testWaitNoopUntilDone(t, waitFor, makeReq) }) t.Run("waitForDistinguished", func(t *testing.T) { @@ -268,6 +231,7 @@ func testWaitPush(t *testing.T, k stateKind, makeReq func() Request, expPushTS h defer w.stopper.Stop(ctx) pusheeTxn := makeTxnProto("pushee") + req := makeReq() g.state = waitingState{ stateKind: k, txn: &pusheeTxn.TxnMeta, @@ -282,63 +246,57 @@ func testWaitPush(t *testing.T, k stateKind, makeReq func() Request, expPushTS h } g.notify() - req := makeReq() - if lockHeld { - // We expect the holder to be pushed. - ir.pushTxn = func( - _ context.Context, - pusheeArg *enginepb.TxnMeta, - h roachpb.Header, - pushType roachpb.PushTxnType, - ) (roachpb.Transaction, *Error) { - require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg) - require.Equal(t, req.Txn, h.Txn) - require.Equal(t, expPushTS, h.Timestamp) - if waitAsWrite { - require.Equal(t, roachpb.PUSH_ABORT, pushType) - } else { - require.Equal(t, roachpb.PUSH_TIMESTAMP, pushType) - } + // waitElsewhere does not cause a push if the lock is not held. + // It returns immediately. 
+ if k == waitElsewhere && !lockHeld { + err := w.WaitOn(ctx, req, g) + require.Nil(t, err) + return + } - resp := roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.ABORTED} - - // If the lock is held, we'll try to resolve it now that - // we know the holder is ABORTED. Otherwide, immediately - // tell the request to stop waiting. - if lockHeld { - ir.resolveIntent = func(_ context.Context, intent roachpb.LockUpdate) *Error { - require.Equal(t, keyA, intent.Key) - require.Equal(t, pusheeTxn.ID, intent.Txn.ID) - require.Equal(t, roachpb.ABORTED, intent.Status) - g.state = waitingState{stateKind: doneWaiting} - g.notify() - return nil - } - } else { - g.state = waitingState{stateKind: doneWaiting} - g.notify() - } - return resp, nil + // Non-transactional requests do not push reservations, only locks. + // They wait for doneWaiting. + if req.Txn == nil && !lockHeld { + defer notifyUntilDone(t, g)() + err := w.WaitOn(ctx, req, g) + require.Nil(t, err) + return + } + + ir.pushTxn = func( + _ context.Context, + pusheeArg *enginepb.TxnMeta, + h roachpb.Header, + pushType roachpb.PushTxnType, + ) (roachpb.Transaction, *Error) { + require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg) + require.Equal(t, req.Txn, h.Txn) + require.Equal(t, expPushTS, h.Timestamp) + if waitAsWrite || !lockHeld { + require.Equal(t, roachpb.PUSH_ABORT, pushType) + } else { + require.Equal(t, roachpb.PUSH_TIMESTAMP, pushType) } - } else { - switch k { - case waitFor, waitForDistinguished: - // We don't expect the holder to be pushed. Set up an observer - // channel to detect when the current waiting state is observed. - g.stateObserved = make(chan struct{}) - go func() { - <-g.stateObserved - g.notify() - <-g.stateObserved + + resp := roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.ABORTED} + + // If the lock is held, we'll try to resolve it now that + // we know the holder is ABORTED. Otherwide, immediately + // tell the request to stop waiting. + if lockHeld { + ir.resolveIntent = func(_ context.Context, intent roachpb.LockUpdate) *Error { + require.Equal(t, keyA, intent.Key) + require.Equal(t, pusheeTxn.ID, intent.Txn.ID) + require.Equal(t, roachpb.ABORTED, intent.Status) g.state = waitingState{stateKind: doneWaiting} g.notify() - <-g.stateObserved - }() - case waitElsewhere: - // Expect an immediate return. - default: - t.Fatalf("unexpected state: %v", k) + return nil + } + } else { + g.state = waitingState{stateKind: doneWaiting} + g.notify() } + return resp, nil } err := w.WaitOn(ctx, req, g) @@ -347,6 +305,36 @@ func testWaitPush(t *testing.T, k stateKind, makeReq func() Request, expPushTS h }) } +func testWaitNoopUntilDone(t *testing.T, k stateKind, makeReq func() Request) { + ctx := context.Background() + w, _, g := setupLockTableWaiterTest() + defer w.stopper.Stop(ctx) + + g.state = waitingState{stateKind: k} + g.notify() + defer notifyUntilDone(t, g)() + + err := w.WaitOn(ctx, makeReq(), g) + require.Nil(t, err) +} + +func notifyUntilDone(t *testing.T, g *mockLockTableGuard) func() { + // Set up an observer channel to detect when the current + // waiting state is observed. 
+ g.stateObserved = make(chan struct{}) + done := make(chan struct{}) + go func() { + <-g.stateObserved + g.notify() + <-g.stateObserved + g.state = waitingState{stateKind: doneWaiting} + g.notify() + <-g.stateObserved + close(done) + }() + return func() { <-done } +} + // TestLockTableWaiterIntentResolverError tests that the lockTableWaiter // propagates errors from its intent resolver when it pushes transactions // or resolves their intents. diff --git a/pkg/storage/concurrency/testdata/concurrency_manager/basic b/pkg/storage/concurrency/testdata/concurrency_manager/basic index 43a4bb52aced..4aa452ab8e2c 100644 --- a/pkg/storage/concurrency/testdata/concurrency_manager/basic +++ b/pkg/storage/concurrency/testdata/concurrency_manager/basic @@ -106,7 +106,7 @@ sequence req=req3 [1] sequence req3: scanning lock table for conflicting locks [1] sequence req3: waiting in lock wait-queues [1] sequence req3: pushing txn 00000002 -[1] sequence req3: blocked on sync.Cond.Wait in concurrency_test.(*cluster).PushTransaction +[1] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction on-txn-updated txn=txn2 status=committed ---- @@ -175,7 +175,7 @@ sequence req=req5 [1] sequence req5: scanning lock table for conflicting locks [1] sequence req5: waiting in lock wait-queues [1] sequence req5: pushing txn 00000002 -[1] sequence req5: blocked on sync.Cond.Wait in concurrency_test.(*cluster).PushTransaction +[1] sequence req5: blocked on select in concurrency_test.(*cluster).PushTransaction new-request name=req6 txn=none ts=16,1 scan key=c endkey=z @@ -220,7 +220,7 @@ finish req=req6 [3] sequence req7: scanning lock table for conflicting locks [3] sequence req7: waiting in lock wait-queues [3] sequence req7: pushing txn 00000002 -[3] sequence req7: blocked on sync.Cond.Wait in concurrency_test.(*cluster).PushTransaction +[3] sequence req7: blocked on select in concurrency_test.(*cluster).PushTransaction on-txn-updated txn=txn2 status=committed ---- diff --git a/pkg/storage/concurrency/testdata/concurrency_manager/deadlocks b/pkg/storage/concurrency/testdata/concurrency_manager/deadlocks new file mode 100644 index 000000000000..ae78a269c293 --- /dev/null +++ b/pkg/storage/concurrency/testdata/concurrency_manager/deadlocks @@ -0,0 +1,627 @@ +# ------------------------------------------------------------- +# Deadlock due to lock ordering. 
+# +# Setup: txn1, txn2, txn3 acquire locks a, b, c +# +# Test: txn1, txn2, txn3 read b, c, a +# txn1 is aborted to break deadlock +# txn3 proceeds and commits +# txn2 proceeds and commits +# ------------------------------------------------------------- + +new-txn name=txn1 ts=10,1 epoch=0 +---- + +new-txn name=txn2 ts=10,1 epoch=0 +---- + +new-txn name=txn3 ts=10,1 epoch=0 +---- + +new-request name=req1w txn=txn1 ts=10,1 + put key=a value=v +---- + +new-request name=req2w txn=txn2 ts=10,1 + put key=b value=v +---- + +new-request name=req3w txn=txn3 ts=10,1 + put key=c value=v +---- + +sequence req=req1w +---- +[1] sequence req1w: sequencing request +[1] sequence req1w: acquiring latches +[1] sequence req1w: scanning lock table for conflicting locks +[1] sequence req1w: sequencing complete, returned guard + +sequence req=req2w +---- +[2] sequence req2w: sequencing request +[2] sequence req2w: acquiring latches +[2] sequence req2w: scanning lock table for conflicting locks +[2] sequence req2w: sequencing complete, returned guard + +sequence req=req3w +---- +[3] sequence req3w: sequencing request +[3] sequence req3w: acquiring latches +[3] sequence req3w: scanning lock table for conflicting locks +[3] sequence req3w: sequencing complete, returned guard + +on-lock-acquired txn=txn1 key=a +---- +[-] acquire lock: txn1 @ a + +on-lock-acquired txn=txn2 key=b +---- +[-] acquire lock: txn2 @ b + +on-lock-acquired txn=txn3 key=c +---- +[-] acquire lock: txn3 @ c + +finish req=req1w +---- +[-] finish req1w: finishing request + +finish req=req2w +---- +[-] finish req2w: finishing request + +finish req=req3w +---- +[-] finish req3w: finishing request + +debug-lock-table +---- +global: num=3 + lock: "a" + holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1 + lock: "b" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1 + lock: "c" + holder: txn: 00000003-0000-0000-0000-000000000000, ts: 0.000000010,1 +local: num=0 + +# -------------------------------- +# Setup complete, test starts here +# -------------------------------- + +new-request name=req1r txn=txn1 ts=10,1 + get key=b +---- + +new-request name=req2r txn=txn2 ts=10,1 + get key=c +---- + +new-request name=req3r txn=txn3 ts=10,1 + get key=a +---- + +sequence req=req1r +---- +[4] sequence req1r: sequencing request +[4] sequence req1r: acquiring latches +[4] sequence req1r: scanning lock table for conflicting locks +[4] sequence req1r: waiting in lock wait-queues +[4] sequence req1r: pushing txn 00000002 +[4] sequence req1r: blocked on select in concurrency_test.(*cluster).PushTransaction + +sequence req=req2r +---- +[5] sequence req2r: sequencing request +[5] sequence req2r: acquiring latches +[5] sequence req2r: scanning lock table for conflicting locks +[5] sequence req2r: waiting in lock wait-queues +[5] sequence req2r: pushing txn 00000003 +[5] sequence req2r: blocked on select in concurrency_test.(*cluster).PushTransaction + +sequence req=req3r +---- +[4] sequence req1r: dependency cycle detected 00000001->00000002->00000003->00000001 +[5] sequence req2r: dependency cycle detected 00000002->00000003->00000001->00000002 +[6] sequence req3r: sequencing request +[6] sequence req3r: acquiring latches +[6] sequence req3r: scanning lock table for conflicting locks +[6] sequence req3r: waiting in lock wait-queues +[6] sequence req3r: pushing txn 00000001 +[6] sequence req3r: blocked on select in concurrency_test.(*cluster).PushTransaction +[6] sequence req3r: dependency cycle detected 
00000003->00000001->00000002->00000003 + +debug-lock-table +---- +global: num=3 + lock: "a" + holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1 + waiting readers: + req: 6, txn: 00000003-0000-0000-0000-000000000000 + distinguished req: 6 + lock: "b" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1 + waiting readers: + req: 4, txn: 00000001-0000-0000-0000-000000000000 + distinguished req: 4 + lock: "c" + holder: txn: 00000003-0000-0000-0000-000000000000, ts: 0.000000010,1 + waiting readers: + req: 5, txn: 00000002-0000-0000-0000-000000000000 + distinguished req: 5 +local: num=0 + +# Break the deadlock by aborting txn1. +on-txn-updated txn=txn1 status=aborted +---- +[-] update txn: aborting txn1 +[4] sequence req1r: detected pusher aborted +[4] sequence req1r: sequencing complete, returned error: TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED): +[6] sequence req3r: resolving intent "a" for txn 00000001 with ABORTED status +[6] sequence req3r: acquiring latches +[6] sequence req3r: scanning lock table for conflicting locks +[6] sequence req3r: sequencing complete, returned guard + +# Txn3 can proceed and eventually commit. +finish req=req3r +---- +[-] finish req3r: finishing request + +on-txn-updated txn=txn3 status=committed +---- +[-] update txn: committing txn3 +[5] sequence req2r: resolving intent "c" for txn 00000003 with COMMITTED status +[5] sequence req2r: acquiring latches +[5] sequence req2r: scanning lock table for conflicting locks +[5] sequence req2r: sequencing complete, returned guard + +# Txn2 can proceed and eventually commit. +finish req=req2r +---- +[-] finish req2r: finishing request + +on-txn-updated txn=txn2 status=committed +---- +[-] update txn: committing txn2 + +reset namespace +---- + +# ------------------------------------------------------------- +# More complex deadlock due to lock ordering where not all of the +# members of the deadlock are distinguished waiters. 
+# +# Setup: txn1, txn2, txn3 acquire locks a, b, c +# +# Test: txn4 writes a +# txn1, txn2, txn3 write b, c, a +# txn1 is aborted to break deadlock +# txn3 proceeds and commits +# txn2 proceeds and commits +# ------------------------------------------------------------- + +new-txn name=txn1 ts=10,1 epoch=0 +---- + +new-txn name=txn2 ts=10,1 epoch=0 +---- + +new-txn name=txn3 ts=10,1 epoch=0 +---- + +new-txn name=txn4 ts=10,1 epoch=0 +---- + +new-request name=req1w txn=txn1 ts=10,1 + put key=a value=v +---- + +new-request name=req2w txn=txn2 ts=10,1 + put key=b value=v +---- + +new-request name=req3w txn=txn3 ts=10,1 + put key=c value=v +---- + +sequence req=req1w +---- +[1] sequence req1w: sequencing request +[1] sequence req1w: acquiring latches +[1] sequence req1w: scanning lock table for conflicting locks +[1] sequence req1w: sequencing complete, returned guard + +sequence req=req2w +---- +[2] sequence req2w: sequencing request +[2] sequence req2w: acquiring latches +[2] sequence req2w: scanning lock table for conflicting locks +[2] sequence req2w: sequencing complete, returned guard + +sequence req=req3w +---- +[3] sequence req3w: sequencing request +[3] sequence req3w: acquiring latches +[3] sequence req3w: scanning lock table for conflicting locks +[3] sequence req3w: sequencing complete, returned guard + +on-lock-acquired txn=txn1 key=a +---- +[-] acquire lock: txn1 @ a + +on-lock-acquired txn=txn2 key=b +---- +[-] acquire lock: txn2 @ b + +on-lock-acquired txn=txn3 key=c +---- +[-] acquire lock: txn3 @ c + +finish req=req1w +---- +[-] finish req1w: finishing request + +finish req=req2w +---- +[-] finish req2w: finishing request + +finish req=req3w +---- +[-] finish req3w: finishing request + +debug-lock-table +---- +global: num=3 + lock: "a" + holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1 + lock: "b" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1 + lock: "c" + holder: txn: 00000003-0000-0000-0000-000000000000, ts: 0.000000010,1 +local: num=0 + +# -------------------------------- +# Setup complete, test starts here +# -------------------------------- + +new-request name=req4w txn=txn4 ts=10,1 + put key=a value=v2 +---- + +new-request name=req1w2 txn=txn1 ts=10,1 + put key=b value=v2 +---- + +new-request name=req2w2 txn=txn2 ts=10,1 + put key=c value=v2 +---- + +new-request name=req3w2 txn=txn3 ts=10,1 + put key=a value=v2 +---- + +sequence req=req4w +---- +[4] sequence req4w: sequencing request +[4] sequence req4w: acquiring latches +[4] sequence req4w: scanning lock table for conflicting locks +[4] sequence req4w: waiting in lock wait-queues +[4] sequence req4w: pushing txn 00000001 +[4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction + +sequence req=req1w2 +---- +[5] sequence req1w2: sequencing request +[5] sequence req1w2: acquiring latches +[5] sequence req1w2: scanning lock table for conflicting locks +[5] sequence req1w2: waiting in lock wait-queues +[5] sequence req1w2: pushing txn 00000002 +[5] sequence req1w2: blocked on select in concurrency_test.(*cluster).PushTransaction + +sequence req=req2w2 +---- +[6] sequence req2w2: sequencing request +[6] sequence req2w2: acquiring latches +[6] sequence req2w2: scanning lock table for conflicting locks +[6] sequence req2w2: waiting in lock wait-queues +[6] sequence req2w2: pushing txn 00000003 +[6] sequence req2w2: blocked on select in concurrency_test.(*cluster).PushTransaction + +sequence req=req3w2 +---- +[5] sequence req1w2: dependency cycle 
detected 00000001->00000002->00000003->00000001 +[6] sequence req2w2: dependency cycle detected 00000002->00000003->00000001->00000002 +[7] sequence req3w2: sequencing request +[7] sequence req3w2: acquiring latches +[7] sequence req3w2: scanning lock table for conflicting locks +[7] sequence req3w2: waiting in lock wait-queues +[7] sequence req3w2: pushing txn 00000001 +[7] sequence req3w2: blocked on select in concurrency_test.(*cluster).PushTransaction +[7] sequence req3w2: dependency cycle detected 00000003->00000001->00000002->00000003 + +debug-lock-table +---- +global: num=3 + lock: "a" + holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1 + queued writers: + active: true req: 10, txn: 00000004-0000-0000-0000-000000000000 + active: true req: 13, txn: 00000003-0000-0000-0000-000000000000 + distinguished req: 10 + lock: "b" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1 + queued writers: + active: true req: 11, txn: 00000001-0000-0000-0000-000000000000 + distinguished req: 11 + lock: "c" + holder: txn: 00000003-0000-0000-0000-000000000000, ts: 0.000000010,1 + queued writers: + active: true req: 12, txn: 00000002-0000-0000-0000-000000000000 + distinguished req: 12 +local: num=0 + +# Break the deadlock by aborting txn1. +on-txn-updated txn=txn1 status=aborted +---- +[-] update txn: aborting txn1 +[4] sequence req4w: resolving intent "a" for txn 00000001 with ABORTED status +[4] sequence req4w: acquiring latches +[4] sequence req4w: scanning lock table for conflicting locks +[4] sequence req4w: sequencing complete, returned guard +[5] sequence req1w2: detected pusher aborted +[5] sequence req1w2: sequencing complete, returned error: TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED): +[7] sequence req3w2: resolving intent "a" for txn 00000001 with ABORTED status +[7] sequence req3w2: pushing txn 00000004 +[7] sequence req3w2: blocked on select in concurrency_test.(*cluster).PushTransaction + +# Txn4 can proceed. +finish req=req4w +---- +[-] finish req4w: finishing request +[7] sequence req3w2: acquiring latches +[7] sequence req3w2: scanning lock table for conflicting locks +[7] sequence req3w2: sequencing complete, returned guard + +# Txn3 can proceed and eventually commit. +finish req=req3w2 +---- +[-] finish req3w2: finishing request + +on-txn-updated txn=txn3 status=committed +---- +[-] update txn: committing txn3 +[6] sequence req2w2: resolving intent "c" for txn 00000003 with COMMITTED status +[6] sequence req2w2: acquiring latches +[6] sequence req2w2: scanning lock table for conflicting locks +[6] sequence req2w2: sequencing complete, returned guard + +# Txn2 can proceed and eventually commit. +finish req=req2w2 +---- +[-] finish req2w2: finishing request + +on-txn-updated txn=txn2 status=committed +---- +[-] update txn: committing txn2 + +reset namespace +---- + +# ------------------------------------------------------------- +# Deadlock due to request ordering. 
+# +# Setup: txn1, txn2, txn3 acquire locks a, b, c +# txn4 writes to b and c +# txn2 commits +# txn4 acquires reservation for b and blocks on c +# +# Test: txn1 writes to b +# txn3 writes to a +# txn4 is aborted to break deadlock +# txn1 proceeds and commits +# txn3 proceeds and commits +# ------------------------------------------------------------- + +new-txn name=txn1 ts=10,1 epoch=0 +---- + +new-txn name=txn2 ts=10,1 epoch=0 +---- + +new-txn name=txn3 ts=10,1 epoch=0 +---- + +new-txn name=txn4 ts=10,1 epoch=0 +---- + +new-request name=req1w txn=txn1 ts=10,1 + put key=a value=v +---- + +new-request name=req2w txn=txn2 ts=10,1 + put key=b value=v +---- + +new-request name=req3w txn=txn3 ts=10,1 + put key=c value=v +---- + +sequence req=req1w +---- +[1] sequence req1w: sequencing request +[1] sequence req1w: acquiring latches +[1] sequence req1w: scanning lock table for conflicting locks +[1] sequence req1w: sequencing complete, returned guard + +sequence req=req2w +---- +[2] sequence req2w: sequencing request +[2] sequence req2w: acquiring latches +[2] sequence req2w: scanning lock table for conflicting locks +[2] sequence req2w: sequencing complete, returned guard + +sequence req=req3w +---- +[3] sequence req3w: sequencing request +[3] sequence req3w: acquiring latches +[3] sequence req3w: scanning lock table for conflicting locks +[3] sequence req3w: sequencing complete, returned guard + +on-lock-acquired txn=txn1 key=a +---- +[-] acquire lock: txn1 @ a + +on-lock-acquired txn=txn2 key=b +---- +[-] acquire lock: txn2 @ b + +on-lock-acquired txn=txn3 key=c +---- +[-] acquire lock: txn3 @ c + +finish req=req1w +---- +[-] finish req1w: finishing request + +finish req=req2w +---- +[-] finish req2w: finishing request + +finish req=req3w +---- +[-] finish req3w: finishing request + +debug-lock-table +---- +global: num=3 + lock: "a" + holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1 + lock: "b" + holder: txn: 00000002-0000-0000-0000-000000000000, ts: 0.000000010,1 + lock: "c" + holder: txn: 00000003-0000-0000-0000-000000000000, ts: 0.000000010,1 +local: num=0 + +new-request name=req4w txn=txn4 ts=10,1 + put key=b value=v2 + put key=c value=v2 +---- + +sequence req=req4w +---- +[4] sequence req4w: sequencing request +[4] sequence req4w: acquiring latches +[4] sequence req4w: scanning lock table for conflicting locks +[4] sequence req4w: waiting in lock wait-queues +[4] sequence req4w: pushing txn 00000002 +[4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction + +on-txn-updated txn=txn2 status=committed +---- +[-] update txn: committing txn2 +[4] sequence req4w: resolving intent "b" for txn 00000002 with COMMITTED status +[4] sequence req4w: pushing txn 00000003 +[4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction + +debug-lock-table +---- +global: num=3 + lock: "a" + holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1 + lock: "b" + res: req: 17, txn: 00000004-0000-0000-0000-000000000000, ts: 0.000000010,1 + lock: "c" + holder: txn: 00000003-0000-0000-0000-000000000000, ts: 0.000000010,1 + queued writers: + active: true req: 17, txn: 00000004-0000-0000-0000-000000000000 + distinguished req: 17 +local: num=0 + +# -------------------------------- +# Setup complete, test starts here +# -------------------------------- + +new-request name=req1w2 txn=txn1 ts=10,1 + put key=b value=v2 +---- + +new-request name=req3w2 txn=txn3 ts=10,1 + put key=a value=v2 +---- + +sequence req=req1w2 +---- +[5] 
sequence req1w2: sequencing request +[5] sequence req1w2: acquiring latches +[5] sequence req1w2: scanning lock table for conflicting locks +[5] sequence req1w2: waiting in lock wait-queues +[5] sequence req1w2: pushing txn 00000004 +[5] sequence req1w2: blocked on select in concurrency_test.(*cluster).PushTransaction + +sequence req=req3w2 +---- +[4] sequence req4w: dependency cycle detected 00000004->00000003->00000001->00000004 +[5] sequence req1w2: dependency cycle detected 00000001->00000004->00000003->00000001 +[6] sequence req3w2: sequencing request +[6] sequence req3w2: acquiring latches +[6] sequence req3w2: scanning lock table for conflicting locks +[6] sequence req3w2: waiting in lock wait-queues +[6] sequence req3w2: pushing txn 00000001 +[6] sequence req3w2: blocked on select in concurrency_test.(*cluster).PushTransaction +[6] sequence req3w2: dependency cycle detected 00000003->00000001->00000004->00000003 + +debug-lock-table +---- +global: num=3 + lock: "a" + holder: txn: 00000001-0000-0000-0000-000000000000, ts: 0.000000010,1 + queued writers: + active: true req: 19, txn: 00000003-0000-0000-0000-000000000000 + distinguished req: 19 + lock: "b" + res: req: 17, txn: 00000004-0000-0000-0000-000000000000, ts: 0.000000010,1 + queued writers: + active: true req: 18, txn: 00000001-0000-0000-0000-000000000000 + distinguished req: 18 + lock: "c" + holder: txn: 00000003-0000-0000-0000-000000000000, ts: 0.000000010,1 + queued writers: + active: true req: 17, txn: 00000004-0000-0000-0000-000000000000 + distinguished req: 17 +local: num=0 + +# Break the deadlock by aborting txn4. +on-txn-updated txn=txn4 status=aborted +---- +[-] update txn: aborting txn4 +[4] sequence req4w: detected pusher aborted +[4] sequence req4w: sequencing complete, returned error: TransactionAbortedError(ABORT_REASON_PUSHER_ABORTED): +[5] sequence req1w2: acquiring latches +[5] sequence req1w2: scanning lock table for conflicting locks +[5] sequence req1w2: sequencing complete, returned guard + +# Txn1 can proceed and eventually commit. +finish req=req1w2 +---- +[-] finish req1w2: finishing request + +on-txn-updated txn=txn1 status=committed +---- +[-] update txn: committing txn1 +[6] sequence req3w2: resolving intent "a" for txn 00000001 with COMMITTED status +[6] sequence req3w2: acquiring latches +[6] sequence req3w2: scanning lock table for conflicting locks +[6] sequence req3w2: sequencing complete, returned guard + +# Txn3 can proceed and eventually commit. +finish req=req3w2 +---- +[-] finish req3w2: finishing request + +on-txn-updated txn=txn3 status=committed +---- +[-] update txn: committing txn3 + +reset namespace +---- diff --git a/pkg/storage/concurrency/testdata/concurrency_manager/discovered_lock b/pkg/storage/concurrency/testdata/concurrency_manager/discovered_lock index 05c7abc6cc55..25cdca35a6b9 100644 --- a/pkg/storage/concurrency/testdata/concurrency_manager/discovered_lock +++ b/pkg/storage/concurrency/testdata/concurrency_manager/discovered_lock @@ -38,7 +38,7 @@ sequence req=req1 [2] sequence req1: scanning lock table for conflicting locks [2] sequence req1: waiting in lock wait-queues [2] sequence req1: pushing txn 00000001 -[2] sequence req1: blocked on sync.Cond.Wait in concurrency_test.(*cluster).PushTransaction +[2] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction on-txn-updated txn=txn1 status=aborted ----
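The "dependency cycle detected" lines in the deadlocks testdata above are produced by cluster.detectDeadlocks walking its pusher-to-pushee map once the async goroutines have stalled. Below is a minimal sketch of that walk; it uses plain strings in place of txn UUIDs and a hypothetical detectCycle helper rather than the function from the diff.

package main

import (
	"fmt"
	"strings"
)

// detectCycle follows pusher -> pushee edges starting from orig and, if the
// walk returns to orig, renders the cycle as an "a->b->c->a"-style chain.
// This mirrors the O(n^2) walk in cluster.detectDeadlocks, relying on the
// same invariant that each transaction pushes at most one other transaction.
func detectCycle(pushes map[string]string, orig string) (string, bool) {
	chain := []string{orig}
	seen := map[string]struct{}{orig: {}}
	pusher := orig
	for {
		pushee, ok := pushes[pusher]
		if !ok {
			return "", false // walk left the set of active pushes: no cycle
		}
		pusher = pushee
		chain = append(chain, pusher)
		if _, dup := seen[pusher]; dup {
			if pusher != orig {
				return "", false // a cycle exists, but orig is not part of it
			}
			var b strings.Builder
			for i, id := range chain {
				if i > 0 {
					b.WriteString("->")
				}
				b.WriteString(id)
			}
			return b.String(), true
		}
		seen[pusher] = struct{}{}
	}
}

func main() {
	// txn1 pushes txn2, txn2 pushes txn3, txn3 pushes txn1: a three-member
	// cycle, matching the first scenario in the deadlocks testdata.
	pushes := map[string]string{"txn1": "txn2", "txn2": "txn3", "txn3": "txn1"}
	for _, orig := range []string{"txn1", "txn2", "txn3"} {
		if chain, ok := detectCycle(pushes, orig); ok {
			fmt.Println("dependency cycle detected", chain)
		}
	}
}

Run against the pushes from the first scenario, this prints one chain per member of the cycle, each rotated to start at a different origin, which is the same shape as the three "dependency cycle detected" events logged for req1r, req2r, and req3r above.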