Skip to content

Commit

Permalink
Support Json type in postgres row -> arrow conversion (#40)
Browse files Browse the repository at this point in the history
* Support Json type in postgres row -> arrow conversion

* fix
  • Loading branch information
Sevenannn authored Aug 14, 2024
1 parent 1c6d92a commit 5191c63
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ r2d2 = { version = "0.8.10", optional = true }
rusqlite = { version = "0.31.0", optional = true }
sea-query = { version = "0.31.0", features = ["backend-sqlite", "backend-postgres", "postgres-array", "with-rust_decimal", "with-bigdecimal", "with-time"] }
secrecy = "0.8.0"
serde_json = "1.0.124"
snafu = "0.8.3"
time = "0.3.36"
tokio = { version = "1.38.0", features = ["macros", "fs"] }
tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4", "with-uuid-1"], optional = true }
tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4", "with-uuid-1", "with-serde_json-1"], optional = true }
tracing = "0.1.40"
uuid = { version = "1.9.1", optional = true }
postgres-native-tls = { version = "0.5.0", optional = true }
Expand Down
26 changes: 26 additions & 0 deletions src/sql/arrow_sql_gen/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use bigdecimal::ToPrimitive;
use chrono::Timelike;
use composite::CompositeType;
use sea_query::{Alias, ColumnType, SeaRc};
use serde_json::Value;
use snafu::prelude::*;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio_postgres::types::FromSql;
Expand Down Expand Up @@ -255,6 +256,30 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result<RecordBatch> {
Type::BOOL => {
handle_primitive_type!(builder, Type::BOOL, BooleanBuilder, bool, row, i);
}
Type::JSON => {
let Some(builder) = builder else {
return NoBuilderForIndexSnafu { index: i }.fail();
};
let Some(builder) = builder.as_any_mut().downcast_mut::<LargeStringBuilder>()
else {
return FailedToDowncastBuilderSnafu {
postgres_type: format!("{postgres_type}"),
}
.fail();
};
let v = row.try_get::<usize, Option<Value>>(i).with_context(|_| {
FailedToGetRowValueSnafu {
pg_type: Type::TIME,
}
})?;

match v {
Some(v) => {
builder.append_value(v.to_string());
}
None => builder.append_null(),
}
}
Type::TIME => {
let Some(builder) = builder else {
return NoBuilderForIndexSnafu { index: i }.fail();
Expand Down Expand Up @@ -560,6 +585,7 @@ fn map_column_type_to_data_type(column_type: &Type) -> Option<DataType> {
Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::UUID => Some(DataType::Utf8),
Type::BYTEA => Some(DataType::Binary),
Type::BOOL => Some(DataType::Boolean),
Type::JSON => Some(DataType::LargeUtf8),
// Inspect the scale from the first row. Precision will always be 38 for Decimal128.
Type::NUMERIC => None,
// We get a SystemTime that we can always convert into milliseconds
Expand Down

0 comments on commit 5191c63

Please sign in to comment.