Skip to content

Commit

Permalink
fix: generate AppKey only once
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Oct 18, 2024
1 parent 8082564 commit 953c072
Show file tree
Hide file tree
Showing 47 changed files with 583 additions and 553 deletions.
26 changes: 14 additions & 12 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2228,10 +2228,8 @@ void Cluster::onRecoveryStatusDispatched(
BSLS_ASSERT_SAFE(itMp->storage()->partitionId() ==
static_cast<int>(pid));
if (isCSLModeEnabled()) {
AppIdKeyPairs appIdKeyPairs;
itMp->storage()->loadVirtualStorageDetails(&appIdKeyPairs);
AppIdInfos appIdInfos(appIdKeyPairs.cbegin(),
appIdKeyPairs.cend());
AppInfos appIdInfos;
itMp->storage()->loadVirtualStorageDetails(&appIdInfos);

d_clusterOrchestrator.registerQueueInfo(
uri,
Expand All @@ -2245,7 +2243,7 @@ void Cluster::onRecoveryStatusDispatched(
uri,
pid,
itMp->storage()->queueKey(),
AppIdInfos(),
AppInfos(),
false); // Force-update?
}

Expand Down Expand Up @@ -2862,18 +2860,22 @@ void Cluster::onDomainReconfigured(const mqbi::Domain& domain,
}

// Compute list of added and removed App IDs.
bsl::vector<bsl::string> oldCfgAppIds(oldDefn.mode().fanout().appIDs(),
d_allocator_p);
bsl::vector<bsl::string> newCfgAppIds(newDefn.mode().fanout().appIDs(),
d_allocator_p);

bsl::vector<bsl::string> addedIds, removedIds;
bsl::unordered_set<bsl::string> oldCfgAppIds(
oldDefn.mode().fanout().appIDs().cbegin(),
oldDefn.mode().fanout().appIDs().cend(),
d_allocator_p);
bsl::unordered_set<bsl::string> newCfgAppIds(
newDefn.mode().fanout().appIDs().cbegin(),
newDefn.mode().fanout().appIDs().cend(),
d_allocator_p);

bsl::unordered_set<bsl::string> addedIds, removedIds;
mqbc::StorageUtil::loadAddedAndRemovedEntries(&addedIds,
&removedIds,
oldCfgAppIds,
newCfgAppIds);

bsl::vector<bsl::string>::const_iterator it = addedIds.begin();
bsl::unordered_set<bsl::string>::const_iterator it = addedIds.cbegin();
for (; it != addedIds.cend(); ++it) {
dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterOrchestrator::registerAppId,
Expand Down
4 changes: 1 addition & 3 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,9 @@ class Cluster : public mqbi::Cluster,

private:
// PRIVATE TYPES
typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs;

typedef mqbc::ClusterStatePartitionInfo ClusterStatePartitionInfo;

typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos;
typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos;

typedef mqbc::ClusterMembership::ClusterNodeSessionSp ClusterNodeSessionSp;

Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ void ClusterOrchestrator::processBufferedQueueAdvisories()
void ClusterOrchestrator::registerQueueInfo(const bmqt::Uri& uri,
int partitionId,
const mqbu::StorageKey& queueKey,
const AppIdInfos& appIdInfos,
const AppInfos& appIdInfos,
bool forceUpdate)
{
// executed by the *DISPATCHER* thread
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class ClusterOrchestrator {

typedef bdlmt::EventScheduler::RecurringEventHandle RecurringEventHandle;

typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos;
typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos;

private:
// DATA
Expand Down Expand Up @@ -516,7 +516,7 @@ class ClusterOrchestrator {
void registerQueueInfo(const bmqt::Uri& uri,
int partitionId,
const mqbu::StorageKey& queueKey,
const AppIdInfos& appIdInfos,
const AppInfos& appIdInfos,
bool forceUpdate);

/// Executed by any thread.
Expand Down
82 changes: 35 additions & 47 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,29 +140,29 @@ void createQueueUriKey(bmqt::Uri* out,
}

void afterAppIdRegisteredDispatched(
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppIdInfo& appIdInfo)
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

queue->queueEngine()->afterAppIdRegistered(
mqbi::Storage::AppIdKeyPair(appIdInfo.first, appIdInfo.second));
mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second));
}

void afterAppIdUnregisteredDispatched(
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppIdInfo& appIdInfo)
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

queue->queueEngine()->afterAppIdUnregistered(
mqbi::Storage::AppIdKeyPair(appIdInfo.first, appIdInfo.second));
mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second));
}

