From 27b22e2ff4949c84c4ddd5293079361a1b1b5f07 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Fri, 19 Jul 2024 19:42:03 +0800 Subject: [PATCH] [Fix][Zeta] Fix hybrid deployment can not get worker when init (#7235) --- .../engine/client/SeaTunnelClientTest.java | 13 ++++++---- .../engine/server/SeaTunnelServer.java | 6 +++++ .../AbstractResourceManager.java | 24 ++++++++++++------- .../opeartion/SyncWorkerProfileOperation.java | 6 ++++- .../resourcemanager/ResourceManagerTest.java | 5 ++++ 5 files changed, 39 insertions(+), 15 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java index f0949e54cd76..fecff30e7af1 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java @@ -398,11 +398,14 @@ public void testSetJobIdDuplicate() { Assertions.assertThrows( Exception.class, () -> jobExecutionEnvWithSameJobId.execute().waitForJobCompleteV2()); - Assertions.assertEquals( - String.format( - "The job id %s has already been submitted and is not starting with a savepoint.", - jobId), - exception.getCause().getMessage()); + Assertions.assertTrue( + exception + .getCause() + .getMessage() + .contains( + String.format( + "The job id %s has already been submitted and is not starting with a savepoint.", + jobId))); } catch (Exception e) { throw new RuntimeException(e); } finally { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java index 765869fd0308..b76af4c19a09 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java @@ -84,6 +84,12 @@ public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) { /** Lazy load for Slot Service */ public SlotService getSlotService() { + // If the node is master node, the slot service is not needed. + if (EngineConfig.ClusterRole.MASTER.ordinal() + == seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) { + return null; + } + if (slotService == null) { synchronized (this) { if (slotService == null) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java index b830e5f05639..6c04748ccca6 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java @@ -74,25 +74,31 @@ public void init() { private void initWorker() { log.info("initWorker... "); - List
aliveWorker = + List
aliveNode = nodeEngine.getClusterService().getMembers().stream() - .filter(Member::isLiteMember) .map(Member::getAddress) .collect(Collectors.toList()); - log.info("initWorker live nodes: " + aliveWorker); + log.info("init live nodes: {}", aliveNode); List> futures = - aliveWorker.stream() + aliveNode.stream() .map( - worker -> - sendToMember(new SyncWorkerProfileOperation(), worker) + node -> + sendToMember(new SyncWorkerProfileOperation(), node) .thenAccept( p -> { - registerWorker.put( - worker, (WorkerProfile) p); + if (p != null) { + registerWorker.put( + node, (WorkerProfile) p); + log.info( + "received new worker register: " + + ((WorkerProfile) + p) + .getAddress()); + } })) .collect(Collectors.toList()); futures.forEach(CompletableFuture::join); - log.info("registerWorker: " + registerWorker); + log.info("registerWorker: {}", registerWorker); } @Override diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java index ebe85e3dafc3..904629648ab7 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java @@ -33,7 +33,11 @@ public class SyncWorkerProfileOperation extends Operation implements IdentifiedD @Override public void run() throws Exception { SeaTunnelServer server = getService(); - result = server.getSlotService().getWorkerProfile(); + if (server.getSlotService() != null) { + result = server.getSlotService().getWorkerProfile(); + } else { + result = null; + } } @Override diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java index abd4ccdc090a..5ac803064a8e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java @@ -54,6 +54,11 @@ public void before() { server.getSlotService(); } + @Test + public void testHaveWorkerWhenUseHybridDeployment() { + Assertions.assertEquals(1, resourceManager.workerCount(null)); + } + @Test public void testApplyRequest() throws ExecutionException, InterruptedException { List resourceProfiles = new ArrayList<>();