Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Connector-V2] Time supports default value #7639

Merged
merged 5 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;
import java.util.ArrayList;
Expand Down Expand Up @@ -451,6 +452,7 @@ public static FakeConfig buildWithConfig(ReadonlyConfig readonlyConfig) {
}

@Getter
@Setter
@AllArgsConstructor
public static class RowData implements Serializable {
static final String KEY_KIND = "kind";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

package org.apache.seatunnel.connectors.seatunnel.fake.source;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.type.ArrayType;
Expand All @@ -25,8 +31,11 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException;
import org.apache.seatunnel.connectors.seatunnel.fake.utils.FakeDataRandomUtils;
Expand All @@ -35,12 +44,24 @@
import java.io.IOException;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.function.Function;

import static org.apache.seatunnel.api.table.type.SqlType.TIME;

public class FakeDataGenerator {
private static final String CURRENT_DATE = "CURRENT_DATE";
private static final String CURRENT_TIME = "CURRENT_TIME";
private static final String CURRENT_TIMESTAMP = "CURRENT_TIMESTAMP";

private final ObjectMapper OBJECTMAPPER = new ObjectMapper();

private final CatalogTable catalogTable;
private final FakeConfig fakeConfig;
private final JsonDeserializationSchema jsonDeserializationSchema;
Expand Down Expand Up @@ -92,7 +113,10 @@ public List<SeaTunnelRow> generateFakedRows(int rowNum) {
// Use manual configuration data preferentially
List<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
if (fakeConfig.getFakeRows() != null) {
SeaTunnelDataType<?>[] fieldTypes = catalogTable.getSeaTunnelRowType().getFieldTypes();
String[] fieldNames = catalogTable.getSeaTunnelRowType().getFieldNames();
for (FakeConfig.RowData rowData : fakeConfig.getFakeRows()) {
customField(rowData, fieldTypes, fieldNames);
seaTunnelRows.add(convertRow(rowData));
}
} else {
Expand All @@ -103,6 +127,69 @@ public List<SeaTunnelRow> generateFakedRows(int rowNum) {
return seaTunnelRows;
}

private void customField(
FakeConfig.RowData rowData, SeaTunnelDataType<?>[] fieldTypes, String[] fieldNames) {
if (rowData.getFieldsJson() == null) {
return;
}

try {
JsonNode jsonNode = OBJECTMAPPER.readTree(rowData.getFieldsJson());
int arity = fieldTypes.length;

for (int i = 0; i < arity; i++) {
SeaTunnelDataType<?> fieldType = fieldTypes[i];
JsonNode field = jsonNode.isArray() ? jsonNode.get(i) : jsonNode.get(fieldNames[i]);

if (field == null) {
continue;
}

String newValue = getNewValueForField(fieldType.getSqlType(), field.asText());
if (newValue != null) {
jsonNode = replaceFieldValue(jsonNode, i, fieldNames[i], newValue);
}
}

rowData.setFieldsJson(jsonNode.toString());
} catch (JsonProcessingException e) {
throw new FakeConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
"The data type of the fake data is not supported",
e);
}
}

private String getNewValueForField(SqlType sqlType, String fieldValue) {
switch (sqlType) {
case TIME:
return fieldValue.equals(CURRENT_TIME) ? LocalTime.now().toString() : null;
case DATE:
return fieldValue.equalsIgnoreCase(CURRENT_DATE)
? LocalDate.now().toString()
: null;
case TIMESTAMP:
return fieldValue.equalsIgnoreCase(CURRENT_TIMESTAMP)
? LocalDateTime.now().toString()
: null;
default:
return null;
}
}

private JsonNode replaceFieldValue(
JsonNode jsonNode, int index, String fieldName, String newValue) {
JsonNode newFieldNode = OBJECTMAPPER.convertValue(newValue, JsonNode.class);

if (jsonNode.isArray()) {
((ArrayNode) jsonNode).set(index, newFieldNode);
} else {
((ObjectNode) jsonNode).set(fieldName, newFieldNode);
}

return jsonNode;
}

@SuppressWarnings("magicnumber")
private Object randomColumnValue(Column column) {
SeaTunnelDataType<?> fieldType = column.getDataType();
Expand Down Expand Up @@ -152,11 +239,47 @@ private Object randomColumnValue(Column column) {
case BYTES:
return value(column, String::getBytes, fakeDataRandomUtils::randomBytes);
case DATE:
return value(column, String::toString, fakeDataRandomUtils::randomLocalDate);
return value(
column,
defaultValue -> {
if (defaultValue.equalsIgnoreCase(CURRENT_DATE)) {
return LocalDate.now();
}
DateTimeFormatter dateTimeFormatter =
DateUtils.matchDateFormatter(defaultValue);
return LocalDate.parse(
defaultValue,
dateTimeFormatter == null
? DateTimeFormatter.ISO_LOCAL_DATE
: dateTimeFormatter);
},
fakeDataRandomUtils::randomLocalDate);
case TIME:
return value(column, String::toString, fakeDataRandomUtils::randomLocalTime);
return value(
column,
defaultValue -> {
if (defaultValue.equalsIgnoreCase(CURRENT_TIME)) {
return LocalTime.now();
}
return LocalTime.parse(defaultValue, DateTimeFormatter.ISO_LOCAL_TIME);
},
fakeDataRandomUtils::randomLocalTime);
case TIMESTAMP:
return value(column, String::toString, fakeDataRandomUtils::randomLocalDateTime);
return value(
column,
defaultValue -> {
if (defaultValue.equalsIgnoreCase(CURRENT_TIMESTAMP)) {
return LocalDateTime.now();
}
DateTimeFormatter dateTimeFormatter =
DateTimeUtils.matchDateTimeFormatter(defaultValue);
return LocalDateTime.parse(
defaultValue,
dateTimeFormatter == null
? DateTimeFormatter.ISO_LOCAL_DATE_TIME
: dateTimeFormatter);
},
fakeDataRandomUtils::randomLocalDateTime);
case ROW:
SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) fieldType).getFieldTypes();
Object[] objects = new Object[fieldTypes.length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
Expand All @@ -41,6 +42,9 @@
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -173,13 +177,17 @@ public void testColumnDataParse(String conf) throws FileNotFoundException, URISy
8, ((ByteBuffer) seaTunnelRow.getField(8)).capacity() / 2);
// VectorType.VECTOR_SPARSE_FLOAT_TYPE
Assertions.assertEquals(8, ((Map) seaTunnelRow.getField(9)).size());
Assertions.assertNotNull(seaTunnelRow.getField(10).toString());
Assertions.assertNotNull(seaTunnelRow.getField(11).toString());
Assertions.assertEquals(
268,
436,
seaTunnelRow.getBytesSize(
new SeaTunnelRowType(
new String[] {
"field1", "field2", "field3", "field4", "field5",
"field6", "field7", "field8", "field9", "field10"
"field6", "field7", "field8", "field9", "field10",
"field11", "field12", "field13", "field14",
"field15", "field16"
},
new SeaTunnelDataType<?>[] {
BasicType.STRING_TYPE,
Expand All @@ -191,11 +199,36 @@ public void testColumnDataParse(String conf) throws FileNotFoundException, URISy
VectorType.VECTOR_BINARY_TYPE,
VectorType.VECTOR_FLOAT16_TYPE,
VectorType.VECTOR_BFLOAT16_TYPE,
VectorType.VECTOR_SPARSE_FLOAT_TYPE
VectorType.VECTOR_SPARSE_FLOAT_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
LocalTimeType.LOCAL_TIME_TYPE,
LocalTimeType.LOCAL_TIME_TYPE,
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_DATE_TYPE
})));
});
}

