diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java index e9adf4d70a1..0148e651884 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java @@ -266,8 +266,8 @@ public static SeaTunnelRuntimeException writeRowErrorWithFiledsCountNotMatch( String connector, int sourceFieldsNum, int sinkFieldsNum) { Map params = new HashMap<>(); params.put("connector", connector); - params.put("sourceFiledName", String.valueOf(sourceFieldsNum)); - params.put("sourceFiledType", String.valueOf(sinkFieldsNum)); + params.put("sourceFieldsNum", String.valueOf(sourceFieldsNum)); + params.put("sinkFieldsNum", String.valueOf(sinkFieldsNum)); return new SeaTunnelRuntimeException( WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH, params); } diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java index 58cb3e053b7..c574b77e125 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java +++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java @@ -52,6 +52,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import lombok.extern.slf4j.Slf4j; + import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; @@ -62,6 +64,7 @@ import java.util.Map; /** Unit tests for {@link RowConverter} */ +@Slf4j public class RowConverterTest { private SeaTunnelRow seaTunnelRow; @@ -71,6 +74,7 @@ public class RowConverterTest { private SeaTunnelRowType seaTunnelRowType; private volatile boolean isCaseSensitive = false; + private volatile boolean subtractOneFiledInSource = false; private volatile int index = 0; private static final String[] filedNames = { "c_tinyint", @@ -89,6 +93,23 @@ public class RowConverterTest { "c_array" }; + public static final SeaTunnelDataType[] seaTunnelDataTypes = { + BasicType.BYTE_TYPE, + BasicType.SHORT_TYPE, + BasicType.INT_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.DOUBLE_TYPE, + new DecimalType(30, 8), + BasicType.STRING_TYPE, + PrimitiveByteArrayType.INSTANCE, + BasicType.BOOLEAN_TYPE, + LocalTimeType.LOCAL_DATE_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE, + new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE), + ArrayType.STRING_ARRAY_TYPE + }; + public static final List KEY_NAME_LIST = Arrays.asList("c_tinyint"); public TableSchema getTableSchema(int decimalPrecision, int decimalScale) { @@ -139,7 +160,7 @@ public TableSchema getTableSchema(int decimalPrecision, int decimalScale) { @BeforeEach public void generateTestData() { - initSeaTunnelRowTypeCaseSensitive(isCaseSensitive, index); + initSeaTunnelRowTypeCaseSensitive(isCaseSensitive, index, subtractOneFiledInSource); byte tinyint = 1; short smallint = 2; int intNum = 3; @@ -216,34 +237,27 @@ public void generateTestData() { internalRow = binaryRow; } - private void initSeaTunnelRowTypeCaseSensitive(boolean isUpperCase, int index) { - String[] oneUpperCaseFiledNames = Arrays.copyOf(filedNames, filedNames.length); + private void initSeaTunnelRowTypeCaseSensitive( + boolean isUpperCase, int index, boolean subtractOneFiledInSource) { + String[] oneUpperCaseFiledNames = + Arrays.copyOf( + filedNames, + subtractOneFiledInSource ? filedNames.length - 1 : filedNames.length); if (isUpperCase) { oneUpperCaseFiledNames[index] = oneUpperCaseFiledNames[index].toUpperCase(); } - seaTunnelRowType = - new SeaTunnelRowType( - oneUpperCaseFiledNames, - new SeaTunnelDataType[] { - BasicType.BYTE_TYPE, - BasicType.SHORT_TYPE, - BasicType.INT_TYPE, - BasicType.LONG_TYPE, - BasicType.FLOAT_TYPE, - BasicType.DOUBLE_TYPE, - new DecimalType(30, 8), - BasicType.STRING_TYPE, - PrimitiveByteArrayType.INSTANCE, - BasicType.BOOLEAN_TYPE, - LocalTimeType.LOCAL_DATE_TYPE, - LocalTimeType.LOCAL_DATE_TIME_TYPE, - new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE), - ArrayType.STRING_ARRAY_TYPE - }); + SeaTunnelDataType[] newSeaTunnelDataTypes = + Arrays.copyOf( + seaTunnelDataTypes, + subtractOneFiledInSource + ? seaTunnelDataTypes.length - 1 + : filedNames.length); + seaTunnelRowType = new SeaTunnelRowType(oneUpperCaseFiledNames, newSeaTunnelDataTypes); } @Test public void seaTunnelToPaimon() { + TableSchema sinkTableSchema = getTableSchema(30, 8); SeaTunnelRuntimeException actualException = Assertions.assertThrows( SeaTunnelRuntimeException.class, @@ -259,9 +273,26 @@ public void seaTunnelToPaimon() { Assertions.assertEquals(exceptedException.getMessage(), actualException.getMessage()); InternalRow reconvert = - RowConverter.reconvert(seaTunnelRow, seaTunnelRowType, getTableSchema(30, 8)); + RowConverter.reconvert(seaTunnelRow, seaTunnelRowType, sinkTableSchema); Assertions.assertEquals(reconvert, internalRow); + subtractOneFiledInSource = true; + generateTestData(); + SeaTunnelRuntimeException filedNumsActualException = + Assertions.assertThrows( + SeaTunnelRuntimeException.class, + () -> + RowConverter.reconvert( + seaTunnelRow, seaTunnelRowType, sinkTableSchema)); + SeaTunnelRuntimeException filedNumsExceptException = + CommonError.writeRowErrorWithFiledsCountNotMatch( + "Paimon", + seaTunnelRowType.getTotalFields(), + sinkTableSchema.fields().size()); + Assertions.assertEquals( + filedNumsExceptException.getMessage(), filedNumsActualException.getMessage()); + + subtractOneFiledInSource = false; isCaseSensitive = true; for (int i = 0; i < filedNames.length; i++) { @@ -271,7 +302,6 @@ public void seaTunnelToPaimon() { DataType exceptDataType = RowTypeConverter.reconvert(sourceFiledname, seaTunnelRowType.getFieldType(i)); DataField exceptDataField = new DataField(i, sourceFiledname, exceptDataType); - TableSchema sinkTableSchema = getTableSchema(30, 8); SeaTunnelRuntimeException actualException1 = Assertions.assertThrows( SeaTunnelRuntimeException.class,