Skip to content

Commit

Permalink
Use BrokerSession::d_nextInternalSubscriptionId
Browse files Browse the repository at this point in the history
  • Loading branch information
dorjesinpo committed Oct 26, 2023
1 parent 0c2681e commit d0c12fe
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 18 deletions.
6 changes: 3 additions & 3 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5245,9 +5245,8 @@ BrokerSession::createConfigureQueueContext(const bsl::shared_ptr<Queue>& queue,

bmqp_ctrlmsg::Subscription subscription(d_allocator_p);

static bsls::AtomicUint s_nextId(0);

unsigned int internalSubscriptionId = s_nextId.add(1);
const unsigned int internalSubscriptionId =
++d_nextInternalSubscriptionId;

subscription.sId() = internalSubscriptionId;
// Using unique id instead of 'SubscriptionHandle::id()'
Expand Down Expand Up @@ -5636,6 +5635,7 @@ BrokerSession::BrokerSession(
, d_messageExpirationTimeoutHandle()
, d_nextRequestGroupId(k_NON_BUFFERED_REQUEST_GROUP_ID)
, d_queueRetransmissionTimeoutMap(allocator)
, d_nextInternalSubscriptionId(bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_scheduler_p->clockType() ==
Expand Down
3 changes: 3 additions & 0 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.h
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,9 @@ class BrokerSession BSLS_CPP11_FINAL {
// retransmission timeout provided by
// the broker

unsigned int d_nextInternalSubscriptionId;
// Assists generating unique ids for Configure requests.

private:
// NOT IMPLEMENTED
BrokerSession(const BrokerSession&);
Expand Down
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqimp/bmqimp_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ class Event {
/// underlying raw event is of type ACK, PUT or PUSH.
void addCorrelationId(const bmqt::CorrelationId& correlationId,
unsigned int subscriptionHandleId =
bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID);
bmqt::SubscriptionHandle::k_INVALID_HANDLE_ID);

/// Insert the specified `queue` to the queues and the specified
/// `corrId` to the list of correlationIds associated with this event.
Expand Down
11 changes: 6 additions & 5 deletions src/groups/bmq/bmqimp/bmqimp_messagedumper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,16 @@ void MessageDumper::dumpPushEvent(bsl::ostream& out, const bmqp::Event& event)
unsigned int subscriptionId;
bmqp::RdaInfo rdaInfo;
bmqt::CorrelationId correlationId;
unsigned int subscriptionHandle;
unsigned int subscriptionHandleId;

iter.extractQueueInfo(&qId, &subscriptionId, &rdaInfo);

QueueManager::QueueSp queue =
d_queueManager_p->lookupQueueBySubscriptionId(&correlationId,
&subscriptionHandle,
qId,
subscriptionId);
d_queueManager_p->lookupQueueBySubscriptionId(
&correlationId,
&subscriptionHandleId,
qId,
subscriptionId);
BSLS_ASSERT_SAFE(queue);

out << "PUSH Message #" << ++msgNum << ": "
Expand Down
4 changes: 2 additions & 2 deletions src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,13 @@ void QueueManager::resetState()

const QueueManager::QueueSp
QueueManager::observePushEvent(bmqt::CorrelationId* correlationId,
unsigned int* subscriptionHandle,
unsigned int* subscriptionHandleId,
const bmqp::EventUtilQueueInfo& info)
{
// Update stats
const QueueSp queue = lookupQueueBySubscriptionIdLocked(
correlationId,
subscriptionHandle,
subscriptionHandleId,
info.d_header.queueId(),
info.d_subscriptionId);

Expand Down
8 changes: 4 additions & 4 deletions src/groups/bmq/bmqimp/bmqimp_queuemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ class QueueManager {
const bmqp::PushMessageIterator& iterator);

const QueueSp observePushEvent(bmqt::CorrelationId* correlationId,
unsigned int* subscriptionHandle,
unsigned int* subscriptionHandleId,
const bmqp::EventUtilQueueInfo& info);

/// Update stats for the queue(s) corresponding to the messages pointed
Expand Down Expand Up @@ -352,7 +352,7 @@ class QueueManager {
/// `correlationId`, and return a shared pointer to the Queue object (if
/// found), or an empty shared pointer (if not found).
QueueSp lookupQueueBySubscriptionId(bmqt::CorrelationId* correlationId,
unsigned int* subscriptionHandle,
unsigned int* subscriptionHandleId,
int queueId,
unsigned int subscriptionId) const;

Expand Down Expand Up @@ -438,14 +438,14 @@ QueueManager::lookupQueue(const bmqp::QueueId& queueId) const

inline QueueManager::QueueSp QueueManager::lookupQueueBySubscriptionId(
bmqt::CorrelationId* correlationId,
unsigned int* subscriptionHandle,
unsigned int* subscriptionHandleId,
int queueId,
unsigned int internalSubscriptionId) const
{
bsls::SpinLockGuard guard(&d_queuesLock); // d_queuesLock LOCKED

return lookupQueueBySubscriptionIdLocked(correlationId,
subscriptionHandle,
subscriptionHandleId,
queueId,
internalSubscriptionId);
}
Expand Down
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqp/bmqp_optionsview.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ inline OptionsView::Iterator::Iterator(const OptionsView* optionsView,
const unsigned int offset)
: d_optionsView_p(optionsView)
, d_offset(offset)
, d_value(static_cast<const bmqp::OptionType::Enum>(d_offset))
, d_value(static_cast<bmqp::OptionType::Enum>(d_offset))
{
}

Expand Down
1 change: 1 addition & 0 deletions src/groups/bmq/bmqp/bmqp_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ struct Protocol {
// RemainingDeliveryAttempts counter value.

static const unsigned int k_DEFAULT_SUBSCRIPTION_ID = 0;
// Internal unique id in Configure request

// CLASS METHODS

Expand Down
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqt/bmqt_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const int Subscription::k_DEFAULT_CONSUMER_PRIORITY = 0;

unsigned int SubscriptionHandle::nextId()
{
static bsls::AtomicUint s_id = 0;
static bsls::AtomicUint s_id = k_INVALID_HANDLE_ID;

return ++s_id;
}
Expand Down
6 changes: 5 additions & 1 deletion src/groups/bmq/bmqt/bmqt_subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class SubscriptionHandle {
friend class bmqa::MessageImpl;
friend class bmqa::MessageIterator;

public:
static const unsigned int k_INVALID_HANDLE_ID = 0;
// Initial (invalid) value for 'bmqt::SubscriptionHandle::d_id'

private:
// PRIVATE DATA
unsigned int d_id;
Expand Down Expand Up @@ -281,7 +285,7 @@ bsl::ostream& operator<<(bsl::ostream& stream, const Subscription& rhs);
// ----------------------

inline SubscriptionHandle::SubscriptionHandle()
: d_id(0)
: d_id(k_INVALID_HANDLE_ID)
, d_correlationId()
{
// NOTHING
Expand Down

0 comments on commit d0c12fe

Please sign in to comment.