Skip to content

Commit

Permalink
[Improve][Connector-v2] Optimize all the jdbc catalog for the check a…
Browse files Browse the repository at this point in the history
…bout db or tb exists
  • Loading branch information
dailai committed Jul 26, 2024
1 parent e808163 commit dff0756
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public abstract class AbstractJdbcCatalog implements Catalog {
private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class);

protected static final Set<String> SYS_DATABASES = new HashSet<>();
protected static final Set<String> EXCLUDED_SCHEMAS = new HashSet<>();

protected final String catalogName;
protected final String defaultDatabase;
Expand Down Expand Up @@ -262,6 +263,10 @@ protected String getListDatabaseSql() {
throw new UnsupportedOperationException();
}

protected String getDatabaseWithConditionSql(String databaseName) {
throw new UnsupportedOperationException();
}

@Override
public List<String> listDatabases() throws CatalogException {
try {
Expand All @@ -280,15 +285,24 @@ public List<String> listDatabases() throws CatalogException {

@Override
public boolean databaseExists(String databaseName) throws CatalogException {
checkArgument(StringUtils.isNotBlank(databaseName));

return listDatabases().contains(databaseName);
if (StringUtils.isBlank(databaseName)) {
return false;
}
if (!SYS_DATABASES.isEmpty() && SYS_DATABASES.contains(databaseName)) {
return false;
}
return queryExists(
getUrlFromDatabaseName(databaseName), getDatabaseWithConditionSql(databaseName));
}

protected String getListTableSql(String databaseName) {
throw new UnsupportedOperationException();
}

protected String getTableWithConditionSql(TablePath tablePath) {
throw new UnsupportedOperationException();
}

protected String getTableName(ResultSet rs) throws SQLException {
String schemaName = rs.getString(1);
String tableName = rs.getString(2);
Expand Down Expand Up @@ -320,12 +334,15 @@ public List<String> listTables(String databaseName)

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
return databaseExists(tablePath.getDatabaseName())
&& listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath));
} catch (DatabaseNotExistException e) {
String databaseName = tablePath.getDatabaseName();
if (!databaseExists(databaseName)) {
return false;
}
if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) {
return false;
}
return queryExists(
this.getUrlFromDatabaseName(databaseName), getTableWithConditionSql(tablePath));
}

@Override
Expand Down Expand Up @@ -531,11 +548,8 @@ protected List<String> queryString(String url, String sql, ResultSetConsumer<Str
}
}

