Skip to content

Commit

Permalink
Merge branch 'bloomberg:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-e1off authored Oct 31, 2024
2 parents 6a523b2 + f10d75f commit 45185bb
Show file tree
Hide file tree
Showing 40 changed files with 4,272 additions and 2,469 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@ jobs:
--reruns=3 \
-n 4 -v
- name: Upload failure-logs as artifacts
if: failure()
uses: actions/upload-artifact@v4
with:
name: failure_logs_${{ matrix.mode }}_${{ matrix.cluster }}
path: ${{ github.workspace }}/src/integration-tests/failure-logs
retention-days: 5

fuzz_tests_ubuntu:
name: Fuzz test [${{ matrix.request }}]
strategy:
Expand Down
53 changes: 53 additions & 0 deletions .github/workflows/documentation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
name: Generate documentation

on:
push:
branches:
- main
workflow_dispatch:

concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true

jobs:
documentation-main:
name: Build Doxygen documentation on `main`

runs-on: ubuntu-latest

permissions:
# Let the default GITHUB_TOKEN commit and push.
contents: write

steps:
- name: Checkout `main`
uses: actions/checkout@v4

- name: Clear out `docs/` subdirectory
run: rm -rf docs

- name: Checkout `gh-pages` into `docs/`
uses: actions/checkout@v4
with:
path: docs
ref: gh-pages

- name: Set up dependencies
run: |
sudo apt-get update
sudo apt-get install -qy doxygen
- name: Build documentation
# Best way to pass Doxygen config overrides on the command line is
# using stdin.
run: |
( cat Doxyfile ; echo "PROJECT_NUMBER=${{ github.sha }}" ) | doxygen -
- name: Commit new API documentation to `gh-pages`
run: |
cd docs/
git config --global user.name "${{ github.actor }}"
git config --global user.email "${{ github.actor }}@users.noreply.github.com"
git commit -s -am "Update C++ API docs from commit ${{ github.sha }} on main"
git push
2 changes: 1 addition & 1 deletion src/groups/bmq/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ set(BMQ_PRIVATE_PACKAGES
bmqst
bmqstm
bmqsys
bmqt
bmqtsk
bmqtst
bmqu
bmqvt
)
target_bmq_style_uor( bmq PRIVATE_PACKAGES ${BMQ_PRIVATE_PACKAGES} )

Expand Down
13 changes: 8 additions & 5 deletions src/groups/bmq/bmqa/bmqa_mocksession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <bmqp_protocol.h>
#include <bmqp_protocolutil.h>
#include <bmqp_pusheventbuilder.h>
#include <bmqst_statcontext.h>
#include <bmqsys_time.h>
#include <bmqt_messageguid.h>
#include <bmqt_uri.h>
Expand Down Expand Up @@ -742,7 +743,7 @@ void MockSession::initializeStats()
start.setLevel(0).setIndex(0);
end.setLevel(0).setIndex(1);
bmqimp::QueueStatsUtil::initializeStats(d_queuesStats_sp.get(),
&d_rootStatContext,
d_rootStatContext_mp.get(),
start,
end,
d_allocator_p);
Expand Down Expand Up @@ -961,8 +962,9 @@ MockSession::MockSession(const bmqt::SessionOptions& options,
bslma::Default::allocator(allocator)),
bslma::Default::allocator(allocator))
, d_postedEvents(bslma::Default::allocator(allocator))
, d_rootStatContext(bmqst::StatContextConfiguration("MockSession", allocator),
allocator)
, d_rootStatContext_mp(bslma::ManagedPtrUtil::makeManaged<bmqst::StatContext>(
bmqst::StatContextConfiguration("MockSession", allocator),
allocator))
, d_queuesStats_sp(new(*bslma::Default::allocator(allocator))
bmqimp::Stat(bslma::Default::allocator(allocator)),
bslma::Default::allocator(allocator))
Expand Down Expand Up @@ -1004,8 +1006,9 @@ MockSession::MockSession(bslma::ManagedPtr<SessionEventHandler> eventHandler,
bslma::Default::allocator(allocator)),
bslma::Default::allocator(allocator))
, d_postedEvents(bslma::Default::allocator(allocator))
, d_rootStatContext(bmqst::StatContextConfiguration("MockSession", allocator),
allocator)
, d_rootStatContext_mp(bslma::ManagedPtrUtil::makeManaged<bmqst::StatContext>(
bmqst::StatContextConfiguration("MockSession", allocator),
allocator))
, d_queuesStats_sp(new(*bslma::Default::allocator(allocator))
bmqimp::Stat(bslma::Default::allocator(allocator)),
bslma::Default::allocator(allocator))
Expand Down
8 changes: 5 additions & 3 deletions src/groups/bmq/bmqa/bmqa_mocksession.h
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@
/// `emitting` on the @ref BMQA_EXPECT_CALL macro in synchronous mode is
/// meaningless.
///
///```
/// ```
/// void unitTest()
/// {
/// // MockSession created without an eventHandler.
Expand Down Expand Up @@ -548,7 +548,6 @@
#include <bmqa_openqueuestatus.h>
#include <bmqa_queueid.h>
#include <bmqa_session.h> // for 'bmqa::SessionEventHandler'
#include <bmqst_statcontext.h>
#include <bmqt_queueoptions.h>
#include <bmqt_sessionoptions.h>

