From a15a5baab499143c17c4b1af618b8e58a1fa470d Mon Sep 17 00:00:00 2001 From: Chris Beard Date: Mon, 30 Sep 2024 12:25:21 -0400 Subject: [PATCH 1/3] Fix[MQB]: Normalize "PartitionID [n]" to "Partition [n]" in logs (#424) * Fix[MQB]: Normalize "PartitionID [n]" to "Partition [n]" in logs Signed-off-by: Christopher Beard * clang-format Signed-off-by: Christopher Beard * Update src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp Co-authored-by: Evgeny Malygin <678098@protonmail.com> Signed-off-by: Chris Beard * clang-format Signed-off-by: Christopher Beard --------- Signed-off-by: Christopher Beard Co-authored-by: Evgeny Malygin <678098@protonmail.com> --- src/groups/mqb/mqbblp/mqbblp_cluster.cpp | 2 +- .../mqb/mqbblp/mqbblp_clusterorchestrator.cpp | 8 +- .../mqb/mqbblp/mqbblp_clusterqueuehelper.cpp | 23 +- .../mqb/mqbblp/mqbblp_clusterstatemanager.cpp | 17 +- .../mqb/mqbblp/mqbblp_recoverymanager.cpp | 333 +++++++++--------- src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp | 6 +- .../mqb/mqbblp/mqbblp_storagemanager.cpp | 46 +-- src/groups/mqb/mqbc/mqbc_clusterutil.cpp | 15 +- .../mqb/mqbc/mqbc_partitionstatetable.h | 3 +- src/groups/mqb/mqbc/mqbc_recoveryutil.cpp | 2 +- src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp | 2 +- src/groups/mqb/mqbc/mqbc_storageutil.cpp | 219 ++++++------ src/groups/mqb/mqbcmd/mqbcmd_humanprinter.cpp | 6 +- .../mqb/mqbs/mqbs_filebackedstorage.cpp | 25 +- src/groups/mqb/mqbs/mqbs_filestore.cpp | 2 +- src/groups/mqb/mqbs/mqbs_filestoreutil.cpp | 24 +- src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp | 2 +- src/groups/mqb/mqbs/mqbs_storageprintutil.cpp | 4 +- 18 files changed, 363 insertions(+), 376 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index e7a143567..4941b727a 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -2184,7 +2184,7 @@ void Cluster::onRecoveryStatusDispatched( else if (d_state.partition(pid).primaryLeaseId() < primaryLeaseIds[pid]) { MWCTSK_ALARMLOG_ALARM("CLUSTER") - << description() << " PartitionId [" << pid + << description() << " Partition [" << pid << "]: self has higher retrieved leaseId (" << primaryLeaseIds[pid] << ") than the one notified by leader (" diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp index 6ebb52e5f..3def3d866 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp @@ -332,7 +332,7 @@ void ClusterOrchestrator::onPartitionPrimaryStatusDispatched( // partition. Log and move on. BALL_LOG_WARN << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: ignoring partition-primary sync notification " << "because there is new primary now. LeaseId in " << "notification: " << primaryLeaseId @@ -356,7 +356,7 @@ void ClusterOrchestrator::onPartitionPrimaryStatusDispatched( BSLS_ASSERT_SAFE(pinfo.primaryLeaseId() > primaryLeaseId); BALL_LOG_WARN << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: ignoring partition-primary sync notification " << "because primary (self) has different leaseId. " << "LeaseId in notification: " << primaryLeaseId @@ -374,7 +374,7 @@ void ClusterOrchestrator::onPartitionPrimaryStatusDispatched( // fails to transition to ACTIVE status in the stipulated time. MWCTSK_ALARMLOG_ALARM("CLUSTER") - << d_clusterData_p->identity().description() << " PartitionId [" + << d_clusterData_p->identity().description() << " Partition [" << partitionId << "]: primary node (self) failed to sync partition, rc: " << status << ", leaseId: " << primaryLeaseId @@ -387,7 +387,7 @@ void ClusterOrchestrator::onPartitionPrimaryStatusDispatched( } BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: primary node (self) successfully synced the " << " partition. Current leaseId: " << pinfo.primaryLeaseId(); diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index d25d6ec68..f8fa9467d 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -348,7 +348,7 @@ void ClusterQueueHelper::afterPartitionPrimaryAssignment( // This routine is invoked only in the cluster nodes. BALL_LOG_INFO << d_cluster_p->description() - << " afterPartitionPrimaryAssignment: PartitionId [" + << " afterPartitionPrimaryAssignment: Partition [" << partitionId << "]: new primary: " << (primary ? primary->nodeDescription() : "** none **") << ", primary status: " << status; @@ -552,7 +552,7 @@ bool ClusterQueueHelper::onQueueUnassigning( BALL_LOG_INFO << d_cluster_p->description() << " All references to queue " << uri << " with key '" << queueContext->key() - << "' removed. Queue was mapped to PartitionId [" + << "' removed. Queue was mapped to Partition [" << queueInfo.partitionId() << "]."; removeQueueRaw(queueContextIt); @@ -788,7 +788,7 @@ void ClusterQueueHelper::onQueueContextAssigned( pid); logMsg << "Queue '" << queueContext->uri() - << "' now assigned to PartitionId [" << pid << "]"; + << "' now assigned to Partition [" << pid << "]"; if (pinfo.primaryNode()) { logMsg << " (" << pinfo.primaryNode()->nodeDescription() << ")."; } @@ -3566,7 +3566,7 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId) { BALL_LOG_OUTPUT_STREAM << d_cluster_p->description() - << ": Received state-restore event for PartitionId ["; + << ": Received state-restore event for Partition ["; if (allPartitions) { BALL_LOG_OUTPUT_STREAM << "ALL"; } @@ -3613,7 +3613,7 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId) pinfo = &(d_clusterState_p->partition(partitionId)); BSLS_ASSERT_SAFE(pinfo); if (!hasActiveAvailablePrimary(partitionId)) { - BALL_LOG_INFO << d_cluster_p->description() << " PartitionId [" + BALL_LOG_INFO << d_cluster_p->description() << " Partition [" << partitionId << "]: Not restoring partition state because there " << "is no primary or primary isn't ACTIVE. Current " @@ -4154,7 +4154,7 @@ void ClusterQueueHelper::onQueueAssigned( << ": attempting to apply queue assignment for a known but" << " unassigned queue, but queueKey is not unique. " << "QueueKey [" << info.key() << "], URI [" << info.uri() - << "], PartitionId [" << info.partitionId() + << "], Partition [" << info.partitionId() << "]. Current leader is: '" << leaderDescription << "'. Ignoring this entry in the advisory." << MWCTSK_ALARMLOG_END; @@ -4183,11 +4183,10 @@ void ClusterQueueHelper::onQueueAssigned( MWCTSK_ALARMLOG_ALARM("CLUSTER_STATE") << d_cluster_p->description() << ": attempting to apply queue assignment for an unknown " - << "queue [" << info.uri() << "] assigned to PartitionId [" + << "queue [" << info.uri() << "] assigned to Partition [" << info.partitionId() << "], but queueKey [" << info.key() - << "] is not unique. " - << " Current leader is: '" << leaderDescription << "'" - << "Ignoring this assignment." << MWCTSK_ALARMLOG_END; + << "] is not unique. Current leader is: '" << leaderDescription + << "'. Ignoring this assignment." << MWCTSK_ALARMLOG_END; return; // RETURN } @@ -5273,7 +5272,7 @@ void ClusterQueueHelper::processShutdownEvent() BALL_LOG_INFO << d_cluster_p->description() << ": Deleting queue instance [" << queue->uri() << "], queueKey [" << queueContextSp->key() - << "] which was assigned to PartitionId [" + << "] which was assigned to Partition [" << queueContextSp->partitionId() << "], because self is going down."; @@ -6374,7 +6373,7 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate) BALL_LOG_INFO << d_cluster_p->description() << ": Garbage-collecting queue [" << uriCopy << "], queueKey [" << keyCopy << "] assigned to " - << "PartitionId [" << pid << "] as it has expired."; + << "Partition [" << pid << "] as it has expired."; mqbc::ClusterUtil::setPendingUnassignment(d_clusterState_p, uriCopy, diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp index 7f24cf09b..e215d3ac7 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp @@ -531,8 +531,8 @@ void ClusterStateManager::onLeaderSyncDataQueryResponse( // See 'processPartitionPrimaryAdvisoryRaw' for similar check. MWCTSK_ALARMLOG_ALARM("CLUSTER") - << d_clusterData_p->identity().description() - << " PartitionId [" << peerPinfo.partitionId() + << d_clusterData_p->identity().description() << " Partition [" + << peerPinfo.partitionId() << "]: self node views self as active/available primary, but a" << " different node is proposed as primary in the leader-sync " << "step: " << peerPinfo @@ -785,8 +785,8 @@ void ClusterStateManager::processPartitionPrimaryAdvisoryRaw( // 'onLeaderSyncDataQueryResponse' for similar check. MWCTSK_ALARMLOG_ALARM("CLUSTER") - << d_clusterData_p->identity().description() - << " PartitionId [" << info.partitionId() + << d_clusterData_p->identity().description() << " Partition [" + << info.partitionId() << "]: self node views self as active/available primary, but a" << " different node is proposed as primary in the " << "partition/primary mapping: " << info << ". This downgrade " @@ -1658,12 +1658,11 @@ void ClusterStateManager::processQueueAssignmentAdvisory( << ": overwriting current known queue state " << "with the buffered advisory for queue [" << qcit->second->uri() - << "]. Current assigned PartitionId [" + << "]. Current assigned Partition [" << qcit->second->partitionId() << "], current queueKey [" << qcit->second->key() - << "], new PartitionId [" - << queueInfo.partitionId() << "], new queueKey [" - << queueKey << "]."; + << "], new Partition [" << queueInfo.partitionId() + << "], new queueKey [" << queueKey << "]."; } // Remove existing state, mapping, etc. @@ -1723,7 +1722,7 @@ void ClusterStateManager::processQueueAssignmentAdvisory( << (delayed ? " buffered " : " ") << " queueAssignmentAdvisory from leader [" << source->nodeDescription() << "] for an unknown queue [" - << uri << "] assigned to PartitionId [" + << uri << "] assigned to Partition [" << queueInfo.partitionId() << "], but queueKey [" << queueKey << "] is not unique. Ignoring this entry in " << "the advisory." << MWCTSK_ALARMLOG_END; diff --git a/src/groups/mqb/mqbblp/mqbblp_recoverymanager.cpp b/src/groups/mqb/mqbblp/mqbblp_recoverymanager.cpp index 56e664a84..f6c0842a3 100644 --- a/src/groups/mqb/mqbblp/mqbblp_recoverymanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_recoverymanager.cpp @@ -102,7 +102,7 @@ void movePartitionFiles(int partitionId, BALL_LOG_SET_CATEGORY(k_LOG_CATEGORY); - BALL_LOG_INFO << "For PartitionId [" << partitionId + BALL_LOG_INFO << "For Partition [" << partitionId << "], will archive all files."; bsl::vector fileSets; @@ -111,14 +111,14 @@ void movePartitionFiles(int partitionId, currentLocation, partitionId); if (0 != rc) { - BALL_LOG_WARN << "For PartitionId [" << partitionId + BALL_LOG_WARN << "For Partition [" << partitionId << "], failed to find file sets at location [" << currentLocation << "], rc: " << rc; return; // RETURN } for (unsigned int i = 0; i < fileSets.size(); ++i) { - BALL_LOG_INFO << "For PartitionId [" << partitionId + BALL_LOG_INFO << "For Partition [" << partitionId << "], archiving file set: " << fileSets[i]; mqbs::FileSystemUtil::move(fileSets[i].dataFile(), archiveLocation); mqbs::FileSystemUtil::move(fileSets[i].journalFile(), archiveLocation); @@ -289,7 +289,7 @@ void RecoveryManager_ChunkDeleter::operator()( &fti->qlistFd()); if (rc != 0) { // Failed to close one or more partition files - BALL_LOG_ERROR << "For PartitionId [" + BALL_LOG_ERROR << "For Partition [" << d_requestContext_p->partitionId() << "], failed" << " to close one or more partition files " << "[journalFd: " << fti->journalFd().fd() @@ -409,7 +409,7 @@ void RecoveryManager::recoveryStartupWaitDispatched(int partitionId) // No peer is AVAILABLE. BALL_LOG_INFO << d_clusterData_p->identity().description() - << ": PartitionId [" << partitionId + << ": Partition [" << partitionId << "], no peers are AVAILABLE after waiting for approx. " << d_clusterConfig.partitionConfig() .syncConfig() @@ -477,7 +477,7 @@ void RecoveryManager::recoveryStartupWaitPartitionDispatched( if (0 != recoveryCtx.oldSyncPointOffset()) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": PartitionId [" << partitionId + << ": Partition [" << partitionId << "], no sync point has been received during " << "recovery wait-time and no peers are available. " << "Partition will be truncated to the last syncPt " @@ -491,7 +491,7 @@ void RecoveryManager::recoveryStartupWaitPartitionDispatched( } else { BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": PartitionId [" << partitionId + << ": Partition [" << partitionId << "], no sync point has been received during " << "recovery wait-time and no peers are available. " << "No syncPt is present in the local journal, and " @@ -517,7 +517,7 @@ void RecoveryManager::recoveryStartupWaitPartitionDispatched( recoveryCtx.setRecoveryPeer(peer); BALL_LOG_INFO << d_clusterData_p->identity().description() - << ": PartitionId [" << partitionId + << ": Partition [" << partitionId << "], sending storage sync request to an AVAILABLE peer: " << peer->nodeDescription() << ", after no sync point was received during recovery " @@ -562,7 +562,7 @@ void RecoveryManager::recoveryStatusDispatched(int partitionId) } MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() << ": For PartitionId [" + << d_clusterData_p->identity().description() << ": For Partition [" << partitionId << "], recovery not completed " << "after maximum stipulated time of " << d_clusterConfig.partitionConfig() @@ -614,7 +614,7 @@ void RecoveryManager::primarySyncStatusDispatched(int partitionId) } MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() << ": For PartitionId [" + << d_clusterData_p->identity().description() << ": For Partition [" << partitionId << "], primary sync not " << "completed after maximum stipulated time of " << d_clusterConfig.partitionConfig() @@ -656,7 +656,7 @@ void RecoveryManager::onNodeDownDispatched(int partitionId, if (recoveryCtx.inRecovery() && recoveryCtx.recoveryPeer() == node) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": PartitionId [" << partitionId << "]: peer " + << ": Partition [" << partitionId << "]: peer " << node->nodeDescription() << ", which was serving storage sync request to self " << "node, has gone down. Notifying of partition recovery" @@ -666,7 +666,7 @@ void RecoveryManager::onNodeDownDispatched(int partitionId, else if (primarySyncCtx.primarySyncInProgress() && primarySyncCtx.syncPeer() == node) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": PartitionId [" << partitionId << "]: peer " + << ": Partition [" << partitionId << "]: peer " << node->nodeDescription() << ", which was serving primary-partition sync request " << "to self node, has gone down. Notifying of partition " @@ -699,7 +699,7 @@ void RecoveryManager::partitionSyncCleanupDispatched(int partitionId) &fti.qlistFd()); if (rc != 0) { // Failed to close one or more partition files - BALL_LOG_ERROR << "For PartitionId [" << partitionId << "], failed" + BALL_LOG_ERROR << "For Partition [" << partitionId << "], failed" << " to close one or more partition files " << "[journalFd: " << fti.journalFd().fd() << ", dataFd: " << fti.dataFd().fd() @@ -771,8 +771,8 @@ void RecoveryManager::sendStorageSyncRequesterHelper(RecoveryContext* context, // 'onStorageSyncResponse' won't be invoked in this case). MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], failed to send " + << d_clusterData_p->identity().description() << ": For Partition [" + << partitionId << "], failed to send " << "storage sync request to node " << context->recoveryPeer()->nodeDescription() << ", rc: " << status << ". No retry attempt will be made." << MWCTSK_ALARMLOG_END; @@ -868,8 +868,8 @@ void RecoveryManager::onStorageSyncResponseDispatched( if (!recoveryCtx.inRecovery()) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId << "]: " - << "received storage sync response from: " + << " Partition [" << partitionId + << "]: " << "received storage sync response from: " << responder->nodeDescription() << " for request: " << req << ", but partition is no " << "longer in recovery (probably due to recovery " @@ -887,9 +887,9 @@ void RecoveryManager::onStorageSyncResponseDispatched( // stopping. No need to retry. BALL_LOG_INFO - << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId << "]: " - << "storage sync request: " << req << " cancelled because self" + << d_clusterData_p->identity().description() << " Partition [" + << partitionId << "]: " << "storage sync request: " << req + << " cancelled because self" << " node is stopping. No retry attempt will be made."; onPartitionRecoveryStatus(partitionId, -1 /* status */); @@ -904,7 +904,7 @@ void RecoveryManager::onStorageSyncResponseDispatched( if (maxAttempts == recoveryCtx.numAttempts()) { MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], received storage sync failure response: " << context->response().choice().status() << " from node: " << responder->nodeDescription() @@ -931,7 +931,7 @@ void RecoveryManager::onStorageSyncResponseDispatched( // SyncPt). BALL_LOG_ERROR << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], received storage sync failure response: " << context->response().choice().status() << " from node: " << responder->nodeDescription() @@ -968,8 +968,8 @@ void RecoveryManager::onStorageSyncResponseDispatched( if (response.partitionId() != partitionId) { MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], invalid partitionId" + << d_clusterData_p->identity().description() << ": For Partition [" + << partitionId << "], invalid partitionId" << " specified in storage sync response: " << response << " from node: " << responder->nodeDescription() << " for storage sync request: " << req @@ -986,8 +986,8 @@ void RecoveryManager::onStorageSyncResponseDispatched( if (response.storageSyncResponseType() == bmqp_ctrlmsg::StorageSyncResponseType::E_UNDEFINED) { MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], invalid " + << d_clusterData_p->identity().description() << ": For Partition [" + << partitionId << "], invalid " << "storage-sync response type specified: " << response << " from node: " << responder->nodeDescription() << " for storage sync request: " << req @@ -1002,7 +1002,7 @@ void RecoveryManager::onStorageSyncResponseDispatched( } BALL_LOG_INFO << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], received storage sync response: " << response << " from node: " << responder->nodeDescription(); @@ -1037,7 +1037,7 @@ void RecoveryManager::onStorageSyncResponseDispatched( if (endSp <= beginSp) { MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], received incorrect sync points in storage sync response" << " type: " << rtype << ". Begin sync point: " << beginSp << ", end sync point: " << endSp << MWCTSK_ALARMLOG_END; @@ -1053,7 +1053,7 @@ void RecoveryManager::onStorageSyncResponseDispatched( endSp < recoveryCtx.newSyncPoint())) { MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], received " + << ": For Partition [" << partitionId << "], received " << "incorrect end sync point in storage sync response type: " << rtype << ". End sync point: " << endSp << ", 'B' sync point: " << recoveryCtx.newSyncPoint() @@ -1069,7 +1069,7 @@ void RecoveryManager::onStorageSyncResponseDispatched( if (endSp < recoveryCtx.oldSyncPoint()) { MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], received " + << ": For Partition [" << partitionId << "], received " << "incorrect end sync point in storage sync response type: " << rtype << ". End sync point: " << endSp << ", 'A' sync point: " << recoveryCtx.oldSyncPoint() @@ -1099,7 +1099,7 @@ void RecoveryManager::onStorageSyncResponseDispatched( if (rc != 0) { // Failed to close one or more partition files BALL_LOG_ERROR - << "For PartitionId [" << partitionId << "], " + << "For Partition [" << partitionId << "], " << "failed to close one or more partition " << "files [journal: " << recoveryCtx.fileSet().journalFile() @@ -1110,7 +1110,7 @@ void RecoveryManager::onStorageSyncResponseDispatched( } BALL_LOG_INFO << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], will archive all files."; bsl::vector fileSets; @@ -1121,14 +1121,14 @@ void RecoveryManager::onStorageSyncResponseDispatched( partitionId); if (rc != 0) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], failed to find file sets at location [" << d_dataStoreConfig.location() << "], rc: " << rc; } else { for (unsigned int i = 0; i < fileSets.size(); ++i) { BALL_LOG_INFO << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], archiving file set: " << fileSets[i]; mqbs::FileSystemUtil::move( fileSets[i].dataFile(), @@ -1191,7 +1191,7 @@ void RecoveryManager::onStorageSyncResponseDispatched( if (0 != rc) { MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], " + << ": For Partition [" << partitionId << "], " << "failed to create file set: " << fileSet << ", rc: " << rc << ", reason: " << errorDesc.str() << ", during recovery." << MWCTSK_ALARMLOG_END; @@ -1222,7 +1222,7 @@ void RecoveryManager::onStorageSyncResponseDispatched( if (endSp <= beginSp) { MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], received " + << ": For Partition [" << partitionId << "], received " << "incorrect sync points in storage sync response type: " << rtype << ". Begin sync point: " << beginSp << ", end sync point: " << endSp << MWCTSK_ALARMLOG_END; @@ -1237,7 +1237,7 @@ void RecoveryManager::onStorageSyncResponseDispatched( if (beginSp != recoveryCtx.oldSyncPoint()) { MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], received " + << ": For Partition [" << partitionId << "], received " << "incorrect sync points in storage sync response type: " << rtype << ". Begin sync point: " << beginSp << ", 'A' sync point: " << recoveryCtx.oldSyncPoint() @@ -1251,7 +1251,7 @@ void RecoveryManager::onStorageSyncResponseDispatched( (recoveryCtx.newSyncPoint() != endSp)) { MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], received " + << ": For Partition [" << partitionId << "], received " << "incorrect sync points in storage sync response type: " << rtype << ". End sync point: " << endSp << ", 'B' sync point: " << recoveryCtx.newSyncPoint() @@ -1278,7 +1278,7 @@ void RecoveryManager::onStorageSyncResponseDispatched( if (rc != 0) { // Failed to close one or more partition files BALL_LOG_ERROR - << "For PartitionId [" << partitionId << "], " + << "For Partition [" << partitionId << "], " << "failed to close one or more partition " << "files [journal: " << recoveryCtx.fileSet().journalFile() @@ -1306,7 +1306,7 @@ void RecoveryManager::onStorageSyncResponseDispatched( if (0 != rc) { MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], failed to open file set: " << recoveryCtx.fileSet() << ", rc: " << rc << ", reason: " << errorDesc.str() << ", during recovery." << MWCTSK_ALARMLOG_END; @@ -1350,9 +1350,9 @@ void RecoveryManager::onPartitionRecoveryStatus(int partitionId, int status) BSLS_ASSERT_SAFE(recoveryCtx.dataFd().isValid()); BALL_LOG_INFO - << d_clusterData_p->identity().description() << " PartitionId [" - << partitionId << "]: truncating & closing partition with sizes: " - << "journal: " + << d_clusterData_p->identity().description() << " Partition [" + << partitionId + << "]: truncating & closing partition with sizes: " << "journal: " << mwcu::PrintUtil::prettyNumber(static_cast( recoveryCtx.journalFileOffset())) << ", data: " @@ -1381,14 +1381,13 @@ void RecoveryManager::onPartitionRecoveryStatus(int partitionId, int status) &recoveryCtx.qlistFd()); if (rc != 0) { // Failed to close one or more partition files - BALL_LOG_ERROR << "For PartitionId [" << partitionId << "], " + BALL_LOG_ERROR << "For Partition [" << partitionId << "], " << "failed to close one or more partition " << "files [journal: " << recoveryCtx.fileSet().journalFile() << ", data: " << recoveryCtx.fileSet().dataFile() << ", qlist: " << recoveryCtx.fileSet().qlistFile() - << "]" - << "rc: " << rc; + << "]" << "rc: " << rc; } } @@ -1451,8 +1450,8 @@ void RecoveryManager::stopDispatched(int partitionId, bslmt::Latch* latch) if (recoveryContext.inRecovery()) { BALL_LOG_WARN - << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << d_clusterData_p->identity().description() << ": For Partition [" + << partitionId << "], stopping recovery while it is in progress with " << (recoveryContext.recoveryPeer() ? recoveryContext.recoveryPeer()->nodeDescription() @@ -1465,7 +1464,7 @@ void RecoveryManager::stopDispatched(int partitionId, bslmt::Latch* latch) if (primarySyncCtx.primarySyncInProgress()) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], " + << ": For Partition [" << partitionId << "], " << "stopping primary sync while it's in progress with " << (primarySyncCtx.syncPeer() ? primarySyncCtx.syncPeer()->nodeDescription() @@ -1560,7 +1559,7 @@ int RecoveryManager::sendFile(RequestContext* context, if (bmqt::GenericResult::e_SUCCESS != writeRc) { BALL_LOG_ERROR << "Failed to write " << chunkFileType << " file chunk with sequence # " << sequenceNumber - << " for PartitionId [" << context->partitionId() + << " for Partition [" << context->partitionId() << "] to peer node: " << context->requesterNode()->nodeDescription() << ", rc: " << writeRc; @@ -1608,7 +1607,7 @@ int RecoveryManager::sendFile(RequestContext* context, if (bmqt::GenericResult::e_SUCCESS != writeRc) { BALL_LOG_ERROR << "Failed to write " << chunkFileType << " file chunk with sequence # " << sequenceNumber - << " for PartitionId [" << context->partitionId() << "]" + << " for Partition [" << context->partitionId() << "]" << " to peer node: " << context->requesterNode()->nodeDescription() << ", rc: " << writeRc; @@ -1819,7 +1818,7 @@ int RecoveryManager::replayPartition( if (currentSeqNum != toSequenceNum) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << " PartitionId [" << pid + << " Partition [" << pid << "]: incomplete replay of partition. Sequence number " << "of last record sent: " << currentSeqNum << ", was supposed to send up to: " << toSequenceNum @@ -1865,7 +1864,7 @@ void RecoveryManager::syncPeerPartitions(PrimarySyncContext* primarySyncCtx) if (0 != rc) { MWCTSK_ALARMLOG_ALARM("FILE_IO") - << d_clusterData_p->identity().description() << " PartitionId [" + << d_clusterData_p->identity().description() << " Partition [" << pid << "]: Failed to open JOURNAL/QLIST/DATA file, rc: " << rc << ", reason [" << errorDesc.str() << "] while new primary (self)" << " is initiating partition-sync with peers." @@ -1891,7 +1890,7 @@ void RecoveryManager::syncPeerPartitions(PrimarySyncContext* primarySyncCtx) if (!pps.needsPartitionSync()) { BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << pid + << " Partition [" << pid << "]: skipping partition-sync with peer: " << pps.peer()->nodeDescription() << ". Peer's partition sequence num: " @@ -1902,15 +1901,15 @@ void RecoveryManager::syncPeerPartitions(PrimarySyncContext* primarySyncCtx) rc = syncPeerPartition(primarySyncCtx, pps); if (0 == rc) { BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << pid + << " Partition [" << pid << "]: new primary (self) successfully synced " << "partition with peer: " << pps.peer()->nodeDescription(); } else { MWCTSK_ALARMLOG_ALARM("CLUSTER") - << d_clusterData_p->identity().description() - << " PartitionId [" << pid + << d_clusterData_p->identity().description() << " Partition [" + << pid << "]: new primary (self) failed to sync partition with peer: " << pps.peer()->nodeDescription() << ", rc: " << rc << "." << MWCTSK_ALARMLOG_END; @@ -1953,7 +1952,7 @@ int RecoveryManager::syncPeerPartition(PrimarySyncContext* primarySyncCtx, if (selfSequenceNum < ppState.partitionSequenceNum()) { // TBD: assert? BALL_LOG_ERROR << d_clusterData_p->identity().description() - << " PartitionId [" << pid + << " Partition [" << pid << "]: new primary (self) has smaller sequence number: " << selfSequenceNum << ", than " << ppState.partitionSequenceNum() @@ -1967,7 +1966,7 @@ int RecoveryManager::syncPeerPartition(PrimarySyncContext* primarySyncCtx, // sequence numbers *must* match. TBD: assert? BALL_LOG_ERROR << d_clusterData_p->identity().description() - << " PartitionId [" << pid + << " Partition [" << pid << "]: new primary (self) has different sequence " << "number: " << selfSequenceNum << ", than " << ppState.partitionSequenceNum() @@ -1981,7 +1980,7 @@ int RecoveryManager::syncPeerPartition(PrimarySyncContext* primarySyncCtx, if (selfSequenceNum == ppState.partitionSequenceNum()) { BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << pid + << " Partition [" << pid << "]: new primary (self) has same sequence " << "number: " << selfSequenceNum << ", as: " << ppState.partitionSequenceNum() @@ -2004,7 +2003,7 @@ int RecoveryManager::syncPeerPartition(PrimarySyncContext* primarySyncCtx, // not handled. BALL_LOG_ERROR << d_clusterData_p->identity().description() - << " PartitionId [" << pid + << " Partition [" << pid << "]: partition sync from archived files not yet " << "supported, for peer: " << ppState.peer()->nodeDescription() @@ -2033,7 +2032,7 @@ int RecoveryManager::syncPeerPartition(PrimarySyncContext* primarySyncCtx, // represents a bogus sync point. BALL_LOG_WARN << d_clusterData_p->identity().description() - << " PartitionId [" << pid << "]: Last sync point: " + << " Partition [" << pid << "]: Last sync point: " << ppState.lastSyncPointOffsetPair() << ", of peer: " << ppState.peer()->nodeDescription() << ", while syncing partition."; @@ -2051,7 +2050,7 @@ int RecoveryManager::syncPeerPartition(PrimarySyncContext* primarySyncCtx, // Yikes, this is bad. MWCTSK_ALARMLOG_ALARM("CLUSTER") - << d_clusterData_p->identity().description() << " PartitionId [" + << d_clusterData_p->identity().description() << " Partition [" << pid << "] Encountered invalid sync point JOURNAL offset: " << journalSpOffset << ", JOURNAL size: " << fti.journalFd().fileSize() @@ -2085,7 +2084,7 @@ int RecoveryManager::syncPeerPartition(PrimarySyncContext* primarySyncCtx, // Yikes, this is bad. MWCTSK_ALARMLOG_ALARM("CLUSTER") - << d_clusterData_p->identity().description() << " PartitionId [" + << d_clusterData_p->identity().description() << " Partition [" << pid << "]: Invalid QLIST offset in sync point: " << spOffsetPair.syncPoint() << ". Current QLIST file size: " << fti.qlistFd().fileSize() @@ -2103,7 +2102,7 @@ int RecoveryManager::syncPeerPartition(PrimarySyncContext* primarySyncCtx, // Yikes, this is bad. MWCTSK_ALARMLOG_ALARM("CLUSTER") - << d_clusterData_p->identity().description() << " PartitionId [" + << d_clusterData_p->identity().description() << " Partition [" << pid << "]: Invalid DATA offset in sync point: " << spOffsetPair.syncPoint() << ". Current DATA file size: " << fti.dataFd().fileSize() @@ -2121,7 +2120,7 @@ int RecoveryManager::syncPeerPartition(PrimarySyncContext* primarySyncCtx, journalSpOffset); if (0 != rc) { MWCTSK_ALARMLOG_ALARM("CLUSTER") - << d_clusterData_p->identity().description() << " PartitionId [" + << d_clusterData_p->identity().description() << " Partition [" << pid << "]: New primary (self) failed to " << "replay partition while syncing partition in peer: " << ppState.peer()->nodeDescription() << ", rc: " << rc @@ -2177,7 +2176,7 @@ bool RecoveryManager::hasSyncPoint(bmqp_ctrlmsg::SyncPoint* syncPoint, if (0 != rc) { MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "]" + << ": For Partition [" << partitionId << "]" << ", failed to load JOURNAL_OP record position in storage " << "message from " << source->nodeDescription() << ", rc: " << rc << ". Ignoring this message." @@ -2196,7 +2195,7 @@ bool RecoveryManager::hasSyncPoint(bmqp_ctrlmsg::SyncPoint* syncPoint, // Should never happen. MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "]" + << ": For Partition [" << partitionId << "]" << ", failed to load JournalOp record in storage message from " << source->nodeDescription() << ". Ignoring this message." << MWCTSK_ALARMLOG_END; @@ -2213,7 +2212,7 @@ bool RecoveryManager::hasSyncPoint(bmqp_ctrlmsg::SyncPoint* syncPoint, BALL_LOG_ERROR << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "]" + << ": For Partition [" << partitionId << "]" << ", received a SyncPt with invalid sequence number in the " << "RecordHeader: (" << syncPointRecHeader->primaryLeaseId() << ", " << syncPointRecHeader->sequenceNumber() << "), from " @@ -2227,7 +2226,7 @@ bool RecoveryManager::hasSyncPoint(bmqp_ctrlmsg::SyncPoint* syncPoint, MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "]" + << ": For Partition [" << partitionId << "]" << ", received a JOURNAL_OP record of type " << journalOpRec->type() << " (expected type SYNCPOINT) in storage message with " @@ -2266,7 +2265,7 @@ bool RecoveryManager::hasSyncPoint(bmqp_ctrlmsg::SyncPoint* syncPoint, // This indicates bug in BlazingMQ replication logic. BALL_LOG_ERROR << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "]" + << ": For Partition [" << partitionId << "]" << ", received a SyncPt JOURNAL_OP record " << "with invalid primaryLeaseId in RecordHeader." << " Sequence number in RecordHeader (" @@ -2319,7 +2318,7 @@ bool RecoveryManager::hasSyncPoint(bmqp_ctrlmsg::SyncPoint* syncPoint, if (mqbs::SyncPointType::e_ROLLOVER == journalOpRec->syncPointType()) { BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId << "] :" + << " Partition [" << partitionId << "] :" << "received a rolled-over SyncPt, but skipping it. " << "Sequence number in RecordHeader (" << syncPointRecHeader->primaryLeaseId() << ", " @@ -2417,7 +2416,7 @@ void RecoveryManager::onPartitionSyncStateQueryResponseDispatched( primarySyncCtx.selfLastSyncPtOffsetPair().syncPoint(); BALL_LOG_DEBUG << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId << "]: processing " + << " Partition [" << partitionId << "]: processing " << pairs.size() << " partition sync state query responses"; for (NodeResponsePairsConstIter it = pairs.begin(); it != pairs.end(); @@ -2439,7 +2438,7 @@ void RecoveryManager::onPartitionSyncStateQueryResponseDispatched( if (it->second.choice().isStatusValue()) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: Received failed partition sync state query " << "response " << it->second.choice().status() << " from " << it->first->nodeDescription() @@ -2466,7 +2465,7 @@ void RecoveryManager::onPartitionSyncStateQueryResponseDispatched( if (partitionId != r.partitionId()) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: Invalid partitionId specified in partition " << "primary sync query response: " << r.partitionId() << ", from " << it->first->nodeDescription(); @@ -2480,11 +2479,11 @@ void RecoveryManager::onPartitionSyncStateQueryResponseDispatched( // has other issues. TBD: self should attempt to bring this node // upto speed. BALL_LOG_WARN - << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << d_clusterData_p->identity().description() << " Partition [" + << partitionId << "]: Invalid primaryLeaseId specified in partition " - << "primary sync query response" - << ", from " << it->first->nodeDescription(); + << "primary sync query response" << ", from " + << it->first->nodeDescription(); ppState->setNeedsPartitionSync(false); continue; // CONTINUE @@ -2522,7 +2521,7 @@ void RecoveryManager::onPartitionSyncStateQueryResponseDispatched( if (maxSeqNode == d_clusterData_p->membership().selfNode()) { BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: Self node has latest view of partition during " << "primary sync, with sequence number " << maxSeq << ". Self node will now attempt to sync replica peers."; @@ -2568,7 +2567,7 @@ void RecoveryManager::onPartitionSyncStateQueryResponseDispatched( // One of the peers is ahead. BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId << "]: Peer " + << " Partition [" << partitionId << "]: Peer " << maxSeqNode->nodeDescription() << " has most advanced view of the partition during primary " << "sync " << maxSeq @@ -2623,8 +2622,8 @@ void RecoveryManager::onPartitionSyncStateQueryResponseDispatched( // 'onPartitionSyncDataQueryResponse' won't be invoked in this case). MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], failed to send " + << d_clusterData_p->identity().description() << ": For Partition [" + << partitionId << "], failed to send " << "partition sync data query to node " << maxSeqNode->nodeDescription() << ", rc: " << status << ". No retry attempt will be made." << MWCTSK_ALARMLOG_END; @@ -2690,9 +2689,9 @@ void RecoveryManager::onPartitionSyncDataQueryResponseDispatched( if (context->response().choice().isStatusValue()) { MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() << " PartitionId [" - << partitionId << "]: " - << "received partition sync data query failure response: " + << d_clusterData_p->identity().description() << " Partition [" + << partitionId + << "]: " << "received partition sync data query failure response: " << context->response().choice().status() << " from node: " << responder->nodeDescription() << " for request: " << req << ". No retry attempt will be made." @@ -2721,7 +2720,7 @@ void RecoveryManager::onPartitionSyncDataQueryResponseDispatched( if (response.partitionId() != partitionId) { MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() << " PartitionId [" + << d_clusterData_p->identity().description() << " Partition [" << partitionId << "]: " << "invalid partitionId specified in response: " << response << " from node: " << responder->nodeDescription() @@ -2737,7 +2736,7 @@ void RecoveryManager::onPartitionSyncDataQueryResponseDispatched( if (!primarySyncCtx.primarySyncInProgress()) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId << "]: " + << " Partition [" << partitionId << "]: " << "received partition sync data response from: " << responder->nodeDescription() << " for request: " << req << ", but partition is no " @@ -2748,8 +2747,8 @@ void RecoveryManager::onPartitionSyncDataQueryResponseDispatched( if (primarySyncCtx.syncPeer() != responder) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId << "]: " - << "received partition sync data response from " + << " Partition [" << partitionId + << "]: " << "received partition sync data response from " << "unexpected node: " << responder->nodeDescription() << " for request: " << req << ", expected node: " << primarySyncCtx.syncPeer()->nodeDescription() @@ -2758,7 +2757,7 @@ void RecoveryManager::onPartitionSyncDataQueryResponseDispatched( } BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: received partition sync data response: " << response << " from node: " << responder->nodeDescription(); @@ -2768,9 +2767,9 @@ void RecoveryManager::onPartitionSyncDataQueryResponseDispatched( if (peerPartitionSeqNum <= primarySyncCtx.selfPartitionSequenceNum()) { MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() << " PartitionId [" - << partitionId << "]: " - << "invalid partition sequenceNum specified in response: " + << d_clusterData_p->identity().description() << " Partition [" + << partitionId + << "]: " << "invalid partition sequenceNum specified in response: " << response << " from node: " << responder->nodeDescription() << " for partition sync data request: " << req << ". Self partition seqNum: " @@ -2950,7 +2949,7 @@ void RecoveryManager::startRecovery( bsl::rand() % 5000; BALL_LOG_INFO << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], will check after " << startupWaitMs << " millisec " << "if any sync point has been received by then."; @@ -2987,8 +2986,8 @@ void RecoveryManager::startRecovery( if ((rc != 0) && (rc != 1)) { MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], failed to find or " + << d_clusterData_p->identity().description() << ": For Partition [" + << partitionId << "], failed to find or " << "open a recoverable file set, rc: " << rc << ", reason: " << errorDesc.str() << ". Recovery will proceed as if this node had no local " @@ -3000,7 +2999,7 @@ void RecoveryManager::startRecovery( if (rc == 1) { // Special 'rc' implying no file sets present => no sync point. BALL_LOG_INFO << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], no recoverable file sets found, which implies no " << "last sync point."; @@ -3008,7 +3007,7 @@ void RecoveryManager::startRecovery( } BALL_LOG_INFO << d_clusterData_p->identity().description() << ": For" - << " PartitionId [" << partitionId << "], file set opened" + << " Partition [" << partitionId << "], file set opened" << " for recovery: " << recoveryCtx.fileSet(); mqbs::JournalFileIterator jit; @@ -3028,7 +3027,7 @@ void RecoveryManager::startRecovery( &recoveryCtx.qlistFd()); if (rc != 0) { // Failed to close one or more partition files - BALL_LOG_ERROR << "For PartitionId [" << partitionId << "], failed" + BALL_LOG_ERROR << "For Partition [" << partitionId << "], failed" << " to close one or more partition files " << "[journal: " << recoveryCtx.fileSet().journalFile() @@ -3038,8 +3037,8 @@ void RecoveryManager::startRecovery( } MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], " + << d_clusterData_p->identity().description() << ": For Partition [" + << partitionId << "], " << "failed to load iterator(s) for recoverable file set, " << "rc: " << rc << ", reason: " << errorDesc.str() << ". Recovery will proceed as if this node had no local " @@ -3055,7 +3054,7 @@ void RecoveryManager::startRecovery( // Nothing to retrieve from local storage. BALL_LOG_INFO << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], no sync point found in local storage. Recovery " << "will proceed as if this node had no local " << "recoverable files for this partition."; @@ -3063,7 +3062,7 @@ void RecoveryManager::startRecovery( } BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: Potential last sync point offset retrieved at: " << mwcu::PrintUtil::prettyNumber( static_cast(lastSyncPointOffset)); @@ -3075,8 +3074,8 @@ void RecoveryManager::startRecovery( if (mqbs::SyncPointType::e_UNDEFINED == journalOpRec.syncPointType()) { MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], last sync point has" + << d_clusterData_p->identity().description() << ": For Partition [" + << partitionId << "], last sync point has" << " invalid SyncPt sub-type: " << journalOpRec.syncPointType() << ". Ignoring this sync point. Recovery will proceed as if this " << "node had no local recoverable files for this partition." @@ -3092,7 +3091,7 @@ void RecoveryManager::startRecovery( // altogether. BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: Last sync point retrieved during recovery has " << "sub-type [" << journalOpRec.syncPointType() << "]. Recovery will proceed as if this node had no " @@ -3103,8 +3102,8 @@ void RecoveryManager::startRecovery( if (0 == journalOpRec.primaryLeaseId() || 0 == journalOpRec.sequenceNum()) { MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], " + << d_clusterData_p->identity().description() << ": For Partition [" + << partitionId << "], " << "last sync point has invalid primaryLeaseId: " << journalOpRec.primaryLeaseId() << " or sequenceNum: " << journalOpRec.sequenceNum() @@ -3121,8 +3120,8 @@ void RecoveryManager::startRecovery( if (bdls::FilesystemUtil::getFileSize(recoveryCtx.fileSet().dataFile()) < static_cast(dataFileOffset)) { MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], data file size is " + << d_clusterData_p->identity().description() << ": For Partition [" + << partitionId << "], data file size is " << "smaller than data file offset present in last sync point [" << bdls::FilesystemUtil::getFileSize( recoveryCtx.fileSet().dataFile()) @@ -3139,8 +3138,8 @@ void RecoveryManager::startRecovery( if (bdls::FilesystemUtil::getFileSize(recoveryCtx.fileSet().qlistFile()) < static_cast(qlistFileOffset)) { MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], QLIST file size is " + << d_clusterData_p->identity().description() << ": For Partition [" + << partitionId << "], QLIST file size is " << "smaller than QLIST file offset present in last sync point [" << bdls::FilesystemUtil::getFileSize( recoveryCtx.fileSet().qlistFile()) @@ -3165,7 +3164,7 @@ void RecoveryManager::startRecovery( recoveryCtx.setQlistFileOffset(qlistFileOffset); BALL_LOG_INFO << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], retrieved " + << ": For Partition [" << partitionId << "], retrieved " << "old SyncPt at journal offset: " << mwcu::PrintUtil::prettyNumber( static_cast(lastSyncPointOffset)) @@ -3205,14 +3204,14 @@ void RecoveryManager::processStorageEvent( recoveryCtx.addStorageEvent(blob); BALL_LOG_INFO << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], buffered a storage event from source node " << source->nodeDescription() << " as self is syncing storage with that node."; } else { BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], received storage event from source node " << source->nodeDescription() << ", while self is " << "syncing storage with " @@ -3249,7 +3248,7 @@ void RecoveryManager::processStorageEvent( // position. BALL_LOG_INFO << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], received sync point " << syncPoint << " from node " << source->nodeDescription() << ". Primary's journal offset: " << primaryJournalOffset @@ -3285,7 +3284,7 @@ void RecoveryManager::processStorageEvent( if (bmqt::EventBuilderResult::e_SUCCESS != buildRc) { MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], " + << ": For Partition [" << partitionId << "], " << "failed to build first buffered storage event after " << "encountering sync point: " << syncPoint << " from node: " << source->nodeDescription() @@ -3312,7 +3311,7 @@ void RecoveryManager::processStorageEvent( recoveryCtx.setRecoveryPeer(source); BALL_LOG_INFO << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], received sync point " << syncPoint << " from node " << source->nodeDescription() << ". Sending storage sync request to this node."; @@ -3337,9 +3336,8 @@ void RecoveryManager::processRecoveryEvent( if (source != recoveryCtx.recoveryPeer()) { MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId - << "], received recovery event from node " + << d_clusterData_p->identity().description() << ": For Partition [" + << partitionId << "], received recovery event from node " << source->nodeDescription() << ", which is not identified as recovery peer node " << (recoveryCtx.recoveryPeer() @@ -3385,7 +3383,7 @@ void RecoveryManager::processRecoveryEvent( if (header.fileChunkType() != recoveryCtx.expectedChunkFileType()) { MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], received incorrect file chunk type: " << header.fileChunkType() << ", expected: " << recoveryCtx.expectedChunkFileType() @@ -3408,7 +3406,7 @@ void RecoveryManager::processRecoveryEvent( if (header.chunkSequenceNumber() != expectedSeqNum) { MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], received incorrect chunk sequence " << "number: " << header.chunkSequenceNumber() << ", expected: " << expectedSeqNum @@ -3431,7 +3429,7 @@ void RecoveryManager::processRecoveryEvent( if (0 != rc) { MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "]," + << ": For Partition [" << partitionId << "]," << " failed to load chunk position, rc: " << rc << ". Chunk type: " << header.fileChunkType() << ", chunk sequence number: " << header.chunkSequenceNumber() @@ -3467,9 +3465,8 @@ void RecoveryManager::processRecoveryEvent( if (0 != rc) { MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId << "], " - << "failed to calculate MD5 digest, " - << "rc: " << rc + << ": For Partition [" << partitionId << "], " + << "failed to calculate MD5 digest, " << "rc: " << rc << ". Chunk type: " << header.fileChunkType() << ", chunk sequence number: " << header.chunkSequenceNumber() @@ -3492,7 +3489,7 @@ void RecoveryManager::processRecoveryEvent( bmqp::RecoveryHeader::k_MD5_DIGEST_LEN)) { mwcu::MemOutStream out; out << d_clusterData_p->identity().description() - << ": For PartitionId [" << partitionId + << ": For Partition [" << partitionId << "], chunk MD5 digest mismatch. Calculated: "; bdlb::Print::singleLineHexDump( @@ -3609,7 +3606,7 @@ void RecoveryManager::processShutdownEvent(int partitionId) // timers should also be cancelled here. BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: received shutdown event."; RecoveryContext& recoveryCtx = d_recoveryContexts[partitionId]; @@ -3664,7 +3661,7 @@ void RecoveryManager::processStorageSyncRequest( BALL_LOG_WARN << d_clusterData_p->identity().description() << ": Received duplicate storage sync request " << "from: " << source->nodeDescription() - << " for PartitionId [" << req.partitionId() + << " for Partition [" << req.partitionId() << "]. Ignoring this request."; bmqp_ctrlmsg::Status& status = @@ -3723,7 +3720,7 @@ void RecoveryManager::processStorageSyncRequest( if (!mqbc::ClusterUtil::isValid(beginSpOffsetPair)) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": PartitionId [" << req.partitionId() + << ": Partition [" << req.partitionId() << "] received invalid starting sync point (A) from " << source->nodeDescription() << " in its storage sync request: " << req @@ -3798,7 +3795,7 @@ void RecoveryManager::processStorageSyncRequest( } BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: Number of sync points in the " << "current journal: " << spOffsetPairs.size(); @@ -3920,7 +3917,7 @@ void RecoveryManager::processStorageSyncRequest( BSLS_ASSERT_SAFE(fti.dataFd().isValid()); BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: successfully opened the partition."; bsls::Types::Uint64 journalFileBeginOffset = 0; @@ -3935,7 +3932,7 @@ void RecoveryManager::processStorageSyncRequest( bmqp_ctrlmsg::StorageSyncResponseType::Value responseType; BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: Sync point summary:\n" << "\t'X': " << xsp << '\n' << "\t'Y': " << ysp << '\n' @@ -3954,7 +3951,7 @@ void RecoveryManager::processStorageSyncRequest( // corresponding to 'A' and 'B'. BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: starting search for sync point: " << asp << " in a list of " << spOffsetPairs.size() << " sync points."; @@ -3982,7 +3979,7 @@ void RecoveryManager::processStorageSyncRequest( } BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: Begin SyncPt (A) found at journal offset: " << retA.first->offset(); // Note that above, the retrieved offset of SyncPt in the journal @@ -3996,7 +3993,7 @@ void RecoveryManager::processStorageSyncRequest( mqbs::FileStoreProtocol::k_JOURNAL_RECORD_SIZE; BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: starting search for sync point: " << bsp << " in a list of " << spOffsetPairs.size() << " sync points."; @@ -4025,7 +4022,7 @@ void RecoveryManager::processStorageSyncRequest( } BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: End SyncPt (B) found at journal offset: " << retB.first->offset(); // Note that above, the retrieved offset of SyncPt in the journal @@ -4086,7 +4083,7 @@ void RecoveryManager::processStorageSyncRequest( } BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: starting search for sync point: " << endSpOffset << " in a list of " << spOffsetPairs.size() << " sync points."; @@ -4098,7 +4095,7 @@ void RecoveryManager::processStorageSyncRequest( SyncPointOffsetPairComparator()); BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: done with search."; if (rcPair.first == rcPair.second) { @@ -4120,7 +4117,7 @@ void RecoveryManager::processStorageSyncRequest( } BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: End SyncPt (max(X, B)) found at journal offset: " << rcPair.first->offset(); // Note that the journal offset of the retrieved SyncPt printed @@ -4163,7 +4160,7 @@ void RecoveryManager::processStorageSyncRequest( dataFileBeginOffset; BSLS_ASSERT_SAFE(dataFileSize >= 0); BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: sending DATA patch/file of size: " << mwcu::PrintUtil::prettyNumber(dataFileSize) << " bytes."; @@ -4191,7 +4188,7 @@ void RecoveryManager::processStorageSyncRequest( qlistFileBeginOffset; BSLS_ASSERT_SAFE(qlistFileSize >= 0); BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: sending QLIST file/patch of size: " << mwcu::PrintUtil::prettyNumber(qlistFileSize) << " bytes."; @@ -4217,7 +4214,7 @@ void RecoveryManager::processStorageSyncRequest( journalFileBeginOffset; BSLS_ASSERT_SAFE(journalFileSize >= 0); BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: sending JOURNAL file/patch of size: " << mwcu::PrintUtil::prettyNumber(journalFileSize) << " bytes."; @@ -4266,9 +4263,8 @@ void RecoveryManager::startPartitionPrimarySync( // this case needs to be handled in a higher component. MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() << " PartitionId [" - << pid << "]: " - << "primary sync is already under progress with " + << d_clusterData_p->identity().description() << " Partition [" + << pid << "]: " << "primary sync is already under progress with " << (primarySyncCtx.syncPeer() ? primarySyncCtx.syncPeer()->nodeDescription() : "** null **") @@ -4370,7 +4366,7 @@ void RecoveryManager::processPartitionSyncStateRequest( BSLS_ASSERT_SAFE(!isRecoveryInProgress(req.partitionId())); BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: received partition-sync state request: " << req << ", from: " << source->nodeDescription(); @@ -4410,7 +4406,7 @@ void RecoveryManager::processPartitionSyncDataRequest( BSLS_ASSERT_SAFE(req.partitionId() == fs->config().partitionId()); BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: received partition-sync data request: " << req << ", from: " << source->nodeDescription(); @@ -4439,7 +4435,7 @@ void RecoveryManager::processPartitionSyncDataRequest( BALL_LOG_WARN << d_clusterData_p->identity().description() << ": Received duplicate partition sync request " << "from: " << source->nodeDescription() - << " for PartitionId [" << req.partitionId() + << " for Partition [" << req.partitionId() << "]. Ignoring this request."; bmqp_ctrlmsg::Status& status = @@ -4497,7 +4493,7 @@ void RecoveryManager::processPartitionSyncDataRequest( if (requesterUptoSeqNum <= requesterLastSeqNum) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": For PartitionId [" << req.partitionId() + << ": For Partition [" << req.partitionId() << "], received partition sync data request from " << source->nodeDescription() << ", with invalid " << " 'last/upto' sequence numbers: " @@ -4517,7 +4513,7 @@ void RecoveryManager::processPartitionSyncDataRequest( // Request should not have been sent to this node. BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": For PartitionId [" << req.partitionId() + << ": For Partition [" << req.partitionId() << "], received partition sync data request from " << source->nodeDescription() << ", with equal or greater" << " last sequence number " << requesterLastSeqNum @@ -4537,7 +4533,7 @@ void RecoveryManager::processPartitionSyncDataRequest( // Request should not have been sent to this node. BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": For PartitionId [" << req.partitionId() + << ": For Partition [" << req.partitionId() << "], received partition sync data request from " << source->nodeDescription() << ", with greater 'upto' " << "sequence number " << requesterUptoSeqNum @@ -4568,7 +4564,7 @@ void RecoveryManager::processPartitionSyncDataRequest( if (req.lastPrimaryLeaseId() != lastSpoPair.syncPoint().primaryLeaseId() && req.lastSequenceNum() < lastSpoPair.syncPoint().sequenceNum()) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": For PartitionId [" << req.partitionId() + << ": For Partition [" << req.partitionId() << "], invalid sync-point and primaryLeaseId/seqNum " << "specified in partition sync data request: " << req << ", from " << source->nodeDescription(); @@ -4623,7 +4619,7 @@ void RecoveryManager::processPartitionSyncDataRequest( // point. BALL_LOG_WARN << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: Last sync point not found: " << lastSpoPair << ", while serving partition-sync data request: " << req << ", from: " << source->nodeDescription() @@ -4651,14 +4647,13 @@ void RecoveryManager::processPartitionSyncDataRequest( // Yikes, this is bad. MWCTSK_ALARMLOG_ALARM("CLUSTER") - << d_clusterData_p->identity().description() << " PartitionId [" + << d_clusterData_p->identity().description() << " Partition [" << req.partitionId() << "]: Encountered invalid sync point JOURNAL offset: " << journalSpOffset << ", JOURNAL size: " << fileSet.journalFileSize() << ", sync point: " << lastSpoPair.syncPoint() - << ", while processing partition-sync" - << " data request: " << req + << ", while processing partition-sync" << " data request: " << req << ", from: " << source->nodeDescription() << MWCTSK_ALARMLOG_END; bmqp_ctrlmsg::Status& status = controlMsg.choice().makeStatus(); @@ -4681,7 +4676,7 @@ void RecoveryManager::processPartitionSyncDataRequest( &fti.qlistFd()); if (0 != rc) { MWCTSK_ALARMLOG_ALARM("FILE_IO") - << d_clusterData_p->identity().description() << " PartitionId [" + << d_clusterData_p->identity().description() << " Partition [" << req.partitionId() << "]: Failed to open one of JOURNAL/QLIST/DATA file, rc: " << rc << ", reason [" << errorDesc.str() @@ -4723,7 +4718,7 @@ void RecoveryManager::processPartitionSyncDataRequest( // Yikes, this is bad. MWCTSK_ALARMLOG_ALARM("CLUSTER") - << d_clusterData_p->identity().description() << " PartitionId [" + << d_clusterData_p->identity().description() << " Partition [" << req.partitionId() << "]: Invalid QLIST [" << fileSet.qlistFile() << "] offset in sync point: " << lastSpoPair.syncPoint() << ". Current QLIST file size: " << fileSet.qlistFileSize() @@ -4748,7 +4743,7 @@ void RecoveryManager::processPartitionSyncDataRequest( // Yikes, this is bad. MWCTSK_ALARMLOG_ALARM("CLUSTER") - << d_clusterData_p->identity().description() << " PartitionId [" + << d_clusterData_p->identity().description() << " Partition [" << req.partitionId() << "]: Invalid DATA [" << fileSet.dataFile() << "] offset in sync point: " << lastSpoPair.syncPoint() << ". Current DATA file size: " << fileSet.dataFileSize() @@ -4784,7 +4779,7 @@ void RecoveryManager::processPartitionSyncDataRequest( // the 'beginSeqNum'. BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << req.partitionId() + << " Partition [" << req.partitionId() << "]: replaying partition from: " << requesterLastSeqNum << " (exclusive) to: " << requesterUptoSeqNum << " (inclusive) with preceding sync-point JOURNAL offset " @@ -4800,7 +4795,7 @@ void RecoveryManager::processPartitionSyncDataRequest( journalSpOffset); if (0 != rc) { MWCTSK_ALARMLOG_ALARM("CLUSTER") - << d_clusterData_p->identity().description() << " PartitionId [" + << d_clusterData_p->identity().description() << " Partition [" << req.partitionId() << "]: Failed to replay partition while serving partition sync " << "request: " << req @@ -4862,7 +4857,7 @@ void RecoveryManager::processPartitionSyncDataRequestStatus( d_primarySyncContexts[queryStatus.partitionId()]; if (!primarySyncCtx.primarySyncInProgress()) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << " PartitionId [" << queryStatus.partitionId() + << " Partition [" << queryStatus.partitionId() << "]: primary sync is not in progress, for the received" << " status: " << queryStatus; return; // RETURN @@ -4870,7 +4865,7 @@ void RecoveryManager::processPartitionSyncDataRequestStatus( if (primarySyncCtx.syncPeer() != source) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << " PartitionId [" << queryStatus.partitionId() + << " Partition [" << queryStatus.partitionId() << "] received partition sync request status: " << queryStatus << ", from node: " << source->nodeDescription() @@ -4885,7 +4880,7 @@ void RecoveryManager::processPartitionSyncDataRequestStatus( if (status.category() != bmqp_ctrlmsg::StatusCategory::E_SUCCESS) { MWCTSK_ALARMLOG_ALARM("CLUSTER") - << d_clusterData_p->identity().description() << " PartitionId [" + << d_clusterData_p->identity().description() << " Partition [" << queryStatus.partitionId() << "]: partition-sync with peer: " << source->nodeDescription() << " failed with status: " << status << MWCTSK_ALARMLOG_END; @@ -4895,7 +4890,7 @@ void RecoveryManager::processPartitionSyncDataRequestStatus( } BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << queryStatus.partitionId() + << " Partition [" << queryStatus.partitionId() << "]: partition-sync with peer: " << source->nodeDescription() << " succeeded. Self node will " << "now attempt to sync replica peers."; @@ -4936,7 +4931,7 @@ void RecoveryManager::onNodeStateChange(mqbnet::ClusterNode* node, } else { BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": PartitionId [" << pid + << ": Partition [" << pid << "]: ignoring node state change event from peer: " << node->nodeDescription(); } diff --git a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp index 7df2d78e6..76de65acb 100644 --- a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp @@ -210,7 +210,7 @@ int RemoteQueue::configureAsClusterMember(bsl::ostream& errorDescription, // replica is out of sync. MWCTSK_ALARMLOG_ALARM("CLUSTER_STATE") - << d_state_p->domain()->cluster()->name() << ": PartitionId [" + << d_state_p->domain()->cluster()->name() << ": Partition [" << d_state_p->partitionId() << "]: failed to retrieve storage for remote queue [" << d_state_p->uri() << "], queueKey [" << d_state_p->key() @@ -232,7 +232,7 @@ int RemoteQueue::configureAsClusterMember(bsl::ostream& errorDescription, if (!d_state_p->isStorageCompatible(storageMp)) { MWCTSK_ALARMLOG_ALARM("CLUSTER_STATE") - << d_state_p->domain()->cluster()->name() << ": PartitionId [" + << d_state_p->domain()->cluster()->name() << ": Partition [" << d_state_p->partitionId() << "]: incompatible storage type for remote queue [" << d_state_p->uri() << "], queueKey [" << d_state_p->key() @@ -267,7 +267,7 @@ int RemoteQueue::configureAsClusterMember(bsl::ostream& errorDescription, rc = d_queueEngine_mp->configure(errorDesc); if (rc != 0) { MWCTSK_ALARMLOG_ALARM("CLUSTER_STATE") - << d_state_p->domain()->cluster()->name() << ": PartitionId [" + << d_state_p->domain()->cluster()->name() << ": Partition [" << d_state_p->partitionId() << "]: failed to configure queue engine for remote queue [" << d_state_p->uri() << "], queueKey [" << d_state_p->key() diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp index 8562aa7dc..7d74f968b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp @@ -142,7 +142,7 @@ void StorageManager::startRecoveryCb(int partitionId) BALL_LOG_INFO << d_clusterData_p->identity().description() << " ProcessorID [" << dispatcherClientData.processorHandle() - << "] | PartitionId [" << partitionId + << "] | Partition [" << partitionId << "]: Starting first phase of recovery."; // Start recovery for the partition through recovery manager. Note that if @@ -178,7 +178,7 @@ void StorageManager::onPartitionRecovery( if (d_cluster_p->isStopping()) { BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": PartitionId [" << partitionId + << ": Partition [" << partitionId << "] cluster is stopping; skipping partition recovery."; return; // RETURN } @@ -200,7 +200,7 @@ void StorageManager::onPartitionRecovery( pinfo.primary() != recoveryPeer && pinfo.primary() != d_clusterData_p->membership().selfNode()) { BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: scheduling another recovery due to new " << "primary being chosen while previous recovery was " << "in progress with recovery peer: " @@ -216,7 +216,7 @@ void StorageManager::onPartitionRecovery( if (0 != status) { MWCTSK_ALARMLOG_ALARM("RECOVERY") - << d_clusterData_p->identity().description() << ": PartitionId [" + << d_clusterData_p->identity().description() << ": Partition [" << partitionId << "] failed to recover with peer " << (recoveryPeer ? recoveryPeer->nodeDescription() : "**NA**") << " with status: " << status @@ -227,7 +227,7 @@ void StorageManager::onPartitionRecovery( // Recovery was successful & there is no need to schedule another one. BALL_LOG_INFO << d_clusterData_p->identity().description() - << ": PartitionId [" << partitionId + << ": Partition [" << partitionId << "] has successfully recovered with peer " << (recoveryPeer ? recoveryPeer->nodeDescription() : "**NA**") @@ -237,7 +237,7 @@ void StorageManager::onPartitionRecovery( if (0 != rc) { MWCTSK_ALARMLOG_ALARM("FILE_IO") << d_clusterData_p->identity().description() - << ": Failed to open PartitionId [" << partitionId + << ": Failed to open Partition [" << partitionId << "] after recovery was finished, rc: " << rc << MWCTSK_ALARMLOG_END; } @@ -245,7 +245,7 @@ void StorageManager::onPartitionRecovery( // Apply 'recoveryEvents' to the file store. BALL_LOG_INFO << d_clusterData_p->identity().description() - << ": PartitionId [" << partitionId + << ": Partition [" << partitionId << "] opened successfully, applying " << recoveryEvents.size() << " buffered storage events to the partition."; @@ -255,7 +255,7 @@ void StorageManager::onPartitionRecovery( if (0 != rc) { MWCTSK_ALARMLOG_ALARM("RECOVERY") << d_clusterData_p->identity().description() - << ": PartitionId [" << partitionId + << ": Partition [" << partitionId << "] failed to apply buffered storage event, rc: " << rc << ". Closing the partition." << MWCTSK_ALARMLOG_END; @@ -270,7 +270,7 @@ void StorageManager::onPartitionRecovery( BALL_LOG_INFO << d_clusterData_p->identity().description() - << ": PartitionId [" << partitionId + << ": Partition [" << partitionId << "] after applying buffered storage events, " << "(recoveryPeerNode, primaryLeaseId, " << "sequenceNumber): (" @@ -539,7 +539,7 @@ void StorageManager::setPrimaryForPartitionDispatched( PartitionInfo& pinfo = d_partitionInfoVec[partitionId]; BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: received partition/primary info. Primary: " << (primaryNode ? primaryNode->nodeDescription() : "**null**") @@ -550,7 +550,7 @@ void StorageManager::setPrimaryForPartitionDispatched( if (primaryLeaseId < pinfo.primaryLeaseId()) { MWCTSK_ALARMLOG_ALARM("REPLICATION") - << d_clusterData_p->identity().description() << " PartitionId [" + << d_clusterData_p->identity().description() << " Partition [" << partitionId << "]: Smaller new primaryLeaseId specified: " << primaryLeaseId << ", current primaryLeaseId: " @@ -593,9 +593,9 @@ void StorageManager::setPrimaryForPartitionDispatched( // Same leaseId, different node. This is an error. MWCTSK_ALARMLOG_ALARM("REPLICATION") - << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId - << "]: Same primaryLeaseId specified [" << primaryLeaseId + << d_clusterData_p->identity().description() << " Partition [" + << partitionId << "]: Same primaryLeaseId specified [" + << primaryLeaseId << "] with a different primary node. Current primary: " << pinfo.primary()->nodeDescription() << ", specified primary: " << primaryNode->nodeDescription() @@ -797,7 +797,7 @@ void StorageManager::processPartitionSyncEventDispatched( if (!d_recoveryManager_mp->isPrimarySyncInProgress(partitionId)) { BALL_LOG_ERROR << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: received a partition sync event from: " << source->nodeDescription() << ", while self is not under partition-sync."; @@ -806,7 +806,7 @@ void StorageManager::processPartitionSyncEventDispatched( if (source != d_recoveryManager_mp->primarySyncPeer(partitionId)) { BALL_LOG_ERROR << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: received a partition sync event from: " << source->nodeDescription() << ", while partition-sync peer is: " @@ -816,7 +816,7 @@ void StorageManager::processPartitionSyncEventDispatched( if (bmqp_ctrlmsg::PrimaryStatus::E_PASSIVE != pinfo.primaryStatus()) { BALL_LOG_ERROR << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: received a partition sync event from: " << source->nodeDescription() << ", while self is ACTIVE primary."; @@ -830,7 +830,7 @@ void StorageManager::processPartitionSyncEventDispatched( if (bmqp_ctrlmsg::PrimaryStatus::E_PASSIVE != pinfo.primaryStatus()) { BALL_LOG_ERROR << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: received a partition sync event from: " << source->nodeDescription() << ", but self perceives sender (primary) as " @@ -840,7 +840,7 @@ void StorageManager::processPartitionSyncEventDispatched( } else { BALL_LOG_ERROR << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: received a partition sync event from: " << source->nodeDescription() << ", but neither self is primary nor the sender is " @@ -893,7 +893,7 @@ void StorageManager::processPartitionSyncStateRequestDispatched( // but will reply anyways. BALL_LOG_WARN << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: received partition-sync state request: " << message.choice() .clusterMessage() @@ -955,7 +955,7 @@ void StorageManager::processShutdownEventDispatched(int partitionId) BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread()); BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: received shutdown event."; // Inform RecoveryMgr, which may cancel ongoing recovery or prevent a @@ -1551,7 +1551,7 @@ void StorageManager::setPrimaryForPartition(int partitionId, d_clusterData_p->membership().selfNodeStatus() != bmqp_ctrlmsg::NodeStatus::E_AVAILABLE) { BALL_LOG_INFO << d_clusterData_p->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: proposed primary is self but self is not " << "AVAILABLE. Self status: " << d_clusterData_p->membership().selfNodeStatus() @@ -1705,7 +1705,7 @@ void StorageManager::processStorageEvent( MWCTSK_ALARMLOG_ALARM("STORAGE") << d_cluster_p->description() << ": Received storage event " << "from node " << source->nodeDescription() << " with " - << "invalid PartitionId [" << pid << "]. Ignoring entire " + << "invalid Partition [" << pid << "]. Ignoring entire " << "storage event." << MWCTSK_ALARMLOG_END; return; // RETURN } diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp index dbb25f0a7..cdf19c625 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp @@ -512,7 +512,7 @@ void ClusterUtil::assignPartitions( } BALL_LOG_INFO << clusterData.identity().description() - << ": PartitionId [" << pinfo.partitionId() + << ": Partition [" << pinfo.partitionId() << "]: Leader (self) has assigned " << primary->nodeDescription() << " as primary."; @@ -1143,11 +1143,10 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, MWCTSK_ALARMLOG_ALARM("CLUSTER_STATE") << cluster->description() << ": re-registering a known queue with a stale view, " - << "but queueKey is not unique. " - << "QueueKey [" << queueKey << "], URI [" << uri - << "], PartitionId [" << partitionId - << "], AppIdInfos [" << storageAppIdInfos << "]." - << MWCTSK_ALARMLOG_END; + << "but queueKey is not unique. " << "QueueKey [" + << queueKey << "], URI [" << uri << "], Partition [" + << partitionId << "], AppIdInfos [" + << storageAppIdInfos << "]." << MWCTSK_ALARMLOG_END; return; // RETURN } @@ -1180,8 +1179,8 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, << cluster->description() << ": registering a queue for an unknown queue, but " << "queueKey is not unique. QueueKey [" << queueKey - << "], URI [" << uri << "], PartitionId [" << partitionId - << "]." << MWCTSK_ALARMLOG_END; + << "], URI [" << uri << "], Partition [" << partitionId << "]." + << MWCTSK_ALARMLOG_END; return; // RETURN } diff --git a/src/groups/mqb/mqbc/mqbc_partitionstatetable.h b/src/groups/mqb/mqbc/mqbc_partitionstatetable.h index 037c89b17..ebed4277d 100644 --- a/src/groups/mqb/mqbc/mqbc_partitionstatetable.h +++ b/src/groups/mqb/mqbc/mqbc_partitionstatetable.h @@ -696,8 +696,7 @@ void PartitionStateTableActions::do_none(const ARGS& args) const int partitionId = args->eventsQueue()->front().second[0].partitionId(); - BALL_LOG_INFO << "PartitionId [" << partitionId - << "]: NO ACTION PERFORMED."; + BALL_LOG_INFO << "Partition [" << partitionId << "]: NO ACTION PERFORMED."; } template diff --git a/src/groups/mqb/mqbc/mqbc_recoveryutil.cpp b/src/groups/mqb/mqbc/mqbc_recoveryutil.cpp index 3b0b1b935..7939c4d90 100644 --- a/src/groups/mqb/mqbc/mqbc_recoveryutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_recoveryutil.cpp @@ -283,7 +283,7 @@ int RecoveryUtil::incrementCurrentSeqNum( // smaller or equal. BALL_LOG_ERROR - << clusterDescription << " PartitionId [" << partitionId + << clusterDescription << " Partition [" << partitionId << "]: incorrect sequence number encountered while attempting " << "to replay partition to peer: " << *currentSeqNum << ". Sequence number cannot be greater than: " << endSeqNum diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp index 940d932a8..ead707f45 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp @@ -331,7 +331,7 @@ struct TestHelper { partitionDesc << d_cluster_mp->_clusterData() ->identity() .description() - << " PartitionId [" << partitionId << "]: "; + << " Partition [" << partitionId << "]: "; mwcu::BlobPosition recordPosition; mwcu::BlobObjectProxy recHeader; diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.cpp b/src/groups/mqb/mqbc/mqbc_storageutil.cpp index 60eace567..ead834b26 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_storageutil.cpp @@ -229,7 +229,7 @@ void StorageUtil::registerQueueDispatched( true); // Is new storage? if (0 != rc) { MWCTSK_ALARMLOG_ALARM("FILE_IO") - << clusterDescription << ": PartitionId [" << partitionId + << clusterDescription << ": Partition [" << partitionId << "] failed to write QueueCreationRecord for queue [" << storage->queueUri() << "] queueKey [" << storage->queueKey() << "], rc: " << rc << MWCTSK_ALARMLOG_END; @@ -248,7 +248,7 @@ void StorageUtil::registerQueueDispatched( fs->dispatcherFlush(true, false); - BALL_LOG_INFO << clusterDescription << ": PartitionId [" << partitionId + BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId << "] registered [" << storage->queueUri() << "], queueKey [" << storage->queueKey() << "] with the storage as primary."; } @@ -329,7 +329,7 @@ int StorageUtil::updateQueueRaw(mqbs::ReplicatedStorage* storage, false); // is new queue? if (0 != rc) { MWCTSK_ALARMLOG_ALARM("FILE_IO") - << clusterDescription << ": PartitionId [" << partitionId + << clusterDescription << ": Partition [" << partitionId << "] failed to write QueueCreationRecord for new appIds " << "for queue [" << storage->queueUri() << "] queueKey [" << storage->queueKey() << "], rc: " << rc @@ -356,7 +356,7 @@ int StorageUtil::updateQueueRaw(mqbs::ReplicatedStorage* storage, mwcu::Printer printer(&addedIdKeyPairs); BALL_LOG_OUTPUT_STREAM - << clusterDescription << ": PartitionId [" << partitionId + << clusterDescription << ": Partition [" << partitionId << "] For an already registered queue [" << storage->queueUri() << "], queueKey [" << storage->queueKey() << "], added [" << addedIdKeyPairs.size() << "] new appId/appKey " @@ -380,7 +380,7 @@ int StorageUtil::updateQueueRaw(mqbs::ReplicatedStorage* storage, timestamp); if (0 != rc) { MWCTSK_ALARMLOG_ALARM("FILE_IO") - << clusterDescription << ": PartitionId [" << partitionId + << clusterDescription << ": Partition [" << partitionId << "] failed to write QueueDeletionRecord for queue [" << storage->queueUri() << "], queueKey [" << storage->queueKey() << "], appId [" << cit->first @@ -398,9 +398,9 @@ int StorageUtil::updateQueueRaw(mqbs::ReplicatedStorage* storage, partitionId); if (0 != rc) { BALL_LOG_ERROR - << clusterDescription << " PartitionId [" << partitionId - << "]: Failed to remove virtual storage for " - << "appKey [" << cit->second << "], appId [" << cit->first + << clusterDescription << " Partition [" << partitionId + << "]: Failed to remove virtual storage for " << "appKey [" + << cit->second << "], appId [" << cit->first << "], for queue [" << storage->queueUri() << "], queueKey [" << storage->queueKey() << "], rc: " << rc << "."; @@ -413,7 +413,7 @@ int StorageUtil::updateQueueRaw(mqbs::ReplicatedStorage* storage, mwcu::Printer printer(&removedIdKeyPairs); BALL_LOG_OUTPUT_STREAM - << clusterDescription << ": PartitionId [" << partitionId + << clusterDescription << ": Partition [" << partitionId << "] For an already registered queue [" << storage->queueUri() << "], queueKey [" << storage->queueKey() << "], removed [" << removedIdKeyPairs.size() << "] existing appId/appKey " @@ -427,7 +427,7 @@ int StorageUtil::updateQueueRaw(mqbs::ReplicatedStorage* storage, mwcu::Printer printer1(&addedIdKeyPairs); mwcu::Printer printer2(&removedIdKeyPairs); - BALL_LOG_INFO << clusterDescription << ": PartitionId [" << partitionId + BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId << "] updated [" << storage->queueUri() << "], queueKey [" << storage->queueKey() << "] with the storage as primary: " << "addedIdKeyPairs:" << printer1 @@ -472,7 +472,7 @@ int StorageUtil::addVirtualStoragesInternal( ++cit) { AppKeysInsertRc irc = appKeys->insert(cit->second); if (!irc.second && isCSLMode) { - BALL_LOG_WARN << clusterDescription << " PartitionId [" + BALL_LOG_WARN << clusterDescription << " Partition [" << partitionId << "]: AppKey [" << cit->second << "] already exists, while attempting to add " << "appId [" << cit->first << "], for queue [" @@ -485,14 +485,13 @@ int StorageUtil::addVirtualStoragesInternal( if (0 != (rc = storage->addVirtualStorage(errorDesc, cit->first, cit->second))) { - BALL_LOG_WARN << clusterDescription << " PartitionId [" - << partitionId << "]: " - << "Failed to add virtual storage for AppKey [" - << cit->second << "], appId [" << cit->first - << "], for queue [" << storage->queueUri() - << "], queueKey [" << storage->queueKey() - << "]. Reason: [" << errorDesc.str() - << "], rc: " << rc << "."; + BALL_LOG_WARN + << clusterDescription << " Partition [" << partitionId + << "]: " << "Failed to add virtual storage for AppKey [" + << cit->second << "], appId [" << cit->first + << "], for queue [" << storage->queueUri() + << "], queueKey [" << storage->queueKey() << "]. Reason: [" + << errorDesc.str() << "], rc: " << rc << "."; return rc_VIRTUAL_STORAGE_CREATION_FAILURE; // RETURN } @@ -923,7 +922,7 @@ StorageUtil::printRecoveryPhaseOneBanner(bsl::ostream& out, const int spacesPerLevel = 4; mwcu::MemOutStream header; - header << "RECOVERY PHASE 1: " << clusterDescription << " PartitionId [" + header << "RECOVERY PHASE 1: " << clusterDescription << " Partition [" << partitionId << "]"; bdlb::Print::newlineAndIndent(out, level + 1, spacesPerLevel); @@ -1085,7 +1084,7 @@ bool StorageUtil::validateStorageEvent( MWCTSK_ALARMLOG_ALARM("STORAGE") << clusterDescription << ": Received storage " << "event from node " << source->nodeDescription() << " for " - << "PartitionId [" << partitionId << "] which has no primary as " + << "Partition [" << partitionId << "] which has no primary as " << "perceived by this node. Ignoring entire storage event." << MWCTSK_ALARMLOG_END; return false; // RETURN @@ -1099,7 +1098,7 @@ bool StorageUtil::validateStorageEvent( MWCTSK_ALARMLOG_ALARM("STORAGE") << clusterDescription << ": Received storage " << "event from node " << source->nodeDescription() << " for " - << "PartitionId [" << partitionId << "] which has different " + << "Partition [" << partitionId << "] which has different " << "primary as perceived by this node: " << primary->nodeDescription() << " Ignoring entire " << "storage event." << MWCTSK_ALARMLOG_END; @@ -1114,7 +1113,7 @@ bool StorageUtil::validateStorageEvent( MWCTSK_ALARMLOG_ALARM("STORAGE") << clusterDescription << ": Received storage " - << "event for PartitionId [" << partitionId + << "event for Partition [" << partitionId << "] from: " << source->nodeDescription() << ", which is perceived as " << "non-active primary. Primary status: " << status @@ -1145,7 +1144,7 @@ bool StorageUtil::validatePartitionSyncEvent( if (partitionInfo.primary() != clusterData.membership().selfNode() && partitionInfo.primary() != source) { BALL_LOG_ERROR << clusterData.identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: Received partition-sync event from peer: " << source->nodeDescription() << " but neither self nor peer is primary. Perceived" @@ -1162,7 +1161,7 @@ bool StorageUtil::validatePartitionSyncEvent( // be perceived as a passive one. BALL_LOG_ERROR << clusterData.identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: Received partition-sync event from: " << source->nodeDescription() << " but primary status is: " @@ -1321,7 +1320,7 @@ void StorageUtil::clearPrimaryForPartition( return; // RETURN } - BALL_LOG_INFO << clusterDescription << " PartitionId [" << partitionId + BALL_LOG_INFO << clusterDescription << " Partition [" << partitionId << "]: processing 'clear-primary' event. Current primary: " << partitionInfo->primary()->nodeDescription() << ", current leaseId: " << partitionInfo->primaryLeaseId() @@ -1413,7 +1412,7 @@ void StorageUtil::onPartitionPrimarySync( // handled. MWCTSK_ALARMLOG_ALARM("STORAGE") - << clusterData->identity().description() << " PartitionId [" + << clusterData->identity().description() << " Partition [" << partitionId << "]: new primary (" << pinfo->primary()->nodeDescription() << ") with leaseId " << pinfo->primaryLeaseId() @@ -1428,7 +1427,7 @@ void StorageUtil::onPartitionPrimarySync( if (0 != status) { MWCTSK_ALARMLOG_ALARM("STORAGE") - << clusterData->identity().description() << " PartitionId [" + << clusterData->identity().description() << " Partition [" << partitionId << "]: node failed to sync " << "after being chosen as primary, with status: " << status << MWCTSK_ALARMLOG_END; @@ -1519,7 +1518,7 @@ void StorageUtil::recoveredQueuesCb( bmqt::Uri uri(qinfo.canonicalQueueUri()); if (!uri.isValid()) { MWCTSK_ALARMLOG_ALARM("RECOVERY") - << clusterDescription << ": PartitionId [" << partitionId + << clusterDescription << ": Partition [" << partitionId << "]: encountered invalid CanonicalQueueUri [" << uri << "]." << MWCTSK_ALARMLOG_END; mqbu::ExitUtil::terminate(mqbu::ExitCode::e_RECOVERY_FAILURE); @@ -1540,13 +1539,13 @@ void StorageUtil::recoveredQueuesCb( // Duplicate AppId. MWCTSK_ALARMLOG_ALARM("RECOVERY") - << clusterDescription << ": PartitionId [" - << partitionId << "]: " + << clusterDescription << ": Partition [" << partitionId + << "]: " << "encountered a duplicate AppId while processing " - << "recovered queue [" << uri << "], " - << "queueKey [" << qit->first << "]. AppId [" - << *(appIdsIrc.first) << "]. AppKey [" << p.second - << "]." << MWCTSK_ALARMLOG_END; + << "recovered queue [" << uri << "], " << "queueKey [" + << qit->first << "]. AppId [" << *(appIdsIrc.first) + << "]. AppKey [" << p.second << "]." + << MWCTSK_ALARMLOG_END; mqbu::ExitUtil::terminate( mqbu::ExitCode::e_RECOVERY_FAILURE); // EXIT @@ -1563,8 +1562,8 @@ void StorageUtil::recoveredQueuesCb( // CQH::onQueueAssigned -> // StorageMgr::register/UpdateQueueReplica. MWCTSK_ALARMLOG_ALARM("RECOVERY") - << clusterDescription << ": PartitionId [" - << partitionId << "]: " + << clusterDescription << ": Partition [" << partitionId + << "]: " << "encountered a duplicate AppKey while processing " << "recovered queue [" << uri << "], queueKey [" << qit->first << "]. AppKey [" << *(appKeysIrc.first) @@ -1585,8 +1584,8 @@ void StorageUtil::recoveredQueuesCb( // Print the unique list of retrieved domain names (useful for debugging // purposes). mwcu::MemOutStream os; - os << clusterDescription << ": PartitionId [" << partitionId << "]: " - << "retrieved " + os << clusterDescription << ": Partition [" << partitionId + << "]: " << "retrieved " << mwcu::PrintUtil::prettyNumber( static_cast(queueKeyInfoMap.size())) << " queues belonging to " @@ -1612,7 +1611,7 @@ void StorageUtil::recoveredQueuesCb( for (DomainMapIter dit = domainMap.begin(); dit != domainMap.end(); ++dit) { - BALL_LOG_INFO << clusterDescription << ": PartitionId [" << partitionId + BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId << "]: requesting domain for [" << dit->first << "]."; domainFactory->createDomain( @@ -1627,12 +1626,12 @@ void StorageUtil::recoveredQueuesCb( partitionId)); } - BALL_LOG_INFO << clusterDescription << ": PartitionId [" << partitionId + BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId << "]: about to wait for [" << domainMap.size() << "] domains to be created."; latch.wait(); - BALL_LOG_INFO << clusterDescription << ": PartitionId [" << partitionId + BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId << "]: domain creation step complete. Checking if all " << "domains were created successfully."; @@ -1677,11 +1676,11 @@ void StorageUtil::recoveredQueuesCb( const mqbs::ReplicatedStorage* rs = it->second; MWCTSK_ALARMLOG_ALARM("RECOVERY") - << clusterDescription << ": PartitionId [" << partitionId + << clusterDescription << ": Partition [" << partitionId << "]: encountered queueKey [" << queueKey << "] again, for uri [" << queueUri - << "]. Uri associated with original queueKey: " - << "[" << rs->queueUri() << "]." << MWCTSK_ALARMLOG_END; + << "]. Uri associated with original queueKey: " << "[" + << rs->queueUri() << "]." << MWCTSK_ALARMLOG_END; mqbu::ExitUtil::terminate(mqbu::ExitCode::e_RECOVERY_FAILURE); // EXIT } @@ -1725,7 +1724,7 @@ void StorageUtil::recoveredQueuesCb( unrecognizedDomains->end()); } - BALL_LOG_INFO << clusterDescription << ": PartitionId [" + BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId << "]: encountered queueUri [" << queueUri << "] again. QueueKey of this uri [" << queueKey << "]."; @@ -1740,10 +1739,10 @@ void StorageUtil::recoveredQueuesCb( const StorageSp& rstorage = spmapIt->second; MWCTSK_ALARMLOG_ALARM("RECOVERY") - << clusterDescription << ": PartitionId [" << partitionId + << clusterDescription << ": Partition [" << partitionId << "]: encountered queueUri [" << queueUri << "] again. QueueKey of this uri [" << queueKey - << "]. Details of original queueUri:: PartitionId [" + << "]. Details of original queueUri:: Partition [" << rstorage->partitionId() << "], queueKey [" << rstorage->queueKey() << "]." << MWCTSK_ALARMLOG_END; mqbu::ExitUtil::terminate(mqbu::ExitCode::e_RECOVERY_FAILURE); @@ -1771,7 +1770,7 @@ void StorageUtil::recoveredQueuesCb( mqbi::Domain* domain = dit->second; BSLS_ASSERT_SAFE(domain->cluster()); - BALL_LOG_INFO << clusterDescription << " PartitionId [" << partitionId + BALL_LOG_INFO << clusterDescription << " Partition [" << partitionId << "] creating storage for queue [" << queueUri << "], queueKey [" << queueKey << "]."; @@ -1781,7 +1780,7 @@ void StorageUtil::recoveredQueuesCb( if (domainCfg.mode().isUndefinedValue()) { MWCTSK_ALARMLOG_ALARM("RECOVERY") - << clusterDescription << ": PartitionId [" << partitionId + << clusterDescription << ": Partition [" << partitionId << "]: Domain for queue [" << queueUri << "], queueKey [" << queueKey << "] has invalid queue mode. Aborting broker." << MWCTSK_ALARMLOG_END; @@ -1791,7 +1790,7 @@ void StorageUtil::recoveredQueuesCb( if (storageDef.config().isUndefinedValue()) { MWCTSK_ALARMLOG_ALARM("RECOVERY") - << clusterDescription << ": PartitionId [" << partitionId + << clusterDescription << ": Partition [" << partitionId << "]: Domain for queue [" << queueUri << "], queueKey [" << queueKey << "] has invalid storage config. Aborting broker." << MWCTSK_ALARMLOG_END; @@ -1808,7 +1807,7 @@ void StorageUtil::recoveredQueuesCb( // clustered setup. MWCTSK_ALARMLOG_ALARM("RECOVERY") - << clusterDescription << ": PartitionId [" << partitionId + << clusterDescription << ": Partition [" << partitionId << "]: Queue [" << queueUri << "], queueKey [" << queueKey << "] is clustered, is setup with incompatible " << "config. In-memory storage: " << bsl::boolalpha @@ -1848,8 +1847,7 @@ void StorageUtil::recoveredQueuesCb( // TBD: does this mean storage is corrupt? Should we abort? MWCTSK_ALARMLOG_ALARM("RECOVERY") - << clusterDescription << ": PartitionId [" - << partitionId + << clusterDescription << ": Partition [" << partitionId << "]: failed to create virtual storage with appId [" << appId << "], appKey [" << appKey << "] for queueUri [" << queueUri << "], queueKey [" @@ -1859,7 +1857,7 @@ void StorageUtil::recoveredQueuesCb( } BALL_LOG_INFO - << clusterDescription << " PartitionId [" << partitionId + << clusterDescription << " Partition [" << partitionId << "]: Created virtual storage with appId [" << appId << "], appKey [" << appKey << "] for queueUri [" << queueUri << "], queueKey [" << queueKey << "]."; @@ -1884,14 +1882,14 @@ void StorageUtil::recoveredQueuesCb( // TBD: does this mean storage is corrupt? Should we abort? BALL_LOG_WARN - << clusterDescription << ": PartitionId [" << partitionId + << clusterDescription << ": Partition [" << partitionId << "]: failed to create default virtual storage" << " for queueUri [" << queueUri << "], queueKey [" << queueKey << "]. Reason: [" << errorDesc.str() << "], rc: " << rc << "."; } - BALL_LOG_INFO << clusterDescription << " PartitionId [" + BALL_LOG_INFO << clusterDescription << " Partition [" << partitionId << "]: Created default virtual storage " << " for queueUri [" << queueUri << "], queueKey [" @@ -1903,7 +1901,7 @@ void StorageUtil::recoveredQueuesCb( // storages. Note that any virtual storages associated with each // file-backed storage will also be populated at this time. - BALL_LOG_INFO << clusterDescription << ": PartitionId [" << partitionId + BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId << "], total number of " << "records found during recovery: " << fs->numRecords(); @@ -1984,9 +1982,9 @@ void StorageUtil::recoveredQueuesCb( // If queue is not recovered MWCTSK_ALARMLOG_ALARM("STORAGE") - << clusterDescription << ": PartitionId [" << partitionId - << "], dropping record " - << "because queue key '" << queueKey << "' not found in the " + << clusterDescription << ": Partition [" << partitionId + << "], dropping record " << "because queue key '" << queueKey + << "' not found in the " << "list of recovered queues, record: " << fsIt << MWCTSK_ALARMLOG_END; continue; // CONTINUE @@ -2003,7 +2001,7 @@ void StorageUtil::recoveredQueuesCb( if (!rs->isPersistent()) { MWCTSK_ALARMLOG_ALARM("STORAGE") - << clusterDescription << ": PartitionId [" << partitionId + << clusterDescription << ": Partition [" << partitionId << "]: For queue [" << rs->queueUri() << "] queueKey [" << queueKey << "] which is configured with in-memory storage, " @@ -2064,7 +2062,7 @@ void StorageUtil::recoveredQueuesCb( if (!appKey.isNull() && !rs->hasVirtualStorage(appKey)) { MWCTSK_ALARMLOG_ALARM("STORAGE") - << clusterDescription << ": PartitionId [" << partitionId + << clusterDescription << ": Partition [" << partitionId << "], appKey [" << appKey << "] specified in " << fsIt.type() << " record, with guid [" << guid << "] not found in the list of virtual " @@ -2300,12 +2298,12 @@ void StorageUtil::shutdown(int partitionId, if (fs) { BSLS_ASSERT_SAFE(fs->inDispatcherThread()); - BALL_LOG_INFO << clusterDescription << ": Closing PartitionId [" + BALL_LOG_INFO << clusterDescription << ": Closing Partition [" << partitionId << "]."; fs->close(clusterConfig.partitionConfig().flushAtShutdown()); - BALL_LOG_INFO << clusterDescription << ": PartitionId [" << partitionId + BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId << "] closed."; } @@ -2389,7 +2387,7 @@ void StorageUtil::registerQueue( if (queueMode.isUndefinedValue()) { MWCTSK_ALARMLOG_ALARM("STORAGE") - << "PartitionId [" << partitionId + << "Partition [" << partitionId << "] Invalid queue-mode in the domain configuration while " << "attempting to register queue '" << uri << "', queueKey '" << queueKey << "'." << MWCTSK_ALARMLOG_END; @@ -2404,7 +2402,7 @@ void StorageUtil::registerQueue( if (storageDef.config().isInMemoryValue() != queueMode.isBroadcastValue()) { MWCTSK_ALARMLOG_ALARM("STORAGE") - << "PartitionId [" << partitionId << "] Incompatible " + << "Partition [" << partitionId << "] Incompatible " << "queue mode (" << queueMode.selectionName() << ") " << "and storage type (" << storageDef.config().selectionName() << ") while attempting to register clustered queue '" << uri @@ -2416,7 +2414,7 @@ void StorageUtil::registerQueue( BSLS_ASSERT_SAFE(storageDef.config().isInMemoryValue() || storageDef.config().isFileBackedValue()); - BALL_LOG_INFO << clusterDescription << " PartitionId [" << partitionId + BALL_LOG_INFO << clusterDescription << " Partition [" << partitionId << "]: Registering queue '" << uri << "', queueKey: '" << queueKey << "' to storage."; @@ -2634,7 +2632,7 @@ void StorageUtil::unregisterQueueDispatched( if (clusterData->membership().selfNode() != pinfo.primary()) { MWCTSK_ALARMLOG_ALARM("STORAGE") - << clusterData->identity().description() << " PartitionId [" + << clusterData->identity().description() << " Partition [" << partitionId << "]: queue [" << uri << "] unregistration requested but self is not primary. Current " << "primary: " @@ -2648,7 +2646,7 @@ void StorageUtil::unregisterQueueDispatched( if (bmqp_ctrlmsg::PrimaryStatus::E_ACTIVE != pinfo.primaryStatus()) { MWCTSK_ALARMLOG_ALARM("STORAGE") - << clusterData->identity().description() << " PartitionId [" + << clusterData->identity().description() << " Partition [" << partitionId << "]: queue [" << uri << "] unregistration requested but self is not ACTIVE primary." << ". Current leaseId: " << pinfo.primaryLeaseId() @@ -2669,7 +2667,7 @@ void StorageUtil::unregisterQueueDispatched( // registered with StorageManager at the primary node, and thus, this // 'if' snippet will be executed. BALL_LOG_WARN - << clusterData->identity().description() << " PartitionId [" + << clusterData->identity().description() << " Partition [" << partitionId << "]: queue [" << uri << "] requested for unregistration not found in storage manager."; return; // RETURN @@ -2689,7 +2687,7 @@ void StorageUtil::unregisterQueueDispatched( // the case here, its an error. MWCTSK_ALARMLOG_ALARM("STORAGE") - << clusterData->identity().description() << ": PartitionId [" + << clusterData->identity().description() << ": Partition [" << partitionId << "]: cannot unregister queue" << " because it has [" << numMsgs << "] outstanding messages. Queue [" << uri << "], queueKey [" @@ -2699,7 +2697,7 @@ void StorageUtil::unregisterQueueDispatched( // Storage has no outstanding messages. - BALL_LOG_INFO << clusterData->identity().description() << ": PartitionId [" + BALL_LOG_INFO << clusterData->identity().description() << ": Partition [" << partitionId << "], Deleting storage for queue [" << uri << "], queueKey [" << storage->queueKey() << "] as primary, as it has no outstanding messages."; @@ -2716,7 +2714,7 @@ void StorageUtil::unregisterQueueDispatched( if (0 != rc) { MWCTSK_ALARMLOG_ALARM("FILE_IO") - << clusterData->identity().description() << ": PartitionId [" + << clusterData->identity().description() << ": Partition [" << partitionId << "] failed to write QueueDeletionRecord for queue [" << uri << "], queueKey [" << storage->queueKey() << "], rc: " << rc @@ -2789,7 +2787,7 @@ int StorageUtil::updateQueue(StorageSpMap* storageMap, if (storageMap->end() == it) { mwcu::Printer printer1(&addedIdKeyPairs); mwcu::Printer printer2(&removedIdKeyPairs); - BALL_LOG_ERROR << clusterDescription << " PartitionId [" << partitionId + BALL_LOG_ERROR << clusterDescription << " Partition [" << partitionId << "]: Error when updating queue '" << uri << "' with addedAppIds: [" << printer1 << "], removedAppIds: [" << printer2 @@ -2967,7 +2965,7 @@ void StorageUtil::registerQueueReplicaDispatched( storageMap->insert(bsl::make_pair(uri, rs_sp)); fs->registerStorage(rs_sp.get()); - BALL_LOG_INFO << clusterDescription << ": PartitionId [" << partitionId + BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId << "] registered [" << uri << "], queueKey [" << queueKey << "] with the storage as replica."; @@ -3023,10 +3021,10 @@ void StorageUtil::unregisterQueueReplicaDispatched( // replication. MWCTSK_ALARMLOG_ALARM("REPLICATION") - << clusterDescription << " PartitionId [" << partitionId - << "]: unaware of uri while deleting " - << "storage for queue [ " << uri << "], queueKey [" << queueKey - << "]. Ignoring this event." << MWCTSK_ALARMLOG_END; + << clusterDescription << " Partition [" << partitionId + << "]: unaware of uri while deleting " << "storage for queue [ " + << uri << "], queueKey [" << queueKey << "]. Ignoring this event." + << MWCTSK_ALARMLOG_END; if (status) { *status = rc_UNKNOWN_QUEUE_URI; } @@ -3041,12 +3039,11 @@ void StorageUtil::unregisterQueueReplicaDispatched( // This really means that cluster state is out of sync across nodes. MWCTSK_ALARMLOG_ALARM("REPLICATION") - << clusterDescription << " PartitionId [" << partitionId - << "]: queueKey mismatch while deleting " - << "storage for queue [ " << uri << "]. Specified queueKey [" - << queueKey << "], queueKey associated with storage [" - << rs->queueKey() << "]. Ignoring this event." - << MWCTSK_ALARMLOG_END; + << clusterDescription << " Partition [" << partitionId + << "]: queueKey mismatch while deleting " << "storage for queue [ " + << uri << "]. Specified queueKey [" << queueKey + << "], queueKey associated with storage [" << rs->queueKey() + << "]. Ignoring this event." << MWCTSK_ALARMLOG_END; if (status) { *status = rc_QUEUE_KEY_MISMATCH; } @@ -3062,7 +3059,7 @@ void StorageUtil::unregisterQueueReplicaDispatched( mqbu::StorageKey::k_NULL_KEY); if (0 != numMsgs) { MWCTSK_ALARMLOG_ALARM("REPLICATION") - << clusterDescription << " PartitionId [" << partitionId + << clusterDescription << " Partition [" << partitionId << "]: Attempt to delete storage for queue [ " << uri << "], queueKey [" << queueKey << "] which has [" << numMsgs << "] outstanding messages." @@ -3081,7 +3078,7 @@ void StorageUtil::unregisterQueueReplicaDispatched( } } - BALL_LOG_INFO << clusterDescription << ": PartitionId [" << partitionId + BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId << "], Deleting storage for queue [" << uri << "], queueKey [" << queueKey << "] as replica."; @@ -3106,11 +3103,11 @@ void StorageUtil::unregisterQueueReplicaDispatched( partitionId); if (0 != rc) { MWCTSK_ALARMLOG_ALARM("REPLICATION") - << clusterDescription << " PartitionId [" << partitionId - << "]: Failed to remove virtual storage " - << "for appKey [" << appKey << "] for queue [" << uri - << "] and queueKey [" << queueKey << ", rc: " << rc - << ". Ignoring this event." << MWCTSK_ALARMLOG_END; + << clusterDescription << " Partition [" << partitionId + << "]: Failed to remove virtual storage " << "for appKey [" + << appKey << "] for queue [" << uri << "] and queueKey [" + << queueKey << ", rc: " << rc << ". Ignoring this event." + << MWCTSK_ALARMLOG_END; if (status) { *status = rc_MISC; } @@ -3118,7 +3115,7 @@ void StorageUtil::unregisterQueueReplicaDispatched( return; // RETURN } - BALL_LOG_INFO << clusterDescription << ": PartitionId [" << partitionId + BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId << "], Removed virtual storage for appKey [" << appKey << "] for queue [" << uri << "], queueKey [" << queueKey << "] as replica."; @@ -3219,10 +3216,10 @@ void StorageUtil::updateQueueReplicaDispatched( } mwcu::Printer printer(&appIdKeyPairs); - BALL_LOG_INFO << clusterDescription << ": PartitionId [" << partitionId + BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId << "] updated [" << uri << "], queueKey [" << queueKey - << "] with the storage as replica: " - << "addedIdKeyPairs:" << printer; + << "] with the storage as replica: " << "addedIdKeyPairs:" + << printer; if (status) { *status = rc_SUCCESS; @@ -3248,7 +3245,7 @@ void StorageUtil::setQueueDispatched( StorageSpMapIter it = storageMap->find(uri); if (it == storageMap->end()) { - BALL_LOG_ERROR << clusterDescription << " PartitionId [" << partitionId + BALL_LOG_ERROR << clusterDescription << " Partition [" << partitionId << "]: queue [" << uri << "] not found in storage manager."; return; // RETURN @@ -3350,7 +3347,7 @@ void StorageUtil::processPrimaryStatusAdvisoryDispatched( if (source == pinfo->primary()) { if (advisory.primaryLeaseId() != pinfo->primaryLeaseId()) { MWCTSK_ALARMLOG_ALARM("CLUSTER_STATE") - << clusterDescription << " PartitionId [" + << clusterDescription << " Partition [" << advisory.partitionId() << "]: received primary advisory: " << advisory << ", from perceived primary: " << source->nodeDescription() @@ -3376,7 +3373,7 @@ void StorageUtil::processPrimaryStatusAdvisoryDispatched( // Primary status advisory from a different node. MWCTSK_ALARMLOG_ALARM("CLUSTER_STATE") - << clusterDescription << " PartitionId [" + << clusterDescription << " Partition [" << advisory.partitionId() << "]: received primary advisory: " << advisory << ", from: " << source->nodeDescription() @@ -3399,7 +3396,7 @@ void StorageUtil::processPrimaryStatusAdvisoryDispatched( // partition/primary mapping, so this needs to be reviewed carefully. // See 'ClusterStateMgr::processPrimaryStatusAdvisory' as well. - BALL_LOG_WARN << clusterDescription << " PartitionId [" + BALL_LOG_WARN << clusterDescription << " Partition [" << advisory.partitionId() << "]: received primary advisory: " << advisory << ", from: " << source->nodeDescription() @@ -3408,7 +3405,7 @@ void StorageUtil::processPrimaryStatusAdvisoryDispatched( << "node as primary."; } - BALL_LOG_INFO << clusterDescription << " PartitionId [" + BALL_LOG_INFO << clusterDescription << " Partition [" << advisory.partitionId() << "]: received primary status advisory: " << advisory << ", from: " << source->nodeDescription(); @@ -3440,7 +3437,7 @@ void StorageUtil::processReplicaStatusAdvisoryDispatched( BSLS_ASSERT_SAFE(pinfo.primary() == clusterData->membership().selfNode()); if (bmqp_ctrlmsg::PrimaryStatus::E_ACTIVE == pinfo.primaryStatus()) { BALL_LOG_INFO << clusterData->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: Received node status: " << status << ", from replica node: " << source->nodeDescription() << ". Self is ACTIVE primary, force-issuing a primary " @@ -3450,7 +3447,7 @@ void StorageUtil::processReplicaStatusAdvisoryDispatched( } else { BALL_LOG_INFO << clusterData->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: not issuing a primary status advisory or SyncPt " << "upon receiving status advisory: " << status << ", from replica: " << source->nodeDescription() @@ -3493,13 +3490,13 @@ void StorageUtil::processShutdownEventDispatched(ClusterData* clusterData, int rc = fs->issueSyncPoint(); if (0 != rc) { BALL_LOG_ERROR - << clusterData->identity().description() << "PartitionId [" + << clusterData->identity().description() << "Partition [" << partitionId << "]: failed to force-issue SyncPt, rc: " << rc; } else { BALL_LOG_INFO - << clusterData->identity().description() << "PartitionId [" + << clusterData->identity().description() << "Partition [" << partitionId << "]: force-issued SyncPt: " << fs->syncPoints().back() << "."; @@ -3507,7 +3504,7 @@ void StorageUtil::processShutdownEventDispatched(ClusterData* clusterData, } else { BALL_LOG_INFO << clusterData->identity().description() - << " PartitionId [" << partitionId + << " Partition [" << partitionId << "]: not issuing a sync point while shutting " << "down because self is not an active primary."; } @@ -3915,7 +3912,7 @@ void StorageUtil::onDomain(const bmqp_ctrlmsg::Status& status, BSLS_ASSERT_SAFE(0 == domain); *out = 0; - BALL_LOG_ERROR << clusterDescription << " PartitionId [" << partitionId + BALL_LOG_ERROR << clusterDescription << " Partition [" << partitionId << "]: Failed to create domain for [" << domainName << "], reason: " << status; } @@ -3963,13 +3960,13 @@ void StorageUtil::forceIssueAdvisoryAndSyncPt(mqbc::ClusterData* clusterData, } int rc = fs->issueSyncPoint(); if (0 == rc) { - BALL_LOG_INFO << clusterData->identity().description() - << "PartitionId [" << fs->config().partitionId() + BALL_LOG_INFO << clusterData->identity().description() << "Partition [" + << fs->config().partitionId() << "]: successfully issued a forced SyncPt."; } else { BALL_LOG_ERROR << clusterData->identity().description() - << "PartitionId [" << fs->config().partitionId() + << "Partition [" << fs->config().partitionId() << "]: failed to force-issue SyncPt, rc: " << rc; } } diff --git a/src/groups/mqb/mqbcmd/mqbcmd_humanprinter.cpp b/src/groups/mqb/mqbcmd/mqbcmd_humanprinter.cpp index e25d5989f..5bef5e820 100644 --- a/src/groups/mqb/mqbcmd/mqbcmd_humanprinter.cpp +++ b/src/groups/mqb/mqbcmd/mqbcmd_humanprinter.cpp @@ -395,7 +395,7 @@ void printFileStoreSummary(bsl::ostream& os, { using namespace mwcu::PrintUtil; - os << indent(level, spacesPerLevel) << "PartitionId [" << partitionId + os << indent(level, spacesPerLevel) << "Partition [" << partitionId << "]:" << newlineAndIndent(level, spacesPerLevel) << "----------------" << newlineAndIndent(level + 1, spacesPerLevel) << "Primary Node: " << summary.primaryNodeDescription() @@ -503,12 +503,12 @@ void printClusterStorageSummary(bsl::ostream& os, ++cit) { if (cit->state() == FileStoreState::CLOSED) { os << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel) - << "PartitionId [" << cit->partitionId() << "]: NOT OPEN."; + << "Partition [" << cit->partitionId() << "]: NOT OPEN."; continue; // CONTINUE } else if (cit->state() == FileStoreState::STOPPING) { os << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel) - << "PartitionId [" << cit->partitionId() << "]: STOPPING."; + << "Partition [" << cit->partitionId() << "]: STOPPING."; continue; // CONTINUE } diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp b/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp index 6b403816e..9e1c53a01 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp @@ -269,7 +269,7 @@ void FileBackedStorage::setQueue(mqbi::Queue* queue) d_queue_p->stats()->setQueueContentRaw(numMessage, numByte); BALL_LOG_INFO << "Associated queue [" << queue->uri() << "] with key [" - << queueKey() << "] and PartitionId [" + << queueKey() << "] and Partition [" << queue->partitionId() << "] with its storage having " << mwcu::PrintUtil::prettyNumber(numMessage) << " messages and " @@ -565,9 +565,8 @@ FileBackedStorage::removeAll(const mqbu::StorageKey& appKey) RecordHandleMapIter it = d_handles.find(guid); if (it == d_handles.end()) { BALL_LOG_WARN - << "#STORAGE_PURGE_ERROR " - << "PartitionId [" << partitionId() << "]" - << ": Attempting to purge GUID '" << guid + << "#STORAGE_PURGE_ERROR " << "Partition [" << partitionId() + << "]" << ": Attempting to purge GUID '" << guid << "' from virtual storage with appId '" << appId << "' & appKey '" << appKey << "' for queue '" << queueUri() << "' & queueKey '" << queueKey() @@ -580,7 +579,7 @@ FileBackedStorage::removeAll(const mqbu::StorageKey& appKey) // Outstanding refCount for this message is already zero. MWCTSK_ALARMLOG_ALARM("REPLICATION") - << "PartitionId [" << partitionId() << "]" + << "Partition [" << partitionId() << "]" << ": Attempting to purge GUID '" << guid << "' from virtual storage with appId '" << appId << "' & appKey '" << appKey << "] for queue '" << queueUri() @@ -609,7 +608,7 @@ FileBackedStorage::removeAll(const mqbu::StorageKey& appKey) if (0 != rc) { MWCTSK_ALARMLOG_ALARM("FILE_IO") - << "PartitionId [" << partitionId() << "] failed to write " + << "Partition [" << partitionId() << "] failed to write " << "DELETION record for GUID: " << guid << ", for queue '" << d_queueUri << "', queueKey '" << d_queueKey << "' while attempting to purge the message, rc: " << rc @@ -722,7 +721,7 @@ int FileBackedStorage::gcExpiredMessages( secondsFromEpoch); if (0 != rc) { MWCTSK_ALARMLOG_ALARM("FILE_IO") - << "PartitionId [" << partitionId() << "]" + << "Partition [" << partitionId() << "]" << " failed to write DELETION record for " << "GUID: " << cit->first << ", for queue '" << d_queueUri << "', queueKey '" << d_queueKey << "' while attempting to GC " @@ -839,7 +838,7 @@ void FileBackedStorage::processMessageRecord( // exists. This is an error. MWCTSK_ALARMLOG_ALARM("REPLICATION") - << "PartitionId [" << partitionId() << "]" + << "Partition [" << partitionId() << "]" << " received MESSAGE record for GUID '" << guid << "' for queue '" << queueUri() << "', queueKey '" << queueKey() << "' for which an entry already exists. Ignoring this message." @@ -871,7 +870,7 @@ void FileBackedStorage::processConfirmRecord( RecordHandleMapIter it = d_handles.find(guid); if (it == d_handles.end()) { MWCTSK_ALARMLOG_ALARM("REPLICATION") - << "PartitionId [" << partitionId() << "]" + << "Partition [" << partitionId() << "]" << " received CONFIRM record for GUID '" << guid << "' for queue '" << queueUri() << "', queueKey '" << queueKey() << "' for which no entry exists. Ignoring this message." @@ -882,7 +881,7 @@ void FileBackedStorage::processConfirmRecord( if (0 == it->second.d_refCount) { // Outstanding refCount for this message is already zero at this node. MWCTSK_ALARMLOG_ALARM("REPLICATION") - << "PartitionId [" << partitionId() << "]" + << "Partition [" << partitionId() << "]" << "' received CONFIRM record for GUID '" << guid << "' for queue '" << queueUri() << "', queueKey '" << queueKey() << "' for which refCount is already zero. Ignoring this message." @@ -901,8 +900,8 @@ void FileBackedStorage::processConfirmRecord( mqbi::StorageResult::Enum rc = d_virtualStorageCatalog.remove(guid, appKey); if (mqbi::StorageResult::e_SUCCESS != rc) { - BALL_LOG_ERROR << "#STORAGE_INVALID_CONFIRM " - << "PartitionId [" << partitionId() << "]" + BALL_LOG_ERROR << "#STORAGE_INVALID_CONFIRM " << "Partition [" + << partitionId() << "]" << "' attempting to confirm GUID '" << guid << "' for appKey '" << appKey << "' which does not exist in its virtual storage, " @@ -919,7 +918,7 @@ void FileBackedStorage::processDeletionRecord(const bmqt::MessageGUID& guid) RecordHandleMapIter it = d_handles.find(guid); if (it == d_handles.end()) { MWCTSK_ALARMLOG_ALARM("REPLICATION") - << "PartitionId [" << partitionId() << "]" + << "Partition [" << partitionId() << "]" << " received DELETION record for GUID '" << guid << "' for queue '" << queueUri() << "', queueKey '" << queueKey() << "' for which no entry exists. Ignoring this message." diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index de99589ab..9ae230b8e 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -5195,7 +5195,7 @@ FileStore::FileStore(const DataStoreConfig& config, BSLS_ASSERT(1 <= clusterSize()); mwcu::MemOutStream os; - os << "PartitionId [" << d_config.partitionId() + os << "Partition [" << d_config.partitionId() << "] (cluster: " << d_cluster_p->name() << "): "; d_partitionDescription.assign(os.str().data(), os.str().length()); diff --git a/src/groups/mqb/mqbs/mqbs_filestoreutil.cpp b/src/groups/mqb/mqbs/mqbs_filestoreutil.cpp index 9af0ea0e5..79db6261c 100644 --- a/src/groups/mqb/mqbs/mqbs_filestoreutil.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestoreutil.cpp @@ -848,7 +848,7 @@ void FileStoreUtil::deleteArchiveFiles(int partitionId, const unsigned int numFilesToDelete = archivedFiles.size() - numFilesToKeep; - BALL_LOG_INFO << cluster << ": PartitionId [" << partitionId + BALL_LOG_INFO << cluster << ": Partition [" << partitionId << "], deleting " << numFilesToDelete << " files."; for (unsigned int i = 0; i < numFilesToDelete; ++i) { @@ -857,14 +857,14 @@ void FileStoreUtil::deleteArchiveFiles(int partitionId, MWCTSK_ALARMLOG_ALARM("FILE_IO") << cluster << ": Failed to remove [" << archivedFiles[i] << "] file during archived storage cleanup for " - << "PartitionId [" << partitionId << "], rc: " << rc + << "Partition [" << partitionId << "], rc: " << rc << MWCTSK_ALARMLOG_END; continue; // CONTINUE } BALL_LOG_INFO << cluster << ": Removed file [" << archivedFiles[i] << "] during archived storage cleanup for " - << "PartitionId [" << partitionId << "]."; + << "Partition [" << partitionId << "]."; } } @@ -1083,7 +1083,7 @@ int FileStoreUtil::openRecoveryFileSet(bsl::ostream& errorDescription, return 10 * rc + rc_FILE_SET_RETRIEVAL_FAILURE; // RETURN } - BALL_LOG_INFO << "PartitionId [" << partitionId << "]: Number of file " + BALL_LOG_INFO << "Partition [" << partitionId << "]: Number of file " << "sets found for potential recovery: " << fileSets.size(); if (fileSets.empty()) { @@ -1103,7 +1103,7 @@ int FileStoreUtil::openRecoveryFileSet(bsl::ostream& errorDescription, const bsls::Types::Int64 journalFileSize = fs.journalFileSize(); const bsls::Types::Int64 dataFileSize = fs.dataFileSize(); - BALL_LOG_INFO << "PartitionId [" << partitionId << "]" + BALL_LOG_INFO << "Partition [" << partitionId << "]" << ": Checking file set: " << fs; mwcu::MemOutStream errorDesc; @@ -1131,7 +1131,7 @@ int FileStoreUtil::openRecoveryFileSet(bsl::ostream& errorDescription, } if (rc != 0) { - BALL_LOG_WARN << "PartitionId [" << partitionId + BALL_LOG_WARN << "Partition [" << partitionId << "]: file set: " << fs << " failed to open. Reason: " << errorDesc.str() << ", rc: " << rc; @@ -1146,7 +1146,7 @@ int FileStoreUtil::openRecoveryFileSet(bsl::ostream& errorDescription, if (rc != 0) { // Close this set before checking others, if any. - BALL_LOG_ERROR << "PartitionId [" << partitionId + BALL_LOG_ERROR << "Partition [" << partitionId << "]: file set: " << fs << " validation failed, rc: " << rc; FileSystemUtil::close(journalFd); @@ -1168,7 +1168,7 @@ int FileStoreUtil::openRecoveryFileSet(bsl::ostream& errorDescription, } else { // No valid first sync point record in this file. - BALL_LOG_INFO << "PartitionId [" << partitionId << "]" + BALL_LOG_INFO << "Partition [" << partitionId << "]" << ": No valid first sync point found in journal" << "file [" << fs.journalFile() << "], rc: " << rc; @@ -1204,7 +1204,7 @@ int FileStoreUtil::openRecoveryFileSet(bsl::ostream& errorDescription, } // Found a recoverable set. Archive the remaining file sets. - BALL_LOG_INFO << "PartitionId [" << partitionId << "]: archiving " + BALL_LOG_INFO << "Partition [" << partitionId << "]: archiving " << archivingIndices.size() << " file sets."; for (unsigned int i = 0; i < archivingIndices.size(); ++i) { @@ -1217,7 +1217,7 @@ int FileStoreUtil::openRecoveryFileSet(bsl::ostream& errorDescription, rc = FileSystemUtil::move(archivingFileSet.dataFile(), config.archiveLocation()); if (rc != 0) { - BALL_LOG_WARN << "PartitionId [" << partitionId << "]: Failed to " + BALL_LOG_WARN << "Partition [" << partitionId << "]: Failed to " << "archive data file [" << archivingFileSet.dataFile() << "], rc: " << rc; } @@ -1225,7 +1225,7 @@ int FileStoreUtil::openRecoveryFileSet(bsl::ostream& errorDescription, rc = FileSystemUtil::move(archivingFileSet.qlistFile(), config.archiveLocation()); if (0 != rc) { - BALL_LOG_WARN << "PartitionId [" << partitionId << "]: Failed to " + BALL_LOG_WARN << "Partition [" << partitionId << "]: Failed to " << "archive qlist file [" << archivingFileSet.qlistFile() << "], rc: " << rc; } @@ -1233,7 +1233,7 @@ int FileStoreUtil::openRecoveryFileSet(bsl::ostream& errorDescription, rc = FileSystemUtil::move(archivingFileSet.journalFile(), config.archiveLocation()); if (0 != rc) { - BALL_LOG_WARN << "PartitionId [" << partitionId << "]: Failed to " + BALL_LOG_WARN << "Partition [" << partitionId << "]: Failed to " << "archive journal file [" << archivingFileSet.journalFile() << "], rc: " << rc; } diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp index e66af2b4f..896ea87a6 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp @@ -122,7 +122,7 @@ void InMemoryStorage::setQueue(mqbi::Queue* queue) d_queue_p->stats()->setQueueContentRaw(numMessage, numByte); BALL_LOG_INFO << "Associated queue [" << queue->uri() << "] with key [" - << queueKey() << "] and PartitionId [" + << queueKey() << "] and Partition [" << queue->partitionId() << "] with its storage having [" << mwcu::PrintUtil::prettyNumber(numMessage) << " messages and " diff --git a/src/groups/mqb/mqbs/mqbs_storageprintutil.cpp b/src/groups/mqb/mqbs/mqbs_storageprintutil.cpp index 205919e13..1d268a495 100644 --- a/src/groups/mqb/mqbs/mqbs_storageprintutil.cpp +++ b/src/groups/mqb/mqbs/mqbs_storageprintutil.cpp @@ -153,7 +153,7 @@ void StoragePrintUtil::printRecoveredStorages( // Needed to protect access to `storageMap` and its elements. bslmt::LockGuard guard(storagesLock); // LOCK - out << clusterDescription << ": PartitionId [" << partitionId + out << clusterDescription << ": Partition [" << partitionId << "]: Number of recovered storages: " << storageMap.size() << ". Time taken for recovery: " << mwcu::PrintUtil::prettyTimeInterval( @@ -207,7 +207,7 @@ bool StoragePrintUtil::printStorageRecoveryCompletion( for (unsigned int i = 0; i < fileStores.size(); ++i) { const bool isOpen = fileStores[i]->isOpen(); const char* isOpenStr = isOpen ? "opened" : "closed"; - out << "\nPartitionId [" << i << "] status: " << isOpenStr; + out << "\nPartition [" << i << "] status: " << isOpenStr; success = success && isOpen; } From 4a46378e3a696173f048b463a5bfc23119def11f Mon Sep 17 00:00:00 2001 From: Chris Beard Date: Mon, 30 Sep 2024 14:15:03 -0400 Subject: [PATCH 2/3] Fix[MQB]: Add missing epoch to TTL logline (#430) I noticed that we print two timestamps, but only one epoch. Adding this epoch makes it a little easier to reason about the timestamps being logged in the context of TTL. Signed-off-by: Christopher Beard --- src/groups/mqb/mqbs/mqbs_filestore.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index 9ae230b8e..906008dee 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -7020,6 +7020,7 @@ bool FileStore::gcExpiredMessages(const bdlt::Datetime& currentTimeUtc) << "Timestamp (UTC) of the latest encountered message: " << bdlt::EpochUtil::convertFromTimeT64( latestMsgTimestamp) + << " (Epoch: " << latestMsgTimestamp << "). Current time (UTC): " << currentTimeUtc << " (Epoch: " << currentSecondsFromEpoch << ")." << " Num messages remaining in the storage: " From 136ec3d77956a9360b81053a100e1760cc27e013 Mon Sep 17 00:00:00 2001 From: Evgeny Malygin Date: Tue, 1 Oct 2024 00:38:59 +0300 Subject: [PATCH 3/3] Fix[bmqt_messageguid.h]: fix possible UB due to misalignment (#423) Signed-off-by: Evgeny Malygin Co-authored-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- src/groups/bmq/bmqt/bmqt_messageguid.h | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/groups/bmq/bmqt/bmqt_messageguid.h b/src/groups/bmq/bmqt/bmqt_messageguid.h index bd72735c5..2f83aab85 100644 --- a/src/groups/bmq/bmqt/bmqt_messageguid.h +++ b/src/groups/bmq/bmqt/bmqt_messageguid.h @@ -363,11 +363,14 @@ MessageGUIDHashAlgo::operator()(const void* data, } }; - const bsls::Types::Uint64* start = - reinterpret_cast(data); - const bsls::Types::Uint64 h1 = LocalFuncs::mix(start[0]); - const bsls::Types::Uint64 h2 = LocalFuncs::mix(start[1]); - d_result = LocalFuncs::combine(h1, h2); + // `data` buffer might not be aligned to 8 bytes, so recasting the pointer + // might lead to UB + bsls::Types::Uint64 parts[2]; + bsl::memcpy(parts, data, bmqt::MessageGUID::e_SIZE_BINARY); + + parts[0] = LocalFuncs::mix(parts[0]); + parts[1] = LocalFuncs::mix(parts[1]); + d_result = LocalFuncs::combine(parts[0], parts[1]); } inline MessageGUIDHashAlgo::result_type MessageGUIDHashAlgo::computeHash()