Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix[mqbblp::RemoteQueue]: 0 deduplication timeout causes expired PUTs #494

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@
namespace BloombergLP {
namespace mqbblp {

namespace {

/// The default timeout for scheduled PUT expiration clean-up event.
static const bsls::Types::Int64 k_DEFAULT_PUT_EXPIRATION_TIMEOUT_MINUTES = 5;
static const bsls::Types::Int64 k_DEFAULT_PUT_EXPIRATION_TIMEOUT_NS =
k_DEFAULT_PUT_EXPIRATION_TIMEOUT_MINUTES *
bdlt::TimeUnitRatio::k_NANOSECONDS_PER_MINUTE;

} // close unnamed namespace

// -----------------
// class RemoteQueue
// -----------------
Expand Down Expand Up @@ -366,7 +376,7 @@ void RemoteQueue::pushMessage(

if (result != mqbi::StorageResult::e_SUCCESS) {
if (d_throttledFailedPushMessages.requestPermission()) {
BALL_LOG_WARN << d_state_p->uri()
BALL_LOG_WARN << "[THROTTLED] " << d_state_p->uri()
<< " failed to store broadcast PUSH ["
<< msgGUID << "], result = " << result;
}
Expand Down Expand Up @@ -483,6 +493,18 @@ RemoteQueue::RemoteQueue(QueueState* state,
os << '@' << d_state_p->uri().asString();
d_state_p->setDescription(os.str());

if (deduplicationTimeMs <= 0) {
d_pendingPutsTimeoutNs = k_DEFAULT_PUT_EXPIRATION_TIMEOUT_NS;
BALL_LOG_WARN << "Remote queue [" << d_state_p->description()
<< "]: cannot schedule PUT expiration timer with a "
<< "non-positive timeout from config ["
<< deduplicationTimeMs << " ms], use a default PUT "
<< "expiration timeout for scheduler instead ["
<< bmqu::PrintUtil::prettyTimeInterval(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's print the minutes, easier to read?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty time interval prints 5.00 m, you want to print the full 5 minutes instead, right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is 5.00 m, then it is ok. Just did not want long string of zeroes

d_pendingPutsTimeoutNs)
<< "]";
}

BALL_LOG_INFO << "Remote queue: " << d_state_p->uri()
<< " [id: " << d_state_p->id() << "]";
}
Expand Down Expand Up @@ -676,10 +698,10 @@ void RemoteQueue::onHandleReleased(
it != d_pendingConfirms.end();) {
if (it->d_handle == handle.get()) {
if (d_throttledFailedConfirmMessages.requestPermission()) {
BALL_LOG_WARN << "Dropping CONFIRM because downstream ["
<< handle << "] is gone. [queue: '"
<< d_state_p->description() << "', GUID: '"
<< it->d_guid << "']";
BALL_LOG_WARN << "[THROTTLED] Dropping CONFIRM because "
<< "downstream [" << handle << "] is gone. "
<< "[queue: '" << d_state_p->description()
<< "', GUID: '" << it->d_guid << "']";
}
it = d_pendingConfirms.erase(it);
++numProcessed;
Expand Down Expand Up @@ -865,7 +887,7 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn,

if (d_throttledFailedPutMessages.requestPermission()) {
BALL_LOG_WARN
<< "#CLIENT_IMPROPER_BEHAVIOR "
<< "[THROTTLED] #CLIENT_IMPROPER_BEHAVIOR "
<< "Failed PUT message for queue [" << d_state_p->uri()
<< "] from client [" << source->client()->description()
<< "]. Queue not opened in WRITE mode by the client.";
Expand Down Expand Up @@ -1152,7 +1174,7 @@ void RemoteQueue::onAckMessageDispatched(const mqbi::DispatcherAckEvent& event)

if (d_throttledFailedAckMessages.requestPermission()) {
BALL_LOG_STREAM(severity)
<< "Received ACK message [" << ackResult
<< "[THROTTLED] Received ACK message [" << ackResult
<< ", queue: " << d_state_p->description()
<< "] for unknown guid: " << ackMessage.messageGUID();
}
Expand Down Expand Up @@ -1313,7 +1335,7 @@ void RemoteQueue::expirePendingMessagesDispatched()

if (numExpired) {
if (d_throttledFailedPutMessages.requestPermission()) {
BALL_LOG_INFO << "[THROTTLED] " << d_state_p->uri() << ": expired "
BALL_LOG_WARN << "[THROTTLED] " << d_state_p->uri() << ": expired "
<< bmqu::PrintUtil::prettyNumber(numExpired)
<< " pending PUT messages ("
<< bmqu::PrintUtil::prettyNumber(numMessages -
Expand Down
Loading