Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Zeta] Handle user privacy when submitting a task print config logs #7247

Merged
merged 2 commits into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/seatunnel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ seatunnel:
plugin-config:
namespace: /tmp/seatunnel/checkpoint_snapshot
storage.type: hdfs
fs.defaultFS: file:///tmp/ # Ensure that the directory has written permission
fs.defaultFS: file:///tmp/ # Ensure that the directory has written permission
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,26 @@
import org.apache.seatunnel.shade.com.typesafe.config.ConfigParseOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigSyntax;
import org.apache.seatunnel.shade.com.typesafe.config.impl.Parseable;

import org.apache.seatunnel.api.configuration.ConfigAdapter;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.common.utils.ParserException;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.seatunnel.core.starter.utils.ConfigShadeUtils.DEFAULT_SENSITIVE_KEYWORDS;

/** Used to build the {@link Config} from config file. */
@Slf4j
Expand Down Expand Up @@ -76,14 +82,19 @@ public static Config of(@NonNull Path filePath, List<String> variables) {
adapterSupplier
.map(adapter -> of(adapter, filePath, variables))
.orElseGet(() -> ofInner(filePath, variables));
boolean isJson = filePath.getFileName().toString().endsWith(".json");
log.info(
"Parsed config file: \n{}",
mapToString(configDesensitization(config.root().unwrapped()), isJson));
return config;
}

public static Config of(@NonNull Map<String, Object> objectMap) {
return of(objectMap, false);
return of(objectMap, false, false);
}

public static Config of(@NonNull Map<String, Object> objectMap, boolean isEncrypt) {
public static Config of(
@NonNull Map<String, Object> objectMap, boolean isEncrypt, boolean isJson) {
log.info("Loading config file from objectMap");
Config config =
ConfigFactory.parseMap(objectMap)
Expand All @@ -94,9 +105,49 @@ public static Config of(@NonNull Map<String, Object> objectMap, boolean isEncryp
if (!isEncrypt) {
config = ConfigShadeUtils.decryptConfig(config);
}
log.info(
"Parsed config file: \n{}",
mapToString(configDesensitization(config.root().unwrapped()), isJson));
return config;
}

public static Map<String, Object> configDesensitization(Map<String, Object> configMap) {
return configMap.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> {
String key = entry.getKey();
if (Arrays.asList(DEFAULT_SENSITIVE_KEYWORDS)
.contains(key.toLowerCase())) {
return "******";
}
Object value = entry.getValue();
if (value instanceof Map) {
if ("schema".equals(key)) {
return value;
}
return configDesensitization((Map<String, Object>) value);
} else if (value instanceof List) {
return ((List<?>) value)
.stream()
.map(
v -> {
if (v instanceof Map) {
return configDesensitization(
(Map<
String,
Object>)
v);
}
return v;
})
.collect(Collectors.toList());
}
return value;
}));
}

public static Config of(
@NonNull ConfigAdapter configAdapter, @NonNull Path filePath, List<String> variables) {
log.info("With config adapter spi {}", configAdapter.getClass().getName());
Expand Down Expand Up @@ -133,4 +184,18 @@ private static Config backfillUserVariables(Config config, List<String> variable
}
return config;
}

