diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java index 782a071d011..e9adf4d70a1 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java @@ -285,4 +285,12 @@ public static SeaTunnelRuntimeException formatDateError(String date, String fiel params.put("field", field); return new SeaTunnelRuntimeException(CommonErrorCode.FORMAT_DATE_ERROR, params); } + + public static SeaTunnelRuntimeException unsupportedMethod( + String identifier, String methodName) { + Map params = new HashMap<>(); + params.put("identifier", identifier); + params.put("methodName", methodName); + return new SeaTunnelRuntimeException(CommonErrorCode.UNSUPPORTED_METHOD, params); + } } diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java index 58939248482..79621c42168 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java @@ -77,6 +77,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode { FORMAT_DATETIME_ERROR( "COMMON-33", "The datetime format '' of field '' is not supported. Please check the datetime format."), + UNSUPPORTED_METHOD("COMMON-34", "'' unsupported the method ''"), ; private final String code; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java index a033d0eaac7..8d0301b492e 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java @@ -44,6 +44,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; + import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.DriverManager; @@ -63,11 +65,14 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_METHOD; +@Slf4j public abstract class AbstractJdbcCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class); protected static final Set SYS_DATABASES = new HashSet<>(); + protected static final Set EXCLUDED_SCHEMAS = new HashSet<>(); protected final String catalogName; protected final String defaultDatabase; @@ -259,6 +264,10 @@ protected String getListDatabaseSql() { throw new UnsupportedOperationException(); } + protected String getDatabaseWithConditionSql(String databaseName) { + throw CommonError.unsupportedMethod(this.catalogName, "getDatabaseWithConditionSql"); + } + @Override public List listDatabases() throws CatalogException { try { @@ -277,15 +286,35 @@ public List listDatabases() throws CatalogException { @Override public boolean databaseExists(String databaseName) throws CatalogException { - checkArgument(StringUtils.isNotBlank(databaseName)); - - return listDatabases().contains(databaseName); + if (StringUtils.isBlank(databaseName)) { + return false; + } + if (SYS_DATABASES.contains(databaseName)) { + return false; + } + try { + return querySQLResultExists( + getUrlFromDatabaseName(databaseName), + getDatabaseWithConditionSql(databaseName)); + } catch (SeaTunnelRuntimeException e) { + if (e.getSeaTunnelErrorCode().getCode().equals(UNSUPPORTED_METHOD.getCode())) { + log.warn( + "The catalog: {} is not supported the getDatabaseWithConditionSql for databaseExists", + this.catalogName); + return listDatabases().contains(databaseName); + } + throw e; + } } protected String getListTableSql(String databaseName) { throw new UnsupportedOperationException(); } + protected String getTableWithConditionSql(TablePath tablePath) { + throw CommonError.unsupportedMethod(this.catalogName, "getTableWithConditionSql"); + } + protected String getTableName(ResultSet rs) throws SQLException { String schemaName = rs.getString(1); String tableName = rs.getString(2); @@ -317,12 +346,28 @@ public List listTables(String databaseName) @Override public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath)); - } catch (DatabaseNotExistException e) { + String databaseName = tablePath.getDatabaseName(); + if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) { return false; } + try { + return querySQLResultExists( + this.getUrlFromDatabaseName(databaseName), getTableWithConditionSql(tablePath)); + } catch (SeaTunnelRuntimeException e1) { + if (e1.getSeaTunnelErrorCode().getCode().equals(UNSUPPORTED_METHOD.getCode())) { + log.warn( + "The catalog: {} is not supported the getTableWithConditionSql for tableExists ", + this.catalogName); + try { + return databaseExists(tablePath.getDatabaseName()) + && listTables(tablePath.getDatabaseName()) + .contains(getTableName(tablePath)); + } catch (DatabaseNotExistException e2) { + return false; + } + } + throw e1; + } } @Override @@ -528,6 +573,17 @@ protected List queryString(String url, String sql, ResultSetConsumer listTables() { List databases = listDatabases(); return listTables(databases.get(0)); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java index 40f08dc50b5..02e58ea8573 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java @@ -57,7 +57,7 @@ public class IrisCatalog extends AbstractJdbcCatalog { private static final String LIST_TABLES_SQL_TEMPLATE = - "SELECT TABLE_SCHEMA,TABLE_NAME FROM INFORMATION_SCHEMA.Tables WHERE TABLE_SCHEMA='%s' and TABLE_TYPE != 'SYSTEM TABLE' and TABLE_TYPE != 'SYSTEM VIEW';"; + "SELECT TABLE_SCHEMA,TABLE_NAME FROM INFORMATION_SCHEMA.Tables WHERE TABLE_SCHEMA='%s' and TABLE_TYPE != 'SYSTEM TABLE' and TABLE_TYPE != 'SYSTEM VIEW'"; public IrisCatalog( String catalogName, String username, String password, JdbcUrlUtil.UrlInfo urlInfo) { @@ -101,13 +101,6 @@ protected String getTableName(ResultSet rs) throws SQLException { return schemaName + "." + tableName; } - // @Override - // protected String getSelectColumnsSql(TablePath tablePath) { - // return String.format( - // SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(), - // tablePath.getTableName()); - // } - @Override protected Column buildColumn(ResultSet resultSet) throws SQLException { String columnName = resultSet.getString("COLUMN_NAME"); @@ -144,12 +137,24 @@ public boolean databaseExists(String databaseName) throws CatalogException { @Override public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - return listTables(tablePath.getSchemaName()) - .contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { + if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) { return false; } + return querySQLResultExists( + this.getUrlFromDatabaseName(tablePath.getDatabaseName()), + getTableWithConditionSql(tablePath)); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getSchemaName()) + " and TABLE_NAME = '%s'", + tablePath.getTableName()); + } + + @Override + protected String getUrlFromDatabaseName(String databaseName) { + return defaultUrl; } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java index 6b263b0fd46..e2df8ab24b9 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java @@ -51,6 +51,12 @@ public class MySqlCatalog extends AbstractJdbcCatalog { private static final String SELECT_COLUMNS_SQL_TEMPLATE = "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ORDINAL_POSITION ASC"; + private static final String SELECT_DATABASE_EXISTS = + "SELECT SCHEMA_NAME FROM information_schema.schemata WHERE SCHEMA_NAME = '%s'"; + + private static final String SELECT_TABLE_EXISTS = + "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'"; + static { SYS_DATABASES.add("information_schema"); SYS_DATABASES.add("mysql"); @@ -68,6 +74,17 @@ public MySqlCatalog( this.typeConverter = new MySqlTypeConverter(version); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(SELECT_DATABASE_EXISTS, databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + SELECT_TABLE_EXISTS, tablePath.getDatabaseName(), tablePath.getTableName()); + } + @Override protected String getListDatabaseSql() { return "SHOW DATABASES;"; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java index b4ece7db9c2..b98f4c4c2b2 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java @@ -25,8 +25,6 @@ import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog; -import java.util.Arrays; -import java.util.Collections; import java.util.List; import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull; @@ -34,9 +32,10 @@ public class OceanBaseOracleCatalog extends OracleCatalog { static { - EXCLUDED_SCHEMAS = - Collections.unmodifiableList( - Arrays.asList("oceanbase", "LBACSYS", "ORAAUDITOR", "SYS")); + EXCLUDED_SCHEMAS.add("oceanbase"); + EXCLUDED_SCHEMAS.add("LBACSYS"); + EXCLUDED_SCHEMAS.add("ORAAUDITOR"); + EXCLUDED_SCHEMAS.add("SYS"); } public OceanBaseOracleCatalog( @@ -53,6 +52,21 @@ protected String getListDatabaseSql() { throw new UnsupportedOperationException(); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tableExists(TablePath tablePath) throws CatalogException { + if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) { + return false; + } + return querySQLResultExists( + this.getUrlFromDatabaseName(tablePath.getDatabaseName()), + getTableWithConditionSql(tablePath)); + } + @Override public List listTables(String databaseName) throws CatalogException, DatabaseNotExistException { @@ -65,15 +79,6 @@ public List listTables(String databaseName) } } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - return listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath)); - } catch (DatabaseNotExistException e) { - return false; - } - } - @Override public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java index b51369e3f58..1430cb387af 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java @@ -21,8 +21,6 @@ import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.ConstraintKey; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -30,8 +28,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeMapper; -import org.apache.commons.lang3.StringUtils; - import lombok.extern.slf4j.Slf4j; import java.sql.Connection; @@ -46,7 +42,7 @@ @Slf4j public class OracleCatalog extends AbstractJdbcCatalog { - protected static List EXCLUDED_SCHEMAS = + protected static List EXCLUDED_SCHEMAS_ALL = Collections.unmodifiableList( Arrays.asList( "APPQOSSYS", @@ -101,6 +97,10 @@ public class OracleCatalog extends AbstractJdbcCatalog { + "ORDER BY \n" + " cols.column_id \n"; + static { + EXCLUDED_SCHEMAS.addAll(EXCLUDED_SCHEMAS_ALL); + } + public OracleCatalog( String catalogName, String username, @@ -110,6 +110,21 @@ public OracleCatalog( super(catalogName, username, pwd, urlInfo, defaultSchema); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where name = '%s'", databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return getListTableSql(tablePath.getDatabaseName()) + + " and OWNER = '" + + tablePath.getSchemaName() + + "' and table_name = '" + + tablePath.getTableName() + + "'"; + } + @Override protected String getListDatabaseSql() { return "SELECT name FROM v$database"; @@ -191,20 +206,6 @@ protected String getOptionTableName(TablePath tablePath) { return tablePath.getSchemaAndTableName(); } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName()); - } - return listTables().contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { - return false; - } - } - private List listTables() { List databases = listDatabases(); return listTables(databases.get(0)); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java index 4697d1999ef..d5261e16d59 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java @@ -21,7 +21,6 @@ import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -30,7 +29,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeMapper; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import lombok.extern.slf4j.Slf4j; @@ -104,14 +102,28 @@ public PostgresCatalog( super(catalogName, username, pwd, urlInfo, defaultSchema); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where datname = '%s'", databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + + " where table_schema = '%s' and table_name= '%s'", + tablePath.getSchemaName(), + tablePath.getTableName()); + } + @Override protected String getListDatabaseSql() { - return "select datname from pg_database;"; + return "select datname from pg_database"; } @Override protected String getListTableSql(String databaseName) { - return "SELECT table_schema, table_name FROM information_schema.tables;"; + return "SELECT table_schema, table_name FROM information_schema.tables"; } @Override @@ -231,21 +243,6 @@ protected void dropDatabaseInternal(String databaseName) throws CatalogException super.dropDatabaseInternal(databaseName); } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName()); - } - - return listTables(defaultDatabase).contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { - return false; - } - } - @Override public CatalogTable getTable(String sqlQuery) throws SQLException { Connection defaultConnection = getConnection(defaultUrl); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java index 7b29bbb8ea6..064b2473371 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java @@ -23,7 +23,6 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -31,23 +30,17 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftTypeConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftTypeMapper; -import org.apache.commons.lang3.StringUtils; - import lombok.extern.slf4j.Slf4j; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @Slf4j public class RedshiftCatalog extends AbstractJdbcCatalog { - protected static final Set EXCLUDED_SCHEMAS = new HashSet<>(4); - private final String SELECT_COLUMNS = "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ordinal_position ASC"; @@ -80,6 +73,20 @@ public RedshiftCatalog( this.connectionMap = new ConcurrentHashMap<>(); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where datname = '%s'", databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + + " where table_schema = '%s' and table_name = '%s'", + tablePath.getSchemaName(), + tablePath.getTableName()); + } + @Override public void close() throws CatalogException { for (Map.Entry entry : connectionMap.entrySet()) { @@ -95,12 +102,12 @@ public void close() throws CatalogException { @Override protected String getListDatabaseSql() { - return "select datname from pg_database;"; + return "select datname from pg_database"; } @Override protected String getListTableSql(String databaseName) { - return "SELECT table_schema, table_name FROM information_schema.tables;"; + return "SELECT table_schema, table_name FROM information_schema.tables"; } @Override @@ -144,21 +151,6 @@ protected String getDropDatabaseSql(String databaseName) { return String.format("DROP DATABASE `%s`;", databaseName); } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName().toLowerCase()); - } - return listTables(defaultDatabase) - .contains(tablePath.getSchemaAndTableName().toLowerCase()); - } catch (DatabaseNotExistException e) { - return false; - } - } - @Override protected String getSelectColumnsSql(TablePath tablePath) { return String.format(SELECT_COLUMNS, tablePath.getSchemaName(), tablePath.getTableName()); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java index df6f4b3c248..19b8f668af9 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java @@ -22,8 +22,6 @@ import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.ConstraintKey; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -31,8 +29,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeMapper; -import org.apache.commons.lang3.StringUtils; - import lombok.extern.slf4j.Slf4j; import java.sql.Connection; @@ -113,6 +109,18 @@ public SapHanaCatalog( super(catalogName, username, pwd, urlInfo, defaultSchema); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where SCHEMA_NAME = '%s'", databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + " and TABLE_NAME = '%s'", + tablePath.getTableName()); + } + @Override protected String getListDatabaseSql() { return "SELECT SCHEMA_NAME FROM SCHEMAS"; @@ -203,20 +211,6 @@ protected String getOptionTableName(TablePath tablePath) { return tablePath.getTableName(); } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getTableName()); - } - return listTables().contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { - return false; - } - } - private List listTables() { List databases = listDatabases(); return listTables(databases.get(0)); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java index 55660b36a2c..e4c63515220 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java @@ -22,7 +22,6 @@ import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -69,6 +68,20 @@ public SqlServerCatalog( super(catalogName, username, pwd, urlInfo, defaultSchema); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where name = '%s'", databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + + " and TABLE_SCHEMA = '%s' and TABLE_NAME = '%s'", + tablePath.getSchemaName(), + tablePath.getTableName()); + } + @Override protected String getListDatabaseSql() { return "SELECT NAME FROM sys.databases"; @@ -147,20 +160,6 @@ protected String getUrlFromDatabaseName(String databaseName) { return baseUrl + ";databaseName=" + databaseName + ";" + suffix; } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName()); - } - return listTables(defaultDatabase).contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { - return false; - } - } - @Override public CatalogTable getTable(String sqlQuery) throws SQLException { Connection defaultConnection = getConnection(defaultUrl); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java index 462e109c76a..a0b28e49abd 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java @@ -21,8 +21,6 @@ import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.ConstraintKey; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -30,8 +28,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeMapper; -import org.apache.commons.lang3.StringUtils; - import lombok.extern.slf4j.Slf4j; import java.sql.Connection; @@ -128,6 +124,20 @@ public XuguCatalog( super(catalogName, username, pwd, urlInfo, defaultSchema); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where DB_NAME = '%s'", databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + + " where user_name = '%s' and table_name = '%s'", + tablePath.getSchemaName(), + tablePath.getTableName()); + } + @Override protected String getListDatabaseSql() { return "SELECT DB_NAME FROM dba_databases"; @@ -210,20 +220,6 @@ protected String getOptionTableName(TablePath tablePath) { return tablePath.getSchemaAndTableName(); } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName()); - } - return listTables().contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { - return false; - } - } - private List listTables() { List databases = listDatabases(); return listTables(databases.get(0)); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java index daf87b3693a..bc89d4c8c39 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogTest.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.MethodOrderer; @@ -39,7 +40,8 @@ class MySqlCatalogTest { static JdbcUrlUtil.UrlInfo sqlParse = SqlServerURLParser.parse("jdbc:sqlserver://127.0.0.1:1434;database=TestDB"); static JdbcUrlUtil.UrlInfo MysqlUrlInfo = - JdbcUrlUtil.getUrlInfo("jdbc:mysql://127.0.0.1:33061/liuliTest?useSSL=false"); + JdbcUrlUtil.getUrlInfo( + "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&allowPublicKeyRetrieval=true"); static JdbcUrlUtil.UrlInfo pg = JdbcUrlUtil.getUrlInfo("jdbc:postgresql://127.0.0.1:5432/liulitest"); static TablePath tablePathSQL; @@ -74,13 +76,22 @@ static void before() { tablePathPG = TablePath.of(databaseName, "pg_to_mysql"); tablePathOracle = TablePath.of(databaseName, "oracle_to_mysql"); sqlServerCatalog = new SqlServerCatalog("sqlserver", "sa", "root@123", sqlParse, null); - mySqlCatalog = new MySqlCatalog("mysql", "root", "root@123", MysqlUrlInfo); + mySqlCatalog = new MySqlCatalog("mysql", "root", "123456", MysqlUrlInfo); postgresCatalog = new PostgresCatalog("postgres", "postgres", "postgres", pg, null); mySqlCatalog.open(); sqlServerCatalog.open(); postgresCatalog.open(); } + @Test + void exists() { + Assertions.assertTrue(mySqlCatalog.databaseExists("test")); + Assertions.assertTrue(mySqlCatalog.tableExists(TablePath.of("test", "MY_TABLE"))); + Assertions.assertTrue(mySqlCatalog.tableExists(TablePath.of("test", "my_table"))); + Assertions.assertFalse(mySqlCatalog.tableExists(TablePath.of("test", "test"))); + Assertions.assertFalse(mySqlCatalog.databaseExists("mysql")); + } + @Test @Order(1) void getTable() { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java index 1c5fb5a2b22..75b22ec24dc 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalogTest.java @@ -20,6 +20,8 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -27,17 +29,24 @@ @Disabled("Please Test it in your local environment") class OracleCatalogTest { - @Test - void testCatalog() { - OracleCatalog catalog = + + static OracleCatalog catalog; + + @BeforeAll + static void before() { + catalog = new OracleCatalog( "oracle", - "test", - "oracle", - OracleURLParser.parse("jdbc:oracle:thin:@127.0.0.1:1521:xe"), + "c##gguser", + "testdb", + OracleURLParser.parse("jdbc:oracle:thin:@127.0.0.1:1521/CDC_PDB"), null); catalog.open(); + } + + @Test + void testCatalog() { List strings = catalog.listDatabases(); @@ -45,4 +54,16 @@ void testCatalog() { catalog.createTable(new TablePath("XE", "TEST", "TEST003"), table, false); } + + @Test + void exist() { + Assertions.assertTrue(catalog.databaseExists("ORCLCDB")); + Assertions.assertTrue(catalog.tableExists(TablePath.of("ORCLCDB", "C##GGUSER", "myTable"))); + Assertions.assertFalse(catalog.databaseExists("ORCL")); + Assertions.assertTrue( + catalog.tableExists( + TablePath.of("ORCLCDB", "CDC_PDB", "ads_index_public_health_data"))); + Assertions.assertTrue( + catalog.tableExists(TablePath.of("ORCLCDB", "CDC_PDB", "ADS_INDEX_DISEASE_DATA"))); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java index c04c1941b0b..05a013ef691 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalogTest.java @@ -22,6 +22,8 @@ import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -31,15 +33,23 @@ @Slf4j class PostgresCatalogTest { - @Test - void testCatalog() { - JdbcUrlUtil.UrlInfo urlInfo = - JdbcUrlUtil.getUrlInfo("jdbc:postgresql://127.0.0.1:5432/liulitest"); - PostgresCatalog catalog = - new PostgresCatalog("postgres", "postgres", "postgres", urlInfo, null); + static PostgresCatalog catalog; + + @BeforeAll + static void before() { + catalog = + new PostgresCatalog( + "postgres", + "pg", + "pg#2024", + JdbcUrlUtil.getUrlInfo("jdbc:postgresql://127.0.0.1:5432/postgres"), + null); catalog.open(); + } + @Test + void testCatalog() { MySqlCatalog mySqlCatalog = new MySqlCatalog( "mysql", @@ -59,4 +69,14 @@ void testCatalog() { catalog.createTable( new TablePath("liulitest", "public", "all_types_table_02"), table, false); } + + @Test + void exists() { + Assertions.assertFalse(catalog.databaseExists("postgres")); + Assertions.assertFalse( + catalog.tableExists(TablePath.of("postgres", "pg_catalog", "pg_aggregate"))); + Assertions.assertTrue(catalog.databaseExists("zdykdb")); + Assertions.assertTrue( + catalog.tableExists(TablePath.of("zdykdb", "pg_catalog", "pg_class"))); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java index ea305ca0c1f..a18cc4abd9d 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.MethodOrderer; @@ -38,7 +39,7 @@ class SqlServerCatalogTest { static JdbcUrlUtil.UrlInfo sqlParse = - SqlServerURLParser.parse("jdbc:sqlserver://127.0.0.1:1434;database=TestDB"); + SqlServerURLParser.parse("jdbc:sqlserver://127.0.0.1:1433;database=master"); static JdbcUrlUtil.UrlInfo MysqlUrlInfo = JdbcUrlUtil.getUrlInfo("jdbc:mysql://127.0.0.1:33061/liuliTest?useSSL=false"); static JdbcUrlUtil.UrlInfo pg = @@ -84,9 +85,14 @@ void listTables() { } @Test - void tableExists() { - - // boolean b = sqlServerCatalog.tableExists(tablePath); + void exists() { + Assertions.assertTrue(sqlServerCatalog.databaseExists("master")); + Assertions.assertTrue( + sqlServerCatalog.tableExists( + TablePath.of("master", "dbo", "MSreplication_options"))); + Assertions.assertTrue( + sqlServerCatalog.tableExists(TablePath.of("master", "dbo", "spt_fallback_db"))); + Assertions.assertFalse(sqlServerCatalog.tableExists(TablePath.of("master", "dbo", "xxx"))); } @Test diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql index a9b02e2ae3a..8c624959f87 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql @@ -55,10 +55,10 @@ CREATE TABLE sink_table WITH ( 'user' = 'root', 'password' = 'Abc!@#135_seatunnel', 'generate_sink_sql' = 'true', - 'database' = 'sink' + 'database' = 'sink', 'table' = '${table_name}' ); -- If it's multi-table synchronization, there's no need to set select columns. -- You can directly use the syntax 'INSERT INTO sink_table SELECT source_table'. -INSERT INTO sink_table SELECT source_table; \ No newline at end of file +INSERT INTO sink_table SELECT source_table;