Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add metrics for log ingestion #4411

Merged
merged 2 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions src/servers/src/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use axum::http::header::CONTENT_TYPE;
use axum::http::{Request, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{async_trait, BoxError, Extension, TypedHeader};
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use pipeline::error::{CastTypeSnafu, PipelineTransformSnafu};
use pipeline::util::to_pipeline_version;
Expand All @@ -40,6 +41,10 @@ use crate::error::{
use crate::http::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
use crate::metrics::{
METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
};
use crate::query_handler::LogHandlerRef;

#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
Expand Down Expand Up @@ -298,14 +303,27 @@ async fn ingest_logs_inner(
pipeline_data: PipelineValue,
query_ctx: QueryContextRef,
) -> Result<HttpResponse> {
let start = std::time::Instant::now();
let db = query_ctx.get_db_string();
let exec_timer = std::time::Instant::now();

let pipeline = state
.get_pipeline(&pipeline_name, version, query_ctx.clone())
.await?;

let transform_timer = std::time::Instant::now();
let transformed_data: Rows = pipeline
.exec(pipeline_data)
.map_err(|reason| PipelineTransformSnafu { reason }.build())
.inspect(|_| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
})
.map_err(|reason| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
PipelineTransformSnafu { reason }.build()
})
.context(PipelineSnafu)?;

let insert_request = RowInsertRequest {
Expand All @@ -317,9 +335,26 @@ async fn ingest_logs_inner(
};
let output = state.insert_logs(insert_requests, query_ctx).await;

if let Ok(Output {
data: OutputData::AffectedRows(rows),
meta: _,
}) = &output
{
METRIC_HTTP_LOGS_INGESTION_COUNTER
.with_label_values(&[db.as_str()])
.inc_by(*rows as u64);
METRIC_HTTP_LOGS_INGESTION_ELAPSED
.with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
} else {
METRIC_HTTP_LOGS_INGESTION_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
}

let response = GreptimedbV1Response::from_output(vec![output])
.await
.with_execution_time(start.elapsed().as_millis() as u64);
.with_execution_time(exec_timer.elapsed().as_millis() as u64);
Ok(response)
}

Expand Down
24 changes: 24 additions & 0 deletions src/servers/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ pub(crate) const METRIC_POSTGRES_SIMPLE_QUERY: &str = "simple";
pub(crate) const METRIC_POSTGRES_EXTENDED_QUERY: &str = "extended";
pub(crate) const METRIC_METHOD_LABEL: &str = "method";
pub(crate) const METRIC_PATH_LABEL: &str = "path";
pub(crate) const METRIC_RESULT_LABEL: &str = "result";

pub(crate) const METRIC_SUCCESS_VALUE: &str = "success";
pub(crate) const METRIC_FAILURE_VALUE: &str = "failure";

lazy_static! {
pub static ref METRIC_ERROR_COUNTER: IntCounterVec = register_int_counter_vec!(
Expand Down Expand Up @@ -130,6 +134,26 @@ lazy_static! {
&[METRIC_DB_LABEL]
)
.unwrap();
pub static ref METRIC_HTTP_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!(
"greptime_servers_http_logs_ingestion_counter",
"servers http logs ingestion counter",
&[METRIC_DB_LABEL]
)
.unwrap();
pub static ref METRIC_HTTP_LOGS_INGESTION_ELAPSED: HistogramVec =
register_histogram_vec!(
"greptime_servers_http_logs_ingestion_elapsed",
"servers http logs ingestion elapsed",
&[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
)
.unwrap();
pub static ref METRIC_HTTP_LOGS_TRANSFORM_ELAPSED: HistogramVec =
register_histogram_vec!(
"greptime_servers_http_logs_transform_elapsed",
"servers http logs transform elapsed",
&[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
)
.unwrap();
pub static ref METRIC_MYSQL_CONNECTIONS: IntGauge = register_int_gauge!(
"greptime_servers_mysql_connection_count",
"servers mysql connection count"
Expand Down
Loading