[Feature][Elastic search] Support multi-table source feature #7502
Conversation
This reverts commit 26d0473.
@DisabledOnContainer(
        value = {},
        type = {EngineType.SPARK, EngineType.FLINK},
        disabledReason = "Currently SPARK/FLINK do not support multiple table read")
Spark already supports multiple tables. You need to enable the test case.
done
…type_data.json file
return getDocsWithTransformDate(source, index, Collections.emptyList());
}

//
remove
done
Map<String, BasicTypeDefine<EsType>> esFieldType =
-        esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source);
+        esRestClient.getFieldTypeMapping(index, source);
esRestClient.close();
Is there a risk that the connection is not closed?
I implemented Closeable for EsRestClient and extracted the code into a separate method, so the resources are closed even if an exception occurs:
private Map<String, BasicTypeDefine<EsType>> getFieldTypeMapping(
        String index, List<String> source) {
    // EsRestClient#getFieldTypeMapping may throw a runtime exception,
    // so we use try-with-resources to ensure the client is closed
    try (EsRestClient esRestClient = EsRestClient.createInstance(connectionConfig)) {
        return esRestClient.getFieldTypeMapping(index, source);
    }
}
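The close-on-exception guarantee of try-with-resources can be illustrated with a minimal, self-contained sketch. `FakeEsRestClient` below is a hypothetical stand-in for the connector's `EsRestClient`, not the real class:

```java
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for EsRestClient, just to demonstrate the pattern.
class FakeEsRestClient implements Closeable {
    boolean closed = false;

    Map<String, String> getFieldTypeMapping(String index, List<String> source) {
        // Simulate the runtime exception mentioned in the code comment above
        throw new RuntimeException("mapping request failed for " + index);
    }

    @Override
    public void close() {
        closed = true;
    }
}

public class CloseOnException {
    public static void main(String[] args) {
        FakeEsRestClient client = new FakeEsRestClient();
        try (FakeEsRestClient c = client) {
            c.getFieldTypeMapping("st_index", Collections.emptyList());
        } catch (RuntimeException expected) {
            // the mapping call threw, but close() has already run
        }
        System.out.println("closed=" + client.closed); // prints "closed=true"
    }
}
```

Even though `getFieldTypeMapping` throws, the runtime closes the resource before the `catch` block executes, which is exactly why the extracted method above cannot leak the client.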
-esRestClient.getFieldTypeMapping("st_index4", Lists.newArrayList());
-Thread.sleep(2000);
+esRestClient.getFieldTypeMapping(indexName, Lists.newArrayList());
+Thread.sleep(5000);
Could these be unified into a single parameter?
Now all refresh waiting times use a single static constant.
private static final long INDEX_REFRESH_MILL_DELAY = 5000L;
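The idea can be sketched as follows. The constant name and value are taken from the reply above; the helper method and class are illustrative (visibility is widened for the demo):

```java
// Sketch: funnel every index-refresh wait through one named constant
// instead of scattering magic numbers (2000, 5000, ...) across test code.
public class RefreshWait {
    // Same name/value as in the PR; made package-private for this demo
    static final long INDEX_REFRESH_MILL_DELAY = 5000L;

    // Hypothetical helper: all tests wait for an index refresh through here
    static void awaitIndexRefresh() throws InterruptedException {
        Thread.sleep(INDEX_REFRESH_MILL_DELAY);
    }

    public static void main(String[] args) {
        System.out.println("refresh delay ms: " + INDEX_REFRESH_MILL_DELAY);
    }
}
```

With a single constant, tuning the refresh wait for slower CI machines becomes a one-line change.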
LGTM
tls_verify_hostname = false

index = "multi_source_write_test_index"
index_type = "st"
You should test this feature using multi-table writes, and check the rows and columns of the read/write tables.
Based on your suggestion, I modified my e2e test.
Now it reads different fields from different indices and writes them into different target indices.
env {
parallelism = 1
job.mode = "BATCH"
#checkpoint.interval = 10000
}
source {
Elasticsearch {
hosts = ["https://elasticsearch:9200"]
username = "elastic"
password = "elasticsearch"
tls_verify_certificate = false
tls_verify_hostname = false
index_list = [
{
index = "read_filter_index1"
query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
source = [
c_map,
c_array,
c_string,
c_boolean,
c_tinyint,
c_smallint,
c_bigint,
c_float,
c_double,
c_decimal,
c_bytes,
c_int,
c_date,
c_timestamp,
c_null
]
array_column = {
c_array = "array<tinyint>"
}
}
{
index = "read_filter_index2"
query = {"range": {"c_int2": {"gte": 10, "lte": 20}}}
source = [
c_int2,
c_null2,
c_date2
]
}
]
}
}
transform {
}
sink {
Elasticsearch {
hosts = ["https://elasticsearch:9200"]
username = "elastic"
password = "elasticsearch"
tls_verify_certificate = false
tls_verify_hostname = false
index = "${table_name}_copy"
index_type = "st"
"schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
"data_save_mode"="APPEND_DATA"
}
}
c_map,
c_array,
c_string,
c_boolean,
c_tinyint,
c_smallint,
c_bigint,
c_float,
c_double,
c_decimal,
c_bytes,
c_int,
c_date,
c_timestamp
]
array_column = {
c_array = "array<tinyint>"
}
}
{
index = "read_index2"
query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
source = [
c_map,
c_array,
c_string,
c_boolean,
c_tinyint,
c_smallint,
c_bigint,
c_float,
c_double,
c_decimal,
c_bytes,
c_int,
c_date,
c_timestamp
You should define the indexes using different fields, e.g.
index_1: x, y, z...
index_2: a, b, c...
Based on your suggestion, I modified my e2e test.
Now it reads different fields from different indices and writes them into different target indices.
@@ -55,7 +52,7 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
    return OptionRule.builder()
            .required(HOSTS, INDEX)
Is it compatible with old versions?
Fully compatible with previous configurations. In the ElasticsearchSource code, the first step is to check for the presence of index_list: if it exists, the config is parsed as multiple tables; if it does not, it is parsed as a single table.
public ElasticsearchSource(ReadonlyConfig config) {
this.connectionConfig = config;
boolean multiSource = config.getOptional(SourceConfig.INDEX_LIST).isPresent();
boolean singleSource = config.getOptional(SourceConfig.INDEX).isPresent();
if (multiSource && singleSource) {
log.warn(
"Elasticsearch Source config warn: when both 'index' and 'index_list' are present in the configuration, only the 'index_list' configuration will take effect");
}
if (!multiSource && !singleSource) {
throw new ElasticsearchConnectorException(
ElasticsearchConnectorErrorCode.SOURCE_CONFIG_ERROR_01,
ElasticsearchConnectorErrorCode.SOURCE_CONFIG_ERROR_01.getDescription());
}
if (multiSource) {
this.sourceConfigList = createMultiSource(config);
} else {
this.sourceConfigList = Collections.singletonList(parseOneIndexQueryConfig(config));
}
}
What will happen if I configure index & index_list at the same time?
The program will print a warning log to tell the user the configured processing priority.
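The precedence rule can be sketched in isolation. The class, method, and enum names below are illustrative, not the connector's actual API; only the branching mirrors the constructor shown above:

```java
// Illustrative sketch of the index / index_list precedence described above.
public class IndexOptionPrecedence {
    enum SourceMode { MULTI_TABLE, SINGLE_TABLE }

    static SourceMode resolve(boolean hasIndexList, boolean hasIndex) {
        if (hasIndexList && hasIndex) {
            // Matches the PR's warning: index_list wins when both are set
            System.out.println(
                    "warn: both 'index' and 'index_list' are set; using 'index_list'");
        }
        if (!hasIndexList && !hasIndex) {
            // Matches SOURCE_CONFIG_ERROR_01 in the real connector
            throw new IllegalArgumentException(
                    "either 'index' or 'index_list' must be configured");
        }
        return hasIndexList ? SourceMode.MULTI_TABLE : SourceMode.SINGLE_TABLE;
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, true));   // warns, then MULTI_TABLE
        System.out.println(resolve(false, true));  // SINGLE_TABLE
    }
}
```

So an old single-index config keeps working unchanged, and a config with both options silently prefers `index_list` after logging the warning.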
LGTM. Thanks @FuYouJ
Waiting for CI to pass.
+1
Thank you for your contribution @FuYouJ