void handleHolderDummy(const bsl::shared_ptr<mqbi::QueueHandle>& handle)
Expand Down Expand Up @@ -2153,27 +2153,18 @@ bsl::shared_ptr<mqbi::Queue> ClusterQueueHelper::createQueueFactory(
// queue but the queue is never opened, it will not be registered with
// the StorageMgr. This is ok.

if (d_cluster_p->isCSLModeEnabled()) {
const AppIdInfos& appIdInfos =
context.d_queueContext_p->d_stateQInfo_sp->appIdInfos();
const mqbi::Storage::AppIdKeyPairs appIdKeyPairs(
appIdInfos.cbegin(),
appIdInfos.cend());
d_storageManager_p->registerQueue(
context.d_queueContext_p->uri(),
context.d_queueContext_p->key(),
context.d_queueContext_p->partitionId(),
appIdKeyPairs,
context.d_domain_p);
}
else {
d_storageManager_p->registerQueue(
context.d_queueContext_p->uri(),
context.d_queueContext_p->key(),
context.d_queueContext_p->partitionId(),
mqbi::Storage::AppIdKeyPairs(),
context.d_domain_p);
}
// Use keys in the CSL instead of generating new ones to keep CSL and
// non-CSL consistent.

const AppInfos& appIdInfos =
context.d_queueContext_p->d_stateQInfo_sp->appInfos();

d_storageManager_p->registerQueue(
context.d_queueContext_p->uri(),
context.d_queueContext_p->key(),
context.d_queueContext_p->partitionId(),
appIdInfos,
context.d_domain_p);

// Queue must have been registered with storage manager before
// registering it with the domain, otherwise Queue.configure() will
Expand Down Expand Up @@ -3700,25 +3691,22 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId)
// 'createQueueFactory').

if (d_cluster_p->isCSLModeEnabled()) {
const AppIdInfos& appIdInfos =
queueContext->d_stateQInfo_sp->appIdInfos();
const mqbi::Storage::AppIdKeyPairs appIdKeyPairs(
appIdInfos.cbegin(),
appIdInfos.cend());
const AppInfos& appIdInfos =
queueContext->d_stateQInfo_sp->appInfos();

d_storageManager_p->registerQueue(
queueContext->uri(),
queueContext->key(),
queueContext->partitionId(),
appIdKeyPairs,
appIdInfos,
qinfo.d_queue_sp->domain());
}
else {
d_storageManager_p->registerQueue(
queueContext->uri(),
queueContext->key(),
queueContext->partitionId(),
mqbi::Storage::AppIdKeyPairs(),
mqbi::Storage::AppInfos(),
qinfo.d_queue_sp->domain());
}

