Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix[MQB]: Enable strong consistency CSL and fix leader activeness #495

Merged
merged 4 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3549,10 +3549,13 @@ void Cluster::onClusterLeader(mqbnet::ClusterNode* node,
}

d_clusterOrchestrator.updateDatumStats();
d_clusterData.stats().setIsLeader(
d_clusterData.membership().selfNode() == node
? mqbstat::ClusterStats::LeaderStatus::e_LEADER
: mqbstat::ClusterStats::LeaderStatus::e_FOLLOWER);

if (status == mqbc::ElectorInfoLeaderStatus::e_ACTIVE) {
d_clusterData.stats().setIsLeader(
d_clusterData.membership().selfNode() == node
? mqbstat::ClusterStats::LeaderStatus::e_LEADER
: mqbstat::ClusterStats::LeaderStatus::e_FOLLOWER);
}
}

void Cluster::onLeaderPassiveThreshold()
Expand Down
8 changes: 2 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,9 +587,7 @@ ClusterOrchestrator::ClusterOrchestrator(
mqbc::IncoreClusterStateLedger>(
d_allocators.get("ClusterStateLedger"),
clusterConfig,
mqbc::ClusterStateLedgerConsistency::e_EVENTUAL,
// TODO Add cluster config to determine Eventual vs
// Strong
mqbc::ClusterStateLedgerConsistency::e_STRONG,
d_clusterData_p,
clusterState,
&d_clusterData_p->blobSpPool())),
Expand All @@ -606,9 +604,7 @@ ClusterOrchestrator::ClusterOrchestrator(
mqbc::IncoreClusterStateLedger>(
d_allocators.get("ClusterStateLedger"),
clusterConfig,
mqbc::ClusterStateLedgerConsistency::e_EVENTUAL,
// TODO Add cluster config to determine Eventual vs
// Strong
mqbc::ClusterStateLedgerConsistency::e_STRONG,
d_clusterData_p,
clusterState,
&d_clusterData_p->blobSpPool())),
Expand Down
4 changes: 4 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ void ClusterProxy::onActiveNodeUp(mqbnet::ClusterNode* activeNode)
mqbnet::ElectorState::e_LEADER,
d_clusterData.electorInfo().electorTerm() + 1,
activeNode,
mqbc::ElectorInfoLeaderStatus::e_PASSIVE);
// It is **prohibited** to set leader status directly from e_UNDEFINED
// to e_ACTIVE. Hence, we do: e_UNDEFINED -> e_PASSIVE -> e_ACTIVE
d_clusterData.electorInfo().setLeaderStatus(
mqbc::ElectorInfoLeaderStatus::e_ACTIVE);
}

Expand Down
4 changes: 3 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3911,7 +3911,9 @@ void ClusterQueueHelper::onClusterLeader(
<< (node ? node->nodeDescription() : "** none **")
<< ", leader status: " << status;

restoreState(mqbs::DataStore::k_ANY_PARTITION_ID);
if (status == mqbc::ElectorInfoLeaderStatus::e_ACTIVE) {
restoreState(mqbs::DataStore::k_ANY_PARTITION_ID);
}

if (d_cluster_p->isRemote()) {
// non-proxy (replica) case is handled by
Expand Down
35 changes: 1 addition & 34 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,6 @@ void ClusterStateManager::processPartitionPrimaryAdvisoryRaw(
}

d_isFirstLeaderAdvisory = false;
d_clusterStateLedger_mp->setIsFirstLeaderAdvisory(d_isFirstLeaderAdvisory);
}

// PRIVATE MANIPULATORS
Expand Down Expand Up @@ -979,8 +978,6 @@ void ClusterStateManager::onPartitionPrimaryAssignment(
oldLeaseId);

d_isFirstLeaderAdvisory = false;
d_clusterStateLedger_mp->setIsFirstLeaderAdvisory(
d_isFirstLeaderAdvisory);
}