public static String mapToString(Map<String, Object> configMap, boolean isJson) {
ConfigRenderOptions configRenderOptions =
ConfigRenderOptions.concise().setFormatted(true).setJson(isJson);
ConfigParseOptions configParseOptions =
ConfigParseOptions.defaults().setSyntax(ConfigSyntax.JSON);
Config config =
ConfigFactory.parseString(JsonUtils.toJsonString(configMap), configParseOptions)
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
.resolveWith(
ConfigFactory.systemProperties(),
ConfigResolveOptions.defaults().setAllowUnresolved(true));
return config.root().render(configRenderOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public final class ConfigShadeUtils {

private static final String SHADE_IDENTIFIER_OPTION = "shade.identifier";

private static final String[] DEFAULT_SENSITIVE_OPTIONS =
new String[] {"password", "username", "auth"};
public static final String[] DEFAULT_SENSITIVE_KEYWORDS =
new String[] {"password", "username", "auth", "token"};

private static final Map<String, ConfigShade> CONFIG_SHADES = new HashMap<>();

Expand Down Expand Up @@ -126,7 +126,7 @@ public static Config encryptConfig(String identifier, Config config) {
@SuppressWarnings("unchecked")
private static Config processConfig(String identifier, Config config, boolean isDecrypted) {
ConfigShade configShade = CONFIG_SHADES.getOrDefault(identifier, DEFAULT_SHADE);
List<String> sensitiveOptions = new ArrayList<>(Arrays.asList(DEFAULT_SENSITIVE_OPTIONS));
List<String> sensitiveOptions = new ArrayList<>(Arrays.asList(DEFAULT_SENSITIVE_KEYWORDS));
sensitiveOptions.addAll(Arrays.asList(configShade.sensitiveOptions()));
BiFunction<String, Object, String> processFunction =
(key, value) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@

import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;

import org.apache.seatunnel.api.configuration.ConfigShade;
import org.apache.seatunnel.common.utils.JsonUtils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.beust.jcommander.internal.Lists;
import lombok.extern.slf4j.Slf4j;

import java.net.URISyntaxException;
Expand Down Expand Up @@ -68,6 +71,46 @@ public void testParseConfig() throws URISyntaxException {
config.getConfigList("source").get(0).getString("password"), PASSWORD);
}

@Test
public void testUsePrivacyHandlerHocon() throws URISyntaxException {
URL resource = ConfigShadeTest.class.getResource("/config.shade.conf");
Assertions.assertNotNull(resource);
Config config = ConfigBuilder.of(Paths.get(resource.toURI()), Lists.newArrayList());
config =
ConfigFactory.parseMap(
ConfigBuilder.configDesensitization(config.root().unwrapped()))
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
.resolveWith(
ConfigFactory.systemProperties(),
ConfigResolveOptions.defaults().setAllowUnresolved(true));
Assertions.assertEquals(
config.getConfigList("source").get(0).getString("username"), "******");
Assertions.assertEquals(
config.getConfigList("source").get(0).getString("password"), "******");
String conf = ConfigBuilder.mapToString(config.root().unwrapped(), false);
Assertions.assertTrue(conf.contains("username=\"******\""));
}

@Test
public void testUsePrivacyHandlerJson() throws URISyntaxException {
URL resource = ConfigShadeTest.class.getResource("/config.shade.json");
Assertions.assertNotNull(resource);
Config config = ConfigBuilder.of(Paths.get(resource.toURI()), Lists.newArrayList());
config =
ConfigFactory.parseMap(
ConfigBuilder.configDesensitization(config.root().unwrapped()))
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
.resolveWith(
ConfigFactory.systemProperties(),
ConfigResolveOptions.defaults().setAllowUnresolved(true));
Assertions.assertEquals(
config.getConfigList("source").get(0).getString("username"), "******");
Assertions.assertEquals(
config.getConfigList("source").get(0).getString("password"), "******");
String json = ConfigBuilder.mapToString(config.root().unwrapped(), true);
Assertions.assertTrue(json.contains("\"password\" : \"******\""));
}

@Test
public void testVariableReplacement() throws URISyntaxException {
String jobName = "seatunnel variable test job";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"env" : {
"shade.identifier" : "base64",
"parallelism" : 1
},
"source" : [
{
"plugin_name" : "MySQL-CDC",
"base-url" : "jdbc:mysql://localhost:56725",
"username" : "c2VhdHVubmVs",
"password" : "c2VhdHVubmVsX3Bhc3N3b3Jk",
"hostname" : "127.0.0.1",
"port" : 56725,
"database-name" : "inventory_vwyw0n",
"parallelism" : 1,
"table-name" : "products",
"server-id" : 5656,
"schema" : {
"fields" : {
"name" : "string",
"age" : "int",
"sex" : "boolean"
}
},
"result_table_name" : "fake"
}
],
"transform" : [],
"sink" : [
{
"plugin_name" : "Clickhouse",
"host" : "localhost:8123",
"username" : "c2VhdHVubmVs",
"password" : "c2VhdHVubmVsX3Bhc3N3b3Jk",
"database" : "default",
"table" : "fake_all",
"support_upsert" : true,
"primary_key" : "id"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ public static void buildRequestParams(Map<String, String> requestParams, String

public static Config buildConfig(JsonNode jsonNode, boolean isEncrypt) {
Map<String, Object> objectMap = JsonUtils.toMap(jsonNode);
return ConfigBuilder.of(objectMap, isEncrypt);
return ConfigBuilder.of(objectMap, isEncrypt, true);
}
}
Loading