Skip to content

Commit

Permalink
Remove direct ZK access for Auditor (#2842)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Oct 23, 2021
1 parent 577121f commit c01ed32
Show file tree
Hide file tree
Showing 18 changed files with 519 additions and 318 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import org.apache.bookkeeper.meta.MetadataClientDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.proto.BookieAddressResolver;
Expand Down Expand Up @@ -635,6 +634,11 @@ public LedgerManager getLedgerManager() {
return ledgerManager;
}

@VisibleForTesting
public LedgerManagerFactory getLedgerManagerFactory() {
return ledgerManagerFactory;
}

@VisibleForTesting
LedgerManager getUnderlyingLedgerManager() {
return ((CleanupLedgerManager) ledgerManager).getUnderlying();
Expand Down Expand Up @@ -743,10 +747,6 @@ public org.apache.bookkeeper.client.api.DigestType toApiDigestType() {
}
}

ZooKeeper getZkHandle() {
return ((ZKMetadataClientDriver) metadataDriver).getZk();
}

protected ClientConfiguration getConf() {
return conf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
import org.apache.bookkeeper.meta.LedgerAuditorManager;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
Expand All @@ -78,7 +79,6 @@
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.replication.AuditorElector;
import org.apache.bookkeeper.replication.BookieLedgerIndexer;
import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
Expand Down Expand Up @@ -116,6 +116,8 @@ public class BookKeeperAdmin implements AutoCloseable {
*/
private LedgerUnderreplicationManager underreplicationManager;

private LedgerAuditorManager ledgerAuditorManager;

/**
* Constructor that takes in a ZooKeeper servers connect string so we know
* how to connect to ZooKeeper to retrieve information about the BookKeeper
Expand Down Expand Up @@ -201,6 +203,14 @@ public void close() throws InterruptedException, BKException {
if (ownsBK) {
bkc.close();
}

if (ledgerAuditorManager != null) {
try {
ledgerAuditorManager.close();
} catch (Exception e) {
throw new BKException.MetaStoreException(e);
}
}
}

/**
Expand Down Expand Up @@ -1404,6 +1414,14 @@ private LedgerUnderreplicationManager getUnderreplicationManager()
return underreplicationManager;
}

private LedgerAuditorManager getLedgerAuditorManager()
throws IOException, InterruptedException {
if (ledgerAuditorManager == null) {
ledgerAuditorManager = mFactory.newLedgerAuditorManager();
}
return ledgerAuditorManager;
}

/**
* Setter for LostBookieRecoveryDelay value (in seconds) in Zookeeper.
*
Expand Down Expand Up @@ -1455,8 +1473,7 @@ public void triggerAudit()
throw new UnavailableException("Autorecovery is disabled. So giving up!");
}

BookieId auditorId =
AuditorElector.getCurrentAuditor(new ServerConfiguration(bkc.getConf()), bkc.getZkHandle());
BookieId auditorId = getLedgerAuditorManager().getCurrentAuditor();
if (auditorId == null) {
LOG.error("No auditor elected, though Autorecovery is enabled. So giving up.");
throw new UnavailableException("No auditor elected, though Autorecovery is enabled. So giving up.");
Expand Down Expand Up @@ -1709,4 +1726,8 @@ public CompletableFuture<AvailabilityOfEntriesOfLedger> asyncGetListOfEntriesOfL
long ledgerId) {
return bkc.getBookieClient().getListOfEntriesOfLedger(address, ledgerId);
}

public BookieId getCurrentAuditor() throws IOException, InterruptedException {
return getLedgerAuditorManager().getCurrentAuditor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import java.util.List;

import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;
Expand Down Expand Up @@ -87,4 +89,10 @@ public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
return new ZkLedgerUnderreplicationManager(conf, zk);
}

@Override
public LedgerAuditorManager newLedgerAuditorManager() {
ServerConfiguration serverConfiguration = new ServerConfiguration(conf);
return new ZkLedgerAuditorManager(zk, serverConfiguration, NullStatsLogger.INSTANCE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.meta;

import java.io.IOException;
import java.util.function.Consumer;
import org.apache.bookkeeper.net.BookieId;

/**
* Interface to handle the ledger auditor election.
*/
public interface LedgerAuditorManager extends AutoCloseable {

/**
* Events that can be triggered by the LedgerAuditorManager.
*/
enum AuditorEvent {
SessionLost,
VoteWasDeleted,
}

/**
* Try to become the auditor. If there's already another auditor, it will wait until this
* current instance has become the auditor.
*
* @param bookieId the identifier for current bookie
* @param listener listener that will receive AuditorEvent notifications
* @return
*/
void tryToBecomeAuditor(String bookieId, Consumer<AuditorEvent> listener) throws IOException, InterruptedException;

/**
* Return the information regarding the current auditor.
* @return
*/
BookieId getCurrentAuditor() throws IOException, InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ LedgerManagerFactory initialize(AbstractConfiguration conf,
LedgerUnderreplicationManager newLedgerUnderreplicationManager()
throws KeeperException, InterruptedException, ReplicationException.CompatibilityException;


/**
* Return a ledger auditor manager, which is used to
* coordinate the auto-recovery process.
*
* @return ledger auditor manager
* @see LedgerAuditorManager
*/
LedgerAuditorManager newLedgerAuditorManager() throws IOException, InterruptedException;

/**
* Format the ledger metadata for LedgerManager.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import java.util.List;

import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;
Expand Down Expand Up @@ -78,6 +80,12 @@ public LedgerIdGenerator newLedgerIdGenerator() {
zkAcls);
}

@Override
public LedgerAuditorManager newLedgerAuditorManager() {
ServerConfiguration serverConfiguration = new ServerConfiguration(conf);
return new ZkLedgerAuditorManager(zk, serverConfiguration, NullStatsLogger.INSTANCE);
}

@Override
public LedgerManager newLedgerManager() {
return new LegacyHierarchicalLedgerManager(conf, zk);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.metastore.MSException;
import org.apache.bookkeeper.metastore.MSWatchedEvent;
Expand All @@ -60,6 +61,7 @@
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.util.ZkUtils;
Expand Down Expand Up @@ -815,4 +817,10 @@ public boolean validateAndNukeExistingCluster(AbstractConfiguration<?> conf, Lay
zkServers, zkLedgersRootPath);
return true;
}

@Override
public LedgerAuditorManager newLedgerAuditorManager() {
ServerConfiguration serverConfiguration = new ServerConfiguration(conf);
return new ZkLedgerAuditorManager(zk, serverConfiguration, NullStatsLogger.INSTANCE);
}
}
Loading

0 comments on commit c01ed32

Please sign in to comment.