From 97818f5123999396e66f5246420d3c7e3d25f53d Mon Sep 17 00:00:00 2001 From: frankxieke Date: Mon, 6 Sep 2021 09:07:47 +0800 Subject: [PATCH] Add metrics and internal command for QueryAutoRecoveryStatus, including underReplicatedSize metrics,read/write latency, internal command for querying recovering ledgersInfo (#2768) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: Current AutoRecovery does not have enough metrics or stat command that would help to monitor and debug. So we need to add metrics and admin stat interface to monitor the process of AutoRecovery.  For example, current recovering ledgerInfo and under replicated size, read/write latency in recovering. Changes: And QueryAutoRecoveryStatus command and under replicated size metric , read/write latency metric in recovering Documentation: Need doc. --- .../apache/bookkeeper/bookie/BookieShell.java | 40 ++++ .../client/LedgerFragmentReplicator.java | 31 ++- .../bookkeeper/replication/Auditor.java | 20 ++ .../replication/ReplicationStats.java | 3 + .../QueryAutoRecoveryStatusCommand.java | 151 +++++++++++++++ .../QueryAutoRecoveryStatusCommandTest.java | 179 ++++++++++++++++++ 6 files changed, 420 insertions(+), 4 deletions(-) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java create mode 100644 tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommandTest.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index c16efca0124..461623e1d0e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -47,6 +47,7 @@ import org.apache.bookkeeper.replication.ReplicationException; import org.apache.bookkeeper.tools.cli.commands.autorecovery.ListUnderReplicatedCommand; import org.apache.bookkeeper.tools.cli.commands.autorecovery.LostBookieRecoveryDelayCommand; +import org.apache.bookkeeper.tools.cli.commands.autorecovery.QueryAutoRecoveryStatusCommand; import org.apache.bookkeeper.tools.cli.commands.autorecovery.ToggleCommand; import org.apache.bookkeeper.tools.cli.commands.autorecovery.TriggerAuditCommand; import org.apache.bookkeeper.tools.cli.commands.autorecovery.WhoIsAuditorCommand; @@ -155,6 +156,7 @@ public class BookieShell implements Tool { static final String CMD_CONVERT_TO_INTERLEAVED_STORAGE = "convert-to-interleaved-storage"; static final String CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX = "rebuild-db-ledger-locations-index"; static final String CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE = "regenerate-interleaved-storage-index-file"; + static final String CMD_QUERY_AUTORECOVERY_STATUS = "queryrecoverystatus"; // cookie commands static final String CMD_CREATE_COOKIE = "cookie_create"; @@ -1344,6 +1346,43 @@ int runCmd(CommandLine cmdLine) throws Exception { } } + + /** + * Command to query autorecovery status. + */ + class QueryAutoRecoveryStatusCmd extends MyCommand { + Options opts = new Options(); + + public QueryAutoRecoveryStatusCmd() { + super(CMD_QUERY_AUTORECOVERY_STATUS); + } + + @Override + Options getOptions() { + return opts; + } + + @Override + String getDescription() { + return "Query the autorecovery status"; + } + + @Override + String getUsage() { + return "queryautorecoverystatus"; + } + + @Override + int runCmd(CommandLine cmdLine) throws Exception { + final boolean verbose = cmdLine.hasOption("verbose"); + QueryAutoRecoveryStatusCommand.QFlags flags = new QueryAutoRecoveryStatusCommand.QFlags() + .verbose(verbose); + QueryAutoRecoveryStatusCommand cmd = new QueryAutoRecoveryStatusCommand(); + cmd.apply(bkConf, flags); + return 0; + } + } + /** * Setter and Getter for LostBookieRecoveryDelay value (in seconds) in metadata store. */ @@ -2153,6 +2192,7 @@ int runCmd(CommandLine cmdLine) throws Exception { commands.put(CMD_READJOURNAL, new ReadJournalCmd()); commands.put(CMD_LASTMARK, new LastMarkCmd()); commands.put(CMD_AUTORECOVERY, new AutoRecoveryCmd()); + commands.put(CMD_QUERY_AUTORECOVERY_STATUS, new QueryAutoRecoveryStatusCmd()); commands.put(CMD_LISTBOOKIES, new ListBookiesCmd()); commands.put(CMD_LISTFILESONDISC, new ListDiskFilesCmd()); commands.put(CMD_UPDATECOOKIE, new UpdateCookieCmd()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java index cd8b50875ac..fb6259d6bc8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java @@ -24,10 +24,10 @@ import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BYTES_WRITTEN; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_READ; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_WRITTEN; -import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE; - +import static org.apache.bookkeeper.replication.ReplicationStats.READ_DATA_LATENCY;; +import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;; +import static org.apache.bookkeeper.replication.ReplicationStats.WRITE_DATA_LATENCY; import io.netty.buffer.Unpooled; - import java.util.Enumeration; import java.util.HashSet; import java.util.Iterator; @@ -35,11 +35,11 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.stream.Collectors; - import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; import org.apache.bookkeeper.client.api.WriteFlag; import org.apache.bookkeeper.meta.LedgerManager; @@ -53,6 +53,7 @@ import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.annotations.StatsDoc; import org.apache.bookkeeper.util.ByteBufList; +import org.apache.bookkeeper.util.MathUtils; import org.apache.zookeeper.AsyncCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,6 +91,17 @@ public class LedgerFragmentReplicator { help = "The distribution of size of entries written by the replicator" ) private final OpStatsLogger numBytesWritten; + @StatsDoc( + name = READ_DATA_LATENCY, + help = "The distribution of latency of read entries by the replicator" + ) + private final OpStatsLogger readDataLatency; + @StatsDoc( + name = WRITE_DATA_LATENCY, + help = "The distribution of latency of write entries by the replicator" + ) + private final OpStatsLogger writeDataLatency; + public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger) { this.bkc = bkc; @@ -98,6 +110,8 @@ public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger) { numBytesRead = this.statsLogger.getOpStatsLogger(NUM_BYTES_READ); numEntriesWritten = this.statsLogger.getCounter(NUM_ENTRIES_WRITTEN); numBytesWritten = this.statsLogger.getOpStatsLogger(NUM_BYTES_WRITTEN); + readDataLatency = this.statsLogger.getOpStatsLogger(READ_DATA_LATENCY); + writeDataLatency = this.statsLogger.getOpStatsLogger(WRITE_DATA_LATENCY); } public LedgerFragmentReplicator(BookKeeper bkc) { @@ -334,6 +348,8 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Ob } } }; + + long startReadEntryTime = MathUtils.nowInNano(); /* * Read the ledger entry using the LedgerHandle. This will allow us to * read the entry from one of the other replicated bookies other than @@ -350,6 +366,10 @@ public void readComplete(int rc, LedgerHandle lh, ledgerFragmentEntryMcb.processResult(rc, null, null); return; } + + readDataLatency.registerSuccessfulEvent(MathUtils.elapsedNanos(startReadEntryTime), + TimeUnit.NANOSECONDS); + /* * Now that we've read the ledger entry, write it to the new * bookie we've selected. @@ -364,10 +384,13 @@ public void readComplete(int rc, LedgerHandle lh, lh.getLastAddConfirmed(), entry.getLength(), Unpooled.wrappedBuffer(data, 0, data.length)); for (BookieId newBookie : newBookies) { + long startWriteEntryTime = MathUtils.nowInNano(); bkc.getBookieClient().addEntry(newBookie, lh.getId(), lh.getLedgerKey(), entryId, ByteBufList.clone(toSend), multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD, false, WriteFlag.NONE); + writeDataLatency.registerSuccessfulEvent( + MathUtils.elapsedNanos(startWriteEntryTime), TimeUnit.NANOSECONDS); } toSend.release(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index f245614625f..064c7fc5177 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -38,6 +38,7 @@ import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS; import static org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME; import static org.apache.bookkeeper.replication.ReplicationStats.REPLICAS_CHECK_TIME; +import static org.apache.bookkeeper.replication.ReplicationStats.UNDER_REPLICATED_LEDGERS_TOTAL_SIZE; import static org.apache.bookkeeper.replication.ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE; import static org.apache.bookkeeper.util.SafeRunnable.safeRun; @@ -71,6 +72,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -170,6 +172,12 @@ public class Auditor implements AutoCloseable { help = "the distribution of num under_replicated ledgers on each auditor run" ) private final OpStatsLogger numUnderReplicatedLedger; + + @StatsDoc( + name = UNDER_REPLICATED_LEDGERS_TOTAL_SIZE, + help = "the distribution of under_replicated ledgers total size on each auditor run" + ) + private final OpStatsLogger underReplicatedLedgerTotalSize; @StatsDoc( name = URL_PUBLISH_TIME_FOR_LOST_BOOKIE, help = "the latency distribution of publishing under replicated ledgers for lost bookies" @@ -341,6 +349,7 @@ public Auditor(final String bookieIdentifier, this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry = new AtomicInteger(0); numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS); + underReplicatedLedgerTotalSize = this.statsLogger.getOpStatsLogger(UNDER_REPLICATED_LEDGERS_TOTAL_SIZE); uRLPublishTimeForLostBookies = this.statsLogger .getOpStatsLogger(ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE); bookieToLedgersMapCreationTime = this.statsLogger @@ -1131,6 +1140,17 @@ private CompletableFuture publishSuspectedLedgersAsync(Collection mis } LOG.info("Following ledgers: {} of bookie: {} are identified as underreplicated", ledgers, missingBookies); numUnderReplicatedLedger.registerSuccessfulValue(ledgers.size()); + LongAdder underReplicatedSize = new LongAdder(); + FutureUtils.processList( + Lists.newArrayList(ledgers), + ledgerId -> + ledgerManager.readLedgerMetadata(ledgerId).whenComplete((metadata, exception) -> { + if (exception == null) { + underReplicatedSize.add(metadata.getValue().getLength()); + } + }), null); + underReplicatedLedgerTotalSize.registerSuccessfulValue(underReplicatedSize.longValue()); + return FutureUtils.processList( Lists.newArrayList(ledgers), ledgerId -> ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId, missingBookies), diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java index 6ec9f4918a7..b0bbe47d95d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java @@ -30,6 +30,7 @@ public interface ReplicationStats { String AUDITOR_SCOPE = "auditor"; String ELECTION_ATTEMPTS = "election_attempts"; String NUM_UNDER_REPLICATED_LEDGERS = "NUM_UNDER_REPLICATED_LEDGERS"; + String UNDER_REPLICATED_LEDGERS_TOTAL_SIZE = "UNDER_REPLICATED_LEDGERS_TOTAL_SIZE"; String URL_PUBLISH_TIME_FOR_LOST_BOOKIE = "URL_PUBLISH_TIME_FOR_LOST_BOOKIE"; String BOOKIE_TO_LEDGERS_MAP_CREATION_TIME = "BOOKIE_TO_LEDGERS_MAP_CREATION_TIME"; String CHECK_ALL_LEDGERS_TIME = "CHECK_ALL_LEDGERS_TIME"; @@ -58,6 +59,8 @@ public interface ReplicationStats { String NUM_BYTES_READ = "NUM_BYTES_READ"; String NUM_ENTRIES_WRITTEN = "NUM_ENTRIES_WRITTEN"; String NUM_BYTES_WRITTEN = "NUM_BYTES_WRITTEN"; + String READ_DATA_LATENCY = "READ_DATA_LATENCY"; + String WRITE_DATA_LATENCY = "WRITE_DATA_LATENCY"; String REPLICATE_EXCEPTION = "exceptions"; String NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER = "NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER"; String NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION = "NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION"; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java new file mode 100644 index 00000000000..0f86a2d2ec7 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.tools.cli.commands.autorecovery; + +import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory; +import com.beust.jcommander.Parameter; +import com.google.common.util.concurrent.UncheckedExecutionException; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import lombok.Setter; +import lombok.experimental.Accessors; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; +import org.apache.bookkeeper.meta.UnderreplicatedLedger; +import org.apache.bookkeeper.replication.ReplicationException; +import org.apache.bookkeeper.tools.cli.helpers.BookieCommand; +import org.apache.bookkeeper.tools.framework.CliFlags; +import org.apache.bookkeeper.tools.framework.CliSpec; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Command to Query current auto recovery status. + */ +public class QueryAutoRecoveryStatusCommand + extends BookieCommand { + static final Logger LOG = LoggerFactory. + getLogger(QueryAutoRecoveryStatusCommand.class); + private static final String NAME = "queryautorecoverystatus"; + private static final String DESC = "Query autorecovery status."; + + public QueryAutoRecoveryStatusCommand() { + super(CliSpec.newBuilder() + .withName(NAME) + .withDescription(DESC) + .withFlags(new QFlags()) + .build()); + } + + @Override + public boolean apply(ServerConfiguration conf, QFlags cmdFlags) { + try { + return handler(conf, cmdFlags); + } catch (Exception e) { + throw new UncheckedExecutionException(e.getMessage(), e); + } + } + + /** + * Flags for list under replicated command. + */ + @Accessors(fluent = true) + @Setter + public static class QFlags extends CliFlags{ + @Parameter(names = {"-v", "--verbose"}, description = "list recovering detailed ledger info") + private Boolean verbose = false; + } + + private static class LedgerRecoverInfo { + Long ledgerId; + String bookieId; + LedgerRecoverInfo(Long ledgerId, String bookieId) { + this.ledgerId = ledgerId; + this.bookieId = bookieId; + } + } + + /* + Print Message format is like this: + + CurrentRecoverLedgerInfo: + LedgerId: BookieId: LedgerSize:(detail) + LedgerId: BookieId: LedgerSize:(detail) + */ + public boolean handler(ServerConfiguration conf, QFlags flag) throws Exception { + runFunctionWithLedgerManagerFactory(conf, mFactory -> { + LedgerUnderreplicationManager underreplicationManager; + LedgerManager ledgerManager = mFactory.newLedgerManager(); + List ledgerList = new LinkedList<>(); + try { + underreplicationManager = mFactory.newLedgerUnderreplicationManager(); + } catch (KeeperException | ReplicationException.CompatibilityException e) { + throw new UncheckedExecutionException("Failed to new ledger underreplicated manager", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedExecutionException("Interrupted on newing ledger underreplicated manager", e); + } + Iterator iter = underreplicationManager.listLedgersToRereplicate(null); + while (iter.hasNext()) { + UnderreplicatedLedger underreplicatedLedger = iter.next(); + long urLedgerId = underreplicatedLedger.getLedgerId(); + try { + String replicationWorkerId = underreplicationManager + .getReplicationWorkerIdRereplicatingLedger(urLedgerId); + if (replicationWorkerId != null) { + ledgerList.add(new LedgerRecoverInfo(urLedgerId, replicationWorkerId)); + } + } catch (ReplicationException.UnavailableException e) { + LOG.error("Failed to get ReplicationWorkerId rereplicating ledger {} -- {}", urLedgerId, + e.getMessage()); + } + } + + LOG.info("CurrentRecoverLedgerInfo:"); + if (!flag.verbose) { + for (int i = 0; i < ledgerList.size(); i++) { + LOG.info("\tLedgerId:{}\tBookieId:{}", ledgerList.get(i).ledgerId, ledgerList.get(i).bookieId); + } + } else { + for (int i = 0; i < ledgerList.size(); i++) { + LedgerRecoverInfo info = ledgerList.get(i); + ledgerManager.readLedgerMetadata(info.ledgerId).whenComplete((metadata, exception) -> { + if (exception == null) { + LOG.info("\tLedgerId:{}\tBookieId:{}\tLedgerSize:{}", + info.ledgerId, info.bookieId, metadata.getValue().getLength()); + } else { + LOG.error("Unable to read the ledger: {} information", info.ledgerId); + throw new UncheckedExecutionException(exception); + } + }); + } + } + if (ledgerList.size() == 0) { + // NO ledger is being auto recovering + LOG.info("\t No Ledger is being recovered."); + } + return null; + }); + return true; + } +} diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommandTest.java new file mode 100644 index 00000000000..ba92c8bd60b --- /dev/null +++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommandTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.tools.cli.commands.autorecovery; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import com.google.common.collect.Lists; +import java.lang.reflect.Constructor; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerMetadataBuilder; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.meta.LedgerManagerFactory; +import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; +import org.apache.bookkeeper.meta.MetadataDrivers; +import org.apache.bookkeeper.meta.UnderreplicatedLedger; +import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.BookieAddressResolver; +import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase; +import org.apache.bookkeeper.tools.cli.helpers.CommandHelpers; +import org.apache.bookkeeper.versioning.LongVersion; +import org.apache.bookkeeper.versioning.Versioned; +import org.apache.bookkeeper.zookeeper.ZooKeeperClient; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + + + +/** + * Unit test for {@link QueryAutoRecoveryStatusCommand}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({ QueryAutoRecoveryStatusCommand.class, ZKMetadataDriverBase.class, ZooKeeperClient.class, + CommandHelpers.class, MetadataDrivers.class +}) +public class QueryAutoRecoveryStatusCommandTest extends BookieCommandTestBase { + public QueryAutoRecoveryStatusCommandTest() { + super(3, 0); + } + LedgerUnderreplicationManager underreplicationManager; + + @Override + public void setup() throws Exception { + super.setup(); + BookieId bookieId = BookieId.parse(UUID.randomUUID().toString()); + LedgerManagerFactory ledgerManagerFactory = mock(LedgerManagerFactory.class); + + PowerMockito.mockStatic(MetadataDrivers.class); + PowerMockito.doAnswer(invocationOnMock -> { + Function function = invocationOnMock.getArgument(1); + function.apply(ledgerManagerFactory); + return true; + }).when(MetadataDrivers.class, "runFunctionWithLedgerManagerFactory", any(ServerConfiguration.class), + any(Function.class)); + + LedgerManager ledgerManager = mock(LedgerManager.class); + underreplicationManager = mock(LedgerUnderreplicationManager.class); + + when(ledgerManagerFactory.newLedgerManager()).thenReturn(ledgerManager); + when(ledgerManagerFactory.newLedgerUnderreplicationManager()).thenReturn(underreplicationManager); + + List ensemble = Lists.newArrayList(new BookieSocketAddress("192.0.2.1", 1234).toBookieId(), + new BookieSocketAddress("192.0.2.2", 1234).toBookieId(), + new BookieSocketAddress("192.0.2.3", 1234).toBookieId()); + LedgerMetadata metadata = LedgerMetadataBuilder.create() + .withId(11112233) + .withClosedState() + .withLength(100000999) + .withLastEntryId(2000011) + .withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(2) + .withPassword("passwd".getBytes()) + .withDigestType(BookKeeper.DigestType.CRC32.toApiDigestType()) + .newEnsembleEntry(0L, ensemble).build(); + CompletableFuture> promise = new CompletableFuture<>(); + Versioned vmeta = new Versioned(metadata, new LongVersion(1000)); + promise.complete(vmeta); + + when(ledgerManager.readLedgerMetadata(1)).thenReturn(promise); + when(ledgerManager.readLedgerMetadata(33232)).thenReturn(promise); + + Constructor constructor = UnderreplicatedLedger.class. + getDeclaredConstructor(long.class); + constructor.setAccessible(true); + final Queue queue = new LinkedList(); + queue.add("1111"); + Iterator iter = new Iterator() { + @Override + public boolean hasNext() { + if (queue.size() > 0) { + queue.remove(); + try { + curBatch.add(constructor.newInstance(1)); + curBatch.add(constructor.newInstance(33232)); + } catch (Exception e) { + } + } + + if (curBatch.size() > 0) { + return true; + } + return false; + } + + @Override + public UnderreplicatedLedger next() { + return curBatch.remove(); + } + + final Queue curBatch = new LinkedList(); + }; + + when(underreplicationManager.listLedgersToRereplicate(any())).thenReturn(iter); + + PowerMockito.mockStatic(CommandHelpers.class); + PowerMockito.when(CommandHelpers + .getBookieSocketAddrStringRepresentation( + eq(bookieId), any(BookieAddressResolver.class))).thenReturn(""); + } + + @Test(timeout = 30000) + public void testQueryRecoverStatusCommand() { + try { + when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(1)).thenReturn("192.168.0.103"); + when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(33232)).thenReturn("192.168.0.103"); + } catch (Exception e) { + } + QueryAutoRecoveryStatusCommand cmd = new QueryAutoRecoveryStatusCommand(); + Assert.assertTrue(cmd.apply(bkFlags, new String[] { "" })); + } + + @Test(timeout = 30000) + public void testQueryRecoverStatusCommandWithDetail() { + try { + when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(1)).thenReturn("192.168.0.103"); + when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(33232)).thenReturn("192.168.0.103"); + } catch (Exception e) { + } + QueryAutoRecoveryStatusCommand cmd = new QueryAutoRecoveryStatusCommand(); + Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-v" })); + } + + @Test(timeout = 3000) + public void testNoLedgerIsBeingRecovered() { + QueryAutoRecoveryStatusCommand cmd = new QueryAutoRecoveryStatusCommand(); + Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-v" })); + } +}