Skip to content

Commit

Permalink
[FLINK-34007][k8s] fabric8io LeaderElector is created with every new …
Browse files Browse the repository at this point in the history
…#run() call

The fabric8io kubernetes-client v5.12.4 allowed us to reuse a LeaderElector instance. With v6.6.2 (fabric8io/kubernetes-client#4125) this behavior changed: a LeaderElector instance can only be used for a single leadership lifecycle, i.e. until the leadership is lost.
  • Loading branch information
XComp committed Jan 23, 2024
1 parent 5189eca commit f1f440c
Showing 1 changed file with 51 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
Expand All @@ -31,6 +32,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand All @@ -55,9 +60,22 @@ public class KubernetesLeaderElector {

private final Object lock = new Object();

private final NamespacedKubernetesClient kubernetesClient;
private final LeaderElectionConfig leaderElectionConfig;
private final ExecutorService executorService;

private final LeaderElector internalLeaderElector;
/**
* The {@code internalLeaderElector} handles a single leadership lifecycle (i.e. acquiring,
* renewing and losing the leadership once). This member being null means that there is no
* leadership cycle happening right now.
*/
@Nullable private LeaderElector internalLeaderElector;

/**
* Reference to the currently executed leadership lifecycle. Analogously to {@link
* #internalLeaderElector}, it's {@code null} if no leadership lifecycle is initiated.
*/
@Nullable private CompletableFuture<?> currentLeaderElectionSession;

public KubernetesLeaderElector(
NamespacedKubernetesClient kubernetesClient,
Expand All @@ -77,7 +95,8 @@ public KubernetesLeaderElector(
KubernetesLeaderElectionConfiguration leaderConfig,
LeaderCallbackHandler leaderCallbackHandler,
ExecutorService executorService) {
final LeaderElectionConfig leaderElectionConfig =
this.kubernetesClient = kubernetesClient;
this.leaderElectionConfig =
new LeaderElectionConfigBuilder()
.withName(leaderConfig.getConfigMapName())
.withLeaseDuration(leaderConfig.getLeaseDuration())
Expand All @@ -99,12 +118,37 @@ public KubernetesLeaderElector(
leaderConfig.getConfigMapName())))
.build();
this.executorService = executorService;

LOG.info(
"Create KubernetesLeaderElector on lock {}.",
leaderElectionConfig.getLock().describe());
}

@GuardedBy("lock")
private void resetInternalLeaderElector() {
stopLeaderElectionCycle();

internalLeaderElector =
new LeaderElector(kubernetesClient, leaderElectionConfig, executorService);
currentLeaderElectionSession = internalLeaderElector.start();

LOG.info(
"Create KubernetesLeaderElector {} with lock identity {}.",
leaderConfig.getConfigMapName(),
leaderConfig.getLockIdentity());
"Triggered leader election on lock {}.", leaderElectionConfig.getLock().describe());
}

/**
 * Ends the leadership lifecycle that is currently in progress. This is a no-op if no {@code
 * internalLeaderElector} is set.
 *
 * <p>The session future is cancelled and the elector is released; both members are reset to
 * {@code null} afterwards.
 */
@GuardedBy("lock")
private void stopLeaderElectionCycle() {
    if (internalLeaderElector == null) {
        // no leadership lifecycle is active right now, nothing to tear down
        return;
    }

    // an existing elector always has to come with a session future
    Preconditions.checkNotNull(currentLeaderElectionSession);

    // order matters: cancel the running leader election cycle first and only then release
    // the lock, so that the cycle does not retry
    currentLeaderElectionSession.cancel(true);
    currentLeaderElectionSession = null;

    internalLeaderElector.release();
    internalLeaderElector = null;
}

public void run() {
Expand All @@ -113,13 +157,14 @@ public void run() {
LOG.debug(
"Ignoring KubernetesLeaderElector.run call because the leader elector has already been shut down.");
} else {
executorService.execute(internalLeaderElector::run);
resetInternalLeaderElector();
}
}
}

/**
 * Stops the leadership lifecycle that is currently in progress (if any) and shuts down the
 * executor service. Both steps are performed while holding {@code lock}.
 */
public void stop() {
synchronized (lock) {
// end the active leadership lifecycle before the executor is shut down
stopLeaderElectionCycle();
executorService.shutdownNow();
}
}
Expand Down

0 comments on commit f1f440c

Please sign in to comment.