diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go index d82c45e76243..9e02e5b93784 100644 --- a/pkg/roachpb/data.go +++ b/pkg/roachpb/data.go @@ -934,8 +934,12 @@ func (t *Transaction) Restart( // - the conflicting transaction's upgradePriority t.UpgradePriority(MakePriority(userPriority)) t.UpgradePriority(upgradePriority) - t.WriteTooOld = false + // Reset all epoch-scoped state. t.Sequence = 0 + t.WriteTooOld = false + t.OrigTimestampWasObserved = false + t.IntentSpans = nil + t.InFlightWrites = nil } // BumpEpoch increments the transaction's epoch, allowing for an in-place @@ -978,30 +982,68 @@ func (t *Transaction) Update(o *Transaction) { if t.ID == (uuid.UUID{}) { *t = *o return + } else if t.ID != o.ID { + log.Fatalf(context.Background(), "updating txn %v with different txn %v", t, o) + return } if len(t.Key) == 0 { t.Key = o.Key } - if !t.Status.IsFinalized() { - if (t.Epoch < o.Epoch) || (t.Epoch == o.Epoch && o.Status != PENDING) { - t.Status = o.Status - } - } - // If the epoch or refreshed timestamp move forward, overwrite - // WriteTooOld, otherwise the flags are cumulative. - if t.Epoch < o.Epoch || t.RefreshedTimestamp.Less(o.RefreshedTimestamp) { + // Update epoch-scoped state, depending on the two transactions' epochs. + if t.Epoch < o.Epoch { + // Replace all epoch-scoped state. + t.Epoch = o.Epoch + t.Status = o.Status t.WriteTooOld = o.WriteTooOld t.OrigTimestampWasObserved = o.OrigTimestampWasObserved - } else { - t.WriteTooOld = t.WriteTooOld || o.WriteTooOld - t.OrigTimestampWasObserved = t.OrigTimestampWasObserved || o.OrigTimestampWasObserved - } + t.Sequence = o.Sequence + t.IntentSpans = o.IntentSpans + t.InFlightWrites = o.InFlightWrites + } else if t.Epoch == o.Epoch { + // Forward all epoch-scoped state. + switch t.Status { + case PENDING: + t.Status = o.Status + case STAGING: + if o.Status != PENDING { + t.Status = o.Status + } + case ABORTED: + if o.Status == COMMITTED { + log.Warningf(context.Background(), "updating ABORTED txn %v with COMMITTED txn %v", t, o) + } + case COMMITTED: + // Nothing to do. + } - if t.Epoch < o.Epoch { - t.Epoch = o.Epoch + // If the refreshed timestamp move forward, overwrite + // WriteTooOld, otherwise the flags are cumulative. + if t.RefreshedTimestamp.Less(o.RefreshedTimestamp) { + t.WriteTooOld = o.WriteTooOld + t.OrigTimestampWasObserved = o.OrigTimestampWasObserved + } else { + t.WriteTooOld = t.WriteTooOld || o.WriteTooOld + t.OrigTimestampWasObserved = t.OrigTimestampWasObserved || o.OrigTimestampWasObserved + } + + if t.Sequence < o.Sequence { + t.Sequence = o.Sequence + } + if len(o.IntentSpans) > 0 { + t.IntentSpans = o.IntentSpans + } + if len(o.InFlightWrites) > 0 { + t.InFlightWrites = o.InFlightWrites + } + } else /* t.Epoch > o.Epoch */ { + // Ignore epoch-specific state from previous epoch. + if o.Status == COMMITTED { + log.Warningf(context.Background(), "updating txn %v with COMMITTED txn at earlier epoch %v", t, o) + } } + // Forward each of the transaction timestamps. t.Timestamp.Forward(o.Timestamp) t.LastHeartbeat.Forward(o.LastHeartbeat) t.OrigTimestamp.Forward(o.OrigTimestamp) @@ -1026,17 +1068,9 @@ func (t *Transaction) Update(o *Transaction) { for _, v := range o.ObservedTimestamps { t.UpdateObservedTimestamp(v.NodeID, v.Timestamp) } - t.UpgradePriority(o.Priority) - if t.Sequence < o.Sequence { - t.Sequence = o.Sequence - } - if len(o.IntentSpans) > 0 { - t.IntentSpans = o.IntentSpans - } - if len(o.InFlightWrites) > 0 { - t.InFlightWrites = o.InFlightWrites - } + // Ratchet the transaction priority. + t.UpgradePriority(o.Priority) } // UpgradePriority sets transaction priority to the maximum of current diff --git a/pkg/roachpb/data_test.go b/pkg/roachpb/data_test.go index d9de20337186..7b91cdce61e9 100644 --- a/pkg/roachpb/data_test.go +++ b/pkg/roachpb/data_test.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -502,22 +503,74 @@ func TestTransactionUpdate(t *testing.T) { t.Fatal(err) } + // Updating an empty Transaction copies all fields. var txn2 Transaction txn2.Update(&txn) - if err := zerofields.NoZeroField(txn2); err != nil { - t.Fatal(err) - } + expTxn2 := txn + require.Equal(t, expTxn2, txn2) + // Updating a Transaction at an earlier epoch replaces all epoch-scoped fields. var txn3 Transaction - txn3.ID = uuid.MakeV4() + txn3.ID = txn.ID + txn3.Epoch = txn.Epoch - 1 + txn3.Status = STAGING txn3.Name = "carl" txn3.Priority = 123 txn3.Update(&txn) - if err := zerofields.NoZeroField(txn3); err != nil { - t.Fatal(err) - } + expTxn3 := txn + expTxn3.Name = "carl" + require.Equal(t, expTxn3, txn3) + + // Updating a Transaction at the same epoch forwards all epoch-scoped fields. + var txn4 Transaction + txn4.ID = txn.ID + txn4.Epoch = txn.Epoch + txn4.Status = STAGING + txn4.Sequence = txn.Sequence + 10 + txn4.Name = "carl" + txn4.Priority = 123 + txn4.Update(&txn) + + expTxn4 := txn + expTxn4.Name = "carl" + expTxn4.Sequence = txn.Sequence + 10 + require.Equal(t, expTxn4, txn4) + + // Updating a Transaction at a future epoch ignores all epoch-scoped fields. + var txn5 Transaction + txn5.ID = txn.ID + txn5.Epoch = txn.Epoch + 1 + txn5.Status = PENDING + txn5.Sequence = txn.Sequence - 10 + txn5.Name = "carl" + txn5.Priority = 123 + txn5.Update(&txn) + + expTxn5 := txn + expTxn5.Name = "carl" + expTxn5.Epoch = txn.Epoch + 1 + expTxn5.Status = PENDING + expTxn5.Sequence = txn.Sequence - 10 + expTxn5.IntentSpans = nil + expTxn5.InFlightWrites = nil + expTxn5.WriteTooOld = false + expTxn5.OrigTimestampWasObserved = false + require.Equal(t, expTxn5, txn5) + + // Updating a different transaction fatals. + var exited bool + log.SetExitFunc(true /* hideStack */, func(int) { exited = true }) + defer log.ResetExitFunc() + + var txn6 Transaction + txn6.ID = uuid.MakeV4() + origTxn6 := txn6 + txn6.Update(&txn) + + require.Equal(t, origTxn6, txn6) + require.True(t, exited) } func TestTransactionUpdateMinTimestamp(t *testing.T) { @@ -611,6 +664,22 @@ func TestTransactionClone(t *testing.T) { } } +func TestTransactionRestart(t *testing.T) { + txn := nonZeroTxn + txn.Restart(1, 1, makeTS(25, 1)) + + expTxn := nonZeroTxn + expTxn.Epoch++ + expTxn.Sequence = 0 + expTxn.Timestamp = makeTS(25, 1) + expTxn.OrigTimestamp = makeTS(25, 1) + expTxn.WriteTooOld = false + expTxn.OrigTimestampWasObserved = false + expTxn.IntentSpans = nil + expTxn.InFlightWrites = nil + require.Equal(t, expTxn, txn) +} + // TestTransactionRecordRoundtrips tests a few properties about Transaction // and TransactionRecord protos. Remember that the latter is wire compatible // with the former and contains a subset of its protos.