Skip to content

Commit

Permalink
[Fix][Zeta] Fix hybrid deployment can not get worker when init (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored and hawk9821 committed Jul 27, 2024
1 parent 87f44f2 commit 27b22e2
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,11 +398,14 @@ public void testSetJobIdDuplicate() {
Assertions.assertThrows(
Exception.class,
() -> jobExecutionEnvWithSameJobId.execute().waitForJobCompleteV2());
Assertions.assertEquals(
String.format(
"The job id %s has already been submitted and is not starting with a savepoint.",
jobId),
exception.getCause().getMessage());
Assertions.assertTrue(
exception
.getCause()
.getMessage()
.contains(
String.format(
"The job id %s has already been submitted and is not starting with a savepoint.",
jobId)));
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) {

/** Lazy load for Slot Service */
public SlotService getSlotService() {
// If the node is master node, the slot service is not needed.
if (EngineConfig.ClusterRole.MASTER.ordinal()
== seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
return null;
}

if (slotService == null) {
synchronized (this) {
if (slotService == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,25 +74,31 @@ public void init() {

private void initWorker() {
log.info("initWorker... ");
List<Address> aliveWorker =
List<Address> aliveNode =
nodeEngine.getClusterService().getMembers().stream()
.filter(Member::isLiteMember)
.map(Member::getAddress)
.collect(Collectors.toList());
log.info("initWorker live nodes: " + aliveWorker);
log.info("init live nodes: {}", aliveNode);
List<CompletableFuture<Void>> futures =
aliveWorker.stream()
aliveNode.stream()
.map(
worker ->
sendToMember(new SyncWorkerProfileOperation(), worker)
node ->
sendToMember(new SyncWorkerProfileOperation(), node)
.thenAccept(
p -> {
registerWorker.put(
worker, (WorkerProfile) p);
if (p != null) {
registerWorker.put(
node, (WorkerProfile) p);
log.info(
"received new worker register: "
+ ((WorkerProfile)
p)
.getAddress());
}
}))
.collect(Collectors.toList());
futures.forEach(CompletableFuture::join);
log.info("registerWorker: " + registerWorker);
log.info("registerWorker: {}", registerWorker);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ public class SyncWorkerProfileOperation extends Operation implements IdentifiedD
@Override
public void run() throws Exception {
SeaTunnelServer server = getService();
result = server.getSlotService().getWorkerProfile();
if (server.getSlotService() != null) {
result = server.getSlotService().getWorkerProfile();
} else {
result = null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public void before() {
server.getSlotService();
}

@Test
public void testHaveWorkerWhenUseHybridDeployment() {
Assertions.assertEquals(1, resourceManager.workerCount(null));
}

@Test
public void testApplyRequest() throws ExecutionException, InterruptedException {
List<ResourceProfile> resourceProfiles = new ArrayList<>();
Expand Down

0 comments on commit 27b22e2

Please sign in to comment.