From 4d5af1b0c4e193c893f023f8b9e053b1794049fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Gracia?= Date: Mon, 11 Oct 2021 15:35:02 +0200 Subject: [PATCH] Issue 2728: Entry Log GC may get blocked when using entryLogPerLedgerEnabled option (#2779) --- .../apache/bookkeeper/bookie/EntryLogger.java | 30 ++++++++ .../bookie/GarbageCollectorThread.java | 28 ++++++-- .../bookkeeper/bookie/CompactionTest.java | 70 +++++++++++++++++++ .../org/apache/bookkeeper/util/TestUtils.java | 36 +++++++--- 4 files changed, 147 insertions(+), 17 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java index 504adfa4214..49a9ca40594 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java @@ -482,6 +482,27 @@ long getLeastUnflushedLogId() { return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId(); } + /** + * Get the last log id created so far. If entryLogPerLedger is enabled, the Garbage Collector + * process needs to look beyond the least unflushed entry log file, as there may be entry logs + * ready to be garbage collected. + * + * @return last entry log id created. + */ + long getLastLogId() { + return recentlyCreatedEntryLogsStatus.getLastLogId(); + } + + /** + * Returns whether the current log id exists and has been rotated already. + * + * @param entryLogId EntryLog id to check. + * @return Whether the given entryLogId exists and has been rotated. + */ + boolean isFlushedEntryLog(Long entryLogId) { + return recentlyCreatedEntryLogsStatus.isFlushedEntryLog(entryLogId); + } + long getPreviousAllocatedEntryLogId() { return entryLoggerAllocator.getPreallocatedLogId(); } @@ -1249,5 +1270,14 @@ synchronized void flushRotatedEntryLog(Long entryLogId) { synchronized long getLeastUnflushedLogId() { return leastUnflushedLogId; } + + synchronized long getLastLogId() { + return !entryLogsStatusMap.isEmpty() ? entryLogsStatusMap.lastKey() : 0; + } + + synchronized boolean isFlushedEntryLog(Long entryLogId) { + return entryLogsStatusMap.containsKey(entryLogId) && entryLogsStatusMap.get(entryLogId) + || entryLogId < leastUnflushedLogId; + } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index fb548905ff7..cafbf53eef5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import lombok.Getter; import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner; @@ -583,12 +584,15 @@ protected void compactEntryLog(EntryLogMetadata entryLogMeta) { * @throws IOException */ protected Map extractMetaFromEntryLogs(Map entryLogMetaMap) { - // Extract it for every entry log except for the current one. - // Entry Log ID's are just a long value that starts at 0 and increments - // by 1 when the log fills up and we roll to a new one. - long curLogId = entryLogger.getLeastUnflushedLogId(); + // Entry Log ID's are just a long value that starts at 0 and increments by 1 when the log fills up and we roll + // to a new one. We scan entry logs as follows: + // - entryLogPerLedgerEnabled is false: Extract it for every entry log except for the current one (un-flushed). + // - entryLogPerLedgerEnabled is true: Scan all flushed entry logs up to the highest known id. + Supplier finalEntryLog = () -> conf.isEntryLogPerLedgerEnabled() ? entryLogger.getLastLogId() : + entryLogger.getLeastUnflushedLogId(); boolean hasExceptionWhenScan = false; - for (long entryLogId = scannedLogId; entryLogId < curLogId; entryLogId++) { + boolean increaseScannedLogId = true; + for (long entryLogId = scannedLogId; entryLogId < finalEntryLog.get(); entryLogId++) { // Comb the current entry log file if it has not already been extracted. if (entryLogMetaMap.containsKey(entryLogId)) { continue; @@ -600,6 +604,15 @@ protected Map extractMetaFromEntryLogs(Map extractMetaFromEntryLogs(Map 0); } + @Test + public void testMinorCompactionWithEntryLogPerLedgerEnabled() throws Exception { + // restart bookies + restartBookies(c-> { + c.setMajorCompactionThreshold(0.0f); + c.setGcWaitTime(60000); + c.setMinorCompactionInterval(120000); + c.setMajorCompactionInterval(240000); + c.setForceAllowCompaction(true); + c.setEntryLogPerLedgerEnabled(true); + return c; + }); + + // prepare data + LedgerHandle[] lhs = prepareData(3, false); + + for (LedgerHandle lh : lhs) { + lh.close(); + } + + long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime; + long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime; + assertFalse(getGCThread().enableMajorCompaction); + assertTrue(getGCThread().enableMinorCompaction); + + // remove ledgers 1 and 2 + bkc.deleteLedger(lhs[1].getId()); + bkc.deleteLedger(lhs[2].getId()); + + // Need to wait until entry log 3 gets flushed before initiating GC to satisfy assertions. + while (!getGCThread().entryLogger.isFlushedEntryLog(3L)) { + TimeUnit.MILLISECONDS.sleep(100); + } + + LOG.info("Finished deleting the ledgers contains most entries."); + getGCThread().triggerGC(true, false, false).get(); + + assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime); + assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime); + + // At this point, we have the following state of ledgers end entry logs: + // L0 (not deleted) -> E0 (un-flushed): Entry log should exist. + // L1 (deleted) -> E1 (un-flushed): Entry log should exist as un-flushed entry logs are not considered for GC. + // L2 (deleted) -> E2 (flushed): Entry log should have been garbage collected. + // E3 (flushed): Entry log should have been garbage collected. + // E4 (un-flushed): Entry log should exist as un-flushed entry logs are not considered for GC. + assertTrue("Not found entry log files [0, 1, 4].log that should not have been compacted in: " + + tmpDirs.get(0), TestUtils.hasAllLogFiles(tmpDirs.get(0), 0, 1, 4)); + assertTrue("Found entry log files [2, 3].log that should have been compacted in ledgerDirectory: " + + tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0), 2, 3)); + + // Now, let's mark E1 as flushed, as its ledger L1 has been deleted already. In this case, the GC algorithm + // should consider it for deletion. + getGCThread().entryLogger.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(1L); + getGCThread().triggerGC(true, false, false).get(); + assertTrue("Found entry log file 1.log that should have been compacted in ledgerDirectory: " + + tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0), 1)); + + // Once removed the ledger L0, then deleting E0 is fine (only if it has been flushed). + bkc.deleteLedger(lhs[0].getId()); + getGCThread().triggerGC(true, false, false).get(); + assertTrue("Found entry log file 0.log that should not have been compacted in ledgerDirectory: " + + tmpDirs.get(0), TestUtils.hasAllLogFiles(tmpDirs.get(0), 0)); + getGCThread().entryLogger.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(0L); + getGCThread().triggerGC(true, false, false).get(); + assertTrue("Found entry log file 0.log that should have been compacted in ledgerDirectory: " + + tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0), 0)); + } + @Test public void testMinorCompactionWithNoWritableLedgerDirs() throws Exception { // prepare data diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java index 462d4729bd4..27f1abbb96a 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java @@ -22,6 +22,7 @@ package org.apache.bookkeeper.util; import java.io.File; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.Set; @@ -49,9 +50,31 @@ public static String buildStatsCounterPathFromBookieID(BookieId bookieId) { return bookieId.toString().replace('.', '_').replace('-', '_').replace(":", "_"); } + public static boolean hasAllLogFiles(File ledgerDirectory, Integer... logsId) { + Set logs = findEntryLogFileIds(ledgerDirectory); + return logs.containsAll(Arrays.asList(logsId)); + } + + public static boolean hasNoneLogFiles(File ledgerDirectory, Integer... logsId) { + Set logs = findEntryLogFileIds(ledgerDirectory); + return Arrays.stream(logsId).noneMatch(logs::contains); + } + public static boolean hasLogFiles(File ledgerDirectory, boolean partial, Integer... logsId) { - boolean result = partial ? false : true; - Set logs = new HashSet(); + boolean result = !partial; + Set logs = findEntryLogFileIds(ledgerDirectory); + for (Integer logId : logsId) { + boolean exist = logs.contains(logId); + if ((partial && exist) + || (!partial && !exist)) { + return !result; + } + } + return result; + } + + private static Set findEntryLogFileIds(File ledgerDirectory) { + Set logs = new HashSet<>(); for (File file : BookieImpl.getCurrentDirectory(ledgerDirectory).listFiles()) { if (file.isFile()) { String name = file.getName(); @@ -61,14 +84,7 @@ public static boolean hasLogFiles(File ledgerDirectory, boolean partial, Integer logs.add(Integer.parseInt(name.split("\\.")[0], 16)); } } - for (Integer logId : logsId) { - boolean exist = logs.contains(logId); - if ((partial && exist) - || (!partial && !exist)) { - return !result; - } - } - return result; + return logs; } public static void waitUntilLacUpdated(ReadHandle rh, long newLac) throws Exception {