@ParameterizedTest
@ValueSource(strings = {"fake-data.schema.default.conf"})
public void testDataParse(String conf) throws FileNotFoundException, URISyntaxException {
ReadonlyConfig testConfig = getTestConfigFile(conf);
FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig);
List<SeaTunnelRow> seaTunnelRows =
fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
seaTunnelRows.forEach(
seaTunnelRow -> {
Assertions.assertInstanceOf(Long.class, seaTunnelRow.getField(0));
Assertions.assertInstanceOf(String.class, seaTunnelRow.getField(1));
Assertions.assertInstanceOf(Integer.class, seaTunnelRow.getField(2));
Assertions.assertInstanceOf(LocalDateTime.class, seaTunnelRow.getField(3));
Assertions.assertInstanceOf(LocalTime.class, seaTunnelRow.getField(4));
Assertions.assertInstanceOf(LocalDate.class, seaTunnelRow.getField(5));
});
}

private ReadonlyConfig getTestConfigFile(String configFile)
throws FileNotFoundException, URISyntaxException {
if (!configFile.startsWith("/")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,42 @@
type = sparse_float_vector
columnScale =8
comment = "vector"
},
{
name = book_publication_time
type = timestamp
defaultValue = "2024-09-12 15:45:30"
comment = "book publication time"
},
{
name = book_publication_time2
type = timestamp
defaultValue = CURRENT_TIMESTAMP
comment = "book publication time2"
},
{
name = book_publication_time3
type = time
defaultValue = "15:45:30"
comment = "book publication time3"
},
{
name = book_publication_time4
type = time
defaultValue = CURRENT_TIME
comment = "book publication time4"
},
{
name = book_publication_time5
type = date
defaultValue = "2024-09-12"
comment = "book publication time5"
},
{
name = book_publication_time6
type = date
defaultValue = CURRENT_DATE
comment = "book publication time6"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
time1 = timestamp
time2 = time
time3 = date
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100, CURRENT_TIMESTAMP, CURRENT_TIME, CURRENT_DATE]
},
{
kind = INSERT
fields = [2, "B", 100, CURRENT_TIMESTAMP, CURRENT_TIME, CURRENT_DATE]
},
{
kind = INSERT
fields = [3, "C", 100, CURRENT_TIMESTAMP, CURRENT_TIME, CURRENT_DATE]
},
{
kind = UPDATE_BEFORE
fields = [1, "A", 100, CURRENT_TIMESTAMP, CURRENT_TIME, CURRENT_DATE]
},
{
kind = UPDATE_AFTER
fields = [1, "A_1", 100, CURRENT_TIMESTAMP, CURRENT_TIME, CURRENT_DATE]
},
{
kind = DELETE
fields = [2, "B", 100, CURRENT_TIMESTAMP, CURRENT_TIME, CURRENT_DATE]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public class JsonToRowConverters implements Serializable {
.toFormatter();

public static final String FORMAT = "Common";

/** Flag indicating whether to fail if a field is missing. */
private final boolean failOnMissingField;

Expand Down Expand Up @@ -264,7 +263,8 @@ private LocalDate convertToLocalDate(JsonNode jsonNode, String fieldName) {
}

private LocalTime convertToLocalTime(JsonNode jsonNode) {
TemporalAccessor parsedTime = TIME_FORMAT.parse(jsonNode.asText());
String localTime = jsonNode.asText();
TemporalAccessor parsedTime = TIME_FORMAT.parse(localTime);
corgy-w marked this conversation as resolved.
Show resolved Hide resolved
return parsedTime.query(TemporalQueries.localTime());
}

Expand Down
Loading