Skip to content

Commit

Permalink
[Feature][Connector-V2][Tablestore] Support Source connector for Tab…
Browse files Browse the repository at this point in the history
  • Loading branch information
cloud456 authored and hawk9821 committed Aug 29, 2024
1 parent b69bc0d commit d199841
Show file tree
Hide file tree
Showing 12 changed files with 860 additions and 0 deletions.
102 changes: 102 additions & 0 deletions docs/en/connector-v2/source/Tablestore.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Tablestore

> Tablestore source connector
## Description

Read data from Alicloud Tablestore. Both full and CDC reads are supported.


## Key features

- [ ] [batch](../../concept/connector-v2-features.md)
- [X] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|-----------------------|--------|----------|---------------|
| end_point | string | yes | - |
| instance_name | string | yes | - |
| access_key_id | string | yes | - |
| access_key_secret | string | yes | - |
| table | string | yes | - |
| primary_keys | array | yes | - |
| schema | config | yes | - |


### end_point [string]

The endpoint of Tablestore.

### instance_name [string]

The instance name of Tablestore.

### access_key_id [string]

The access key ID of Tablestore.

### access_key_secret [string]

The access key secret of Tablestore.

### table [string]

The table name of Tablestore.

### primary_keys [array]

The primary key of the table; just add a unique primary key.

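### schema [Config]

The schema of the data to read, declared as a `fields` block that maps each field name to its type (see the example below).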



## Example

```bash
env {
parallelism = 1
job.mode = "STREAMING"
}

source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
Tablestore {
end_point = "https://****.cn-zhangjiakou.tablestore.aliyuncs.com"
instance_name = "****"
access_key_id="***************2Ag5"
access_key_secret="***********2Dok"
table="test"
primary_keys=["id"]
schema={
fields {
id = string
name = string
}
}
}
}


sink {
MongoDB{
uri = "mongodb://localhost:27017"
database = "test"
collection = "test"
primary-key = ["id"]
schema = {
fields {
id = string
name = string
}
}
}
}
```

1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ seatunnel.sink.InfluxDB = connector-influxdb
seatunnel.source.GoogleSheets = connector-google-sheets
seatunnel.sink.GoogleFirestore = connector-google-firestore
seatunnel.sink.Tablestore = connector-tablestore
seatunnel.source.Tablestore = connector-tablestore
seatunnel.source.Lemlist = connector-http-lemlist
seatunnel.source.Klaviyo = connector-http-klaviyo
seatunnel.sink.Slack = connector-slack
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;

Expand All @@ -45,6 +48,8 @@ public class TablestoreOptions implements Serializable {

public int batchSize = Integer.parseInt(BATCH_SIZE.defaultValue());

/**
 * Creates an options object without populating any connection settings.
 *
 * <p>Used by {@link #of(ReadonlyConfig)}, which fills the fields in through setters afterwards.
 */
public TablestoreOptions() {}

public TablestoreOptions(Config config) {
this.endpoint = config.getString(TablestoreConfig.END_POINT.key());
this.instanceName = config.getString(TablestoreConfig.INSTANCE_NAME.key());
Expand All @@ -57,4 +62,18 @@ public TablestoreOptions(Config config) {
this.batchSize = config.getInt(BATCH_SIZE.key());
}
}

/**
 * Builds a {@link TablestoreOptions} from a {@link ReadonlyConfig}.
 *
 * <p>Endpoint, instance name, access key id/secret and table are read through their typed
 * options. The primary keys are looked up by key in the config's raw source map and cast to a
 * list of strings.
 *
 * @param config the connector configuration
 * @return a new options object populated from {@code config}
 */
public static TablestoreOptions of(ReadonlyConfig config) {
    TablestoreOptions options = new TablestoreOptions();

    options.setEndpoint(config.get(TablestoreConfig.END_POINT));
    options.setInstanceName(config.get(TablestoreConfig.INSTANCE_NAME));
    options.setAccessKeyId(config.get(TablestoreConfig.ACCESS_KEY_ID));
    options.setAccessKeySecret(config.get(TablestoreConfig.ACCESS_KEY_SECRET));
    options.setTable(config.get(TablestoreConfig.TABLE));

    // Primary keys come from the untyped source map, hence the cast.
    Object rawPrimaryKeys = config.getSourceMap().get(TablestoreConfig.PRIMARY_KEYS.key());
    options.setPrimaryKeys((List<String>) rawPrimaryKeys);

    return options;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.tablestore.serialize;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import com.alicloud.openservices.tablestore.model.StreamRecord;

import java.util.ArrayList;
import java.util.List;

/**
 * Default {@link SeaTunnelRowDeserializer}: turns a Tablestore {@link StreamRecord} into a {@link
 * SeaTunnelRow} whose fields are the record's column values.
 */
public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer {

    /**
     * Collects {@code getColumn().getValue()} of every entry in {@code r.getColumns()}, in the
     * order the list yields them, and wraps the result in a row.
     *
     * <p>Only {@code r.getColumns()} is consulted; nothing else on the record is read here.
     *
     * @param r the stream record to convert
     * @return a row with one field per record column
     */
    @Override
    public SeaTunnelRow deserialize(StreamRecord r) {
        List<Object> values = new ArrayList<>();
        r.getColumns().stream()
                .map(recordColumn -> recordColumn.getColumn().getValue())
                .forEachOrdered(values::add);
        return new SeaTunnelRow(values.toArray());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.tablestore.serialize;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import com.alicloud.openservices.tablestore.model.StreamRecord;

/** Converts a Tablestore {@link StreamRecord} into a {@link SeaTunnelRow}. */
public interface SeaTunnelRowDeserializer {

/**
 * Deserializes one stream record.
 *
 * @param streamRecord the Tablestore stream record to convert
 * @return the row built from {@code streamRecord}
 */
SeaTunnelRow deserialize(StreamRecord streamRecord);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.tablestore.source;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceReader.Context;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;

import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.List;

@Slf4j
public class TableStoreDBSource
implements SeaTunnelSource<SeaTunnelRow, TableStoreDBSourceSplit, TableStoreDBSourceState>,
SupportParallelism,
SupportColumnProjection {

private TablestoreOptions tablestoreOptions;
private SeaTunnelRowType typeInfo;
private JobContext jobContext;

@Override
public String getPluginName() {
return "Tablestore";
}

@Override
public List<CatalogTable> getProducedCatalogTables() {
return SeaTunnelSource.super.getProducedCatalogTables();
}

public TableStoreDBSource(ReadonlyConfig config) {
this.tablestoreOptions = TablestoreOptions.of(config);
CatalogTableUtil.buildWithConfig(config);
this.typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
}

@Override
public Boundedness getBoundedness() {
return JobMode.BATCH.equals(jobContext.getJobMode())
? Boundedness.BOUNDED
: Boundedness.UNBOUNDED;
}

@Override
public SourceReader<SeaTunnelRow, TableStoreDBSourceSplit> createReader(Context readerContext)
throws Exception {
return new TableStoreDBSourceReader(readerContext, tablestoreOptions, typeInfo);
}

@Override
public SourceSplitEnumerator<TableStoreDBSourceSplit, TableStoreDBSourceState> createEnumerator(
org.apache.seatunnel.api.source.SourceSplitEnumerator.Context<TableStoreDBSourceSplit>
enumeratorContext)
throws Exception {
return new TableStoreDBSourceSplitEnumerator(enumeratorContext, tablestoreOptions);
}

@Override
public SourceSplitEnumerator<TableStoreDBSourceSplit, TableStoreDBSourceState>
restoreEnumerator(
org.apache.seatunnel.api.source.SourceSplitEnumerator.Context<
TableStoreDBSourceSplit>
enumeratorContext,
TableStoreDBSourceState checkpointState)
throws Exception {
return new TableStoreDBSourceSplitEnumerator(
enumeratorContext, tablestoreOptions, checkpointState);
}

@Override
public void setJobContext(JobContext jobContext) {
this.jobContext = jobContext;
}
}
Loading

0 comments on commit d199841

Please sign in to comment.