Skip to content

Commit

Permalink
Merge branch 'main' into 678098-patch-5
Browse files Browse the repository at this point in the history
  • Loading branch information
678098 authored Oct 1, 2024
2 parents 84a3485 + 136ec3d commit 6f25240
Show file tree
Hide file tree
Showing 19 changed files with 372 additions and 381 deletions.
13 changes: 8 additions & 5 deletions src/groups/bmq/bmqt/bmqt_messageguid.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,14 @@ MessageGUIDHashAlgo::operator()(const void* data,
}
};

const bsls::Types::Uint64* start =
reinterpret_cast<const bsls::Types::Uint64*>(data);
const bsls::Types::Uint64 h1 = LocalFuncs::mix(start[0]);
const bsls::Types::Uint64 h2 = LocalFuncs::mix(start[1]);
d_result = LocalFuncs::combine(h1, h2);
// `data` buffer might not be aligned to 8 bytes, so recasting the pointer
// might lead to UB
bsls::Types::Uint64 parts[2];
bsl::memcpy(parts, data, bmqt::MessageGUID::e_SIZE_BINARY);

parts[0] = LocalFuncs::mix(parts[0]);
parts[1] = LocalFuncs::mix(parts[1]);
d_result = LocalFuncs::combine(parts[0], parts[1]);
}

