Skip to content

Commit

Permalink
[Hotfix][Seatunnel-common] Fix the CommonError msg for paimon sink (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
dailai authored Sep 6, 2024
1 parent f7286b7 commit d1f5db9
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ public static SeaTunnelRuntimeException writeRowErrorWithFiledsCountNotMatch(
String connector, int sourceFieldsNum, int sinkFieldsNum) {
Map<String, String> params = new HashMap<>();
params.put("connector", connector);
params.put("sourceFiledName", String.valueOf(sourceFieldsNum));
params.put("sourceFiledType", String.valueOf(sinkFieldsNum));
params.put("sourceFieldsNum", String.valueOf(sourceFieldsNum));
params.put("sinkFieldsNum", String.valueOf(sinkFieldsNum));
return new SeaTunnelRuntimeException(
WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH, params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import lombok.extern.slf4j.Slf4j;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand All @@ -62,6 +64,7 @@
import java.util.Map;

/** Unit tests for {@link RowConverter} */
@Slf4j
public class RowConverterTest {

private SeaTunnelRow seaTunnelRow;
Expand All @@ -71,6 +74,7 @@ public class RowConverterTest {
private SeaTunnelRowType seaTunnelRowType;

private volatile boolean isCaseSensitive = false;
private volatile boolean subtractOneFiledInSource = false;
private volatile int index = 0;
private static final String[] filedNames = {
"c_tinyint",
Expand All @@ -89,6 +93,23 @@ public class RowConverterTest {
"c_array"
};

public static final SeaTunnelDataType<?>[] seaTunnelDataTypes = {
BasicType.BYTE_TYPE,
BasicType.SHORT_TYPE,
BasicType.INT_TYPE,
BasicType.LONG_TYPE,
BasicType.FLOAT_TYPE,
BasicType.DOUBLE_TYPE,
new DecimalType(30, 8),
BasicType.STRING_TYPE,
PrimitiveByteArrayType.INSTANCE,
BasicType.BOOLEAN_TYPE,
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
ArrayType.STRING_ARRAY_TYPE
};

public static final List<String> KEY_NAME_LIST = Arrays.asList("c_tinyint");

public TableSchema getTableSchema(int decimalPrecision, int decimalScale) {
Expand Down Expand Up @@ -139,7 +160,7 @@ public TableSchema getTableSchema(int decimalPrecision, int decimalScale) {

@BeforeEach
public void generateTestData() {
initSeaTunnelRowTypeCaseSensitive(isCaseSensitive, index);
initSeaTunnelRowTypeCaseSensitive(isCaseSensitive, index, subtractOneFiledInSource);
byte tinyint = 1;
short smallint = 2;
int intNum = 3;
Expand Down Expand Up @@ -216,34 +237,27 @@ public void generateTestData() {
internalRow = binaryRow;
}

private void initSeaTunnelRowTypeCaseSensitive(boolean isUpperCase, int index) {
String[] oneUpperCaseFiledNames = Arrays.copyOf(filedNames, filedNames.length);
private void initSeaTunnelRowTypeCaseSensitive(
boolean isUpperCase, int index, boolean subtractOneFiledInSource) {
String[] oneUpperCaseFiledNames =
Arrays.copyOf(
filedNames,
subtractOneFiledInSource ? filedNames.length - 1 : filedNames.length);
if (isUpperCase) {
oneUpperCaseFiledNames[index] = oneUpperCaseFiledNames[index].toUpperCase();
}
seaTunnelRowType =
new SeaTunnelRowType(
oneUpperCaseFiledNames,
new SeaTunnelDataType<?>[] {
BasicType.BYTE_TYPE,
BasicType.SHORT_TYPE,
BasicType.INT_TYPE,
BasicType.LONG_TYPE,
BasicType.FLOAT_TYPE,
BasicType.DOUBLE_TYPE,
new DecimalType(30, 8),
BasicType.STRING_TYPE,
PrimitiveByteArrayType.INSTANCE,
BasicType.BOOLEAN_TYPE,
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
ArrayType.STRING_ARRAY_TYPE
});
SeaTunnelDataType<?>[] newSeaTunnelDataTypes =
Arrays.copyOf(
seaTunnelDataTypes,
subtractOneFiledInSource
? seaTunnelDataTypes.length - 1
: filedNames.length);
seaTunnelRowType = new SeaTunnelRowType(oneUpperCaseFiledNames, newSeaTunnelDataTypes);
}

@Test
public void seaTunnelToPaimon() {
TableSchema sinkTableSchema = getTableSchema(30, 8);
SeaTunnelRuntimeException actualException =
Assertions.assertThrows(
SeaTunnelRuntimeException.class,
Expand All @@ -259,9 +273,26 @@ public void seaTunnelToPaimon() {
Assertions.assertEquals(exceptedException.getMessage(), actualException.getMessage());

InternalRow reconvert =
RowConverter.reconvert(seaTunnelRow, seaTunnelRowType, getTableSchema(30, 8));
RowConverter.reconvert(seaTunnelRow, seaTunnelRowType, sinkTableSchema);
Assertions.assertEquals(reconvert, internalRow);

subtractOneFiledInSource = true;
generateTestData();
SeaTunnelRuntimeException filedNumsActualException =
Assertions.assertThrows(
SeaTunnelRuntimeException.class,
() ->
RowConverter.reconvert(
seaTunnelRow, seaTunnelRowType, sinkTableSchema));
SeaTunnelRuntimeException filedNumsExceptException =
CommonError.writeRowErrorWithFiledsCountNotMatch(
"Paimon",
seaTunnelRowType.getTotalFields(),
sinkTableSchema.fields().size());
Assertions.assertEquals(
filedNumsExceptException.getMessage(), filedNumsActualException.getMessage());

subtractOneFiledInSource = false;
isCaseSensitive = true;

for (int i = 0; i < filedNames.length; i++) {
Expand All @@ -271,7 +302,6 @@ public void seaTunnelToPaimon() {
DataType exceptDataType =
RowTypeConverter.reconvert(sourceFiledname, seaTunnelRowType.getFieldType(i));
DataField exceptDataField = new DataField(i, sourceFiledname, exceptDataType);
TableSchema sinkTableSchema = getTableSchema(30, 8);
SeaTunnelRuntimeException actualException1 =
Assertions.assertThrows(
SeaTunnelRuntimeException.class,
Expand Down

0 comments on commit d1f5db9

Please sign in to comment.