Skip to content

Commit

Permalink
Eliminate direct ZK access in ScanAndCompareGarbageCollector (apache#…
Browse files Browse the repository at this point in the history
…2833)

* Eliminate direct ZK access in ScanAndCompareGarbageCollector

* Removed unused imports

* Fixed zk ACLs

* Addressed comments

* Fixed checkstyle
  • Loading branch information
merlimat authored Oct 18, 2021
1 parent a9b576d commit a4afaa4
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.google.common.collect.Sets;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
Expand All @@ -36,22 +37,22 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -68,41 +69,36 @@
* <b>globalActiveLedgers</b>, do garbage collection on them.
* </ul>
* </p>
*
* <p>TODO: eliminate the direct usage of zookeeper here {@link https://github.com/apache/bookkeeper/issues/1331}
*/
public class ScanAndCompareGarbageCollector implements GarbageCollector {

static final Logger LOG = LoggerFactory.getLogger(ScanAndCompareGarbageCollector.class);
static final int MAX_CONCURRENT_ZK_REQUESTS = 1000;
static final int MAX_CONCURRENT_METADATA_REQUESTS = 1000;

private final LedgerManager ledgerManager;
private final CompactableLedgerStorage ledgerStorage;
private final ServerConfiguration conf;
private final BookieId selfBookieAddress;
private ZooKeeper zk = null;
private boolean enableGcOverReplicatedLedger;
private final long gcOverReplicatedLedgerIntervalMillis;
private long lastOverReplicatedLedgerGcTimeMillis;
private final String zkServers;
private final String zkLedgersRootPath;
private final boolean verifyMetadataOnGc;
private int activeLedgerCounter;
private StatsLogger statsLogger;

public ScanAndCompareGarbageCollector(LedgerManager ledgerManager, CompactableLedgerStorage ledgerStorage,
ServerConfiguration conf, StatsLogger statsLogger) throws IOException {
this.ledgerManager = ledgerManager;
this.ledgerStorage = ledgerStorage;
this.conf = conf;
this.statsLogger = statsLogger;
this.selfBookieAddress = BookieImpl.getBookieId(conf);

this.gcOverReplicatedLedgerIntervalMillis = conf.getGcOverreplicatedLedgerWaitTimeMillis();
this.lastOverReplicatedLedgerGcTimeMillis = System.currentTimeMillis();
if (gcOverReplicatedLedgerIntervalMillis > 0) {
this.enableGcOverReplicatedLedger = true;
}
this.zkServers = ZKMetadataDriverBase.resolveZkServers(conf);
this.zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
LOG.info("Over Replicated Ledger Deletion : enabled=" + enableGcOverReplicatedLedger + ", interval="
+ gcOverReplicatedLedgerIntervalMillis);

Expand Down Expand Up @@ -133,8 +129,6 @@ public void gc(GarbageCleaner garbageCleaner) {
boolean checkOverreplicatedLedgers = (enableGcOverReplicatedLedger && curTime
- lastOverReplicatedLedgerGcTimeMillis > gcOverReplicatedLedgerIntervalMillis);
if (checkOverreplicatedLedgers) {
zk = ZooKeeperClient.newBuilder().connectString(zkServers)
.sessionTimeoutMs(conf.getZkTimeout()).build();
// remove all the overreplicated ledgers from the local bookie
Set<Long> overReplicatedLedgers = removeOverReplicatedledgers(bkActiveLedgers, garbageCleaner);
if (overReplicatedLedgers.isEmpty()) {
Expand Down Expand Up @@ -216,37 +210,36 @@ public void gc(GarbageCleaner garbageCleaner) {
} catch (Throwable t) {
// ignore exception, collecting garbage next time
LOG.warn("Exception when iterating over the metadata", t);
} finally {
if (zk != null) {
try {
zk.close();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Error closing zk session", e);
}
zk = null;
}
}
}

private Set<Long> removeOverReplicatedledgers(Set<Long> bkActiveledgers, final GarbageCleaner garbageCleaner)
throws InterruptedException, KeeperException {
final List<ACL> zkAcls = ZkUtils.getACLs(conf);
throws Exception {
final Set<Long> overReplicatedLedgers = Sets.newHashSet();
final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_ZK_REQUESTS);
final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_METADATA_REQUESTS);
final CountDownLatch latch = new CountDownLatch(bkActiveledgers.size());
// instantiate zookeeper client to initialize ledger manager

@Cleanup
MetadataBookieDriver metadataDriver = instantiateMetadataDriver(conf, statsLogger);

@Cleanup
LedgerManagerFactory lmf = metadataDriver.getLedgerManagerFactory();

@Cleanup
LedgerUnderreplicationManager lum = lmf.newLedgerUnderreplicationManager();

for (final Long ledgerId : bkActiveledgers) {
try {
// check if the ledger is being replicated already by the replication worker
if (ZkLedgerUnderreplicationManager.isLedgerBeingReplicated(zk, zkLedgersRootPath, ledgerId)) {
if (lum.isLedgerBeingReplicated(ledgerId)) {
latch.countDown();
continue;
}
// we try to acquire the underreplicated ledger lock to not let the bookie replicate the ledger that is
// already being checked for deletion, since that might change the ledger ensemble to include the
// current bookie again and, in that case, we cannot remove the ledger from local storage
ZkLedgerUnderreplicationManager.acquireUnderreplicatedLedgerLock(zk, zkLedgersRootPath, ledgerId,
zkAcls);
lum.acquireUnderreplicatedLedger(ledgerId);
semaphore.acquire();
ledgerManager.readLedgerMetadata(ledgerId)
.whenComplete((metadata, exception) -> {
Expand Down Expand Up @@ -274,8 +267,7 @@ private Set<Long> removeOverReplicatedledgers(Set<Long> bkActiveledgers, final G
semaphore.release();
latch.countDown();
try {
ZkLedgerUnderreplicationManager.releaseUnderreplicatedLedgerLock(
zk, zkLedgersRootPath, ledgerId);
lum.releaseUnderreplicatedLedger(ledgerId);
} catch (Throwable t) {
LOG.error("Exception when removing underreplicated lock for ledger {}",
ledgerId, t);
Expand All @@ -291,4 +283,23 @@ private Set<Long> removeOverReplicatedledgers(Set<Long> bkActiveledgers, final G
bkActiveledgers.removeAll(overReplicatedLedgers);
return overReplicatedLedgers;
}

private static MetadataBookieDriver instantiateMetadataDriver(ServerConfiguration conf, StatsLogger statsLogger)
throws BookieException {
try {
String metadataServiceUriStr = conf.getMetadataServiceUri();
MetadataBookieDriver driver = MetadataDrivers.getBookieDriver(URI.create(metadataServiceUriStr));
driver.initialize(
conf,
() -> {
},
statsLogger);
return driver;
} catch (MetadataException me) {
throw new BookieException.MetadataStoreException("Failed to initialize metadata bookie driver", me);
} catch (ConfigurationException e) {
throw new BookieException.BookieIllegalOpException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ default void markLedgerUnderreplicated(long ledgerId, String missingReplica) thr
ledgerId, Lists.newArrayList(missingReplica)), ReplicationException.EXCEPTION_HANDLER);
}

/**
* Check whether the ledger is being replicated by any bookie.
*/
boolean isLedgerBeingReplicated(long ledgerId) throws ReplicationException;

/**
* Mark a ledger as underreplicated with missing bookies. The replication should then
* check which fragements are underreplicated and rereplicate them.
Expand Down Expand Up @@ -105,6 +110,7 @@ long getLedgerToRereplicate()
long pollLedgerToRereplicate()
throws ReplicationException.UnavailableException;

void acquireUnderreplicatedLedger(long ledgerId) throws ReplicationException;

/**
* Release a previously acquired ledger. This allows others to acquire the ledger.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,10 +770,13 @@ public void process(WatchedEvent e) {
/**
* Check whether the ledger is being replicated by any bookie.
*/
public static boolean isLedgerBeingReplicated(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId)
throws KeeperException,
InterruptedException {
return zkc.exists(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), false) != null;
@Override
public boolean isLedgerBeingReplicated(long ledgerId) throws ReplicationException {
try {
return zkc.exists(getUrLedgerLockZnode(urLockPath, ledgerId), false) != null;
} catch (Exception e) {
throw new ReplicationException.UnavailableException("Failed to check ledger lock", e);
}
}

/**
Expand All @@ -786,13 +789,15 @@ public static void acquireUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedg
LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
}

/**
* Release the underreplicated ledger lock if it exists.
*/
public static void releaseUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId)
throws InterruptedException, KeeperException {
if (isLedgerBeingReplicated(zkc, zkLedgersRootPath, ledgerId)) {
zkc.delete(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), -1);
@Override
public void acquireUnderreplicatedLedger(long ledgerId)
throws ReplicationException {
try {
acquireUnderreplicatedLedgerLock(zkc, getUrLedgerLockZnode(urLockPath, ledgerId), ledgerId,
ZkUtils.getACLs(conf));
} catch (Exception e) {
throw new ReplicationException.UnavailableException(
"Failed to acquire underreplicated ledger lock for " + ledgerId, e);
}
}

Expand Down

0 comments on commit a4afaa4

Please sign in to comment.