[Docs] update Dynamic-compile document #7730

Merged
merged 3 commits into from
Sep 25, 2024
173 changes: 116 additions & 57 deletions docs/en/transform-v2/dynamic-compile.md
Expand Up @@ -23,11 +23,6 @@ If the conversion is too complex, it may affect performance
| compile_pattern | Enum | no | SOURCE_CODE |
| absolute_path | string | no | |

### source_code [string]

The code must implement two methods: getInlineOutputColumns and getInlineOutputFieldValues. getInlineOutputColumns determines the columns you want to add or convert, and the original column structure can be obtained from CatalogTable.
getInlineOutputFieldValues determines your column values. You can fulfill any of your requirements, and even make RPC requests to obtain new values based on the original columns.
If there are third-party dependency packages, place them in ${SEATUNNEL_HOME}/lib. If you use Spark or Flink, put them under the libs directory of the corresponding service.

### common options [string]

Expand All @@ -47,6 +42,28 @@ If it is a SOURCE-CODE enumeration; the SOURCE-CODE attribute is required, and t

The absolute path of Java or Groovy files on the server

### source_code [string]

The source code.

#### Details about the source code

In the source code, you must implement two methods:
- `Column[] getInlineOutputColumns(CatalogTable inputCatalogTable)`
- `Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow)`

The `getInlineOutputColumns` method takes a `CatalogTable` and returns a `Column[]`.
You can get the current table's schema from the `CatalogTable`.
If a returned column already exists in the current schema, its definition (field type, comment, ...) is overwritten by the returned one; if it is a new column, it is added to the current schema.

The `getInlineOutputFieldValues` method takes a `SeaTunnelRowAccessor` and returns an `Object[]`.
You can read the current record from the `SeaTunnelRowAccessor` and apply your own processing logic.
The length of the returned `Object[]` must match the length of the array returned by `getInlineOutputColumns`, and the values must be in the same order as the columns.

If there are third-party dependency packages, place them in `${SEATUNNEL_HOME}/lib`. If you use Spark or Flink, put them under the libs directory of the corresponding service.
You need to restart the server to load the new lib files.
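
The overwrite-or-append rule and the length requirement described above can be modelled in a few lines of plain Java. This is only a sketch, not SeaTunnel's implementation: the class `SchemaMergeSketch`, the helpers `mergeSchema` and `checkValueCount`, and the string type names are hypothetical stand-ins, and the input column types are made up for illustration. It assumes an overwritten column keeps its position and a new column is appended at the end, which matches the result tables in the Example section.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of the merge rule; these are NOT SeaTunnel classes.
class SchemaMergeSketch {

    // Hypothetical helper: applies the returned columns (name -> type) to the current schema.
    static Map<String, String> mergeSchema(Map<String, String> current, Map<String, String> returned) {
        Map<String, String> merged = new LinkedHashMap<>(current);
        // An existing key keeps its position and gets the new type; a new key is appended.
        merged.putAll(returned);
        return merged;
    }

    // Hypothetical check: there must be exactly one value per returned column.
    static void checkValueCount(int columnCount, Object[] values) {
        if (values.length != columnCount) {
            throw new IllegalArgumentException(
                    "expected " + columnCount + " values but got " + values.length);
        }
    }

    public static void main(String[] args) {
        Map<String, String> current = new LinkedHashMap<>();
        current.put("name", "STRING");
        current.put("age", "STRING"); // assumed input type, for illustration only
        current.put("card", "INT");

        Map<String, String> returned = new LinkedHashMap<>();
        returned.put("compile_language", "STRING"); // new column
        returned.put("age", "INT");                 // existing column, definition replaced

        // prints {name=STRING, age=INT, card=INT, compile_language=STRING}
        System.out.println(mergeSchema(current, returned));

        // Two returned columns, so two values in the same order: compile_language, age.
        checkValueCount(returned.size(), new Object[] {"JAVA", 40});
    }
}
```

The values array is checked against the returned columns only, not against the full merged schema, because `getInlineOutputFieldValues` supplies values just for the columns that `getInlineOutputColumns` declares.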


## Example

The data read from source is a table like this:
Expand All @@ -55,10 +72,14 @@ The data read from source is a table like this:
|----------|-----|------|
| Joy Ding | 20 | 123 |
| May Ding | 20 | 123 |
| Kin Dom | 20 | 123 |
| Joy Dom | 20 | 123 |
| Kin Dom | 30 | 123 |
| Joy Dom | 30 | 123 |

```
Use DynamicCompile to add a new column `compile_language` and to update the `age` field based on its original value (if age is 20, set it to 40).


- use groovy
```hocon
transform {
DynamicCompile {
source_table_name = "fake"
Expand All @@ -73,29 +94,50 @@ transform {
import org.apache.seatunnel.api.table.type.*;
import java.util.ArrayList;
class demo {
public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) {
    List<Column> columns = new ArrayList<>();
    PhysicalColumn destColumn =
        PhysicalColumn.of(
            "compile_language",
            BasicType.STRING_TYPE,
            10,
            true,
            "",
            "");
    columns.add(destColumn);
    return columns.toArray(new Column[0]);
}
public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
    Object[] fieldValues = new Object[1];
    fieldValues[0]="GROOVY"
    return fieldValues;
}
public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) {
    PhysicalColumn col1 =
        PhysicalColumn.of(
            "compile_language",
            BasicType.STRING_TYPE,
            10L,
            true,
            "",
            "");
    PhysicalColumn col2 =
        PhysicalColumn.of(
            "age",
            BasicType.INT_TYPE,
            0L,
            false,
            false,
            ""
        );
    return new Column[]{
        col1, col2
    };
}

public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
    Object[] fieldValues = new Object[2];
    // get age
    Object ageField = inputRow.getField(1);
    fieldValues[0] = "GROOVY";
    if (Integer.parseInt(ageField.toString()) == 20) {
        fieldValues[1] = 40;
    } else {
        fieldValues[1] = ageField;
    }
    return fieldValues;
}
};"""

}
}
```

- use java
```hocon
transform {
DynamicCompile {
source_table_name = "fake"
Expand All @@ -108,32 +150,49 @@ transform {
import org.apache.seatunnel.api.table.catalog.*;
import org.apache.seatunnel.api.table.type.*;
import java.util.ArrayList;
public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) {

    ArrayList<Column> columns = new ArrayList<Column>();
    PhysicalColumn destColumn =
        PhysicalColumn.of(
            "compile_language",
            BasicType.STRING_TYPE,
            10,
            true,
            "",
            "");
    return new Column[]{
        destColumn
    };

}
public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
    Object[] fieldValues = new Object[1];
    fieldValues[0]="JAVA";
    return fieldValues;
}
public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) {
    PhysicalColumn col1 =
        PhysicalColumn.of(
            "compile_language",
            BasicType.STRING_TYPE,
            10L,
            true,
            "",
            "");
    PhysicalColumn col2 =
        PhysicalColumn.of(
            "age",
            BasicType.INT_TYPE,
            0L,
            false,
            false,
            ""
        );
    return new Column[]{
        col1, col2
    };
}

public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
    Object[] fieldValues = new Object[2];
    // get age
    Object ageField = inputRow.getField(1);
    fieldValues[0] = "JAVA";
    if (Integer.parseInt(ageField.toString()) == 20) {
        fieldValues[1] = 40;
    } else {
        fieldValues[1] = ageField;
    }
    return fieldValues;
}
"""

}
}

```
- use absolute path to read code
```hocon
transform {
DynamicCompile {
source_table_name = "fake"
Expand All @@ -148,21 +207,21 @@ transform {

Then the data in the result table `groovy_out` will look like this:

| name | age | card | compile_language |
|----------|-----|------|------------------|
| Joy Ding | 20 | 123 | GROOVY |
| May Ding | 20 | 123 | GROOVY |
| Kin Dom | 20 | 123 | GROOVY |
| Joy Dom | 20 | 123 | GROOVY |
| Joy Ding | 40 | 123 | GROOVY |
| May Ding | 40 | 123 | GROOVY |
| Kin Dom | 30 | 123 | GROOVY |
| Joy Dom | 30 | 123 | GROOVY |

Then the data in the result table `java_out` will look like this:

| name | age | card | compile_language |
|----------|-----|------|------------------|
| Joy Ding | 20 | 123 | JAVA |
| May Ding | 20 | 123 | JAVA |
| Kin Dom | 20 | 123 | JAVA |
| Joy Dom | 20 | 123 | JAVA |
| Joy Ding | 40 | 123 | JAVA |
| May Ding | 40 | 123 | JAVA |
| Kin Dom | 30 | 123 | JAVA |
| Joy Dom | 30 | 123 | JAVA |
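
The updated rows can be reproduced with a plain-Java walk-through of the same rule. This is a sketch under stated assumptions, not SeaTunnel code: an `Object[]` stands in for a row, and the class `AgeUpdateSketch` and the helper `apply` are hypothetical. `apply` assumes the transform writes `age` back in place and appends `compile_language` as the last field, as the result tables show.

```java
import java.util.Arrays;

// Plain-Java walk-through of the example's row logic; Object[] stands in for a SeaTunnel row.
class AgeUpdateSketch {

    // Same rule as getInlineOutputFieldValues in the example: returns {compile_language, age}.
    static Object[] outputFieldValues(Object[] inputRow, String language) {
        Object[] fieldValues = new Object[2];
        Object ageField = inputRow[1]; // index 1 is the age column (name, age, card)
        fieldValues[0] = language;
        fieldValues[1] = Integer.parseInt(ageField.toString()) == 20 ? 40 : ageField;
        return fieldValues;
    }

    // Hypothetical stand-in for how the values reach the output row:
    // age is overwritten in place, compile_language is appended.
    static Object[] apply(Object[] inputRow, String language) {
        Object[] values = outputFieldValues(inputRow, language);
        Object[] out = Arrays.copyOf(inputRow, inputRow.length + 1);
        out[1] = values[1];
        out[inputRow.length] = values[0];
        return out;
    }

    public static void main(String[] args) {
        Object[][] rows = {
            {"Joy Ding", 20, 123},
            {"May Ding", 20, 123},
            {"Kin Dom", 30, 123},
            {"Joy Dom", 30, 123},
        };
        for (Object[] row : rows) {
            // e.g. prints [Joy Ding, 40, 123, JAVA] for the first row
            System.out.println(Arrays.toString(apply(row, "JAVA")));
        }
    }
}
```

Rows whose age is 20 come out with 40, and the rows with age 30 pass through unchanged.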

For more complex examples, see
https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf