Skip to content

Commit

Permalink
refactor: unify the execution of show stmt (GreptimeTeam#1340)
Browse files Browse the repository at this point in the history
* refactor: unify the execution of show stmt
  • Loading branch information
MichaelScofield authored and paomian committed Oct 19, 2023
1 parent 1cf6ceb commit eb31c0e
Show file tree
Hide file tree
Showing 14 changed files with 434 additions and 327 deletions.
74 changes: 74 additions & 0 deletions docs/how-to/how-to-implement-sql-statement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
This document introduces how to implement SQL statements in GreptimeDB.

The execution entry point for SQL statements locates at Frontend Instance. You can see it has
implemented `SqlQueryHandler`:

```rust
impl SqlQueryHandler for Instance {
type Error = Error;

async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
// ...
}
}
```

Normally, when a SQL query arrives at GreptimeDB, the `do_query` method will be called. After some parsing work, the SQL
will be feed into `StatementExecutor`:

```rust
// in Frontend Instance:
self.statement_executor.execute_sql(stmt, query_ctx).await
```

That's where we handle our SQL statements. You can just create a new match arm for your statement there, then the
statement is implemented for both GreptimeDB Standalone and Cluster. You can see how `DESCRIBE TABLE` is implemented as
an example.

Now, what if the statements should be handled differently for GreptimeDB Standalone and Cluster? You can see there's
a `SqlStatementExecutor` field in `StatementExecutor`. Each GreptimeDB Standalone and Cluster has its own implementation
of `SqlStatementExecutor`. If you are going to implement the statements differently in the two mode (
like `CREATE TABLE`), you have to implement them in their own `SqlStatementExecutor`s.

Summarize as the diagram below:

```text
SQL query
|
v
+---------------------------+
| SqlQueryHandler::do_query |
+---------------------------+
|
| SQL parsing
v
+--------------------------------+
| StatementExecutor::execute_sql |
+--------------------------------+
|
| SQL execution
v
+----------------------------------+
| commonly handled statements like |
| "plan_exec" for selection or |
+----------------------------------+
| |
For Standalone | | For Cluster
v v
+---------------------------+ +---------------------------+
| SqlStatementExecutor impl | | SqlStatementExecutor impl |
| in Datanode Instance | | in Frontend DistInstance |
+---------------------------+ +---------------------------+
```

Note that some SQL statements can be executed in our QueryEngine, in the form of `LogicalPlan`. You can follow the
invocation path down to the `QueryEngine` implementation from `StatementExecutor::plan_exec`. For now, there's only
one `DatafusionQueryEngine` for both GreptimeDB Standalone and Cluster. That lone query engine works for both modes is
because GreptimeDB read/write data through `Table` trait, and each mode has its own `Table` implementation.

We don't have any bias towards whether statements should be handled in query engine or `StatementExecutor`. You can
implement one kind of statement in both places. For example, `Insert` with selection is handled in query engine, because
we can easily do the query part there. However, `Insert` without selection is not, for the cost of parsing statement
to `LogicalPlan` is not neglectable. So generally if the SQL query is simple enough, you can handle it
in `StatementExecutor`; otherwise if it is complex or has some part of selection, it should be parsed to `LogicalPlan`
and handled in query engine.
14 changes: 8 additions & 6 deletions src/datanode/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use async_trait::async_trait;
use common_query::Output;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::plan::LogicalPlan;
use query::query_engine::SqlStatementExecutor;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use snafu::prelude::*;
Expand Down Expand Up @@ -65,7 +66,7 @@ impl Instance {
match stmt {
// TODO(LFC): Remove SQL execution branch here.
// Keep this because substrait can't handle much of SQLs now.
QueryStatement::Sql(Statement::Query(_)) => {
QueryStatement::Sql(Statement::Query(_)) | QueryStatement::Promql(_) => {
let plan = self
.query_engine
.planner()
Expand All @@ -77,7 +78,9 @@ impl Instance {
.await
.context(ExecuteLogicalPlanSnafu)
}
_ => self.execute_stmt(stmt, ctx).await,
QueryStatement::Sql(stmt) => {
self.execute_sql(stmt, ctx).await.context(ExecuteSqlSnafu)
}
}
}
Query::LogicalPlan(plan) => self.execute_logical(plan).await,
Expand Down Expand Up @@ -242,12 +245,11 @@ mod test {
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));

let stmt = QueryLanguageParser::parse_sql(
let Ok(QueryStatement::Sql(stmt)) = QueryLanguageParser::parse_sql(
"INSERT INTO my_database.my_table (a, b, ts) VALUES ('s', 1, 1672384140000)",
)
.unwrap();
) else { unreachable!() };
let output = instance
.execute_stmt(stmt, QueryContext::arc())
.execute_sql(stmt, QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
Expand Down
59 changes: 18 additions & 41 deletions src/datanode/src/instance/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use common_telemetry::logging::info;
use common_telemetry::timer;
use query::error::QueryExecutionSnafu;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::StatementHandler;
use query::query_engine::SqlStatementExecutor;
use session::context::QueryContextRef;
use snafu::prelude::*;
use sql::ast::ObjectName;
Expand All @@ -31,27 +31,23 @@ use table::engine::TableReference;
use table::requests::{CopyDirection, CopyTableRequest, CreateDatabaseRequest, DropTableRequest};

use crate::error::{
self, BumpTableIdSnafu, ExecuteSqlSnafu, ExecuteStatementSnafu, PlanStatementSnafu, Result,
TableIdProviderNotFoundSnafu,
self, BumpTableIdSnafu, ExecuteSqlSnafu, ExecuteStatementSnafu, NotSupportSqlSnafu,
PlanStatementSnafu, Result, TableIdProviderNotFoundSnafu,
};
use crate::instance::Instance;
use crate::metrics;
use crate::sql::{SqlHandler, SqlRequest};

impl Instance {
pub async fn execute_stmt(
&self,
stmt: QueryStatement,
query_ctx: QueryContextRef,
) -> Result<Output> {
async fn do_execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
match stmt {
QueryStatement::Sql(Statement::Insert(insert)) => {
Statement::Insert(insert) => {
let request =
SqlHandler::insert_to_request(self.catalog_manager.clone(), *insert, query_ctx)
.await?;
self.sql_handler.insert(request).await
}
QueryStatement::Sql(Statement::CreateDatabase(create_database)) => {
Statement::CreateDatabase(create_database) => {
let request = CreateDatabaseRequest {
db_name: create_database.name.to_string(),
create_if_not_exists: create_database.if_not_exists,
Expand All @@ -64,7 +60,7 @@ impl Instance {
.await
}

QueryStatement::Sql(Statement::CreateTable(create_table)) => {
Statement::CreateTable(create_table) => {
let table_id = self
.table_id_provider
.as_ref()
Expand All @@ -88,10 +84,7 @@ impl Instance {
.execute(SqlRequest::CreateTable(request), query_ctx)
.await
}
QueryStatement::Sql(Statement::CreateExternalTable(_create_external_table)) => {
unimplemented!()
}
QueryStatement::Sql(Statement::Alter(alter_table)) => {
Statement::Alter(alter_table) => {
let name = alter_table.table_name().clone();
let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?;
let table_ref = TableReference::full(&catalog, &schema, &table);
Expand All @@ -100,7 +93,7 @@ impl Instance {
.execute(SqlRequest::Alter(req), query_ctx)
.await
}
QueryStatement::Sql(Statement::DropTable(drop_table)) => {
Statement::DropTable(drop_table) => {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(drop_table.table_name(), query_ctx.clone())?;
let req = DropTableRequest {
Expand All @@ -112,20 +105,7 @@ impl Instance {
.execute(SqlRequest::DropTable(req), query_ctx)
.await
}
QueryStatement::Sql(Statement::ShowDatabases(show_databases)) => {
self.sql_handler
.execute(SqlRequest::ShowDatabases(show_databases), query_ctx)
.await
}
QueryStatement::Sql(Statement::ShowTables(show_tables)) => {
self.sql_handler
.execute(SqlRequest::ShowTables(show_tables), query_ctx)
.await
}
QueryStatement::Sql(Statement::ShowCreateTable(_show_create_table)) => {
unimplemented!("SHOW CREATE TABLE is unimplemented yet");
}
QueryStatement::Sql(Statement::Copy(copy_table)) => {
Statement::Copy(copy_table) => {
let req = match copy_table {
CopyTable::To(copy_table) => {
let CopyTableArgument {
Expand Down Expand Up @@ -173,13 +153,10 @@ impl Instance {
.execute(SqlRequest::CopyTable(req), query_ctx)
.await
}
QueryStatement::Sql(Statement::Query(_))
| QueryStatement::Sql(Statement::Explain(_))
| QueryStatement::Sql(Statement::Use(_))
| QueryStatement::Sql(Statement::Tql(_))
| QueryStatement::Sql(Statement::Delete(_))
| QueryStatement::Sql(Statement::DescribeTable(_))
| QueryStatement::Promql(_) => unreachable!(),
_ => NotSupportSqlSnafu {
msg: format!("not supported to execute {stmt:?}"),
}
.fail(),
}
}

Expand Down Expand Up @@ -276,13 +253,13 @@ pub fn table_idents_to_full_name(
}

#[async_trait]
impl StatementHandler for Instance {
async fn handle_statement(
impl SqlStatementExecutor for Instance {
async fn execute_sql(
&self,
stmt: QueryStatement,
stmt: Statement,
query_ctx: QueryContextRef,
) -> query::error::Result<Output> {
self.execute_stmt(stmt, query_ctx)
self.do_execute_sql(stmt, query_ctx)
.await
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)
Expand Down
14 changes: 1 addition & 13 deletions src/datanode/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@ use common_error::prelude::BoxedError;
use common_procedure::ProcedureManagerRef;
use common_query::Output;
use common_telemetry::error;
use query::sql::{show_databases, show_tables};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use sql::statements::show::{ShowDatabases, ShowTables};
use table::engine::manager::TableEngineManagerRef;
use table::engine::{TableEngineProcedureRef, TableEngineRef, TableReference};
use table::requests::*;
use table::{Table, TableRef};

use crate::error::{
self, CloseTableEngineSnafu, ExecuteSqlSnafu, Result, TableEngineNotFoundSnafu,
TableNotFoundSnafu,
self, CloseTableEngineSnafu, Result, TableEngineNotFoundSnafu, TableNotFoundSnafu,
};
use crate::instance::sql::table_idents_to_full_name;

Expand All @@ -49,8 +46,6 @@ pub enum SqlRequest {
Alter(AlterTableRequest),
DropTable(DropTableRequest),
FlushTable(FlushTableRequest),
ShowDatabases(ShowDatabases),
ShowTables(ShowTables),
CopyTable(CopyTableRequest),
}

Expand Down Expand Up @@ -92,13 +87,6 @@ impl SqlHandler {
CopyDirection::Export => self.copy_table_to(req).await,
CopyDirection::Import => self.copy_table_from(req).await,
},
SqlRequest::ShowDatabases(req) => {
show_databases(req, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
}
SqlRequest::ShowTables(req) => {
show_tables(req, self.catalog_manager.clone(), query_ctx.clone())
.context(ExecuteSqlSnafu)
}
SqlRequest::FlushTable(req) => self.flush_table(req).await,
};
if let Err(e) = &result {
Expand Down
11 changes: 6 additions & 5 deletions src/datanode/src/sql/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,8 @@ mod tests {
use common_base::readable_size::ReadableSize;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::Schema;
use query::parser::QueryLanguageParser;
use query::parser::{QueryLanguageParser, QueryStatement};
use query::query_engine::SqlStatementExecutor;
use session::context::QueryContext;
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
Expand Down Expand Up @@ -560,10 +561,10 @@ mod tests {
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#;
let stmt = QueryLanguageParser::parse_sql(sql).unwrap();
let Ok(QueryStatement::Sql(stmt)) = QueryLanguageParser::parse_sql(sql) else { unreachable!() };
let output = instance
.inner()
.execute_stmt(stmt, QueryContext::arc())
.execute_sql(stmt, QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
Expand All @@ -577,10 +578,10 @@ mod tests {
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#;
let stmt = QueryLanguageParser::parse_sql(sql).unwrap();
let Ok(QueryStatement::Sql(stmt)) = QueryLanguageParser::parse_sql(sql) else { unreachable!() };
let output = instance
.inner()
.execute_stmt(stmt, QueryContext::arc())
.execute_sql(stmt, QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
Expand Down
Loading

0 comments on commit eb31c0e

Please sign in to comment.