Skip to content

Commit

Permalink
mqbc::IncoreCSL: Enable strong consistency
Browse files Browse the repository at this point in the history
Signed-off-by: Yuan Jing Vincent Yan <[email protected]>
  • Loading branch information
kaikulimu committed Nov 11, 2024
1 parent a84b584 commit 1d50340
Show file tree
Hide file tree
Showing 14 changed files with 422 additions and 553 deletions.
8 changes: 2 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,9 +587,7 @@ ClusterOrchestrator::ClusterOrchestrator(
mqbc::IncoreClusterStateLedger>(
d_allocators.get("ClusterStateLedger"),
clusterConfig,
mqbc::ClusterStateLedgerConsistency::e_EVENTUAL,
// TODO Add cluster config to determine Eventual vs
// Strong
mqbc::ClusterStateLedgerConsistency::e_STRONG,
d_clusterData_p,
clusterState,
&d_clusterData_p->bufferFactory())),
Expand All @@ -606,9 +604,7 @@ ClusterOrchestrator::ClusterOrchestrator(
mqbc::IncoreClusterStateLedger>(
d_allocators.get("ClusterStateLedger"),
clusterConfig,
mqbc::ClusterStateLedgerConsistency::e_EVENTUAL,
// TODO Add cluster config to determine Eventual vs
// Strong
mqbc::ClusterStateLedgerConsistency::e_STRONG,
d_clusterData_p,
clusterState,
&d_clusterData_p->bufferFactory())),
Expand Down
35 changes: 1 addition & 34 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,6 @@ void ClusterStateManager::processPartitionPrimaryAdvisoryRaw(
}

d_isFirstLeaderAdvisory = false;
d_clusterStateLedger_mp->setIsFirstLeaderAdvisory(d_isFirstLeaderAdvisory);
}

// PRIVATE MANIPULATORS
Expand Down Expand Up @@ -979,8 +978,6 @@ void ClusterStateManager::onPartitionPrimaryAssignment(
oldLeaseId);

d_isFirstLeaderAdvisory = false;
d_clusterStateLedger_mp->setIsFirstLeaderAdvisory(
d_isFirstLeaderAdvisory);
}

