Skip to content

Commit

Permalink
Storages: Shutdown the LocalIndexScheduler before shutting down PageS…
Browse files Browse the repository at this point in the history
…torage/DeltaMergeStore (#9712)

close #9714

Storages: Shutdown the LocalIndexScheduler before shutting down PageStorage/DeltaMergeStore
* Add a method `LocalIndexerScheduler::shutdown()` and ensure the running task are all finished before shutting down the GlobalPageStorage in `ContextShared::shutdown()`.

Signed-off-by: JaySon-Huang <[email protected]>
  • Loading branch information
JaySon-Huang authored Dec 13, 2024
1 parent c2c041c commit b16a5f9
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 35 deletions.
7 changes: 7 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,13 @@ struct ContextShared
return;
shutdown_called = true;

// The local index scheduler must be shutdown to stop all
// running tasks before shutting down `global_storage_pool`.
if (global_local_indexer_scheduler)
{
global_local_indexer_scheduler->shutdown();
}

if (global_storage_pool)
{
// shutdown the gc task of global storage pool before
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ std::tuple<ASTPtr, BlockIO> executeQueryImpl(

if (elem.read_rows != 0)
{
LOG_INFO(
LOG_DEBUG(
execute_query_logger,
"Read {} rows, {} in {:.3f} sec., {} rows/sec., {}/sec.",
elem.read_rows,
Expand Down Expand Up @@ -421,7 +421,7 @@ void logQueryPipeline(const LoggerPtr & logger, const BlockInputStreamPtr & in)
in->dumpTree(log_buffer);
return log_buffer.toString();
};
LOG_INFO(logger, pipeline_log_str());
LOG_DEBUG(logger, pipeline_log_str());
}

BlockIO executeQuery(const String & query, Context & context, bool internal, QueryProcessingStage::Enum stage)
Expand Down
17 changes: 14 additions & 3 deletions dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ LocalIndexerScheduler::LocalIndexerScheduler(const Options & options)
start();
}

LocalIndexerScheduler::~LocalIndexerScheduler()
void LocalIndexerScheduler::shutdown()
{
LOG_INFO(logger, "LocalIndexerScheduler is destroying. Waiting scheduler and tasks to finish...");
LOG_INFO(logger, "LocalIndexerScheduler is shutting down. Waiting scheduler and tasks to finish...");

// First quit the scheduler. Don't schedule more tasks.
is_shutting_down = true;
Expand All @@ -81,7 +81,15 @@ LocalIndexerScheduler::~LocalIndexerScheduler()

// Then wait all running tasks to finish.
pool.reset();
LOG_INFO(logger, "LocalIndexerScheduler is shutdown.");
}

LocalIndexerScheduler::~LocalIndexerScheduler()
{
if (!is_shutting_down)
{
shutdown();
}
LOG_INFO(logger, "LocalIndexerScheduler is destroyed");
}

Expand Down Expand Up @@ -295,7 +303,10 @@ bool LocalIndexerScheduler::tryAddTaskToPool(std::unique_lock<std::mutex> & lock
}
};

RUNTIME_CHECK(pool);
if (is_shutting_down || !pool)
// shutting down, retry again
return false;

if (!pool->trySchedule(real_job))
// Concurrent task limit reached
return false;
Expand Down
15 changes: 11 additions & 4 deletions dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ class LocalIndexerScheduler

~LocalIndexerScheduler();

/**
* @brief Stop the scheduler and wait for running tasks to finish.
* Note that this method won't clear the task pushed.
*/
void shutdown();

/**
* @brief Start the scheduler. In some tests we need to start scheduler
* after some tasks are pushed.
Expand All @@ -101,7 +107,7 @@ class LocalIndexerScheduler

/**
* @brief Blocks until there is no tasks remaining in the queue and there is no running tasks.
* Should be only used in tests.
* **Should be only used in tests**.
*/
void waitForFinish();

Expand All @@ -114,6 +120,7 @@ class LocalIndexerScheduler

/**
* @brief Drop all tasks matching specified keyspace id and table id.
* Note that this method won't drop the running tasks.
*/
size_t dropTasks(KeyspaceID keyspace_id, TableID table_id);

Expand Down Expand Up @@ -147,9 +154,6 @@ class LocalIndexerScheduler
void moveBackReadyTasks(std::unique_lock<std::mutex> & lock);

private:
bool is_started = false;
std::thread scheduler_thread;

/// Try to add a task to the pool. Returns false if the pool is full
/// (for example, reaches concurrent task limit or memory limit).
/// When pool is full, we will not try to schedule any more tasks at this moment.
Expand All @@ -160,6 +164,9 @@ class LocalIndexerScheduler
/// heavy pressure.
bool tryAddTaskToPool(std::unique_lock<std::mutex> & lock, const InternalTaskPtr & task);

std::thread scheduler_thread;
bool is_started = false;

KeyspaceID last_schedule_keyspace_id = 0;
std::map<KeyspaceID, TableID> last_schedule_table_id_by_ks;

Expand Down
40 changes: 25 additions & 15 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,24 +417,34 @@ SegmentPtr Segment::restoreSegment( //
DMContext & context,
PageIdU64 segment_id)
{
Page page = context.storage_pool->metaReader()->read(segment_id); // not limit restore

ReadBufferFromMemory buf(page.data.begin(), page.data.size());
Segment::SegmentMetaInfo segment_info;
readSegmentMetaInfo(buf, segment_info);
try
{
Page page = context.storage_pool->metaReader()->read(segment_id); // not limit restore

auto delta = DeltaValueSpace::restore(context, segment_info.range, segment_info.delta_id);
auto stable = StableValueSpace::restore(context, segment_info.stable_id);
auto segment = std::make_shared<Segment>(
parent_log,
segment_info.epoch,
segment_info.range,
segment_id,
segment_info.next_segment_id,
delta,
stable);
ReadBufferFromMemory buf(page.data.begin(), page.data.size());
readSegmentMetaInfo(buf, segment_info);

return segment;
auto delta = DeltaValueSpace::restore(context, segment_info.range, segment_info.delta_id);
auto stable = StableValueSpace::restore(context, segment_info.stable_id);
auto segment = std::make_shared<Segment>(
parent_log,
segment_info.epoch,
segment_info.range,
segment_id,
segment_info.next_segment_id,
delta,
stable);

return segment;
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("while restoreSegment, segment_id={}", segment_id));
e.rethrow();
}
RUNTIME_CHECK_MSG(false, "unreachable");
return {};
}

Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( //
Expand Down
13 changes: 13 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,19 @@ class Segment
PageIdU64 next_segment_id{};
PageIdU64 delta_id{};
PageIdU64 stable_id{};

String toString() const
{
return fmt::format(
"{{version={} epoch={} range={} segment_id={} next_segment_id={} delta_id={} stable_id={}}}",
version,
epoch,
range.toString(),
segment_id,
next_segment_id,
delta_id,
stable_id);
}
};

using SegmentMetaInfos = std::vector<SegmentMetaInfo>;
Expand Down
13 changes: 3 additions & 10 deletions dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ namespace DB::ErrorCodes
extern const int DT_DELTA_INDEX_ERROR;
}

namespace DB
{
namespace DM
{
namespace GC
namespace DB::DM::GC
{
bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(
const DMContext & context, //
Expand All @@ -63,7 +59,7 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(
double invalid_data_ratio_threshold,
const LoggerPtr & log);
}
namespace tests
namespace DB::DM::tests
{

class SegmentOperationTest : public SegmentTestBasic
Expand Down Expand Up @@ -1366,7 +1362,4 @@ try
}
CATCH


} // namespace tests
} // namespace DM
} // namespace DB
} // namespace DB::DM::tests
3 changes: 2 additions & 1 deletion dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Checksum.h>
#include <IO/Encryption/MockKeyManager.h>
#include <IO/FileProvider/FileProvider.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -871,7 +872,7 @@ class PageStorageControlV3
ChecksumClass digest;
digest.update(buffer, size);
auto checksum = digest.checksum();
fmt::print("checksum: 0x{:X}\n", checksum);
fmt::println("checksum: 0x{:X}", checksum);

auto hex_str = Redact::keyToHexString(buffer, size);
delete[] buffer;
Expand Down

0 comments on commit b16a5f9

Please sign in to comment.