Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring App registration #493

Merged
merged 2 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2848,6 +2848,7 @@ void Cluster::onDomainReconfigured(const mqbi::Domain& domain,
oldCfgAppIds,
newCfgAppIds);

// TODO: This should be one call - one QueueUpdateAdvisory for all Apps
kaikulimu marked this conversation as resolved.
Show resolved Hide resolved
bsl::unordered_set<bsl::string>::const_iterator it = addedIds.cbegin();
for (; it != addedIds.cend(); ++it) {
dispatcher()->execute(
Expand Down
78 changes: 35 additions & 43 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,29 +139,27 @@ void createQueueUriKey(bmqt::Uri* out,
}

void afterAppIdRegisteredDispatched(
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo)
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppInfos& appInfos)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

queue->queueEngine()->afterAppIdRegistered(
mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second));
queue->queueEngine()->afterAppIdRegistered(appInfos);
}

void afterAppIdUnregisteredDispatched(
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo)
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppInfos& appInfos)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

queue->queueEngine()->afterAppIdUnregistered(
mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second));
queue->queueEngine()->afterAppIdUnregistered(appInfos);
}

void handleHolderDummy(const bsl::shared_ptr<mqbi::QueueHandle>& handle)
Expand Down Expand Up @@ -4375,10 +4373,6 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(!d_cluster_p->isRemote());

if (!d_cluster_p->isCSLModeEnabled()) {
return; // RETURN
}

if (!uri.isValid()) {
// This is an appID update for the entire domain, instead of any
// individual queue. Nothing to do for the queue helper.
Expand All @@ -4394,53 +4388,51 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
const int partitionId = qiter->second->partitionId();
BSLS_ASSERT_SAFE(partitionId != mqbs::DataStore::k_INVALID_PARTITION_ID);

for (AppInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend();
++cit) {
if (d_cluster_p->isCSLModeEnabled()) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue creation callback is
// invoked at replica nodes when they receive a queue creation
// record from the primary in the partition stream.

mqbi::Storage::AppInfos one(1, d_allocator_p);
one.emplace(*cit);

d_storageManager_p->updateQueueReplica(
partitionId,
uri,
qiter->second->key(),
one,
addedAppIds,
d_clusterState_p->domainStates()
.at(uri.qualifiedDomain())
->domain());
}
if (queue) {
d_cluster_p->dispatcher()->execute(
bdlf::BindUtil::bind(afterAppIdRegisteredDispatched,
queue,
*cit),
queue);

for (AppInfosCIter cit = removedAppIds.cbegin();
cit != removedAppIds.cend();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue deletion callback is
// invoked at replica nodes when they receive a queue deletion
// record from the primary in the partition stream.
d_storageManager_p->unregisterQueueReplica(
partitionId,
uri,
qiter->second->key(),
cit->second);
}
}
}

for (AppInfosCIter cit = removedAppIds.cbegin();
cit != removedAppIds.cend();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue deletion callback is
// invoked at replica nodes when they receive a queue deletion
// record from the primary in the partition stream.
d_storageManager_p->unregisterQueueReplica(partitionId,
uri,
qiter->second->key(),
cit->second);
}
if (queue) {
d_cluster_p->dispatcher()->execute(
bdlf::BindUtil::bind(afterAppIdUnregisteredDispatched,
queue,
*cit),
queue);
}
if (queue) {
// TODO: replace with one call
d_cluster_p->dispatcher()->execute(
bdlf::BindUtil::bind(afterAppIdRegisteredDispatched,
queue,
addedAppIds),
queue);

d_cluster_p->dispatcher()->execute(
bdlf::BindUtil::bind(afterAppIdUnregisteredDispatched,
queue,
removedAppIds),
queue);
}

bmqu::Printer<AppInfos> printer1(&addedAppIds);
Expand Down
71 changes: 0 additions & 71 deletions src/groups/mqb/mqbblp/mqbblp_domain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,32 +68,6 @@ void queueHolderDummy(const bsl::shared_ptr<mqbi::Queue>& queue)
BALL_LOG_INFO << "Deleted queue '" << queue->uri().canonical() << "'";
}

