Skip to content

Commit

Permalink
Abort uploading checkpoint if s3 lock client is not initialized (#9655)…
Browse files Browse the repository at this point in the history
… (#9659)

close #9394

Signed-off-by: Calvin Neo <[email protected]>

Co-authored-by: Calvin Neo <[email protected]>
  • Loading branch information
ti-chi-bot and CalvinNeo authored Nov 20, 2024
1 parent bf389c9 commit 990562e
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
#include <Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h>
#include <Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/KVStore/Types.h>
#include <TestUtils/InputStreamTestUtils.h>

Expand All @@ -40,6 +41,8 @@ class DeltaMergeStoreVectorTest
void SetUp() override
{
TiFlashStorageTestBasic::SetUp();
auto & global_context = TiFlashTestEnv::getGlobalContext();
global_context.getTMTContext().initS3GCManager(nullptr);
store = reload();
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class SegmentTestS3 : public DB::base::TiFlashStorageTestBasic
ASSERT_TRUE(::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*s3_client));
TiFlashStorageTestBasic::SetUp();
auto & global_context = TiFlashTestEnv::getGlobalContext();
global_context.getTMTContext().initS3GCManager(nullptr);
if (global_context.getSharedContextDisagg()->remote_data_store == nullptr)
{
already_initialize_data_store = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1708,6 +1708,7 @@ class VectorIndexSegmentOnS3Test
TiFlashStorageTestBasic::SetUp();

auto & global_context = TiFlashTestEnv::getGlobalContext();
global_context.getTMTContext().initS3GCManager(nullptr);

global_context.getSharedContextDisagg()->initRemoteDataStore(
global_context.getFileProvider(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ class DMStoreForSegmentReadTaskTest : public DeltaMergeStoreTest
void SetUp() override
{
DeltaMergeStoreTest::SetUp();
auto & global_context = TiFlashTestEnv::getGlobalContext();
global_context.getTMTContext().initS3GCManager(nullptr);
initReadNodePageCacheIfUninitialized();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ class SegmentReplaceStableDataDisaggregated
TiFlashStorageTestBasic::SetUp();

auto & global_context = TiFlashTestEnv::getGlobalContext();
global_context.getTMTContext().initS3GCManager(nullptr);

ASSERT_TRUE(global_context.getSharedContextDisagg()->remote_data_store == nullptr);
global_context.getSharedContextDisagg()->initRemoteDataStore(
Expand Down
15 changes: 12 additions & 3 deletions dbms/src/Storages/KVStore/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,7 @@ TMTContext::TMTContext(

void TMTContext::initS3GCManager(const TiFlashRaftProxyHelper * proxy_helper)
{
if (!raftproxy_config.pd_addrs.empty() && S3::ClientFactory::instance().isEnabled()
&& !context.getSharedContextDisagg()->isDisaggregatedComputeMode())
if (S3::ClientFactory::instance().isEnabled() && !context.getSharedContextDisagg()->isDisaggregatedComputeMode())
{
kvstore->fetchProxyConfig(proxy_helper);
if (kvstore->getProxyConfigSummay().valid)
Expand All @@ -185,7 +184,7 @@ void TMTContext::initS3GCManager(const TiFlashRaftProxyHelper * proxy_helper)
/*id*/ kvstore->getProxyConfigSummay().engine_addr,
etcd_client);
}
else
else if (!raftproxy_config.pd_addrs.empty())
{
LOG_INFO(
Logger::get(),
Expand All @@ -194,9 +193,19 @@ void TMTContext::initS3GCManager(const TiFlashRaftProxyHelper * proxy_helper)
s3gc_owner
= OwnerManager::createS3GCOwner(context, /*id*/ raftproxy_config.advertise_engine_addr, etcd_client);
}
else
{
#ifdef DBMS_PUBLIC_GTEST
s3gc_owner = OwnerManager::createMockOwner("mocked");
#else
LOG_INFO(Logger::get(), "quit init s3 gc manager, no effective pd addr");
return;
#endif
}
s3gc_owner->campaignOwner(); // start campaign
s3lock_client = std::make_shared<S3::S3LockClient>(cluster.get(), s3gc_owner);

LOG_INFO(Logger::get(), "Build s3lock client success");
S3::S3GCConfig remote_gc_config;
{
Int64 gc_method_int = context.getSettingsRef().remote_gc_method;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ class RegionKVStoreTestFAP : public KVStoreTestBase
public:
void SetUp() override
{
// Need S3 for S3 lock client, otherwise UniversalPageStorage::write would block waiting.
DB::tests::TiFlashTestEnv::enableS3Config();
test_path = TiFlashTestEnv::getTemporaryPath("/region_kvs_fap_test");
auto & global_context = TiFlashTestEnv::getGlobalContext();
global_context.getTMTContext().initS3GCManager(nullptr);
// clean data and create path pool instance
path_pool = TiFlashTestEnv::createCleanPathPool(test_path);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ bool UniversalPageStorageService::uploadCheckpoint()
return false;
}
auto s3lock_client = tmt.getS3LockClient();
if (s3lock_client == nullptr)
{
LOG_INFO(log, "Skip checkpoint because s3lock_client is not initialized");
return false;
}

const bool force_upload = upload_all_at_next_upload.load();
bool upload_done = uploadCheckpointImpl(store_info, s3lock_client, remote_store, force_upload);
if (force_upload && upload_done)
Expand Down

0 comments on commit 990562e

Please sign in to comment.