Skip to content

Commit

Permalink
feat: support add if not exists in the gRPC alter kind (#5273)
Browse files Browse the repository at this point in the history
* test: test adding existing columns

* chore: add more checks to AlterKind

* chore: update logs

* fix: check and build table info first

* feat: Add add_if_not_exists flag to alter expr

* feat: skip existing columns when building alter kind

* checks in make_region_alter_kind()
* reuse the alter kind

* test: fix tests in common-meta

* chore: fix typos

* chore: update comments
  • Loading branch information
evenyag committed Jan 3, 2025
1 parent 5e3c594 commit 5c6161a
Show file tree
Hide file tree
Showing 22 changed files with 633 additions and 142 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ etcd-client = "0.13"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7c01e4a8e64580707438dabc5cf7f4e2584c28b6" }
hex = "0.4"
http = "0.2"
humantime = "2.1"
Expand Down
7 changes: 7 additions & 0 deletions src/common/grpc-expr/src/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<
column_schema: schema,
is_key: column_def.semantic_type == SemanticType::Tag as i32,
location: parse_location(ac.location)?,
add_if_not_exists: ac.add_if_not_exists,
})
})
.collect::<Result<Vec<_>>>()?;
Expand Down Expand Up @@ -220,6 +221,7 @@ mod tests {
..Default::default()
}),
location: None,
add_if_not_exists: true,
}],
})),
};
Expand All @@ -240,6 +242,7 @@ mod tests {
add_column.column_schema.data_type
);
assert_eq!(None, add_column.location);
assert!(add_column.add_if_not_exists);
}

#[test]
Expand All @@ -265,6 +268,7 @@ mod tests {
location_type: LocationType::First.into(),
after_column_name: String::default(),
}),
add_if_not_exists: false,
},
AddColumn {
column_def: Some(ColumnDef {
Expand All @@ -280,6 +284,7 @@ mod tests {
location_type: LocationType::After.into(),
after_column_name: "ts".to_string(),
}),
add_if_not_exists: true,
},
],
})),
Expand Down Expand Up @@ -308,6 +313,7 @@ mod tests {
}),
add_column.location
);
assert!(add_column.add_if_not_exists);

let add_column = add_columns.pop().unwrap();
assert!(!add_column.is_key);
Expand All @@ -317,6 +323,7 @@ mod tests {
add_column.column_schema.data_type
);
assert_eq!(Some(AddColumnLocation::First), add_column.location);
assert!(!add_column.add_if_not_exists);
}

#[test]
Expand Down
5 changes: 5 additions & 0 deletions src/common/grpc-expr/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);

let memory_column = &add_columns.add_columns[1];
assert_eq!(
Expand All @@ -311,6 +312,7 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);

let time_column = &add_columns.add_columns[2];
assert_eq!(
Expand All @@ -323,6 +325,7 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);

let interval_column = &add_columns.add_columns[3];
assert_eq!(
Expand All @@ -335,6 +338,7 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);

let decimal_column = &add_columns.add_columns[4];
assert_eq!(
Expand All @@ -352,6 +356,7 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);
}

#[test]
Expand Down
4 changes: 4 additions & 0 deletions src/common/grpc-expr/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ pub fn build_create_table_expr(
Ok(expr)
}

/// Find columns that are not present in the schema and return them as `AddColumns`
/// for adding columns automatically.
/// It always sets `add_if_not_exists` to `true` for now.
pub fn extract_new_columns(
schema: &Schema,
column_exprs: Vec<ColumnExpr>,
Expand All @@ -213,6 +216,7 @@ pub fn extract_new_columns(
AddColumn {
column_def,
location: None,
add_if_not_exists: true,
}
})
.collect::<Vec<_>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl AlterLogicalTablesProcedure {
.context(ConvertAlterTableRequestSnafu)?;
let new_meta = table_info
.meta
.builder_with_alter_kind(table_ref.table, &request.alter_kind, true)
.builder_with_alter_kind(table_ref.table, &request.alter_kind)
.context(error::TableSnafu)?
.build()
.with_context(|_| error::BuildTableMetaSnafu {
Expand Down
49 changes: 41 additions & 8 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, StringKey,
};
use common_telemetry::{debug, info};
use common_telemetry::{debug, error, info};
use futures::future;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::table_reference::TableReference;

use crate::cache_invalidator::Context;
Expand All @@ -51,10 +51,14 @@ use crate::{metrics, ClusterId};

/// The alter table procedure
pub struct AlterTableProcedure {
// The runtime context.
/// The runtime context.
context: DdlContext,
// The serialized data.
/// The serialized data.
data: AlterTableData,
/// Cached new table metadata in the prepare step.
/// If we recover the procedure from json, then the table info value is not cached.
/// But we already validated it in the prepare step.
new_table_info: Option<TableInfo>,
}

impl AlterTableProcedure {
Expand All @@ -70,18 +74,31 @@ impl AlterTableProcedure {
Ok(Self {
context,
data: AlterTableData::new(task, table_id, cluster_id),
new_table_info: None,
})
}

pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(AlterTableProcedure { context, data })
Ok(AlterTableProcedure {
context,
data,
new_table_info: None,
})
}

// Checks whether the table exists.
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
self.check_alter().await?;
self.fill_table_info().await?;

// Validates the request and builds the new table info.
// We need to build the new table info here because we should ensure the alteration
// is valid in `UpdateMeta` state as we already altered the region.
// Safety: `fill_table_info()` already set it.
let table_info_value = self.data.table_info_value.as_ref().unwrap();
self.new_table_info = Some(self.build_new_table_info(&table_info_value.table_info)?);

// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
if matches!(alter_kind, Kind::RenameTable { .. }) {
Expand All @@ -106,14 +123,22 @@ impl AlterTableProcedure {

let leaders = find_leaders(&physical_table_route.region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
let alter_kind = self.make_region_alter_kind()?;

info!(
"Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}",
self.data.table_ref(),
table_id,
alter_kind,
);

for datanode in leaders {
let requester = self.context.node_manager.datanode(&datanode).await;
let regions = find_leader_regions(&physical_table_route.region_routes, &datanode);

for region in regions {
let region_id = RegionId::new(table_id, region);
let request = self.make_alter_region_request(region_id)?;
let request = self.make_alter_region_request(region_id, alter_kind.clone())?;
debug!("Submitting {request:?} to {datanode}");

let datanode = datanode.clone();
Expand Down Expand Up @@ -150,7 +175,15 @@ impl AlterTableProcedure {
let table_ref = self.data.table_ref();
// Safety: checked before.
let table_info_value = self.data.table_info_value.as_ref().unwrap();
let new_info = self.build_new_table_info(&table_info_value.table_info)?;
// Gets the table info from the cache or builds it.
let new_info = match &self.new_table_info {
Some(cached) => cached.clone(),
None => self.build_new_table_info(&table_info_value.table_info)
.inspect_err(|e| {
// We already check the table info in the prepare step so this should not happen.
error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
})?,
};

debug!(
"Starting update table: {} metadata, new table info {:?}",
Expand All @@ -174,7 +207,7 @@ impl AlterTableProcedure {
.await?;
}

info!("Updated table metadata for table {table_ref}, table_id: {table_id}");
info!("Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}");
self.data.state = AlterTableState::InvalidateTableCache;
Ok(Status::executing(true))
}
Expand Down
Loading

0 comments on commit 5c6161a

Please sign in to comment.