void afterAppIdRegisteredDispatched(mqbi::Queue* queue,
const bsl::string& appId)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

queue->queueEngine()->afterAppIdRegistered(
mqbi::Storage::AppInfo(appId, mqbu::StorageKey()));
}

void afterAppIdUnregisteredDispatched(mqbi::Queue* queue,
const bsl::string& appId)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

// Note: Inputing nullKey here is okay since this routine will be removed
// when we switch to CSL workflow.
queue->queueEngine()->afterAppIdUnregistered(
mqbi::Storage::AppInfo(appId, mqbu::StorageKey()));
}

/// Validates an application subscription.
bool validdateSubscriptionExpression(bsl::ostream& errorDescription,
const mqbconfm::Expression& expression,
Expand Down Expand Up @@ -463,51 +437,6 @@ int Domain::configure(bsl::ostream& errorDescription,
BSLS_ASSERT_OPT(oldConfig.has_value());
BSLS_ASSERT_OPT(d_config.has_value());

// In non-CSL mode, manually dispatch AppId registration callbacks.
if (!d_cluster_sp->isCSLModeEnabled() &&
d_config.value().mode().isFanoutValue()) {
// Compute list of added and removed App IDs.
bsl::unordered_set<bsl::string> oldCfgAppIds(
oldConfig.value().mode().fanout().appIDs().cbegin(),
oldConfig.value().mode().fanout().appIDs().cend(),
d_allocator_p);
bsl::unordered_set<bsl::string> newCfgAppIds(
d_config.value().mode().fanout().appIDs().cbegin(),
d_config.value().mode().fanout().appIDs().cend(),
d_allocator_p);

bsl::unordered_set<bsl::string> addedIds, removedIds;
mqbc::StorageUtil::loadAddedAndRemovedEntries(&addedIds,
&removedIds,
oldCfgAppIds,
newCfgAppIds);

bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex);

// Invoke callbacks for each added and removed ID on each queue.
bsl::unordered_set<bsl::string>::const_iterator it =
addedIds.cbegin();
QueueMap::const_iterator qIt;
for (; it != addedIds.cend(); it++) {
for (qIt = d_queues.cbegin(); qIt != d_queues.cend(); ++qIt) {
d_dispatcher_p->execute(
bdlf::BindUtil::bind(afterAppIdRegisteredDispatched,
qIt->second.get(),
*it),
qIt->second.get());
}
}
for (it = removedIds.cbegin(); it != removedIds.cend(); ++it) {
for (qIt = d_queues.cbegin(); qIt != d_queues.cend(); ++qIt) {
d_dispatcher_p->execute(
bdlf::BindUtil::bind(afterAppIdUnregisteredDispatched,
qIt->second.get(),
*it),
qIt->second.get());
}
}
}

// Notify the 'cluster' of the updated configuration, so it can write
// any needed update-advisories to the CSL.
d_cluster_sp->onDomainReconfigured(*this,
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_localqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ int LocalQueue::configure(bsl::ostream& errorDescription, bool isReconfigure)
d_allocator_p);
}

rc = d_queueEngine_mp->configure(errorDescription);
rc = d_queueEngine_mp->configure(errorDescription, isReconfigure);
if (rc != 0) {
return 10 * rc + rc_QUEUE_ENGINE_CFG_FAILURE; // RETURN
}
Expand Down
3 changes: 1 addition & 2 deletions src/groups/mqb/mqbblp/mqbblp_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ void Queue::convertToLocalDispatched()

