Skip to content

Commit

Permalink
Perf[MQB]: make independent item pools for channels
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 committed Oct 28, 2024
1 parent 8550a81 commit 1dfbaf2
Show file tree
Hide file tree
Showing 13 changed files with 49 additions and 98 deletions.
2 changes: 0 additions & 2 deletions src/groups/mqb/mqbmock/mqbmock_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ void Cluster::_initializeNetcluster()
d_netCluster_mp.load(new (*d_allocator_p)
mqbnet::MockCluster(d_clusterDefinition,
d_bufferFactory_p,
&d_itemPool,
d_allocator_p),
d_allocator_p);

Expand Down Expand Up @@ -221,7 +220,6 @@ Cluster::Cluster(bdlbb::BlobBufferFactory* bufferFactory,
, d_timeSource(&d_scheduler)
, d_isStarted(false)
, d_clusterDefinition(allocator)
, d_itemPool(mqbnet::Channel::k_ITEM_SIZE, allocator)
, d_channels(allocator)
, d_negotiator_mp()
, d_transportManager(&d_scheduler,
Expand Down
11 changes: 0 additions & 11 deletions src/groups/mqb/mqbmock/mqbmock_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,6 @@ class Cluster : public mqbi::Cluster {
mqbcfg::ClusterDefinition d_clusterDefinition;
// Cluster definition

mqbnet::Channel::ItemPool d_itemPool;
// Item pool

TestChannelMap d_channels;
// Test channels

Expand Down Expand Up @@ -429,9 +426,6 @@ class Cluster : public mqbi::Cluster {
/// Get a modifiable reference to this object's time source.
bdlmt::EventSchedulerTestTimeSource& _timeSource();

/// Get a modifiable reference to this object's item pool.
mqbnet::Channel::ItemPool& _itemPool();

/// Get a modifiable reference to this object's cluster data.
mqbc::ClusterData* _clusterData();

Expand Down Expand Up @@ -585,11 +579,6 @@ inline bdlmt::EventSchedulerTestTimeSource& Cluster::_timeSource()
return d_timeSource;
}

inline mqbnet::Channel::ItemPool& Cluster::_itemPool()
{
return d_itemPool;
}

inline mqbc::ClusterData* Cluster::_clusterData()
{
return d_clusterData_mp.get();
Expand Down
27 changes: 14 additions & 13 deletions src/groups/mqb/mqbnet/mqbnet_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,18 @@ void Channel::Stats::reset()
// -------------

Channel::Channel(bdlbb::BlobBufferFactory* blobBufferFactory,
ItemPool* itemPool,
const bsl::string& name,
bslma::Allocator* allocator)
: d_allocators(allocator)
, d_allocator_p(d_allocators.get(bsl::string("Channel-") + name))
, d_allocator_p(d_allocators.get("Channel"))
, d_putBuilder(blobBufferFactory, d_allocator_p)
, d_pushBuilder(blobBufferFactory, d_allocator_p)
, d_ackBuilder(blobBufferFactory, d_allocator_p)
, d_confirmBuilder(blobBufferFactory, d_allocator_p)
, d_rejectBuilder(blobBufferFactory, d_allocator_p)
, d_itemPool_p(itemPool)
, d_itemPool(sizeof(Item),
bsls::BlockGrowth::BSLS_CONSTANT,
d_allocators.get(bsl::string("ItemPool")))
, d_buffer(1024, allocator)
, d_secondaryBuffer(1024, allocator)
, d_doStop(false)
Expand Down Expand Up @@ -108,7 +109,7 @@ Channel::~Channel()

void Channel::deleteItem(void* item, void* cookie)
{
static_cast<Channel*>(cookie)->d_itemPool_p->deleteObject(
static_cast<Channel*>(cookie)->d_itemPool.deleteObject(
static_cast<Item*>(item));
}

Expand All @@ -119,7 +120,7 @@ Channel::writePut(const bmqp::PutHeader& ph,
bool keepWeakPtr)
{
bslma::ManagedPtr<Item> item(
new (d_itemPool_p->allocate())
new (d_itemPool.allocate())
Item(ph, data, keepWeakPtr, state, d_allocator_p),
this,
deleteItem);
Expand All @@ -136,7 +137,7 @@ Channel::writePush(const bsl::shared_ptr<bdlbb::Blob>& payload,
const bmqp::Protocol::SubQueueInfosArray& subQueueInfos,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(queueId,
msgId,
flags,
Expand All @@ -160,7 +161,7 @@ Channel::writePush(int queueId,
const bmqp::Protocol::SubQueueInfosArray& subQueueInfos,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(queueId,
msgId,
flags,
Expand All @@ -182,7 +183,7 @@ Channel::writeAck(int status,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(
new (d_itemPool_p->allocate())
new (d_itemPool.allocate())
Item(status, correlationId, guid, queueId, state, d_allocator_p),
this,
deleteItem);
Expand All @@ -195,7 +196,7 @@ Channel::writeConfirm(int queueId,
const bmqt::MessageGUID& guid,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(queueId,
subQueueId,
guid,
Expand All @@ -213,7 +214,7 @@ Channel::writeReject(int queueId,
const bmqt::MessageGUID& guid,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(queueId,
subQueueId,
guid,
Expand All @@ -230,7 +231,7 @@ Channel::writeBlob(const bdlbb::Blob& data,
bmqp::EventType::Enum type,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(data, type, state, d_allocator_p),
this,
deleteItem);
Expand All @@ -253,7 +254,7 @@ void Channel::resetChannel()
d_stateCondition.signal();
}
// Wake up the writing thread in case it is blocked by 'popFront'
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(d_allocator_p),
this,
deleteItem);
Expand Down Expand Up @@ -336,7 +337,7 @@ void Channel::flush()
return; // RETURN
}

bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(d_allocator_p),
this,
deleteItem);
Expand Down
12 changes: 3 additions & 9 deletions src/groups/mqb/mqbnet/mqbnet_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,6 @@ class Channel {

public:
// PUBLIC TYPES

typedef bdlma::ConcurrentPool ItemPool;
typedef bmqc::MonitoredQueue<
bdlcc::SingleConsumerQueue<bslma::ManagedPtr<Item> > >
ItemQueue;
Expand All @@ -366,18 +364,15 @@ class Channel {
e_HWM = 5 // HWM
};

public:
// CONSTANTS
static const int k_ITEM_SIZE = sizeof(Item); // for ItemPool

private:
// CONSTANTS
static const int k_NAGLE_PACKET_SIZE = 1024 * 1024; // 1MB;

// DATA
/// Allocator store to spawn new allocators for sub-components
bmqma::CountingAllocatorStore d_allocators;
// Counting allocator

/// Counting allocator
bslma::Allocator* d_allocator_p;

bmqp::PutEventBuilder d_putBuilder;
Expand All @@ -390,7 +385,7 @@ class Channel {

bmqp::RejectEventBuilder d_rejectBuilder;

ItemPool* d_itemPool_p;
bdlma::ConcurrentPool d_itemPool;
// Pool of 'Item' objects.

ItemQueue d_buffer;
Expand Down Expand Up @@ -513,7 +508,6 @@ class Channel {

/// Create a new object using the specified `allocator`.
Channel(bdlbb::BlobBufferFactory* blobBufferFactory,
ItemPool* itemPool,
const bsl::string& name,
bslma::Allocator* allocator);

Expand Down
24 changes: 6 additions & 18 deletions src/groups/mqb/mqbnet/mqbnet_channel.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -725,9 +725,7 @@ static void test1_write()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down Expand Up @@ -793,9 +791,7 @@ static void test2_highWatermark()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down Expand Up @@ -885,9 +881,7 @@ static void test3_highWatermarkInWriteCb()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down Expand Up @@ -977,9 +971,7 @@ static void test4_controlBlob()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down Expand Up @@ -1048,9 +1040,7 @@ static void test5_reconnect()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down Expand Up @@ -1129,9 +1119,7 @@ static void test6_weakData()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down
12 changes: 5 additions & 7 deletions src/groups/mqb/mqbnet/mqbnet_clusterimp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ namespace mqbnet {
ClusterNodeImp::ClusterNodeImp(ClusterImp* cluster,
const mqbcfg::ClusterNode& config,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator)
: d_cluster_p(cluster)
: d_allocators(allocator)
, d_cluster_p(cluster)
, d_config(config, allocator)
, d_description(allocator)
, d_channel(blobBufferFactory, itemPool, config.name(), allocator)
, d_channel(blobBufferFactory, config.name(), d_allocators.get(config.name()))
, d_identity(allocator)
, d_isReading(false)
{
Expand Down Expand Up @@ -181,10 +181,8 @@ ClusterImp::ClusterImp(const bsl::string& name,
const bsl::vector<mqbcfg::ClusterNode>& nodesConfig,
int selfNodeId,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_name(name, allocator)
: d_name(name, allocator)
, d_nodesConfig(nodesConfig, allocator)
, d_selfNodeId(selfNodeId)
, d_selfNode(0) // set below
Expand All @@ -199,7 +197,7 @@ ClusterImp::ClusterImp(const bsl::string& name,
bsl::vector<mqbcfg::ClusterNode>::const_iterator nodeIt;
for (nodeIt = d_nodesConfig.begin(); nodeIt != d_nodesConfig.end();
++nodeIt) {
d_nodes.emplace_back(this, *nodeIt, blobBufferFactory, itemPool);
d_nodes.emplace_back(this, *nodeIt, blobBufferFactory);
d_nodesList.emplace_back(&d_nodes.back());
if (nodeIt->id() == selfNodeId) {
d_selfNode = d_nodesList.back();
Expand Down
8 changes: 3 additions & 5 deletions src/groups/mqb/mqbnet/mqbnet_clusterimp.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ class ClusterNodeImp : public ClusterNode {

private:
// DATA
/// Allocator store to spawn new allocators for sub-components
bmqma::CountingAllocatorStore d_allocators;

ClusterImp* d_cluster_p;
// Cluster this node belongs to

Expand Down Expand Up @@ -113,7 +116,6 @@ class ClusterNodeImp : public ClusterNode {
ClusterNodeImp(ClusterImp* cluster,
const mqbcfg::ClusterNode& config,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator);

/// Destructor.
Expand Down Expand Up @@ -196,9 +198,6 @@ class ClusterImp : public Cluster {

private:
// DATA
bslma::Allocator* d_allocator_p;
// Allocator to use

bsl::string d_name;
// Name of this Cluster

Expand Down Expand Up @@ -277,7 +276,6 @@ class ClusterImp : public Cluster {
const bsl::vector<mqbcfg::ClusterNode>& nodesConfig,
int selfNodeId,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator);

/// Destructor
Expand Down
4 changes: 0 additions & 4 deletions src/groups/mqb/mqbnet/mqbnet_dummysession.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,14 @@ static void test1_BreathingTest()

mqbcfg::ClusterDefinition clusterConfig(s_allocator_p);
bdlbb::PooledBlobBufferFactory bufferFactory(1024, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::MockCluster mockCluster(clusterConfig,
&bufferFactory,
&itemPool,
s_allocator_p);

mqbcfg::ClusterNode clusterNodeConfig(s_allocator_p);
mqbnet::MockClusterNode mockClusterNode(&mockCluster,
clusterNodeConfig,
&bufferFactory,
&itemPool,
s_allocator_p);

bsl::shared_ptr<bmqio::TestChannel> testChannel;
Expand Down
6 changes: 2 additions & 4 deletions src/groups/mqb/mqbnet/mqbnet_mockcluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,11 @@ namespace mqbnet {
MockClusterNode::MockClusterNode(MockCluster* cluster,
const mqbcfg::ClusterNode& config,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator)
: d_cluster_p(cluster)
, d_config(config, allocator)
, d_description(allocator)
, d_channel(blobBufferFactory, itemPool, config.name(), allocator)
, d_channel(blobBufferFactory, config.name(), allocator)
, d_identity(allocator)
, d_isReading(false)
{
Expand Down Expand Up @@ -156,7 +155,6 @@ void MockCluster::notifyObserversOfNodeStateChange(ClusterNode* node,

MockCluster::MockCluster(const mqbcfg::ClusterDefinition& config,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_config(config, allocator)
Expand All @@ -171,7 +169,7 @@ MockCluster::MockCluster(const mqbcfg::ClusterDefinition& config,
bsl::vector<mqbcfg::ClusterNode>::const_iterator nodeIt;
for (nodeIt = d_config.nodes().begin(); nodeIt != d_config.nodes().end();
++nodeIt) {
d_nodes.emplace_back(this, *nodeIt, blobBufferFactory, itemPool);
d_nodes.emplace_back(this, *nodeIt, blobBufferFactory);
d_nodesList.emplace_back(&d_nodes.back());
}
}
Expand Down
Loading

0 comments on commit 1dfbaf2

Please sign in to comment.