-
Notifications
You must be signed in to change notification settings - Fork 140
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Generating unique Subscription ids for each request #135
Conversation
To @678098, assuming that this is ready for review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments
@@ -5239,7 +5245,13 @@ BrokerSession::createConfigureQueueContext(const bsl::shared_ptr<Queue>& queue, | |||
|
|||
bmqp_ctrlmsg::Subscription subscription(d_allocator_p); | |||
|
|||
subscription.sId() = cit->first.id(); | |||
static bsls::AtomicUint s_nextId(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This static local variable initialization is not thread safe in general. The probability to observe a bug is small though.
Here are some details and advices:
https://bloomberg.github.io/bde-resources/doxygen/bde_api_prod/group__bslmt__once.html#_details
And here is how we did a safe initialization for another component:
blazingmq/src/groups/bmq/bmqt/bmqt_correlationid.cpp
Lines 33 to 42 in 08a6d16
CorrelationId CorrelationId::autoValue() | |
{ | |
static bsls::AtomicInt* g_id_p = 0; // A unique id for each AutoValue | |
BSLMT_ONCE_DO | |
{ | |
g_id_p = new bsls::AtomicInt(0); | |
// Heap allocate it to prevent 'exit-time-destructor needed' compiler | |
// warning. Causes valgrind-reported memory leak. | |
} |
subscription.sId() = cit->first.id(); | ||
static bsls::AtomicUint s_nextId(0); | ||
|
||
unsigned int internalSubscriptionId = s_nextId.add(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unsigned int internalSubscriptionId = s_nextId.add(1); | |
const unsigned int internalSubscriptionId = s_nextId.add(1); |
src/groups/bmq/bmqimp/bmqimp_event.h
Outdated
@@ -500,7 +500,7 @@ class Event { | |||
/// event's type() is MESSAGEEVENT, 'messageEventMode()' is READ and the | |||
/// underlying raw event is of type ACK, PUT or PUSH. | |||
void addCorrelationId(const bmqt::CorrelationId& correlationId, | |||
unsigned int subscriptionId = | |||
unsigned int subscriptionHandleId = | |||
bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default value of bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID
is a good default to initialize handle id in this case, but these integers have different meanings now, so might be good to introduce another constant k_DEFAULT_SUBSCRIPTION_HANDLE_ID
.
@@ -363,8 +352,9 @@ class QueueManager { | |||
/// `correlationId`, and return a shared pointer to the Queue object (if | |||
/// found), or an empty shared pointer (if not found). | |||
QueueSp lookupQueueBySubscriptionId(bmqt::CorrelationId* correlationId, | |||
int queueId, | |||
unsigned int subscriptionId) const; | |||
unsigned int* subscriptionHandle, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have subscriptionHandle
in some places like here, and subscriptionHandleId
in other places.
|
||
SubscriptionHandle result(cit->second); | ||
|
||
d_registeredInternalSubscriptionIds.erase(cit); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it correct that Queue
is accessed from one thread only? So modifications of d_registeredInternalSubscriptionIds
container do not break anything?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. It is the FSM thread. So is createConfigureQueueContext
! So, there is no need for atomic or Once
(or static).
Will commit new changes.
Signed-off-by: Vitaly Dzhitenov <[email protected]>
Signed-off-by: Vitaly Dzhitenov <[email protected]>
Signed-off-by: Vitaly Dzhitenov <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
@@ -43,7 +43,7 @@ const int Subscription::k_DEFAULT_CONSUMER_PRIORITY = 0; | |||
|
|||
unsigned int SubscriptionHandle::nextId() | |||
{ | |||
static bsls::AtomicUint s_id = 0; | |||
static bsls::AtomicUint s_id = k_INVALID_HANDLE_ID; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I created an issue about this line. It's out of the scope of this PR and could be fixed in another PR.
Issue number of the reported bug or feature request: #133
2 Subscription Ids is not enough; we need 3.
In the order of increasing uniqueness:
CorrelationId
- user provided.SubscriptionHandle
- uniquely identifies a Subscription inQueueOptions
. But if reused for different App in the same queue, creates the issueThe existing logic involved storing request context associated with subscriptions (like,
CorrelationId
) until response arrives which makes the context effective. The logic is slightly refactored.Ideally, the context belongs to
BrokerSession::RequestManagerType::RequestSp
but that calls for a bigger refactoring of the SDK.