From dff0756e82293cc2c361ea87528ce39df27f0913 Mon Sep 17 00:00:00 2001 From: dailai Date: Fri, 26 Jul 2024 09:53:13 +0800 Subject: [PATCH] [Improve][Connector-v2] Optimize all the jdbc catalog for the check about db or tb exists --- .../jdbc/catalog/AbstractJdbcCatalog.java | 36 ++++++++++++----- .../jdbc/catalog/dm/DamengCatalog.java | 29 ++++++-------- .../jdbc/catalog/iris/IrisCatalog.java | 27 ++++++++----- .../jdbc/catalog/mysql/MySqlCatalog.java | 30 ++++---------- .../oceanbase/OceanBaseOracleCatalog.java | 33 ++++++++------- .../jdbc/catalog/oracle/OracleCatalog.java | 40 +++++++------------ .../jdbc/catalog/psql/PostgresCatalog.java | 26 +++--------- .../catalog/redshift/RedshiftCatalog.java | 36 +++++++---------- .../jdbc/catalog/saphana/SapHanaCatalog.java | 30 ++++++-------- .../catalog/sqlserver/SqlServerCatalog.java | 22 +++------- .../jdbc/catalog/xugu/XuguCatalog.java | 32 +++++++-------- 11 files changed, 147 insertions(+), 194 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java index 334e734e5453..a95bbbb3b51e 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java @@ -71,6 +71,7 @@ public abstract class AbstractJdbcCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class); protected static final Set SYS_DATABASES = new HashSet<>(); + protected static final Set EXCLUDED_SCHEMAS = new HashSet<>(); protected final String catalogName; protected final String defaultDatabase; @@ -262,6 +263,10 @@ protected String getListDatabaseSql() { throw new UnsupportedOperationException(); } + protected String getDatabaseWithConditionSql(String databaseName) { + throw new UnsupportedOperationException(); + } + @Override public List listDatabases() throws CatalogException { try { @@ -280,15 +285,24 @@ public List listDatabases() throws CatalogException { @Override public boolean databaseExists(String databaseName) throws CatalogException { - checkArgument(StringUtils.isNotBlank(databaseName)); - - return listDatabases().contains(databaseName); + if (StringUtils.isBlank(databaseName)) { + return false; + } + if (!SYS_DATABASES.isEmpty() && SYS_DATABASES.contains(databaseName)) { + return false; + } + return queryExists( + getUrlFromDatabaseName(databaseName), getDatabaseWithConditionSql(databaseName)); } protected String getListTableSql(String databaseName) { throw new UnsupportedOperationException(); } + protected String getTableWithConditionSql(TablePath tablePath) { + throw new UnsupportedOperationException(); + } + protected String getTableName(ResultSet rs) throws SQLException { String schemaName = rs.getString(1); String tableName = rs.getString(2); @@ -320,12 +334,15 @@ public List listTables(String databaseName) @Override public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath)); - } catch (DatabaseNotExistException e) { + String databaseName = tablePath.getDatabaseName(); + if (!databaseExists(databaseName)) { + return false; + } + if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) { return false; } + return queryExists( + this.getUrlFromDatabaseName(databaseName), getTableWithConditionSql(tablePath)); } @Override @@ -531,11 +548,8 @@ protected List queryString(String url, String sql, ResultSetConsumer listTables() { List databases = listDatabases(); return listTables(databases.get(0)); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java index 40f08dc50b59..537dd64ed1f0 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java @@ -101,13 +101,6 @@ protected String getTableName(ResultSet rs) throws SQLException { return schemaName + "." + tableName; } - // @Override - // protected String getSelectColumnsSql(TablePath tablePath) { - // return String.format( - // SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(), - // tablePath.getTableName()); - // } - @Override protected Column buildColumn(ResultSet resultSet) throws SQLException { String columnName = resultSet.getString("COLUMN_NAME"); @@ -144,12 +137,24 @@ public boolean databaseExists(String databaseName) throws CatalogException { @Override public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - return listTables(tablePath.getSchemaName()) - .contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { + if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) { return false; } + return queryExists( + this.getUrlFromDatabaseName(tablePath.getDatabaseName()), + getTableWithConditionSql(tablePath)); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getSchemaName()) + " and TABLE_NAME= ?", + tablePath.getTableName()); + } + + @Override + protected String getUrlFromDatabaseName(String databaseName) { + return defaultUrl; } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java index ed7da99ca58a..e2df8ab24b90 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java @@ -32,8 +32,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeMapper; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlVersion; -import org.apache.commons.lang3.StringUtils; - import com.google.common.base.Preconditions; import com.mysql.cj.MysqlType; import lombok.extern.slf4j.Slf4j; @@ -54,10 +52,10 @@ public class MySqlCatalog extends AbstractJdbcCatalog { "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ORDINAL_POSITION ASC"; private static final String SELECT_DATABASE_EXISTS = - "SELECT SCHEMA_NAME FROM information_schema.schemata WHERE SCHEMA_NAME = ?"; + "SELECT SCHEMA_NAME FROM information_schema.schemata WHERE SCHEMA_NAME = '%s'"; private static final String SELECT_TABLE_EXISTS = - "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE table_schema = ? AND table_name = ?"; + "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'"; static { SYS_DATABASES.add("information_schema"); @@ -77,28 +75,14 @@ public MySqlCatalog( } @Override - public boolean databaseExists(String databaseName) throws CatalogException { - if (StringUtils.isBlank(databaseName)) { - return false; - } - if (SYS_DATABASES.contains(databaseName.toLowerCase())) { - return false; - } - return queryExists( - this.getUrlFromDatabaseName(databaseName), SELECT_DATABASE_EXISTS, databaseName); + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(SELECT_DATABASE_EXISTS, databaseName); } @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - String databaseName = tablePath.getDatabaseName(); - if (!databaseExists(databaseName)) { - return false; - } - return queryExists( - this.getUrlFromDatabaseName(databaseName), - SELECT_TABLE_EXISTS, - databaseName, - tablePath.getTableName()); + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + SELECT_TABLE_EXISTS, tablePath.getDatabaseName(), tablePath.getTableName()); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java index b4ece7db9c26..e12246b387a5 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java @@ -25,8 +25,6 @@ import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog; -import java.util.Arrays; -import java.util.Collections; import java.util.List; import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull; @@ -34,9 +32,10 @@ public class OceanBaseOracleCatalog extends OracleCatalog { static { - EXCLUDED_SCHEMAS = - Collections.unmodifiableList( - Arrays.asList("oceanbase", "LBACSYS", "ORAAUDITOR", "SYS")); + EXCLUDED_SCHEMAS.add("oceanbase"); + EXCLUDED_SCHEMAS.add("LBACSYS"); + EXCLUDED_SCHEMAS.add("ORAAUDITOR"); + EXCLUDED_SCHEMAS.add("SYS"); } public OceanBaseOracleCatalog( @@ -53,6 +52,21 @@ protected String getListDatabaseSql() { throw new UnsupportedOperationException(); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tableExists(TablePath tablePath) throws CatalogException { + if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) { + return false; + } + return queryExists( + this.getUrlFromDatabaseName(tablePath.getDatabaseName()), + getTableWithConditionSql(tablePath)); + } + @Override public List listTables(String databaseName) throws CatalogException, DatabaseNotExistException { @@ -65,15 +79,6 @@ public List listTables(String databaseName) } } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - return listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath)); - } catch (DatabaseNotExistException e) { - return false; - } - } - @Override public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java index 9481d2d839cf..2b56dd840c75 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java @@ -21,7 +21,6 @@ import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.ConstraintKey; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.exception.CatalogException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -29,8 +28,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeMapper; -import org.apache.commons.lang3.StringUtils; - import lombok.extern.slf4j.Slf4j; import java.sql.Connection; @@ -45,7 +42,7 @@ @Slf4j public class OracleCatalog extends AbstractJdbcCatalog { - protected static List EXCLUDED_SCHEMAS = + protected static List EXCLUDED_SCHEMAS_ALL = Collections.unmodifiableList( Arrays.asList( "APPQOSSYS", @@ -100,6 +97,10 @@ public class OracleCatalog extends AbstractJdbcCatalog { + "ORDER BY \n" + " cols.column_id \n"; + static { + EXCLUDED_SCHEMAS.addAll(EXCLUDED_SCHEMAS_ALL); + } + public OracleCatalog( String catalogName, String username, @@ -110,30 +111,19 @@ public OracleCatalog( } @Override - public boolean databaseExists(String databaseName) throws CatalogException { - if (StringUtils.isBlank(databaseName)) { - return false; - } - return queryExists( - this.getUrlFromDatabaseName(databaseName), - getListDatabaseSql() + " where name=?", - databaseName); + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where name = '%s'", databaseName); } @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - String databaseName = tablePath.getDatabaseName(); - if (!databaseExists(databaseName)) { - return false; - } - if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName().toUpperCase())) { - return false; - } - return queryExists( - this.getUrlFromDatabaseName(databaseName), - getListTableSql(databaseName) + " and OWNER= ? and table_name = ?", - tablePath.getSchemaName(), - tablePath.getTableName()); + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + + " and OWNER= '" + + tablePath.getSchemaName() + + "' and table_name = '" + + tablePath.getTableName() + + "'"); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java index 1d391f064685..023400a182e0 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java @@ -29,7 +29,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeMapper; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import lombok.extern.slf4j.Slf4j; @@ -104,28 +103,15 @@ public PostgresCatalog( } @Override - public boolean databaseExists(String databaseName) throws CatalogException { - if (StringUtils.isBlank(databaseName)) { - return false; - } - if (SYS_DATABASES.contains(databaseName.toLowerCase())) { - return false; - } - return queryExists( - this.getUrlFromDatabaseName(databaseName), - getListDatabaseSql() + " where datname=?", - databaseName); + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where datname='%s'", databaseName); } @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - String databaseName = tablePath.getDatabaseName(); - if (!databaseExists(databaseName)) { - return false; - } - return queryExists( - this.getUrlFromDatabaseName(databaseName), - getListTableSql(databaseName) + " where table_schema = ? and table_name= ?", + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + + " where table_schema = ? and table_name= ?", tablePath.getSchemaName(), tablePath.getTableName()); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java index 7b29bbb8ea6f..5d9d6f7c8ebf 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftCatalog.java @@ -23,7 +23,6 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -31,23 +30,17 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftTypeConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftTypeMapper; -import org.apache.commons.lang3.StringUtils; - import lombok.extern.slf4j.Slf4j; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @Slf4j public class RedshiftCatalog extends AbstractJdbcCatalog { - protected static final Set EXCLUDED_SCHEMAS = new HashSet<>(4); - private final String SELECT_COLUMNS = "SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ordinal_position ASC"; @@ -80,6 +73,20 @@ public RedshiftCatalog( this.connectionMap = new ConcurrentHashMap<>(); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where datname='%s'", databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + + " where table_schema = ? and table_name= ?", + tablePath.getSchemaName(), + tablePath.getTableName()); + } + @Override public void close() throws CatalogException { for (Map.Entry entry : connectionMap.entrySet()) { @@ -144,21 +151,6 @@ protected String getDropDatabaseSql(String databaseName) { return String.format("DROP DATABASE `%s`;", databaseName); } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName().toLowerCase()); - } - return listTables(defaultDatabase) - .contains(tablePath.getSchemaAndTableName().toLowerCase()); - } catch (DatabaseNotExistException e) { - return false; - } - } - @Override protected String getSelectColumnsSql(TablePath tablePath) { return String.format(SELECT_COLUMNS, tablePath.getSchemaName(), tablePath.getTableName()); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java index df6f4b3c2487..a09fbe269d8c 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java @@ -22,8 +22,6 @@ import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.ConstraintKey; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -31,8 +29,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeMapper; -import org.apache.commons.lang3.StringUtils; - import lombok.extern.slf4j.Slf4j; import java.sql.Connection; @@ -113,6 +109,18 @@ public SapHanaCatalog( super(catalogName, username, pwd, urlInfo, defaultSchema); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where SCHEMA_NAME=?", databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + " and TABLE_NAME=?", + tablePath.getTableName()); + } + @Override protected String getListDatabaseSql() { return "SELECT SCHEMA_NAME FROM SCHEMAS"; @@ -203,20 +211,6 @@ protected String getOptionTableName(TablePath tablePath) { return tablePath.getTableName(); } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getTableName()); - } - return listTables().contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { - return false; - } - } - private List listTables() { List databases = listDatabases(); return listTables(databases.get(0)); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java index 687666ae151f..ac277399d49f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java @@ -69,25 +69,15 @@ public SqlServerCatalog( } @Override - public boolean databaseExists(String databaseName) throws CatalogException { - if (StringUtils.isBlank(databaseName)) { - return false; - } - return queryExists( - this.getUrlFromDatabaseName(databaseName), - getListDatabaseSql() + " where name=?", - databaseName); + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where name='%s'", databaseName); } @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - String databaseName = tablePath.getDatabaseName(); - if (!databaseExists(databaseName)) { - return false; - } - return queryExists( - this.getUrlFromDatabaseName(databaseName), - getListTableSql(databaseName) + " and TABLE_SCHEMA= ? and TABLE_NAME = ?", + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + + " and TABLE_SCHEMA= ? and TABLE_NAME = ?", tablePath.getSchemaName(), tablePath.getTableName()); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java index 462e109c76a6..76a81e5e51ab 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java @@ -21,8 +21,6 @@ import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.ConstraintKey; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -30,8 +28,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeMapper; -import org.apache.commons.lang3.StringUtils; - import lombok.extern.slf4j.Slf4j; import java.sql.Connection; @@ -128,6 +124,20 @@ public XuguCatalog( super(catalogName, username, pwd, urlInfo, defaultSchema); } + @Override + protected String getDatabaseWithConditionSql(String databaseName) { + return String.format(getListDatabaseSql() + " where DB_NAME='%s'", databaseName); + } + + @Override + protected String getTableWithConditionSql(TablePath tablePath) { + return String.format( + getListTableSql(tablePath.getDatabaseName()) + + " and user_name= ? and table_name = ?", + tablePath.getSchemaName(), + tablePath.getTableName()); + } + @Override protected String getListDatabaseSql() { return "SELECT DB_NAME FROM dba_databases"; @@ -210,20 +220,6 @@ protected String getOptionTableName(TablePath tablePath) { return tablePath.getSchemaAndTableName(); } - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName()); - } - return listTables().contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { - return false; - } - } - private List listTables() { List databases = listDatabases(); return listTables(databases.get(0));