From 8f62d9195e00ebd1ce7f50a322be7aa33f0aaa08 Mon Sep 17 00:00:00 2001 From: tbierwiaczon Date: Mon, 17 Oct 2022 13:12:35 +0200 Subject: [PATCH 1/7] m_stateTransfer pointer change to unique_ptr --- kvbc/include/Replica.h | 2 +- kvbc/src/Replica.cpp | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kvbc/include/Replica.h b/kvbc/include/Replica.h index 3fe7d7055d..0a9261ff30 100644 --- a/kvbc/include/Replica.h +++ b/kvbc/include/Replica.h @@ -243,7 +243,7 @@ class Replica : public IReplica, const bftEngine::ReplicaConfig &replicaConfig_; bftEngine::IReplica::IReplicaPtr m_replicaPtr = nullptr; std::shared_ptr m_cmdHandler = nullptr; - bftEngine::IStateTransfer *m_stateTransfer = nullptr; + std::unique_ptr m_stateTransfer; std::unique_ptr m_metadataStorage; std::unique_ptr replicaStateSync_; std::shared_ptr aggregator_; diff --git a/kvbc/src/Replica.cpp b/kvbc/src/Replica.cpp index f2d8f52bcf..76bdb62065 100644 --- a/kvbc/src/Replica.cpp +++ b/kvbc/src/Replica.cpp @@ -65,7 +65,7 @@ Status Replica::initInternals() { auto requestHandler = bftEngine::IRequestsHandler::createRequestsHandler(m_cmdHandler, cronTableRegistry_); requestHandler->setReconfigurationHandler(std::make_shared(*this)); m_replicaPtr = bftEngine::IReplica::createNewRoReplica( - replicaConfig_, requestHandler, m_stateTransfer, m_ptrComm.get(), m_metadataStorage.get()); + replicaConfig_, requestHandler, m_stateTransfer.get(), m_ptrComm.get(), m_metadataStorage.get()); m_stateTransfer->addOnTransferringCompleteCallback([this](std::uint64_t) { std::vector stateFromReservedPages; uint64_t wedgePt{0}; @@ -306,7 +306,7 @@ void Replica::createReplicaAndSyncState() { m_replicaPtr = bftEngine::IReplica::createNewReplica( replicaConfig_, requestHandler, - m_stateTransfer, + m_stateTransfer.get(), m_ptrComm.get(), m_metadataStorage.get(), pm_, @@ -600,7 +600,7 @@ Replica::Replica(ICommunication *comm, m_dbSet.dataDBClient->setAggregator(aggregator_); m_dbSet.metadataDBClient->setAggregator(aggregator_); auto stKeyManipulator = std::shared_ptr{storageFactory->newSTKeyManipulator()}; - m_stateTransfer = bftEngine::bcst::create(stConfig, this, m_metadataDBClient, stKeyManipulator, aggregator_); + m_stateTransfer.reset(bftEngine::bcst::create(stConfig, this, m_metadataDBClient, stKeyManipulator, aggregator_)); if (!replicaConfig.isReadOnly) { stReconfigurationSM_ = std::make_unique( *m_stateTransfer, *this, this->AdaptivePruningManager_, this->replicaResources_); From 238901c4ef5406bb9800af78b45eadd16f5652b1 Mon Sep 17 00:00:00 2001 From: Efrat1 <42644291+Efrat1@users.noreply.github.com> Date: Wed, 19 Oct 2022 17:31:35 +0300 Subject: [PATCH 2/7] Fix memory leak of PrePrepareMsg --- bftengine/src/bftengine/ReplicaImp.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bftengine/src/bftengine/ReplicaImp.cpp b/bftengine/src/bftengine/ReplicaImp.cpp index 495ed6d0b6..92209f1d57 100644 --- a/bftengine/src/bftengine/ReplicaImp.cpp +++ b/bftengine/src/bftengine/ReplicaImp.cpp @@ -884,7 +884,10 @@ void ReplicaImp::startConsensusProcess(PrePrepareMsg *pp) { } void ReplicaImp::startConsensusProcess(PrePrepareMsg *pp, bool isCreatedEarlier) { - if (!isCurrentPrimary()) return; + if (!isCurrentPrimary()) { + delete pp; + return; + } TimeRecorder scoped_timer(*histograms_.startConsensusProcess); if (getReplicaConfig().timeServiceEnabled) { pp->setTime(time_service_manager_->getClockTimePoint()); From fd1fc43edec77da7ce8ed2950561795ff868f2cb Mon Sep 17 00:00:00 2001 From: tbierwiaczon Date: Thu, 20 Oct 2022 09:36:20 +0200 Subject: [PATCH 3/7] Retransmission manager change to unique_ptr --- bftengine/src/bftengine/ReplicaImp.cpp | 7 +++---- bftengine/src/bftengine/ReplicaImp.hpp | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/bftengine/src/bftengine/ReplicaImp.cpp b/bftengine/src/bftengine/ReplicaImp.cpp index 92209f1d57..9fa7f80727 100644 --- a/bftengine/src/bftengine/ReplicaImp.cpp +++ b/bftengine/src/bftengine/ReplicaImp.cpp @@ -4542,11 +4542,10 @@ ReplicaImp::ReplicaImp(bool firstTime, controller = new ControllerWithSimpleHistory( config_.getcVal(), config_.getfVal(), config_.getreplicaId(), getCurrentView(), primaryLastUsedSeqNum); - if (retransmissionsLogicEnabled) + if (retransmissionsLogicEnabled) { retransmissionsManager = - new RetransmissionsManager(&internalThreadPool, &getIncomingMsgsStorage(), kWorkWindowSize, 0); - else - retransmissionsManager = nullptr; + std::make_unique(&internalThreadPool, &getIncomingMsgsStorage(), kWorkWindowSize, 0); + } ticks_gen_ = concord::cron::TicksGenerator::create(internalBFTClient_, *clientsManager, diff --git a/bftengine/src/bftengine/ReplicaImp.hpp b/bftengine/src/bftengine/ReplicaImp.hpp index a025312e4b..30b0cdbd89 100644 --- a/bftengine/src/bftengine/ReplicaImp.hpp +++ b/bftengine/src/bftengine/ReplicaImp.hpp @@ -89,7 +89,7 @@ class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer { concord::util::SimpleThreadPool internalThreadPool; // TODO(GG): !!!! rename // retransmissions manager (can be disabled) - RetransmissionsManager* retransmissionsManager = nullptr; + unique_ptr retransmissionsManager; // controller ControllerBase* controller = nullptr; From fce037ea608ec95a96adac07d48892231cf01a58 Mon Sep 17 00:00:00 2001 From: Gil Levkovich Date: Thu, 20 Oct 2022 14:55:28 -0700 Subject: [PATCH 4/7] Revert "Merge pull request #59 from glevkovich/stateTransfer_mem_leak_fix2" This reverts commit 25630fd356e8c2707f4c458a106b0a7764a35080, reversing changes made to c48c61b36a8674e7b827aacbc9d43caeff2d2c60. --- kvbc/include/Replica.h | 2 +- kvbc/src/Replica.cpp | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kvbc/include/Replica.h b/kvbc/include/Replica.h index 0a9261ff30..3fe7d7055d 100644 --- a/kvbc/include/Replica.h +++ b/kvbc/include/Replica.h @@ -243,7 +243,7 @@ class Replica : public IReplica, const bftEngine::ReplicaConfig &replicaConfig_; bftEngine::IReplica::IReplicaPtr m_replicaPtr = nullptr; std::shared_ptr m_cmdHandler = nullptr; - std::unique_ptr m_stateTransfer; + bftEngine::IStateTransfer *m_stateTransfer = nullptr; std::unique_ptr m_metadataStorage; std::unique_ptr replicaStateSync_; std::shared_ptr aggregator_; diff --git a/kvbc/src/Replica.cpp b/kvbc/src/Replica.cpp index 76bdb62065..f2d8f52bcf 100644 --- a/kvbc/src/Replica.cpp +++ b/kvbc/src/Replica.cpp @@ -65,7 +65,7 @@ Status Replica::initInternals() { auto requestHandler = bftEngine::IRequestsHandler::createRequestsHandler(m_cmdHandler, cronTableRegistry_); requestHandler->setReconfigurationHandler(std::make_shared(*this)); m_replicaPtr = bftEngine::IReplica::createNewRoReplica( - replicaConfig_, requestHandler, m_stateTransfer.get(), m_ptrComm.get(), m_metadataStorage.get()); + replicaConfig_, requestHandler, m_stateTransfer, m_ptrComm.get(), m_metadataStorage.get()); m_stateTransfer->addOnTransferringCompleteCallback([this](std::uint64_t) { std::vector stateFromReservedPages; uint64_t wedgePt{0}; @@ -306,7 +306,7 @@ void Replica::createReplicaAndSyncState() { m_replicaPtr = bftEngine::IReplica::createNewReplica( replicaConfig_, requestHandler, - m_stateTransfer.get(), + m_stateTransfer, m_ptrComm.get(), m_metadataStorage.get(), pm_, @@ -600,7 +600,7 @@ Replica::Replica(ICommunication *comm, m_dbSet.dataDBClient->setAggregator(aggregator_); m_dbSet.metadataDBClient->setAggregator(aggregator_); auto stKeyManipulator = std::shared_ptr{storageFactory->newSTKeyManipulator()}; - m_stateTransfer.reset(bftEngine::bcst::create(stConfig, this, m_metadataDBClient, stKeyManipulator, aggregator_)); + m_stateTransfer = bftEngine::bcst::create(stConfig, this, m_metadataDBClient, stKeyManipulator, aggregator_); if (!replicaConfig.isReadOnly) { stReconfigurationSM_ = std::make_unique( *m_stateTransfer, *this, this->AdaptivePruningManager_, this->replicaResources_); From c54109a98c322e1485b286bd41d5ebde5cf0a14d Mon Sep 17 00:00:00 2001 From: Gil Levkovich Date: Mon, 24 Oct 2022 12:39:32 +0300 Subject: [PATCH 5/7] bftengine, diagnostics: add trace logs to increase debuggability To easily debug creation/distraction of some objects, we need to add these logs. --- bftengine/src/bcstatetransfer/BCStateTran.cpp | 4 +++- bftengine/src/bcstatetransfer/BCStateTran.hpp | 2 ++ bftengine/src/bftengine/BFTEngine.cpp | 2 ++ bftengine/src/bftengine/ReplicaImp.cpp | 11 ++++++++--- bftengine/src/bftengine/ReplicaImp.hpp | 3 +-- bftengine/src/preprocessor/PreProcessor.cpp | 13 ++++++++++--- communication/src/TlsConnectionManager.cpp | 1 + diagnostics/include/diagnostics_server.h | 1 + 8 files changed, 28 insertions(+), 9 deletions(-) diff --git a/bftengine/src/bcstatetransfer/BCStateTran.cpp b/bftengine/src/bcstatetransfer/BCStateTran.cpp index eff2997cd4..b981a7035f 100644 --- a/bftengine/src/bcstatetransfer/BCStateTran.cpp +++ b/bftengine/src/bcstatetransfer/BCStateTran.cpp @@ -338,6 +338,7 @@ BCStateTran::BCStateTran(const Config &config, IAppState *const stateApi, DataSt } BCStateTran::~BCStateTran() { + LOG_TRACE(logger_, "~BCStateTran"); ConcordAssert(!running_); ConcordAssert(cacheOfVirtualBlockForResPages.empty()); ConcordAssert(pendingItemDataMsgs.empty()); @@ -470,7 +471,7 @@ void BCStateTran::startRunningImpl(IReplicaForStateTransfer *r) { // timer is cancelled in the calling context, see ReplicaForStateTransfer::stop void BCStateTran::stopRunningImpl() { - LOG_INFO(logger_, "Stopping"); + LOG_TRACE(logger_, "stopRunningImpl"); ConcordAssert(running_); ConcordAssertNE(replicaForStateTransfer_, nullptr); // This one should always be first! @@ -484,6 +485,7 @@ void BCStateTran::stopRunningImpl() { stReset(g.txn(), true, false, false); } replicaForStateTransfer_ = nullptr; + LOG_TRACE(logger_, "Done stopRunningImpl"); } // Create a CheckpointDesc for the given checkpointNumber. diff --git a/bftengine/src/bcstatetransfer/BCStateTran.hpp b/bftengine/src/bcstatetransfer/BCStateTran.hpp index e625c45c01..90dc5a5c4e 100644 --- a/bftengine/src/bcstatetransfer/BCStateTran.hpp +++ b/bftengine/src/bcstatetransfer/BCStateTran.hpp @@ -868,6 +868,7 @@ class BCStateTran : public IStateTransfer { static constexpr uint64_t MAX_PENDING_BLOCKS_SIZE = 1000ULL; Recorders() { + LOG_TRACE(ST_SRC_LOG, "Recorders: Thread ID: " << std::this_thread::get_id() << KVLOG(this)); auto& registrar = concord::diagnostics::RegistrarSingleton::getInstance(); // common component registrar.perf.registerComponent("state_transfer", @@ -898,6 +899,7 @@ class BCStateTran : public IStateTransfer { src_next_block_wait_duration}); } ~Recorders() { + LOG_TRACE(ST_SRC_LOG, "~Recorders: Thread ID: " << std::this_thread::get_id() << KVLOG(this)); auto& registrar = concord::diagnostics::RegistrarSingleton::getInstance(); registrar.perf.unRegisterComponent("state_transfer"); registrar.perf.unRegisterComponent("state_transfer_dest"); diff --git a/bftengine/src/bftengine/BFTEngine.cpp b/bftengine/src/bftengine/BFTEngine.cpp index b0cb9608b9..e86cda82c5 100644 --- a/bftengine/src/bftengine/BFTEngine.cpp +++ b/bftengine/src/bftengine/BFTEngine.cpp @@ -87,12 +87,14 @@ void ReplicaInternal::start() { } void ReplicaInternal::stop() { + LOG_TRACE(GL, "ReplicaInternal::stop started"); unique_lock lk(debugWaitLock_); if (replica_->isRunning()) { replica_->stop(); } debugWait_.notify_all(); + LOG_TRACE(GL, "ReplicaInternal::stop done"); } void ReplicaInternal::SetAggregator(std::shared_ptr a) { replica_->SetAggregator(a); } diff --git a/bftengine/src/bftengine/ReplicaImp.cpp b/bftengine/src/bftengine/ReplicaImp.cpp index 9fa7f80727..ecfad189a8 100644 --- a/bftengine/src/bftengine/ReplicaImp.cpp +++ b/bftengine/src/bftengine/ReplicaImp.cpp @@ -4357,7 +4357,6 @@ ReplicaImp::ReplicaImp(bool firstTime, timeOfLastStateSynch{getMonotonicTime()}, // TODO(GG): TBD timeOfLastViewEntrance{getMonotonicTime()}, // TODO(GG): TBD timeOfLastAgreedView{getMonotonicTime()}, // TODO(GG): TBD - pm_{pm}, sm_{sm ? sm : std::make_shared()}, metric_view_{metrics_.RegisterGauge("view", 0)}, @@ -4611,13 +4610,19 @@ ReplicaImp::~ReplicaImp() { } void ReplicaImp::stop() { - if (retransmissionsLogicEnabled) timers_.cancel(retranTimer_); + LOG_DEBUG(GL, "ReplicaImp::stop started"); + if (retransmissionsLogicEnabled) { + timers_.cancel(retranTimer_); + } timers_.cancel(slowPathTimer_); timers_.cancel(infoReqTimer_); timers_.cancel(statusReportTimer_); timers_.cancel(clientRequestsRetransmissionTimer_); - if (viewChangeProtocolEnabled) timers_.cancel(viewChangeTimer_); + if (viewChangeProtocolEnabled) { + timers_.cancel(viewChangeTimer_); + } ReplicaForStateTransfer::stop(); + LOG_DEBUG(GL, "ReplicaImp::stop done"); } void ReplicaImp::addTimers() { diff --git a/bftengine/src/bftengine/ReplicaImp.hpp b/bftengine/src/bftengine/ReplicaImp.hpp index 30b0cdbd89..f93cad55aa 100644 --- a/bftengine/src/bftengine/ReplicaImp.hpp +++ b/bftengine/src/bftengine/ReplicaImp.hpp @@ -27,7 +27,6 @@ #include "InternalReplicaApi.hpp" #include "ClientsManager.hpp" #include "CheckpointInfo.hpp" -#include "SimpleThreadPool.hpp" #include "Bitmap.hpp" #include "OpenTracing.hpp" #include "RequestHandler.h" @@ -374,7 +373,7 @@ class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer { // InternalReplicaApi bool isCollectingState() const override { - LOG_DEBUG(GL, "Thread ID: " << std::this_thread::get_id()); + LOG_TRACE(GL, "Thread ID: " << std::this_thread::get_id()); return isCollectingState_; } void startCollectingState(std::string&& reason = ""); diff --git a/bftengine/src/preprocessor/PreProcessor.cpp b/bftengine/src/preprocessor/PreProcessor.cpp index 3c6b64f8d7..3fd7d512f3 100644 --- a/bftengine/src/preprocessor/PreProcessor.cpp +++ b/bftengine/src/preprocessor/PreProcessor.cpp @@ -439,13 +439,20 @@ PreProcessor::PreProcessor(shared_ptr &msgsCommunicator, } PreProcessor::~PreProcessor() { + LOG_TRACE(logger(), "~PreProcessor start"); msgLoopDone_ = true; msgLoopSignal_.notify_all(); cancelTimers(); threadPool_.stop(); - if (msgLoopThread_.joinable()) msgLoopThread_.join(); - if (!memoryPoolEnabled_) - for (const auto &result : preProcessResultBuffers_) delete[] result->buffer; + if (msgLoopThread_.joinable()) { + msgLoopThread_.join(); + } + if (!memoryPoolEnabled_) { + for (const auto &result : preProcessResultBuffers_) { + delete[] result->buffer; + } + } + LOG_TRACE(logger(), "~PreProcessor done"); } void PreProcessor::addTimers() { diff --git a/communication/src/TlsConnectionManager.cpp b/communication/src/TlsConnectionManager.cpp index 94e4345b5f..3b6d7404e5 100644 --- a/communication/src/TlsConnectionManager.cpp +++ b/communication/src/TlsConnectionManager.cpp @@ -76,6 +76,7 @@ void ConnectionManager::stop() { LOG_DEBUG(logger_, "Closing connection from: " << config_.selfId_ << ", to: " << id); syncCloseConnection(conn); } + LOG_TRACE(logger_, "Done stopping connection manager for " << config_.selfId_); } void ConnectionManager::setReceiver(NodeNum, IReceiver* receiver) { receiver_ = receiver; } diff --git a/diagnostics/include/diagnostics_server.h b/diagnostics/include/diagnostics_server.h index 7264f6f6ae..d471e62c68 100644 --- a/diagnostics/include/diagnostics_server.h +++ b/diagnostics/include/diagnostics_server.h @@ -158,6 +158,7 @@ class Server { } void stop() { + LOG_INFO(logger, std::boolalpha << KVLOG(shutdown_)); if (!shutdown_) { LOG_INFO(logger, "Shutting down diagnostics server main thread."); shutdown_.store(true); From 25d9db333ba3c5ead4313af5b01b4f6367e2fea4 Mon Sep 17 00:00:00 2001 From: Gil Levkovich Date: Mon, 24 Oct 2022 12:49:10 +0300 Subject: [PATCH 6/7] SimpleThreadPool, ThreadPool: add name for better logs Here we add (and enforce) name to each queue, and delete the default ctor. In addition, some few more log entries are added. --- bftengine/src/bftengine/ReplicaImp.cpp | 2 ++ bftengine/src/bftengine/RequestThreadPool.hpp | 5 ++- bftengine/src/preprocessor/PreProcessor.cpp | 1 + .../preprocessor/tests/preprocessor_test.cpp | 2 +- .../testRequestThreadPool.cpp | 2 +- .../client_pool/src/concord_client_pool.cpp | 6 ++-- .../thin-replica-client/grpc_connection.hpp | 2 +- .../replica_state_snapshot_client.hpp | 2 +- .../replica_stream_snapshot_client_test.cpp | 8 ++--- .../hash_state_benchmark.cpp | 2 +- kvbc/include/categorization/kv_blockchain.h | 4 +-- kvbc/include/v4blockchain/detail/blockchain.h | 2 +- kvbc/include/v4blockchain/v4_blockchain.h | 4 +-- kvbc/src/Replica.cpp | 3 +- .../block_merkle_latest_ver_cf_migration.cpp | 2 +- kvbc/test/v4blockchain/v4_blockchain_test.cpp | 2 +- .../src/migration_bookeeper.cpp | 3 +- storage/include/s3/client.hpp | 2 +- util/include/SimpleThreadPool.hpp | 5 ++- util/include/thread_pool.hpp | 24 ++++++++++--- util/include/throughput.hpp | 2 +- util/src/SimpleThreadPool.cpp | 9 +++++ util/test/multithreading.cpp | 2 +- util/test/thread_pool_test.cpp | 34 +++++++++---------- 24 files changed, 83 insertions(+), 47 deletions(-) diff --git a/bftengine/src/bftengine/ReplicaImp.cpp b/bftengine/src/bftengine/ReplicaImp.cpp index ecfad189a8..b899f2fc10 100644 --- a/bftengine/src/bftengine/ReplicaImp.cpp +++ b/bftengine/src/bftengine/ReplicaImp.cpp @@ -4352,7 +4352,9 @@ ReplicaImp::ReplicaImp(bool firstTime, viewChangeProtocolEnabled{config.viewChangeProtocolEnabled}, autoPrimaryRotationEnabled{config.autoPrimaryRotationEnabled}, restarted_{!firstTime}, + internalThreadPool("ReplicaImp::internalThreadPool"), MAIN_THREAD_ID{std::this_thread::get_id()}, + postExecThread_{"ReplicaImp::postExecThread"}, replyBuffer{static_cast(std::malloc(config_.getmaxReplyMessageSize() - sizeof(ClientReplyMsgHeader)))}, timeOfLastStateSynch{getMonotonicTime()}, // TODO(GG): TBD timeOfLastViewEntrance{getMonotonicTime()}, // TODO(GG): TBD diff --git a/bftengine/src/bftengine/RequestThreadPool.hpp b/bftengine/src/bftengine/RequestThreadPool.hpp index a0c4621fdb..45a36775ca 100644 --- a/bftengine/src/bftengine/RequestThreadPool.hpp +++ b/bftengine/src/bftengine/RequestThreadPool.hpp @@ -32,7 +32,10 @@ class RequestThreadPool final { static auto& getThreadPool(uint16_t level) { // Currently we need 2 level thread pools. static std::array threadBag = { - ReplicaConfig::instance().threadbagConcurrencyLevel1, ReplicaConfig::instance().threadbagConcurrencyLevel2}; + concord::util::ThreadPool("RequestThreadPool::threadBag_ConcurrencyLevel1", + ReplicaConfig::instance().threadbagConcurrencyLevel1), + concord::util::ThreadPool("RequestThreadPool::threadBag_ConcurrencyLevel2", + ReplicaConfig::instance().threadbagConcurrencyLevel2)}; return threadBag.at(level); } diff --git a/bftengine/src/preprocessor/PreProcessor.cpp b/bftengine/src/preprocessor/PreProcessor.cpp index 3fd7d512f3..a20c434c62 100644 --- a/bftengine/src/preprocessor/PreProcessor.cpp +++ b/bftengine/src/preprocessor/PreProcessor.cpp @@ -366,6 +366,7 @@ PreProcessor::PreProcessor(shared_ptr &msgsCommunicator, numOfReplicas_(myReplica.getReplicaConfig().numReplicas + myReplica.getReplicaConfig().numRoReplicas), numOfClientProxies_(myReplica.getReplicaConfig().numOfClientProxies), clientBatchingEnabled_(myReplica.getReplicaConfig().clientBatchingEnabled), + threadPool_("PreProcessor::threadPool"), memoryPool_(maxExternalMsgSize_, timers), metricsComponent_{concordMetrics::Component("preProcessor", std::make_shared())}, metricsLastDumpTime_(0), diff --git a/bftengine/src/preprocessor/tests/preprocessor_test.cpp b/bftengine/src/preprocessor/tests/preprocessor_test.cpp index c62ec1a43a..c7fee4257b 100644 --- a/bftengine/src/preprocessor/tests/preprocessor_test.cpp +++ b/bftengine/src/preprocessor/tests/preprocessor_test.cpp @@ -133,7 +133,7 @@ class DummyReplica : public InternalReplicaApi { private: bool primary_ = true; IncomingMsgsStorage* incomingMsgsStorage_ = nullptr; - concord::util::SimpleThreadPool pool_; + concord::util::SimpleThreadPool pool_{""}; bftEngine::impl::ReplicasInfo replicasInfo_; set replicaIds_; }; diff --git a/bftengine/tests/testRequestThreadPool/testRequestThreadPool.cpp b/bftengine/tests/testRequestThreadPool/testRequestThreadPool.cpp index f74df06e0f..1ca19cacd2 100644 --- a/bftengine/tests/testRequestThreadPool/testRequestThreadPool.cpp +++ b/bftengine/tests/testRequestThreadPool/testRequestThreadPool.cpp @@ -85,7 +85,7 @@ TEST(testRequestThreadPool, sameTP) { std::vector> tasks; try { - auto pool = concord::util::ThreadPool{}; + auto pool = concord::util::ThreadPool{""}; for (uint32_t i = 0; i < 10000; i++) { tasks.push_back(pool.async([&thread_pool_addr]() { std::vector thread_pool_addr_tp; diff --git a/client/client_pool/src/concord_client_pool.cpp b/client/client_pool/src/concord_client_pool.cpp index 47e94adad9..264ddfdbd2 100644 --- a/client/client_pool/src/concord_client_pool.cpp +++ b/client/client_pool/src/concord_client_pool.cpp @@ -256,7 +256,8 @@ std::unique_ptr ConcordClientPool::create(config_pool::Concor ConcordClientPool::ConcordClientPool(config_pool::ConcordClientPoolConfig &config, std::shared_ptr aggregator, bool delay_behavior) - : metricsComponent_{concordMetrics::Component("ClientPool", std::make_shared())}, + : jobs_thread_pool_{"ConcordClientPool::jobs_thread_pool"}, + metricsComponent_{concordMetrics::Component("ClientPool", std::make_shared())}, ClientPoolMetrics_{metricsComponent_.RegisterCounter("requests_counter"), metricsComponent_.RegisterCounter("executed_requests_counter"), metricsComponent_.RegisterCounter("rejected_counter"), @@ -286,7 +287,8 @@ ConcordClientPool::ConcordClientPool(config_pool::ConcordClientPoolConfig &confi ConcordClientPool::ConcordClientPool(config_pool::ConcordClientPoolConfig &config, std::shared_ptr aggregator) - : metricsComponent_{concordMetrics::Component("ClientPool", std::make_shared())}, + : jobs_thread_pool_{"ConcordClientPool::jobs_thread_pool"}, + metricsComponent_{concordMetrics::Component("ClientPool", std::make_shared())}, ClientPoolMetrics_{metricsComponent_.RegisterCounter("requests_counter"), metricsComponent_.RegisterCounter("executed_requests_counter"), metricsComponent_.RegisterCounter("rejected_counter"), diff --git a/client/thin-replica-client/include/client/thin-replica-client/grpc_connection.hpp b/client/thin-replica-client/include/client/thin-replica-client/grpc_connection.hpp index 4643f92fa1..0de797ab04 100644 --- a/client/thin-replica-client/include/client/thin-replica-client/grpc_connection.hpp +++ b/client/thin-replica-client/include/client/thin-replica-client/grpc_connection.hpp @@ -86,10 +86,10 @@ class GrpcConnection { : logger_(logging::getLogger("concord.client.thin_replica.trscon")), address_(address), client_id_(client_id), + grpc_connection_pool_{std::make_unique("GrpcConnection::grpc_connection_pool", 1)}, data_timeout_(std::chrono::seconds(data_operation_timeout_seconds)), hash_timeout_(std::chrono::seconds(hash_operation_timeout_seconds)), snapshot_timeout_(std::chrono::seconds(snapshot_operation_timeout_seconds)) { - grpc_connection_pool_ = std::make_unique(1); ConcordAssertNE(grpc_connection_pool_, nullptr); LOG_INFO(logger_, KVLOG(data_operation_timeout_seconds, hash_operation_timeout_seconds, snapshot_operation_timeout_seconds)); diff --git a/client/thin-replica-client/include/client/thin-replica-client/replica_state_snapshot_client.hpp b/client/thin-replica-client/include/client/thin-replica-client/replica_state_snapshot_client.hpp index 686fc3d488..b91a243b4c 100644 --- a/client/thin-replica-client/include/client/thin-replica-client/replica_state_snapshot_client.hpp +++ b/client/thin-replica-client/include/client/thin-replica-client/replica_state_snapshot_client.hpp @@ -58,7 +58,7 @@ class ReplicaStateSnapshotClient { ReplicaStateSnapshotClient(std::unique_ptr config) : logger_(logging::getLogger("concord.client.replica_stream_snapshot")), config_(std::move(config)), - threadpool_(config_->concurrency_level), + threadpool_("ReplicaStateSnapshotClient::threadpool", config_->concurrency_level), count_of_concurrent_request_{0} {} void readSnapshotStream(const SnapshotRequest& request, std::shared_ptr remote_queue); diff --git a/client/thin-replica-client/test/replica_stream_snapshot_client_test.cpp b/client/thin-replica-client/test/replica_stream_snapshot_client_test.cpp index c3b5657bc5..bede11e4dc 100644 --- a/client/thin-replica-client/test/replica_stream_snapshot_client_test.cpp +++ b/client/thin-replica-client/test/replica_stream_snapshot_client_test.cpp @@ -163,7 +163,7 @@ void getGrpcConnections(bool full_fake, vector>& grpc TEST(replica_stream_snapshot_client_test, test_destructor_always_successful) { vector> grpc_connections; getGrpcConnections(true, grpc_connections, 7); - ThreadPool thread_pool{10}; + ThreadPool thread_pool{"", 10}; auto read_snapshot = [&grpc_connections]() { auto rss_config = std::make_unique(grpc_connections, 8); auto rss = std::make_unique(std::move(rss_config)); @@ -208,7 +208,7 @@ TEST(replica_stream_snapshot_client_test, test_real_action) { // responsible for shutting down the server for this call to ever return. (servers[i])->Wait(); }; - ThreadPool server_thread_pool{7}; + ThreadPool server_thread_pool{"", 7}; vector> server_results; for (size_t i = 0; i < 7; ++i) { server_results.push_back(server_thread_pool.async(run_server, i)); @@ -222,7 +222,7 @@ TEST(replica_stream_snapshot_client_test, test_real_action) { vector> grpc_connections; getGrpcConnections(false, grpc_connections, 7); - ThreadPool thread_pool{10}; + ThreadPool thread_pool{"", 10}; auto read_snapshot = [&grpc_connections](size_t len) { auto rss_config = std::make_unique(grpc_connections, 8); auto rss = std::make_unique(std::move(rss_config)); @@ -232,7 +232,7 @@ TEST(replica_stream_snapshot_client_test, test_real_action) { ::client::replica_state_snapshot_client::SnapshotRequest rss_request; rss_request.snapshot_id = len; rss->readSnapshotStream(rss_request, remote_queue); - ThreadPool read_thread_pool{1}; + ThreadPool read_thread_pool{"", 1}; read_thread_pool.async( [&remote_queue](size_t l) { size_t num_received = 0; diff --git a/kvbc/benchmark/state_snapshot_benchmarks/hash_state_benchmark.cpp b/kvbc/benchmark/state_snapshot_benchmarks/hash_state_benchmark.cpp index dd2979f6db..7a93651da4 100644 --- a/kvbc/benchmark/state_snapshot_benchmarks/hash_state_benchmark.cpp +++ b/kvbc/benchmark/state_snapshot_benchmarks/hash_state_benchmark.cpp @@ -184,7 +184,7 @@ int run(int argc, char* argv[]) { point_lookup_batch_size++; } - auto thread_pool = ThreadPool{static_cast(point_lookup_threads)}; + auto thread_pool = ThreadPool{"hash_state_benchmark::thread_pool", static_cast(point_lookup_threads)}; std::cout << "Hashing state with a point lookup batch size = " << point_lookup_batch_size << ", point lookup threads = " << point_lookup_threads diff --git a/kvbc/include/categorization/kv_blockchain.h b/kvbc/include/categorization/kv_blockchain.h index 6d2a5dd070..a19b037a34 100644 --- a/kvbc/include/categorization/kv_blockchain.h +++ b/kvbc/include/categorization/kv_blockchain.h @@ -254,9 +254,9 @@ class KeyValueBlockchain { VersionedRawBlock last_raw_block_; // currently we are operating with single thread - util::ThreadPool thread_pool_{1}; + util::ThreadPool thread_pool_{"categorization::KeyValueBlockchain::thread_pool", 1}; // For concurrent deletion of the categories inside a block. - util::ThreadPool prunning_thread_pool_{2}; + util::ThreadPool prunning_thread_pool_{"categorization::KeyValueBlockchain::prunning_thread_pool_,", 2}; // metrics std::shared_ptr aggregator_; diff --git a/kvbc/include/v4blockchain/detail/blockchain.h b/kvbc/include/v4blockchain/detail/blockchain.h index d2528cd456..e5461fbd1d 100644 --- a/kvbc/include/v4blockchain/detail/blockchain.h +++ b/kvbc/include/v4blockchain/detail/blockchain.h @@ -120,7 +120,7 @@ class Blockchain { std::atomic last_reachable_block_id_{INVALID_BLOCK_ID}; std::atomic genesis_block_id_{INVALID_BLOCK_ID}; std::shared_ptr native_client_; - util::ThreadPool thread_pool_{1}; + util::ThreadPool thread_pool_{"v4blockchain::detail::Blockchain::thread_pool", 1}; std::optional> future_digest_; bool need_compaction_{false}; std::mutex compaction_mutex_; diff --git a/kvbc/include/v4blockchain/v4_blockchain.h b/kvbc/include/v4blockchain/v4_blockchain.h index b067e787ae..e9073d6c3c 100644 --- a/kvbc/include/v4blockchain/v4_blockchain.h +++ b/kvbc/include/v4blockchain/v4_blockchain.h @@ -276,8 +276,8 @@ class KeyValueBlockchain { const ::rocksdb::Snapshot *snap_shot_{nullptr}; std::map chkpnt_snap_shots_; // const ::rocksdb::Snapshot *chkpoint_snap_shot_{nullptr}; - util::ThreadPool thread_pool_{1}; - util::ThreadPool compaction_thread_pool_{1}; + util::ThreadPool thread_pool_{"v4blockchain::KeyValueBlockchain::thread_pool", 1}; + util::ThreadPool compaction_thread_pool_{"v4blockchain::KeyValueBlockchain::compaction_thread_pool_", 1}; // Metrics std::shared_ptr aggregator_; diff --git a/kvbc/src/Replica.cpp b/kvbc/src/Replica.cpp index f2d8f52bcf..1911643482 100644 --- a/kvbc/src/Replica.cpp +++ b/kvbc/src/Replica.cpp @@ -476,7 +476,8 @@ Replica::Replica(ICommunication *comm, aggregator_(aggregator), pm_{pm}, secretsManager_{secretsManager}, - blocks_io_workers_pool((replicaConfig.numWorkerThreadsForBlockIO > 0) ? replicaConfig.numWorkerThreadsForBlockIO + blocks_io_workers_pool("Replica::blocks_io_workers_pool", + (replicaConfig.numWorkerThreadsForBlockIO > 0) ? replicaConfig.numWorkerThreadsForBlockIO : std::thread::hardware_concurrency()), AdaptivePruningManager_{ concord::performance::IntervalMappingResourceManager::createIntervalMappingResourceManager( diff --git a/kvbc/src/migrations/block_merkle_latest_ver_cf_migration.cpp b/kvbc/src/migrations/block_merkle_latest_ver_cf_migration.cpp index 1fb632909c..5d6276bd79 100644 --- a/kvbc/src/migrations/block_merkle_latest_ver_cf_migration.cpp +++ b/kvbc/src/migrations/block_merkle_latest_ver_cf_migration.cpp @@ -111,7 +111,7 @@ void BlockMerkleLatestVerCfMigration::clearExistingLatestVerCf() { void BlockMerkleLatestVerCfMigration::iterateAndMigrate() { auto blockchain = Blockchain{db_}; - ThreadPool tp(iteration_batch_size_); + ThreadPool tp("BlockMerkleLatestVerCfMigration::iterateAndMigrate::tp", iteration_batch_size_); for (auto block_id = INITIAL_GENESIS_BLOCK_ID; block_id <= blockchain.getLastReachableBlockId(); block_id += iteration_batch_size_) { std::vector> tasks; diff --git a/kvbc/test/v4blockchain/v4_blockchain_test.cpp b/kvbc/test/v4blockchain/v4_blockchain_test.cpp index 7e83444a33..5e6b0f2f29 100644 --- a/kvbc/test/v4blockchain/v4_blockchain_test.cpp +++ b/kvbc/test/v4blockchain/v4_blockchain_test.cpp @@ -111,7 +111,7 @@ class v4_kvbc : public Test { v4blockchain::KeyValueBlockchain* blockchain, std::vector>& tasks) { auto last_block = blockchain->getLastReachableBlockId(); - concord::util::ThreadPool tp; + concord::util::ThreadPool tp{""}; // Keys are: // _key__ diff --git a/kvbc/tools/migrations/v4migration_tool/src/migration_bookeeper.cpp b/kvbc/tools/migrations/v4migration_tool/src/migration_bookeeper.cpp index d612887651..34f679f96b 100644 --- a/kvbc/tools/migrations/v4migration_tool/src/migration_bookeeper.cpp +++ b/kvbc/tools/migrations/v4migration_tool/src/migration_bookeeper.cpp @@ -71,7 +71,8 @@ void BookKeeper::initialize_migration() { } } const auto point_lookup_threads = config_["point-lookup-threads"].as(); - batch_thread_pool_ = std::make_unique(static_cast(point_lookup_threads)); + batch_thread_pool_ = std::make_unique("migration_bookeeper::batch_thread_pool", + static_cast(point_lookup_threads)); } void BookKeeper::migrate(const std::shared_ptr& from, diff --git a/storage/include/s3/client.hpp b/storage/include/s3/client.hpp index 0d3b75bbf0..4285876393 100644 --- a/storage/include/s3/client.hpp +++ b/storage/include/s3/client.hpp @@ -392,7 +392,7 @@ class Client : public concord::storage::IDBClient { logging::Logger logger_ = logging::getLogger("concord.storage.s3"); uint16_t initialDelay_ = 100; Metrics metrics_; - util::ThreadPool thread_pool_{std::thread::hardware_concurrency()}; + util::ThreadPool thread_pool_{"GetObjectResponseData::thread_pool", std::thread::hardware_concurrency()}; }; } // namespace concord::storage::s3 diff --git a/util/include/SimpleThreadPool.hpp b/util/include/SimpleThreadPool.hpp index 90d90488a1..f9210ecfef 100644 --- a/util/include/SimpleThreadPool.hpp +++ b/util/include/SimpleThreadPool.hpp @@ -33,7 +33,9 @@ class SimpleThreadPool { ~Job(){}; // should not be deleted directly - use release() }; - SimpleThreadPool() : stopped_(true) {} + SimpleThreadPool(std::string&& name); + SimpleThreadPool() = delete; + ~SimpleThreadPool(); /** * starts the thread pool with desired number of threads @@ -71,6 +73,7 @@ class SimpleThreadPool { protected: std::queue job_queue_; std::mutex queue_lock_; + std::string name_; std::condition_variable queue_cond_; bool stopped_; int num_of_free_threads_ = 0; diff --git a/util/include/thread_pool.hpp b/util/include/thread_pool.hpp index 0870149f1e..c43f5dc729 100644 --- a/util/include/thread_pool.hpp +++ b/util/include/thread_pool.hpp @@ -16,7 +16,8 @@ #pragma once -#include +#include "assertUtils.hpp" +#include "Logger.hpp" #include #include @@ -29,11 +30,20 @@ namespace concord::util { +static logging::Logger logger{logging::getLogger("concord.util.thread-pool")}; + // A thread pool that supports any callable object with any return type. Returns std::future objects to users. class ThreadPool { public: + ThreadPool() = delete; + // Starts the thread pool with thread_count > 0 threads. - ThreadPool(unsigned int thread_count) noexcept { + ThreadPool(std::string&& name, unsigned int thread_count) noexcept : name_(std::move(name)) { + const std::lock_guard lock(task_queue_.mutex); + static int thread_pool_counter_{1}; + name_ += "_" + std::to_string(thread_pool_counter_); + ++thread_pool_counter_; + LOG_DEBUG(logger, "ThreadPool: create:" << KVLOG(name_, thread_count)); ConcordAssert(thread_count > 0); for (auto i = 0u; i < thread_count; ++i) { threads_.emplace_back([this]() { loop(); }); @@ -41,11 +51,13 @@ class ThreadPool { } // Starts the thread pool with the maximum number of concurrent threads supported by the implementation. - ThreadPool() noexcept - : ThreadPool{std::thread::hardware_concurrency() > 0 ? std::thread::hardware_concurrency() : 1} {} + ThreadPool(std::string&& name) noexcept + : ThreadPool{std::move(name), std::thread::hardware_concurrency() > 0 ? std::thread::hardware_concurrency() : 1} { + } // Stops the thread pool. Waits for the currently executing tasks only (will not exhaust the queues). ~ThreadPool() noexcept { + LOG_DEBUG(logger, "ThreadPool: destroy started:" << KVLOG(name_)); { auto lock = std::lock_guard{task_queue_.mutex}; task_queue_.stop = true; @@ -54,6 +66,7 @@ class ThreadPool { for (auto& t : threads_) { t.join(); } + LOG_DEBUG(logger, "ThreadPool: destroy done:" << KVLOG(name_)); } public: @@ -97,7 +110,7 @@ class ThreadPool { try { task(); } catch (const std::exception& e) { - LOG_ERROR(logging::getLogger("concord.util.thread-pool"), e.what()); + LOG_ERROR(logger, e.what()); } } } @@ -119,6 +132,7 @@ class ThreadPool { // A task queue that is shared between pool threads. TaskQueue task_queue_; + std::string name_; // A list of threads. std::vector threads_; diff --git a/util/include/throughput.hpp b/util/include/throughput.hpp index 0f3ac01e5c..1fdae86067 100644 --- a/util/include/throughput.hpp +++ b/util/include/throughput.hpp @@ -42,7 +42,7 @@ class DurationTracker { public: // create a tracker with a given name. start it immediately if do_start is true. DurationTracker(std::string&& name = "", bool do_start = false) - : total_duration_{}, start_time_{}, running_{do_start}, name_(name) { + : total_duration_{}, start_time_{}, running_{do_start}, name_(std::move(name)) { static size_t obj_counter{}; if (do_start) { start_time_ = std::chrono::steady_clock::now(); diff --git a/util/src/SimpleThreadPool.cpp b/util/src/SimpleThreadPool.cpp index 4ffc97ccd2..c355a1ba6f 100644 --- a/util/src/SimpleThreadPool.cpp +++ b/util/src/SimpleThreadPool.cpp @@ -11,6 +11,8 @@ #include "SimpleThreadPool.hpp" #include "Logger.hpp" +#include "kvstream.h" + #include #include #include @@ -18,7 +20,12 @@ logging::Logger SP = logging::getLogger("thread-pool"); namespace concord::util { +SimpleThreadPool::SimpleThreadPool(std::string&& name) : name_(std::move(name)), stopped_(true) {} + +SimpleThreadPool::~SimpleThreadPool() { LOG_DEBUG(SP, "SimpleThreadPool: destroyed:" << KVLOG(name_)); } + void SimpleThreadPool::start(uint8_t num_of_threads) { + LOG_DEBUG(SP, "SimpleThreadPool: start:" << KVLOG(name_, num_of_threads)); stopped_ = false; guard g(queue_lock_); for (auto i = 0; i < num_of_threads; ++i) { @@ -42,6 +49,7 @@ void SimpleThreadPool::start(uint8_t num_of_threads) { } void SimpleThreadPool::stop(bool executeAllJobs) { + LOG_DEBUG(SP, "SimpleThreadPool: stopping:" << KVLOG(name_)); { std::unique_lock l{queue_lock_}; stopped_ = true; @@ -62,6 +70,7 @@ void SimpleThreadPool::stop(bool executeAllJobs) { j->release(); } + LOG_DEBUG(SP, "SimpleThreadPool: stop done:" << KVLOG(name_)); } void SimpleThreadPool::add(Job* j) { diff --git a/util/test/multithreading.cpp b/util/test/multithreading.cpp index 2b6b992d45..ddfc26bb27 100644 --- a/util/test/multithreading.cpp +++ b/util/test/multithreading.cpp @@ -60,7 +60,7 @@ class SimpleThreadPoolFixture : public testing::Test { ASSERT_EQ(pool_.getNumOfJobs(), 0); } - concord::util::SimpleThreadPool pool_; + concord::util::SimpleThreadPool pool_{""}; std::atomic_int result; }; diff --git a/util/test/thread_pool_test.cpp b/util/test/thread_pool_test.cpp index 82c36b8479..4aa7e629ea 100644 --- a/util/test/thread_pool_test.cpp +++ b/util/test/thread_pool_test.cpp @@ -62,14 +62,14 @@ void copy(CopyOnly) {} // Make sure the pool can execute lambdas. TEST(thread_pool, lambda) { - auto pool = ThreadPool{}; + auto pool = ThreadPool{""}; auto future = pool.async([]() { return answer; }); ASSERT_EQ(answer, future.get()); } // Make sure the pool can execute functions. TEST(thread_pool, functions) { - auto pool = ThreadPool{}; + auto pool = ThreadPool{""}; auto future1 = pool.async(func); auto future2 = pool.async(&func); auto future3 = pool.async(func_ptr); @@ -82,7 +82,7 @@ TEST(thread_pool, functions) { // Make sure the pool can execute std::function objects. TEST(thread_pool, std_func) { - auto pool = ThreadPool{}; + auto pool = ThreadPool{""}; auto std_func = std::function{func}; auto future = pool.async(std_func); ASSERT_EQ(answer, future.get()); @@ -90,21 +90,21 @@ TEST(thread_pool, std_func) { // Make sure we can execute a void function. TEST(thread_pool, void_func) { - auto pool = ThreadPool{}; + auto pool = ThreadPool{""}; auto future = pool.async(void_func, answer); ASSERT_NO_THROW(future.wait()); } // Make sure async supports arguments. TEST(thread_pool, arguments_with_return) { - auto pool = ThreadPool{}; + auto pool = ThreadPool{""}; auto future = pool.async(identity, answer + 1); ASSERT_EQ(answer + 1, future.get()); } // Make sure we can execute tasks that have different return types. TEST(thread_pool, different_task_return_types) { - auto pool = ThreadPool{}; + auto pool = ThreadPool{""}; auto future1 = pool.async(func); auto future2 = pool.async([]() {}); auto future3 = pool.async([]() { return std::string{"s"}; }); @@ -115,20 +115,20 @@ TEST(thread_pool, different_task_return_types) { // Make sure we can move arguments inside async. TEST(thread_pool, move_only_arguments) { - auto pool = ThreadPool{}; + auto pool = ThreadPool{""}; auto future = pool.async(own, MoveOnly{}); ASSERT_NO_THROW(future.wait()); } // Make sure we can pass copy-only arguments to async. TEST(thread_pool, copy_only_arguments) { - auto pool = ThreadPool{}; + auto pool = ThreadPool{""}; auto future = pool.async(copy, CopyOnly{}); ASSERT_NO_THROW(future.wait()); } TEST(thread_pool, pointer_arguments_are_copied) { - auto pool = ThreadPool{}; + auto pool = ThreadPool{""}; auto i = std::atomic_int{5}; auto p = &i; auto future = pool.async( @@ -143,7 +143,7 @@ TEST(thread_pool, pointer_arguments_are_copied) { } TEST(thread_pool, non_const_lvalues_are_copied) { - auto pool = ThreadPool{}; + auto pool = ThreadPool{""}; auto i = 5; auto future = pool.async([](auto v) { v = 13; }, i); ASSERT_NO_THROW(future.wait()); @@ -151,7 +151,7 @@ TEST(thread_pool, non_const_lvalues_are_copied) { } TEST(thread_pool, std_ref) { - auto pool = ThreadPool{}; + auto pool = ThreadPool{""}; auto i = std::atomic_int{5}; auto future = pool.async([](auto v) { v.get() = 13; }, std::ref(i)); ASSERT_NO_THROW(future.wait()); @@ -160,14 +160,14 @@ TEST(thread_pool, std_ref) { // Multiple arguments of different types. TEST(thread_pool, multiple_arguments) { - auto pool = ThreadPool{}; + auto pool = ThreadPool{""}; auto future = pool.async([](auto&& answer, auto&&, auto&&) { return answer; }, answer, std::string{"s"}, 3.14); ASSERT_EQ(answer, future.get()); } // Make sure exceptions from the user-passed function are correctly propagated. TEST(thread_pool, exception) { - auto pool = ThreadPool{}; + auto pool = ThreadPool{""}; // NOLINTNEXTLINE(misc-throw-by-value-catch-by-reference) auto future = pool.async([]() { throw answer; }); ASSERT_THROW(future.get(), decltype(answer)); @@ -181,7 +181,7 @@ TEST(thread_pool, non_blocking_future_dtors) { auto mtx = std::mutex{}; auto cv = std::condition_variable{}; - auto pool = ThreadPool{}; + auto pool = ThreadPool{""}; { auto future1 = pool.async([&]() { @@ -208,7 +208,7 @@ TEST(thread_pool, non_blocking_future_dtors) { // Make sure that adding more tasks than the concurrency supported by the system works. TEST(thread_pool, more_tasks_than_concurrency) { - auto pool = ThreadPool{}; + auto pool = ThreadPool{""}; const auto tasks = concurrency * 10; auto futures = std::vector>{}; for (auto i = 0u; i < tasks; ++i) { @@ -221,7 +221,7 @@ TEST(thread_pool, more_tasks_than_concurrency) { // Make sure that the pool works correctly with a single thread. TEST(thread_pool, one_thread) { - auto pool = ThreadPool{1}; + auto pool = ThreadPool{"", 1}; const auto tasks = 16u; auto futures = std::vector>{}; for (auto i = 0u; i < tasks; ++i) { @@ -234,7 +234,7 @@ TEST(thread_pool, one_thread) { // Make sure that adding tasks from different threads works properly. TEST(thread_pool, add_tasks_from_different_threads) { - auto pool = ThreadPool{}; + auto pool = ThreadPool{""}; auto async_future = std::async(std::launch::async, [&pool]() { pool.async(func); }); auto pool_future = pool.async(func); ASSERT_EQ(answer, pool_future.get()); From 33aab3c1fca6128d741986826d166e140f1aef17 Mon Sep 17 00:00:00 2001 From: Efrat1 <42644291+Efrat1@users.noreply.github.com> Date: Wed, 2 Nov 2022 10:35:26 +0200 Subject: [PATCH 7/7] Fix leak in RO Replica (#65) --- bftengine/src/bftengine/ReadOnlyReplica.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/bftengine/src/bftengine/ReadOnlyReplica.cpp b/bftengine/src/bftengine/ReadOnlyReplica.cpp index f2e4826b11..88e49cefaa 100644 --- a/bftengine/src/bftengine/ReadOnlyReplica.cpp +++ b/bftengine/src/bftengine/ReadOnlyReplica.cpp @@ -287,9 +287,7 @@ void ReadOnlyReplica::registerStatusHandlers() { bj.endNested(); bj.endJson(); - char *cstr = new char[bj.getJson().length() + 1]; - std::strcpy(cstr, bj.getJson().c_str()); - return cstr; + return bj.getJson(); }); concord::diagnostics::RegistrarSingleton::getInstance().status.registerHandler(h); }