Expand Down Expand Up @@ -4110,6 +4098,7 @@ void ClusterQueueHelper::onQueueAssigned(
BSLS_ASSERT_SAFE(!d_cluster_p->isRemote());

if (!d_cluster_p->isCSLModeEnabled()) {
// REVISIT
return; // RETURN
}

Expand Down Expand Up @@ -4230,14 +4219,11 @@ void ClusterQueueHelper::onQueueAssigned(
->domain(),
true); // allowDuplicate

const mqbi::Storage::AppIdKeyPairs appIdKeyPairs(
info.appIdInfos().cbegin(),
info.appIdInfos().cend());
d_storageManager_p->updateQueueReplica(
info.partitionId(),
info.uri(),
info.key(),
appIdKeyPairs,
info.appInfos(),
d_clusterState_p->domainStates()
.at(info.uri().qualifiedDomain())
->domain(),
Expand Down Expand Up @@ -4396,8 +4382,8 @@ void ClusterQueueHelper::onQueueUnassigned(

void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
const bsl::string& domain,
const AppIdInfos& addedAppIds,
const AppIdInfos& removedAppIds)
const AppInfos& addedAppIds,
const AppInfos& removedAppIds)
{
// executed by the cluster *DISPATCHER* thread

Expand Down Expand Up @@ -4425,19 +4411,21 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
const int partitionId = qiter->second->partitionId();
BSLS_ASSERT_SAFE(partitionId != mqbs::DataStore::k_INVALID_PARTITION_ID);

for (AppIdInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend();
for (AppInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue creation callback is
// invoked at replica nodes when they receive a queue creation
// record from the primary in the partition stream.
mqbi::Storage::AppIdKeyPair appIdKeyPair(cit->first, cit->second);
mqbi::Storage::AppIdKeyPairs appIdKeyPairs(1, appIdKeyPair);

mqbi::Storage::AppInfos one(1, d_allocator_p);
one.emplace(*cit);

d_storageManager_p->updateQueueReplica(
partitionId,
uri,
qiter->second->key(),
appIdKeyPairs,
one,
d_clusterState_p->domainStates()
.at(uri.qualifiedDomain())
->domain());
Expand All @@ -4451,7 +4439,7 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
}
}

for (AppIdInfosCIter cit = removedAppIds.cbegin();
for (AppInfosCIter cit = removedAppIds.cbegin();
cit != removedAppIds.cend();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
Expand All @@ -4472,8 +4460,8 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
}
}

mwcu::Printer<AppIdInfos> printer1(&addedAppIds);
mwcu::Printer<AppIdInfos> printer2(&removedAppIds);
mwcu::Printer<AppInfos> printer1(&addedAppIds);
mwcu::Printer<AppInfos> printer2(&removedAppIds);
BALL_LOG_INFO << d_cluster_p->description() << ": Updated queue: " << uri
<< ", addedAppIds: " << printer1
<< ", removedAppIds: " << printer2;
Expand Down
8 changes: 4 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,9 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
/// queue which have a proper valid unique queueId.
typedef bsl::unordered_map<int, QueueContext*> QueueContextByIdMap;

typedef AppIdInfos::const_iterator AppIdInfosCIter;
typedef AppInfos::const_iterator AppInfosCIter;

typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos;
typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos;

private:
// DATA
Expand Down Expand Up @@ -998,8 +998,8 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
/// dispatcher thread.
virtual void onQueueUpdated(const bmqt::Uri& uri,
const bsl::string& domain,
const AppIdInfos& addedAppIds,
const AppIdInfos& removedAppIds = AppIdInfos())
const AppInfos& addedAppIds,
const AppInfos& removedAppIds = AppInfos())
BSLS_KEYWORD_OVERRIDE;

private:
Expand Down
10 changes: 5 additions & 5 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -618,12 +618,12 @@ void ClusterStateManager::onLeaderSyncDataQueryResponse(
const mqbu::StorageKey receivedKey(
mqbu::StorageKey::BinaryRepresentation(),
queueInfo.key().data());
AppIdInfos appIdInfos;
AppInfos appIdInfos;
for (bsl::vector<bmqp_ctrlmsg::AppIdInfo>::const_iterator cit =
queueInfo.appIds().cbegin();
cit != queueInfo.appIds().cend();
++cit) {
AppIdInfo appIdInfo;
AppInfo appIdInfo;
appIdInfo.first = cit->appId();
appIdInfo.second.fromBinary(cit->appKey().data());

Expand Down Expand Up @@ -1181,7 +1181,7 @@ ClusterStateManager::assignQueue(const bmqt::Uri& uri,
void ClusterStateManager::registerQueueInfo(const bmqt::Uri& uri,
int partitionId,
const mqbu::StorageKey& queueKey,
const AppIdInfos& appIdInfos,
const AppInfos& appIdInfos,
bool forceUpdate)
{
// executed by the *DISPATCHER* thread
Expand Down Expand Up @@ -1695,7 +1695,7 @@ void ClusterStateManager::processQueueAssignmentAdvisory(
uri,
queueKey,
queueInfo.partitionId(),
AppIdInfos());
AppInfos());
BSLS_ASSERT_SAFE(rc == false);
}
else {
Expand Down Expand Up @@ -1732,7 +1732,7 @@ void ClusterStateManager::processQueueAssignmentAdvisory(
d_state_p->assignQueue(uri,
queueKey,
queueInfo.partitionId(),
AppIdInfos());
AppInfos());

d_state_p->domainStates()
.at(uri.qualifiedDomain())
Expand Down
6 changes: 3 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ class ClusterStateManager : public mqbc::ClusterStateObserver,

typedef bsl::vector<QueueAdvisoryAndSource> QueueAdvisories;

typedef mqbc::ClusterStateQueueInfo::AppIdInfo AppIdInfo;
typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos;
typedef mqbc::ClusterStateQueueInfo::AppInfo AppInfo;
typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos;

typedef mqbc::ClusterState::UriToQueueInfoMap UriToQueueInfoMap;
typedef mqbc::ClusterState::UriToQueueInfoMapCIter UriToQueueInfoMapCIter;
Expand Down Expand Up @@ -381,7 +381,7 @@ class ClusterStateManager : public mqbc::ClusterStateObserver,
virtual void registerQueueInfo(const bmqt::Uri& uri,
int partitionId,
const mqbu::StorageKey& queueKey,
const AppIdInfos& appIdInfos,
const AppInfos& appIdInfos,
bool forceUpdate) BSLS_KEYWORD_OVERRIDE;

/// Unassign the queue in the specified `advisory` by applying the
Expand Down
Loading

0 comments on commit 953c072

Please sign in to comment.