chore(planner): merge map operator #10195

Merged 8 commits on Feb 24, 2023
2 changes: 1 addition & 1 deletion src/query/expression/src/deserializations/number.rs
@@ -103,7 +103,7 @@ where
     fn append_data_value(&mut self, value: Scalar, _format: &FormatSettings) -> Result<()> {
         let v = value
             .as_number()
-            .ok_or_else(|| ErrorCode::from("Unable to get number value"))?;
+            .ok_or_else(|| ErrorCode::from(format!("Unable to get number value {}", value)))?;
         let num = T::try_downcast_scalar(v).unwrap();
         self.builder.push(num);
         Ok(())
40 changes: 21 additions & 19 deletions src/query/service/src/interpreters/interpreter_insert_v2.rs
@@ -778,7 +778,7 @@ pub fn skip_to_next_row<R: AsRef<[u8]>>(reader: &mut Cursor<R>, mut balance: i32
 
 async fn fill_default_value(
     binder: &mut ScalarBinder<'_>,
-    operators: &mut Vec<BlockOperator>,
+    map_exprs: &mut Vec<Expr>,
     field: &DataField,
     schema: &DataSchema,
 ) -> Result<()> {
@@ -796,30 +796,29 @@ async fn fill_default_value(
                 target_type: Box::new(field.data_type().clone()),
             })
         }
+
         let expr = scalar
             .as_expr_with_col_index()?
             .project_column_ref(|index| schema.index_of(&index.to_string()).unwrap());
-        operators.push(BlockOperator::Map { expr });
+        map_exprs.push(expr);
     } else {
         // If field data type is nullable, then we'll fill it with null.
         if field.data_type().is_nullable() {
-            operators.push(BlockOperator::Map {
-                expr: Expr::Constant {
-                    span: None,
-                    scalar: DataScalar::Null,
-                    data_type: field.data_type().clone(),
-                },
-            });
+            let expr = Expr::Constant {
+                span: None,
+                scalar: DataScalar::Null,
+                data_type: field.data_type().clone(),
+            };
+            map_exprs.push(expr);
         } else {
             let data_type = field.data_type().clone();
             let default_value = data_type.default_value();
-            operators.push(BlockOperator::Map {
-                expr: Expr::Constant {
-                    span: None,
-                    scalar: default_value,
-                    data_type,
-                },
-            });
+            let expr = Expr::Constant {
+                span: None,
+                scalar: default_value,
+                data_type,
+            };
+            map_exprs.push(expr);
        }
    }
    Ok(())
@@ -841,7 +840,6 @@ async fn exprs_to_scalar(
             exprs
         )));
     }
-    let mut operators = Vec::with_capacity(schema_fields_len);
     let mut scalar_binder = ScalarBinder::new(
         bind_context,
         ctx.clone(),
@@ -850,12 +848,13 @@
         &[],
     );
 
+    let mut map_exprs = Vec::with_capacity(exprs.len());
     for (i, expr) in exprs.iter().enumerate() {
         // `DEFAULT` in insert values will be parsed as `Expr::ColumnRef`.
         if let AExpr::ColumnRef { column, .. } = expr {
             if column.name.eq_ignore_ascii_case("default") {
                 let field = schema.field(i);
-                fill_default_value(&mut scalar_binder, &mut operators, field, schema).await?;
+                fill_default_value(&mut scalar_binder, &mut map_exprs, field, schema).await?;
                 continue;
             }
         }
@@ -873,9 +872,12 @@
         let expr = scalar
             .as_expr_with_col_index()?
             .project_column_ref(|index| schema.index_of(&index.to_string()).unwrap());
-        operators.push(BlockOperator::Map { expr });
+        map_exprs.push(expr);
     }
 
+    let mut operators = Vec::with_capacity(schema_fields_len);
+    operators.push(BlockOperator::Map { exprs: map_exprs });
+
     let one_row_chunk = DataBlock::new(
         vec![BlockEntry {
             data_type: DataType::Number(NumberDataType::UInt8),
15 changes: 7 additions & 8 deletions src/query/service/src/pipelines/pipeline_builder.rs
@@ -328,20 +328,19 @@ impl PipelineBuilder {
     fn build_eval_scalar(&mut self, eval_scalar: &EvalScalar) -> Result<()> {
         self.build_pipeline(&eval_scalar.input)?;
 
-        let operators = eval_scalar
+        let exprs = eval_scalar
             .exprs
             .iter()
-            .map(|(scalar, _)| {
-                Ok(BlockOperator::Map {
-                    expr: scalar.as_expr(&BUILTIN_FUNCTIONS),
-                })
-            })
-            .collect::<Result<Vec<_>>>()?;
+            .map(|(scalar, _)| scalar.as_expr(&BUILTIN_FUNCTIONS))
+            .collect::<Vec<_>>();
 
+        let op = BlockOperator::Map { exprs };
+
         let func_ctx = self.ctx.get_function_context()?;
 
         self.main_pipeline.add_transform(|input, output| {
             let transform =
-                CompoundBlockOperator::create(input, output, func_ctx, operators.clone());
+                CompoundBlockOperator::create(input, output, func_ctx, vec![op.clone()]);
 
             if self.enable_profiling {
                 Ok(ProcessorPtr::create(ProfileWrapper::create(
@@ -52,7 +52,8 @@ where Self: Transform
         mut const_values: Vec<DataScalar>,
     ) -> Result<ProcessorPtr> {
         let fields = output_schema.fields();
-        let mut ops = Vec::with_capacity(fields.len());
+        let mut exprs = Vec::with_capacity(fields.len());
+
         for f in fields.iter() {
             let expr = if !input_schema.has_field(f.name()) {
                 Expr::Constant {
@@ -70,13 +71,13 @@ where Self: Transform
                     display_name: field.name().clone(),
                 }
             };
-            ops.push(BlockOperator::Map { expr });
+            exprs.push(expr);
         }
 
         let func_ctx = ctx.get_function_context()?;
         let expression_transform = CompoundBlockOperator {
             ctx: func_ctx,
-            operators: ops,
+            operators: vec![BlockOperator::Map { exprs }],
         };
 
         Ok(ProcessorPtr::create(Transformer::create(
@@ -54,7 +54,7 @@ where Self: Transform
             .map(DataField::from)
             .collect::<Vec<_>>();
 
-        let mut ops = Vec::with_capacity(fields.len());
+        let mut exprs = Vec::with_capacity(fields.len());
         for f in fields.iter() {
             let expr = if !input_schema.has_field(f.name()) {
                 if let Some(default_expr) = f.default_expr() {
@@ -87,13 +87,13 @@ where Self: Transform
                     display_name: field.name().clone(),
                 }
             };
-            ops.push(BlockOperator::Map { expr });
+            exprs.push(expr);
         }
 
         let func_ctx = ctx.get_function_context()?;
         let expression_transform = CompoundBlockOperator {
             ctx: func_ctx,
-            operators: ops,
+            operators: vec![BlockOperator::Map { exprs }],
         };
 
         Ok(ProcessorPtr::create(Transformer::create(
20 changes: 4 additions & 16 deletions src/query/service/src/table_functions/table_function_factory.rs
@@ -62,18 +62,6 @@ where
     }
 }
 
-fn create_disabled_table_function(
-    _database_name: &str,
-    table_func_name: &str,
-    _table_id: u64,
-    _table_args: TableArgs,
-) -> Result<Arc<dyn TableFunction>> {
-    Err(ErrorCode::UnknownFunction(format!(
-        "table function `{}` cannot be called",
-        table_func_name
-    )))
-}
-
 #[derive(Default)]
 pub struct TableFunctionFactory {
     creators: TableFunctionCreators,
@@ -148,10 +136,10 @@ impl TableFunctionFactory {
             (next_id(), Arc::new(InferSchemaTable::create)),
         );
 
-        creators.insert(
-            "read_parquet".to_string(),
-            (next_id(), Arc::new(create_disabled_table_function)),
-        );
+        // creators.insert(
+        //     "read_parquet".to_string(),
+        //     (next_id(), Arc::new(create_disabled_table_function)),
+        // );
 
         TableFunctionFactory {
             creators: RwLock::new(creators),
2 changes: 1 addition & 1 deletion src/query/service/tests/it/storages/fuse/statistics.rs
@@ -306,7 +306,7 @@ async fn test_ft_cluster_stats_with_stats() -> common_exception::Result<()> {
     };
     let expr = check(&expr, &BUILTIN_FUNCTIONS).unwrap();
 
-    let operators = vec![BlockOperator::Map { expr }];
+    let operators = vec![BlockOperator::Map { exprs: vec![expr] }];
 
     let stats_gen = ClusterStatsGenerator::new(
         0,
44 changes: 29 additions & 15 deletions src/query/sql/src/evaluator/block_operator.rs
@@ -33,8 +33,8 @@ use common_pipeline_transforms::processors::transforms::Transformer;
 /// `BlockOperator` takes a `DataBlock` as input and produces a `DataBlock` as output.
 #[derive(Clone)]
 pub enum BlockOperator {
-    /// Evaluate expression and append result column to the end.
-    Map { expr: Expr },
+    /// Batch mode of map which merges map operators into one.
+    Map { exprs: Vec<Expr> },
 
     /// Filter the input `DataBlock` with the predicate `eval`.
     Filter { expr: Expr },
@@ -47,14 +47,16 @@ pub enum BlockOperator {
 impl BlockOperator {
     pub fn execute(&self, func_ctx: &FunctionContext, mut input: DataBlock) -> Result<DataBlock> {
         match self {
-            BlockOperator::Map { expr } => {
-                let evaluator = Evaluator::new(&input, *func_ctx, &BUILTIN_FUNCTIONS);
-                let result = evaluator.run(expr)?;
-                let col = BlockEntry {
-                    data_type: expr.data_type().clone(),
-                    value: result,
-                };
-                input.add_column(col);
+            BlockOperator::Map { exprs } => {
+                for expr in exprs {
+                    let evaluator = Evaluator::new(&input, *func_ctx, &BUILTIN_FUNCTIONS);
+                    let result = evaluator.run(expr)?;
+                    let col = BlockEntry {
+                        data_type: expr.data_type().clone(),
+                        value: result,
+                    };
+                    input.add_column(col);
+                }
                 Ok(input)
             }
 
@@ -90,14 +92,26 @@ impl CompoundBlockOperator {
         ctx: FunctionContext,
         operators: Vec<BlockOperator>,
     ) -> Box<dyn Processor> {
+        let operators = Self::compact_map(operators);
         Transformer::<Self>::create(input_port, output_port, Self { operators, ctx })
     }
 
-    #[allow(dead_code)]
-    pub fn append(self, operator: BlockOperator) -> Self {
-        let mut result = self;
-        result.operators.push(operator);
-        result
+    pub fn compact_map(operators: Vec<BlockOperator>) -> Vec<BlockOperator> {
+        let mut results = Vec::with_capacity(operators.len());
+
+        for op in operators {
+            match op {
+                BlockOperator::Map { exprs } => {
+                    if let Some(BlockOperator::Map { exprs: pre_exprs }) = results.last_mut() {
+                        pre_exprs.extend(exprs);
+                    } else {
+                        results.push(BlockOperator::Map { exprs });
+                    }
+                }
+                _ => results.push(op),
+            }
+        }
+        results
     }
 
     #[allow(dead_code)]
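A minimal usage sketch of the new compact_map pass: adjacent Map operators collapse into one batched Map, while other operators stay in place. Here expr_a, expr_b, and pred are hypothetical placeholder Expr values, not part of this PR.

// Hypothetical sketch; assumes expr_a, expr_b, and pred are valid `Expr` values.
let ops = vec![
    BlockOperator::Map { exprs: vec![expr_a] },
    BlockOperator::Map { exprs: vec![expr_b] },
    BlockOperator::Filter { expr: pred },
];
let compacted = CompoundBlockOperator::compact_map(ops);
// The two adjacent maps merge into one operator:
//   [Map { exprs: [expr_a, expr_b] }, Filter { expr: pred }]
assert_eq!(compacted.len(), 2);

Since CompoundBlockOperator::create runs this pass before building the transformer, callers that used to push one Map per expression now pay a single operator dispatch per block.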
4 changes: 2 additions & 2 deletions src/query/sql/src/planner/binder/delete.rs
@@ -78,8 +78,8 @@ impl<'a> Binder {
             };
             let filter_expr = SExpr::create_unary(filter.into(), table_expr);
             let mut rewriter = SubqueryRewriter::new(self.metadata.clone());
-            let rewrote_expr = rewriter.rewrite(&filter_expr)?;
-            (None, Some(rewrote_expr))
+            let filter_expr = rewriter.rewrite(&filter_expr)?;
+            (None, Some(filter_expr))
         } else {
             (Some(scalar), None)
         }
4 changes: 1 addition & 3 deletions src/query/sql/src/planner/optimizer/heuristic/decorrelate.rs
@@ -74,9 +74,7 @@ use crate::MetadataRef;
 /// More information can be found in the paper: Unnesting Arbitrary Queries
 pub fn decorrelate_subquery(metadata: MetadataRef, s_expr: SExpr) -> Result<SExpr> {
     let mut rewriter = SubqueryRewriter::new(metadata);
-    let hoisted = rewriter.rewrite(&s_expr)?;
-
-    Ok(hoisted)
+    rewriter.rewrite(&s_expr)
 }
 
 impl SubqueryRewriter {
6 changes: 6 additions & 0 deletions src/query/sql/src/planner/optimizer/s_expr.rs
@@ -103,6 +103,12 @@ impl SExpr {
             .ok_or_else(|| ErrorCode::Internal(format!("Invalid children index: {}", n)))
     }
 
+    pub fn child_mut(&mut self, n: usize) -> Result<&mut SExpr> {
+        self.children
+            .get_mut(n)
+            .ok_or_else(|| ErrorCode::Internal(format!("Invalid children index: {}", n)))
+    }
+
     pub fn arity(&self) -> usize {
         self.children.len()
     }
12 changes: 6 additions & 6 deletions src/query/sql/src/planner/planner.rs
@@ -23,11 +23,13 @@ use common_ast::parser::token::TokenKind;
 use common_ast::parser::token::Tokenizer;
 use common_ast::walk_statement_mut;
 use common_ast::Backtrace;
+use common_ast::Dialect;
 use common_catalog::catalog::CatalogManager;
 use common_catalog::table_context::TableContext;
 use common_exception::Result;
 use parking_lot::RwLock;
 
+use super::semantic::AggregateRewriter;
 use super::semantic::DistinctToGroupBy;
 use crate::optimizer::optimize;
 use crate::optimizer::OptimizerConfig;
@@ -82,7 +84,7 @@ impl Planner {
         // Step 2: Parse the SQL.
         let backtrace = Backtrace::new();
         let (mut stmt, format) = parse_sql(&tokens, sql_dialect, &backtrace)?;
-        self.replace_stmt(&mut stmt);
+        self.replace_stmt(&mut stmt, sql_dialect);
 
         // Step 3: Bind AST with catalog, and generate a pure logical SExpr
         let metadata = Arc::new(RwLock::new(Metadata::default()));
@@ -148,11 +150,9 @@
         }
     }
 
-    fn replace_stmt(&self, stmt: &mut Statement) {
-        let mut visitors = vec![DistinctToGroupBy::default()];
-        for v in visitors.iter_mut() {
-            walk_statement_mut(v, stmt)
-        }
+    fn replace_stmt(&self, stmt: &mut Statement, sql_dialect: Dialect) {
+        walk_statement_mut(&mut DistinctToGroupBy::default(), stmt);
+        walk_statement_mut(&mut AggregateRewriter { sql_dialect }, stmt);
 
         self.add_max_rows_limit(stmt);
     }