Enhance FAP metrics (#9751)
ref #8673

Signed-off-by: Calvin Neo <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
CalvinNeo and ti-chi-bot[bot] authored Dec 30, 2024
1 parent 08abd71 commit 00df1eb
Showing 3 changed files with 61 additions and 44 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Common/TiFlashMetrics.h
@@ -400,7 +400,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_failed_other, {{"type", "failed_other"}}), \
F(type_failed_cancel, {{"type", "failed_cancel"}}), \
F(type_failed_no_suitable, {{"type", "failed_no_suitable"}}), \
F(type_failed_timeout, {{"type", "failed_timeout"}}), \
F(type_failed_no_candidate, {{"type", "failed_no_candidate"}}), \
F(type_failed_baddata, {{"type", "failed_baddata"}}), \
F(type_failed_repeated, {{"type", "failed_repeated"}}), \
F(type_failed_build_chkpt, {{"type", "failed_build_chkpt"}}), \
102 changes: 59 additions & 43 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
@@ -56,6 +56,24 @@ extern const char force_fap_worker_throw[];
extern const char force_set_fap_candidate_store_id[];
} // namespace FailPoints

/// FastAddPeer Result Metrics:
/// - type_total counts the total number of FAP tasks.
/// - type_success_transform counts how many FAP snapshots have been written.
/// - type_failed_other counts errors that do not originate from FAP itself, such as a thread pool failure.
/// - type_failed_cancel counts FAP tasks canceled outside of the FAP builder thread, whether due to timeout or not.
/// - type_failed_no_suitable / type_failed_no_candidate count failures in the select stage.
/// - type_failed_repeated is a special case where an FAP snapshot is found to already exist while we are doing the write stage.
/// - type_failed_baddata counts exceptions thrown from the FAP builder thread, mostly unhandled exceptions from UniPS or DeltaTree.
///   It results in a `BadData` return code to Proxy.
/// - type_failed_build_chkpt specifically tracks exceptions thrown while building segments in the write stage.
/// - type_reuse_chkpt_cache counts how many times a checkpoint is reused to build an FAP snapshot.
/// - type_restore counts how many FAP snapshots are restored for applying after a restart.
/// - type_succeed counts succeeded FAP tasks, i.e. tasks whose FAP snapshot has been applied.

using raft_serverpb::PeerState;
using raft_serverpb::RaftApplyState;
using raft_serverpb::RegionLocalState;

FastAddPeerRes genFastAddPeerRes(
FastAddPeerStatus status,
std::string && apply_str,
@@ -107,11 +125,6 @@ std::vector<StoreID> getCandidateStoreIDsForRegion(TMTContext & tmt_context, UIn
return store_ids;
}

using raft_serverpb::PeerState;
using raft_serverpb::RaftApplyState;
using raft_serverpb::RegionLocalState;


std::optional<CheckpointRegionInfoAndData> tryParseRegionInfoFromCheckpointData(
ParsedCheckpointDataHolderPtr checkpoint_data_holder,
UInt64 remote_store_id,
@@ -219,15 +232,18 @@ std::variant<CheckpointRegionInfoAndData, FastAddPeerRes> FastAddPeerImplSelect(

// Get candidate stores.
const auto & settings = tmt.getContext().getSettingsRef();
auto current_store_id = tmt.getKVStore()->clonedStoreMeta().id();
const auto current_store_id = tmt.getKVStore()->clonedStoreMeta().id();
std::vector<StoreID> candidate_store_ids = getCandidateStoreIDsForRegion(tmt, region_id, current_store_id);

fiu_do_on(FailPoints::force_fap_worker_throw, { throw Exception(ErrorCodes::LOGICAL_ERROR, "mocked throw"); });

// It could be the first TiFlash peer (the most common case), thus there is no candidate store for FAP at all.
// NOTE that it is unpredictable which TiFlash node is scheduled for the peer first;
// it could always be TiFlash node "p1" if we schedule from 0 replicas -> 2 replicas.
if (candidate_store_ids.empty())
{
LOG_DEBUG(log, "No suitable candidate peer for region_id={}", region_id);
GET_METRIC(tiflash_fap_task_result, type_failed_no_suitable).Increment();
GET_METRIC(tiflash_fap_task_result, type_failed_no_candidate).Increment();
return genFastAddPeerResFail(FastAddPeerStatus::NoSuitable);
}
LOG_DEBUG(log, "Begin to select checkpoint for region_id={}", region_id);
@@ -244,51 +260,50 @@
auto [data_seq, checkpoint_data] = fap_ctx->getNewerCheckpointData(tmt.getContext(), store_id, checked_seq);

checked_seq_map[store_id] = data_seq;
if (data_seq > checked_seq)
if (data_seq <= checked_seq)
continue;
RUNTIME_CHECK(checkpoint_data != nullptr);
auto maybe_region_info
= tryParseRegionInfoFromCheckpointData(checkpoint_data, store_id, region_id, proxy_helper);
if (!maybe_region_info.has_value())
continue;
const auto & checkpoint_info = std::get<0>(maybe_region_info.value());
auto & region = std::get<1>(maybe_region_info.value());
auto & region_state = std::get<3>(maybe_region_info.value());
if (tryResetPeerIdInRegion(region, region_state, new_peer_id))
{
RUNTIME_CHECK(checkpoint_data != nullptr);
auto maybe_region_info
= tryParseRegionInfoFromCheckpointData(checkpoint_data, store_id, region_id, proxy_helper);
if (!maybe_region_info.has_value())
continue;
const auto & checkpoint_info = std::get<0>(maybe_region_info.value());
auto & region = std::get<1>(maybe_region_info.value());
auto & region_state = std::get<3>(maybe_region_info.value());
if (tryResetPeerIdInRegion(region, region_state, new_peer_id))
{
LOG_INFO(
log,
"Select checkpoint with data_seq={}, remote_store_id={} elapsed={} size(candidate_store_id)={} "
"region_id={}",
data_seq,
checkpoint_info->remote_store_id,
watch.elapsedSeconds(),
candidate_store_ids.size(),
region_id);
GET_METRIC(tiflash_fap_task_duration_seconds, type_select_stage).Observe(watch.elapsedSeconds());
return maybe_region_info.value();
}
else
{
LOG_DEBUG(
log,
"Checkpoint with seq {} doesn't contain reusable region info region_id={} from_store_id={}",
data_seq,
region_id,
store_id);
}
LOG_INFO(
log,
"Select checkpoint with data_seq={}, remote_store_id={} elapsed={} size(candidate_store_id)={} "
"region_id={}",
data_seq,
checkpoint_info->remote_store_id,
watch.elapsedSeconds(),
candidate_store_ids.size(),
region_id);
GET_METRIC(tiflash_fap_task_duration_seconds, type_select_stage).Observe(watch.elapsedSeconds());
return maybe_region_info.value();
}
else
{
LOG_DEBUG(
log,
"Checkpoint with seq {} doesn't contain reusable region info region_id={} from_store_id={}",
data_seq,
region_id,
store_id);
}
}
{
if (watch.elapsedSeconds() >= settings.fap_wait_checkpoint_timeout_seconds)
{
// This could happen if there are too many pending tasks in queue,
// This could happen if the checkpoint we got is not fresh enough.
LOG_INFO(
log,
"FastAddPeer timeout when select checkpoints region_id={} new_peer_id={}",
region_id,
new_peer_id);
GET_METRIC(tiflash_fap_task_result, type_failed_timeout).Increment();
GET_METRIC(tiflash_fap_task_result, type_failed_no_suitable).Increment();
return genFastAddPeerResFail(FastAddPeerStatus::NoSuitable);
}
SYNC_FOR("in_FastAddPeerImplSelect::before_sleep");
@@ -501,7 +516,8 @@ FastAddPeerRes FastAddPeerImpl(
new_peer_id,
std::move(std::get<CheckpointRegionInfoAndData>(res)),
start_time);
GET_METRIC(tiflash_fap_task_result, type_success_transform).Increment();
if (final_res.status == FastAddPeerStatus::Ok)
GET_METRIC(tiflash_fap_task_result, type_success_transform).Increment();
return final_res;
}
return std::get<FastAddPeerRes>(res);
@@ -705,6 +721,7 @@ FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, u
// We need to schedule the task.
auto current_time = FAPAsyncTasks::getCurrentMillis();
GET_METRIC(tiflash_fap_task_state, type_queueing_stage).Increment();
GET_METRIC(tiflash_fap_task_result, type_total).Increment();
auto job_func = [server, region_id, new_peer_id, fap_ctx, current_time]() {
std::string origin_name = getThreadName();
SCOPE_EXIT({ setThreadName(origin_name.c_str()); });
Expand Down Expand Up @@ -758,7 +775,6 @@ FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, u
new_peer_id,
region_id,
elapsed);
GET_METRIC(tiflash_fap_task_result, type_total).Increment();
GET_METRIC(tiflash_fap_task_duration_seconds, type_phase1_total).Observe(elapsed / 1000.0);
return result;
}
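The doc comment added in FastAddPeer.cpp above, together with moving the type_total increment to scheduling time and incrementing type_success_transform only on an Ok status, implies a simple accounting rule: each scheduled FAP task bumps type_total exactly once and ends up in exactly one terminal bucket (type_success_transform or one of the type_failed_* counters). The following is a small self-contained sketch of that invariant; the struct and function names are illustrative only and are not part of TiFlash or of this commit.

#include <cassert>
#include <map>
#include <string>

// Illustrative model of the FAP result accounting after this change:
// one "total" increment when a task is queued, exactly one terminal
// increment when it finishes (success or one of the failure buckets).
struct FapResultCounters
{
    std::map<std::string, long> counters;

    // Mirrors the type_total increment at queueing time.
    void onScheduled() { ++counters["total"]; }
    // Mirrors the single terminal increment, e.g. "success_transform" or "failed_no_candidate".
    void onFinished(const std::string & bucket) { ++counters[bucket]; }

    long terminalSum() const
    {
        long sum = 0;
        for (const auto & [name, value] : counters)
            if (name != "total")
                sum += value;
        return sum;
    }
};

int main()
{
    FapResultCounters metrics;
    // Two tasks: one builds an FAP snapshot successfully, one finds no candidate store.
    metrics.onScheduled();
    metrics.onFinished("success_transform");
    metrics.onScheduled();
    metrics.onFinished("failed_no_candidate");
    // Invariant: every scheduled task lands in exactly one terminal bucket.
    assert(metrics.counters["total"] == metrics.terminalSum());
    return 0;
}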
@@ -109,6 +109,7 @@ class RegionKVStoreTestFAP : public KVStoreTestBase

global_context.getSharedContextDisagg()->initFastAddPeerContext(25);
proxy_instance = std::make_unique<MockRaftStoreProxy>();
proxy_instance->proxy_config_string = R"({"raftstore":{"snap-handle-pool-size":3}})";
proxy_helper = proxy_instance->generateProxyHelper();
KVStoreTestBase::reloadKVSFromDisk(false);
{
