Skip to content

Commit

Permalink
[Improve][Connector-v2] Optimize the way of databases and tables are …
Browse files Browse the repository at this point in the history
…checked for existence (#7261)
  • Loading branch information
dailai authored Jul 29, 2024
1 parent 4897491 commit f012b2a
Show file tree
Hide file tree
Showing 18 changed files with 309 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,4 +285,12 @@ public static SeaTunnelRuntimeException formatDateError(String date, String fiel
params.put("field", field);
return new SeaTunnelRuntimeException(CommonErrorCode.FORMAT_DATE_ERROR, params);
}

public static SeaTunnelRuntimeException unsupportedMethod(
String identifier, String methodName) {
Map<String, String> params = new HashMap<>();
params.put("identifier", identifier);
params.put("methodName", methodName);
return new SeaTunnelRuntimeException(CommonErrorCode.UNSUPPORTED_METHOD, params);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
FORMAT_DATETIME_ERROR(
"COMMON-33",
"The datetime format '<datetime>' of field '<field>' is not supported. Please check the datetime format."),
UNSUPPORTED_METHOD("COMMON-34", "'<identifier>' unsupported the method '<methodName>'"),
;

private final String code;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
Expand All @@ -63,11 +65,14 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_METHOD;

@Slf4j
public abstract class AbstractJdbcCatalog implements Catalog {
private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class);

protected static final Set<String> SYS_DATABASES = new HashSet<>();
protected static final Set<String> EXCLUDED_SCHEMAS = new HashSet<>();

protected final String catalogName;
protected final String defaultDatabase;
Expand Down Expand Up @@ -259,6 +264,10 @@ protected String getListDatabaseSql() {
throw new UnsupportedOperationException();
}

protected String getDatabaseWithConditionSql(String databaseName) {
throw CommonError.unsupportedMethod(this.catalogName, "getDatabaseWithConditionSql");
}

@Override
public List<String> listDatabases() throws CatalogException {
try {
Expand All @@ -277,15 +286,35 @@ public List<String> listDatabases() throws CatalogException {

@Override
public boolean databaseExists(String databaseName) throws CatalogException {
checkArgument(StringUtils.isNotBlank(databaseName));

return listDatabases().contains(databaseName);
if (StringUtils.isBlank(databaseName)) {
return false;
}
if (SYS_DATABASES.contains(databaseName)) {
return false;
}
try {
return querySQLResultExists(
getUrlFromDatabaseName(databaseName),
getDatabaseWithConditionSql(databaseName));
} catch (SeaTunnelRuntimeException e) {
if (e.getSeaTunnelErrorCode().getCode().equals(UNSUPPORTED_METHOD.getCode())) {
log.warn(
"The catalog: {} is not supported the getDatabaseWithConditionSql for databaseExists",
this.catalogName);
return listDatabases().contains(databaseName);
}
throw e;
}
}

protected String getListTableSql(String databaseName) {
throw new UnsupportedOperationException();
}

protected String getTableWithConditionSql(TablePath tablePath) {
throw CommonError.unsupportedMethod(this.catalogName, "getTableWithConditionSql");
}

protected String getTableName(ResultSet rs) throws SQLException {
String schemaName = rs.getString(1);
String tableName = rs.getString(2);
Expand Down Expand Up @@ -317,12 +346,28 @@ public List<String> listTables(String databaseName)

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
return databaseExists(tablePath.getDatabaseName())
&& listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath));
} catch (DatabaseNotExistException e) {
String databaseName = tablePath.getDatabaseName();
if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) {
return false;
}
try {
return querySQLResultExists(
this.getUrlFromDatabaseName(databaseName), getTableWithConditionSql(tablePath));
} catch (SeaTunnelRuntimeException e1) {
if (e1.getSeaTunnelErrorCode().getCode().equals(UNSUPPORTED_METHOD.getCode())) {
log.warn(
"The catalog: {} is not supported the getTableWithConditionSql for tableExists ",
this.catalogName);
try {
return databaseExists(tablePath.getDatabaseName())
&& listTables(tablePath.getDatabaseName())
.contains(getTableName(tablePath));
} catch (DatabaseNotExistException e2) {
return false;
}
}
throw e1;
}
}

@Override
Expand Down Expand Up @@ -528,6 +573,17 @@ protected List<String> queryString(String url, String sql, ResultSetConsumer<Str
}
}

protected boolean querySQLResultExists(String dbUrl, String sql) {
try (PreparedStatement stmt = getConnection(dbUrl).prepareStatement(sql)) {
try (ResultSet rs = stmt.executeQuery()) {
return rs.next();
}
} catch (Exception e) {
log.info("query exists error", e);
return false;
}
}

// If sql is DDL, the execute() method always returns false, so the return value
// should not be used to determine whether changes were made in database.
protected boolean executeInternal(String url, String sql) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeMapper;

import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
Expand Down Expand Up @@ -70,6 +68,20 @@ public DamengCatalog(
super(catalogName, username, pwd, urlInfo, defaultSchema);
}

@Override
protected String getDatabaseWithConditionSql(String databaseName) {
return String.format(getListDatabaseSql() + " where name = '%s'", databaseName);
}

@Override
protected String getTableWithConditionSql(TablePath tablePath) {
return String.format(
getListTableSql(tablePath.getDatabaseName())
+ " where OWNER = '%s' and TABLE_NAME = '%s'",
tablePath.getSchemaName(),
tablePath.getTableName());
}

@Override
protected String getListDatabaseSql() {
return "SELECT name FROM v$database";
Expand Down Expand Up @@ -145,20 +157,6 @@ protected String getOptionTableName(TablePath tablePath) {
return tablePath.getSchemaAndTableName();
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
return databaseExists(tablePath.getDatabaseName())
&& listTables(tablePath.getDatabaseName())
.contains(tablePath.getSchemaAndTableName());
}
return listTables().contains(tablePath.getSchemaAndTableName());
} catch (DatabaseNotExistException e) {
return false;
}
}

