diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp index 2b465498a..163c93bb9 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp @@ -587,9 +587,7 @@ ClusterOrchestrator::ClusterOrchestrator( mqbc::IncoreClusterStateLedger>( d_allocators.get("ClusterStateLedger"), clusterConfig, - mqbc::ClusterStateLedgerConsistency::e_EVENTUAL, - // TODO Add cluster config to determine Eventual vs - // Strong + mqbc::ClusterStateLedgerConsistency::e_STRONG, d_clusterData_p, clusterState, &d_clusterData_p->blobSpPool())), @@ -606,9 +604,7 @@ ClusterOrchestrator::ClusterOrchestrator( mqbc::IncoreClusterStateLedger>( d_allocators.get("ClusterStateLedger"), clusterConfig, - mqbc::ClusterStateLedgerConsistency::e_EVENTUAL, - // TODO Add cluster config to determine Eventual vs - // Strong + mqbc::ClusterStateLedgerConsistency::e_STRONG, d_clusterData_p, clusterState, &d_clusterData_p->blobSpPool())), diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp index 16fdfcc0b..8bfd0380d 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp @@ -943,7 +943,6 @@ void ClusterStateManager::processPartitionPrimaryAdvisoryRaw( } d_isFirstLeaderAdvisory = false; - d_clusterStateLedger_mp->setIsFirstLeaderAdvisory(d_isFirstLeaderAdvisory); } // PRIVATE MANIPULATORS @@ -979,8 +978,6 @@ void ClusterStateManager::onPartitionPrimaryAssignment( oldLeaseId); d_isFirstLeaderAdvisory = false; - d_clusterStateLedger_mp->setIsFirstLeaderAdvisory( - d_isFirstLeaderAdvisory); } d_afterPartitionPrimaryAssignmentCb(partitionId, primary, status); @@ -1247,7 +1244,6 @@ void ClusterStateManager::sendClusterState( // Self is leader and has published advisory above, so update it. d_isFirstLeaderAdvisory = false; - d_clusterStateLedger_mp->setIsFirstLeaderAdvisory(d_isFirstLeaderAdvisory); mqbc::ClusterUtil::sendClusterState(d_clusterData_p, d_clusterStateLedger_mp.get(), @@ -2018,35 +2014,6 @@ void ClusterStateManager::processClusterStateEvent( bmqp::Event rawEvent(event.blob().get(), d_allocator_p); BSLS_ASSERT_SAFE(rawEvent.isClusterStateEvent()); - // NOTE: Any validation of the event would go here. - if (source != d_clusterData_p->electorInfo().leaderNode()) { - BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": ignoring cluster state event from cluster node " - << source->nodeDescription() << " as this node is not " - << "the current perceived leader. Current leader: [" - << d_clusterData_p->electorInfo().leaderNodeId() << ": " - << (d_clusterData_p->electorInfo().leaderNode() - ? d_clusterData_p->electorInfo() - .leaderNode() - ->nodeDescription() - : "* UNKNOWN *") - << "]"; - return; // RETURN - } - // 'source' is the perceived leader - - // TBD: Suppress the following check for now, which will help some - // integration tests to pass. At this point, it is not clear if it is safe - // to process cluster state events while self is stopping. - // - // if ( bmqp_ctrlmsg::NodeStatus::E_STOPPING - // == d_clusterData_p->membership().selfNodeStatus()) { - // return; // RETURN - // } - - // TODO: Validate the incoming advisory and potentially buffer it for later - // if the node is currently starting. - const int rc = d_clusterStateLedger_mp->apply(*rawEvent.blob(), source); if (rc != 0) { BALL_LOG_ERROR << d_clusterData_p->identity().description() @@ -2187,7 +2154,7 @@ void ClusterStateManager::processLeaderAdvisory( // Leader status and sequence number are updated unconditionally. It may // have been updated by one of the routines called earlier in this method, - // but there is no harm is setting these values again. + // but there is no harm in setting these values again. d_clusterData_p->electorInfo().setLeaderMessageSequence(leaderMsgSeq); d_clusterData_p->electorInfo().setLeaderStatus( diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h index c745ef8b6..c926bb64a 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h @@ -463,7 +463,7 @@ class ClusterStateManager BSLS_KEYWORD_FINAL const bmqp_ctrlmsg::ControlMessage& message, mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE; - /// Process the specified `event`. + /// Process the specified cluster state `event`. /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. diff --git a/src/groups/mqb/mqbc/mqbc_clusterstateledger.h b/src/groups/mqb/mqbc/mqbc_clusterstateledger.h index 827001b53..359947d8c 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstateledger.h +++ b/src/groups/mqb/mqbc/mqbc_clusterstateledger.h @@ -306,8 +306,6 @@ class ClusterStateLedger : public ElectorInfoObserver { virtual int apply(const bdlbb::Blob& record, mqbnet::ClusterNode* source) = 0; - virtual void setIsFirstLeaderAdvisory(bool isFirstLeaderAdvisory) = 0; - /// Set the commit callback to the specified `value`. virtual void setCommitCb(const CommitCb& value) = 0; diff --git a/src/groups/mqb/mqbc/mqbc_clusterstateledger.t.cpp b/src/groups/mqb/mqbc/mqbc_clusterstateledger.t.cpp index b1fc64838..6fe71790e 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstateledger.t.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstateledger.t.cpp @@ -107,13 +107,6 @@ struct ClusterStateLedgerTestImp return markDone(); } - void - setIsFirstLeaderAdvisory(BSLS_ANNOTATION_UNUSED bool isFirstLeaderAdvisory) - BSLS_KEYWORD_OVERRIDE - { - markDone(); - } - // ACCESSORS void setCommitCb(BSLS_ANNOTATION_UNUSED const CommitCb& value) BSLS_KEYWORD_OVERRIDE @@ -205,7 +198,6 @@ static void test1_clusterStateLedger_protocol() apply(bmqp_ctrlmsg::LeaderAdvisory())); BSLS_PROTOCOLTEST_ASSERT(testObj, apply(bmqp_ctrlmsg::ClusterMessage())); - BSLS_PROTOCOLTEST_ASSERT(testObj, setIsFirstLeaderAdvisory(true)); BSLS_PROTOCOLTEST_ASSERT( testObj, setCommitCb(mqbc::ClusterStateLedger::CommitCb())); diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp index 43a7da245..1ba151f73 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp @@ -1746,35 +1746,6 @@ void ClusterStateManager::processClusterStateEvent( bmqp::Event rawEvent(event.blob().get(), d_allocator_p); BSLS_ASSERT_SAFE(rawEvent.isClusterStateEvent()); - // NOTE: Any validation of the event would go here. - if (source != d_clusterData_p->electorInfo().leaderNode()) { - BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": ignoring cluster state event from cluster node " - << source->nodeDescription() << " as this node is not " - << "the current perceived leader. Current leader: [" - << d_clusterData_p->electorInfo().leaderNodeId() << ": " - << (d_clusterData_p->electorInfo().leaderNode() - ? d_clusterData_p->electorInfo() - .leaderNode() - ->nodeDescription() - : "* UNKNOWN *") - << "]"; - return; // RETURN - } - // 'source' is the perceived leader - - // TBD: Suppress the following check for now, which will help some - // integration tests to pass. At this point, it is not clear if it is safe - // to process cluster state events while self is stopping. - // - // if ( bmqp_ctrlmsg::NodeStatus::E_STOPPING - // == d_clusterData_p->membership().selfNodeStatus()) { - // return; // RETURN - // } - - // TODO: Validate the incoming advisory and potentially buffer it for later - // if the node is currently starting. - const int rc = d_clusterStateLedger_mp->apply(*rawEvent.blob(), source); if (rc != 0) { BALL_LOG_ERROR << d_clusterData_p->identity().description() diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h index 1977dd4fb..57228c840 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h @@ -558,7 +558,7 @@ class ClusterStateManager BSLS_KEYWORD_FINAL const bmqp_ctrlmsg::ControlMessage& message, mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE; - /// Process the specified `event`. + /// Process the specified cluster state `event`. /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp index 02a37a3c4..1da9b32cc 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp @@ -200,7 +200,7 @@ void applyQueueUpdate(mqbc::ClusterState* clusterState, << clusterData.identity().description() << ": Received QueueUpdateAdvisory for known queue [uri: " << uri << "] with a mismatched queueKey " - << "[expected: " << queueKey << ", received: " << queueKey + << "[expected: " << cit->second->key() << ", received: " << queueKey << "]: " << queueUpdate << BMQTSK_ALARMLOG_END; return; // RETURN } @@ -1213,10 +1213,10 @@ void ClusterUtil::registerAppId(ClusterData* clusterData, if (mqbnet::ElectorState::e_LEADER != clusterData->electorInfo().electorState()) { - BALL_LOG_ERROR << clusterData->identity().description() - << ": Failed to register appId '" << appId - << "' for domain '" << domain->name() - << "'. Self is not leader."; + BALL_LOG_WARN << clusterData->identity().description() + << ": Not registering appId '" << appId + << "' for domain '" << domain->name() + << "'. Self is not leader."; return; // RETURN } diff --git a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp index 7d32ae985..cfc8f716c 100644 --- a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp +++ b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp @@ -249,13 +249,6 @@ int IncoreClusterStateLedger::onLogRolloverCb(const mqbu::StorageKey& oldLogId, return rc_SUCCESS; // RETURN } - if (bmqp_ctrlmsg::NodeStatus::E_AVAILABLE != - d_clusterData_p->membership().selfNodeStatus()) { - // If self is not available, then the cluster state snapshot is likely - // incomplete, so there is no point in writing it. - return rc_SUCCESS; // RETURN - } - // Determine the correct LSN to put in the cluster state snapshot, which is // the maximum of current LSN from ElectorInfo and the largest LSN from // uncommitted advisories. @@ -780,10 +773,18 @@ int IncoreClusterStateLedger::applyRecordInternal( void IncoreClusterStateLedger::cancelUncommittedAdvisories() { + AdvisoriesMapIter iter = d_uncommittedAdvisories.begin(); + if (iter != d_uncommittedAdvisories.end()) { + const bmqp_ctrlmsg::LeaderMessageSequence& seqNum = iter->first; + BALL_LOG_INFO << description() << "Canceling " + << d_uncommittedAdvisories.size() + << " uncommitted advisories in the incore CSL starting " + "at seqNum = " + << seqNum; + } + if (isSelfLeader()) { - for (AdvisoriesMapIter iter = d_uncommittedAdvisories.begin(); - iter != d_uncommittedAdvisories.end(); - ++iter) { + for (; iter != d_uncommittedAdvisories.end(); ++iter) { const ClusterMessageInfo& info = iter->second; bmqp_ctrlmsg::ControlMessage controlMessage; controlMessage.choice().makeClusterMessage(info.d_clusterMessage); @@ -798,8 +799,7 @@ void IncoreClusterStateLedger::cancelUncommittedAdvisories() } int IncoreClusterStateLedger::applyImpl(const bdlbb::Blob& event, - mqbnet::ClusterNode* source, - bool delayed) + mqbnet::ClusterNode* source) { // executed by the *CLUSTER DISPATCHER* thread @@ -810,28 +810,24 @@ int IncoreClusterStateLedger::applyImpl(const bdlbb::Blob& event, BSLS_ASSERT_SAFE(source); BSLS_ASSERT_SAFE(source->nodeId() != d_clusterData_p->membership().selfNode()->nodeId()); - BSLS_ASSERT_SAFE( - isSelfLeader() || - source->nodeId() == - d_clusterData_p->electorInfo().leaderNode()->nodeId()); enum RcEnum { // Value for the various RC error categories rc_SUCCESS = 0 // Success , - rc_MISSING_HEADER = -1 // Event or record header is missing + rc_INVALID_SOURCE = -1 // Source is not leader , - rc_INVALID_HEADER = -2 // Event or record header is invalid + rc_MISSING_HEADER = -2 // Event or record header is missing , - rc_INVALID_SOURCE = -3 // Source is not leader + rc_INVALID_HEADER = -3 // Event or record header is invalid , - rc_RECORD_STALE = -4 // Record is stale + rc_UNEXPECTED_RECORD_TYPE = -4 // Unexpected record type , rc_RECORD_ALREADY_APPLIED = -5 // Record was already applied , - rc_LOAD_MESSAGE_FAILURE = -6 // Fail to load cluster message + rc_RECORD_STALE = -6 // Record is stale , - rc_NODE_STOPPING = -7 // Self node is stopping + rc_LOAD_MESSAGE_FAILURE = -7 // Fail to load cluster message , rc_ADVISORY_INVALID = -8 // Advisory is invalid, as determined // by type-specific validation @@ -842,7 +838,24 @@ int IncoreClusterStateLedger::applyImpl(const bdlbb::Blob& event, BALL_LOG_INFO << "Applying cluster state record event from node '" << source->nodeDescription() << "'"; - // Sanity check on record + if (!isSelfLeader() && + d_clusterData_p->electorInfo().leaderNodeId() != source->nodeId()) { + BALL_LOG_ERROR << description() + << ": Ignoring cluster state record event from '" + << source->nodeDescription() + << "'. Reason: Source node is not the leader" + << " [source: " << source->nodeDescription() + << ", leader: " + << (d_clusterData_p->electorInfo().leaderNode() + ? d_clusterData_p->electorInfo() + .leaderNode() + ->nodeDescription() + : "** none **") + << "]."; + return rc_INVALID_SOURCE; // RETURN + } + + // Sanity check event header bmqu::BlobObjectProxy eventHeader( &event, -bmqp::EventHeader::k_MIN_HEADER_SIZE, @@ -864,6 +877,7 @@ int IncoreClusterStateLedger::applyImpl(const bdlbb::Blob& event, BSLS_ASSERT_SAFE(eventHeader->length() == event.length()); BSLS_ASSERT_SAFE(eventHeader->type() == bmqp::EventType::e_CLUSTER_STATE); + // Sanity check record header bmqu::BlobPosition recordHeaderPosition; int rc = bmqu::BlobUtil::findOffsetSafe(&recordHeaderPosition, event, @@ -897,60 +911,71 @@ int IncoreClusterStateLedger::applyImpl(const bdlbb::Blob& event, BSLS_ASSERT_SAFE(ClusterStateLedgerUtil::recordSize(*recordHeader) == (event.length() - eventHeaderSize)); + // Validate sequence number and record type bmqp_ctrlmsg::LeaderMessageSequence seqNum; seqNum.electorTerm() = recordHeader->electorTerm(); seqNum.sequenceNumber() = recordHeader->sequenceNumber(); - if (recordHeader->recordType() != ClusterStateRecordType::e_ACK && - d_uncommittedAdvisories.find(seqNum) != + + if (isSelfLeader()) { + // Leader should only receive Acks. + if (recordHeader->recordType() != ClusterStateRecordType::e_ACK) { + BALL_LOG_ERROR + << description() + << ": Ignoring cluster state record event with seqNum = " + << seqNum << " from '" << source->nodeDescription() + << "'. Reason: Leader should only receive Acks, " + "but we received record of type: " + << recordHeader->recordType(); + return rc_UNEXPECTED_RECORD_TYPE; // RETURN + } + + BSLS_ASSERT_SAFE( + seqNum <= d_clusterData_p->electorInfo().leaderMessageSequence()); + + if (d_uncommittedAdvisories.find(seqNum) == d_uncommittedAdvisories.end()) { - BALL_LOG_ERROR << description() << ": Failed to apply record from '" - << source->nodeDescription() - << "'. Reason: record was already applied. "; - return rc_RECORD_ALREADY_APPLIED; // RETURN + BALL_LOG_INFO + << description() + << ": Ignoring cluster state record ack with seqNum = " + << seqNum << " from '" << source->nodeDescription() + << "', as quorum of acks has already been reached."; + return rc_SUCCESS; // RETURN + } } + else { + // Follower should only receive advisories and commits. + if (recordHeader->recordType() != ClusterStateRecordType::e_SNAPSHOT && + recordHeader->recordType() != ClusterStateRecordType::e_UPDATE && + recordHeader->recordType() != ClusterStateRecordType::e_COMMIT) { + BALL_LOG_ERROR + << description() + << ": Ignoring cluster state record event with seqNum = " + << seqNum << " from '" << source->nodeDescription() + << "'. Reason: Follower should only receive advisories and " + "commits, but we received record of type: " + << recordHeader->recordType(); + return rc_UNEXPECTED_RECORD_TYPE; // RETURN + } - // Validate advisory and source - if (!delayed) { - // Source (leader) and leader sequence number should not be validated - // for delayed (aka buffered) advisories. Those attributes were - // validated when buffered advisories were received. - - if (d_clusterData_p->electorInfo().leaderNode() != source) { - // Different leader. Ignore message. - BALL_LOG_ERROR << description() << ": Ignoring event from '" - << source->nodeDescription() - << "'. Reason: Source node is not the leader" - << " [source: " << source->nodeDescription() - << ", leader: " - << (d_clusterData_p->electorInfo().leaderNode() - ? d_clusterData_p->electorInfo() - .leaderNode() - ->nodeDescription() - : "** none **") - << "]."; - return rc_INVALID_SOURCE; // RETURN + if (d_uncommittedAdvisories.find(seqNum) != + d_uncommittedAdvisories.end()) { + BALL_LOG_ERROR << description() + << ": Failed to apply record with seqNum = " + << seqNum << " from '" << source->nodeDescription() + << "'. Reason: record was already applied. "; + return rc_RECORD_ALREADY_APPLIED; // RETURN } if (seqNum < d_clusterData_p->electorInfo().leaderMessageSequence()) { BALL_LOG_ERROR - << description() << ": Failed to apply record from '" - << source->nodeDescription() << "'. Reason: record is stale " - << "[sequenceNumber: " << seqNum << ", leaderMessageSeq: " + << description() + << ": Failed to apply record with seqNum = " << seqNum + << " from '" << source->nodeDescription() + << "'. Reason: record is stale; self leaderMessageSeq = " << d_clusterData_p->electorInfo().leaderMessageSequence() << "]."; return rc_RECORD_STALE; // RETURN } - - // Leader status and sequence number are updated unconditionally. It - // may have been updated by one of the callers of this routine, but - // there is no harm is setting these values again. - if (d_clusterData_p->clusterConfig() - .clusterAttributes() - .isCSLModeEnabled()) { - d_clusterData_p->electorInfo().setLeaderMessageSequence(seqNum); - d_clusterData_p->electorInfo().setLeaderStatus( - mqbc::ElectorInfoLeaderStatus::e_ACTIVE); - } } // Load cluster message from record @@ -963,47 +988,38 @@ int IncoreClusterStateLedger::applyImpl(const bdlbb::Blob& event, return rc * 10 + rc_LOAD_MESSAGE_FAILURE; // RETURN } - // CONDITIONS: If advisory is delayed, it must be a queue advisory (only - // those are buffered) or a commit of queue advisory - BSLS_ASSERT_SAFE(!delayed || - message.choice().isQueueAssignmentAdvisoryValue() || + BSLS_ASSERT_SAFE(message.choice().isQueueAssignmentAdvisoryValue() || message.choice().isQueueUnassignedAdvisoryValue() || - message.choice().isQueueUnAssignmentAdvisoryValue() || + message.choice().isQueueUpdateAdvisoryValue() || + message.choice().isPartitionPrimaryAdvisoryValue() || + message.choice().isLeaderAdvisoryValue() || + message.choice().isLeaderAdvisoryAckValue() || message.choice().isLeaderAdvisoryCommitValue()); - // More validations (type-specific): A queue advisory - if (message.choice().isQueueAssignmentAdvisoryValue() || - message.choice().isQueueUnassignedAdvisoryValue() || - message.choice().isQueueUnAssignmentAdvisoryValue()) { - // Queue advisory - - // TBD: Suppress the following check for now, which will help some - // integration tests to pass. At this point, it is not clear if it is - // safe to process queue advisory messages self is stopping. - // - // if ( d_clusterData_p->membership().selfNodeStatus() - // == bmqp_ctrlmsg::NodeStatus::E_STOPPING) { - // // No need to process the advisory since self is stopping. - // BALL_LOG_INFO << description() - // << "Ignoring event from '" - // << source->nodeDescription() - // << "'. Reason: Self is stopping."; - // return rc_NODE_STOPPING; // RETURN - // } - } - else if (message.choice().isPartitionPrimaryAdvisoryValue()) { + BALL_LOG_INFO << description() << ": Applying cluster message with type = " + << recordHeader->recordType() << " and seqNum = " << seqNum + << " from '" << source->nodeDescription() + << "': " << message; + + // Validate partition-primary mappings, if any + if (message.choice().isPartitionPrimaryAdvisoryValue() || + message.choice().isLeaderAdvisoryValue()) { const bsl::vector& partitions = - message.choice().partitionPrimaryAdvisory().partitions(); + message.choice().isPartitionPrimaryAdvisoryValue() + ? message.choice().partitionPrimaryAdvisory().partitions() + : message.choice().leaderAdvisory().partitions(); + // Validate the notification. If *any* part of notification is found // invalid, we reject the *entire* notification. for (int i = 0; i < static_cast(partitions.size()); ++i) { const bmqp_ctrlmsg::PartitionPrimaryInfo& info = partitions[i]; - if (info.partitionId() >= - static_cast(d_clusterState_p->partitions().size())) { + if ((info.partitionId() < 0) || + (info.partitionId() >= + static_cast(d_clusterState_p->partitions().size()))) { BMQTSK_ALARMLOG_ALARM("CLUSTER") << d_clusterData_p->identity().description() - << ": Invalid partitionId: " << info - << " specified in partition-primary advisory. " + << ": Invalid Partition Id: " << info + << " specified in advisory: " << message << ". " << "Ignoring this *ENTIRE* advisory message." << BMQTSK_ALARMLOG_END; return rc_ADVISORY_INVALID; // RETURN @@ -1016,33 +1032,12 @@ int IncoreClusterStateLedger::applyImpl(const bdlbb::Blob& event, BMQTSK_ALARMLOG_ALARM("CLUSTER") << d_clusterData_p->identity().description() << ": Invalid primaryNodeId: " << info - << " specified in partition-primary advisory." + << " specified in advisory: " << message << ". " << " Ignoring this *ENTIRE* advisory." << BMQTSK_ALARMLOG_END; return rc_ADVISORY_INVALID; // RETURN } - if (proposedPrimaryNode == - d_clusterData_p->membership().selfNode() && - bmqp_ctrlmsg::NodeStatus::E_STARTING == - d_clusterData_p->membership().selfNodeStatus()) { - // Self is the proposed primary but self is STARTING. This is - // a bug because if this node perceives self as STARTING, any - // other node (including the leader) *cannot* perceive this - // node as AVAILABLE. This node might be STOPPING, but that's - // ok since its possible that it transitioned from AVAILABLE to - // other state immediately after leader broadcast the advisory. - // Lower layers will take care of that scenario. - - BMQTSK_ALARMLOG_ALARM("CLUSTER") - << d_clusterData_p->identity().description() - << ": proposed primary specified in partition/primary : " - << info << " is self but self is STARTING. " - << "Ignoring this *ENTIRE* advisory." - << BMQTSK_ALARMLOG_END; - return rc_ADVISORY_INVALID; // RETURN - } - const ClusterStatePartitionInfo& pi = d_clusterState_p->partition( info.partitionId()); if (d_clusterData_p->membership().selfNode() != @@ -1058,7 +1053,7 @@ int IncoreClusterStateLedger::applyImpl(const bdlbb::Blob& event, // currently not supported, so self node will exit. Note that // this scnenario can be witnessed in a bad network where some // nodes cannot see other nodes intermittently. See - // 'onLeaderSyncDataQueryResponseDispatched' for similar check. + // 'onLeaderSyncDataQueryResponse' for similar check. BMQTSK_ALARMLOG_ALARM("CLUSTER") << d_clusterData_p->identity().description() << ": Partition [" << info.partitionId() @@ -1074,93 +1069,61 @@ int IncoreClusterStateLedger::applyImpl(const bdlbb::Blob& event, // EXIT } - if (d_isFirstLeaderAdvisory) { - // If this node just started and recovered from a peer, it will - // already be aware of the *current* primaryLeaseId for each - // partition, but not its primary node (this is because leaseId - // is retrieved from the storage, but not the primary nodeId, - // because we don't persist the primary nodeId). When this - // node becomes AVAILABLE, the leader simply sends the - // partition/primary advisory without bumping up the leaseId. - // 'd_isFirstLeaderAdvisory' flag takes care of this scenario. - - // Note that 'pi.primaryNode()' may not be zero because - // currently, we update the cluster state even upon receiving - // primary status advisory from a primary node, so self node - // may or may not have received a primary-status advisory from - // the primary node. + if ((pi.primaryNode() == proposedPrimaryNode) || + (pi.primaryNode() == 0)) { + // Proposed primary node is same as self's primary node, or + // self views this partition as orphan. In either case, + // leaseId cannot be smaller. It can, however, be equal. + // The case in which 'pi.primaryNode() == + // proposedPrimaryNode' and leaseId is same, is obvious -- + // the leader simply re-sent the partition primary mapping + // advisory. But the case where pi.primaryNode() is null + // (and 'proposedPrimaryNode' is valid) *and* leaseId is + // same can be explained in this way: this node (replica) + // was aware of the primary and leaseId, but then at some + // point, lost connection to the primary, and marked this + // partition as orphan. Note that primary node did not + // crash. After some time, connection was re-established, + // and leader/primary resent the primary mapping again -- + // with same leaseId. This scenario was seen when cluster + // was running on VM boxes. Also note that above scenario + // is different from the case where a node has not heard + // from leader even once. if (info.primaryLeaseId() < pi.primaryLeaseId()) { BMQTSK_ALARMLOG_ALARM("CLUSTER") << d_clusterData_p->identity().description() - << ": Stale primaryLeaseId specified in: " << info + << ": Stale primaryLeaseId specified " + << "in: " << info << ", current primaryLeaseId: " << pi.primaryLeaseId() + << ". Primary node viewed by self: " + << (pi.primaryNode() != 0 + ? pi.primaryNode()->nodeDescription() + : "** null **") + << ", proposed primary node: " + << proposedPrimaryNode->nodeDescription() << ". Ignoring this *ENTIRE* advisory." << BMQTSK_ALARMLOG_END; return rc_ADVISORY_INVALID; // RETURN } } else { - if ((pi.primaryNode() == proposedPrimaryNode) || - (pi.primaryNode() == 0)) { - // Proposed primary node is same as self's primary node, or - // self views this partition as orphan. In either case, - // leaseId cannot be smaller. It can, however, be equal. - // The case in which 'pi.primaryNode() == - // proposedPrimaryNode' and leaseId is same, is obvious -- - // the leader simply re-sent the partition primary mapping - // advisory. But the case where pi.primaryNode() is null - // (and 'proposedPrimaryNode' is valid) *and* leaseId is - // same can be explained in this way: this node (replica) - // was aware of the primary and leaseId, but then at some - // point, lost connection to the primary, and marked this - // partition as orphan. Note that primary node did not - // crash. After some time, connection was re-established, - // and leader/primary resent the primary mapping again -- - // with same leaseId. This scenario was seen when cluster - // was running on VM boxes. Also note that above scenario - // is different from the case where a node has not heard - // from leader even once. - - if (info.primaryLeaseId() < pi.primaryLeaseId()) { - BMQTSK_ALARMLOG_ALARM("CLUSTER") - << d_clusterData_p->identity().description() - << ": Stale primaryLeaseId specified " - << "in: " << info << ", current primaryLeaseId: " - << pi.primaryLeaseId() - << ". Primary node viewed by self: " - << (pi.primaryNode() != 0 - ? pi.primaryNode()->nodeDescription() - : "** null **") - << ", proposed primary node: " - << proposedPrimaryNode->nodeDescription() - << ". Ignoring this *ENTIRE* advisory." - << BMQTSK_ALARMLOG_END; - return rc_ADVISORY_INVALID; // RETURN - } - } - else { - // Different (non-zero) primary nodes. Proposed leaseId - // must be greater. - - if (info.primaryLeaseId() <= pi.primaryLeaseId()) { - BMQTSK_ALARMLOG_ALARM("CLUSTER") - << d_clusterData_p->identity().description() - << ": Stale primaryLeaseId specified in: " << info - << ", current primaryLeaseId: " - << pi.primaryLeaseId() - << ". Ignoring this *ENTIRE* advisory." - << BMQTSK_ALARMLOG_END; - return rc_ADVISORY_INVALID; // RETURN - } + // Different (non-zero) primary nodes. Proposed leaseId + // must be greater. + + if (info.primaryLeaseId() <= pi.primaryLeaseId()) { + BMQTSK_ALARMLOG_ALARM("CLUSTER") + << d_clusterData_p->identity().description() + << ": Stale primaryLeaseId specified in: " << info + << ", current primaryLeaseId: " << pi.primaryLeaseId() + << ". Ignoring this *ENTIRE* advisory." + << BMQTSK_ALARMLOG_END; + return rc_ADVISORY_INVALID; // RETURN } } } } - BALL_LOG_INFO << description() << ": Applying cluster message from '" - << source->nodeDescription() << "': " << message; - rc = applyRecordInternal(event, eventHeaderSize, recordHeaderPosition, @@ -1189,7 +1152,6 @@ IncoreClusterStateLedger::IncoreClusterStateLedger( BlobSpPool* blobSpPool_p, bslma::Allocator* allocator) : d_allocator_p(allocator) -, d_isFirstLeaderAdvisory(true) , d_isOpen(false) , d_blobSpPool_p(blobSpPool_p) , d_description(allocator) @@ -1210,7 +1172,7 @@ IncoreClusterStateLedger::IncoreClusterStateLedger( // Create description bmqu::MemOutStream osstr; osstr << "IncoreClusterStateLedger (cluster: " - << d_clusterData_p->identity().name() << ") : "; + << d_clusterData_p->identity().name() << ")"; d_description.assign(osstr.str().data(), osstr.str().length()); // Instantiate ledger config @@ -1264,11 +1226,9 @@ void IncoreClusterStateLedger::onClusterLeader( d_clusterData_p->cluster().dispatcher()->inDispatcherThread( &d_clusterData_p->cluster())); - if (status == ElectorInfoLeaderStatus::e_PASSIVE) { - return; // RETURN + if (status == ElectorInfoLeaderStatus::e_UNDEFINED) { + cancelUncommittedAdvisories(); } - - cancelUncommittedAdvisories(); } // MANIPULATORS @@ -1512,14 +1472,7 @@ int IncoreClusterStateLedger::apply(const bdlbb::Blob& event, d_clusterData_p->cluster().dispatcher()->inDispatcherThread( &d_clusterData_p->cluster())); - return applyImpl(event, source, - false); // delayed -} - -void IncoreClusterStateLedger::setIsFirstLeaderAdvisory( - bool isFirstLeaderAdvisory) -{ - d_isFirstLeaderAdvisory = isFirstLeaderAdvisory; + return applyImpl(event, source); } // ACCESSORS diff --git a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.h b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.h index 0e30680e9..49dc110d8 100644 --- a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.h +++ b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.h @@ -153,15 +153,6 @@ class IncoreClusterStateLedger BSLS_KEYWORD_FINAL : public ClusterStateLedger { bslma::Allocator* d_allocator_p; // Allocator used to supply memory - bool d_isFirstLeaderAdvisory; - // Flag to indicate whether this is - // first leader advisory. *NOTE*: this - // flag is a workaround to address the - // existing cyclic dependency b/w - // leader and primary at node startup - // and will be removed once all CSL - // phases are complete. - bool d_isOpen; // Flag to indicate open/close status // of this object @@ -266,16 +257,14 @@ class IncoreClusterStateLedger BSLS_KEYWORD_FINAL : public ClusterStateLedger { void cancelUncommittedAdvisories(); /// Apply the specified raw cluster state record `event` received from - /// the specified `source` node according to the specified `delayed` - /// flag indicating if the event was previously buffered. Note that - /// while a replica node may receive any type of records from the - /// leader, the leader may *only* receive ack records from a replica. + /// the specified `source` node. Note that while a replica node may + /// receive any type of records from the leader, the leader may *only* + /// receive ack records from a replica. /// /// THREAD: This method can be invoked only in the associated cluster's /// dispatcher thread. int applyImpl(const bdlbb::Blob& event, - mqbnet::ClusterNode* source, - bool delayed); + mqbnet::ClusterNode* source); // PRIVATE ACCESSORS @@ -389,9 +378,6 @@ class IncoreClusterStateLedger BSLS_KEYWORD_FINAL : public ClusterStateLedger { int apply(const bdlbb::Blob& event, mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE; - void - setIsFirstLeaderAdvisory(bool isFirstLeaderAdvisory) BSLS_KEYWORD_OVERRIDE; - /// Set the commit callback to the specified `value`. void setCommitCb(const CommitCb& value) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.t.cpp b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.t.cpp index e0d3f7758..5efa10180 100644 --- a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.t.cpp +++ b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.t.cpp @@ -16,13 +16,6 @@ // mqbc_incoreclusterstateledger.t.cpp -*-C++-*- #include -// ---------------------------------------------------------------------------- -// NOTICE -// -// Strong consistency mode is neither implemented nor tested for this -// component. -// ---------------------------------------------------------------------------- - // BMQ #include #include @@ -75,8 +68,6 @@ using namespace bsl; //============================================================================= // TEST PLAN //----------------------------------------------------------------------------- -// NOTE: At this time, we are only testing for eventual consistency. -// // - breathing test - open, accessors (state + description), close // - [OPTIONAL] open, open (fail), close, close (fail) // - apply (leader + follower): @@ -194,8 +185,8 @@ struct Tester { public: // PUBLIC DATA + bool d_isLeader; bdlbb::PooledBlobBufferFactory d_bufferFactory; - mqbc::ClusterStateLedgerConsistency::Enum d_consistencyLevel; bmqu::TempDirectory d_tempDir; bsl::string d_location; bslma::ManagedPtr d_cluster_mp; @@ -206,8 +197,8 @@ struct Tester { public: // CREATORS Tester(bool isLeader = true, const bslstl::StringRef& location = "") - : d_bufferFactory(1024, bmqtst::TestHelperUtil::allocator()) - , d_consistencyLevel(mqbc::ClusterStateLedgerConsistency::e_EVENTUAL) + : d_isLeader(isLeader) + , d_bufferFactory(1024, bmqtst::TestHelperUtil::allocator()) , d_tempDir(bmqtst::TestHelperUtil::allocator()) , d_location( !location.empty() @@ -271,9 +262,14 @@ struct Tester { ->lookupNode(mqbmock::Cluster::k_LEADER_NODE_ID); BSLS_ASSERT_OPT(leaderNode != 0); d_cluster_mp->_clusterData()->electorInfo().setElectorInfo( - mqbnet::ElectorState::e_LEADER, + d_isLeader ? mqbnet::ElectorState::e_LEADER + : mqbnet::ElectorState::e_FOLLOWER, 1, // term leaderNode, + mqbc::ElectorInfoLeaderStatus::e_PASSIVE); + // It is **prohibited** to set leader status directly from e_UNDEFINED + // to e_ACTIVE. Hence, we do: e_UNDEFINED -> e_PASSIVE -> e_ACTIVE + d_cluster_mp->_clusterData()->electorInfo().setLeaderStatus( mqbc::ElectorInfoLeaderStatus::e_ACTIVE); // Set partition primaries in the cluster state @@ -291,7 +287,7 @@ struct Tester { new (*bmqtst::TestHelperUtil::allocator()) mqbc::IncoreClusterStateLedger( d_cluster_mp->_clusterDefinition(), - d_consistencyLevel, + mqbc::ClusterStateLedgerConsistency::e_STRONG, d_cluster_mp->_clusterData(), &d_cluster_mp->_state(), d_cluster_mp->_blobSpPool(), @@ -360,6 +356,35 @@ struct Tester { bdlbb::BlobUtil::append(event, record); } + /// Let the specified `ledger` receive the specified `numAcks` acks for the record having the specific `sequenceNumber`. Behavior is undefined unless the caller is the leader node. + void receiveAck(mqbc::IncoreClusterStateLedger *ledger, + const bmqp_ctrlmsg::LeaderMessageSequence& sequenceNumber, + int numAcks) + { + // PRECONDITIONS + BSLS_ASSERT_OPT(d_isLeader); + + bmqp_ctrlmsg::LeaderAdvisoryAck ack; + ack.sequenceNumberAcked() = sequenceNumber; + + bmqp_ctrlmsg::ClusterMessage message; + message.choice().makeLeaderAdvisoryAck(ack); + + bdlbb::Blob ackEvent(d_cluster_mp->_bufferFactory(), s_allocator_p); + constructEventBlob(&ackEvent, + message, + ack.sequenceNumberAcked(), + 123456, + mqbc::ClusterStateRecordType::e_ACK); + + for (int i = 1; i <= numAcks; ++i) { + BMQTST_ASSERT_EQ(ledger->apply(ackEvent, + d_cluster_mp->netCluster().lookupNode( + mqbmock::Cluster::k_LEADER_NODE_ID + i)), + 0); + } + } + // ACCESSORS bool hasNoMoreBroadcastedMessages() const { @@ -377,13 +402,37 @@ struct Tester { return true; } - bool hasBroadcastedMessages(int number, bool isFinal = true) const + /// Return true if we the follower has sent `number` messages to the leader, false otherwise. Behavior is undefined unless the caller is a follower node. + bool hasSentMessagesToLeader(int number) const { // PRECONDITIONS BSLS_ASSERT_OPT(d_cluster_mp->_channels().size() > 0); + BSLS_ASSERT_OPT(!d_isLeader); + + for (TestChannelMapCIter citer = d_cluster_mp->_channels().cbegin(); + citer != d_cluster_mp->_channels().cend(); + ++citer) { + if (citer->first->nodeId() == mqbmock::Cluster::k_LEADER_NODE_ID) { + if (!citer->second->waitFor(number)) { + return false; // RETURN + } + BSLS_ASSERT_OPT(citer->second->writeCalls().size() >= number); + } else { + BSLS_ASSERT_OPT((!citer->second->waitFor(1))); + BSLS_ASSERT_OPT(citer->second->writeCalls().empty()); + } + } + + return true; + } + + /// Return true if we the leader has broadcast `number` messages, false otherwise. Behavior is undefined unless the caller is the leader node. + bool hasBroadcastedMessages(int number) const + { + // PRECONDITIONS + BSLS_ASSERT_OPT(d_cluster_mp->_channels().size() > 0); + BSLS_ASSERT_OPT(d_isLeader); - size_t numMessages = 0; - bool first = true; for (TestChannelMapCIter citer = d_cluster_mp->_channels().cbegin(); citer != d_cluster_mp->_channels().cend(); ++citer) { @@ -394,18 +443,10 @@ struct Tester { continue; // CONTINUE } - if (!citer->second->waitFor(number, isFinal)) { + if (!citer->second->waitFor(number)) { return false; // RETURN } - - if (first) { - numMessages = citer->second->writeCalls().size(); - first = false; - } - else { - BSLS_ASSERT_OPT(numMessages == - citer->second->writeCalls().size()); - } + BSLS_ASSERT_OPT(citer->second->writeCalls().size() >= number); } return true; @@ -503,7 +544,7 @@ static void test1_breathingTest() BMQTST_ASSERT_EQ(obj->open(), 0); BMQTST_ASSERT_EQ(obj->description(), - "IncoreClusterStateLedger (cluster: testCluster) : "); + "IncoreClusterStateLedger (cluster: testCluster)"); BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 0U); BMQTST_ASSERT(tester.hasBroadcastedMessages(0)); @@ -516,7 +557,8 @@ static void test2_apply_PartitionPrimaryAdvisory() // PARTITION PRIMARY INFO // // Concerns: -// Applying 'PartitionPrimaryAdvisory' (only at leader). +// Apply 'PartitionPrimaryAdvisory' (only at leader), receive a quorum of +// acks, then commit the advisory. // // Testing: // int apply(const bmqp_ctrlmsg::PartitionPrimaryAdvisory& advisory); @@ -528,7 +570,7 @@ static void test2_apply_PartitionPrimaryAdvisory() mqbc::IncoreClusterStateLedger* obj = tester.d_clusterStateLedger_mp.get(); BSLS_ASSERT_OPT(obj->open() == 0); - // Build 'PartitionPrimaryAdvisory' + // Apply 'PartitionPrimaryAdvisory' bmqp_ctrlmsg::PartitionPrimaryInfo pinfo; pinfo.primaryNodeId() = mqbmock::Cluster::k_LEADER_NODE_ID; pinfo.partitionId() = 1U; @@ -541,17 +583,23 @@ static void test2_apply_PartitionPrimaryAdvisory() .nextLeaderMessageSequence(&advisory.sequenceNumber()); BMQTST_ASSERT_EQ(obj->apply(advisory), 0); - // Verify bmqp_ctrlmsg::ControlMessage expected; expected.choice() .makeClusterMessage() .choice() .makePartitionPrimaryAdvisory(advisory); + BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 0U); + BMQTST_ASSERT(tester.hasBroadcastedMessages(1)); + BMQTST_ASSERT_EQ(tester.broadcastedMessage(0), expected); + + // Receive a quorum of acks + tester.receiveAck(obj, advisory.sequenceNumber(), 3); + + // The advisory should be committed after quorum of acks BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 1U); BMQTST_ASSERT_EQ(tester.committedMessage(0), expected); BMQTST_ASSERT(tester.hasBroadcastedMessages(2)); - BMQTST_ASSERT_EQ(tester.broadcastedMessage(0), expected); BSLS_ASSERT_OPT(obj->close() == 0); BMQTST_ASSERT(tester.hasNoMoreBroadcastedMessages()); @@ -562,7 +610,7 @@ static void test3_apply_QueueAssignmentAdvisory() // QUEUE ASSIGNMENT ADVISORY // // Concerns: -// Applying 'QueueAssignmentAdvisory' (only at leader). +// Applying 'QueueAssignmentAdvisory' (only at leader), receive a quorum of acks, then commit the advisory. // // Testing: // int apply(const bmqp_ctrlmsg::QueueAssignmentAdvisory& advisory); @@ -574,7 +622,7 @@ static void test3_apply_QueueAssignmentAdvisory() mqbc::IncoreClusterStateLedger* obj = tester.d_clusterStateLedger_mp.get(); BSLS_ASSERT_OPT(obj->open() == 0); - // Build 'QueueAssignmentAdvisory' + // Apply 'QueueAssignmentAdvisory' bmqp_ctrlmsg::QueueAssignmentAdvisory qadvisory; tester.d_cluster_mp->_clusterData() ->electorInfo() @@ -591,19 +639,26 @@ static void test3_apply_QueueAssignmentAdvisory() BMQTST_ASSERT_EQ(obj->apply(qadvisory), 0); - // Verify bmqp_ctrlmsg::ControlMessage expected; expected.choice() .makeClusterMessage() .choice() .makeQueueAssignmentAdvisory(qadvisory); - BMQTST_ASSERT_EQ(tester.committedMessage(0), expected); - BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 1U); + BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 0U); + BMQTST_ASSERT(tester.hasBroadcastedMessages(1)); BMQTST_ASSERT_EQ(tester.broadcastedMessage(0), expected); + + // Receive a quorum of acks + tester.receiveAck(obj, qadvisory.sequenceNumber(), 3); + + // The advisory should be committed after quorum of acks + BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 1U); + BMQTST_ASSERT_EQ(tester.committedMessage(0), expected); BMQTST_ASSERT(tester.hasBroadcastedMessages(2)); BSLS_ASSERT_OPT(obj->close() == 0); + BMQTST_ASSERT(tester.hasNoMoreBroadcastedMessages()); } static void test4_apply_QueueUnassignedAdvisory() @@ -611,7 +666,7 @@ static void test4_apply_QueueUnassignedAdvisory() // QUEUE UNASSIGNED ADVISORY // // Concerns: -// Applying 'QueueUnassignedAdvisory' (only at leader). +// Applying 'QueueUnassignedAdvisory' (only at leader), receive a quorum of acks, then commit the advisory. // // Testing: // int apply(const bmqp_ctrlmsg::QueueUnassignedAdvisory& advisory); @@ -623,7 +678,7 @@ static void test4_apply_QueueUnassignedAdvisory() mqbc::IncoreClusterStateLedger* obj = tester.d_clusterStateLedger_mp.get(); BSLS_ASSERT_OPT(obj->open() == 0); - // Build 'QueueUnassignedAdvisory' + // Apply 'QueueUnassignedAdvisory' bmqp_ctrlmsg::QueueUnassignedAdvisory qadvisory; tester.d_cluster_mp->_clusterData() ->electorInfo() @@ -637,18 +692,25 @@ static void test4_apply_QueueUnassignedAdvisory() BMQTST_ASSERT_EQ(obj->apply(qadvisory), 0); - // Verify bmqp_ctrlmsg::ControlMessage expected; expected.choice() .makeClusterMessage() .choice() .makeQueueUnassignedAdvisory(qadvisory); + BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 0U); + BMQTST_ASSERT(tester.hasBroadcastedMessages(1)); + BMQTST_ASSERT_EQ(tester.broadcastedMessage(0), expected); + + // Receive a quorum of acks + tester.receiveAck(obj, qadvisory.sequenceNumber(), 3); + + // The advisory should be committed after quorum of acks BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 1U); BMQTST_ASSERT_EQ(tester.committedMessage(0), expected); BMQTST_ASSERT(tester.hasBroadcastedMessages(2)); - BMQTST_ASSERT_EQ(tester.broadcastedMessage(0), expected); BSLS_ASSERT_OPT(obj->close() == 0); + BMQTST_ASSERT(tester.hasNoMoreBroadcastedMessages()); } static void test5_apply_QueueUpdateAdvisory() @@ -656,7 +718,7 @@ static void test5_apply_QueueUpdateAdvisory() // QUEUE UPDATE ADVISORY // // Concerns: -// Applying 'QueueUpdateAdvisory' (only at leader). +// Applying 'QueueUpdateAdvisory' (only at leader), receive a quorum of acks, then commit the advisory. // // Testing: // int apply(const bmqp_ctrlmsg::QueueUpdateAdvisory& advisory); @@ -668,7 +730,7 @@ static void test5_apply_QueueUpdateAdvisory() mqbc::IncoreClusterStateLedger* obj = tester.d_clusterStateLedger_mp.get(); BSLS_ASSERT_OPT(obj->open() == 0); - // Build 'QueueUpdateAdvisory' + // Apply 'QueueUpdateAdvisory' bmqp_ctrlmsg::QueueUpdateAdvisory qadvisory; tester.d_cluster_mp->_clusterData() ->electorInfo() @@ -701,16 +763,23 @@ static void test5_apply_QueueUpdateAdvisory() BMQTST_ASSERT_EQ(obj->apply(qadvisory), 0); - // Verify bmqp_ctrlmsg::ControlMessage expected; expected.choice().makeClusterMessage().choice().makeQueueUpdateAdvisory( qadvisory); + BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 0U); + BMQTST_ASSERT(tester.hasBroadcastedMessages(1)); + BMQTST_ASSERT_EQ(tester.broadcastedMessage(0), expected); + + // Receive a quorum of acks + tester.receiveAck(obj, qadvisory.sequenceNumber(), 3); + + // The advisory should be committed after quorum of acks BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 1U); BMQTST_ASSERT_EQ(tester.committedMessage(0), expected); BMQTST_ASSERT(tester.hasBroadcastedMessages(2)); - BMQTST_ASSERT_EQ(tester.broadcastedMessage(0), expected); BSLS_ASSERT_OPT(obj->close() == 0); + BMQTST_ASSERT(tester.hasNoMoreBroadcastedMessages()); } static void test6_apply_LeaderAdvisory() @@ -718,7 +787,7 @@ static void test6_apply_LeaderAdvisory() // LEADER ADVISORY // // Concerns: -// Applying 'LeaderAdvisory' (only at leader). +// Applying 'LeaderAdvisory' (only at leader), receive a quorum of acks, then commit the advisory. // // Testing: // int apply(const bmqp_ctrlmsg::LeaderAdvisory& advisory); @@ -730,7 +799,7 @@ static void test6_apply_LeaderAdvisory() mqbc::IncoreClusterStateLedger* obj = tester.d_clusterStateLedger_mp.get(); BSLS_ASSERT_OPT(obj->open() == 0); - // Build 'LeaderAdvisory' + // Apply 'LeaderAdvisory' bmqp_ctrlmsg::PartitionPrimaryInfo pinfo; pinfo.primaryNodeId() = mqbmock::Cluster::k_LEADER_NODE_ID; pinfo.partitionId() = 1U; @@ -752,14 +821,20 @@ static void test6_apply_LeaderAdvisory() BMQTST_ASSERT_EQ(obj->apply(leaderAdvisory), 0); - // Verify bmqp_ctrlmsg::ControlMessage expected; expected.choice().makeClusterMessage().choice().makeLeaderAdvisory( leaderAdvisory); + BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 0U); + BMQTST_ASSERT(tester.hasBroadcastedMessages(1)); + BMQTST_ASSERT_EQ(tester.broadcastedMessage(0), expected); + + // Receive a quorum of acks + tester.receiveAck(obj, leaderAdvisory.sequenceNumber(), 3); + + // The advisory should be committed after quorum of acks BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 1U); BMQTST_ASSERT_EQ(tester.committedMessage(0), expected); BMQTST_ASSERT(tester.hasBroadcastedMessages(2)); - BMQTST_ASSERT_EQ(tester.broadcastedMessage(0), expected); BSLS_ASSERT_OPT(obj->close() == 0); BMQTST_ASSERT(tester.hasNoMoreBroadcastedMessages()); @@ -784,7 +859,7 @@ static void test7_apply_ClusterStateRecord() mqbc::IncoreClusterStateLedger* obj = tester.d_clusterStateLedger_mp.get(); BSLS_ASSERT_OPT(obj->open() == 0); - // 1. Create an update record + // Create an update record bmqp_ctrlmsg::PartitionPrimaryInfo pinfo; pinfo.primaryNodeId() = mqbmock::Cluster::k_LEADER_NODE_ID; pinfo.partitionId() = 1U; @@ -810,11 +885,11 @@ static void test7_apply_ClusterStateRecord() // Apply the update record BMQTST_ASSERT_EQ(obj->apply(updateEvent, - tester.d_cluster_mp->netCluster().lookupNode( - mqbmock::Cluster::k_LEADER_NODE_ID)), - 0); + tester.d_cluster_mp->netCluster().lookupNode( + mqbmock::Cluster::k_LEADER_NODE_ID)), + 0); BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 0U); - BMQTST_ASSERT(tester.hasBroadcastedMessages(0)); + BMQTST_ASSERT(tester.hasSentMessagesToLeader(1)); // Verify that the underlying ledger contains the update record bslma::ManagedPtr cslIter = @@ -858,11 +933,11 @@ static void test7_apply_ClusterStateRecord() // Apply the snapshot record BMQTST_ASSERT_EQ(obj->apply(snapshotEvent, - tester.d_cluster_mp->netCluster().lookupNode( - mqbmock::Cluster::k_LEADER_NODE_ID)), - 0); + tester.d_cluster_mp->netCluster().lookupNode( + mqbmock::Cluster::k_LEADER_NODE_ID)), + 0); BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 0U); - BMQTST_ASSERT(tester.hasBroadcastedMessages(0)); + BMQTST_ASSERT(tester.hasSentMessagesToLeader(1)); BMQTST_ASSERT_EQ(cslIter->next(), 0); BMQTST_ASSERT(cslIter->isValid()); @@ -877,84 +952,15 @@ static void test7_apply_ClusterStateRecord() BMQTST_ASSERT_EQ(msg.choice().leaderAdvisory(), leaderAdvisory); } -static void test8_apply_ClusterStateRecordAck() -// ------------------------------------------------------------------------ -// CLUSTER STATE RECORD ACK -// -// Concerns: -// Applying 'LeaderAdvisoryAck' (only at leader). -// - For an invalid advisory (one that has already been committed) -// NOTE: At this time only weak consistency is implemented, so all -// valid advisories are immediately committed and this test driver -// can only test for an "invalid" advisory. -// Once strong consistency is supported, testing an ACK for valid -// advisories will be possible. -// -// Testing: -// int apply(const bdlbb::Blob& record) // for 'record' of type 'e_ACK' -// ------------------------------------------------------------------------ -{ - bmqtst::TestHelper::printTestName("APPLY - CLUSTER STATE RECORD ACK"); - - Tester tester; - mqbc::IncoreClusterStateLedger* obj = tester.d_clusterStateLedger_mp.get(); - BSLS_ASSERT_OPT(obj->open() == 0); - - bmqp_ctrlmsg::PartitionPrimaryInfo pinfo; - pinfo.primaryNodeId() = mqbmock::Cluster::k_LEADER_NODE_ID; - pinfo.partitionId() = 1U; - pinfo.primaryLeaseId() = 2U; - - // Build 'PartitionPrimaryAdvisory' - bmqp_ctrlmsg::PartitionPrimaryAdvisory advisory; - advisory.partitions().push_back(pinfo); - tester.d_cluster_mp->_clusterData() - ->electorInfo() - .nextLeaderMessageSequence(&advisory.sequenceNumber()); - - BMQTST_ASSERT_EQ(obj->apply(advisory), 0); - - bmqp_ctrlmsg::ControlMessage expected; - expected.choice() - .makeClusterMessage() - .choice() - .makePartitionPrimaryAdvisory(advisory); - BSLS_ASSERT_OPT(tester.numCommittedMessages() == 1U); - BSLS_ASSERT_OPT(tester.committedMessage(0) == expected); - BSLS_ASSERT_OPT(tester.hasBroadcastedMessages(2)); - BSLS_ASSERT_OPT(tester.broadcastedMessage(0) == expected); - - // Build 'LeaderAdvisoryAck' - bmqp_ctrlmsg::LeaderAdvisoryAck ack; - ack.sequenceNumberAcked() = advisory.sequenceNumber(); - - bmqp_ctrlmsg::ClusterMessage message; - message.choice().makeLeaderAdvisoryAck(ack); - - bdlbb::Blob ackEvent(tester.d_cluster_mp->_bufferFactory(), - bmqtst::TestHelperUtil::allocator()); - tester.constructEventBlob(&ackEvent, - message, - ack.sequenceNumberAcked(), - 123456, - mqbc::ClusterStateRecordType::e_ACK); - - BMQTST_ASSERT_NE(obj->apply(ackEvent, - tester.d_cluster_mp->netCluster().lookupNode( - mqbmock::Cluster::k_LEADER_NODE_ID + 1)), - 0); - BSLS_ASSERT_OPT(obj->close() == 0); - BMQTST_ASSERT(tester.hasNoMoreBroadcastedMessages()); -} - -static void test9_apply_ClusterStateRecordCommit() +static void test8_apply_ClusterStateRecordCommit() // ------------------------------------------------------------------------ // CLUSTER STATE RECORD COMMIT // // Concerns: // Applying 'LeaderAdvisoryCommit' (we test only at follower). -// - For a valid advisory (one that has been previously applied) -// - For an invalid advisory +// - Should pass for an uncommited advisory +// - Should fail for an advisory that has already been committed +// - Should fail for an invalid sequence number // // Testing: // int apply(const bdlbb::Blob& record) // for 'record' of type @@ -967,7 +973,7 @@ static void test9_apply_ClusterStateRecordCommit() mqbc::IncoreClusterStateLedger* obj = tester.d_clusterStateLedger_mp.get(); BSLS_ASSERT_OPT(obj->open() == 0); - // Build 'PartitionPrimaryAdvisory' + // Apply 'PartitionPrimaryAdvisory' bmqp_ctrlmsg::PartitionPrimaryInfo pinfo; pinfo.primaryNodeId() = mqbmock::Cluster::k_LEADER_NODE_ID; pinfo.partitionId() = 1U; @@ -989,16 +995,14 @@ static void test9_apply_ClusterStateRecordCommit() 123456, mqbc::ClusterStateRecordType::e_UPDATE); - // Apply advisory BMQTST_ASSERT_EQ(obj->apply(advisoryEvent, - tester.d_cluster_mp->netCluster().lookupNode( - mqbmock::Cluster::k_LEADER_NODE_ID)), - 0); + tester.d_cluster_mp->netCluster().lookupNode( + mqbmock::Cluster::k_LEADER_NODE_ID)), + 0); BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 0U); - BMQTST_ASSERT(tester.hasBroadcastedMessages(0)); + BMQTST_ASSERT(tester.hasSentMessagesToLeader(1)); - // 1. Build and apply 'LeaderAdvisoryCommit' for an advisory that has been - // previously applied (but not yet committed) + // 1. Should pass for an uncommited advisory bmqp_ctrlmsg::LeaderAdvisoryCommit commit; commit.sequenceNumberCommitted() = advisory.sequenceNumber(); commit.sequenceNumber().electorTerm() = 1U; @@ -1015,12 +1019,12 @@ static void test9_apply_ClusterStateRecordCommit() 123567, mqbc::ClusterStateRecordType::e_COMMIT); - // Verify commit BMQTST_ASSERT_EQ(obj->apply(commitEvent, - tester.d_cluster_mp->netCluster().lookupNode( - mqbmock::Cluster::k_LEADER_NODE_ID)), - 0); + tester.d_cluster_mp->netCluster().lookupNode( + mqbmock::Cluster::k_LEADER_NODE_ID)), + 0); BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 1U); + BMQTST_ASSERT(tester.hasSentMessagesToLeader(1)); bmqp_ctrlmsg::ControlMessage expected; expected.choice() @@ -1029,19 +1033,42 @@ static void test9_apply_ClusterStateRecordCommit() .makePartitionPrimaryAdvisory(advisory); BMQTST_ASSERT_EQ(tester.committedMessage(0), expected); - // 2. Apply 'LeaderAdvisoryCommit' for an advisory that has already been - // previously committed + // 2. Should fail for an advisory that has already been committed BMQTST_ASSERT_NE(obj->apply(commitEvent, - tester.d_cluster_mp->netCluster().lookupNode( - mqbmock::Cluster::k_LEADER_NODE_ID)), - 0); + tester.d_cluster_mp->netCluster().lookupNode( + mqbmock::Cluster::k_LEADER_NODE_ID)), + 0); + BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 1U); + + // 3. Should fail for an invalid sequence number + bmqp_ctrlmsg::LeaderAdvisoryCommit invalidCommit; + invalidCommit.sequenceNumberCommitted().electorTerm() = 999U; + invalidCommit.sequenceNumberCommitted().sequenceNumber() = 999U; + invalidCommit.sequenceNumber().electorTerm() = 1U; + invalidCommit.sequenceNumber().sequenceNumber() = 4U; + + bmqp_ctrlmsg::ClusterMessage invalidCommitMessage; + invalidCommitMessage.choice().makeLeaderAdvisoryCommit(invalidCommit); + + bdlbb::Blob invalidCommitEvent(tester.d_cluster_mp->_bufferFactory(), + s_allocator_p); + tester.constructEventBlob(&invalidCommitEvent, + invalidCommitMessage, + invalidCommit.sequenceNumber(), + 123567, + mqbc::ClusterStateRecordType::e_COMMIT); + + BMQTST_ASSERT_NE(obj->apply(invalidCommitEvent, + tester.d_cluster_mp->netCluster().lookupNode( + mqbmock::Cluster::k_LEADER_NODE_ID)), + 0); BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 1U); BSLS_ASSERT_OPT(obj->close() == 0); BMQTST_ASSERT(tester.hasNoMoreBroadcastedMessages()); } -static void test10_persistanceLeader() +static void test9_persistanceLeader() // ------------------------------------------------------------------------ // PERSISTENCE LEADER // @@ -1049,7 +1076,7 @@ static void test10_persistanceLeader() // IncoreCSL provides persistence of the logs at the leader node. // // Plan: -// 1 Apply advisories of different types +// 1 Apply and commit advisories of different types // 2 Close the CSL // 3 Open the CSL and instantiate ClusterStateLedgerIterator // 4 Iterate through the records and verify that they are as expected @@ -1064,107 +1091,102 @@ static void test10_persistanceLeader() mqbc::IncoreClusterStateLedger* obj = tester.d_clusterStateLedger_mp.get(); BSLS_ASSERT_OPT(obj->open() == 0); - // 1. Apply advisories of different types + // 1. Apply and commit advisories of different types - // Apply 'PartitionPrimaryAdvisory' + // Apply and commit 'PartitionPrimaryAdvisory' + bmqp_ctrlmsg::PartitionPrimaryAdvisory pmAdvisory; + tester.d_cluster_mp->_clusterData() + ->electorInfo() + .nextLeaderMessageSequence(&pmAdvisory.sequenceNumber()); bmqp_ctrlmsg::PartitionPrimaryInfo pinfo; pinfo.primaryNodeId() = mqbmock::Cluster::k_LEADER_NODE_ID; pinfo.partitionId() = 1U; pinfo.primaryLeaseId() = 2U; - - bmqp_ctrlmsg::PartitionPrimaryAdvisory pmAdvisory; pmAdvisory.partitions().push_back(pinfo); - tester.d_cluster_mp->_clusterData() - ->electorInfo() - .nextLeaderMessageSequence(&pmAdvisory.sequenceNumber()); + BSLS_ASSERT_OPT(obj->apply(pmAdvisory) == 0); - BSLS_ASSERT_OPT(tester.numCommittedMessages() == 1U); - // Apply 'QueueAssignmentAdvisory' + tester.receiveAck(obj, pmAdvisory.sequenceNumber(), 3); + BMQTST_ASSERT_EQ(tester.numCommittedMessages(), 1U); + BMQTST_ASSERT(tester.hasBroadcastedMessages(2)); + + // Apply and commit 'QueueAssignmentAdvisory' bmqp_ctrlmsg::QueueAssignmentAdvisory qAssignAdvisory; tester.d_cluster_mp->_clusterData() ->electorInfo() .nextLeaderMessageSequence(&qAssignAdvisory.sequenceNumber()); - bmqp_ctrlmsg::QueueInfo qinfo; - qinfo.uri() = "bmq://bmq.test.mmap.priority/q1"; + qinfo.uri() = "bmq://bmq.test.mmap.fanout/q1"; qinfo.partitionId() = 1U; - mqbu::StorageKey key(mqbu::StorageKey::BinaryRepresentation(), "7777"); key.loadBinary(&qinfo.key()); - qAssignAdvisory.queues().push_back(qinfo); BSLS_ASSERT_OPT(obj->apply(qAssignAdvisory) == 0); - BSLS_ASSERT_OPT(tester.numCommittedMessages() == 2U); - - // Apply 'QueueUnassignedAdvisory' - bmqp_ctrlmsg::QueueUnassignedAdvisory qUnassignedAdvisory; - tester.d_cluster_mp->_clusterData() - ->electorInfo() - .nextLeaderMessageSequence(&qUnassignedAdvisory.sequenceNumber()); - - qUnassignedAdvisory.queues().push_back(qinfo); - BSLS_ASSERT_OPT(obj->apply(qUnassignedAdvisory) == 0); - BSLS_ASSERT_OPT(tester.numCommittedMessages() == 3U); + tester.receiveAck(obj, qAssignAdvisory.sequenceNumber(), 3); + BSLS_ASSERT_OPT(tester.numCommittedMessages() == 2U); + BMQTST_ASSERT(tester.hasBroadcastedMessages(4)); - // Apply 'QueueUpdateAdvisory' + // Apply and commit 'QueueUpdateAdvisory' bmqp_ctrlmsg::QueueUpdateAdvisory qUpdateAdvisory; tester.d_cluster_mp->_clusterData() ->electorInfo() .nextLeaderMessageSequence(&qUpdateAdvisory.sequenceNumber()); - bmqp_ctrlmsg::QueueInfoUpdate qupdate; - qupdate.uri() = "bmq://bmq.test.mmap.priority/q1"; + qupdate.uri() = "bmq://bmq.test.mmap.fanout/q1"; qupdate.partitionId() = 1U; - - mqbu::StorageKey key2(mqbu::StorageKey::BinaryRepresentation(), "8888"); - key2.loadBinary(&qupdate.key()); - + key.loadBinary(&qupdate.key()); qupdate.addedAppIds().resize(1); - qupdate.removedAppIds().resize(1); - qupdate.domain() = "bmq.test.mmap.priority"; - + qupdate.domain() = "bmq.test.mmap.fanout"; bmqp_ctrlmsg::AppIdInfo& addedAppId = qupdate.addedAppIds().back(); - addedAppId.appId() = "App1"; + addedAppId.appId() = "qux"; mqbu::StorageKey appKey1(mqbu::StorageKey::BinaryRepresentation(), "12345"); appKey1.loadBinary(&addedAppId.appKey()); - - bmqp_ctrlmsg::AppIdInfo& removedAppId = qupdate.removedAppIds().back(); - removedAppId.appId() = "App2"; - mqbu::StorageKey appKey2(mqbu::StorageKey::BinaryRepresentation(), - "23456"); - appKey2.loadBinary(&removedAppId.appKey()); - qUpdateAdvisory.queueUpdates().push_back(qupdate); BSLS_ASSERT_OPT(obj->apply(qUpdateAdvisory) == 0); + + tester.receiveAck(obj, qUpdateAdvisory.sequenceNumber(), 3); + BSLS_ASSERT_OPT(tester.numCommittedMessages() == 3U); + BMQTST_ASSERT(tester.hasBroadcastedMessages(6)); + + // Apply and commit 'QueueUnassignedAdvisory' + bmqp_ctrlmsg::QueueUnassignedAdvisory qUnassignedAdvisory; + tester.d_cluster_mp->_clusterData() + ->electorInfo() + .nextLeaderMessageSequence(&qUnassignedAdvisory.sequenceNumber()); + qUnassignedAdvisory.queues().push_back(qinfo); + + BSLS_ASSERT_OPT(obj->apply(qUnassignedAdvisory) == 0); + + tester.receiveAck(obj, qUnassignedAdvisory.sequenceNumber(), 3); BSLS_ASSERT_OPT(tester.numCommittedMessages() == 4U); + BMQTST_ASSERT(tester.hasBroadcastedMessages(8)); - // Apply 'LeaderAdvisory' + // Apply and commit 'LeaderAdvisory' + bmqp_ctrlmsg::LeaderAdvisory leaderAdvisory; + tester.d_cluster_mp->_clusterData() + ->electorInfo() + .nextLeaderMessageSequence(&leaderAdvisory.sequenceNumber()); bmqp_ctrlmsg::PartitionPrimaryInfo pinfo2; pinfo2.primaryNodeId() = mqbmock::Cluster::k_LEADER_NODE_ID; pinfo2.partitionId() = 2U; pinfo2.primaryLeaseId() = 2U; - bmqp_ctrlmsg::QueueInfo qinfo2; - qinfo2.uri() = "bmq://bmq.test.mmap.priority/q2"; + qinfo2.uri() = "bmq://bmq.test.mmap.fanout/q2"; qinfo2.partitionId() = 2U; - - mqbu::StorageKey key3(mqbu::StorageKey::BinaryRepresentation(), "9999"); - key3.loadBinary(&qinfo2.key()); - - bmqp_ctrlmsg::LeaderAdvisory leaderAdvisory; + mqbu::StorageKey key2(mqbu::StorageKey::BinaryRepresentation(), "9999"); + key2.loadBinary(&qinfo2.key()); leaderAdvisory.queues().push_back(qinfo2); leaderAdvisory.partitions().push_back(pinfo2); - tester.d_cluster_mp->_clusterData() - ->electorInfo() - .nextLeaderMessageSequence(&leaderAdvisory.sequenceNumber()); BSLS_ASSERT_OPT(obj->apply(leaderAdvisory) == 0); + + tester.receiveAck(obj, leaderAdvisory.sequenceNumber(), 3); BSLS_ASSERT_OPT(tester.numCommittedMessages() == 5U); + BMQTST_ASSERT(tester.hasBroadcastedMessages(10)); // 2. Close the CSL BSLS_ASSERT_OPT(obj->close() == 0); @@ -1212,40 +1234,39 @@ static void test10_persistanceLeader() verifyLeaderAdvisoryCommit(*cslIter, qAssignAdvisory.sequenceNumber()); - // Verify 'QueueUnassignedAdvisory' and its commit + // Verify 'QueueUpdateAdvisory' and its commit BMQTST_ASSERT_EQ(cslIter->next(), 0); BMQTST_ASSERT(cslIter->isValid()); verifyRecordHeader(*cslIter, mqbc::ClusterStateRecordType::e_UPDATE, - qUnassignedAdvisory.sequenceNumber()); + qUpdateAdvisory.sequenceNumber()); rc = cslIter->loadClusterMessage(&msg); BMQTST_ASSERT_EQ(rc, 0); - BMQTST_ASSERT(msg.choice().isQueueUnassignedAdvisoryValue()); - BMQTST_ASSERT_EQ(msg.choice().queueUnassignedAdvisory(), - qUnassignedAdvisory); + BMQTST_ASSERT(msg.choice().isQueueUpdateAdvisoryValue()); + BMQTST_ASSERT_EQ(msg.choice().queueUpdateAdvisory(), qUpdateAdvisory); BMQTST_ASSERT_EQ(cslIter->next(), 0); BMQTST_ASSERT(cslIter->isValid()); - verifyLeaderAdvisoryCommit(*cslIter, qUnassignedAdvisory.sequenceNumber()); + verifyLeaderAdvisoryCommit(*cslIter, qUpdateAdvisory.sequenceNumber()); - // Verify 'QueueUpdateAdvisory' and its commit + // Verify 'QueueUnassignedAdvisory' and its commit BMQTST_ASSERT_EQ(cslIter->next(), 0); BMQTST_ASSERT(cslIter->isValid()); verifyRecordHeader(*cslIter, mqbc::ClusterStateRecordType::e_UPDATE, - qUpdateAdvisory.sequenceNumber()); + qUnassignedAdvisory.sequenceNumber()); rc = cslIter->loadClusterMessage(&msg); BMQTST_ASSERT_EQ(rc, 0); - BMQTST_ASSERT(msg.choice().isQueueUpdateAdvisoryValue()); - BMQTST_ASSERT_EQ(msg.choice().queueUpdateAdvisory(), qUpdateAdvisory); + BMQTST_ASSERT(msg.choice().isQueueUnassignedAdvisoryValue()); + BMQTST_ASSERT_EQ(msg.choice().queueUnassignedAdvisory(), qUnassignedAdvisory); BMQTST_ASSERT_EQ(cslIter->next(), 0); BMQTST_ASSERT(cslIter->isValid()); - verifyLeaderAdvisoryCommit(*cslIter, qUpdateAdvisory.sequenceNumber()); + verifyLeaderAdvisoryCommit(*cslIter, qUnassignedAdvisory.sequenceNumber()); // Verify 'LeaderAdvisory' and its commit BMQTST_ASSERT_EQ(cslIter->next(), 0); @@ -1269,7 +1290,7 @@ static void test10_persistanceLeader() BMQTST_ASSERT(!cslIter->isValid()); } -static void test11_persistanceFollower() +static void test10_persistanceFollower() // ------------------------------------------------------------------------ // PERSISTENCE FOLLOWER // @@ -1559,7 +1580,7 @@ static void test11_persistanceFollower() BMQTST_ASSERT(!cslIter->isValid()); } -static void test12_persistanceAcrossRollover() +static void test11_persistanceAcrossRollover() // ------------------------------------------------------------------------ // PERSISTENCE ACROSS ROLLOVER // @@ -1569,8 +1590,8 @@ static void test12_persistanceAcrossRollover() // // Plan: // Open logs that were already written at a particular location: -// 1 Apply enough advisories to trigger rollover -// 2 Apply some more advisories and "save" them in a list, +// 1 Apply and commit enough advisories to trigger rollover +// 2 Apply and commit some more advisories and "save" them in a list, // lastAdvisories // 3 Close the CSL // 4 Open the CSL and instantiate ClusterStateLedgerIterator. @@ -1612,7 +1633,7 @@ static void test12_persistanceAcrossRollover() int rc = encoder.encode(&osb, qadvisory); BSLS_ASSERT_OPT(rc == 0); - // 1. Apply enough advisories to trigger rollover + // 1. Apply and commit enough advisories to trigger rollover size_t i = 0; while (ledger->numLogs() == 1U) { tester.d_cluster_mp->_clusterData() @@ -1621,6 +1642,9 @@ static void test12_persistanceAcrossRollover() BMQTST_ASSERT_EQ(obj->apply(qadvisory), 0); + // Receive a quorum of acks + tester.receiveAck(obj, qadvisory.sequenceNumber(), 3); + bmqp_ctrlmsg::ControlMessage expected; expected.choice() .makeClusterMessage() @@ -1640,7 +1664,7 @@ static void test12_persistanceAcrossRollover() // for 1 second to make sure the logs are written 1 second apart. sleep(1); - // 2. Apply some more advisories and "save" them in a list + // 2. Apply and commit some more advisories and "save" them in a list bsl::vector lastAdvisories( bmqtst::TestHelperUtil::allocator()); @@ -1654,7 +1678,7 @@ static void test12_persistanceAcrossRollover() qadvisory.sequenceNumber(), mqbc::ClusterStateRecordType::e_UPDATE)); - // Apply 'PartitionPrimaryAdvisory' + // Apply and commit 'PartitionPrimaryAdvisory' bmqp_ctrlmsg::PartitionPrimaryInfo pinfo; pinfo.primaryNodeId() = mqbmock::Cluster::k_LEADER_NODE_ID; pinfo.partitionId() = 1U; @@ -1667,6 +1691,9 @@ static void test12_persistanceAcrossRollover() .nextLeaderMessageSequence(&pmAdvisory.sequenceNumber()); BMQTST_ASSERT_EQ(obj->apply(pmAdvisory), 0); + // Receive a quorum of acks + tester.receiveAck(obj, pmAdvisory.sequenceNumber(), 3); + bmqp_ctrlmsg::ControlMessage expectedPmAdvisory; expectedPmAdvisory.choice() .makeClusterMessage() @@ -1699,6 +1726,9 @@ static void test12_persistanceAcrossRollover() BMQTST_ASSERT_EQ(obj->apply(qUnassignedAdvisory), 0); + // Receive a quorum of acks + tester.receiveAck(obj, qUnassignedAdvisory.sequenceNumber(), 3); + bmqp_ctrlmsg::ControlMessage expectedQUnassignedAdvisory; expectedQUnassignedAdvisory.choice() .makeClusterMessage() @@ -1738,6 +1768,9 @@ static void test12_persistanceAcrossRollover() BMQTST_ASSERT_EQ(obj->apply(leaderAdvisory), 0); + // Receive a quorum of acks + tester.receiveAck(obj, leaderAdvisory.sequenceNumber(), 3); + bmqp_ctrlmsg::ControlMessage expectedLeaderAdvisory; expectedLeaderAdvisory.choice() .makeClusterMessage() @@ -1806,16 +1839,9 @@ static void test12_persistanceAcrossRollover() BMQTST_ASSERT(!cslIter->isValid()); BSLS_ASSERT_OPT(obj->close() == 0); - - // TODO Test that if node is available, a cluster state snapshot is written - // upon rollover, and that a snapshot is *NOT* written if node is not - // available. - // Can explicitly set node status via: - // d_cluster_mp->_clusterData()->membership().setSelfNodeStatus( - // bmqp_ctrlmsg::NodeStatus::E_AVAILABLE); } -static void test13_rolloverUncommittedAdvisories() +static void test12_rolloverUncommittedAdvisories() // ------------------------------------------------------------------------ // ROLLOVER UNCOMMITTED ADVISORIES // @@ -1825,7 +1851,7 @@ static void test13_rolloverUncommittedAdvisories() // // Plan: // 1 Apply some advisory to remain uncommitted -// 2 Apply enough committed advisories to trigger rollover +// 2 Apply and commit enough advisories to trigger rollover // 3 Close the CSL // 4 Open the CSL and instantiate ClusterStateLedgerIterator. // 5 Verify that the uncommitted advisories are written to the new log, @@ -1885,7 +1911,7 @@ static void test13_rolloverUncommittedAdvisories() mqbc::ClusterStateRecordType::e_UPDATE)); BSLS_ASSERT_OPT(tester.numCommittedMessages() == 0); - BSLS_ASSERT_OPT(tester.hasBroadcastedMessages(0)); + BSLS_ASSERT_OPT(tester.hasSentMessagesToLeader(1)); // Apply 'QueueUnassignedAdvisory' bmqp_ctrlmsg::ClusterMessage qUnassignedAdvisoryMsg; @@ -1927,7 +1953,7 @@ static void test13_rolloverUncommittedAdvisories() mqbc::ClusterStateRecordType::e_UPDATE)); BSLS_ASSERT_OPT(tester.numCommittedMessages() == 0); - BSLS_ASSERT_OPT(tester.hasBroadcastedMessages(0)); + BSLS_ASSERT_OPT(tester.hasSentMessagesToLeader(2)); // Apply 'LeaderAdvisory' bmqp_ctrlmsg::PartitionPrimaryInfo pinfo2; @@ -1974,9 +2000,9 @@ static void test13_rolloverUncommittedAdvisories() mqbc::ClusterStateRecordType::e_SNAPSHOT)); BSLS_ASSERT_OPT(tester.numCommittedMessages() == 0); - BSLS_ASSERT_OPT(tester.hasBroadcastedMessages(0)); + BSLS_ASSERT_OPT(tester.hasSentMessagesToLeader(3)); - // 2. Apply enough committed advisories to trigger rollover + // 2. Apply and commit enough advisories to trigger rollover // Build 'QueueAssignmentAdvisory' bmqp_ctrlmsg::ClusterMessage qAssignAdvisoryMsg; @@ -2006,7 +2032,7 @@ static void test13_rolloverUncommittedAdvisories() bmqp_ctrlmsg::LeaderAdvisoryCommit& commit = commitMsg.choice().makeLeaderAdvisoryCommit(); - // Repeatedly apply above advisory and its commit, until rollover + // Repeatedly apply and commit above advisory, until rollover size_t i = 0; while (ledger->numLogs() == 1U) { tester.d_cluster_mp->_clusterData() @@ -2051,7 +2077,7 @@ static void test13_rolloverUncommittedAdvisories() .makeQueueAssignmentAdvisory(qAssignAdvisory); BSLS_ASSERT_OPT(tester.numCommittedMessages() == i + 1); BSLS_ASSERT_OPT(tester.committedMessage(i) == expected); - BSLS_ASSERT_OPT(tester.hasBroadcastedMessages(0)); + BSLS_ASSERT_OPT(tester.hasSentMessagesToLeader(i+ 4)); ++i; } @@ -2150,12 +2176,11 @@ int main(int argc, char* argv[]) switch (_testCase) { case 0: - case 13: test13_rolloverUncommittedAdvisories(); break; - case 12: test12_persistanceAcrossRollover(); break; - case 11: test11_persistanceFollower(); break; - case 10: test10_persistanceLeader(); break; - case 9: test9_apply_ClusterStateRecordCommit(); break; - case 8: test8_apply_ClusterStateRecordAck(); break; + case 12: test12_rolloverUncommittedAdvisories(); break; + case 11: test11_persistanceAcrossRollover(); break; + case 10: test10_persistanceFollower(); break; + case 9: test9_persistanceLeader(); break; + case 8: test8_apply_ClusterStateRecordCommit(); break; case 7: test7_apply_ClusterStateRecord(); break; case 6: test6_apply_LeaderAdvisory(); break; case 5: test5_apply_QueueUpdateAdvisory(); break; diff --git a/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h b/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h index d916424cc..c3e3c13dd 100644 --- a/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h +++ b/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h @@ -302,7 +302,7 @@ class ClusterStateManager { processRegistrationRequest(const bmqp_ctrlmsg::ControlMessage& message, mqbnet::ClusterNode* source) = 0; - /// Process the specified `event`. + /// Process the specified cluster state `event`. /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. diff --git a/src/groups/mqb/mqbmock/mqbmock_clusterstateledger.cpp b/src/groups/mqb/mqbmock/mqbmock_clusterstateledger.cpp index b111b470b..df7428486 100644 --- a/src/groups/mqb/mqbmock/mqbmock_clusterstateledger.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_clusterstateledger.cpp @@ -51,7 +51,6 @@ int ClusterStateLedger::applyAdvisoryInternal( ClusterStateLedger::ClusterStateLedger(mqbc::ClusterData* clusterData, bslma::Allocator* allocator) : d_allocator_p(allocator) -, d_isFirstLeaderAdvisory(true) , d_isOpen(false) , d_pauseCommitCb(false) , d_commitCb() diff --git a/src/groups/mqb/mqbmock/mqbmock_clusterstateledger.h b/src/groups/mqb/mqbmock/mqbmock_clusterstateledger.h index 4a29c17e8..7cb1722ff 100644 --- a/src/groups/mqb/mqbmock/mqbmock_clusterstateledger.h +++ b/src/groups/mqb/mqbmock/mqbmock_clusterstateledger.h @@ -73,10 +73,6 @@ class ClusterStateLedger : public mqbc::ClusterStateLedger { bslma::Allocator* d_allocator_p; // Allocator used to supply memory. - bool d_isFirstLeaderAdvisory; - // Flag to indicate whether this is the first leader - // advisory. - bool d_isOpen; // Flag to indicate open/close status of this object. @@ -186,9 +182,6 @@ class ClusterStateLedger : public mqbc::ClusterStateLedger { int apply(const bdlbb::Blob& record, mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE; - void - setIsFirstLeaderAdvisory(bool isFirstLeaderAdvisory) BSLS_KEYWORD_OVERRIDE; - /// Set the commit callback to the specified `value`. void setCommitCb(const CommitCb& value) BSLS_KEYWORD_OVERRIDE; @@ -254,19 +247,6 @@ inline bool ClusterStateLedger::isSelfLeader() const // MANIPULATORS // (virtual mqbc::ClusterStateLedger) -inline void -ClusterStateLedger::setIsFirstLeaderAdvisory(bool isFirstLeaderAdvisory) -{ - // executed by the *CLUSTER DISPATCHER* thread - - // PRECONDITIONS - BSLS_ASSERT_SAFE( - d_clusterData_p->cluster().dispatcher()->inDispatcherThread( - &d_clusterData_p->cluster())); - - d_isFirstLeaderAdvisory = isFirstLeaderAdvisory; -} - inline void ClusterStateLedger::setCommitCb(const CommitCb& value) { d_commitCb = value;