inline MessageGUIDHashAlgo::result_type MessageGUIDHashAlgo::computeHash()
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2184,7 +2184,7 @@ void Cluster::onRecoveryStatusDispatched(
else if (d_state.partition(pid).primaryLeaseId() <
primaryLeaseIds[pid]) {
MWCTSK_ALARMLOG_ALARM("CLUSTER")
<< description() << " PartitionId [" << pid
<< description() << " Partition [" << pid
<< "]: self has higher retrieved leaseId ("
<< primaryLeaseIds[pid]
<< ") than the one notified by leader ("
Expand Down
8 changes: 4 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ void ClusterOrchestrator::onPartitionPrimaryStatusDispatched(
// partition. Log and move on.

BALL_LOG_WARN << d_clusterData_p->identity().description()
<< " PartitionId [" << partitionId
<< " Partition [" << partitionId
<< "]: ignoring partition-primary sync notification "
<< "because there is new primary now. LeaseId in "
<< "notification: " << primaryLeaseId
Expand All @@ -356,7 +356,7 @@ void ClusterOrchestrator::onPartitionPrimaryStatusDispatched(
BSLS_ASSERT_SAFE(pinfo.primaryLeaseId() > primaryLeaseId);

BALL_LOG_WARN << d_clusterData_p->identity().description()
<< " PartitionId [" << partitionId
<< " Partition [" << partitionId
<< "]: ignoring partition-primary sync notification "
<< "because primary (self) has different leaseId. "
<< "LeaseId in notification: " << primaryLeaseId
Expand All @@ -374,7 +374,7 @@ void ClusterOrchestrator::onPartitionPrimaryStatusDispatched(
// fails to transition to ACTIVE status in the stipulated time.

MWCTSK_ALARMLOG_ALARM("CLUSTER")
<< d_clusterData_p->identity().description() << " PartitionId ["
<< d_clusterData_p->identity().description() << " Partition ["
<< partitionId
<< "]: primary node (self) failed to sync partition, rc: "
<< status << ", leaseId: " << primaryLeaseId
Expand All @@ -387,7 +387,7 @@ void ClusterOrchestrator::onPartitionPrimaryStatusDispatched(
}

BALL_LOG_INFO << d_clusterData_p->identity().description()
<< " PartitionId [" << partitionId
<< " Partition [" << partitionId
<< "]: primary node (self) successfully synced the "
<< " partition. Current leaseId: " << pinfo.primaryLeaseId();

Expand Down
23 changes: 11 additions & 12 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ void ClusterQueueHelper::afterPartitionPrimaryAssignment(
// This routine is invoked only in the cluster nodes.

BALL_LOG_INFO << d_cluster_p->description()
<< " afterPartitionPrimaryAssignment: PartitionId ["
<< " afterPartitionPrimaryAssignment: Partition ["
<< partitionId << "]: new primary: "
<< (primary ? primary->nodeDescription() : "** none **")
<< ", primary status: " << status;
Expand Down Expand Up @@ -552,7 +552,7 @@ bool ClusterQueueHelper::onQueueUnassigning(
BALL_LOG_INFO << d_cluster_p->description()
<< " All references to queue " << uri << " with key '"
<< queueContext->key()
<< "' removed. Queue was mapped to PartitionId ["
<< "' removed. Queue was mapped to Partition ["
<< queueInfo.partitionId() << "].";

removeQueueRaw(queueContextIt);
Expand Down Expand Up @@ -788,7 +788,7 @@ void ClusterQueueHelper::onQueueContextAssigned(
pid);

logMsg << "Queue '" << queueContext->uri()
<< "' now assigned to PartitionId [" << pid << "]";
<< "' now assigned to Partition [" << pid << "]";
if (pinfo.primaryNode()) {
logMsg << " (" << pinfo.primaryNode()->nodeDescription() << ").";
}
Expand Down Expand Up @@ -3566,7 +3566,7 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId)
{
BALL_LOG_OUTPUT_STREAM
<< d_cluster_p->description()
<< ": Received state-restore event for PartitionId [";
<< ": Received state-restore event for Partition [";
if (allPartitions) {
BALL_LOG_OUTPUT_STREAM << "ALL";
}
Expand Down Expand Up @@ -3613,7 +3613,7 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId)
pinfo = &(d_clusterState_p->partition(partitionId));
BSLS_ASSERT_SAFE(pinfo);
if (!hasActiveAvailablePrimary(partitionId)) {
BALL_LOG_INFO << d_cluster_p->description() << " PartitionId ["
BALL_LOG_INFO << d_cluster_p->description() << " Partition ["
<< partitionId
<< "]: Not restoring partition state because there "
<< "is no primary or primary isn't ACTIVE. Current "
Expand Down Expand Up @@ -4154,7 +4154,7 @@ void ClusterQueueHelper::onQueueAssigned(
<< ": attempting to apply queue assignment for a known but"
<< " unassigned queue, but queueKey is not unique. "
<< "QueueKey [" << info.key() << "], URI [" << info.uri()
<< "], PartitionId [" << info.partitionId()
<< "], Partition [" << info.partitionId()
<< "]. Current leader is: '" << leaderDescription
<< "'. Ignoring this entry in the advisory."
<< MWCTSK_ALARMLOG_END;
Expand Down Expand Up @@ -4183,11 +4183,10 @@ void ClusterQueueHelper::onQueueAssigned(
MWCTSK_ALARMLOG_ALARM("CLUSTER_STATE")
<< d_cluster_p->description()
<< ": attempting to apply queue assignment for an unknown "
<< "queue [" << info.uri() << "] assigned to PartitionId ["
<< "queue [" << info.uri() << "] assigned to Partition ["
<< info.partitionId() << "], but queueKey [" << info.key()
<< "] is not unique. "
<< " Current leader is: '" << leaderDescription << "'"
<< "Ignoring this assignment." << MWCTSK_ALARMLOG_END;
<< "] is not unique. Current leader is: '" << leaderDescription
<< "'. Ignoring this assignment." << MWCTSK_ALARMLOG_END;
return; // RETURN
}

Expand Down Expand Up @@ -5273,7 +5272,7 @@ void ClusterQueueHelper::processShutdownEvent()
BALL_LOG_INFO << d_cluster_p->description()
<< ": Deleting queue instance [" << queue->uri()
<< "], queueKey [" << queueContextSp->key()
<< "] which was assigned to PartitionId ["
<< "] which was assigned to Partition ["
<< queueContextSp->partitionId()
<< "], because self is going down.";

Expand Down Expand Up @@ -6374,7 +6373,7 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate)
BALL_LOG_INFO << d_cluster_p->description()
<< ": Garbage-collecting queue [" << uriCopy
<< "], queueKey [" << keyCopy << "] assigned to "
<< "PartitionId [" << pid << "] as it has expired.";
<< "Partition [" << pid << "] as it has expired.";

mqbc::ClusterUtil::setPendingUnassignment(d_clusterState_p,
uriCopy,
Expand Down
17 changes: 8 additions & 9 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,8 @@ void ClusterStateManager::onLeaderSyncDataQueryResponse(
// See 'processPartitionPrimaryAdvisoryRaw' for similar check.

MWCTSK_ALARMLOG_ALARM("CLUSTER")
<< d_clusterData_p->identity().description()
<< " PartitionId [" << peerPinfo.partitionId()
<< d_clusterData_p->identity().description() << " Partition ["
<< peerPinfo.partitionId()
<< "]: self node views self as active/available primary, but a"
<< " different node is proposed as primary in the leader-sync "
<< "step: " << peerPinfo
Expand Down Expand Up @@ -785,8 +785,8 @@ void ClusterStateManager::processPartitionPrimaryAdvisoryRaw(
// 'onLeaderSyncDataQueryResponse' for similar check.

MWCTSK_ALARMLOG_ALARM("CLUSTER")
<< d_clusterData_p->identity().description()
<< " PartitionId [" << info.partitionId()
<< d_clusterData_p->identity().description() << " Partition ["
<< info.partitionId()
<< "]: self node views self as active/available primary, but a"
<< " different node is proposed as primary in the "
<< "partition/primary mapping: " << info << ". This downgrade "
Expand Down Expand Up @@ -1658,12 +1658,11 @@ void ClusterStateManager::processQueueAssignmentAdvisory(
<< ": overwriting current known queue state "
<< "with the buffered advisory for queue ["
<< qcit->second->uri()
<< "]. Current assigned PartitionId ["
<< "]. Current assigned Partition ["
<< qcit->second->partitionId()
<< "], current queueKey [" << qcit->second->key()
<< "], new PartitionId ["
<< queueInfo.partitionId() << "], new queueKey ["
<< queueKey << "].";
<< "], new Partition [" << queueInfo.partitionId()
<< "], new queueKey [" << queueKey << "].";
}

// Remove existing state, mapping, etc.
Expand Down Expand Up @@ -1723,7 +1722,7 @@ void ClusterStateManager::processQueueAssignmentAdvisory(
<< (delayed ? " buffered " : " ")
<< " queueAssignmentAdvisory from leader ["
<< source->nodeDescription() << "] for an unknown queue ["
<< uri << "] assigned to PartitionId ["
<< uri << "] assigned to Partition ["
<< queueInfo.partitionId() << "], but queueKey ["
<< queueKey << "] is not unique. Ignoring this entry in "
<< "the advisory." << MWCTSK_ALARMLOG_END;
Expand Down
Loading

0 comments on commit 6f25240

Please sign in to comment.