Expand Down Expand Up @@ -586,6 +585,9 @@ class MessageCorrelationIdContainer;
namespace bmqimp {
struct Stat;
}
namespace bmqst {
class StatContext;
}

namespace bmqa {

Expand Down Expand Up @@ -1040,7 +1042,7 @@ class MockSession : public AbstractSession {
mutable bslmt::Mutex d_mutex;

/// Top level stat context for this mocked Session.
bmqst::StatContext d_rootStatContext;
bslma::ManagedPtr<bmqst::StatContext> d_rootStatContext_mp;

/// Stats for all queues
StatImplSp d_queuesStats_sp;
Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqba/mqba_adminsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ void AdminSession::finalizeAdminCommand(

response.choice().adminCommandResponse().text() = commandExecResults;

BALL_LOG_INFO << description() << ": Send response message: " << response;

int rc = d_state.d_schemaEventBuilder.setMessage(
response,
bmqp::EventType::e_CONTROL);
Expand Down
203 changes: 126 additions & 77 deletions src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1619,6 +1619,131 @@ void RootQueueEngine::onTimer(bsls::Types::Int64 currentTimer)
d_consumptionMonitor.onTimer(currentTimer);
}

bsl::ostream&
RootQueueEngine::logAppSubscriptionInfo(bsl::ostream& stream,
const mqbu::StorageKey& appKey) const
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread(
d_queueState_p->queue()));

// Get AppState by appKey.
Apps::const_iterator cItApp = d_apps.findByKey2(AppKeyCount(appKey, 0));
if (cItApp == d_apps.end()) {
BALL_LOG_WARN << "No app found for appKey: " << appKey;
stream << "\nSubscription info: no app found for appKey: " << appKey;
return stream; // RETURN
}

const AppStateSp& app = cItApp->value();
return logAppSubscriptionInfo(stream, app);
}

bsl::ostream&
RootQueueEngine::logAppSubscriptionInfo(bsl::ostream& stream,
const AppStateSp& appState) const
{
mqbi::Storage* const storage = d_queueState_p->storage();

// Log un-delivered messages info
stream << "\nFor appId: " << appState->appId() << "\n\n";
stream << "Put aside list size: "
<< bmqu::PrintUtil::prettyNumber(static_cast<bsls::Types::Int64>(
appState->putAsideListSize()))
<< '\n';
stream << "Redelivery list size: "
<< bmqu::PrintUtil::prettyNumber(static_cast<bsls::Types::Int64>(
appState->redeliveryListSize()))
<< '\n';
stream << "Number of messages: "
<< bmqu::PrintUtil::prettyNumber(
storage->numMessages(appState->appKey()))
<< '\n';
stream << "Number of bytes: "
<< bmqu::PrintUtil::prettyBytes(
storage->numBytes(appState->appKey()))
<< "\n\n";

// Log consumer subscriptions
mqbblp::Routers::QueueRoutingContext& routingContext =
appState->routing()->d_queue;
mqbcmd::Routing routing;
routingContext.loadInternals(&routing);
const bsl::vector<mqbcmd::SubscriptionGroup>& subscrGroups =
routing.subscriptionGroups();
if (!subscrGroups.empty()) {
// Limit to log only k_EXPR_NUM_LIMIT expressions
static const size_t k_EXPR_NUM_LIMIT = 50;
if (subscrGroups.size() > k_EXPR_NUM_LIMIT) {
stream << "First " << k_EXPR_NUM_LIMIT
<< " of consumer subscription expressions: \n";
}
else {
stream << "Consumer subscription expressions: \n";
}

size_t exprNum = 0;
for (bsl::vector<mqbcmd::SubscriptionGroup>::const_iterator cIt =
subscrGroups.begin();
cIt != subscrGroups.end() && exprNum < k_EXPR_NUM_LIMIT;
++cIt, ++exprNum) {
if (cIt->expression().empty()) {
stream << "<Empty>\n";
}
else {
stream << cIt->expression() << '\n';
}
}
stream << '\n';
}

// Log the first (oldest) message in a put aside list and its properties
if (!appState->putAsideList().empty()) {
bslma::ManagedPtr<mqbi::StorageIterator> storageIt_mp;
mqbi::StorageResult::Enum rc = storage->getIterator(
&storageIt_mp,
appState->appKey(),
appState->putAsideList().first());
if (rc == mqbi::StorageResult::e_SUCCESS) {
// Log timestamp
stream << "Oldest message in the 'Put aside' list:\n";
mqbcmd::Result result;
mqbs::StoragePrintUtil::listMessage(&result.makeMessage(),
storage,
*storageIt_mp);
mqbcmd::HumanPrinter::print(stream, result);
stream << '\n';
// Log message properties
const bsl::shared_ptr<bdlbb::Blob>& appData =
storageIt_mp->appData();
const bmqp::MessagePropertiesInfo& logic =
storageIt_mp->attributes().messagePropertiesInfo();
bmqp::MessageProperties properties;
int ret = properties.streamIn(*appData, logic.isExtended());
if (!ret) {
stream << "Message Properties: " << properties << '\n';
}
else {
BALL_LOG_WARN << "Failed to streamIn MessageProperties, rc = "
<< rc;
stream << "Message Properties: Failed to acquire [rc: " << rc
<< "]\n";
}
}
else {
BALL_LOG_WARN << "Failed to get storage iterator for GUID: "
<< appState->putAsideList().first()
<< ", rc = " << rc;
stream << "'Put aside' list: Failed to acquire [rc: " << rc
<< "]\n";
}
}

return stream;
}

