Skip to content

Commit

Permalink
adds additional functionality to cover more partitioning capability
Browse files Browse the repository at this point in the history
  • Loading branch information
chalmerlowe committed Sep 11, 2024
1 parent b54bdde commit 42f9778
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 57 deletions.
54 changes: 47 additions & 7 deletions sqlalchemy_bigquery/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,9 @@ def _raise_for_type(self, option, value, expected_type):
)

def _process_time_partitioning(
self, table: Table, time_partitioning: TimePartitioning
self,
table: Table,
time_partitioning: TimePartitioning,
):
"""
Generates a SQL 'PARTITION BY' clause for partitioning a table by a date or timestamp.
Expand All @@ -830,23 +832,61 @@ def _process_time_partitioning(
- Given a table with a TIMESTAMP type column 'event_timestamp' and setting
'time_partitioning.field' to 'event_timestamp', the function returns
"PARTITION BY TIMESTAMP_TRUNC(event_timestamp, DAY)".
Current inputs allowed by BQ and covered by this function include:
* _PARTITIONDATE
* DATETIME_TRUNC(<datetime_column>, DAY/HOUR/MONTH/YEAR)
* TIMESTAMP_TRUNC(<timestamp_column>, DAY/HOUR/MONTH/YEAR)
* DATE_TRUNC(<date_column>, MONTH/YEAR)
Additional allowed options not explicitly covered by this function
include:
* DATE(_PARTITIONTIME)
* DATE(<timestamp_column>)
* DATE(<datetime_column>)
* DATE column
"""

field = "_PARTITIONDATE"
trunc_fn = "DATE_TRUNC"

# Format used with _PARTITIONDATE which can only be used for
# DAY / MONTH / YEAR
if time_partitioning.field is None and field == "_PARTITIONDATE":
if time_partitioning.type_ in {"DAY", "MONTH", "YEAR"}:
return f"PARTITION BY {trunc_fn}({field})"
else:
raise ValueError(
f"_PARTITIONDATE can only be used with TimePartitioningTypes {{DAY, MONTH, YEAR}} received {time_partitioning.type_}"
)

if time_partitioning.field is not None:
field = time_partitioning.field

if isinstance(
table.columns[time_partitioning.field].type,
sqlalchemy.sql.sqltypes.DATE,
table.columns[field].type,
(sqlalchemy.sql.sqltypes.TIMESTAMP),
):
return f"PARTITION BY {field}"
trunc_fn = "TIMESTAMP_TRUNC"
elif isinstance(
table.columns[time_partitioning.field].type,
sqlalchemy.sql.sqltypes.TIMESTAMP,
table.columns[field].type,
sqlalchemy.sql.sqltypes.DATETIME,
):
trunc_fn = "TIMESTAMP_TRUNC"
trunc_fn = "DATETIME_TRUNC"

if isinstance(
table.columns[field].type,
sqlalchemy.sql.sqltypes.DATE,
):
if time_partitioning.type_ in {"DAY", "MONTH", "YEAR"}:
# CHECK for type: DAY/MONTH/YEAR
trunc_fn = "DATE_TRUNC"
else:
raise ValueError(
f"DATE_TRUNC can only be used with TimePartitioningTypes {{DAY, MONTH, YEAR}} received {time_partitioning.type_}"
)

# Format used with generically with DATE, TIMESTAMP, DATETIME
return f"PARTITION BY {trunc_fn}({field}, {time_partitioning.type_})"

def _process_range_partitioning(
Expand Down
134 changes: 84 additions & 50 deletions tests/unit/test_table_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,59 @@ def test_table_clustering_fields_dialect_option_type_error(faux_conn):
)


def test_table_time_partitioning_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
# DATETIME with type and field
@pytest.mark.parametrize(
"column_dtype,time_partitioning_type,func_name",
[
# DATE dtype
pytest.param(
sqlalchemy.DATE,
TimePartitioningType.HOUR,
"DATE_TRUNC",
marks=pytest.mark.xfail,
),
(sqlalchemy.DATE, TimePartitioningType.DAY, "DATE_TRUNC"),
(sqlalchemy.DATE, TimePartitioningType.MONTH, "DATE_TRUNC"),
(sqlalchemy.DATE, TimePartitioningType.YEAR, "DATE_TRUNC"),
# TIMESTAMP dtype
(sqlalchemy.TIMESTAMP, TimePartitioningType.HOUR, "TIMESTAMP_TRUNC"),
(sqlalchemy.TIMESTAMP, TimePartitioningType.DAY, "TIMESTAMP_TRUNC"),
(sqlalchemy.TIMESTAMP, TimePartitioningType.MONTH, "TIMESTAMP_TRUNC"),
(sqlalchemy.TIMESTAMP, TimePartitioningType.YEAR, "TIMESTAMP_TRUNC"),
# DATETIME dtype
(sqlalchemy.DATETIME, TimePartitioningType.HOUR, "DATETIME_TRUNC"),
(sqlalchemy.DATETIME, TimePartitioningType.DAY, "DATETIME_TRUNC"),
(sqlalchemy.DATETIME, TimePartitioningType.MONTH, "DATETIME_TRUNC"),
(sqlalchemy.DATETIME, TimePartitioningType.YEAR, "DATETIME_TRUNC"),
],
)
def test_table_time_partitioning_date_timestamp_and_datetime_dialect_option(
faux_conn, column_dtype, time_partitioning_type, func_name
):
"""Expect table creation to fail as SQLite does not support partitioned tables
Each parametrization ensures that the appropriate function is generated
depending on whether the column datatype is DATE, TIMESTAMP, DATETIME and
whether the TimePartitioningType is HOUR, DAY, MONTH, YEAR.
Notes:
* BigQuery will not partition on DATE by HOUR, so that is expected to xfail.
"""