private List<String> listTables() {
List<String> databases = listDatabases();
return listTables(databases.get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
public class IrisCatalog extends AbstractJdbcCatalog {

private static final String LIST_TABLES_SQL_TEMPLATE =
"SELECT TABLE_SCHEMA,TABLE_NAME FROM INFORMATION_SCHEMA.Tables WHERE TABLE_SCHEMA='%s' and TABLE_TYPE != 'SYSTEM TABLE' and TABLE_TYPE != 'SYSTEM VIEW';";
"SELECT TABLE_SCHEMA,TABLE_NAME FROM INFORMATION_SCHEMA.Tables WHERE TABLE_SCHEMA='%s' and TABLE_TYPE != 'SYSTEM TABLE' and TABLE_TYPE != 'SYSTEM VIEW'";

public IrisCatalog(
String catalogName, String username, String password, JdbcUrlUtil.UrlInfo urlInfo) {
Expand Down Expand Up @@ -101,13 +101,6 @@ protected String getTableName(ResultSet rs) throws SQLException {
return schemaName + "." + tableName;
}

// @Override
// protected String getSelectColumnsSql(TablePath tablePath) {
// return String.format(
// SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(),
// tablePath.getTableName());
// }

@Override
protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
Expand Down Expand Up @@ -144,12 +137,24 @@ public boolean databaseExists(String databaseName) throws CatalogException {

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
return listTables(tablePath.getSchemaName())
.contains(tablePath.getSchemaAndTableName());
} catch (DatabaseNotExistException e) {
if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) {
return false;
}
return querySQLResultExists(
this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
getTableWithConditionSql(tablePath));
}

@Override
protected String getTableWithConditionSql(TablePath tablePath) {
return String.format(
getListTableSql(tablePath.getSchemaName()) + " and TABLE_NAME = '%s'",
tablePath.getTableName());
}

@Override
protected String getUrlFromDatabaseName(String databaseName) {
return defaultUrl;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
private static final String SELECT_COLUMNS_SQL_TEMPLATE =
"SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ORDINAL_POSITION ASC";

private static final String SELECT_DATABASE_EXISTS =
"SELECT SCHEMA_NAME FROM information_schema.schemata WHERE SCHEMA_NAME = '%s'";

private static final String SELECT_TABLE_EXISTS =
"SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'";

static {
SYS_DATABASES.add("information_schema");
SYS_DATABASES.add("mysql");
Expand All @@ -68,6 +74,17 @@ public MySqlCatalog(
this.typeConverter = new MySqlTypeConverter(version);
}

@Override
protected String getDatabaseWithConditionSql(String databaseName) {
return String.format(SELECT_DATABASE_EXISTS, databaseName);
}

@Override
protected String getTableWithConditionSql(TablePath tablePath) {
return String.format(
SELECT_TABLE_EXISTS, tablePath.getDatabaseName(), tablePath.getTableName());
}

@Override
protected String getListDatabaseSql() {
return "SHOW DATABASES;";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,17 @@
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;

public class OceanBaseOracleCatalog extends OracleCatalog {

static {
EXCLUDED_SCHEMAS =
Collections.unmodifiableList(
Arrays.asList("oceanbase", "LBACSYS", "ORAAUDITOR", "SYS"));
EXCLUDED_SCHEMAS.add("oceanbase");
EXCLUDED_SCHEMAS.add("LBACSYS");
EXCLUDED_SCHEMAS.add("ORAAUDITOR");
EXCLUDED_SCHEMAS.add("SYS");
}

public OceanBaseOracleCatalog(
Expand All @@ -53,6 +52,21 @@ protected String getListDatabaseSql() {
throw new UnsupportedOperationException();
}

@Override
protected String getDatabaseWithConditionSql(String databaseName) {
throw new UnsupportedOperationException();
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) {
return false;
}
return querySQLResultExists(
this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
getTableWithConditionSql(tablePath));
}

@Override
public List<String> listTables(String databaseName)
throws CatalogException, DatabaseNotExistException {
Expand All @@ -65,15 +79,6 @@ public List<String> listTables(String databaseName)
}
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
return listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath));
} catch (DatabaseNotExistException e) {
return false;
}
}

@Override
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
Expand Down
Loading

0 comments on commit f012b2a

Please sign in to comment.