Skip to content

Commit

Permalink
wait for unconfirmed before buffering confirms
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Jun 25, 2024
1 parent 1387dfa commit 4af02cf
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
13 changes: 11 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5522,10 +5522,19 @@ void ClusterQueueHelper::deconfigureQueue(
QueueLiveState& queueLiveState = queueContextSp->d_liveQInfo;
mqbi::Queue* queue = queueLiveState.d_queue_sp.get();

// Keep `0` as `genCount` to indicate StopRequest processing to the
// queue.
// Do not send `0` as `genCount` because the queue will start CONFIRMs
// buffering too early (upon 'notifyQueue'). Queue needs to allow CONFIRMs
// until 'checkUnconfirmedQueueDispatched' decision.
bsls::Types::Uint64 genCount = 0;

if (!d_cluster_p->isRemote()) {
genCount = d_clusterState_p->partition(queueContextSp->partitionId())
.primaryLeaseId();
}
else {
genCount = d_clusterData_p->electorInfo().electorTerm();
}

for (StreamsMap::iterator iter = queueLiveState.d_subQueueIds.begin();
iter != queueLiveState.d_subQueueIds.end();
++iter) {
Expand Down
14 changes: 12 additions & 2 deletions src/integration-tests/test_puts_retransmission.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,12 @@ def test_shutdown_primary_keep_replica(self, multi_node: Cluster):
# If shutting down primary, the replica needs to wait for new primary.
self.active_node.wait_status(wait_leader=True, wait_ready=False)

self.inspect_results(allow_duplicates=False)
# Do allow duplicates for the scenario when a CONFIRM had passed Proxy
# but did not reach the replication. New Primary then redelivers and
# the Proxy cannot detect the duplicate because it had removed the GUID
# upon the first CONFIRM

self.inspect_results(allow_duplicates=True)

def test_shutdown_replica(self, multi_node: Cluster):
self.setup_cluster_fanout(multi_node)
Expand All @@ -521,7 +526,12 @@ def test_shutdown_replica(self, multi_node: Cluster):
# Because the quorum is 3, cluster is still healthy after shutting down
# replica.

self.inspect_results(allow_duplicates=False)
# Do allow duplicates for the scenario when a CONFIRM had passed Proxy
# but did not reach the replication. New Primary then redelivers and
# the Proxy cannot detect the duplicate because it had removed the GUID
# upon the first CONFIRM

self.inspect_results(allow_duplicates=True)

def test_kill_primary_convert_replica(self, multi_node: Cluster):
self.setup_cluster_fanout(multi_node)
Expand Down

0 comments on commit 4af02cf

Please sign in to comment.