d_afterPartitionPrimaryAssignmentCb(partitionId, primary, status);
Expand Down Expand Up @@ -1247,7 +1244,6 @@ void ClusterStateManager::sendClusterState(

// Self is leader and has published advisory above, so update it.
d_isFirstLeaderAdvisory = false;
d_clusterStateLedger_mp->setIsFirstLeaderAdvisory(d_isFirstLeaderAdvisory);

mqbc::ClusterUtil::sendClusterState(d_clusterData_p,
d_clusterStateLedger_mp.get(),
Expand Down Expand Up @@ -2018,35 +2014,6 @@ void ClusterStateManager::processClusterStateEvent(
bmqp::Event rawEvent(event.blob().get(), d_allocator_p);
BSLS_ASSERT_SAFE(rawEvent.isClusterStateEvent());

// NOTE: Any validation of the event would go here.
if (source != d_clusterData_p->electorInfo().leaderNode()) {
BALL_LOG_WARN << d_clusterData_p->identity().description()
<< ": ignoring cluster state event from cluster node "
<< source->nodeDescription() << " as this node is not "
<< "the current perceived leader. Current leader: ["
<< d_clusterData_p->electorInfo().leaderNodeId() << ": "
<< (d_clusterData_p->electorInfo().leaderNode()
? d_clusterData_p->electorInfo()
.leaderNode()
->nodeDescription()
: "* UNKNOWN *")
<< "]";
return; // RETURN
}
// 'source' is the perceived leader

// TBD: Suppress the following check for now, which will help some
// integration tests to pass. At this point, it is not clear if it is safe
// to process cluster state events while self is stopping.
//
// if ( bmqp_ctrlmsg::NodeStatus::E_STOPPING
// == d_clusterData_p->membership().selfNodeStatus()) {
// return; // RETURN
// }

// TODO: Validate the incoming advisory and potentially buffer it for later
// if the node is currently starting.

const int rc = d_clusterStateLedger_mp->apply(*rawEvent.blob(), source);
if (rc != 0) {
BALL_LOG_ERROR << d_clusterData_p->identity().description()
Expand Down Expand Up @@ -2187,7 +2154,7 @@ void ClusterStateManager::processLeaderAdvisory(

// Leader status and sequence number are updated unconditionally. It may
// have been updated by one of the routines called earlier in this method,
// but there is no harm is setting these values again.
// but there is no harm in setting these values again.

d_clusterData_p->electorInfo().setLeaderMessageSequence(leaderMsgSeq);
d_clusterData_p->electorInfo().setLeaderStatus(
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ class ClusterStateManager BSLS_KEYWORD_FINAL
const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE;

/// Process the specified `event`.
/// Process the specified cluster state `event`.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
Expand Down
13 changes: 13 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemonitor.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,19 @@ struct TestHelper {
isActive ? mqbc::ElectorInfoLeaderStatus::e_ACTIVE
: mqbc::ElectorInfoLeaderStatus::e_PASSIVE;

if (d_cluster_mp->_clusterData()->electorInfo().leaderStatus() ==
mqbc::ElectorInfoLeaderStatus::e_UNDEFINED &&
status == mqbc::ElectorInfoLeaderStatus::e_ACTIVE) {
// It is **prohibited** to set leader status directly from
// e_UNDEFINED to e_ACTIVE, so we set to e_PASSIVE first then
// immediately to e_ACTIVE
d_cluster_mp->_clusterData()->electorInfo().setElectorInfo(
mqbnet::ElectorState::e_LEADER,
1,
node,
mqbc::ElectorInfoLeaderStatus::e_PASSIVE);
}

d_cluster_mp->_clusterData()->electorInfo().setElectorInfo(
mqbnet::ElectorState::e_LEADER,
1,
Expand Down
2 changes: 0 additions & 2 deletions src/groups/mqb/mqbc/mqbc_clusterstateledger.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,6 @@ class ClusterStateLedger : public ElectorInfoObserver {
virtual int apply(const bdlbb::Blob& record,
mqbnet::ClusterNode* source) = 0;

virtual void setIsFirstLeaderAdvisory(bool isFirstLeaderAdvisory) = 0;

/// Set the commit callback to the specified `value`.
virtual void setCommitCb(const CommitCb& value) = 0;

Expand Down
8 changes: 0 additions & 8 deletions src/groups/mqb/mqbc/mqbc_clusterstateledger.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,6 @@ struct ClusterStateLedgerTestImp
return markDone();
}

void
setIsFirstLeaderAdvisory(BSLS_ANNOTATION_UNUSED bool isFirstLeaderAdvisory)
BSLS_KEYWORD_OVERRIDE
{
markDone();
}

// ACCESSORS
void setCommitCb(BSLS_ANNOTATION_UNUSED const CommitCb& value)
BSLS_KEYWORD_OVERRIDE
Expand Down Expand Up @@ -205,7 +198,6 @@ static void test1_clusterStateLedger_protocol()
apply(bmqp_ctrlmsg::LeaderAdvisory()));
BSLS_PROTOCOLTEST_ASSERT(testObj,
apply(bmqp_ctrlmsg::ClusterMessage()));
BSLS_PROTOCOLTEST_ASSERT(testObj, setIsFirstLeaderAdvisory(true));
BSLS_PROTOCOLTEST_ASSERT(
testObj,
setCommitCb(mqbc::ClusterStateLedger::CommitCb()));
Expand Down
55 changes: 17 additions & 38 deletions src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1746,35 +1746,6 @@ void ClusterStateManager::processClusterStateEvent(
bmqp::Event rawEvent(event.blob().get(), d_allocator_p);
BSLS_ASSERT_SAFE(rawEvent.isClusterStateEvent());

// NOTE: Any validation of the event would go here.
if (source != d_clusterData_p->electorInfo().leaderNode()) {
BALL_LOG_WARN << d_clusterData_p->identity().description()
<< ": ignoring cluster state event from cluster node "
<< source->nodeDescription() << " as this node is not "
<< "the current perceived leader. Current leader: ["
<< d_clusterData_p->electorInfo().leaderNodeId() << ": "
<< (d_clusterData_p->electorInfo().leaderNode()
? d_clusterData_p->electorInfo()
.leaderNode()
->nodeDescription()
: "* UNKNOWN *")
<< "]";
return; // RETURN
}
// 'source' is the perceived leader

// TBD: Suppress the following check for now, which will help some
// integration tests to pass. At this point, it is not clear if it is safe
// to process cluster state events while self is stopping.
//
// if ( bmqp_ctrlmsg::NodeStatus::E_STOPPING
// == d_clusterData_p->membership().selfNodeStatus()) {
// return; // RETURN
// }

// TODO: Validate the incoming advisory and potentially buffer it for later
// if the node is currently starting.

const int rc = d_clusterStateLedger_mp->apply(*rawEvent.blob(), source);
if (rc != 0) {
BALL_LOG_ERROR << d_clusterData_p->identity().description()
Expand Down Expand Up @@ -1878,14 +1849,16 @@ void ClusterStateManager::onNodeStopped()

// MANIPULATORS
// (virtual: mqbc::ElectorInfoObserver)
void ClusterStateManager::onClusterLeader(
mqbnet::ClusterNode* node,
BSLS_ANNOTATION_UNUSED ElectorInfoLeaderStatus::Enum status)
void ClusterStateManager::onClusterLeader(mqbnet::ClusterNode* node,
ElectorInfoLeaderStatus::Enum status)
{
// executed by the cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(
(node && (ElectorInfoLeaderStatus::e_UNDEFINED != status)) ||
(!node && (ElectorInfoLeaderStatus::e_UNDEFINED == status)));

if (!node) {
// Leader lost
Expand All @@ -1899,21 +1872,27 @@ void ClusterStateManager::onClusterLeader(
return; // RETURN
}

// We can only reach this code path if leader `node` is not null. Here, we
// only care about transitioning from no leader to having a leader. We
// don't care about leader status changing between e_PASSIVE and e_ACTIVE.
if (d_clusterFSM.state() != ClusterFSM::State::e_UNKNOWN) {
return; // RETURN
}

// IMPORTANT: This is the main entry point to start running the Cluster FSM
InputMessages inputMessages(1, d_allocator_p);
inputMessages.at(0).setSource(node); // leader node

if (d_clusterData_p->membership().selfNode()->nodeId() == node->nodeId()) {
BALL_LOG_INFO << d_clusterData_p->identity().description() << ": "
<< (d_clusterFSM.isSelfLeader() ? "Re-" : "")
<< "Transitioning to leader in the Cluster FSM.";
BALL_LOG_INFO << d_clusterData_p->identity().description()
<< ": Transitioning to leader in the Cluster FSM.";

applyFSMEvent(ClusterFSM::Event::e_SLCT_LDR,
ClusterFSMEventMetadata(inputMessages));
}
else {
BALL_LOG_INFO << d_clusterData_p->identity().description() << ": "
<< (d_clusterFSM.isSelfFollower() ? "Re-" : "")
<< "Transitioning to follower in the Cluster FSM.";
BALL_LOG_INFO << d_clusterData_p->identity().description()
<< ": Transitioning to follower in the Cluster FSM.";

applyFSMEvent(ClusterFSM::Event::e_SLCT_FOL,
ClusterFSMEventMetadata(inputMessages));
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbc/mqbc_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ class ClusterStateManager BSLS_KEYWORD_FINAL
const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE;

/// Process the specified `event`.
/// Process the specified cluster state `event`.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
Expand Down
9 changes: 6 additions & 3 deletions src/groups/mqb/mqbc/mqbc_clusterstatemanager.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ typedef mqbc::ClusterStateManager::ClusterStateLedgerMp ClusterStateLedgerMp;
struct Tester {
public:
// PUBLIC DATA
bool d_isLeader;
bdlbb::PooledBlobBufferFactory d_bufferFactory;
bmqu::TempDirectory d_tempDir;
bslma::ManagedPtr<mqbmock::Cluster> d_cluster_mp;
Expand All @@ -96,7 +97,8 @@ struct Tester {
public:
// CREATORS
Tester(bool isLeader)
: d_bufferFactory(1024, bmqtst::TestHelperUtil::allocator())
: d_isLeader(isLeader)
, d_bufferFactory(1024, bmqtst::TestHelperUtil::allocator())
, d_tempDir(bmqtst::TestHelperUtil::allocator())
, d_cluster_mp(0)
, d_clusterStateLedger_p(0)
Expand Down Expand Up @@ -140,7 +142,7 @@ struct Tester {
mqbmock::Cluster(&d_bufferFactory,
bmqtst::TestHelperUtil::allocator(),
true, // isClusterMember
isLeader,
d_isLeader,
true, // isCSLMode
true, // isFSMWorkflow
clusterNodeDefs,
Expand Down Expand Up @@ -211,7 +213,8 @@ struct Tester {
mqbmock::Cluster::k_LEADER_NODE_ID);
BSLS_ASSERT_OPT(leaderNode != 0);
d_cluster_mp->_clusterData()->electorInfo().setElectorInfo(
mqbnet::ElectorState::e_LEADER,
d_isLeader ? mqbnet::ElectorState::e_LEADER
: mqbnet::ElectorState::e_FOLLOWER,
electorTerm,
leaderNode,
mqbc::ElectorInfoLeaderStatus::e_PASSIVE);
Expand Down
13 changes: 7 additions & 6 deletions src/groups/mqb/mqbc/mqbc_clusterutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ void applyQueueUpdate(mqbc::ClusterState* clusterState,
<< clusterData.identity().description()
<< ": Received QueueUpdateAdvisory for known queue [uri: "
<< uri << "] with a mismatched queueKey "
<< "[expected: " << queueKey << ", received: " << queueKey
<< "]: " << queueUpdate << BMQTSK_ALARMLOG_END;
<< "[expected: " << cit->second->key()
<< ", received: " << queueKey << "]: " << queueUpdate
<< BMQTSK_ALARMLOG_END;
return; // RETURN
}
}
Expand Down Expand Up @@ -1213,10 +1214,10 @@ void ClusterUtil::registerAppId(ClusterData* clusterData,

if (mqbnet::ElectorState::e_LEADER !=
clusterData->electorInfo().electorState()) {
BALL_LOG_ERROR << clusterData->identity().description()
<< ": Failed to register appId '" << appId
<< "' for domain '" << domain->name()
<< "'. Self is not leader.";
BALL_LOG_WARN << clusterData->identity().description()
<< ": Not registering appId '" << appId
<< "' for domain '" << domain->name()
<< "'. Self is not leader.";
return; // RETURN
}

Expand Down
Loading
Loading