storage/concurrency: push reservation holders to detect deadlocks
This is a partial reversion of #45420.

It turns out that there are cases where a reservation holder is a link
in a dependency cycle. This can happen when a txn holding a reservation
on one key is blocked waiting for a lock on another key. If the txns
queued up behind the reservation did not push _someone_, they would
never detect the deadlock.

```
     range A      .     range B
                  .
       txnB       .       txnC              txnA
        |         .        |  ^________      |
        v         .        v            \    v
 [lock X: (txnA)] .  [lock Y: (txnB)]  [res Z: (txnC)]
```

This segment of the dependency cycle is always local to a single
concurrency manager, so the manager could in principle forward the push
through the reservation links to shorten the cycle and prevent non-lock
holders from ever being chosen as the victim of a deadlock abort. That
is tricky to get right, though, so for now we just push.

To address the issue that motivated #45420, we perform this form of
push asynchronously while continuing to listen for state transitions
in the lockTable. If the pusher is unblocked (see #45420 for an example
of when that can happen), it simply cancels the push and proceeds with
navigating the lockTable.
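
As a rough illustration of that shape — a self-contained Go sketch, not the
actual lockTableWaiter code; waitWithAsyncPush, stillBlocked, and push are
assumed names — the waiter launches the push in a goroutine, keeps consuming
state transitions, and cancels the push if it becomes unblocked first:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitWithAsyncPush launches a push in a goroutine while continuing to
// consume state transitions from the lockTable (stateC). If the waiter
// becomes unblocked before the push returns, the push is cancelled and
// the waiter resumes navigating the lockTable.
func waitWithAsyncPush(
	ctx context.Context,
	stateC <-chan struct{},
	stillBlocked func() bool,
	push func(context.Context) error,
) error {
	pushCtx, cancelPush := context.WithCancel(ctx)
	defer cancelPush()

	doneC := make(chan error, 1)
	go func() { doneC <- push(pushCtx) }()

	for {
		select {
		case err := <-doneC:
			// The push completed, e.g. because a deadlock was detected
			// and a member of the cycle was aborted.
			return err
		case <-stateC:
			if !stillBlocked() {
				// The waiter was unblocked for some other reason (see
				// #45420); abandon the push and proceed.
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	stateC := make(chan struct{}, 1)
	// Simulate the waiter being unblocked shortly after the push starts.
	go func() {
		time.Sleep(10 * time.Millisecond)
		stateC <- struct{}{}
	}()

	err := waitWithAsyncPush(
		context.Background(),
		stateC,
		func() bool { return false }, // no longer blocked once signaled
		func(ctx context.Context) error {
			<-ctx.Done() // the push blocks until it is cancelled
			return errors.New("push cancelled")
		},
	)
	fmt.Println("push result:", err) // push result: <nil>
}
```

Here stateC stands in for what lockTableGuard.NewStateChan() provides in the
real code, and push stands in for a PushTransaction request.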
nvanbenschoten committed Mar 3, 2020
1 parent a569548 commit 51178dc
Showing 8 changed files with 1,078 additions and 229 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/concurrency/concurrency_control.go
@@ -561,7 +561,7 @@ type lockTableGuard interface {
// have an initial notification. Note that notifications are collapsed if
// not retrieved, since it is not necessary for the waiter to see every
// state transition.
NewStateChan() <-chan struct{}
NewStateChan() chan struct{}

// CurState returns the latest waiting state.
CurState() waitingState
170 changes: 146 additions & 24 deletions pkg/storage/concurrency/concurrency_manager_test.go
@@ -20,7 +20,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -188,7 +187,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
log.Event(ctx, "sequencing complete, returned no guard")
}
})
return mon.waitAndCollect(t)
return c.waitAndCollect(t, mon)

case "finish":
var reqName string
@@ -206,7 +205,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
delete(c.guardsByReqName, reqName)
c.mu.Unlock()
})
return mon.waitAndCollect(t)
return c.waitAndCollect(t, mon)

case "handle-write-intent-error":
var reqName string
@@ -234,7 +233,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
log.Eventf(ctx, "handling %v", err)
guard = m.HandleWriterIntentError(ctx, guard, err)
})
return mon.waitAndCollect(t)
return c.waitAndCollect(t, mon)

case "on-lock-acquired":
var txnName string
@@ -253,7 +252,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
up := roachpb.MakeLockUpdateWithDur(txn, span, lock.Unreplicated)
m.OnLockAcquired(ctx, &up)
})
return mon.waitAndCollect(t)
return c.waitAndCollect(t, mon)

case "on-txn-updated":
var txnName string
@@ -291,7 +290,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
d.Fatalf(t, err.Error())
}
})
return mon.waitAndCollect(t)
return c.waitAndCollect(t, mon)

case "debug-latch-manager":
global, local := m.LatchMetrics()
@@ -312,6 +311,10 @@ func TestConcurrencyManagerBasic(t *testing.T) {
if err := c.reset(); err != nil {
d.Fatalf(t, "could not reset cluster: %v", err)
}
// Reset request and txn namespace?
if d.HasArg("namespace") {
c.resetNamespace()
}
return ""

default:
@@ -340,16 +343,22 @@ type cluster struct {
mu syncutil.Mutex
guardsByReqName map[string]*concurrency.Guard
txnRecords map[uuid.UUID]*txnRecord
txnPushes map[uuid.UUID]*txnPush
}

type txnRecord struct {
mu syncutil.Mutex
cond sync.Cond
sig chan struct{}
txn *roachpb.Transaction // immutable, modify fields below
updatedStatus roachpb.TransactionStatus
updatedTimestamp hlc.Timestamp
}

type txnPush struct {
ctx context.Context
pusher, pushee uuid.UUID
}

func newCluster() *cluster {
return &cluster{
nodeDesc: &roachpb.NodeDescriptor{NodeID: 1},
@@ -359,13 +368,14 @@ func newCluster() *cluster {
requestsByName: make(map[string]concurrency.Request),
guardsByReqName: make(map[string]*concurrency.Guard),
txnRecords: make(map[uuid.UUID]*txnRecord),
txnPushes: make(map[uuid.UUID]*txnPush),
}
}

func (c *cluster) makeConfig() concurrency.Config {
st := clustersettings.MakeTestingClusterSettings()
concurrency.LockTableLivenessPushDelay.Override(&st.SV, 1*time.Millisecond)
concurrency.LockTableDeadlockDetectionPushDelay.Override(&st.SV, 1*time.Millisecond)
concurrency.LockTableLivenessPushDelay.Override(&st.SV, 0*time.Millisecond)
concurrency.LockTableDeadlockDetectionPushDelay.Override(&st.SV, 0*time.Millisecond)
return concurrency.Config{
NodeDesc: c.nodeDesc,
RangeDesc: c.rangeDesc,
@@ -384,11 +394,23 @@ func (c *cluster) PushTransaction(
if err != nil {
return roachpb.Transaction{}, roachpb.NewError(err)
}
// Wait for the transaction to be pushed.
pusheeRecord.mu.Lock()
defer pusheeRecord.mu.Unlock()
var pusherRecord *txnRecord
if h.Txn != nil {
pusherID := h.Txn.ID
pusherRecord, err = c.getTxnRecord(pusherID)
if err != nil {
return roachpb.Transaction{}, roachpb.NewError(err)
}

push, err := c.registerPush(ctx, pusherID, pushee.ID)
if err != nil {
return roachpb.Transaction{}, roachpb.NewError(err)
}
defer c.unregisterPush(push)
}
for {
pusheeTxn := pusheeRecord.asTxnLocked()
// Is the pushee pushed?
pusheeTxn, pusheeRecordSig := pusheeRecord.asTxn()
var pushed bool
switch pushType {
case roachpb.PUSH_TIMESTAMP:
Expand All @@ -401,7 +423,24 @@ func (c *cluster) PushTransaction(
if pushed {
return pusheeTxn, nil
}
pusheeRecord.cond.Wait()
// Or the pusher aborted?
var pusherRecordSig chan struct{}
if pusherRecord != nil {
var pusherTxn roachpb.Transaction
pusherTxn, pusherRecordSig = pusherRecord.asTxn()
if pusherTxn.Status == roachpb.ABORTED {
log.Eventf(ctx, "detected pusher aborted")
err := roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_PUSHER_ABORTED)
return roachpb.Transaction{}, roachpb.NewError(err)
}
}
// Wait until either record is updated.
select {
case <-pusheeRecordSig:
case <-pusherRecordSig:
case <-ctx.Done():
return roachpb.Transaction{}, roachpb.NewError(ctx.Err())
}
}
}

@@ -424,8 +463,7 @@ func (c *cluster) registerTxn(name string, txn *roachpb.Transaction) {
c.mu.Lock()
defer c.mu.Unlock()
c.txnsByName[name] = txn
r := &txnRecord{txn: txn}
r.cond.L = &r.mu
r := &txnRecord{txn: txn, sig: make(chan struct{})}
c.txnRecords[txn.ID] = r
}

@@ -452,17 +490,89 @@ func (c *cluster) updateTxnRecord(
defer r.mu.Unlock()
r.updatedStatus = status
r.updatedTimestamp = ts
r.cond.Broadcast()
// Notify all listeners. This is a poor man's composable cond var.
close(r.sig)
r.sig = make(chan struct{})
return nil
}

func (r *txnRecord) asTxnLocked() roachpb.Transaction {
func (r *txnRecord) asTxn() (roachpb.Transaction, chan struct{}) {
r.mu.Lock()
defer r.mu.Unlock()
txn := *r.txn
if r.updatedStatus > txn.Status {
txn.Status = r.updatedStatus
}
txn.WriteTimestamp.Forward(r.updatedTimestamp)
return txn
return txn, r.sig
}

func (c *cluster) registerPush(ctx context.Context, pusher, pushee uuid.UUID) (*txnPush, error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.txnPushes[pusher]; ok {
return nil, errors.Errorf("txn %v already pushing", pusher)
}
p := &txnPush{
ctx: ctx,
pusher: pusher,
pushee: pushee,
}
c.txnPushes[pusher] = p
return p, nil
}

func (c *cluster) unregisterPush(push *txnPush) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.txnPushes, push.pusher)
}

// detectDeadlocks looks at all in-flight transaction pushes and determines
whether any are blocked due to dependency cycles between transactions. If so,
the method logs an event on the context of each member of the cycle.
func (c *cluster) detectDeadlocks() {
// This cycle detection algorithm is not particularly efficient - at worst
// it runs in O(n^2) time. However, it's simple and effective at assigning
// each member of each cycle a unique view of the cycle that it's a part of.
// This works because we currently only allow a transaction to push a single
// other transaction at a time.
c.mu.Lock()
defer c.mu.Unlock()
var chain []uuid.UUID
seen := make(map[uuid.UUID]struct{})
for orig, origPush := range c.txnPushes {
pusher := orig
chain = append(chain[:0], orig)
for id := range seen {
delete(seen, id)
}
seen[pusher] = struct{}{}
for {
push, ok := c.txnPushes[pusher]
if !ok {
break
}
pusher = push.pushee
chain = append(chain, pusher)
if _, ok := seen[pusher]; ok {
// Cycle detected!
if pusher == orig {
// The cycle we were looking for (i.e. starting at orig).
var chainBuf strings.Builder
for i, id := range chain {
if i > 0 {
chainBuf.WriteString("->")
}
chainBuf.WriteString(id.Short())
}
log.Eventf(origPush.ctx, "dependency cycle detected %s", chainBuf.String())
}
break
}
seen[pusher] = struct{}{}
}
}
}

// reset clears all request state in the cluster. This avoids portions of tests
@@ -494,6 +604,17 @@ func (c *cluster) reset() error {
return nil
}

// resetNamespace resets the entire cluster namespace, clearing both request
// definitions and transaction definitions.
func (c *cluster) resetNamespace() {
c.mu.Lock()
defer c.mu.Unlock()
c.txnCounter = 0
c.txnsByName = make(map[string]*roachpb.Transaction)
c.requestsByName = make(map[string]concurrency.Request)
c.txnRecords = make(map[uuid.UUID]*txnRecord)
}

// collectSpans collects the declared spans for a set of requests.
// Its logic mirrors that in Replica.collectSpans.
func (c *cluster) collectSpans(
@@ -520,6 +641,12 @@ func (c *cluster) collectSpans(
return latchSpans, lockSpans
}

func (c *cluster) waitAndCollect(t *testing.T, m *monitor) string {
m.waitForAsyncGoroutinesToStall(t)
c.detectDeadlocks()
return m.collectRecordings()
}

// monitor tracks a set of running goroutines as they execute and captures
// tracing recordings from them. It is capable of watching its set of goroutines
// until they all mutually stall.
@@ -589,11 +716,6 @@ func (m *monitor) resetSeqNums() {
m.seq = 0
}

func (m *monitor) waitAndCollect(t *testing.T) string {
m.waitForAsyncGoroutinesToStall(t)
return m.collectRecordings()
}

func (m *monitor) collectRecordings() string {
// Collect trace recordings from each goroutine.
type logRecord struct {
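
The txnRecord changes above swap a sync.Cond for a signal channel that is
closed and re-created on every update (the "poor man's composable cond var"
noted in updateTxnRecord). Unlike cond.Wait, a closed channel can be combined
with other channels in a select, which is what lets PushTransaction wait on
the pushee record, the pusher record, and the context all at once. A minimal,
self-contained sketch of the pattern, with assumed names (record, get, and
set are illustrative, not the test harness API):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// record signals updates by closing its channel and replacing it, so that
// waiters can select on the signal together with other channels.
type record struct {
	mu    sync.Mutex
	value int
	sig   chan struct{}
}

func newRecord() *record {
	return &record{sig: make(chan struct{})}
}

// get returns the current value along with a channel that is closed the
// next time the value changes.
func (r *record) get() (int, chan struct{}) {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.value, r.sig
}

// set updates the value and notifies every waiter holding the old channel.
func (r *record) set(v int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.value = v
	close(r.sig)                // wake everyone selecting on the old channel
	r.sig = make(chan struct{}) // future waiters get a fresh channel
}

func main() {
	a, b := newRecord(), newRecord()
	go func() {
		time.Sleep(10 * time.Millisecond)
		b.set(42)
	}()
	for {
		av, aSig := a.get()
		bv, bSig := b.get()
		if av != 0 || bv != 0 {
			fmt.Println("observed update:", av, bv) // observed update: 0 42
			return
		}
		// Wait on whichever record changes first -- the composability that
		// sync.Cond does not offer.
		select {
		case <-aSig:
		case <-bSig:
		}
	}
}
```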
2 changes: 1 addition & 1 deletion pkg/storage/concurrency/lock_table.go
@@ -294,7 +294,7 @@ func (g *lockTableGuardImpl) ShouldWait() bool {
return g.mu.startWait
}

func (g *lockTableGuardImpl) NewStateChan() <-chan struct{} {
func (g *lockTableGuardImpl) NewStateChan() chan struct{} {
g.mu.Lock()
defer g.mu.Unlock()
return g.mu.signal
(Diffs for the remaining 5 changed files not shown.)
