From 0cf44e1e47076e6d5188d2756afe79653034d9d1 Mon Sep 17 00:00:00 2001 From: localhost Date: Thu, 26 Dec 2024 11:06:25 +0800 Subject: [PATCH] chore: add more info for pipeline dryrun API (#5232) --- src/servers/src/error.rs | 1 + src/servers/src/http/event.rs | 31 ++++++++++++++++++++++++++----- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index c1c331c33744..88a0ad21b623 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -159,6 +159,7 @@ pub enum Error { #[snafu(display("Pipeline management api error"))] Pipeline { + #[snafu(source)] source: pipeline::error::Error, #[snafu(implicit)] location: Location, diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index c0926af833d6..14e8ad7dd5e4 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -30,6 +30,7 @@ use axum::http::{Request, StatusCode}; use axum::response::{IntoResponse, Response}; use axum::{async_trait, BoxError, Extension, Json, TypedHeader}; use bytes::Bytes; +use common_error::ext::ErrorExt; use common_query::prelude::GREPTIME_TIMESTAMP; use common_query::{Output, OutputData}; use common_telemetry::{error, warn}; @@ -41,13 +42,13 @@ use pipeline::util::to_pipeline_version; use pipeline::{GreptimeTransformer, PipelineVersion}; use prost::Message; use serde::{Deserialize, Serialize}; -use serde_json::{Deserializer, Map, Value}; +use serde_json::{json, Deserializer, Map, Value}; use session::context::{Channel, QueryContext, QueryContextRef}; use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{ - CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu, - ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu, + status_code_to_http_status, CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, + ParseJson5Snafu, ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu, }; use crate::http::extractor::LogTableName; use crate::http::header::CONTENT_TYPE_PROTOBUF_STR; @@ -404,6 +405,14 @@ fn check_data_valid(data_len: usize) -> Result<()> { Ok(()) } +fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response { + let body = Json(json!({ + "error": format!("{}: {}", step_msg,e.output_msg()), + })); + + (status_code_to_http_status(&e.status_code()), body).into_response() +} + #[axum_macros::debug_handler] pub async fn pipeline_dryrun( State(log_state): State, @@ -431,8 +440,20 @@ pub async fn pipeline_dryrun( dryrun_pipeline_inner(data, &pipeline) } Some(pipeline) => { - let pipeline = handler.build_pipeline(&pipeline)?; - dryrun_pipeline_inner(data, &pipeline) + let pipeline = handler.build_pipeline(&pipeline); + match pipeline { + Ok(pipeline) => match dryrun_pipeline_inner(data, &pipeline) { + Ok(response) => Ok(response), + Err(e) => Ok(add_step_info_for_pipeline_dryrun_error( + "Failed to exec pipeline", + e, + )), + }, + Err(e) => Ok(add_step_info_for_pipeline_dryrun_error( + "Failed to build pipeline", + e, + )), + } } } }