bool RootQueueEngine::logAlarmCb(const mqbu::StorageKey& appKey,
bool enableLog) const
{
Expand Down Expand Up @@ -1711,83 +1836,7 @@ bool RootQueueEngine::logAlarmCb(const mqbu::StorageKey& appKey,
<< " consumers." << ss.str() << '\n';

// Log un-delivered messages info
out << "\nFor appId: " << app->appId() << '\n';
out << "Put aside list size: " << app->putAsideListSize() << '\n';
out << "Redelivery list size: " << app->redeliveryListSize() << '\n';
out << "Number of messages: " << storage->numMessages(appKey) << '\n';
out << "Number of bytes: " << storage->numBytes(appKey) << "\n\n";

// Log consumer subscriptions
mqbblp::Routers::QueueRoutingContext& routingContext =
app->routing()->d_queue;
mqbcmd::Routing routing;
routingContext.loadInternals(&routing);
const bsl::vector<mqbcmd::SubscriptionGroup>& subscrGroups =
routing.subscriptionGroups();

// Limit to log only k_EXPR_NUM_LIMIT expressions
static const size_t k_EXPR_NUM_LIMIT = 50;
ss.reset();
size_t exprNum = 0;
for (bsl::vector<mqbcmd::SubscriptionGroup>::const_iterator cIt =
subscrGroups.begin();
cIt != subscrGroups.end() && exprNum < k_EXPR_NUM_LIMIT;
++cIt) {
if (!cIt->expression().empty()) {
ss << cIt->expression() << '\n';
++exprNum;
}
}
if (exprNum) {
if (exprNum == k_EXPR_NUM_LIMIT) {
out << "First " << k_EXPR_NUM_LIMIT
<< " of consumer subscription expressions: ";
}
else {
out << "Consumer subscription expressions: ";
}
out << '\n' << ss.str() << '\n';
}

// Log the first (oldest) message in a put aside list and its properties
if (!app->putAsideList().empty()) {
bslma::ManagedPtr<mqbi::StorageIterator> storageIt_mp;
mqbi::StorageResult::Enum rc = storage->getIterator(
&storageIt_mp,
appKey,
app->putAsideList().first());
if (rc == mqbi::StorageResult::e_SUCCESS) {
// Log timestamp
out << "Oldest message in the 'Put aside' list:\n";
mqbcmd::Result result;
mqbs::StoragePrintUtil::listMessage(&result.makeMessage(),
storage,
*storageIt_mp);
mqbcmd::HumanPrinter::print(out, result);
out << '\n';
// Log message properties
const bsl::shared_ptr<bdlbb::Blob>& appData =
storageIt_mp->appData();
const bmqp::MessagePropertiesInfo& logic =
storageIt_mp->attributes().messagePropertiesInfo();
bmqp::MessageProperties properties;
int ret = properties.streamIn(*appData, logic.isExtended());
if (!ret) {
out << "Message Properties: " << properties << '\n';
}
else {
BALL_LOG_WARN << "Failed to streamIn MessageProperties, rc = "
<< rc;
out << "Message Properties: Failed to acquire [rc: " << rc
<< "]\n";
}
}
else {
BALL_LOG_WARN << "Failed to get storage iterator for GUID: "
<< app->putAsideList().first() << ", rc = " << rc;
out << "'Put aside' list: Failed to acquire [rc: " << rc << "]\n";
}
}
logAppSubscriptionInfo(out, app);

// Print the 10 oldest messages in the queue
static const int k_NUM_MSGS = 10;
Expand Down
Loading

0 comments on commit 45185bb

Please sign in to comment.