Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve timezone information in arrow type - pg type conversion #59

Merged
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ futures = "0.3.30"
mysql_async = { version = "0.34.1", features = ["native-tls-tls", "chrono"], optional = true }
r2d2 = { version = "0.8.10", optional = true }
rusqlite = { version = "0.31.0", optional = true }
sea-query = { version = "0.31.0", features = ["backend-sqlite", "backend-postgres", "postgres-array", "with-rust_decimal", "with-bigdecimal", "with-time"] }
sea-query = { version = "0.31.0", features = ["backend-sqlite", "backend-postgres", "postgres-array", "with-rust_decimal", "with-bigdecimal", "with-time", "with-chrono"] }
secrecy = "0.8.0"
serde_json = "1.0.124"
snafu = "0.8.3"
Expand Down
61 changes: 54 additions & 7 deletions src/sql/arrow_sql_gen/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use bigdecimal::num_bigint::Sign;
use bigdecimal::BigDecimal;
use bigdecimal::ToPrimitive;
use byteorder::{BigEndian, ReadBytesExt};
use chrono::Timelike;
use chrono::{DateTime, Offset, Timelike, Utc};
use composite::CompositeType;
use geo_types::geometry::Point;
use sea_query::{Alias, ColumnType, SeaRc};
Expand Down Expand Up @@ -452,7 +452,7 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result<RecordBatch> {
};
dec_builder.append_value(v_i128);
}
ref pg_type @ (Type::TIMESTAMP | Type::TIMESTAMPTZ) => {
Type::TIMESTAMP => {
let Some(builder) = builder else {
return NoBuilderForIndexSnafu { index: i }.fail();
};
Expand All @@ -468,7 +468,7 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result<RecordBatch> {
let v = row
.try_get::<usize, Option<SystemTime>>(i)
.with_context(|_| FailedToGetRowValueSnafu {
pg_type: pg_type.clone(),
pg_type: Type::TIMESTAMP,
})?;

match v {
Expand All @@ -484,6 +484,55 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result<RecordBatch> {
None => builder.append_null(),
}
}
Type::TIMESTAMPTZ => {
Sevenannn marked this conversation as resolved.
Show resolved Hide resolved
let v = row
.try_get::<usize, Option<DateTime<Utc>>>(i)
.with_context(|_| FailedToGetRowValueSnafu {
pg_type: Type::TIMESTAMPTZ,
})?;

let time_zone = v.unwrap_or_default().offset().fix();
let timestampz_builder = builder.get_or_insert_with(|| {
Box::new(
TimestampMillisecondBuilder::new().with_timezone(time_zone.to_string()),
)
});

let Some(timestampz_builder) = timestampz_builder
.as_any_mut()
.downcast_mut::<TimestampMillisecondBuilder>()
else {
return FailedToDowncastBuilderSnafu {
postgres_type: format!("{postgres_type}"),
}
.fail();
};

if arrow_field.is_none() {
let Some(field_name) = column_names.get(i) else {
return NoColumnNameForIndexSnafu { index: i }.fail();
};
let new_arrow_field = Field::new(
field_name,
DataType::Timestamp(
TimeUnit::Millisecond,
Some(Arc::from(time_zone.to_string())),
),
true,
);

*arrow_field = Some(new_arrow_field);
}

match v {
Some(v) => {
let utc_timestamp = v.to_utc().timestamp_millis();
timestampz_builder.append_value(utc_timestamp);
}
None => timestampz_builder.append_null(),
}
}

Type::DATE => {
let Some(builder) = builder else {
return NoBuilderForIndexSnafu { index: i }.fail();
Expand Down Expand Up @@ -728,11 +777,9 @@ fn map_column_type_to_data_type(column_type: &Type) -> Option<DataType> {
Type::BOOL => Some(DataType::Boolean),
Type::JSON => Some(DataType::LargeUtf8),
// Inspect the scale from the first row. Precision will always be 38 for Decimal128.
Type::NUMERIC => None,
Type::NUMERIC | Type::TIMESTAMPTZ => None,
// We get a SystemTime that we can always convert into milliseconds
Type::TIMESTAMP | Type::TIMESTAMPTZ => {
Some(DataType::Timestamp(TimeUnit::Millisecond, None))
}
Type::TIMESTAMP => Some(DataType::Timestamp(TimeUnit::Millisecond, None)),
Type::DATE => Some(DataType::Date32),
Type::TIME => Some(DataType::Time64(TimeUnit::Nanosecond)),
Type::INTERVAL => Some(DataType::Interval(IntervalUnit::MonthDayNano)),
Expand Down
141 changes: 114 additions & 27 deletions src/sql/arrow_sql_gen/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use arrow::{
util::display::array_value_to_string,
};
use bigdecimal_0_3_0::BigDecimal;
use chrono::{DateTime, FixedOffset};
use num_bigint::BigInt;
use sea_query::{
Alias, ColumnDef, ColumnType, Expr, GenericBuilder, Index, InsertStatement, IntoIden,
Expand Down Expand Up @@ -346,7 +347,7 @@ impl InsertBuilder {
}
_ => unreachable!(),
},
DataType::Timestamp(TimeUnit::Second, _) => {
DataType::Timestamp(TimeUnit::Second, timezone) => {
let array = column
.as_any()
.downcast_ref::<array::TimestampSecondArray>();
Expand All @@ -356,13 +357,27 @@ impl InsertBuilder {
row_values.push(Keyword::Null.into());
continue;
}
insert_timestamp_into_row_values(
OffsetDateTime::from_unix_timestamp(valid_array.value(row)),
&mut row_values,
)?;
if let Some(timezone) = timezone {
let utc_time = DateTime::from_timestamp_nanos(
valid_array.value(row) * 1_000_000_000,
)
.to_utc();
let offset = parse_fixed_offset(timezone.as_ref()).ok_or(
Error::FailedToCreateInsertStatement {
source: "Unable to parse arrow timezone information".into(),
},
)?;
let time_with_offset = utc_time.with_timezone(&offset);
row_values.push(time_with_offset.into());
} else {
insert_timestamp_into_row_values(
OffsetDateTime::from_unix_timestamp(valid_array.value(row)),
&mut row_values,
)?;
}
}
}
DataType::Timestamp(TimeUnit::Millisecond, _) => {
DataType::Timestamp(TimeUnit::Millisecond, timezone) => {
let array = column
.as_any()
.downcast_ref::<array::TimestampMillisecondArray>();
Expand All @@ -372,15 +387,29 @@ impl InsertBuilder {
row_values.push(Keyword::Null.into());
continue;
}
insert_timestamp_into_row_values(
OffsetDateTime::from_unix_timestamp_nanos(
i128::from(valid_array.value(row)) * 1_000_000,
),
&mut row_values,
)?;
if let Some(timezone) = timezone {
let utc_time = DateTime::from_timestamp_nanos(
valid_array.value(row) * 1_000_000,
)
.to_utc();
let offset = parse_fixed_offset(timezone.as_ref()).ok_or(
Error::FailedToCreateInsertStatement {
source: "Unable to parse arrow timezone information".into(),
},
)?;
let time_with_offset = utc_time.with_timezone(&offset);
row_values.push(time_with_offset.into());
} else {
insert_timestamp_into_row_values(
OffsetDateTime::from_unix_timestamp_nanos(
i128::from(valid_array.value(row)) * 1_000_000,
),
&mut row_values,
)?;
}
}
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
DataType::Timestamp(TimeUnit::Microsecond, timezone) => {
let array = column
.as_any()
.downcast_ref::<array::TimestampMicrosecondArray>();
Expand All @@ -390,15 +419,28 @@ impl InsertBuilder {
row_values.push(Keyword::Null.into());
continue;
}
insert_timestamp_into_row_values(
OffsetDateTime::from_unix_timestamp_nanos(
i128::from(valid_array.value(row)) * 1_000,
),
&mut row_values,
)?;
if let Some(timezone) = timezone {
let utc_time =
DateTime::from_timestamp_nanos(valid_array.value(row) * 1_000)
.to_utc();
let offset = parse_fixed_offset(timezone.as_ref()).ok_or(
Error::FailedToCreateInsertStatement {
source: "Unable to parse arrow timezone information".into(),
},
)?;
let time_with_offset = utc_time.with_timezone(&offset);
row_values.push(time_with_offset.into());
} else {
insert_timestamp_into_row_values(
OffsetDateTime::from_unix_timestamp_nanos(
i128::from(valid_array.value(row)) * 1_000,
),
&mut row_values,
)?;
}
}
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
DataType::Timestamp(TimeUnit::Nanosecond, timezone) => {
let array = column
.as_any()
.downcast_ref::<array::TimestampNanosecondArray>();
Expand All @@ -408,12 +450,24 @@ impl InsertBuilder {
row_values.push(Keyword::Null.into());
continue;
}
insert_timestamp_into_row_values(
OffsetDateTime::from_unix_timestamp_nanos(i128::from(
valid_array.value(row),
)),
&mut row_values,
)?;
if let Some(timezone) = timezone {
let utc_time =
DateTime::from_timestamp_nanos(valid_array.value(row)).to_utc();
let offset = parse_fixed_offset(timezone.as_ref()).ok_or(
Error::FailedToCreateInsertStatement {
source: "Unable to parse arrow timezone information".into(),
},
)?;
let time_with_offset = utc_time.with_timezone(&offset);
row_values.push(time_with_offset.into());
} else {
insert_timestamp_into_row_values(
OffsetDateTime::from_unix_timestamp_nanos(i128::from(
valid_array.value(row),
)),
&mut row_values,
)?;
}
}
}
DataType::List(list_type) => {
Expand Down Expand Up @@ -894,6 +948,34 @@ fn insert_timestamp_into_row_values(
}
}

// Reference: https://github.com/apache/arrow-rs/blob/6c59b7637592e4b67b18762b8313f91086c0d5d8/arrow-array/src/timezone.rs#L25
#[allow(clippy::cast_lossless)]
fn parse_fixed_offset(tz: &str) -> Option<FixedOffset> {
let bytes = tz.as_bytes();

let mut values = match bytes.len() {
// [+-]XX:XX
6 if bytes[3] == b':' => [bytes[1], bytes[2], bytes[4], bytes[5]],
// [+-]XXXX
5 => [bytes[1], bytes[2], bytes[3], bytes[4]],
// [+-]XX
3 => [bytes[1], bytes[2], b'0', b'0'],
_ => return None,
};
values.iter_mut().for_each(|x| *x = x.wrapping_sub(b'0'));
if values.iter().any(|x| *x > 9) {
return None;
}
let secs =
(values[0] * 10 + values[1]) as i32 * 60 * 60 + (values[2] * 10 + values[3]) as i32 * 60;

match bytes[0] {
b'+' => FixedOffset::east_opt(secs),
b'-' => FixedOffset::west_opt(secs),
_ => None,
}
}

#[allow(clippy::needless_pass_by_value)]
fn insert_list_into_row_values(
list_array: Arc<dyn Array>,
Expand Down Expand Up @@ -1021,7 +1103,12 @@ pub(crate) fn map_data_type_to_column_type(data_type: &DataType) -> ColumnType {
DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => {
ColumnType::Decimal(Some((u32::from(*p), *s as u32)))
}
DataType::Timestamp(_unit, _time_zone) => ColumnType::Timestamp,
DataType::Timestamp(_unit, time_zone) => {
if time_zone.is_some() {
return ColumnType::TimestampWithTimeZone;
}
ColumnType::Timestamp
}
DataType::Date32 | DataType::Date64 => ColumnType::Date,
DataType::Time64(_unit) | DataType::Time32(_unit) => ColumnType::Time,
DataType::List(list_type)
Expand Down
Loading