d_afterPartitionPrimaryAssignmentCb(partitionId, primary, status);
Expand Down Expand Up @@ -1230,7 +1227,6 @@ void ClusterStateManager::sendClusterState(

// Self is leader and has published advisory above, so update it.
d_isFirstLeaderAdvisory = false;
d_clusterStateLedger_mp->setIsFirstLeaderAdvisory(d_isFirstLeaderAdvisory);

mqbc::ClusterUtil::sendClusterState(d_clusterData_p,
d_clusterStateLedger_mp.get(),
Expand Down Expand Up @@ -2085,35 +2081,6 @@ void ClusterStateManager::processClusterStateEvent(
bmqp::Event rawEvent(event.blob().get(), d_allocator_p);
BSLS_ASSERT_SAFE(rawEvent.isClusterStateEvent());

// NOTE: Any validation of the event would go here.
if (source != d_clusterData_p->electorInfo().leaderNode()) {
BALL_LOG_WARN << d_clusterData_p->identity().description()
<< ": ignoring cluster state event from cluster node "
<< source->nodeDescription() << " as this node is not "
<< "the current perceived leader. Current leader: ["
<< d_clusterData_p->electorInfo().leaderNodeId() << ": "
<< (d_clusterData_p->electorInfo().leaderNode()
? d_clusterData_p->electorInfo()
.leaderNode()
->nodeDescription()
: "* UNKNOWN *")
<< "]";
return; // RETURN
}
// 'source' is the perceived leader

// TBD: Suppress the following check for now, which will help some
// integration tests to pass. At this point, it is not clear if it is safe
// to process cluster state events while self is stopping.
//
// if ( bmqp_ctrlmsg::NodeStatus::E_STOPPING
// == d_clusterData_p->membership().selfNodeStatus()) {
// return; // RETURN
// }

// TODO: Validate the incoming advisory and potentially buffer it for later
// if the node is currently starting.

const int rc = d_clusterStateLedger_mp->apply(*rawEvent.blob(), source);
if (rc != 0) {
BALL_LOG_ERROR << d_clusterData_p->identity().description()
Expand Down Expand Up @@ -2254,7 +2221,7 @@ void ClusterStateManager::processLeaderAdvisory(

// Leader status and sequence number are updated unconditionally. It may
// have been updated by one of the routines called earlier in this method,
// but there is no harm is setting these values again.
// but there is no harm in setting these values again.

d_clusterData_p->electorInfo().setLeaderMessageSequence(leaderMsgSeq);
d_clusterData_p->electorInfo().setLeaderStatus(
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ class ClusterStateManager : public mqbc::ClusterStateObserver,
const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE;

/// Process the specified `event`.
/// Process the specified cluster state `event`.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
Expand Down
2 changes: 0 additions & 2 deletions src/groups/mqb/mqbc/mqbc_clusterstateledger.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,6 @@ class ClusterStateLedger : public ElectorInfoObserver {
virtual int apply(const bdlbb::Blob& record,
mqbnet::ClusterNode* source) = 0;

virtual void setIsFirstLeaderAdvisory(bool isFirstLeaderAdvisory) = 0;

/// Set the commit callback to the specified `value`.
virtual void setCommitCb(const CommitCb& value) = 0;

Expand Down
8 changes: 0 additions & 8 deletions src/groups/mqb/mqbc/mqbc_clusterstateledger.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,6 @@ struct ClusterStateLedgerTestImp
return markDone();
}

void
setIsFirstLeaderAdvisory(BSLS_ANNOTATION_UNUSED bool isFirstLeaderAdvisory)
BSLS_KEYWORD_OVERRIDE
{
markDone();
}

// ACCESSORS
void setCommitCb(BSLS_ANNOTATION_UNUSED const CommitCb& value)
BSLS_KEYWORD_OVERRIDE
Expand Down Expand Up @@ -205,7 +198,6 @@ static void test1_clusterStateLedger_protocol()
apply(bmqp_ctrlmsg::LeaderAdvisory()));
BSLS_PROTOCOLTEST_ASSERT(testObj,
apply(bmqp_ctrlmsg::ClusterMessage()));
BSLS_PROTOCOLTEST_ASSERT(testObj, setIsFirstLeaderAdvisory(true));
BSLS_PROTOCOLTEST_ASSERT(
testObj,
setCommitCb(mqbc::ClusterStateLedger::CommitCb()));
Expand Down
29 changes: 0 additions & 29 deletions src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1749,35 +1749,6 @@ void ClusterStateManager::processClusterStateEvent(
bmqp::Event rawEvent(event.blob().get(), d_allocator_p);
BSLS_ASSERT_SAFE(rawEvent.isClusterStateEvent());

// NOTE: Any validation of the event would go here.
if (source != d_clusterData_p->electorInfo().leaderNode()) {
BALL_LOG_WARN << d_clusterData_p->identity().description()
<< ": ignoring cluster state event from cluster node "
<< source->nodeDescription() << " as this node is not "
<< "the current perceived leader. Current leader: ["
<< d_clusterData_p->electorInfo().leaderNodeId() << ": "
<< (d_clusterData_p->electorInfo().leaderNode()
? d_clusterData_p->electorInfo()
.leaderNode()
->nodeDescription()
: "* UNKNOWN *")
<< "]";
return; // RETURN
}
// 'source' is the perceived leader

// TBD: Suppress the following check for now, which will help some
// integration tests to pass. At this point, it is not clear if it is safe
// to process cluster state events while self is stopping.
//
// if ( bmqp_ctrlmsg::NodeStatus::E_STOPPING
// == d_clusterData_p->membership().selfNodeStatus()) {
// return; // RETURN
// }

// TODO: Validate the incoming advisory and potentially buffer it for later
// if the node is currently starting.

const int rc = d_clusterStateLedger_mp->apply(*rawEvent.blob(), source);
if (rc != 0) {
BALL_LOG_ERROR << d_clusterData_p->identity().description()
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbc/mqbc_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ class ClusterStateManager
const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE;

/// Process the specified `event`.
/// Process the specified cluster state `event`.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
Expand Down
10 changes: 5 additions & 5 deletions src/groups/mqb/mqbc/mqbc_clusterutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ void applyQueueUpdate(mqbc::ClusterState* clusterState,
<< clusterData.identity().description()
<< ": Received QueueUpdateAdvisory for known queue [uri: "
<< uri << "] with a mismatched queueKey "
<< "[expected: " << queueKey << ", received: " << queueKey
<< "[expected: " << cit->second->key() << ", received: " << queueKey
<< "]: " << queueUpdate << BMQTSK_ALARMLOG_END;
return; // RETURN
}
Expand Down Expand Up @@ -1276,10 +1276,10 @@ void ClusterUtil::registerAppId(ClusterData* clusterData,

if (mqbnet::ElectorState::e_LEADER !=
clusterData->electorInfo().electorState()) {
BALL_LOG_ERROR << clusterData->identity().description()
<< ": Failed to register appId '" << appId
<< "' for domain '" << domain->name()
<< "'. Self is not leader.";
BALL_LOG_WARN << clusterData->identity().description()
<< ": Not registering appId '" << appId
<< "' for domain '" << domain->name()
<< "'. Self is not leader.";
return; // RETURN
}

Expand Down
Loading

0 comments on commit 1d50340

Please sign in to comment.