Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add more resource group metrics #9427

Merged
merged 8 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 0 additions & 59 deletions dbms/src/Common/ComputeLabelHolder.cpp

This file was deleted.

46 changes: 0 additions & 46 deletions dbms/src/Common/ComputeLabelHolder.h

This file was deleted.

33 changes: 12 additions & 21 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#pragma once

#include <Common/ComputeLabelHolder.h>
#include <Common/Exception.h>
#include <Common/ProcessCollector_fwd.h>
#include <Common/TiFlashBuildInfo.h>
Expand Down Expand Up @@ -657,25 +656,6 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
Gauge, \
F(type_send, {{"type", "send_queue"}}), \
F(type_receive, {{"type", "recv_queue"}})) \
M(tiflash_compute_request_unit, \
"Request Unit used by tiflash compute", \
Counter, \
F(type_mpp, \
{{"type", "mpp"}, \
ComputeLabelHolder::instance().getClusterIdLabel(), \
ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_cop, \
{{"type", "cop"}, \
ComputeLabelHolder::instance().getClusterIdLabel(), \
ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_cop_stream, \
{{"type", "cop_stream"}, \
ComputeLabelHolder::instance().getClusterIdLabel(), \
ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_batch, \
{{"type", "batch"}, \
ComputeLabelHolder::instance().getClusterIdLabel(), \
ComputeLabelHolder::instance().getProcessIdLabel()})) \
M(tiflash_shared_block_schemas, \
"statistics about shared block schemas of ColumnFiles", \
Gauge, \
Expand Down Expand Up @@ -856,7 +836,18 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_gac_req_acquire_tokens, {"type", "gac_req_acquire_tokens"}), \
F(type_gac_req_ru_consumption_delta, {"type", "gac_req_ru_consumption_delta"}), \
F(type_gac_resp_tokens, {"type", "gac_resp_tokens"}), \
F(type_gac_resp_capacity, {"type", "gac_resp_capacity"})) \
F(type_gac_resp_capacity, {"type", "gac_resp_capacity"}), \
F(type_handling_mpp_task_dispatch, {"type", "handling_mpp_task_dispatch"}), \
F(type_handling_mpp_task_establish, {"type", "handling_mpp_task_establish"}), \
F(type_handling_mpp_task_cancel, {"type", "handling_mpp_task_cancel"}), \
F(type_handling_mpp_task_run, {"type", "handling_mpp_task_run"})) \
M(tiflash_compute_request_unit, \
"Request Unit used by tiflash compute for each resource group", \
Counter, \
F(type_mpp, {"type", "mpp"}), \
F(type_cop, {"type", "cop"}), \
F(type_cop_stream, {"type", "cop_stream"}), \
F(type_batch, {"type", "batch"}), ) \
M(tiflash_storage_io_limiter_pending_count, \
"I/O limiter pending count", \
Counter, \
Expand Down
10 changes: 7 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ try
{
auto start_time = Clock::now();
DAGContext & dag_context = *context.getDAGContext();
const auto & resource_group = dag_context.getResourceGroupName();

auto query_executor = queryExecute(context, internal);
if (!query_executor)
Expand Down Expand Up @@ -152,7 +153,8 @@ try

auto ru_info = update_ru_statistics();
LOG_INFO(log, "cop finish with request unit: cpu={} read={}", ru_info.cpu_ru, ru_info.read_ru);
GET_METRIC(tiflash_compute_request_unit, type_cop).Increment(ru_info.cpu_ru + ru_info.read_ru);
GET_RESOURCE_GROUP_METRIC(tiflash_compute_request_unit, type_cop, resource_group)
.Increment(ru_info.cpu_ru + ru_info.read_ru);
if (dag_context.collect_execution_summaries)
{
ExecutorStatisticsCollector statistics_collector(log->identifier());
Expand Down Expand Up @@ -184,7 +186,8 @@ try
bool need_send = false;
auto ru_info = update_ru_statistics();
LOG_INFO(log, "cop stream finish with request unit: cpu={} read={}", ru_info.cpu_ru, ru_info.read_ru);
GET_METRIC(tiflash_compute_request_unit, type_cop_stream).Increment(ru_info.cpu_ru + ru_info.read_ru);
GET_RESOURCE_GROUP_METRIC(tiflash_compute_request_unit, type_cop_stream, resource_group)
.Increment(ru_info.cpu_ru + ru_info.read_ru);
if (dag_context.collect_execution_summaries)
{
ExecutorStatisticsCollector statistics_collector(log->identifier());
Expand Down Expand Up @@ -237,7 +240,8 @@ try
bool need_send = false;
auto ru_info = update_ru_statistics();
LOG_INFO(log, "batch cop finish with request unit: cpu={} read={}", ru_info.cpu_ru, ru_info.read_ru);
GET_METRIC(tiflash_compute_request_unit, type_batch).Increment(ru_info.cpu_ru + ru_info.read_ru);
GET_RESOURCE_GROUP_METRIC(tiflash_compute_request_unit, type_batch, resource_group)
.Increment(ru_info.cpu_ru + ru_info.read_ru);
if (dag_context.collect_execution_summaries)
{
ExecutorStatisticsCollector statistics_collector(log->identifier());
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/EstablishCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ EstablishCallData::~EstablishCallData()
if (stopwatch)
{
GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Decrement();
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_handling_mpp_task_establish, resource_group_name)
.Decrement();
GET_METRIC(tiflash_coprocessor_request_duration_seconds, type_mpp_establish_conn)
.Observe(stopwatch->elapsedSeconds());
}
Expand Down Expand Up @@ -167,6 +169,7 @@ void EstablishCallData::initRpc()

connection_id = fmt::format("tunnel{}+{}", request.sender_meta().task_id(), request.receiver_meta().task_id());
query_id = MPPQueryId(request.sender_meta()).toString();
resource_group_name = request.sender_meta().resource_group_name();
auto res = service->establishMPPConnectionAsync(this);

if (!res.ok())
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/EstablishCall.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class EstablishCallData final
const mpp::EstablishMPPConnectionRequest & getRequest() const { return request; }
grpc::ServerContext * getGrpcContext() { return &ctx; }

String getResourceGroupName() const { return resource_group_name; }

private:
/// WARNING: Since a event from one grpc completion queue may be handled by different
/// thread, it's EXTREMELY DANGEROUS to read/write any data after calling a grpc function
Expand Down Expand Up @@ -133,6 +135,7 @@ class EstablishCallData final
std::shared_ptr<DB::AsyncTunnelSender> async_tunnel_sender;
std::unique_ptr<Stopwatch> stopwatch;
String query_id;
String resource_group_name;
String connection_id;
double waiting_task_time_ms = 0;
};
Expand Down
18 changes: 16 additions & 2 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,11 +491,12 @@ grpc::Status FlashService::DispatchMPPTask(
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
const auto & task_meta = request->meta();
const auto & resource_group = task_meta.resource_group_name();
LOG_INFO(
log,
"Handling mpp dispatch request, task: {}, resource_group: {}, conn_id: {}, conn_alias: {}",
MPPTaskId(task_meta).toString(),
task_meta.resource_group_name(),
resource_group,
task_meta.connection_id(),
task_meta.connection_alias());
auto check_result = checkGrpcContext(grpc_context);
Expand All @@ -514,6 +515,7 @@ grpc::Status FlashService::DispatchMPPTask(

GET_METRIC(tiflash_coprocessor_request_count, type_dispatch_mpp_task).Increment();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_dispatch_mpp_task).Increment();
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_handling_mpp_task_dispatch, resource_group).Increment();
GET_METRIC(tiflash_thread_count, type_active_threads_of_dispatch_mpp).Increment();
GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Increment();
if (!tryToResetMaxThreadsMetrics())
Expand All @@ -533,6 +535,7 @@ grpc::Status FlashService::DispatchMPPTask(
GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Decrement();
GET_METRIC(tiflash_thread_count, type_active_threads_of_dispatch_mpp).Decrement();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_dispatch_mpp_task).Decrement();
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_handling_mpp_task_dispatch, resource_group).Decrement();
GET_METRIC(tiflash_coprocessor_request_duration_seconds, type_dispatch_mpp_task)
.Observe(watch.elapsedSeconds());
GET_METRIC(tiflash_coprocessor_response_bytes, type_dispatch_mpp_task).Increment(response->ByteSizeLong());
Expand Down Expand Up @@ -610,6 +613,11 @@ grpc::Status AsyncFlashService::establishMPPConnectionAsync(EstablishCallData *

GET_METRIC(tiflash_coprocessor_request_count, type_mpp_establish_conn).Increment();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Increment();
GET_RESOURCE_GROUP_METRIC(
tiflash_resource_group,
type_handling_mpp_task_establish,
call_data->getResourceGroupName())
.Increment();

call_data->startEstablishConnection();
call_data->tryConnectTunnel();
Expand All @@ -626,6 +634,7 @@ grpc::Status FlashService::EstablishMPPConnection(
// We need to find it out and bind the grpc stream with it.
const auto & receiver_meta = request->receiver_meta();
const auto & sender_meta = request->sender_meta();
const auto & resource_group = receiver_meta.resource_group_name();
assert(receiver_meta.resource_group_name() == sender_meta.resource_group_name());
assert(receiver_meta.connection_id() == sender_meta.connection_id());
assert(receiver_meta.connection_alias() == receiver_meta.connection_alias());
Expand All @@ -635,7 +644,7 @@ grpc::Status FlashService::EstablishMPPConnection(
"conn_alias: {}",
MPPTaskId(receiver_meta).toString(),
MPPTaskId(sender_meta).toString(),
receiver_meta.resource_group_name(),
resource_group,
receiver_meta.connection_id(),
receiver_meta.connection_alias());

Expand All @@ -651,6 +660,7 @@ grpc::Status FlashService::EstablishMPPConnection(

GET_METRIC(tiflash_coprocessor_request_count, type_mpp_establish_conn).Increment();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Increment();
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_handling_mpp_task_establish, resource_group).Increment();
GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Increment();
GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Increment();
if (!tryToResetMaxThreadsMetrics())
Expand All @@ -669,6 +679,7 @@ grpc::Status FlashService::EstablishMPPConnection(
GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Decrement();
GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Decrement();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Decrement();
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_handling_mpp_task_establish, resource_group).Decrement();
GET_METRIC(tiflash_coprocessor_request_duration_seconds, type_mpp_establish_conn)
.Observe(watch.elapsedSeconds());
// TODO: update the value of metric tiflash_coprocessor_response_bytes.
Expand Down Expand Up @@ -723,11 +734,14 @@ grpc::Status FlashService::CancelMPPTask(
return grpc::Status(grpc::StatusCode::INTERNAL, std::move(err_msg));
}

const auto & resource_group = request->meta().resource_group_name();
GET_METRIC(tiflash_coprocessor_request_count, type_cancel_mpp_task).Increment();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_cancel_mpp_task).Increment();
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_handling_mpp_task_cancel, resource_group).Increment();
Stopwatch watch;
SCOPE_EXIT({
GET_METRIC(tiflash_coprocessor_handling_request_count, type_cancel_mpp_task).Decrement();
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_handling_mpp_task_cancel, resource_group).Decrement();
GET_METRIC(tiflash_coprocessor_request_duration_seconds, type_cancel_mpp_task).Observe(watch.elapsedSeconds());
GET_METRIC(tiflash_coprocessor_response_bytes, type_cancel_mpp_task).Increment(response->ByteSizeLong());
});
Expand Down
Loading