Skip to content

Commit

Permalink
roachpb: consider txn epochs in Transaction.Update
Browse files Browse the repository at this point in the history
Previously, Transaction.Update was pretty loose about which fields it
updated given a receiver and an argument with different epochs. This
relied on a number of assumptions about how these fields could change
between epochs. This commit tightens this up by making this all more
explicit.

It also adds in some log warnings around calls to Update that we don't
expect to see in practice.

Release note: None
  • Loading branch information
nvanbenschoten committed Aug 21, 2019
1 parent 9e76362 commit aed892a
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 32 deletions.
84 changes: 59 additions & 25 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,8 +934,12 @@ func (t *Transaction) Restart(
// - the conflicting transaction's upgradePriority
t.UpgradePriority(MakePriority(userPriority))
t.UpgradePriority(upgradePriority)
t.WriteTooOld = false
// Reset all epoch-scoped state.
t.Sequence = 0
t.WriteTooOld = false
t.OrigTimestampWasObserved = false
t.IntentSpans = nil
t.InFlightWrites = nil
}

// BumpEpoch increments the transaction's epoch, allowing for an in-place
Expand Down Expand Up @@ -978,30 +982,68 @@ func (t *Transaction) Update(o *Transaction) {
if t.ID == (uuid.UUID{}) {
*t = *o
return
} else if t.ID != o.ID {
log.Fatalf(context.Background(), "updating txn %v with different txn %v", t, o)
return
}
if len(t.Key) == 0 {
t.Key = o.Key
}
if !t.Status.IsFinalized() {
if (t.Epoch < o.Epoch) || (t.Epoch == o.Epoch && o.Status != PENDING) {
t.Status = o.Status
}
}

// If the epoch or refreshed timestamp move forward, overwrite
// WriteTooOld, otherwise the flags are cumulative.
if t.Epoch < o.Epoch || t.RefreshedTimestamp.Less(o.RefreshedTimestamp) {
// Update epoch-scoped state, depending on the two transactions' epochs.
if t.Epoch < o.Epoch {
// Replace all epoch-scoped state.
t.Epoch = o.Epoch
t.Status = o.Status
t.WriteTooOld = o.WriteTooOld
t.OrigTimestampWasObserved = o.OrigTimestampWasObserved
} else {
t.WriteTooOld = t.WriteTooOld || o.WriteTooOld
t.OrigTimestampWasObserved = t.OrigTimestampWasObserved || o.OrigTimestampWasObserved
}
t.Sequence = o.Sequence
t.IntentSpans = o.IntentSpans
t.InFlightWrites = o.InFlightWrites
} else if t.Epoch == o.Epoch {
// Forward all epoch-scoped state.
switch t.Status {
case PENDING:
t.Status = o.Status
case STAGING:
if o.Status != PENDING {
t.Status = o.Status
}
case ABORTED:
if o.Status == COMMITTED {
log.Warningf(context.Background(), "updating ABORTED txn %v with COMMITTED txn %v", t, o)
}
case COMMITTED:
// Nothing to do.
}

if t.Epoch < o.Epoch {
t.Epoch = o.Epoch
// If the refreshed timestamp move forward, overwrite
// WriteTooOld, otherwise the flags are cumulative.
if t.RefreshedTimestamp.Less(o.RefreshedTimestamp) {
t.WriteTooOld = o.WriteTooOld
t.OrigTimestampWasObserved = o.OrigTimestampWasObserved
} else {
t.WriteTooOld = t.WriteTooOld || o.WriteTooOld
t.OrigTimestampWasObserved = t.OrigTimestampWasObserved || o.OrigTimestampWasObserved
}

if t.Sequence < o.Sequence {
t.Sequence = o.Sequence
}
if len(o.IntentSpans) > 0 {
t.IntentSpans = o.IntentSpans
}
if len(o.InFlightWrites) > 0 {
t.InFlightWrites = o.InFlightWrites
}
} else /* t.Epoch > o.Epoch */ {
// Ignore epoch-specific state from previous epoch.
if o.Status == COMMITTED {
log.Warningf(context.Background(), "updating txn %v with COMMITTED txn at earlier epoch %v", t, o)
}
}

// Forward each of the transaction timestamps.
t.Timestamp.Forward(o.Timestamp)
t.LastHeartbeat.Forward(o.LastHeartbeat)
t.OrigTimestamp.Forward(o.OrigTimestamp)
Expand All @@ -1026,17 +1068,9 @@ func (t *Transaction) Update(o *Transaction) {
for _, v := range o.ObservedTimestamps {
t.UpdateObservedTimestamp(v.NodeID, v.Timestamp)
}
t.UpgradePriority(o.Priority)

if t.Sequence < o.Sequence {
t.Sequence = o.Sequence
}
if len(o.IntentSpans) > 0 {
t.IntentSpans = o.IntentSpans
}
if len(o.InFlightWrites) > 0 {
t.InFlightWrites = o.InFlightWrites
}
// Ratchet the transaction priority.
t.UpgradePriority(o.Priority)
}

// UpgradePriority sets transaction priority to the maximum of current
Expand Down
83 changes: 76 additions & 7 deletions pkg/roachpb/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -502,22 +503,74 @@ func TestTransactionUpdate(t *testing.T) {
t.Fatal(err)
}

// Updating an empty Transaction copies all fields.
var txn2 Transaction
txn2.Update(&txn)

if err := zerofields.NoZeroField(txn2); err != nil {
t.Fatal(err)
}
expTxn2 := txn
require.Equal(t, expTxn2, txn2)

// Updating a Transaction at an earlier epoch replaces all epoch-scoped fields.
var txn3 Transaction
txn3.ID = uuid.MakeV4()
txn3.ID = txn.ID
txn3.Epoch = txn.Epoch - 1
txn3.Status = STAGING
txn3.Name = "carl"
txn3.Priority = 123
txn3.Update(&txn)

if err := zerofields.NoZeroField(txn3); err != nil {
t.Fatal(err)
}
expTxn3 := txn
expTxn3.Name = "carl"
require.Equal(t, expTxn3, txn3)

// Updating a Transaction at the same epoch forwards all epoch-scoped fields.
var txn4 Transaction
txn4.ID = txn.ID
txn4.Epoch = txn.Epoch
txn4.Status = STAGING
txn4.Sequence = txn.Sequence + 10
txn4.Name = "carl"
txn4.Priority = 123
txn4.Update(&txn)

expTxn4 := txn
expTxn4.Name = "carl"
expTxn4.Sequence = txn.Sequence + 10
require.Equal(t, expTxn4, txn4)

// Updating a Transaction at a future epoch ignores all epoch-scoped fields.
var txn5 Transaction
txn5.ID = txn.ID
txn5.Epoch = txn.Epoch + 1
txn5.Status = PENDING
txn5.Sequence = txn.Sequence - 10
txn5.Name = "carl"
txn5.Priority = 123
txn5.Update(&txn)

expTxn5 := txn
expTxn5.Name = "carl"
expTxn5.Epoch = txn.Epoch + 1
expTxn5.Status = PENDING
expTxn5.Sequence = txn.Sequence - 10
expTxn5.IntentSpans = nil
expTxn5.InFlightWrites = nil
expTxn5.WriteTooOld = false
expTxn5.OrigTimestampWasObserved = false
require.Equal(t, expTxn5, txn5)

// Updating a different transaction fatals.
var exited bool
log.SetExitFunc(true /* hideStack */, func(int) { exited = true })
defer log.ResetExitFunc()

var txn6 Transaction
txn6.ID = uuid.MakeV4()
origTxn6 := txn6
txn6.Update(&txn)

require.Equal(t, origTxn6, txn6)
require.True(t, exited)
}

func TestTransactionUpdateMinTimestamp(t *testing.T) {
Expand Down Expand Up @@ -611,6 +664,22 @@ func TestTransactionClone(t *testing.T) {
}
}

func TestTransactionRestart(t *testing.T) {
txn := nonZeroTxn
txn.Restart(1, 1, makeTS(25, 1))

expTxn := nonZeroTxn
expTxn.Epoch++
expTxn.Sequence = 0
expTxn.Timestamp = makeTS(25, 1)
expTxn.OrigTimestamp = makeTS(25, 1)
expTxn.WriteTooOld = false
expTxn.OrigTimestampWasObserved = false
expTxn.IntentSpans = nil
expTxn.InFlightWrites = nil
require.Equal(t, expTxn, txn)
}

// TestTransactionRecordRoundtrips tests a few properties about Transaction
// and TransactionRecord protos. Remember that the latter is wire compatible
// with the former and contains a subset of its protos.
Expand Down

0 comments on commit aed892a

Please sign in to comment.