From 2d89761c8d9bcea41967b3da98afb02e0667a877 Mon Sep 17 00:00:00 2001 From: Evgeny Malygin Date: Thu, 17 Oct 2024 15:55:06 +0100 Subject: [PATCH] Fix[MWC]: ntc channel execute after close Signed-off-by: Evgeny Malygin --- src/groups/bmq/bmqio/bmqio_ntcchannel.cpp | 11 +- src/groups/bmq/bmqio/bmqio_ntcchannel.h | 5 +- src/groups/bmq/bmqio/bmqio_ntcchannel.t.cpp | 403 ++++++++++++++++++ .../bmq/bmqio/bmqio_ntcchannelfactory.t.cpp | 10 +- 4 files changed, 417 insertions(+), 12 deletions(-) create mode 100644 src/groups/bmq/bmqio/bmqio_ntcchannel.t.cpp diff --git a/src/groups/bmq/bmqio/bmqio_ntcchannel.cpp b/src/groups/bmq/bmqio/bmqio_ntcchannel.cpp index a5099ce95..7e5737a94 100644 --- a/src/groups/bmq/bmqio/bmqio_ntcchannel.cpp +++ b/src/groups/bmq/bmqio/bmqio_ntcchannel.cpp @@ -1333,8 +1333,12 @@ void NtcChannel::close(const Status& status) int NtcChannel::execute(const ExecuteCb& cb) { - d_streamSocket_sp->execute(cb); - return 0; + bslmt::LockGuard lock(&d_mutex); + if (d_streamSocket_sp) { + d_streamSocket_sp->execute(cb); + return 0; // RETURN + } + return -1; } bdlmt::SignalerConnection NtcChannel::onClose(const CloseFn& cb) @@ -1632,7 +1636,8 @@ int NtcListener::listen(bmqio::Status* status, #if BMQIO_NTCLISTENER_BIND_ASYNC == 0 - bsl::shared_ptr resolver = ntsf::System::createResolver(); + bsl::shared_ptr resolver = ntsf::System::createResolver( + d_allocator_p); ntsa::EndpointOptions endpointOptions; endpointOptions.setTransport(ntsa::Transport::e_TCP_IPV4_STREAM); diff --git a/src/groups/bmq/bmqio/bmqio_ntcchannel.h b/src/groups/bmq/bmqio/bmqio_ntcchannel.h index 0f28079d1..2dce66553 100644 --- a/src/groups/bmq/bmqio/bmqio_ntcchannel.h +++ b/src/groups/bmq/bmqio/bmqio_ntcchannel.h @@ -354,7 +354,8 @@ class NtcChannel : public bmqio::Channel, /// this channel for the write to succeed. void write(Status* status, const bdlbb::Blob& blob, - bsls::Types::Int64 watermark) BSLS_KEYWORD_OVERRIDE; + bsls::Types::Int64 watermark = bsl::numeric_limits::max()) + BSLS_KEYWORD_OVERRIDE; /// Cancel the operation. void cancel() BSLS_KEYWORD_OVERRIDE; @@ -369,7 +370,7 @@ class NtcChannel : public bmqio::Channel, /// Shutdown this channel, and cancel all pending read requests (but do /// not invoke them). Pass the specified `status` to any registered /// `CloseFn`s. - void close(const Status& status) BSLS_KEYWORD_OVERRIDE; + void close(const Status& status = Status()) BSLS_KEYWORD_OVERRIDE; /// Execute the specified `cb` serialized with calls to any registered /// read callbacks, or any `close` or `watermark` event handlers for diff --git a/src/groups/bmq/bmqio/bmqio_ntcchannel.t.cpp b/src/groups/bmq/bmqio/bmqio_ntcchannel.t.cpp new file mode 100644 index 000000000..940555331 --- /dev/null +++ b/src/groups/bmq/bmqio/bmqio_ntcchannel.t.cpp @@ -0,0 +1,403 @@ +// Copyright 2024 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// bmqio_ntcchannel.t.cpp -*-C++-*- +#include + +// BMQ +#include + +// NTC +#include +#include +#include + +// BDE +#include +#include +#include +#include + +#include + +// CONVENIENCE +using namespace BloombergLP; +using namespace bsl; +using namespace bmqio; + +namespace { + +const size_t k_WRITE_QUEUE_LOW_WM = 512 * 1024; +const size_t k_WRITE_QUEUE_HIGH_WM = 512 * 1024; + +/// Create the ntca::InterfaceConfig to use given the specified +/// `sessionOptions`. Use the specified `allocator` for any memory +/// allocation. +ntca::InterfaceConfig ntcCreateInterfaceConfig(bslma::Allocator* allocator) +{ + ntca::InterfaceConfig config(allocator); + + config.setThreadName("test"); + + config.setMaxThreads(4); + config.setMaxConnections(128); + config.setWriteQueueLowWatermark(k_WRITE_QUEUE_LOW_WM); + config.setWriteQueueHighWatermark(k_WRITE_QUEUE_HIGH_WM); + + config.setDriverMetrics(false); + config.setDriverMetricsPerWaiter(false); + config.setSocketMetrics(false); + config.setSocketMetricsPerHandle(false); + + config.setAcceptGreedily(false); + config.setSendGreedily(false); + config.setReceiveGreedily(false); + + config.setNoDelay(true); + config.setKeepAlive(true); + config.setKeepHalfOpen(false); + + return config; +} + +void doFail() +{ + ASSERT(false && "Must not be invoked"); +} + +void executeOnClosedChannelFunc(bmqio::NtcChannel* channel, + const Status& status) +{ + // PRECONDITIONS + ASSERT(channel); + + BSLA_MAYBE_UNUSED bslma::Allocator* alloc = channel->allocator(); + BSLA_MAYBE_UNUSED int id = channel->channelId(); + BSLA_MAYBE_UNUSED ntsa::Endpoint peerEndpoint = channel->peerEndpoint(); + BSLA_MAYBE_UNUSED ntsa::Endpoint sourceEndpoint = + channel->sourceEndpoint(); + BSLA_MAYBE_UNUSED const bsl::string& peerUri = channel->peerUri(); + BSLA_MAYBE_UNUSED bmqvt::PropertyBag& properties = channel->properties(); + + channel->setChannelId(id); + channel->setWriteQueueLowWatermark(k_WRITE_QUEUE_LOW_WM); + channel->setWriteQueueHighWatermark(k_WRITE_QUEUE_HIGH_WM); + + // These operations on a closed channel should be no-op + channel->cancel(); + channel->cancelRead(); + channel->execute(bdlf::BindUtil::bind(doFail)); + + // The second `close` should be no-op + channel->close(); +} + +// ============ +// class Tester +// ============ + +/// Helper class testing a NtcChannel, wrapping its creation and providing +/// convenient wrappers for calling its functions and checking its results. +/// +/// Many of the functions take a `int line` argument, which is always +/// passed `L_` and is used in assertion messages to find where an error +/// occurred. +class Tester { + // DATA + bslma::Allocator* d_allocator_p; + bsl::shared_ptr d_blobBufferFactory_sp; + bsl::shared_ptr d_interface_sp; + bsl::shared_ptr d_listener_sp; + bmqio::ChannelFactory::ResultCallback d_listenResultCallback; + bmqio::ChannelFactory::ResultCallback d_connectResultCallback; + bsl::vector > d_listenChannels; + bsl::vector > d_connectChannels; + bslmt::Semaphore d_semaphore; + bslmt::Mutex d_mutex; + + // NOT IMPLEMENTED + Tester(const Tester&); + Tester& operator=(const Tester&); + + // PRIVATE MANIPULATORS + void destroy(); + + void + onAcceptConnection(const bsl::shared_ptr& acceptor, + const bsl::shared_ptr& streamSocket, + const ntca::AcceptEvent& event); + + void onChannelResult(ChannelFactoryEvent::Enum event, + const Status& status, + const bsl::shared_ptr& channel); + + public: + // TRAITS + BSLMF_NESTED_TRAIT_DECLARATION(Tester, bslma::UsesBslmaAllocator) + + // CREATORS + explicit Tester(bslma::Allocator* basicAllocator = 0); + ~Tester(); + + // MANIPULATORS + + /// (Re-)create the object being tested and reset the state of any + /// supporting objects. + void init(); + + bsl::shared_ptr connect(); +}; + +// ------------ +// class Tester +// ------------ + +// CREATORS +Tester::Tester(bslma::Allocator* basicAllocator) +: d_allocator_p(bslma::Default::allocator(basicAllocator)) +, d_blobBufferFactory_sp() +, d_interface_sp() +, d_listener_sp() +, d_listenResultCallback(bdlf::BindUtil::bindS(s_allocator_p, + &Tester::onChannelResult, + this, + bdlf::PlaceHolders::_1, + bdlf::PlaceHolders::_2, + bdlf::PlaceHolders::_3)) +, d_connectResultCallback(bdlf::BindUtil::bindS(s_allocator_p, + &Tester::onChannelResult, + this, + bdlf::PlaceHolders::_1, + bdlf::PlaceHolders::_2, + bdlf::PlaceHolders::_3)) +, d_listenChannels(d_allocator_p) +, d_connectChannels(d_allocator_p) +, d_semaphore() +, d_mutex() +{ +} + +Tester::~Tester() +{ + destroy(); +} + +// MANIPULATORS +void Tester::destroy() +{ + bslmt::LockGuard guard(&d_mutex); + for (size_t i = 0; i < d_listenChannels.size(); i++) { + d_listenChannels[i]->close(); + } + for (size_t i = 0; i < d_connectChannels.size(); i++) { + d_connectChannels[i]->close(); + } + d_listenChannels.clear(); + d_connectChannels.clear(); + if (d_listener_sp) { + d_listener_sp->close(); + d_listener_sp->shutdown(); + d_listener_sp.reset(); + } + if (d_interface_sp) { + d_interface_sp->shutdown(); + d_interface_sp->linger(); + d_interface_sp.reset(); + } +} + +void Tester::onAcceptConnection( + const bsl::shared_ptr& acceptor, + const bsl::shared_ptr& streamSocket, + const ntca::AcceptEvent& event) +{ + if (event.isError()) { + return; + } + + bsl::shared_ptr channel; + channel.createInplace(d_allocator_p, + d_interface_sp, + d_listenResultCallback, + d_allocator_p); + + channel->import(streamSocket); + + ntsa::Error error = acceptor->accept( + ntca::AcceptOptions(), + bdlf::BindUtil::bindS(s_allocator_p, + &Tester::onAcceptConnection, + this, + bdlf::PlaceHolders::_1, + bdlf::PlaceHolders::_2, + bdlf::PlaceHolders::_3)); + ASSERT_EQ(error, ntsa::Error::e_OK); + + d_listenChannels.push_back(channel); + d_semaphore.post(); +} + +void Tester::onChannelResult(ChannelFactoryEvent::Enum event, + const Status& status, + const bsl::shared_ptr& channel) +{ + d_connectChannels.push_back(channel); +} + +void Tester::init() +{ + // 0. Cleanup + destroy(); + + // 1. Create blob buffer factory + d_blobBufferFactory_sp.createInplace(d_allocator_p, 4096, d_allocator_p); + + // 2. Start ntc intefrace + ntca::InterfaceConfig config = ntcCreateInterfaceConfig(d_allocator_p); + config.setThreadName("test"); + + d_interface_sp = ntcf::System::createInterface(config, + d_blobBufferFactory_sp, + d_allocator_p); + ntsa::Error error = d_interface_sp->start(); + ASSERT_EQ(error, ntsa::Error::e_OK); + + // 3. Start listener + const int backlog = 10; + + ntca::ListenerSocketOptions listenerSocketOptions; + listenerSocketOptions.setTransport(ntsa::Transport::e_TCP_IPV4_STREAM); + listenerSocketOptions.setReuseAddress(true); + listenerSocketOptions.setKeepHalfOpen(true); + listenerSocketOptions.setBacklog(backlog); + listenerSocketOptions.setSourceEndpoint( + ntsa::Endpoint(ntsa::Ipv4Address::loopback(), 0)); + + d_listener_sp = d_interface_sp->createListenerSocket(listenerSocketOptions, + d_allocator_p); + ASSERT(d_listener_sp); + + error = d_listener_sp->open(); + ASSERT_EQ(error, ntsa::Error::e_OK); + + error = d_listener_sp->listen(backlog); + ASSERT_EQ(error, ntsa::Error::e_OK); + + const ntsa::Endpoint endpoint = d_listener_sp->sourceEndpoint(); + ASSERT(endpoint.isIp()); + ASSERT(endpoint.ip().host().isV4()); +} + +bsl::shared_ptr Tester::connect() +{ + bsl::shared_ptr channel; + channel.createInplace(d_allocator_p, + d_interface_sp, + d_connectResultCallback, + d_allocator_p); + + bmqio::Status status(d_allocator_p); + bmqio::ConnectOptions options(d_allocator_p); + options.setEndpoint(d_listener_sp->sourceEndpoint().text()) + .setNumAttempts(1) + .setAttemptInterval(bsls::TimeInterval(1)); + const int rc = channel->connect(&status, options); + ASSERT_EQ(rc, 0); + ASSERT_EQ(status.category(), bmqio::StatusCategory::e_SUCCESS); + + ntci::AcceptCallback acceptCallback = d_listener_sp->createAcceptCallback( + bdlf::BindUtil::bindS(d_allocator_p, + &Tester::onAcceptConnection, + this, + bdlf::PlaceHolders::_1, + bdlf::PlaceHolders::_2, + bdlf::PlaceHolders::_3), + d_allocator_p); + + ntsa::Error error = d_listener_sp->accept(ntca::AcceptOptions(), + acceptCallback); + ASSERT_EQ(error, ntsa::Error::e_OK); + + d_semaphore.wait(); + + return channel; +} + +} // close unnamed namespace + +// ============================================================================ +// TESTS +// ---------------------------------------------------------------------------- +static void test1_breathingTest() +// ------------------------------------------------------------------------ +// BREATHING TEST +// +// Concerns: +// a) Listening for a connection and connecting to the same port +// establishes both connections +// b) Closing a Channel closes both ends +// c) Operations executed over closed channel are handled gracefully +// ------------------------------------------------------------------------ +{ + bmqtst::TestHelper::printTestName("Breathing Test"); + + Tester tester(s_allocator_p); + tester.init(); + + bsl::shared_ptr channel = tester.connect(); + bdlbb::PooledBlobBufferFactory blobFactory(4096, s_allocator_p); + bdlbb::Blob blob(&blobFactory, s_allocator_p); + + bsl::string message("test", s_allocator_p); + bsl::shared_ptr messagePtr; + messagePtr.reset(message.data(), + bslstl::SharedPtrNilDeleter(), + s_allocator_p); + bdlbb::BlobBuffer buffer(messagePtr, message.size()); + + blob.appendDataBuffer(buffer); + bmqio::Status status(s_allocator_p); + channel->write(&status, blob); + ASSERT_EQ(status.category(), bmqio::StatusCategory::e_SUCCESS); + + bmqio::Channel::CloseFn closeCb = bdlf::BindUtil::bindS( + s_allocator_p, + executeOnClosedChannelFunc, + channel.get(), + bdlf::PlaceHolders::_1); + channel->onClose(closeCb); + + channel->close(); +} + +// ============================================================================ +// MAIN PROGRAM +// ---------------------------------------------------------------------------- + +int main(int argc, char* argv[]) +{ + TEST_PROLOG(bmqtst::TestHelper::e_DEFAULT); + + switch (_testCase) { + case 0: + case 1: test1_breathingTest(); break; + default: { + cerr << "WARNING: CASE '" << _testCase << "' NOT FOUND." << endl; + s_testStatus = -1; + } break; + } + + TEST_EPILOG(0); +} diff --git a/src/groups/bmq/bmqio/bmqio_ntcchannelfactory.t.cpp b/src/groups/bmq/bmqio/bmqio_ntcchannelfactory.t.cpp index ceafeea57..ad78e928a 100644 --- a/src/groups/bmq/bmqio/bmqio_ntcchannelfactory.t.cpp +++ b/src/groups/bmq/bmqio/bmqio_ntcchannelfactory.t.cpp @@ -44,19 +44,15 @@ using namespace BloombergLP; using namespace bsl; using namespace bmqio; -// IMPLEMENTATION NOTES: This test driver is nearly identical to the test -// driver for bmqio_tcpchannelfactory with the following differences: +// IMPLEMENTATION NOTES: // // 1) There is no resolution map or reliance on a process-wide mechanism to // override the results of name resolution. NTC supports object-specific // name resolution and it is assumed those facilities work correctly and the // machine is capable of resolving "localhost" to 127.0.0.1. // -// 2) Connecting to an unresolvable or invalid name fails asynchronously -// in NTC, rather than synchronously as in BTE. The test to check that -// connecting to something like "localfoohost" or "localhost:a" fails -// synchronously has been changed to check that the operation fails -// asynchronously. +// 2) Connecting to an unresolvable or invalid name (like "localfoohost" or +// "localhost:a") fails asynchronously in NTC. // // 3) When immediately cancelling a connection to a valid listening socket, // NTC can detect that the peer has closed the connection while the accepted