with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
bigquery_time_partitioning=TimePartitioning(),
sqlalchemy.Column("createdAt", column_dtype),
bigquery_time_partitioning=TimePartitioning(
field="createdAt", type_=time_partitioning_type
),
)

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(_PARTITIONDATE, DAY)"
f"CREATE TABLE `some_table` ( `id` INT64, `createdAt` {column_dtype.__visit_name__} )"
f" PARTITION BY {func_name}(createdAt, {time_partitioning_type})"
)


Expand All @@ -139,75 +178,70 @@ def test_table_require_partition_filter_dialect_option(faux_conn):
)


# DATETIME WITH FIELD but no TYPE: defaults to DAY
def test_table_time_partitioning_with_field_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
bigquery_time_partitioning=TimePartitioning(field="createdAt"),
)

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(createdAt, DAY)"
)
"""Expect table creation to fail as SQLite does not support partitioned tables
Confirms that if the column datatype is DATETIME but no TIMEPARTITIONINGTYPE
has been supplied, the system will default to DAY.
"""

def test_table_time_partitioning_by_month_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
bigquery_time_partitioning=TimePartitioning(
field="createdAt",
type_=TimePartitioningType.MONTH,
),
)

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(createdAt, MONTH)"
)


def test_table_time_partitioning_with_timestamp_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table",
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.TIMESTAMP),
bigquery_time_partitioning=TimePartitioning(field="createdAt"),
)

assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` TIMESTAMP )"
" PARTITION BY TIMESTAMP_TRUNC(createdAt, DAY)"
"CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )"
" PARTITION BY DATE_TRUNC(createdAt, DAY)"
)


def test_table_time_partitioning_with_date_dialect_option(faux_conn):
# expect table creation to fail as SQLite does not support partitioned tables
@pytest.mark.parametrize(
"column_dtype,time_partitioning_type,func_name",
[
pytest.param(
sqlalchemy.DATE,
TimePartitioningType.HOUR,
"DATE_TRUNC",
marks=pytest.mark.xfail,
),
(sqlalchemy.DATE, TimePartitioningType.DAY, "DATE_TRUNC"),
(sqlalchemy.DATE, TimePartitioningType.MONTH, "DATE_TRUNC"),
(sqlalchemy.DATE, TimePartitioningType.YEAR, "DATE_TRUNC"),
],
)
def test_table_time_partitioning_with_partitiondate_option(
faux_conn, column_dtype, time_partitioning_type, func_name
):
"""Expect table creation to fail as SQLite does not support partitioned tables
Each parametrization ensures that the appropriate function is generated
depending on the pseudocolumn datatype is _PARTITIONDATE and whether the
TimePartitioningType is HOUR, DAY, MONTH, YEAR.
Notes:
* BigQuery will not partition on _PARTITIONDATE by HOUR, so that is expected
to xfail.
"""
with pytest.raises(sqlite3.OperationalError):
setup_table(
faux_conn,
"some_table_2",
# schema=str([id_, createdAt]),
sqlalchemy.Column("id", sqlalchemy.Integer),
sqlalchemy.Column("createdAt", sqlalchemy.DATE),
bigquery_time_partitioning=TimePartitioning(field="createdAt"),
sqlalchemy.Column("createdAt", column_dtype),
bigquery_time_partitioning=TimePartitioning(type_=time_partitioning_type),
)

# confirm that the following code creates the correct SQL string
assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
"CREATE TABLE `some_table_2` ( `id` INT64, `createdAt` DATE )"
" PARTITION BY createdAt"
f"CREATE TABLE `some_table_2` ( `id` INT64, `createdAt` {column_dtype.__visit_name__} )"
f" PARTITION BY {func_name}(_PARTITIONDATE)"
)


Expand Down

0 comments on commit 42f9778

Please sign in to comment.