d_state.setId(bmqp::QueueId::k_PRIMARY_QUEUE_ID);
createLocal();
rc = d_localQueue_mp->configure(errorDescription, true);
rc = d_localQueue_mp->configure(errorDescription, false);
kaikulimu marked this conversation as resolved.
Show resolved Hide resolved
if (rc != 0) {
BALL_LOG_ERROR
<< "#QUEUE_CONVERTION_FAILURE " << d_state.uri()
Expand Down Expand Up @@ -483,7 +483,6 @@ Queue::Queue(const bmqt::Uri& uri,
// storage.

d_state.setStorageManager(storageManager)
.setAppKeyGenerator(storageManager)
.setMiscWorkThreadPool(threadPool)
.setRoutingConfig(routingCfg)
.setMessageThrottleConfig(messageThrottleConfig);
Expand Down
34 changes: 12 additions & 22 deletions src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,30 +170,26 @@ QueueConsumptionMonitor::setMaxIdleTime(bsls::Types::Int64 value)
return *this;
}

void QueueConsumptionMonitor::registerSubStream(const mqbu::StorageKey& key)
void QueueConsumptionMonitor::registerSubStream(const bsl::string& id)
{
// Should always be called from the queue thread, but will be invoked from
// the cluster thread once upon queue creation.

// PRECONDITIONS
BSLS_ASSERT_SAFE(key != mqbu::StorageKey::k_NULL_KEY ||
d_subStreamInfos.empty());
BSLS_ASSERT_SAFE(d_subStreamInfos.find(mqbu::StorageKey::k_NULL_KEY) ==
d_subStreamInfos.end());
BSLS_ASSERT_SAFE(d_subStreamInfos.find(key) == d_subStreamInfos.end());
BSLS_ASSERT_SAFE(d_subStreamInfos.find(id) == d_subStreamInfos.end());

d_subStreamInfos.insert(bsl::make_pair(key, SubStreamInfo()));
d_subStreamInfos.insert(bsl::make_pair(id, SubStreamInfo()));
}

void QueueConsumptionMonitor::unregisterSubStream(const mqbu::StorageKey& key)
void QueueConsumptionMonitor::unregisterSubStream(const bsl::string& id)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread(
d_queueState_p->queue()));

SubStreamInfoMapConstIter iter = d_subStreamInfos.find(key);
SubStreamInfoMapConstIter iter = d_subStreamInfos.find(id);
BSLS_ASSERT_SAFE(iter != d_subStreamInfos.end());
d_subStreamInfos.erase(iter);
}
Expand Down Expand Up @@ -231,7 +227,7 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer)
iter != last;
++iter) {
SubStreamInfo& info = iter->second;
const mqbu::StorageKey& appKey = iter->first;
const bsl::string& id = iter->first;
if (info.d_messageSent) {
// Queue is 'alive' because at least one message was sent
// since the last 'timer'.
Expand All @@ -241,7 +237,7 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer)

if (info.d_state == State::e_IDLE) {
// object was in idle state
onTransitionToAlive(&info, appKey);
onTransitionToAlive(&info, id);
continue; // CONTINUE
}

Expand All @@ -253,7 +249,7 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer)
// No delivered messages in the last 'maxIdleTime'.

// Call callback to log alarm if there are undelivered messages.
const bool haveUndelivered = d_loggingCb(appKey,
const bool haveUndelivered = d_loggingCb(id,
info.d_state ==
State::e_ALIVE);

Expand All @@ -269,16 +265,15 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer)
// so transition to alive.
if (info.d_state == State::e_IDLE) {
info.d_lastKnownGoodTimer = d_currentTimer;
onTransitionToAlive(&info, appKey);
onTransitionToAlive(&info, id);
}
}
}
}
}

void QueueConsumptionMonitor::onTransitionToAlive(
SubStreamInfo* subStreamInfo,
const mqbu::StorageKey& appKey)
void QueueConsumptionMonitor::onTransitionToAlive(SubStreamInfo* subStreamInfo,
const bsl::string& id)
{
// executed by the *QUEUE DISPATCHER* thread

Expand All @@ -291,12 +286,7 @@ void QueueConsumptionMonitor::onTransitionToAlive(
bdlma::LocalSequentialAllocator<2048> localAllocator(0);

bmqt::UriBuilder uriBuilder(d_queueState_p->uri(), &localAllocator);
bsl::string appId;

if (!appKey.isNull() &&
d_queueState_p->storage()->hasVirtualStorage(appKey, &appId)) {
uriBuilder.setId(appId);
}
uriBuilder.setId(id);

bmqt::Uri uri(&localAllocator);
uriBuilder.uri(&uri);
Expand Down
Loading
Loading