Skip to content

Commit

Permalink
[Improve][Connector-V2] Reuse connection in StarRocksCatalog (#7342)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Aug 22, 2024
1 parent a6b188d commit 8ee129d
Showing 1 changed file with 50 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class StarRocksCatalog implements Catalog {
protected String defaultUrl;
private final JdbcUrlUtil.UrlInfo urlInfo;
private final String template;
private Connection conn;

private static final Logger LOG = LoggerFactory.getLogger(StarRocksCatalog.class);

public StarRocksCatalog(
Expand All @@ -99,8 +101,7 @@ public StarRocksCatalog(

@Override
public List<String> listDatabases() throws CatalogException {
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
try (PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
ResultSet rs = ps.executeQuery()) {
List<String> databases = new ArrayList<>();

Expand All @@ -122,20 +123,19 @@ public List<String> listTables(String databaseName)
throw new DatabaseNotExistException(this.catalogName, databaseName);
}

try (Connection conn =
DriverManager.getConnection(
urlInfo.getUrlWithDatabase(databaseName), username, pwd);
PreparedStatement ps = conn.prepareStatement("SHOW TABLES;");
ResultSet rs = ps.executeQuery()) {

List<String> tables = new ArrayList<>();

while (rs.next()) {
tables.add(rs.getString(1));
try (PreparedStatement ps =
conn.prepareStatement(
"SELECT TABLE_NAME FROM information_schema.tables "
+ "WHERE TABLE_SCHEMA = ? ORDER BY TABLE_NAME")) {
ps.setString(1, databaseName);
try (ResultSet rs = ps.executeQuery()) {
List<String> tables = new ArrayList<>();
while (rs.next()) {
tables.add(rs.getString(1));
}
return tables;
}

return tables;
} catch (Exception e) {
} catch (SQLException e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", catalogName), e);
}
Expand All @@ -148,8 +148,7 @@ public CatalogTable getTable(TablePath tablePath)
throw new TableNotExistException(catalogName, tablePath);
}

String dbUrl = urlInfo.getUrlWithDatabase(tablePath.getDatabaseName());
try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
try {
Optional<PrimaryKey> primaryKey =
getPrimaryKey(tablePath.getDatabaseName(), tablePath.getTableName());

Expand Down Expand Up @@ -213,7 +212,7 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
@Override
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
try {
conn.createStatement()
.execute(StarRocksSaveModeUtil.getDropTableSql(tablePath, ignoreIfNotExists));
} catch (Exception e) {
Expand All @@ -224,7 +223,7 @@ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)

public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
try {
if (ignoreIfNotExists) {
conn.createStatement()
.execute(StarRocksSaveModeUtil.getTruncateTableSql(tablePath));
Expand All @@ -237,17 +236,16 @@ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
}

public void executeSql(TablePath tablePath, String sql) {
try (Connection connection = DriverManager.getConnection(defaultUrl, username, pwd)) {
connection.createStatement().execute(sql);
try {
conn.createStatement().execute(sql);
} catch (Exception e) {
throw new CatalogException(String.format("Failed EXECUTE SQL in catalog %s", sql), e);
}
}

public boolean isExistsData(TablePath tablePath) {
String sql = String.format("select * from %s limit 1", tablePath.getFullName());
try (Connection connection = DriverManager.getConnection(defaultUrl, username, pwd);
Statement statement = connection.createStatement();
try (Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
if (resultSet == null) {
return false;
Expand All @@ -262,7 +260,7 @@ public boolean isExistsData(TablePath tablePath) {
@Override
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
try {
conn.createStatement()
.execute(
StarRocksSaveModeUtil.getCreateDatabaseSql(
Expand All @@ -276,7 +274,7 @@ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
@Override
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
try {
conn.createStatement()
.execute(
StarRocksSaveModeUtil.getDropDatabaseSql(
Expand Down Expand Up @@ -368,7 +366,7 @@ private Map<String, String> buildConnectorOptions(TablePath tablePath) {

public void createTable(String sql)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
try {
log.info("create table sql is :{}", sql);
conn.createStatement().execute(sql);
} catch (Exception e) {
Expand Down Expand Up @@ -418,7 +416,8 @@ public String getDefaultDatabase() {

@Override
public void open() throws CatalogException {
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
try {
conn = DriverManager.getConnection(defaultUrl, username, pwd);
// test connection, fail early if we cannot connect to database
conn.getCatalog();
} catch (SQLException e) {
Expand All @@ -432,6 +431,11 @@ public void open() throws CatalogException {
@Override
public void close() throws CatalogException {
LOG.info("Catalog {} closing", catalogName);
try {
conn.close();
} catch (SQLException e) {
throw new CatalogException("close doris catalog failed", e);
}
}

@Override
Expand All @@ -442,13 +446,12 @@ public String name() {
protected Optional<PrimaryKey> getPrimaryKey(String schema, String table) throws SQLException {

List<String> pkFields = new ArrayList<>();
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
ResultSet rs =
conn.createStatement()
.executeQuery(
String.format(
"SELECT COLUMN_NAME FROM information_schema.columns where TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND COLUMN_KEY = 'PRI' ORDER BY ORDINAL_POSITION",
schema, table))) {
try (ResultSet rs =
conn.createStatement()
.executeQuery(
String.format(
"SELECT COLUMN_NAME FROM information_schema.columns where TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND COLUMN_KEY = 'PRI' ORDER BY ORDINAL_POSITION",
schema, table))) {
while (rs.next()) {
String columnName = rs.getString("COLUMN_NAME");
pkFields.add(columnName);
Expand All @@ -471,11 +474,19 @@ public boolean databaseExists(String databaseName) throws CatalogException {

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
return databaseExists(tablePath.getDatabaseName())
&& listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
} catch (DatabaseNotExistException e) {
return false;
try (PreparedStatement ps =
conn.prepareStatement(
"SELECT TABLE_NAME FROM information_schema.tables "
+ "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
+ "ORDER BY TABLE_NAME")) {
ps.setString(1, tablePath.getDatabaseName());
ps.setString(2, tablePath.getTableName());
try (ResultSet rs = ps.executeQuery()) {
return rs.next();
}
} catch (SQLException e) {
throw new CatalogException(
String.format("check table [%s] exists failed", tablePath.getFullName()), e);
}
}

Expand Down

0 comments on commit 8ee129d

Please sign in to comment.