diff --git a/Cargo.lock b/Cargo.lock index 879415cf80ec..507ce40fac57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4556,7 +4556,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a875e976441188028353f7274a46a7e6e065c5d4#a875e976441188028353f7274a46a7e6e065c5d4" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7c01e4a8e64580707438dabc5cf7f4e2584c28b6#7c01e4a8e64580707438dabc5cf7f4e2584c28b6" dependencies = [ "prost 0.12.6", "serde", diff --git a/Cargo.toml b/Cargo.toml index 193fdba98e48..e2640311286a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,7 +124,7 @@ etcd-client = "0.13" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7c01e4a8e64580707438dabc5cf7f4e2584c28b6" } hex = "0.4" http = "0.2" humantime = "2.1" diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index 4c4ad0905bb3..724fdaa5a342 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -60,6 +60,7 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result< column_schema: schema, is_key: column_def.semantic_type == SemanticType::Tag as i32, location: parse_location(ac.location)?, + add_if_not_exists: ac.add_if_not_exists, }) }) .collect::>>()?; @@ -220,6 +221,7 @@ mod tests { ..Default::default() }), location: None, + add_if_not_exists: true, }], })), }; @@ -240,6 +242,7 @@ mod tests { add_column.column_schema.data_type ); assert_eq!(None, add_column.location); + assert!(add_column.add_if_not_exists); } #[test] @@ -265,6 +268,7 @@ mod tests { location_type: LocationType::First.into(), after_column_name: String::default(), }), + add_if_not_exists: false, }, AddColumn { column_def: 
Some(ColumnDef { @@ -280,6 +284,7 @@ mod tests { location_type: LocationType::After.into(), after_column_name: "ts".to_string(), }), + add_if_not_exists: true, }, ], })), @@ -308,6 +313,7 @@ mod tests { }), add_column.location ); + assert!(add_column.add_if_not_exists); let add_column = add_columns.pop().unwrap(); assert!(!add_column.is_key); @@ -317,6 +323,7 @@ mod tests { add_column.column_schema.data_type ); assert_eq!(Some(AddColumnLocation::First), add_column.location); + assert!(!add_column.add_if_not_exists); } #[test] diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index 56ed7e5bf02f..9ce9ff29ccff 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -299,6 +299,7 @@ mod tests { .unwrap() ) ); + assert!(host_column.add_if_not_exists); let memory_column = &add_columns.add_columns[1]; assert_eq!( @@ -311,6 +312,7 @@ mod tests { .unwrap() ) ); + assert!(memory_column.add_if_not_exists); let time_column = &add_columns.add_columns[2]; assert_eq!( @@ -323,6 +325,7 @@ mod tests { .unwrap() ) ); + assert!(time_column.add_if_not_exists); let interval_column = &add_columns.add_columns[3]; assert_eq!( @@ -335,6 +338,7 @@ mod tests { .unwrap() ) ); + assert!(interval_column.add_if_not_exists); let decimal_column = &add_columns.add_columns[4]; assert_eq!( @@ -352,6 +356,7 @@ mod tests { .unwrap() ) ); + assert!(decimal_column.add_if_not_exists); } #[test] diff --git a/src/common/grpc-expr/src/util.rs b/src/common/grpc-expr/src/util.rs index 725adf82a1c6..f9b5b8964400 100644 --- a/src/common/grpc-expr/src/util.rs +++ b/src/common/grpc-expr/src/util.rs @@ -192,6 +192,9 @@ pub fn build_create_table_expr( Ok(expr) } +/// Find columns that are not present in the schema and return them as `AddColumns` +/// for adding columns automatically. +/// It always sets `add_if_not_exists` to `true` for now.
pub fn extract_new_columns( schema: &Schema, column_exprs: Vec, @@ -213,6 +216,7 @@ pub fn extract_new_columns( AddColumn { column_def, location: None, + add_if_not_exists: true, } }) .collect::>(); diff --git a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs index 7338968153c9..c05777bcc657 100644 --- a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs +++ b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs @@ -105,7 +105,7 @@ impl AlterLogicalTablesProcedure { .context(ConvertAlterTableRequestSnafu)?; let new_meta = table_info .meta - .builder_with_alter_kind(table_ref.table, &request.alter_kind, true) + .builder_with_alter_kind(table_ref.table, &request.alter_kind) .context(error::TableSnafu)? .build() .with_context(|_| error::BuildTableMetaSnafu { diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index e745af900f24..55ecdba54549 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -28,13 +28,13 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn use common_procedure::{ Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, StringKey, }; -use common_telemetry::{debug, info}; +use common_telemetry::{debug, error, info}; use futures::future; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use store_api::storage::RegionId; use strum::AsRefStr; -use table::metadata::{RawTableInfo, TableId}; +use table::metadata::{RawTableInfo, TableId, TableInfo}; use table::table_reference::TableReference; use crate::cache_invalidator::Context; @@ -51,10 +51,14 @@ use crate::{metrics, ClusterId}; /// The alter table procedure pub struct AlterTableProcedure { - // The runtime context. + /// The runtime context. context: DdlContext, - // The serialized data. + /// The serialized data. 
data: AlterTableData, + /// Cached new table metadata in the prepare step. + /// If we recover the procedure from json, then the table info value is not cached. + /// But we already validated it in the prepare step. + new_table_info: Option, } impl AlterTableProcedure { @@ -70,18 +74,31 @@ impl AlterTableProcedure { Ok(Self { context, data: AlterTableData::new(task, table_id, cluster_id), + new_table_info: None, }) } pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?; - Ok(AlterTableProcedure { context, data }) + Ok(AlterTableProcedure { + context, + data, + new_table_info: None, + }) } // Checks whether the table exists. pub(crate) async fn on_prepare(&mut self) -> Result { self.check_alter().await?; self.fill_table_info().await?; + + // Validates the request and builds the new table info. + // We need to build the new table info here because we should ensure the alteration + // is valid in `UpdateMeta` state as we already altered the region. + // Safety: `fill_table_info()` already set it. + let table_info_value = self.data.table_info_value.as_ref().unwrap(); + self.new_table_info = Some(self.build_new_table_info(&table_info_value.table_info)?); + // Safety: Checked in `AlterTableProcedure::new`. let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap(); if matches!(alter_kind, Kind::RenameTable { .. 
}) { @@ -106,6 +123,14 @@ impl AlterTableProcedure { let leaders = find_leaders(&physical_table_route.region_routes); let mut alter_region_tasks = Vec::with_capacity(leaders.len()); + let alter_kind = self.make_region_alter_kind()?; + + info!( + "Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}", + self.data.table_ref(), + table_id, + alter_kind, + ); for datanode in leaders { let requester = self.context.node_manager.datanode(&datanode).await; @@ -113,7 +138,7 @@ impl AlterTableProcedure { for region in regions { let region_id = RegionId::new(table_id, region); - let request = self.make_alter_region_request(region_id)?; + let request = self.make_alter_region_request(region_id, alter_kind.clone())?; debug!("Submitting {request:?} to {datanode}"); let datanode = datanode.clone(); @@ -150,7 +175,15 @@ impl AlterTableProcedure { let table_ref = self.data.table_ref(); // Safety: checked before. let table_info_value = self.data.table_info_value.as_ref().unwrap(); - let new_info = self.build_new_table_info(&table_info_value.table_info)?; + // Gets the table info from the cache or builds it. + let new_info = match &self.new_table_info { + Some(cached) => cached.clone(), + None => self.build_new_table_info(&table_info_value.table_info) + .inspect_err(|e| { + // We already check the table info in the prepare step so this should not happen. 
+ error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id); + })?, + }; debug!( "Starting update table: {} metadata, new table info {:?}", @@ -174,7 +207,7 @@ impl AlterTableProcedure { .await?; } - info!("Updated table metadata for table {table_ref}, table_id: {table_id}"); + info!("Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}"); self.data.state = AlterTableState::InvalidateTableCache; Ok(Status::executing(true)) } diff --git a/src/common/meta/src/ddl/alter_table/region_request.rs b/src/common/meta/src/ddl/alter_table/region_request.rs index fb700335aaf8..7ac1ae71e5da 100644 --- a/src/common/meta/src/ddl/alter_table/region_request.rs +++ b/src/common/meta/src/ddl/alter_table/region_request.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + use api::v1::alter_table_expr::Kind; use api::v1::region::region_request::Body; use api::v1::region::{ @@ -27,13 +29,15 @@ use crate::ddl::alter_table::AlterTableProcedure; use crate::error::{InvalidProtoMsgSnafu, Result}; impl AlterTableProcedure { - /// Makes alter region request. - pub(crate) fn make_alter_region_request(&self, region_id: RegionId) -> Result { - // Safety: Checked in `AlterTableProcedure::new`. - let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap(); + /// Makes an alter region request from an existing alter kind. + /// The region alter request always adds columns if they do not exist. + pub(crate) fn make_alter_region_request( + &self, + region_id: RegionId, + kind: Option, + ) -> Result { // Safety: checked let table_info = self.data.table_info().unwrap(); - let kind = create_proto_alter_kind(table_info, alter_kind)?; Ok(RegionRequest { header: Some(RegionRequestHeader { @@ -47,45 +51,66 @@ impl AlterTableProcedure { })), }) } + + /// Makes alter kind proto that all regions can reuse.
+ /// The region alter request always adds columns if they do not exist. + pub(crate) fn make_region_alter_kind(&self) -> Result> { + // Safety: Checked in `AlterTableProcedure::new`. + let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap(); + // Safety: checked + let table_info = self.data.table_info().unwrap(); + let kind = create_proto_alter_kind(table_info, alter_kind)?; + + Ok(kind) + } } /// Creates region proto alter kind from `table_info` and `alter_kind`. /// -/// Returns the kind and next column id if it adds new columns. +/// It always adds a column if it does not exist and drops a column if it exists. +/// It skips the column if it already exists in the table. fn create_proto_alter_kind( table_info: &RawTableInfo, alter_kind: &Kind, ) -> Result> { match alter_kind { Kind::AddColumns(x) => { + // Construct a set of existing columns in the table. + let existing_columns: HashSet<_> = table_info + .meta + .schema + .column_schemas + .iter() + .map(|col| &col.name) + .collect(); let mut next_column_id = table_info.meta.next_column_id; - let add_columns = x - .add_columns - .iter() - .map(|add_column| { - let column_def = - add_column - .column_def - .as_ref() - .context(InvalidProtoMsgSnafu { - err_msg: "'column_def' is absent", - })?; + let mut add_columns = Vec::with_capacity(x.add_columns.len()); + for add_column in &x.add_columns { + let column_def = add_column + .column_def + .as_ref() + .context(InvalidProtoMsgSnafu { + err_msg: "'column_def' is absent", + })?; - let column_id = next_column_id; - next_column_id += 1; + // Skips existing columns.
+ if existing_columns.contains(&column_def.name) { + continue; + } - let column_def = RegionColumnDef { - column_def: Some(column_def.clone()), - column_id, - }; + let column_id = next_column_id; + next_column_id += 1; + let column_def = RegionColumnDef { + column_def: Some(column_def.clone()), + column_id, + }; - Ok(AddColumn { - column_def: Some(column_def), - location: add_column.location.clone(), - }) - }) - .collect::>>()?; + add_columns.push(AddColumn { + column_def: Some(column_def), + location: add_column.location.clone(), + }); + } Ok(Some(alter_request::Kind::AddColumns(AddColumns { add_columns, @@ -143,6 +168,7 @@ mod tests { use crate::rpc::router::{Region, RegionRoute}; use crate::test_util::{new_ddl_context, MockDatanodeManager}; + /// Prepares a region with schema `[ts: Timestamp, host: Tag, cpu: Field]`. async fn prepare_ddl_context() -> (DdlContext, u64, TableId, RegionId, String) { let datanode_manager = Arc::new(MockDatanodeManager::new(())); let ddl_context = new_ddl_context(datanode_manager); @@ -171,6 +197,7 @@ mod tests { .name("cpu") .data_type(ColumnDataType::Float64) .semantic_type(SemanticType::Field) + .is_nullable(true) .build() .unwrap() .into(), @@ -225,15 +252,16 @@ mod tests { name: "my_tag3".to_string(), data_type: ColumnDataType::String as i32, is_nullable: true, - default_constraint: b"hello".to_vec(), + default_constraint: Vec::new(), semantic_type: SemanticType::Tag as i32, comment: String::new(), ..Default::default() }), location: Some(AddColumnLocation { location_type: LocationType::After as i32, - after_column_name: "my_tag2".to_string(), + after_column_name: "host".to_string(), }), + add_if_not_exists: false, }], })), }, @@ -242,8 +270,11 @@ mod tests { let mut procedure = AlterTableProcedure::new(cluster_id, table_id, task, ddl_context).unwrap(); procedure.on_prepare().await.unwrap(); - let Some(Body::Alter(alter_region_request)) = - procedure.make_alter_region_request(region_id).unwrap().body + let alter_kind = 
procedure.make_region_alter_kind().unwrap(); + let Some(Body::Alter(alter_region_request)) = procedure + .make_alter_region_request(region_id, alter_kind) + .unwrap() + .body else { unreachable!() }; @@ -259,7 +290,7 @@ mod tests { name: "my_tag3".to_string(), data_type: ColumnDataType::String as i32, is_nullable: true, - default_constraint: b"hello".to_vec(), + default_constraint: Vec::new(), semantic_type: SemanticType::Tag as i32, comment: String::new(), ..Default::default() @@ -268,7 +299,7 @@ mod tests { }), location: Some(AddColumnLocation { location_type: LocationType::After as i32, - after_column_name: "my_tag2".to_string(), + after_column_name: "host".to_string(), }), }] } @@ -299,8 +330,11 @@ mod tests { let mut procedure = AlterTableProcedure::new(cluster_id, table_id, task, ddl_context).unwrap(); procedure.on_prepare().await.unwrap(); - let Some(Body::Alter(alter_region_request)) = - procedure.make_alter_region_request(region_id).unwrap().body + let alter_kind = procedure.make_region_alter_kind().unwrap(); + let Some(Body::Alter(alter_region_request)) = procedure + .make_alter_region_request(region_id, alter_kind) + .unwrap() + .body else { unreachable!() }; diff --git a/src/common/meta/src/ddl/alter_table/update_metadata.rs b/src/common/meta/src/ddl/alter_table/update_metadata.rs index de72b7977aa2..3c547b884cb7 100644 --- a/src/common/meta/src/ddl/alter_table/update_metadata.rs +++ b/src/common/meta/src/ddl/alter_table/update_metadata.rs @@ -23,7 +23,9 @@ use crate::key::table_info::TableInfoValue; use crate::key::{DeserializedValueWithBytes, RegionDistribution}; impl AlterTableProcedure { - /// Builds new_meta + /// Builds new table info after alteration. + /// It bumps the column id of the table by the number of the add column requests. + /// So there may be holes in the column id sequence. 
pub(crate) fn build_new_table_info(&self, table_info: &RawTableInfo) -> Result { let table_info = TableInfo::try_from(table_info.clone()).context(error::ConvertRawTableInfoSnafu)?; @@ -34,7 +36,7 @@ impl AlterTableProcedure { let new_meta = table_info .meta - .builder_with_alter_kind(table_ref.table, &request.alter_kind, false) + .builder_with_alter_kind(table_ref.table, &request.alter_kind) .context(error::TableSnafu)? .build() .with_context(|_| error::BuildTableMetaSnafu { @@ -46,6 +48,9 @@ impl AlterTableProcedure { new_info.ident.version = table_info.ident.version + 1; match request.alter_kind { AlterKind::AddColumns { columns } => { + // Bumps the column id for the new columns. + // It may bump more than the actual number of columns added if there are + // existing columns, but it's fine. new_info.meta.next_column_id += columns.len() as u32; } AlterKind::RenameTable { new_table_name } => { diff --git a/src/common/meta/src/ddl/test_util/alter_table.rs b/src/common/meta/src/ddl/test_util/alter_table.rs index 0274256d2da6..9813da4f2b1d 100644 --- a/src/common/meta/src/ddl/test_util/alter_table.rs +++ b/src/common/meta/src/ddl/test_util/alter_table.rs @@ -30,6 +30,8 @@ pub struct TestAlterTableExpr { add_columns: Vec, #[builder(setter(into, strip_option))] new_table_name: Option, + #[builder(setter)] + add_if_not_exists: bool, } impl From for AlterTableExpr { @@ -53,6 +55,7 @@ impl From for AlterTableExpr { .map(|col| AddColumn { column_def: Some(col), location: None, + add_if_not_exists: value.add_if_not_exists, }) .collect(), })), diff --git a/src/common/meta/src/ddl/tests/alter_logical_tables.rs b/src/common/meta/src/ddl/tests/alter_logical_tables.rs index 41de5ef4b10b..03348c393052 100644 --- a/src/common/meta/src/ddl/tests/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/alter_logical_tables.rs @@ -56,6 +56,7 @@ fn make_alter_logical_table_add_column_task( let alter_table = alter_table .table_name(table.to_string()) .add_columns(add_columns) + 
.add_if_not_exists(true) .build() .unwrap(); diff --git a/src/common/meta/src/ddl/tests/alter_table.rs b/src/common/meta/src/ddl/tests/alter_table.rs index b065f56d4529..18294efe00fe 100644 --- a/src/common/meta/src/ddl/tests/alter_table.rs +++ b/src/common/meta/src/ddl/tests/alter_table.rs @@ -139,7 +139,7 @@ async fn test_on_submit_alter_request() { table_name: table_name.to_string(), kind: Some(Kind::DropColumns(DropColumns { drop_columns: vec![DropColumn { - name: "my_field_column".to_string(), + name: "cpu".to_string(), }], })), }, @@ -225,7 +225,7 @@ async fn test_on_submit_alter_request_with_outdated_request() { table_name: table_name.to_string(), kind: Some(Kind::DropColumns(DropColumns { drop_columns: vec![DropColumn { - name: "my_field_column".to_string(), + name: "cpu".to_string(), }], })), }, @@ -330,6 +330,7 @@ async fn test_on_update_metadata_add_columns() { ..Default::default() }), location: None, + add_if_not_exists: false, }], })), }, diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 10d87e2940c2..6554ff333f0d 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -145,10 +145,8 @@ impl RegionWorkerLoop { } info!( - "Try to alter region {} from version {} to {}", - region_id, - version.metadata.schema_version, - region.metadata().schema_version + "Try to alter region {}, version.metadata: {:?}, request: {:?}", + region_id, version.metadata, request, ); self.handle_alter_region_metadata(region, version, request, sender); } diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 5a21c0de15bd..137a0a8d10fc 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -101,10 +101,10 @@ impl RegionWorkerLoop { .version_control .alter_schema(change_result.new_meta, ®ion.memtable_builder); + let version = region.version(); info!( - "Region {} is altered, schema version is {}", - 
region.region_id, - region.metadata().schema_version + "Region {} is altered, metadata is {:?}, options: {:?}", + region.region_id, version.metadata, version.options, ); } diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index bc50eff161b5..18f1ddc3075f 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -477,6 +477,7 @@ pub fn column_schemas_to_defs( .collect() } +/// Converts a SQL alter table statement into a gRPC alter table expression. pub(crate) fn to_alter_table_expr( alter_table: AlterTable, query_ctx: &QueryContextRef, @@ -504,6 +505,8 @@ pub(crate) fn to_alter_table_expr( .context(ExternalSnafu)?, ), location: location.as_ref().map(From::from), + // TODO(yingwen): We don't support `IF NOT EXISTS` for `ADD COLUMN` yet. + add_if_not_exists: false, }], }), AlterTableOperation::ModifyColumnType { diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 6b7702f25b8b..2dc5d98e41b2 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -741,6 +741,8 @@ impl Inserter { Ok(create_table_expr) } + /// Returns an alter table expression if it finds new columns in the request. + /// It always adds columns if not exist. fn get_alter_table_expr_on_demand( &self, req: &RowInsertRequest, diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index eba88ee44d8a..a6f2e6c1c2d5 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -911,7 +911,7 @@ impl StatementExecutor { let _ = table_info .meta - .builder_with_alter_kind(table_name, &request.alter_kind, false) + .builder_with_alter_kind(table_name, &request.alter_kind) .context(error::TableSnafu)? 
.build() .context(error::BuildTableMetaSnafu { table_name })?; diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index ee9ebf483d98..b9b9835f64ae 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -597,7 +597,8 @@ pub struct AddColumn { impl AddColumn { /// Returns an error if the column to add is invalid. /// - /// It allows adding existing columns. + /// It allows adding existing columns. However, the existing column must have the same metadata + /// and the location must be None. pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> { ensure!( self.column_metadata.column_schema.is_nullable() @@ -615,6 +616,46 @@ impl AddColumn { } ); + if let Some(existing_column) = + metadata.column_by_name(&self.column_metadata.column_schema.name) + { + // If the column already exists. + ensure!( + *existing_column == self.column_metadata, + InvalidRegionRequestSnafu { + region_id: metadata.region_id, + err: format!( + "column {} already exists with different metadata, existing: {:?}, got: {:?}", + self.column_metadata.column_schema.name, existing_column, self.column_metadata, + ), + } + ); + ensure!( + self.location.is_none(), + InvalidRegionRequestSnafu { + region_id: metadata.region_id, + err: format!( + "column {} already exists, but location is specified", + self.column_metadata.column_schema.name + ), + } + ); + } + + if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) { + // Ensures the existing column has the same name. + ensure!( + existing_column.column_schema.name == self.column_metadata.column_schema.name, + InvalidRegionRequestSnafu { + region_id: metadata.region_id, + err: format!( + "column id {} already exists with different name {}", + self.column_metadata.column_id, existing_column.column_schema.name + ), + } + ); + } + Ok(()) } @@ -1008,6 +1049,8 @@ mod tests { ); } + /// Returns a new region metadata for testing. 
Metadata: + /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]` fn new_metadata() -> RegionMetadata { let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); builder @@ -1062,7 +1105,7 @@ mod tests { true, ), semantic_type: SemanticType::Tag, - column_id: 4, + column_id: 5, }, location: None, }; @@ -1078,7 +1121,7 @@ mod tests { false, ), semantic_type: SemanticType::Tag, - column_id: 4, + column_id: 5, }, location: None, } @@ -1094,7 +1137,7 @@ mod tests { true, ), semantic_type: SemanticType::Tag, - column_id: 4, + column_id: 2, }, location: None, }; @@ -1114,7 +1157,7 @@ mod tests { true, ), semantic_type: SemanticType::Tag, - column_id: 4, + column_id: 5, }, location: None, }, @@ -1126,7 +1169,7 @@ mod tests { true, ), semantic_type: SemanticType::Field, - column_id: 5, + column_id: 6, }, location: None, }, @@ -1137,6 +1180,82 @@ mod tests { assert!(kind.need_alter(&metadata)); } + #[test] + fn test_add_existing_column_different_metadata() { + let metadata = new_metadata(); + + // Add existing column with different id. + let kind = AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_0", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 4, + }, + location: None, + }], + }; + kind.validate(&metadata).unwrap_err(); + + // Add existing column with different type. + let kind = AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_0", + ConcreteDataType::int64_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 2, + }, + location: None, + }], + }; + kind.validate(&metadata).unwrap_err(); + + // Add existing column with different name. 
+ let kind = AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_1", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 2, + }, + location: None, + }], + }; + kind.validate(&metadata).unwrap_err(); + } + + #[test] + fn test_add_existing_column_with_location() { + let metadata = new_metadata(); + let kind = AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_0", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 2, + }, + location: Some(AddColumnLocation::First), + }], + }; + kind.validate(&metadata).unwrap_err(); + } + #[test] fn test_validate_drop_column() { let metadata = new_metadata(); @@ -1235,19 +1354,19 @@ mod tests { true, ), semantic_type: SemanticType::Tag, - column_id: 4, + column_id: 5, }, location: None, }, AddColumn { column_metadata: ColumnMetadata { column_schema: ColumnSchema::new( - "field_1", + "field_2", ConcreteDataType::string_datatype(), true, ), semantic_type: SemanticType::Field, - column_id: 5, + column_id: 6, }, location: None, }, diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 6dfc47314a36..29abe7144ee6 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -194,12 +194,9 @@ impl TableMeta { &self, table_name: &str, alter_kind: &AlterKind, - add_if_not_exists: bool, ) -> Result { match alter_kind { - AlterKind::AddColumns { columns } => { - self.add_columns(table_name, columns, add_if_not_exists) - } + AlterKind::AddColumns { columns } => self.add_columns(table_name, columns), AlterKind::DropColumns { names } => self.remove_columns(table_name, names), AlterKind::ModifyColumnTypes { columns } => { self.modify_column_types(table_name, columns) @@ -340,6 +337,7 @@ impl TableMeta { Ok(meta_builder) } + // TODO(yingwen): Remove this. 
/// Allocate a new column for the table. /// /// This method would bump the `next_column_id` of the meta. @@ -384,11 +382,11 @@ impl TableMeta { builder } + // TODO(yingwen): Tests add if not exists. fn add_columns( &self, table_name: &str, requests: &[AddColumnRequest], - add_if_not_exists: bool, ) -> Result { let table_schema = &self.schema; let mut meta_builder = self.new_meta_builder(); @@ -396,63 +394,61 @@ impl TableMeta { self.primary_key_indices.iter().collect(); let mut names = HashSet::with_capacity(requests.len()); - let mut new_requests = Vec::with_capacity(requests.len()); - let requests = if add_if_not_exists { - for col_to_add in requests { - if let Some(column_schema) = - table_schema.column_schema_by_name(&col_to_add.column_schema.name) - { - // If the column already exists, we should check if the type is the same. - ensure!( - column_schema.data_type == col_to_add.column_schema.data_type, - error::InvalidAlterRequestSnafu { - table: table_name, - err: format!( - "column {} already exists with different type", - col_to_add.column_schema.name - ), - } - ); - } else { - new_requests.push(col_to_add.clone()); - } - } - &new_requests[..] - } else { - requests - }; + let mut new_columns = Vec::with_capacity(requests.len()); for col_to_add in requests { - ensure!( - names.insert(&col_to_add.column_schema.name), - error::InvalidAlterRequestSnafu { - table: table_name, - err: format!( - "add column {} more than once", - col_to_add.column_schema.name - ), - } - ); + if let Some(column_schema) = + table_schema.column_schema_by_name(&col_to_add.column_schema.name) + { + // If the column already exists. 
+ ensure!( + col_to_add.add_if_not_exists, + error::ColumnExistsSnafu { + table_name, + column_name: &col_to_add.column_schema.name + }, + ); - ensure!( - !table_schema.contains_column(&col_to_add.column_schema.name), - error::ColumnExistsSnafu { - table_name, - column_name: col_to_add.column_schema.name.to_string() - }, - ); + // Checks if the type is the same + ensure!( + column_schema.data_type == col_to_add.column_schema.data_type, + error::InvalidAlterRequestSnafu { + table: table_name, + err: format!( + "column {} already exists with different type {:?}", + col_to_add.column_schema.name, column_schema.data_type, + ), + } + ); + } else { + // A new column. + // Ensures we only add a column once. + ensure!( + names.insert(&col_to_add.column_schema.name), + error::InvalidAlterRequestSnafu { + table: table_name, + err: format!( + "add column {} more than once", + col_to_add.column_schema.name + ), + } + ); - ensure!( - col_to_add.column_schema.is_nullable() - || col_to_add.column_schema.default_constraint().is_some(), - error::InvalidAlterRequestSnafu { - table: table_name, - err: format!( - "no default value for column {}", - col_to_add.column_schema.name - ), - }, - ); + ensure!( + col_to_add.column_schema.is_nullable() + || col_to_add.column_schema.default_constraint().is_some(), + error::InvalidAlterRequestSnafu { + table: table_name, + err: format!( + "no default value for column {}", + col_to_add.column_schema.name + ), + }, + ); + + new_columns.push(col_to_add.clone()); + } } + let requests = &new_columns[..]; let SplitResult { columns_at_first, @@ -881,6 +877,7 @@ pub struct RawTableMeta { pub value_indices: Vec, /// Engine type of this table. Usually in small case. pub engine: String, + /// Next column id of a new column. /// Deprecated. 
See https://github.com/GreptimeTeam/greptimedb/issues/2982 pub next_column_id: ColumnId, pub region_numbers: Vec, @@ -1078,6 +1075,7 @@ mod tests { use super::*; + /// Create a test schema with 3 columns: `[col1 int32, ts timestampmills, col2 int32]`. fn new_test_schema() -> Schema { let column_schemas = vec![ ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), @@ -1129,17 +1127,19 @@ mod tests { column_schema: new_tag, is_key: true, location: None, + add_if_not_exists: false, }, AddColumnRequest { column_schema: new_field, is_key: false, location: None, + add_if_not_exists: false, }, ], }; let builder = meta - .builder_with_alter_kind("my_table", &alter_kind, false) + .builder_with_alter_kind("my_table", &alter_kind) .unwrap(); builder.build().unwrap() } @@ -1157,6 +1157,7 @@ mod tests { column_schema: new_tag, is_key: true, location: Some(AddColumnLocation::First), + add_if_not_exists: false, }, AddColumnRequest { column_schema: new_field, @@ -1164,12 +1165,13 @@ mod tests { location: Some(AddColumnLocation::After { column_name: "ts".to_string(), }), + add_if_not_exists: false, }, ], }; let builder = meta - .builder_with_alter_kind("my_table", &alter_kind, false) + .builder_with_alter_kind("my_table", &alter_kind) .unwrap(); builder.build().unwrap() } @@ -1199,6 +1201,48 @@ mod tests { assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]); } + #[test] + fn test_add_columns_multiple_times() { + let schema = Arc::new(new_test_schema()); + let meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices(vec![0]) + .engine("engine") + .next_column_id(3) + .build() + .unwrap(); + + let alter_kind = AlterKind::AddColumns { + columns: vec![ + AddColumnRequest { + column_schema: ColumnSchema::new( + "col3", + ConcreteDataType::int32_datatype(), + true, + ), + is_key: true, + location: None, + add_if_not_exists: true, + }, + AddColumnRequest { + column_schema: ColumnSchema::new( + "col3", + ConcreteDataType::int32_datatype(), + true, + ), + 
is_key: true, + location: None, + add_if_not_exists: true, + }, + ], + }; + let err = meta + .builder_with_alter_kind("my_table", &alter_kind) + .err() + .unwrap(); + assert_eq!(StatusCode::InvalidArguments, err.status_code()); + } + #[test] fn test_remove_columns() { let schema = Arc::new(new_test_schema()); @@ -1216,7 +1260,7 @@ mod tests { names: vec![String::from("col2"), String::from("my_field")], }; let new_meta = meta - .builder_with_alter_kind("my_table", &alter_kind, false) + .builder_with_alter_kind("my_table", &alter_kind) .unwrap() .build() .unwrap(); @@ -1271,7 +1315,7 @@ mod tests { names: vec![String::from("col3"), String::from("col1")], }; let new_meta = meta - .builder_with_alter_kind("my_table", &alter_kind, false) + .builder_with_alter_kind("my_table", &alter_kind) .unwrap() .build() .unwrap(); @@ -1307,14 +1351,62 @@ mod tests { column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true), is_key: false, location: None, + add_if_not_exists: false, }], }; let err = meta - .builder_with_alter_kind("my_table", &alter_kind, false) + .builder_with_alter_kind("my_table", &alter_kind) .err() .unwrap(); assert_eq!(StatusCode::TableColumnExists, err.status_code()); + + // Add if not exists + let alter_kind = AlterKind::AddColumns { + columns: vec![AddColumnRequest { + column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), + is_key: true, + location: None, + add_if_not_exists: true, + }], + }; + let new_meta = meta + .builder_with_alter_kind("my_table", &alter_kind) + .unwrap() + .build() + .unwrap(); + assert_eq!( + meta.schema.column_schemas(), + new_meta.schema.column_schemas() + ); + assert_eq!(meta.schema.version() + 1, new_meta.schema.version()); + } + + #[test] + fn test_add_different_type_column() { + let schema = Arc::new(new_test_schema()); + let meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices(vec![0]) + .engine("engine") + .next_column_id(3) + .build() + .unwrap(); 
+ + // Add if not exists, but different type. + let alter_kind = AlterKind::AddColumns { + columns: vec![AddColumnRequest { + column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true), + is_key: false, + location: None, + add_if_not_exists: true, + }], + }; + let err = meta + .builder_with_alter_kind("my_table", &alter_kind) + .err() + .unwrap(); + assert_eq!(StatusCode::InvalidArguments, err.status_code()); } #[test] @@ -1328,6 +1420,7 @@ mod tests { .build() .unwrap(); + // Not nullable and no default value. let alter_kind = AlterKind::AddColumns { columns: vec![AddColumnRequest { column_schema: ColumnSchema::new( @@ -1337,11 +1430,12 @@ mod tests { ), is_key: false, location: None, + add_if_not_exists: false, }], }; let err = meta - .builder_with_alter_kind("my_table", &alter_kind, false) + .builder_with_alter_kind("my_table", &alter_kind) .err() .unwrap(); assert_eq!(StatusCode::InvalidArguments, err.status_code()); @@ -1363,7 +1457,7 @@ mod tests { }; let err = meta - .builder_with_alter_kind("my_table", &alter_kind, false) + .builder_with_alter_kind("my_table", &alter_kind) .err() .unwrap(); assert_eq!(StatusCode::TableColumnNotFound, err.status_code()); @@ -1388,7 +1482,7 @@ mod tests { }; let err = meta - .builder_with_alter_kind("my_table", &alter_kind, false) + .builder_with_alter_kind("my_table", &alter_kind) .err() .unwrap(); assert_eq!(StatusCode::TableColumnNotFound, err.status_code()); @@ -1411,7 +1505,7 @@ mod tests { }; let err = meta - .builder_with_alter_kind("my_table", &alter_kind, false) + .builder_with_alter_kind("my_table", &alter_kind) .err() .unwrap(); assert_eq!(StatusCode::InvalidArguments, err.status_code()); @@ -1422,7 +1516,7 @@ mod tests { }; let err = meta - .builder_with_alter_kind("my_table", &alter_kind, false) + .builder_with_alter_kind("my_table", &alter_kind) .err() .unwrap(); assert_eq!(StatusCode::InvalidArguments, err.status_code()); @@ -1448,7 +1542,7 @@ mod tests { }; let err = meta - 
.builder_with_alter_kind("my_table", &alter_kind, false) + .builder_with_alter_kind("my_table", &alter_kind) .err() .unwrap(); assert_eq!(StatusCode::InvalidArguments, err.status_code()); @@ -1462,7 +1556,7 @@ mod tests { }; let err = meta - .builder_with_alter_kind("my_table", &alter_kind, false) + .builder_with_alter_kind("my_table", &alter_kind) .err() .unwrap(); assert_eq!(StatusCode::InvalidArguments, err.status_code()); @@ -1531,7 +1625,7 @@ mod tests { options: FulltextOptions::default(), }; let err = meta - .builder_with_alter_kind("my_table", &alter_kind, false) + .builder_with_alter_kind("my_table", &alter_kind) .err() .unwrap(); assert_eq!( @@ -1552,7 +1646,7 @@ mod tests { }, }; let new_meta = new_meta - .builder_with_alter_kind("my_table", &alter_kind, false) + .builder_with_alter_kind("my_table", &alter_kind) .unwrap() .build() .unwrap(); @@ -1572,7 +1666,7 @@ mod tests { column_name: "my_tag_first".to_string(), }; let new_meta = new_meta - .builder_with_alter_kind("my_table", &alter_kind, false) + .builder_with_alter_kind("my_table", &alter_kind) .unwrap() .build() .unwrap(); diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 74554631c62d..e7de2eb9d6cb 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -185,6 +185,8 @@ pub struct AddColumnRequest { pub column_schema: ColumnSchema, pub is_key: bool, pub location: Option, + /// Add column if not exists. 
+ pub add_if_not_exists: bool, } /// Change column datatype request diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 1fd5e4239dd6..e27397ffea36 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -66,12 +66,190 @@ mod test { test_handle_ddl_request(instance.as_ref()).await; } + #[tokio::test(flavor = "multi_thread")] + async fn test_distributed_handle_multi_ddl_request() { + common_telemetry::init_default_ut_logging(); + let instance = + tests::create_distributed_instance("test_distributed_handle_multi_ddl_request").await; + + test_handle_multi_ddl_request(instance.frontend().as_ref()).await; + + verify_table_is_dropped(&instance).await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_standalone_handle_multi_ddl_request() { + let standalone = + GreptimeDbStandaloneBuilder::new("test_standalone_handle_multi_ddl_request") + .build() + .await; + let instance = &standalone.instance; + + test_handle_multi_ddl_request(instance.as_ref()).await; + } + async fn query(instance: &Instance, request: Request) -> Output { GrpcQueryHandler::do_query(instance, request, QueryContext::arc()) .await .unwrap() } + async fn test_handle_multi_ddl_request(instance: &Instance) { + let request = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { + catalog_name: "greptime".to_string(), + schema_name: "database_created_through_grpc".to_string(), + create_if_not_exists: true, + options: Default::default(), + })), + }); + let output = query(instance, request).await; + assert!(matches!(output.data, OutputData::AffectedRows(1))); + + let request = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateTable(CreateTableExpr { + catalog_name: "greptime".to_string(), + schema_name: "database_created_through_grpc".to_string(), + table_name: "table_created_through_grpc".to_string(), + column_defs: vec![ + ColumnDef { + name: "a".to_string(), + data_type: ColumnDataType::String as _, + 
is_nullable: true, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + ..Default::default() + }, + ColumnDef { + name: "ts".to_string(), + data_type: ColumnDataType::TimestampMillisecond as _, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, + ..Default::default() + }, + ], + time_index: "ts".to_string(), + engine: MITO_ENGINE.to_string(), + ..Default::default() + })), + }); + let output = query(instance, request).await; + assert!(matches!(output.data, OutputData::AffectedRows(0))); + + let request = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::AlterTable(AlterTableExpr { + catalog_name: "greptime".to_string(), + schema_name: "database_created_through_grpc".to_string(), + table_name: "table_created_through_grpc".to_string(), + kind: Some(alter_table_expr::Kind::AddColumns(AddColumns { + add_columns: vec![ + AddColumn { + column_def: Some(ColumnDef { + name: "b".to_string(), + data_type: ColumnDataType::Int32 as _, + is_nullable: true, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + ..Default::default() + }), + location: None, + add_if_not_exists: true, + }, + AddColumn { + column_def: Some(ColumnDef { + name: "a".to_string(), + data_type: ColumnDataType::String as _, + is_nullable: true, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + ..Default::default() + }), + location: None, + add_if_not_exists: true, + }, + ], + })), + })), + }); + let output = query(instance, request).await; + assert!(matches!(output.data, OutputData::AffectedRows(0))); + + let request = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::AlterTable(AlterTableExpr { + catalog_name: "greptime".to_string(), + schema_name: "database_created_through_grpc".to_string(), + table_name: "table_created_through_grpc".to_string(), + kind: Some(alter_table_expr::Kind::AddColumns(AddColumns { + add_columns: vec![ + AddColumn { + column_def: Some(ColumnDef { + name: 
"c".to_string(), + data_type: ColumnDataType::Int32 as _, + is_nullable: true, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + ..Default::default() + }), + location: None, + add_if_not_exists: true, + }, + AddColumn { + column_def: Some(ColumnDef { + name: "d".to_string(), + data_type: ColumnDataType::Int32 as _, + is_nullable: true, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + ..Default::default() + }), + location: None, + add_if_not_exists: true, + }, + ], + })), + })), + }); + let output = query(instance, request).await; + assert!(matches!(output.data, OutputData::AffectedRows(0))); + + let request = Request::Query(QueryRequest { + query: Some(Query::Sql("INSERT INTO database_created_through_grpc.table_created_through_grpc (a, b, c, d, ts) VALUES ('s', 1, 1, 1, 1672816466000)".to_string())) + }); + let output = query(instance, request).await; + assert!(matches!(output.data, OutputData::AffectedRows(1))); + + let request = Request::Query(QueryRequest { + query: Some(Query::Sql( + "SELECT ts, a, b FROM database_created_through_grpc.table_created_through_grpc" + .to_string(), + )), + }); + let output = query(instance, request).await; + let OutputData::Stream(stream) = output.data else { + unreachable!() + }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+---+---+ +| ts | a | b | ++---------------------+---+---+ +| 2023-01-04T07:14:26 | s | 1 | ++---------------------+---+---+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); + + let request = Request::Ddl(DdlRequest { + expr: Some(DdlExpr::DropTable(DropTableExpr { + catalog_name: "greptime".to_string(), + schema_name: "database_created_through_grpc".to_string(), + table_name: "table_created_through_grpc".to_string(), + ..Default::default() + })), + }); + let output = query(instance, request).await; + assert!(matches!(output.data, OutputData::AffectedRows(0))); + } + 
async fn test_handle_ddl_request(instance: &Instance) { let request = Request::Ddl(DdlRequest { expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { @@ -131,6 +309,7 @@ mod test { ..Default::default() }), location: None, + add_if_not_exists: false, }], })), })), diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 74c8a6c0f73d..11db34acb865 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -372,6 +372,7 @@ pub async fn test_insert_and_select(store_type: StorageType) { add_columns: vec![AddColumn { column_def: Some(add_column), location: None, + add_if_not_exists: false, }], }); let expr = AlterTableExpr {