Skip to content

Commit

Permalink
[Improve][Jdbc] Merge user config primary key when create table (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored Aug 7, 2024
1 parent 7c3cd99 commit 819c685
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
Expand Down Expand Up @@ -200,6 +201,25 @@ public TableSink createSink(TableSinkFactoryContext context) {
.collect(Collectors.joining(",")));
}
}
} else {
// replace primary key to config
PrimaryKey configPk =
PrimaryKey.of(
catalogTable.getTablePath().getTableName() + "_config_pk",
config.get(PRIMARY_KEYS));
TableSchema tableSchema = catalogTable.getTableSchema();
catalogTable =
CatalogTable.of(
catalogTable.getTableId(),
TableSchema.builder()
.primaryKey(configPk)
.constraintKey(tableSchema.getConstraintKeys())
.columns(tableSchema.getColumns())
.build(),
catalogTable.getOptions(),
catalogTable.getPartitionKeys(),
catalogTable.getComment(),
catalogTable.getCatalogName());
}
config = ReadonlyConfig.fromMap(new HashMap<>(map));
// always execute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -74,6 +75,7 @@ public class JdbcMysqlSaveModeHandlerIT extends AbstractJdbcIT {
private static final String CREATE_SQL =
"CREATE TABLE IF NOT EXISTS %s\n"
+ "(\n"
+ " `id` bigint(20) NOT NULL,\n"
+ " `c_bit_1` bit(1) DEFAULT NULL,\n"
+ " `c_bit_8` bit(8) DEFAULT NULL,\n"
+ " `c_bit_16` bit(16) DEFAULT NULL,\n"
Expand Down Expand Up @@ -164,6 +166,9 @@ void compareResult(String executeKey) {
final List<Column> columns = table.getTableSchema().getColumns();

Assertions.assertEquals(columns.size(), columnsSource.size());
Assertions.assertIterableEquals(
Collections.singletonList("id"),
table.getTableSchema().getPrimaryKey().getColumnNames());
}

@Override
Expand All @@ -175,6 +180,7 @@ String driverUrl() {
Pair<String[], List<SeaTunnelRow>> initTestData() {
String[] fieldNames =
new String[] {
"id",
"c_bit_1",
"c_bit_8",
"c_bit_16",
Expand Down Expand Up @@ -229,6 +235,7 @@ Pair<String[], List<SeaTunnelRow>> initTestData() {
SeaTunnelRow row =
new SeaTunnelRow(
new Object[] {
(long) i,
i % 2 == 0 ? (byte) 1 : (byte) 0,
new byte[] {byteArr},
new byte[] {byteArr, byteArr},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ sink {
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "Abc!@#135_seatunnel"

generate_sink_sql = true
table = "test_laowang"
database = "seatunnel"
table = "test_laowang"
primary_keys = ["id"]

schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode="APPEND_DATA"
}
Expand Down

0 comments on commit 819c685

Please sign in to comment.