protected boolean queryExists(String dbUrl, String sql, String... parmas) {
protected boolean queryExists(String dbUrl, String sql) {
try (PreparedStatement stmt = getConnection(dbUrl).prepareStatement(sql)) {
for (int i = 0; i < parmas.length; i++) {
stmt.setString(i + 1, parmas[i]);
}
try (ResultSet rs = stmt.executeQuery()) {
return rs.next();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeMapper;

import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
Expand Down Expand Up @@ -70,6 +68,19 @@ public DamengCatalog(
super(catalogName, username, pwd, urlInfo, defaultSchema);
}

@Override
protected String getDatabaseWithConditionSql(String databaseName) {
return String.format(getListDatabaseSql() + " where name='%s'", databaseName);
}

@Override
protected String getTableWithConditionSql(TablePath tablePath) {
return String.format(
getListTableSql(tablePath.getDatabaseName()) + " where OWNER = ? and TABLE_NAME= ?",
tablePath.getSchemaName(),
tablePath.getTableName());
}

@Override
protected String getListDatabaseSql() {
return "SELECT name FROM v$database";
Expand Down Expand Up @@ -145,20 +156,6 @@ protected String getOptionTableName(TablePath tablePath) {
return tablePath.getSchemaAndTableName();
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
return databaseExists(tablePath.getDatabaseName())
&& listTables(tablePath.getDatabaseName())
.contains(tablePath.getSchemaAndTableName());
}
return listTables().contains(tablePath.getSchemaAndTableName());
} catch (DatabaseNotExistException e) {
return false;
}
}

private List<String> listTables() {
List<String> databases = listDatabases();
return listTables(databases.get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,6 @@ protected String getTableName(ResultSet rs) throws SQLException {
return schemaName + "." + tableName;
}

// @Override
// protected String getSelectColumnsSql(TablePath tablePath) {
// return String.format(
// SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(),
// tablePath.getTableName());
// }

@Override
protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
Expand Down Expand Up @@ -144,12 +137,24 @@ public boolean databaseExists(String databaseName) throws CatalogException {

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
return listTables(tablePath.getSchemaName())
.contains(tablePath.getSchemaAndTableName());
} catch (DatabaseNotExistException e) {
if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) {
return false;
}
return queryExists(
this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
getTableWithConditionSql(tablePath));
}

@Override
protected String getTableWithConditionSql(TablePath tablePath) {
return String.format(
getListTableSql(tablePath.getSchemaName()) + " and TABLE_NAME= ?",
tablePath.getTableName());
}

@Override
protected String getUrlFromDatabaseName(String databaseName) {
return defaultUrl;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlVersion;

import org.apache.commons.lang3.StringUtils;

import com.google.common.base.Preconditions;
import com.mysql.cj.MysqlType;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -54,10 +52,10 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
"SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME ='%s' ORDER BY ORDINAL_POSITION ASC";

private static final String SELECT_DATABASE_EXISTS =
"SELECT SCHEMA_NAME FROM information_schema.schemata WHERE SCHEMA_NAME = ?";
"SELECT SCHEMA_NAME FROM information_schema.schemata WHERE SCHEMA_NAME = '%s'";

private static final String SELECT_TABLE_EXISTS =
"SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE table_schema = ? AND table_name = ?";
"SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'";

static {
SYS_DATABASES.add("information_schema");
Expand All @@ -77,28 +75,14 @@ public MySqlCatalog(
}

@Override
public boolean databaseExists(String databaseName) throws CatalogException {
if (StringUtils.isBlank(databaseName)) {
return false;
}
if (SYS_DATABASES.contains(databaseName.toLowerCase())) {
return false;
}
return queryExists(
this.getUrlFromDatabaseName(databaseName), SELECT_DATABASE_EXISTS, databaseName);
protected String getDatabaseWithConditionSql(String databaseName) {
return String.format(SELECT_DATABASE_EXISTS, databaseName);
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
String databaseName = tablePath.getDatabaseName();
if (!databaseExists(databaseName)) {
return false;
}
return queryExists(
this.getUrlFromDatabaseName(databaseName),
SELECT_TABLE_EXISTS,
databaseName,
tablePath.getTableName());
protected String getTableWithConditionSql(TablePath tablePath) {
return String.format(
SELECT_TABLE_EXISTS, tablePath.getDatabaseName(), tablePath.getTableName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,17 @@
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;

public class OceanBaseOracleCatalog extends OracleCatalog {

static {
EXCLUDED_SCHEMAS =
Collections.unmodifiableList(
Arrays.asList("oceanbase", "LBACSYS", "ORAAUDITOR", "SYS"));
EXCLUDED_SCHEMAS.add("oceanbase");
EXCLUDED_SCHEMAS.add("LBACSYS");
EXCLUDED_SCHEMAS.add("ORAAUDITOR");
EXCLUDED_SCHEMAS.add("SYS");
}

public OceanBaseOracleCatalog(
Expand All @@ -53,6 +52,21 @@ protected String getListDatabaseSql() {
throw new UnsupportedOperationException();
}

@Override
protected String getDatabaseWithConditionSql(String databaseName) {
throw new UnsupportedOperationException();
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName())) {
return false;
}
return queryExists(
this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
getTableWithConditionSql(tablePath));
}

@Override
public List<String> listTables(String databaseName)
throws CatalogException, DatabaseNotExistException {
Expand All @@ -65,15 +79,6 @@ public List<String> listTables(String databaseName)
}
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
return listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath));
} catch (DatabaseNotExistException e) {
return false;
}
}

@Override
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeMapper;

import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
Expand All @@ -45,7 +42,7 @@
@Slf4j
public class OracleCatalog extends AbstractJdbcCatalog {

protected static List<String> EXCLUDED_SCHEMAS =
protected static List<String> EXCLUDED_SCHEMAS_ALL =
Collections.unmodifiableList(
Arrays.asList(
"APPQOSSYS",
Expand Down Expand Up @@ -100,6 +97,10 @@ public class OracleCatalog extends AbstractJdbcCatalog {
+ "ORDER BY \n"
+ " cols.column_id \n";

static {
EXCLUDED_SCHEMAS.addAll(EXCLUDED_SCHEMAS_ALL);
}

public OracleCatalog(
String catalogName,
String username,
Expand All @@ -110,30 +111,19 @@ public OracleCatalog(
}

@Override
public boolean databaseExists(String databaseName) throws CatalogException {
if (StringUtils.isBlank(databaseName)) {
return false;
}
return queryExists(
this.getUrlFromDatabaseName(databaseName),
getListDatabaseSql() + " where name=?",
databaseName);
protected String getDatabaseWithConditionSql(String databaseName) {
return String.format(getListDatabaseSql() + " where name = '%s'", databaseName);
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
String databaseName = tablePath.getDatabaseName();
if (!databaseExists(databaseName)) {
return false;
}
if (EXCLUDED_SCHEMAS.contains(tablePath.getSchemaName().toUpperCase())) {
return false;
}
return queryExists(
this.getUrlFromDatabaseName(databaseName),
getListTableSql(databaseName) + " and OWNER= ? and table_name = ?",
tablePath.getSchemaName(),
tablePath.getTableName());
protected String getTableWithConditionSql(TablePath tablePath) {
return String.format(
getListTableSql(tablePath.getDatabaseName())
+ " and OWNER= '"
+ tablePath.getSchemaName()
+ "' and table_name = '"
+ tablePath.getTableName()
+ "'");
}

@Override
Expand Down
Loading

0 comments on commit dff0756

Please sign in to comment.