Skip to content

Commit

Permalink
fix: security fix, sqlx, hashbrown, idna
Browse files Browse the repository at this point in the history
  • Loading branch information
sunng87 committed Jan 10, 2025
1 parent 812a775 commit e933689
Show file tree
Hide file tree
Showing 9 changed files with 532 additions and 394 deletions.
777 changes: 494 additions & 283 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion tests-fuzz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ serde_yaml = "0.9"
snafu = { workspace = true }
sql = { workspace = true }
sqlparser.workspace = true
sqlx = { version = "0.6", features = [
sqlx = { version = "0.8", features = [
"runtime-tokio-rustls",
"mysql",
"postgres",
Expand Down
7 changes: 5 additions & 2 deletions tests-fuzz/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ pub async fn init_greptime_connections_via_env() -> Connections {
/// Connects to GreptimeDB.
pub async fn init_greptime_connections(mysql: Option<String>) -> Connections {
let mysql = if let Some(addr) = mysql {
let mut opts: MySqlConnectOptions = format!("mysql://{addr}/public").parse().unwrap();
opts.log_statements(LevelFilter::Off);
let opts = format!("mysql://{addr}/public")
.parse::<MySqlConnectOptions>()
.unwrap()
.log_statements(LevelFilter::Off);

Some(MySqlPoolOptions::new().connect_with(opts).await.unwrap())
} else {
None
Expand Down
20 changes: 5 additions & 15 deletions tests-fuzz/src/utils/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ use std::time::Duration;
use common_telemetry::info;
use humantime::parse_duration;
use snafu::ResultExt;
use sqlx::database::HasArguments;
use sqlx::{ColumnIndex, Database, Decode, Encode, Executor, IntoArguments, MySql, Pool, Type};
use sqlx::MySqlPool;

use super::wait::wait_condition_fn;
use crate::error::{self, Result};
Expand All @@ -34,19 +33,10 @@ pub struct NodeInfo {
}

/// Returns all [NodeInfo] in the cluster.
pub async fn fetch_nodes<'a, DB, E>(e: E) -> Result<Vec<NodeInfo>>
where
DB: Database,
<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
for<'c> E: 'a + Executor<'c, Database = DB>,
for<'c> i64: Decode<'c, DB> + Type<DB>,
for<'c> String: Decode<'c, DB> + Type<DB>,
for<'c> String: Encode<'c, DB> + Type<DB>,
for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
{
let sql = "select * from information_schema.cluster_info;";
pub async fn fetch_nodes(db: &MySqlPool) -> Result<Vec<NodeInfo>> {
let sql = "select * from information_schema.cluster_info";
sqlx::query_as::<_, NodeInfo>(sql)
.fetch_all(e)
.fetch_all(db)
.await
.context(error::ExecuteQuerySnafu { sql })
}
Expand All @@ -55,7 +45,7 @@ where
///
/// This function repeatedly checks the status of all datanodes and waits until all of them are online
/// or the timeout period elapses. A datanode is considered online if its `active_time` is less than 3 seconds.
pub async fn wait_for_all_datanode_online(greptime: Pool<MySql>, timeout: Duration) {
pub async fn wait_for_all_datanode_online(greptime: MySqlPool, timeout: Duration) {
wait_condition_fn(
timeout,
|| {
Expand Down
49 changes: 10 additions & 39 deletions tests-fuzz/src/utils/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ use std::time::Duration;

use common_telemetry::info;
use snafu::ResultExt;
use sqlx::database::HasArguments;
use sqlx::{ColumnIndex, Database, Decode, Encode, Executor, IntoArguments, MySql, Pool, Type};
use sqlx::MySqlPool;
use store_api::storage::RegionId;

use super::wait::wait_condition_fn;
Expand All @@ -36,61 +35,33 @@ pub struct PartitionCount {
pub count: i64,
}

pub async fn count_partitions<'a, DB, E>(e: E, datanode_id: u64) -> Result<PartitionCount>
where
DB: Database,
<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
for<'c> E: 'a + Executor<'c, Database = DB>,
for<'c> i64: Decode<'c, DB> + Type<DB>,
for<'c> String: Decode<'c, DB> + Type<DB>,
for<'c> u64: Encode<'c, DB> + Type<DB>,
for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
{
pub async fn count_partitions(db: &MySqlPool, datanode_id: u64) -> Result<PartitionCount> {
let sql = "select count(1) as count from information_schema.region_peers where peer_id == ?";
Ok(sqlx::query_as::<_, PartitionCount>(sql)
sqlx::query_as::<_, PartitionCount>(sql)
.bind(datanode_id)
.fetch_all(e)
.fetch_one(db)
.await
.context(error::ExecuteQuerySnafu { sql })?
.remove(0))
.context(error::ExecuteQuerySnafu { sql })
}

/// Returns the [Partition] of the specific `region_id`
pub async fn fetch_partition<'a, DB, E>(e: E, region_id: u64) -> Result<Partition>
where
DB: Database,
<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
for<'c> E: 'a + Executor<'c, Database = DB>,
for<'c> u64: Decode<'c, DB> + Type<DB>,
for<'c> String: Decode<'c, DB> + Type<DB>,
for<'c> u64: Encode<'c, DB> + Type<DB>,
for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
{
pub async fn fetch_partition(db: &MySqlPool, region_id: u64) -> Result<Partition> {
let sql = "select region_id, peer_id as datanode_id from information_schema.region_peers where region_id = ?;";
sqlx::query_as::<_, Partition>(sql)
.bind(region_id)
.fetch_one(e)
.fetch_one(db)
.await
.context(error::ExecuteQuerySnafu { sql })
}

/// Returns all [Partition] of the specific `table`
pub async fn fetch_partitions<'a, DB, E>(e: E, table_name: Ident) -> Result<Vec<Partition>>
where
DB: Database,
<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
for<'c> E: 'a + Executor<'c, Database = DB>,
for<'c> u64: Decode<'c, DB> + Type<DB>,
for<'c> String: Decode<'c, DB> + Type<DB>,
for<'c> String: Encode<'c, DB> + Type<DB>,
for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
{
pub async fn fetch_partitions(db: &MySqlPool, table_name: Ident) -> Result<Vec<Partition>> {
let sql = "select b.peer_id as datanode_id, a.greptime_partition_id as region_id
from information_schema.partitions a left join information_schema.region_peers b
on a.greptime_partition_id = b.region_id where a.table_name= ? order by datanode_id asc;";
sqlx::query_as::<_, Partition>(sql)
.bind(table_name.value.to_string())
.fetch_all(e)
.fetch_all(db)
.await
.context(error::ExecuteQuerySnafu { sql })
}
Expand Down Expand Up @@ -124,7 +95,7 @@ pub fn pretty_print_region_distribution(distribution: &BTreeMap<u64, Vec<RegionI
/// This function repeatedly checks the number of partitions on the specified datanode and waits until
/// the count reaches zero or the timeout period elapses. It logs the number of partitions on each check.
pub async fn wait_for_all_regions_evicted(
greptime: Pool<MySql>,
greptime: MySqlPool,
selected_datanode: u64,
timeout: Duration,
) {
Expand Down
19 changes: 5 additions & 14 deletions tests-fuzz/src/validator/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
use common_telemetry::debug;
use datatypes::data_type::DataType;
use snafu::{ensure, ResultExt};
use sqlx::database::HasArguments;
use sqlx::{ColumnIndex, Database, Decode, Encode, Executor, IntoArguments, Type};
use sqlx::MySqlPool;

use crate::error::{self, Result};
use crate::ir::create_expr::ColumnOption;
Expand Down Expand Up @@ -198,24 +197,16 @@ pub fn assert_eq(fetched_columns: &[ColumnEntry], columns: &[Column]) -> Result<
}

/// Returns all [ColumnEntry] of the `table_name` from `information_schema`.
pub async fn fetch_columns<'a, DB, E>(
e: E,
pub async fn fetch_columns(
db: &MySqlPool,
schema_name: Ident,
table_name: Ident,
) -> Result<Vec<ColumnEntry>>
where
DB: Database,
<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
for<'c> E: 'a + Executor<'c, Database = DB>,
for<'c> String: Decode<'c, DB> + Type<DB>,
for<'c> String: Encode<'c, DB> + Type<DB>,
for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
{
) -> Result<Vec<ColumnEntry>> {
let sql = "SELECT table_schema, table_name, column_name, greptime_data_type as data_type, semantic_type, column_default, is_nullable FROM information_schema.columns WHERE table_schema = ? AND table_name = ?";
sqlx::query_as::<_, ColumnEntry>(sql)
.bind(schema_name.value.to_string())
.bind(table_name.value.to_string())
.fetch_all(e)
.fetch_all(db)
.await
.context(error::ExecuteQuerySnafu { sql })
}
Expand Down
35 changes: 8 additions & 27 deletions tests-fuzz/src/validator/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@ use common_time::date::Date;
use common_time::{DateTime, Timestamp};
use datatypes::value::Value;
use snafu::{ensure, ResultExt};
use sqlx::database::HasArguments;
use sqlx::{
Column, ColumnIndex, Database, Decode, Encode, Executor, IntoArguments, Row, Type, TypeInfo,
ValueRef,
};
use sqlx::mysql::MySqlRow;
use sqlx::{Column, ColumnIndex, Database, MySqlPool, Row, TypeInfo, ValueRef};

use crate::error::{self, Result};
use crate::ir::insert_expr::{RowValue, RowValues};
Expand Down Expand Up @@ -151,33 +148,17 @@ pub struct ValueCount {
pub count: i64,
}

pub async fn count_values<'a, DB, E>(e: E, sql: &'a str) -> Result<ValueCount>
where
DB: Database,
<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
for<'c> E: 'a + Executor<'c, Database = DB>,
for<'c> i64: Decode<'c, DB> + Type<DB>,
for<'c> String: Encode<'c, DB> + Type<DB>,
for<'c> &'c str: ColumnIndex<<DB as Database>::Row>,
{
Ok(sqlx::query_as::<_, ValueCount>(sql)
.fetch_all(e)
pub async fn count_values(db: &MySqlPool, sql: &str) -> Result<ValueCount> {
sqlx::query_as::<_, ValueCount>(sql)
.fetch_one(db)
.await
.context(error::ExecuteQuerySnafu { sql })?
.remove(0))
.context(error::ExecuteQuerySnafu { sql })
}

/// Returns all [RowEntry] of the `table_name`.
pub async fn fetch_values<'a, DB, E>(e: E, sql: &'a str) -> Result<Vec<<DB as Database>::Row>>
where
DB: Database,
<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
for<'c> E: 'a + Executor<'c, Database = DB>,
for<'c> String: Decode<'c, DB> + Type<DB>,
for<'c> String: Encode<'c, DB> + Type<DB>,
{
pub async fn fetch_values(db: &MySqlPool, sql: &str) -> Result<Vec<MySqlRow>> {
sqlx::query(sql)
.fetch_all(e)
.fetch_all(db)
.await
.context(error::ExecuteQuerySnafu { sql })
}
15 changes: 3 additions & 12 deletions tests-fuzz/src/validator/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
// limitations under the License.

use snafu::{ensure, ResultExt};
use sqlx::database::HasArguments;
use sqlx::{ColumnIndex, Database, Decode, Encode, Executor, IntoArguments, Row, Type};
use sqlx::{MySqlPool, Row};

use crate::error::{self, Result, UnexpectedSnafu};
use crate::ir::alter_expr::AlterTableOption;
Expand Down Expand Up @@ -47,17 +46,9 @@ fn parse_show_create(show_create: &str) -> Result<Vec<AlterTableOption>> {
}

/// Fetches table options from the context
pub async fn fetch_table_options<'a, DB, E>(e: E, sql: &'a str) -> Result<Vec<AlterTableOption>>
where
DB: Database,
<DB as HasArguments<'a>>::Arguments: IntoArguments<'a, DB>,
for<'c> E: 'a + Executor<'c, Database = DB>,
for<'c> String: Decode<'c, DB> + Type<DB>,
for<'c> String: Encode<'c, DB> + Type<DB>,
usize: ColumnIndex<<DB as Database>::Row>,
{
pub async fn fetch_table_options(db: &MySqlPool, sql: &str) -> Result<Vec<AlterTableOption>> {
let fetched_rows = sqlx::query(sql)
.fetch_all(e)
.fetch_all(db)
.await
.context(error::ExecuteQuerySnafu { sql })?;
ensure!(
Expand Down
2 changes: 1 addition & 1 deletion tests-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ session.workspace = true
similar-asserts.workspace = true
snafu.workspace = true
sql.workspace = true
sqlx = { version = "0.6", features = [
sqlx = { version = "0.8", features = [
"runtime-tokio-rustls",
"mysql",
"postgres",
Expand Down

0 comments on commit e933689

Please sign in to comment.