Skip to content

Commit

Permalink
feat: Enable Lossless Timestamps in BQ java client lib (#3589)
Browse files Browse the repository at this point in the history
* feat: Enable Lossless Timestamps in BQ java client lib

* Fix Formatting.

* Fix tests for FieldValue and FieldValueList.

* Add more robust testing to IT test, minor formatting fixes.
  • Loading branch information
whuffman36 authored Dec 2, 2024
1 parent 3eef3a9 commit c0b874a
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1206,15 +1206,15 @@ public TableDataList call() {
new PageImpl<>(
new TableDataPageFetcher(tableId, schema, serviceOptions, cursor, pageOptionMap),
cursor,
transformTableData(result.getRows(), schema)),
transformTableData(result.getRows(), schema, serviceOptions.getUseInt64Timestamps())),
result.getTotalRows());
} catch (RetryHelper.RetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
}

private static Iterable<FieldValueList> transformTableData(
Iterable<TableRow> tableDataPb, final Schema schema) {
Iterable<TableRow> tableDataPb, final Schema schema, boolean useInt64Timestamps) {
return ImmutableList.copyOf(
Iterables.transform(
tableDataPb != null ? tableDataPb : ImmutableList.<TableRow>of(),
Expand All @@ -1223,7 +1223,7 @@ private static Iterable<FieldValueList> transformTableData(

@Override
public FieldValueList apply(TableRow rowPb) {
return FieldValueList.fromPb(rowPb.getF(), fields);
return FieldValueList.fromPb(rowPb.getF(), fields, useInt64Timestamps);
}
}));
}
Expand Down Expand Up @@ -1347,7 +1347,8 @@ public TableResult query(QueryJobConfiguration configuration, JobOption... optio

// If all parameters passed in configuration are supported by the query() method on the backend,
// put on fast path
QueryRequestInfo requestInfo = new QueryRequestInfo(configuration);
QueryRequestInfo requestInfo =
new QueryRequestInfo(configuration, getOptions().getUseInt64Timestamps());
if (requestInfo.isFastQuerySupported(null)) {
String projectId = getOptions().getProjectId();
QueryRequest content = requestInfo.toPb();
Expand Down Expand Up @@ -1420,7 +1421,8 @@ public com.google.api.services.bigquery.model.QueryResponse call() {
// fetch next pages of results
new QueryPageFetcher(jobId, schema, getOptions(), cursor, optionMap(options)),
cursor,
transformTableData(results.getRows(), schema)))
transformTableData(
results.getRows(), schema, getOptions().getUseInt64Timestamps())))
.setJobId(jobId)
.setQueryId(results.getQueryId())
.build();
Expand All @@ -1433,7 +1435,8 @@ public com.google.api.services.bigquery.model.QueryResponse call() {
new PageImpl<>(
new TableDataPageFetcher(null, schema, getOptions(), null, optionMap(options)),
null,
transformTableData(results.getRows(), schema)))
transformTableData(
results.getRows(), schema, getOptions().getUseInt64Timestamps())))
// Return the JobID of the successful job
.setJobId(
results.getJobReference() != null ? JobId.fromPb(results.getJobReference()) : null)
Expand All @@ -1448,7 +1451,8 @@ public TableResult query(QueryJobConfiguration configuration, JobId jobId, JobOp

// If all parameters passed in configuration are supported by the query() method on the backend,
// put on fast path
QueryRequestInfo requestInfo = new QueryRequestInfo(configuration);
QueryRequestInfo requestInfo =
new QueryRequestInfo(configuration, getOptions().getUseInt64Timestamps());
if (requestInfo.isFastQuerySupported(jobId)) {
// Be careful when setting the projectID in JobId, if a projectID is specified in the JobId,
// the job created by the query method will use that project. This may cause the query to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ public class BigQueryOptions extends ServiceOptions<BigQuery, BigQueryOptions> {
private static final int DEFAULT_READ_API_TIME_OUT = 60000;
private static final String BIGQUERY_SCOPE = "https://www.googleapis.com/auth/bigquery";
private static final Set<String> SCOPES = ImmutableSet.of(BIGQUERY_SCOPE);
private static final long serialVersionUID = -2437598817433266049L;
private static final long serialVersionUID = -2437598817433266048L;
private final String location;
// set the option ThrowNotFound when you want to throw the exception when the value not found
private boolean setThrowNotFound;
private boolean useInt64Timestamps;
private String queryPreviewEnabled = System.getenv("QUERY_PREVIEW_ENABLED");

public static class DefaultBigQueryFactory implements BigQueryFactory {
Expand All @@ -63,6 +64,7 @@ public ServiceRpc create(BigQueryOptions options) {
public static class Builder extends ServiceOptions.Builder<BigQuery, BigQueryOptions, Builder> {

private String location;
private boolean useInt64Timestamps;

private Builder() {}

Expand All @@ -84,6 +86,11 @@ public Builder setLocation(String location) {
return this;
}

public Builder setUseInt64Timestamps(boolean useInt64Timestamps) {
this.useInt64Timestamps = useInt64Timestamps;
return this;
}

@Override
public BigQueryOptions build() {
return new BigQueryOptions(this);
Expand All @@ -93,6 +100,7 @@ public BigQueryOptions build() {
private BigQueryOptions(Builder builder) {
super(BigQueryFactory.class, BigQueryRpcFactory.class, builder, new BigQueryDefaults());
this.location = builder.location;
this.useInt64Timestamps = builder.useInt64Timestamps;
}

private static class BigQueryDefaults implements ServiceDefaults<BigQuery, BigQueryOptions> {
Expand Down Expand Up @@ -140,6 +148,10 @@ public void setThrowNotFound(boolean setThrowNotFound) {
this.setThrowNotFound = setThrowNotFound;
}

public void setUseInt64Timestamps(boolean useInt64Timestamps) {
this.useInt64Timestamps = useInt64Timestamps;
}

@VisibleForTesting
public void setQueryPreviewEnabled(String queryPreviewEnabled) {
this.queryPreviewEnabled = queryPreviewEnabled;
Expand All @@ -149,6 +161,10 @@ public boolean getThrowNotFound() {
return setThrowNotFound;
}

public boolean getUseInt64Timestamps() {
return useInt64Timestamps;
}

@SuppressWarnings("unchecked")
@Override
public Builder toBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.io.BaseEncoding;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -46,10 +47,11 @@
public class FieldValue implements Serializable {

private static final int MICROSECONDS = 1000000;
private static final long serialVersionUID = 469098630191710061L;
private static final long serialVersionUID = 469098630191710062L;

private final Attribute attribute;
private final Object value;
private final Boolean useInt64Timestamps;

/** The field value's attribute, giving information on the field's content type. */
public enum Attribute {
Expand All @@ -74,8 +76,13 @@ public enum Attribute {
}

private FieldValue(Attribute attribute, Object value) {
this(attribute, value, false);
}

private FieldValue(Attribute attribute, Object value, Boolean useInt64Timestamps) {
this.attribute = checkNotNull(attribute);
this.value = value;
this.useInt64Timestamps = useInt64Timestamps;
}

/**
Expand Down Expand Up @@ -107,6 +114,10 @@ public Object getValue() {
return value;
}

public Boolean getUseInt64Timestamps() {
return useInt64Timestamps;
}

/**
* Returns this field's value as a {@link String}. This method should only be used if the
* corresponding field has primitive type ({@link LegacySQLTypeName#BYTES}, {@link
Expand Down Expand Up @@ -207,6 +218,9 @@ public boolean getBooleanValue() {
*/
@SuppressWarnings("unchecked")
public long getTimestampValue() {
if (useInt64Timestamps) {
return new BigInteger(getStringValue()).longValue();
}
// timestamps are encoded in the format 1408452095.22 where the integer part is seconds since
// epoch (e.g. 1408452095.22 == 2014-08-19 07:41:35.220 -05:00)
BigDecimal secondsWithMicro = new BigDecimal(getStringValue());
Expand Down Expand Up @@ -317,12 +331,13 @@ public String toString() {
return MoreObjects.toStringHelper(this)
.add("attribute", attribute)
.add("value", value)
.add("useInt64Timestamps", useInt64Timestamps)
.toString();
}

@Override
public final int hashCode() {
return Objects.hash(attribute, value);
return Objects.hash(attribute, value, useInt64Timestamps);
}

@Override
Expand All @@ -334,7 +349,9 @@ public final boolean equals(Object obj) {
return false;
}
FieldValue other = (FieldValue) obj;
return attribute == other.attribute && Objects.equals(value, other.value);
return attribute == other.attribute
&& Objects.equals(value, other.value)
&& Objects.equals(useInt64Timestamps, other.useInt64Timestamps);
}

/**
Expand All @@ -353,42 +370,52 @@ public final boolean equals(Object obj) {
*/
@BetaApi
public static FieldValue of(Attribute attribute, Object value) {
return new FieldValue(attribute, value);
return of(attribute, value, false);
}

@BetaApi
public static FieldValue of(Attribute attribute, Object value, Boolean useInt64Timestamps) {
return new FieldValue(attribute, value, useInt64Timestamps);
}

static FieldValue fromPb(Object cellPb) {
return fromPb(cellPb, null);
return fromPb(cellPb, null, false);
}

@SuppressWarnings("unchecked")
static FieldValue fromPb(Object cellPb, Field recordSchema) {
static FieldValue fromPb(Object cellPb, Field recordSchema, Boolean useInt64Timestamps) {
if (Data.isNull(cellPb)) {
return FieldValue.of(Attribute.PRIMITIVE, null);
return FieldValue.of(Attribute.PRIMITIVE, null, useInt64Timestamps);
}
if (cellPb instanceof String) {
if ((recordSchema != null)
&& (recordSchema.getType() == LegacySQLTypeName.RANGE)
&& (recordSchema.getRangeElementType() != null)) {
return FieldValue.of(
Attribute.RANGE, Range.of((String) cellPb, recordSchema.getRangeElementType()));
Attribute.RANGE,
Range.of((String) cellPb, recordSchema.getRangeElementType()),
useInt64Timestamps);
}
return FieldValue.of(Attribute.PRIMITIVE, cellPb);
return FieldValue.of(Attribute.PRIMITIVE, cellPb, useInt64Timestamps);
}
if (cellPb instanceof List) {
return FieldValue.of(Attribute.REPEATED, FieldValueList.fromPb((List<Object>) cellPb, null));
return FieldValue.of(
Attribute.REPEATED,
FieldValueList.fromPb((List<Object>) cellPb, null, useInt64Timestamps));
}
if (cellPb instanceof Map) {
Map<String, Object> cellMapPb = (Map<String, Object>) cellPb;
if (cellMapPb.containsKey("f")) {
FieldList subFieldsSchema = recordSchema != null ? recordSchema.getSubFields() : null;
return FieldValue.of(
Attribute.RECORD,
FieldValueList.fromPb((List<Object>) cellMapPb.get("f"), subFieldsSchema));
FieldValueList.fromPb(
(List<Object>) cellMapPb.get("f"), subFieldsSchema, useInt64Timestamps));
}
// This should never be the case when we are processing a first level table field (i.e. a
// row's field, not a record sub-field)
if (cellMapPb.containsKey("v")) {
return FieldValue.fromPb(cellMapPb.get("v"), recordSchema);
return FieldValue.fromPb(cellMapPb.get("v"), recordSchema, useInt64Timestamps);
}
}
throw new IllegalArgumentException("Unexpected table cell format");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ FieldValueList withSchema(FieldList schema) {
}

static FieldValueList fromPb(List<?> rowPb, FieldList schema) {
return fromPb(rowPb, schema, false);
}

static FieldValueList fromPb(List<?> rowPb, FieldList schema, Boolean useInt64Timestamps) {
List<FieldValue> row = new ArrayList<>(rowPb.size());
if (schema != null) {
if (schema.size() != rowPb.size()) {
Expand All @@ -120,11 +124,11 @@ static FieldValueList fromPb(List<?> rowPb, FieldList schema) {
Iterator<Field> schemaIter = schema.iterator();
Iterator<?> rowPbIter = rowPb.iterator();
while (rowPbIter.hasNext() && schemaIter.hasNext()) {
row.add(FieldValue.fromPb(rowPbIter.next(), schemaIter.next()));
row.add(FieldValue.fromPb(rowPbIter.next(), schemaIter.next(), useInt64Timestamps));
}
} else {
for (Object cellPb : rowPb) {
row.add(FieldValue.fromPb(cellPb, null));
row.add(FieldValue.fromPb(cellPb, null, useInt64Timestamps));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.bigquery;

import com.google.api.services.bigquery.model.DataFormatOptions;
import com.google.api.services.bigquery.model.QueryParameter;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.cloud.bigquery.QueryJobConfiguration.JobCreationMode;
Expand All @@ -42,8 +43,9 @@ final class QueryRequestInfo {
private final Boolean useQueryCache;
private final Boolean useLegacySql;
private final JobCreationMode jobCreationMode;
private final DataFormatOptions formatOptions;

QueryRequestInfo(QueryJobConfiguration config) {
QueryRequestInfo(QueryJobConfiguration config, Boolean useInt64Timestamps) {
this.config = config;
this.connectionProperties = config.getConnectionProperties();
this.defaultDataset = config.getDefaultDataset();
Expand All @@ -58,6 +60,7 @@ final class QueryRequestInfo {
this.useLegacySql = config.useLegacySql();
this.useQueryCache = config.useQueryCache();
this.jobCreationMode = config.getJobCreationMode();
this.formatOptions = new DataFormatOptions().setUseInt64Timestamp(useInt64Timestamps);
}

boolean isFastQuerySupported(JobId jobId) {
Expand Down Expand Up @@ -122,6 +125,9 @@ QueryRequest toPb() {
if (jobCreationMode != null) {
request.setJobCreationMode(jobCreationMode.toString());
}
if (formatOptions != null) {
request.setFormatOptions(formatOptions);
}
return request;
}

Expand All @@ -141,6 +147,7 @@ public String toString() {
.add("useQueryCache", useQueryCache)
.add("useLegacySql", useLegacySql)
.add("jobCreationMode", jobCreationMode)
.add("formatOptions", formatOptions.getUseInt64Timestamp())
.toString();
}

Expand All @@ -159,7 +166,8 @@ public int hashCode() {
createSession,
useQueryCache,
useLegacySql,
jobCreationMode);
jobCreationMode,
formatOptions);
}

@Override
Expand Down
Loading

0 comments on commit c0b874a

Please sign in to comment.