diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index c34defa5ca2..fdefd2870bd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -81,7 +81,6 @@ import org.apache.bookkeeper.meta.MetadataClientDriver; import org.apache.bookkeeper.meta.MetadataDrivers; import org.apache.bookkeeper.meta.exceptions.MetadataException; -import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.net.DNSToSwitchMapping; import org.apache.bookkeeper.proto.BookieAddressResolver; @@ -635,6 +634,11 @@ public LedgerManager getLedgerManager() { return ledgerManager; } + @VisibleForTesting + public LedgerManagerFactory getLedgerManagerFactory() { + return ledgerManagerFactory; + } + @VisibleForTesting LedgerManager getUnderlyingLedgerManager() { return ((CleanupLedgerManager) ledgerManager).getUnderlying(); @@ -743,10 +747,6 @@ public org.apache.bookkeeper.client.api.DigestType toApiDigestType() { } } - ZooKeeper getZkHandle() { - return ((ZKMetadataClientDriver) metadataDriver).getZk(); - } - protected ClientConfiguration getConf() { return conf; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java index 7dbf4df8cf9..1a37f61b81b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java @@ -67,6 +67,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.discover.BookieServiceInfo; import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener; +import 
org.apache.bookkeeper.meta.LedgerAuditorManager; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator; import org.apache.bookkeeper.meta.LedgerManagerFactory; @@ -78,7 +79,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; -import org.apache.bookkeeper.replication.AuditorElector; import org.apache.bookkeeper.replication.BookieLedgerIndexer; import org.apache.bookkeeper.replication.ReplicationException.BKAuditException; import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException; @@ -116,6 +116,8 @@ public class BookKeeperAdmin implements AutoCloseable { */ private LedgerUnderreplicationManager underreplicationManager; + private LedgerAuditorManager ledgerAuditorManager; + /** * Constructor that takes in a ZooKeeper servers connect string so we know * how to connect to ZooKeeper to retrieve information about the BookKeeper @@ -201,6 +203,14 @@ public void close() throws InterruptedException, BKException { if (ownsBK) { bkc.close(); } + + if (ledgerAuditorManager != null) { + try { + ledgerAuditorManager.close(); + } catch (Exception e) { + throw new BKException.MetaStoreException(e); + } + } } /** @@ -1404,6 +1414,14 @@ private LedgerUnderreplicationManager getUnderreplicationManager() return underreplicationManager; } + private LedgerAuditorManager getLedgerAuditorManager() + throws IOException, InterruptedException { + if (ledgerAuditorManager == null) { + ledgerAuditorManager = mFactory.newLedgerAuditorManager(); + } + return ledgerAuditorManager; + } + /** * Setter for LostBookieRecoveryDelay value (in seconds) in Zookeeper. * @@ -1455,8 +1473,7 @@ public void triggerAudit() throw new UnavailableException("Autorecovery is disabled. 
So giving up!"); } - BookieId auditorId = - AuditorElector.getCurrentAuditor(new ServerConfiguration(bkc.getConf()), bkc.getZkHandle()); + BookieId auditorId = getLedgerAuditorManager().getCurrentAuditor(); if (auditorId == null) { LOG.error("No auditor elected, though Autorecovery is enabled. So giving up."); throw new UnavailableException("No auditor elected, though Autorecovery is enabled. So giving up."); @@ -1709,4 +1726,8 @@ public CompletableFuture asyncGetListOfEntriesOfL long ledgerId) { return bkc.getBookieClient().getListOfEntriesOfLedger(address, ledgerId); } + + public BookieId getCurrentAuditor() throws IOException, InterruptedException { + return getLedgerAuditorManager().getCurrentAuditor(); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java index 19ac418eeae..e613082d644 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java @@ -23,8 +23,10 @@ import java.util.List; import org.apache.bookkeeper.conf.AbstractConfiguration; +import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; import org.apache.bookkeeper.replication.ReplicationException; +import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.ZkUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.ACL; @@ -87,4 +89,10 @@ public LedgerUnderreplicationManager newLedgerUnderreplicationManager() throws KeeperException, InterruptedException, ReplicationException.CompatibilityException { return new ZkLedgerUnderreplicationManager(conf, zk); } + + @Override + public LedgerAuditorManager newLedgerAuditorManager() { + ServerConfiguration serverConfiguration = new ServerConfiguration(conf); + return new 
ZkLedgerAuditorManager(zk, serverConfiguration, NullStatsLogger.INSTANCE); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerAuditorManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerAuditorManager.java new file mode 100644 index 00000000000..b1b2fa003fa --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerAuditorManager.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.meta; + +import java.io.IOException; +import java.util.function.Consumer; +import org.apache.bookkeeper.net.BookieId; + +/** + * Interface to handle the ledger auditor election. + */ +public interface LedgerAuditorManager extends AutoCloseable { + + /** + * Events that can be triggered by the LedgerAuditorManager. + */ + enum AuditorEvent { + SessionLost, + VoteWasDeleted, + } + + /** + * Try to become the auditor. If there's already another auditor, it will wait until this + * current instance has become the auditor. 
+ * + * @param bookieId the identifier for current bookie + * @param listener listener that will receive AuditorEvent notifications + * @throws InterruptedException if the calling thread is interrupted while waiting to become the auditor + */ + void tryToBecomeAuditor(String bookieId, Consumer listener) throws IOException, InterruptedException; + + /** + * Return the information regarding the current auditor. + * @return the id of the bookie currently elected as auditor, or {@code null} if no auditor is elected + */ + BookieId getCurrentAuditor() throws IOException, InterruptedException; +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java index 80d3a6526f9..d213235d6e3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java @@ -86,6 +86,16 @@ LedgerManagerFactory initialize(AbstractConfiguration conf, LedgerUnderreplicationManager newLedgerUnderreplicationManager() throws KeeperException, InterruptedException, ReplicationException.CompatibilityException; + + /** + * Return a ledger auditor manager, which is used to + * coordinate the auto-recovery process. + * + * @return ledger auditor manager + * @see LedgerAuditorManager + */ + LedgerAuditorManager newLedgerAuditorManager() throws IOException, InterruptedException; + /** * Format the ledger metadata for LedgerManager. 
* diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java index 91579735b05..a218ef3eb97 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java @@ -23,8 +23,10 @@ import java.util.List; import org.apache.bookkeeper.conf.AbstractConfiguration; +import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; import org.apache.bookkeeper.replication.ReplicationException; +import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.ZkUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.ACL; @@ -78,6 +80,12 @@ public LedgerIdGenerator newLedgerIdGenerator() { zkAcls); } + @Override + public LedgerAuditorManager newLedgerAuditorManager() { + ServerConfiguration serverConfiguration = new ServerConfiguration(conf); + return new ZkLedgerAuditorManager(zk, serverConfiguration, NullStatsLogger.INSTANCE); + } + @Override public LedgerManager newLedgerManager() { return new LegacyHierarchicalLedgerManager(conf, zk); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java index e15e0a56ef7..3ad303aa284 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java @@ -42,6 +42,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.conf.AbstractConfiguration; +import 
org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; import org.apache.bookkeeper.metastore.MSException; import org.apache.bookkeeper.metastore.MSWatchedEvent; @@ -60,6 +61,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; import org.apache.bookkeeper.replication.ReplicationException; +import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.BookKeeperConstants; import org.apache.bookkeeper.util.StringUtils; import org.apache.bookkeeper.util.ZkUtils; @@ -815,4 +817,10 @@ public boolean validateAndNukeExistingCluster(AbstractConfiguration conf, Lay zkServers, zkLedgersRootPath); return true; } + + @Override + public LedgerAuditorManager newLedgerAuditorManager() { + ServerConfiguration serverConfiguration = new ServerConfiguration(conf); + return new ZkLedgerAuditorManager(zk, serverConfiguration, NullStatsLogger.INSTANCE); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerAuditorManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerAuditorManager.java new file mode 100644 index 00000000000..fa1fcd43b3c --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerAuditorManager.java @@ -0,0 +1,279 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.meta; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.bookkeeper.proto.DataFormats.AuditorVoteFormat; +import static org.apache.bookkeeper.replication.ReplicationStats.ELECTION_ATTEMPTS; +import com.google.protobuf.TextFormat; +import java.io.IOException; +import java.io.Serializable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.stats.annotations.StatsDoc; +import org.apache.bookkeeper.util.BookKeeperConstants; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; + +/** + * ZK based implementation of LedgerAuditorManager. 
+ */ +@Slf4j +public class ZkLedgerAuditorManager implements LedgerAuditorManager { + + private final ZooKeeper zkc; + private final ServerConfiguration conf; + private final String basePath; + private final String electionPath; + + private String myVote; + + private static final String ELECTION_ZNODE = "auditorelection"; + + // Represents the index of the auditor node + private static final int AUDITOR_INDEX = 0; + // Represents vote prefix + private static final String VOTE_PREFIX = "V_"; + // Represents path Separator + private static final String PATH_SEPARATOR = "/"; + + private volatile Consumer listener; + private volatile boolean isClosed = false; + + // Expose Stats + @StatsDoc( + name = ELECTION_ATTEMPTS, + help = "The number of auditor election attempts" + ) + private final Counter electionAttempts; + + public ZkLedgerAuditorManager(ZooKeeper zkc, ServerConfiguration conf, StatsLogger statsLogger) { + this.zkc = zkc; + this.conf = conf; + + this.basePath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf) + '/' + + BookKeeperConstants.UNDER_REPLICATION_NODE; + this.electionPath = basePath + '/' + ELECTION_ZNODE; + this.electionAttempts = statsLogger.getCounter(ELECTION_ATTEMPTS); + } + + @Override + public void tryToBecomeAuditor(String bookieId, Consumer listener) + throws IOException, InterruptedException { + this.listener = listener; + createElectorPath(); + + try { + while (!isClosed) { + createMyVote(bookieId); + + List children = zkc.getChildren(getVotePath(""), false); + if (0 >= children.size()) { + throw new IllegalArgumentException( + "At least one bookie server should present to elect the Auditor!"); + } + + // sorting in ascending order of sequential number + Collections.sort(children, new ElectionComparator()); + String voteNode = StringUtils.substringAfterLast(myVote, PATH_SEPARATOR); + + if (children.get(AUDITOR_INDEX).equals(voteNode)) { + // We have been elected as the auditor + // update the auditor bookie id in the election path. 
This is + // done for debugging purpose + AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder() + .setBookieId(bookieId); + + zkc.setData(getVotePath(""), + builder.build().toString().getBytes(UTF_8), -1); + return; + } else { + // If not an auditor, will be watching to my predecessor and + // looking the previous node deletion. + int myIndex = children.indexOf(voteNode); + if (myIndex < 0) { + throw new IllegalArgumentException("My vote has disappeared"); + } + + int prevNodeIndex = myIndex - 1; + + CountDownLatch latch = new CountDownLatch(1); + + if (null == zkc.exists(getVotePath(PATH_SEPARATOR) + + children.get(prevNodeIndex), event -> latch.countDown())) { + // While adding, the previous znode doesn't exists. + // Again going to election. + continue; + } + + // Wait for the previous auditor in line to be deleted + latch.await(); + } + + electionAttempts.inc(); + } + } catch (KeeperException e) { + throw new IOException(e); + } + } + + @Override + public BookieId getCurrentAuditor() throws IOException, InterruptedException { + String electionRoot = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf) + '/' + + BookKeeperConstants.UNDER_REPLICATION_NODE + '/' + ELECTION_ZNODE; + + try { + List children = zkc.getChildren(electionRoot, false); + Collections.sort(children, new ElectionComparator()); + if (children.size() < 1) { + return null; + } + String ledger = electionRoot + "/" + children.get(AUDITOR_INDEX); + byte[] data = zkc.getData(ledger, false, null); + + AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder(); + TextFormat.merge(new String(data, UTF_8), builder); + AuditorVoteFormat v = builder.build(); + return BookieId.parse(v.getBookieId()); + } catch (KeeperException e) { + throw new IOException(e); + } + } + + @Override + public void close() throws Exception { + log.info("Shutting down AuditorElector"); + isClosed = true; + if (myVote != null) { + try { + zkc.delete(myVote, -1); + } catch (KeeperException.NoNodeException 
nne) { + // Ok + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.warn("InterruptedException while deleting myVote: " + myVote, + ie); + } catch (KeeperException ke) { + log.error("Exception while deleting myVote:" + myVote, ke); + } + } + } + + private void createMyVote(String bookieId) throws IOException, InterruptedException { + List zkAcls = ZkUtils.getACLs(conf); + AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder() + .setBookieId(bookieId); + + try { + if (null == myVote || null == zkc.exists(myVote, false)) { + myVote = zkc.create(getVotePath(PATH_SEPARATOR + VOTE_PREFIX), + builder.build().toString().getBytes(UTF_8), zkAcls, + CreateMode.EPHEMERAL_SEQUENTIAL); + } + } catch (KeeperException e) { + throw new IOException(e); + } + } + + private void createElectorPath() throws IOException { + try { + List zkAcls = ZkUtils.getACLs(conf); + if (zkc.exists(basePath, false) == null) { + try { + zkc.create(basePath, new byte[0], zkAcls, + CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException nee) { + // do nothing, someone else could have created it + } + } + if (zkc.exists(getVotePath(""), false) == null) { + try { + zkc.create(getVotePath(""), new byte[0], + zkAcls, CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException nee) { + // do nothing, someone else could have created it + } + } + } catch (KeeperException ke) { + throw new IOException("Failed to initialize Auditor Elector", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("Failed to initialize Auditor Elector", ie); + } + } + + private String getVotePath(String vote) { + return electionPath + vote; + } + + private void handleZkWatch(WatchedEvent event) { + if (isClosed) { + return; + } + + if (event.getState() == Watcher.Event.KeeperState.Expired) { + log.error("Lost ZK connection, shutting down"); + + listener.accept(AuditorEvent.SessionLost); + } else if (event.getType() 
== Watcher.Event.EventType.NodeDeleted) { + listener.accept(AuditorEvent.VoteWasDeleted); + } + } + + /** + * Compare the votes in the ascending order of the sequence number. Vote + * format is 'V_sequencenumber', comparator will do sorting based on the + * numeric sequence value. + */ + private static class ElectionComparator + implements Comparator, Serializable { + /** + * Return -1 if the first vote is less than second. Return 1 if the + * first vote is greater than second. Return 0 if the votes are equal. + */ + @Override + public int compare(String vote1, String vote2) { + long voteSeqId1 = getVoteSequenceId(vote1); + long voteSeqId2 = getVoteSequenceId(vote2); + int result = voteSeqId1 < voteSeqId2 ? -1 + : (voteSeqId1 > voteSeqId2 ? 1 : 0); + return result; + } + + private long getVoteSequenceId(String vote) { + String voteId = StringUtils.substringAfter(vote, VOTE_PREFIX); + return Long.parseLong(voteId); + } + } + +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java index 70284666c13..eaab84c7b15 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java @@ -20,47 +20,25 @@ */ package org.apache.bookkeeper.replication; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE; -import static org.apache.bookkeeper.replication.ReplicationStats.ELECTION_ATTEMPTS; - import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.TextFormat; - import java.io.IOException; -import java.io.Serializable; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.meta.ZkLayoutManager; -import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; +import org.apache.bookkeeper.meta.LedgerAuditorManager; import org.apache.bookkeeper.net.BookieId; -import org.apache.bookkeeper.proto.DataFormats.AuditorVoteFormat; import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; -import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.annotations.StatsDoc; -import org.apache.bookkeeper.util.BookKeeperConstants; -import org.apache.bookkeeper.util.ZkUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,35 +58,18 @@ public class AuditorElector { private static final Logger LOG = LoggerFactory .getLogger(AuditorElector.class); - // Represents the index of the auditor node - private static final int AUDITOR_INDEX = 0; - // Represents vote prefix - private static final String VOTE_PREFIX = "V_"; - // Represents path Separator - private static final String PATH_SEPARATOR = "/"; - private static final String ELECTION_ZNODE = "auditorelection"; - // Represents urLedger path in zk - private final 
String basePath; - // Represents auditor election path in zk - private final String electionPath; private final String bookieId; private final ServerConfiguration conf; private final BookKeeper bkc; - private final ZooKeeper zkc; private final boolean ownBkc; private final ExecutorService executor; + private final LedgerAuditorManager ledgerAuditorManager; - private String myVote; Auditor auditor; private AtomicBoolean running = new AtomicBoolean(false); - // Expose Stats - @StatsDoc( - name = ELECTION_ATTEMPTS, - help = "The number of auditor election attempts" - ) - private final Counter electionAttempts; + private final StatsLogger statsLogger; @@ -163,13 +124,12 @@ public AuditorElector(final String bookieId, this.conf = conf; this.bkc = bkc; this.ownBkc = ownBkc; - this.zkc = ((ZkLayoutManager) bkc.getMetadataClientDriver().getLayoutManager()).getZk(); this.statsLogger = statsLogger; - this.electionAttempts = statsLogger.getCounter(ELECTION_ATTEMPTS); - basePath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf) + '/' - + BookKeeperConstants.UNDER_REPLICATION_NODE; - electionPath = basePath + '/' + ELECTION_ZNODE; - createElectorPath(); + try { + this.ledgerAuditorManager = bkc.getLedgerManagerFactory().newLedgerAuditorManager(); + } catch (Exception e) { + throw new UnavailableException("Failed to instantiate the ledger auditor manager", e); + } executor = Executors.newSingleThreadExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -178,70 +138,6 @@ public Thread newThread(Runnable r) { }); } - private void createMyVote() throws KeeperException, InterruptedException { - if (null == myVote || null == zkc.exists(myVote, false)) { - List zkAcls = ZkUtils.getACLs(conf); - AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder() - .setBookieId(bookieId); - myVote = zkc.create(getVotePath(PATH_SEPARATOR + VOTE_PREFIX), - builder.build().toString().getBytes(UTF_8), zkAcls, - CreateMode.EPHEMERAL_SEQUENTIAL); - } - } - - 
String getMyVote() { - return myVote; - } - - private String getVotePath(String vote) { - return electionPath + vote; - } - - private void createElectorPath() throws UnavailableException { - try { - List zkAcls = ZkUtils.getACLs(conf); - if (zkc.exists(basePath, false) == null) { - try { - zkc.create(basePath, new byte[0], zkAcls, - CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException nee) { - // do nothing, someone else could have created it - } - } - if (zkc.exists(getVotePath(""), false) == null) { - try { - zkc.create(getVotePath(""), new byte[0], - zkAcls, CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException nee) { - // do nothing, someone else could have created it - } - } - } catch (KeeperException ke) { - throw new UnavailableException( - "Failed to initialize Auditor Elector", ke); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new UnavailableException( - "Failed to initialize Auditor Elector", ie); - } - } - - /** - * Watching the predecessor bookies and will do election on predecessor node - * deletion or expiration. 
- */ - private class ElectionWatcher implements Watcher { - @Override - public void process(WatchedEvent event) { - if (event.getState() == KeeperState.Expired) { - LOG.error("Lost ZK connection, shutting down"); - submitShutdownTask(); - } else if (event.getType() == EventType.NodeDeleted) { - submitElectionTask(); - } - } - } - public Future start() { running.set(true); return submitElectionTask(); @@ -257,17 +153,14 @@ public void run() { if (!running.compareAndSet(true, false)) { return; } - LOG.info("Shutting down AuditorElector"); - if (myVote != null) { - try { - zkc.delete(myVote, -1); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - LOG.warn("InterruptedException while deleting myVote: " + myVote, - ie); - } catch (KeeperException ke) { - LOG.error("Exception while deleting myVote:" + myVote, ke); - } + + try { + ledgerAuditorManager.close(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.warn("InterruptedException while closing ledger auditor manager", ie); + } catch (Exception ke) { + LOG.error("Exception while closing ledger auditor manager", ke); } } }); @@ -288,59 +181,39 @@ public void run() { return; } try { - // creating my vote in zk. Vote format is 'V_numeric' - createMyVote(); - List children = zkc.getChildren(getVotePath(""), false); - - if (0 >= children.size()) { - throw new IllegalArgumentException( - "Atleast one bookie server should present to elect the Auditor!"); - } - - // sorting in ascending order of sequential number - Collections.sort(children, new ElectionComparator()); - String voteNode = StringUtils.substringAfterLast(myVote, - PATH_SEPARATOR); + ledgerAuditorManager.tryToBecomeAuditor(bookieId, e -> handleAuditorEvent(e)); - // starting Auditing service - if (children.get(AUDITOR_INDEX).equals(voteNode)) { - // update the auditor bookie id in the election path. 
This is - // done for debugging purpose - AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder() - .setBookieId(bookieId); - - zkc.setData(getVotePath(""), - builder.build().toString().getBytes(UTF_8), -1); - auditor = new Auditor(bookieId, conf, bkc, false, statsLogger); - auditor.start(); - } else { - // If not an auditor, will be watching to my predecessor and - // looking the previous node deletion. - Watcher electionWatcher = new ElectionWatcher(); - int myIndex = children.indexOf(voteNode); - int prevNodeIndex = myIndex - 1; - if (null == zkc.exists(getVotePath(PATH_SEPARATOR) - + children.get(prevNodeIndex), electionWatcher)) { - // While adding, the previous znode doesn't exists. - // Again going to election. - submitElectionTask(); - } - electionAttempts.inc(); - } - } catch (KeeperException e) { - LOG.error("Exception while performing auditor election", e); - submitShutdownTask(); + auditor = new Auditor(bookieId, conf, bkc, false, statsLogger); + auditor.start(); } catch (InterruptedException e) { LOG.error("Interrupted while performing auditor election", e); Thread.currentThread().interrupt(); submitShutdownTask(); - } catch (UnavailableException e) { - LOG.error("Ledger underreplication manager unavailable during election", e); + } catch (Exception e) { + LOG.error("Exception while performing auditor election", e); submitShutdownTask(); } } }; - return executor.submit(r); + try { + return executor.submit(r); + } catch (RejectedExecutionException e) { + LOG.debug("Executor was already closed"); + return CompletableFuture.completedFuture(null); + } + } + + private void handleAuditorEvent(LedgerAuditorManager.AuditorEvent e) { + switch (e) { + case SessionLost: + LOG.error("Lost ZK connection, shutting down"); + submitShutdownTask(); + break; + + case VoteWasDeleted: + submitElectionTask(); + break; + } } @VisibleForTesting @@ -348,33 +221,21 @@ Auditor getAuditor() { return auditor; } - /** - * Query zookeeper for the currently elected 
auditor. - * @return the bookie id of the current auditor - */ - public static BookieId getCurrentAuditor(ServerConfiguration conf, ZooKeeper zk) - throws KeeperException, InterruptedException, IOException { - String electionRoot = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf) + '/' - + BookKeeperConstants.UNDER_REPLICATION_NODE + '/' + ELECTION_ZNODE; - - List children = zk.getChildren(electionRoot, false); - Collections.sort(children, new AuditorElector.ElectionComparator()); - if (children.size() < 1) { - return null; - } - String ledger = electionRoot + "/" + children.get(AUDITOR_INDEX); - byte[] data = zk.getData(ledger, false, null); - AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder(); - TextFormat.merge(new String(data, UTF_8), builder); - AuditorVoteFormat v = builder.build(); - return BookieId.parse(v.getBookieId()); + public BookieId getCurrentAuditor() throws IOException, InterruptedException { + return ledgerAuditorManager.getCurrentAuditor(); } /** * Shutting down AuditorElector. */ public void shutdown() throws InterruptedException { + try { + ledgerAuditorManager.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + synchronized (this) { if (executor.isShutdown()) { return; @@ -413,30 +274,4 @@ public boolean isRunning() { public String toString() { return "AuditorElector for " + bookieId; } - - /** - * Compare the votes in the ascending order of the sequence number. Vote - * format is 'V_sequencenumber', comparator will do sorting based on the - * numeric sequence value. - */ - private static class ElectionComparator - implements Comparator, Serializable { - /** - * Return -1 if the first vote is less than second. Return 1 if the - * first vote is greater than second. Return 0 if the votes are equal. - */ - @Override - public int compare(String vote1, String vote2) { - long voteSeqId1 = getVoteSequenceId(vote1); - long voteSeqId2 = getVoteSequenceId(vote2); - int result = voteSeqId1 < voteSeqId2 ? 
-1 - : (voteSeqId1 > voteSeqId2 ? 1 : 0); - return result; - } - - private long getVoteSequenceId(String vote) { - String voteId = StringUtils.substringAfter(vote, VOTE_PREFIX); - return Long.parseLong(voteId); - } - } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/BKHttpServiceProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/BKHttpServiceProvider.java index e9ac4142f9b..76705b935ce 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/BKHttpServiceProvider.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/BKHttpServiceProvider.java @@ -36,7 +36,6 @@ import org.apache.bookkeeper.http.service.ErrorHttpService; import org.apache.bookkeeper.http.service.HeartbeatService; import org.apache.bookkeeper.http.service.HttpEndpointService; -import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.replication.Auditor; import org.apache.bookkeeper.replication.AutoRecoveryMain; @@ -64,16 +63,12 @@ import org.apache.bookkeeper.server.http.service.TriggerGCService; import org.apache.bookkeeper.server.http.service.WhoIsAuditorService; import org.apache.bookkeeper.stats.StatsProvider; -import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; /** * Bookkeeper based implementation of HttpServiceProvider, * which provide bookkeeper services to handle http requests * from different http endpoints. - * - *

TODO: eliminate the direct usage of zookeeper here {@link https://github.com/apache/bookkeeper/issues/1332} */ @Slf4j public class BKHttpServiceProvider implements HttpServiceProvider { @@ -82,7 +77,6 @@ public class BKHttpServiceProvider implements HttpServiceProvider { private final BookieServer bookieServer; private final AutoRecoveryMain autoRecovery; private final ServerConfiguration serverConf; - private final ZooKeeper zk; private final BookKeeperAdmin bka; private final ExecutorService executor; @@ -95,12 +89,6 @@ private BKHttpServiceProvider(BookieServer bookieServer, this.autoRecovery = autoRecovery; this.serverConf = serverConf; this.statsProvider = statsProvider; - String zkServers = ZKMetadataDriverBase.resolveZkServers(serverConf); - this.zk = ZooKeeperClient.newBuilder() - .connectString(zkServers) - .sessionTimeoutMs(serverConf.getZkTimeout()) - .build(); - ClientConfiguration clientConfiguration = new ClientConfiguration(serverConf); this.bka = new BookKeeperAdmin(clientConfiguration); @@ -115,9 +103,6 @@ public void close() throws IOException { if (bka != null) { bka.close(); } - if (zk != null) { - zk.close(); - } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); log.error("Interruption while closing BKHttpServiceProvider", ie); @@ -236,7 +221,7 @@ public HttpEndpointService provideHttpEndpointService(ApiType type) { case LIST_UNDER_REPLICATED_LEDGER: return new ListUnderReplicatedLedgerService(configuration, bookieServer); case WHO_IS_AUDITOR: - return new WhoIsAuditorService(configuration, zk); + return new WhoIsAuditorService(configuration, bka); case TRIGGER_AUDIT: return new TriggerAuditService(configuration, bka); case LOST_BOOKIE_RECOVERY_DELAY: diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/WhoIsAuditorService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/WhoIsAuditorService.java index 8bb78247afa..bafa932a3e3 100644 --- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/WhoIsAuditorService.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/WhoIsAuditorService.java @@ -20,14 +20,13 @@ import static com.google.common.base.Preconditions.checkNotNull; +import org.apache.bookkeeper.client.BookKeeperAdmin; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.http.HttpServer; import org.apache.bookkeeper.http.service.HttpEndpointService; import org.apache.bookkeeper.http.service.HttpServiceRequest; import org.apache.bookkeeper.http.service.HttpServiceResponse; import org.apache.bookkeeper.net.BookieId; -import org.apache.bookkeeper.replication.AuditorElector; -import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,12 +40,12 @@ public class WhoIsAuditorService implements HttpEndpointService { static final Logger LOG = LoggerFactory.getLogger(WhoIsAuditorService.class); protected ServerConfiguration conf; - protected ZooKeeper zk; + protected BookKeeperAdmin bka; - public WhoIsAuditorService(ServerConfiguration conf, ZooKeeper zk) { + public WhoIsAuditorService(ServerConfiguration conf, BookKeeperAdmin bka) { checkNotNull(conf); this.conf = conf; - this.zk = zk; + this.bka = bka; } /* @@ -57,9 +56,9 @@ public HttpServiceResponse handle(HttpServiceRequest request) throws Exception { HttpServiceResponse response = new HttpServiceResponse(); if (HttpServer.Method.GET == request.getMethod()) { - BookieId bookieId = null; + BookieId bookieId; try { - bookieId = AuditorElector.getCurrentAuditor(conf, zk); + bookieId = bka.getCurrentAuditor(); if (bookieId == null) { response.setCode(HttpServer.StatusCode.NOT_FOUND); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommand.java index 
2aa12988474..853fca0b8f9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommand.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommand.java @@ -21,18 +21,15 @@ import com.google.common.util.concurrent.UncheckedExecutionException; import java.io.IOException; -import java.net.URI; +import lombok.Cleanup; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeperAdmin; +import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; import org.apache.bookkeeper.net.BookieId; -import org.apache.bookkeeper.replication.AuditorElector; import org.apache.bookkeeper.tools.cli.helpers.BookieCommand; import org.apache.bookkeeper.tools.framework.CliFlags; import org.apache.bookkeeper.tools.framework.CliSpec; -import org.apache.bookkeeper.zookeeper.ZooKeeperClient; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,12 +43,19 @@ public class WhoIsAuditorCommand extends BookieCommand { private static final String NAME = "whoisauditor"; private static final String DESC = "Print the node which holds the auditor lock."; + private BookKeeperAdmin bka; + public WhoIsAuditorCommand() { + this(null); + } + + public WhoIsAuditorCommand(BookKeeperAdmin bka) { super(CliSpec.newBuilder() .withName(NAME) .withDescription(DESC) .withFlags(new CliFlags()) .build()); + this.bka = bka; } @Override @@ -64,26 +68,22 @@ public boolean apply(ServerConfiguration conf, CliFlags cmdFlags) { } private boolean getAuditor(ServerConfiguration conf) - throws ConfigurationException, InterruptedException, IOException, KeeperException { - ZooKeeper zk = null; - try { - String 
metadataServiceUri = conf.getMetadataServiceUri(); - String zkServers = ZKMetadataDriverBase.getZKServersFromServiceUri(URI.create(metadataServiceUri)); - zk = ZooKeeperClient.newBuilder() - .connectString(zkServers) - .sessionTimeoutMs(conf.getZkTimeout()) - .build(); - BookieId bookieId = AuditorElector.getCurrentAuditor(conf, zk); - if (bookieId == null) { - LOG.info("No auditor elected"); - return false; - } - LOG.info("Auditor: " + bookieId); - } finally { - if (zk != null) { - zk.close(); - } + throws BKException, InterruptedException, IOException { + ClientConfiguration clientConfiguration = new ClientConfiguration(conf); + + BookieId bookieId; + if (this.bka != null) { + bookieId = bka.getCurrentAuditor(); + } else { + @Cleanup + BookKeeperAdmin bka = new BookKeeperAdmin(clientConfiguration); + bookieId = bka.getCurrentAuditor(); + } + if (bookieId == null) { + LOG.info("No auditor elected"); + return false; } + LOG.info("Auditor: " + bookieId); return true; } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java index e361294cb99..c692e1e1c35 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java @@ -31,6 +31,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener; +import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.proto.BookieClient; import org.apache.bookkeeper.stats.NullStatsLogger; @@ -53,13 +54,20 @@ public BookKeeperTestClient(ClientConfiguration conf, TestStatsProvider statsPro this.statsProvider = statsProvider; } + public 
BookKeeperTestClient(ClientConfiguration conf, ZooKeeper zkc) + throws IOException, InterruptedException, BKException { + super(conf, zkc, null, new UnpooledByteBufAllocator(false), + NullStatsLogger.INSTANCE, null, null, null); + this.statsProvider = statsProvider; + } + public BookKeeperTestClient(ClientConfiguration conf) throws InterruptedException, BKException, IOException { - this(conf, null); + this(conf, (TestStatsProvider) null); } public ZooKeeper getZkHandle() { - return super.getZkHandle(); + return ((ZKMetadataClientDriver) metadataDriver).getZk(); } public ClientConfiguration getConf() { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java index d26b22625c3..75f3d8eeb9f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java @@ -54,11 +54,6 @@ public class MockBookKeeper extends BookKeeper { final ExecutorService executor = Executors.newFixedThreadPool(1, new DefaultThreadFactory("mock-bookkeeper")); final ZooKeeper zkc; - @Override - public ZooKeeper getZkHandle() { - return zkc; - } - @Override public ClientConfiguration getConf() { return super.getConf(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java index 93077afac3e..d2f724ed133 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java @@ -160,7 +160,7 @@ private void runBookieWatcherWhenSessionExpired(ZooKeeper zk, int timeout, boole ClientConfiguration conf = new ClientConfiguration(); conf.setMetadataServiceUri(metadataServiceUri); - try (BookKeeper bkc = new BookKeeper(conf, zk)) { + try 
(BookKeeperTestClient bkc = new BookKeeperTestClient(conf, zk)) { LedgerHandle lh; try { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java index 66c46501cf4..08eddc122ef 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java @@ -24,9 +24,11 @@ import static org.junit.Assert.assertEquals; import com.google.common.util.concurrent.UncheckedExecutionException; +import lombok.Cleanup; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.LedgerAuditorManager; import org.apache.bookkeeper.meta.LedgerManagerFactory; import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; import org.apache.bookkeeper.net.BookieId; @@ -77,7 +79,9 @@ private void testAuditingDuringRollingRestart(LedgerManagerFactory mFactory) thr underReplicationManager.pollLedgerToRereplicate(), -1); underReplicationManager.disableLedgerReplication(); - BookieId auditor = AuditorElector.getCurrentAuditor(baseConf, zkc); + @Cleanup + LedgerAuditorManager lam = mFactory.newLedgerAuditorManager(); + BookieId auditor = lam.getCurrentAuditor(); ServerConfiguration conf = killBookie(auditor); Thread.sleep(2000); startBookie(conf); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java index 23c7a7b862d..e5fbd628009 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java +++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java @@ -21,14 +21,14 @@ package org.apache.bookkeeper.replication; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import org.apache.bookkeeper.bookie.Bookie; +import java.io.IOException; import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver; import org.apache.bookkeeper.net.BookieId; -import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; -import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.junit.Test; @@ -98,8 +98,22 @@ public void testAutoRecoverySessionLoss() throws Exception { */ ZKMetadataClientDriver zkMetadataClientDriver1 = startAutoRecoveryMain(main1); ZooKeeper zk1 = zkMetadataClientDriver1.getZk(); + + // Wait until auditor gets elected + for (int i = 0; i < 10; i++) { + try { + if (main1.auditorElector.getCurrentAuditor() != null) { + break; + } else { + Thread.sleep(1000); + } + } catch (IOException e) { + Thread.sleep(1000); + } + } + BookieId currentAuditor = main1.auditorElector.getCurrentAuditor(); + assertNotNull(currentAuditor); Auditor auditor1 = main1.auditorElector.getAuditor(); - BookieId currentAuditor = AuditorElector.getCurrentAuditor(bsConfs.get(0), zk1); assertTrue("Current Auditor should be AR1", currentAuditor.equals(Bookie.getBookieId(bsConfs.get(0)))); assertTrue("Auditor of AR1 should be running", auditor1.isRunning()); @@ -141,19 +155,10 @@ public void testAutoRecoverySessionLoss() throws Exception { Thread.sleep(1000); } - /* - * since zk1 and zk2 sessions are expired, the 'myVote' ephemeral nodes - * of AR1 and AR2 should not be existing anymore. 
- */ - assertTrue("AR1's vote node should not be existing", - zk3.exists(main1.auditorElector.getMyVote(), false) == null); - assertTrue("AR2's vote node should not be existing", - zk3.exists(main2.auditorElector.getMyVote(), false) == null); - /* * the AR3 should be current auditor. */ - currentAuditor = AuditorElector.getCurrentAuditor(bsConfs.get(2), zk3); + currentAuditor = main3.auditorElector.getCurrentAuditor(); assertTrue("Current Auditor should be AR3", currentAuditor.equals(Bookie.getBookieId(bsConfs.get(2)))); auditor3 = main3.auditorElector.getAuditor(); assertTrue("Auditor of AR3 should be running", auditor3.isRunning()); @@ -180,29 +185,12 @@ public void testAutoRecoverySessionLoss() throws Exception { * start autoRecoveryMain and make sure all its components are running and * myVote node is existing */ - ZKMetadataClientDriver startAutoRecoveryMain(AutoRecoveryMain autoRecoveryMain) - throws InterruptedException, KeeperException, UnavailableException { + ZKMetadataClientDriver startAutoRecoveryMain(AutoRecoveryMain autoRecoveryMain) { autoRecoveryMain.start(); ZKMetadataClientDriver metadataClientDriver = (ZKMetadataClientDriver) autoRecoveryMain.bkc .getMetadataClientDriver(); - ZooKeeper zk = metadataClientDriver.getZk(); - String myVote; - for (int i = 0; i < 10; i++) { - if (autoRecoveryMain.auditorElector.isRunning() && autoRecoveryMain.replicationWorker.isRunning() - && autoRecoveryMain.isAutoRecoveryRunning()) { - myVote = autoRecoveryMain.auditorElector.getMyVote(); - if (myVote != null) { - if (null != zk.exists(myVote, false)) { - break; - } - } - } - Thread.sleep(100); - } assertTrue("autoRecoveryMain components should be running", autoRecoveryMain.auditorElector.isRunning() && autoRecoveryMain.replicationWorker.isRunning() && autoRecoveryMain.isAutoRecoveryRunning()); - assertTrue("autoRecoveryMain's vote node should be existing", - zk.exists(autoRecoveryMain.auditorElector.getMyVote(), false) != null); return metadataClientDriver; } } 
diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommandTest.java index d21856a4d03..f0e74fc4edc 100644 --- a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommandTest.java +++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommandTest.java @@ -27,6 +27,8 @@ import java.net.URI; import java.util.UUID; +import lombok.Cleanup; +import org.apache.bookkeeper.client.BookKeeperAdmin; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; import org.apache.bookkeeper.net.BookieId; @@ -75,10 +77,6 @@ public void setup() throws Exception { BookieId bookieId = BookieId.parse(UUID.randomUUID().toString()); - PowerMockito.mockStatic(AuditorElector.class); - PowerMockito.when(AuditorElector.getCurrentAuditor(eq(conf), eq(zk))) - .thenReturn(bookieId); - PowerMockito.mockStatic(CommandHelpers.class); PowerMockito.when(CommandHelpers .getBookieSocketAddrStringRepresentation( @@ -86,8 +84,11 @@ public void setup() throws Exception { } @Test - public void testCommand() { - WhoIsAuditorCommand cmd = new WhoIsAuditorCommand(); + public void testCommand() throws Exception { + @Cleanup + BookKeeperAdmin bka = mock(BookKeeperAdmin.class); + when(bka.getCurrentAuditor()).thenReturn(BookieId.parse("127.0.0.1:3181")); + WhoIsAuditorCommand cmd = new WhoIsAuditorCommand(bka); Assert.assertTrue(cmd.apply(bkFlags, new String[] { "" })); } }