diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index c16efca0124..461623e1d0e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -47,6 +47,7 @@ import org.apache.bookkeeper.replication.ReplicationException; import org.apache.bookkeeper.tools.cli.commands.autorecovery.ListUnderReplicatedCommand; import org.apache.bookkeeper.tools.cli.commands.autorecovery.LostBookieRecoveryDelayCommand; +import org.apache.bookkeeper.tools.cli.commands.autorecovery.QueryAutoRecoveryStatusCommand; import org.apache.bookkeeper.tools.cli.commands.autorecovery.ToggleCommand; import org.apache.bookkeeper.tools.cli.commands.autorecovery.TriggerAuditCommand; import org.apache.bookkeeper.tools.cli.commands.autorecovery.WhoIsAuditorCommand; @@ -155,6 +156,7 @@ public class BookieShell implements Tool { static final String CMD_CONVERT_TO_INTERLEAVED_STORAGE = "convert-to-interleaved-storage"; static final String CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX = "rebuild-db-ledger-locations-index"; static final String CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE = "regenerate-interleaved-storage-index-file"; + static final String CMD_QUERY_AUTORECOVERY_STATUS = "queryrecoverystatus"; // cookie commands static final String CMD_CREATE_COOKIE = "cookie_create"; @@ -1344,6 +1346,43 @@ int runCmd(CommandLine cmdLine) throws Exception { } } + + /** + * Command to query autorecovery status. + */ + class QueryAutoRecoveryStatusCmd extends MyCommand { + Options opts = new Options(); + + public QueryAutoRecoveryStatusCmd() { + super(CMD_QUERY_AUTORECOVERY_STATUS); + } + + @Override + Options getOptions() { + return opts; + } + + @Override + String getDescription() { + return "Query the autorecovery status"; + } + + @Override + String getUsage() { + return "queryautorecoverystatus"; + } + + @Override + int runCmd(CommandLine cmdLine) throws Exception { + final boolean verbose = cmdLine.hasOption("verbose"); + QueryAutoRecoveryStatusCommand.QFlags flags = new QueryAutoRecoveryStatusCommand.QFlags() + .verbose(verbose); + QueryAutoRecoveryStatusCommand cmd = new QueryAutoRecoveryStatusCommand(); + cmd.apply(bkConf, flags); + return 0; + } + } + /** * Setter and Getter for LostBookieRecoveryDelay value (in seconds) in metadata store. */ @@ -2153,6 +2192,7 @@ int runCmd(CommandLine cmdLine) throws Exception { commands.put(CMD_READJOURNAL, new ReadJournalCmd()); commands.put(CMD_LASTMARK, new LastMarkCmd()); commands.put(CMD_AUTORECOVERY, new AutoRecoveryCmd()); + commands.put(CMD_QUERY_AUTORECOVERY_STATUS, new QueryAutoRecoveryStatusCmd()); commands.put(CMD_LISTBOOKIES, new ListBookiesCmd()); commands.put(CMD_LISTFILESONDISC, new ListDiskFilesCmd()); commands.put(CMD_UPDATECOOKIE, new UpdateCookieCmd()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java index cd8b50875ac..fb6259d6bc8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java @@ -24,10 +24,10 @@ import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BYTES_WRITTEN; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_READ; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_WRITTEN; -import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE; - +import static org.apache.bookkeeper.replication.ReplicationStats.READ_DATA_LATENCY;; +import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;; +import static org.apache.bookkeeper.replication.ReplicationStats.WRITE_DATA_LATENCY; import io.netty.buffer.Unpooled; - import java.util.Enumeration; import java.util.HashSet; import java.util.Iterator; @@ -35,11 +35,11 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.stream.Collectors; - import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; import org.apache.bookkeeper.client.api.WriteFlag; import org.apache.bookkeeper.meta.LedgerManager; @@ -53,6 +53,7 @@ import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.annotations.StatsDoc; import org.apache.bookkeeper.util.ByteBufList; +import org.apache.bookkeeper.util.MathUtils; import org.apache.zookeeper.AsyncCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,6 +91,17 @@ public class LedgerFragmentReplicator { help = "The distribution of size of entries written by the replicator" ) private final OpStatsLogger numBytesWritten; + @StatsDoc( + name = READ_DATA_LATENCY, + help = "The distribution of latency of read entries by the replicator" + ) + private final OpStatsLogger readDataLatency; + @StatsDoc( + name = WRITE_DATA_LATENCY, + help = "The distribution of latency of write entries by the replicator" + ) + private final OpStatsLogger writeDataLatency; + public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger) { this.bkc = bkc; @@ -98,6 +110,8 @@ public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger) { numBytesRead = this.statsLogger.getOpStatsLogger(NUM_BYTES_READ); numEntriesWritten = this.statsLogger.getCounter(NUM_ENTRIES_WRITTEN); numBytesWritten = this.statsLogger.getOpStatsLogger(NUM_BYTES_WRITTEN); + readDataLatency = this.statsLogger.getOpStatsLogger(READ_DATA_LATENCY); + writeDataLatency = this.statsLogger.getOpStatsLogger(WRITE_DATA_LATENCY); } public LedgerFragmentReplicator(BookKeeper bkc) { @@ -334,6 +348,8 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Ob } } }; + + long startReadEntryTime = MathUtils.nowInNano(); /* * Read the ledger entry using the LedgerHandle. This will allow us to * read the entry from one of the other replicated bookies other than @@ -350,6 +366,10 @@ public void readComplete(int rc, LedgerHandle lh, ledgerFragmentEntryMcb.processResult(rc, null, null); return; } + + readDataLatency.registerSuccessfulEvent(MathUtils.elapsedNanos(startReadEntryTime), + TimeUnit.NANOSECONDS); + /* * Now that we've read the ledger entry, write it to the new * bookie we've selected. @@ -364,10 +384,13 @@ public void readComplete(int rc, LedgerHandle lh, lh.getLastAddConfirmed(), entry.getLength(), Unpooled.wrappedBuffer(data, 0, data.length)); for (BookieId newBookie : newBookies) { + long startWriteEntryTime = MathUtils.nowInNano(); bkc.getBookieClient().addEntry(newBookie, lh.getId(), lh.getLedgerKey(), entryId, ByteBufList.clone(toSend), multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD, false, WriteFlag.NONE); + writeDataLatency.registerSuccessfulEvent( + MathUtils.elapsedNanos(startWriteEntryTime), TimeUnit.NANOSECONDS); } toSend.release(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index f245614625f..064c7fc5177 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -38,6 +38,7 @@ import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS; import static org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME; import static org.apache.bookkeeper.replication.ReplicationStats.REPLICAS_CHECK_TIME; +import static org.apache.bookkeeper.replication.ReplicationStats.UNDER_REPLICATED_LEDGERS_TOTAL_SIZE; import static org.apache.bookkeeper.replication.ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE; import static org.apache.bookkeeper.util.SafeRunnable.safeRun; @@ -71,6 +72,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -170,6 +172,12 @@ public class Auditor implements AutoCloseable { help = "the distribution of num under_replicated ledgers on each auditor run" ) private final OpStatsLogger numUnderReplicatedLedger; + + @StatsDoc( + name = UNDER_REPLICATED_LEDGERS_TOTAL_SIZE, + help = "the distribution of under_replicated ledgers total size on each auditor run" + ) + private final OpStatsLogger underReplicatedLedgerTotalSize; @StatsDoc( name = URL_PUBLISH_TIME_FOR_LOST_BOOKIE, help = "the latency distribution of publishing under replicated ledgers for lost bookies" @@ -341,6 +349,7 @@ public Auditor(final String bookieIdentifier, this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry = new AtomicInteger(0); numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS); + underReplicatedLedgerTotalSize = this.statsLogger.getOpStatsLogger(UNDER_REPLICATED_LEDGERS_TOTAL_SIZE); uRLPublishTimeForLostBookies = this.statsLogger .getOpStatsLogger(ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE); bookieToLedgersMapCreationTime = this.statsLogger @@ -1131,6 +1140,17 @@ private CompletableFuture publishSuspectedLedgersAsync(Collection mis } LOG.info("Following ledgers: {} of bookie: {} are identified as underreplicated", ledgers, missingBookies); numUnderReplicatedLedger.registerSuccessfulValue(ledgers.size()); + LongAdder underReplicatedSize = new LongAdder(); + FutureUtils.processList( + Lists.newArrayList(ledgers), + ledgerId -> + ledgerManager.readLedgerMetadata(ledgerId).whenComplete((metadata, exception) -> { + if (exception == null) { + underReplicatedSize.add(metadata.getValue().getLength()); + } + }), null); + underReplicatedLedgerTotalSize.registerSuccessfulValue(underReplicatedSize.longValue()); + return FutureUtils.processList( Lists.newArrayList(ledgers), ledgerId -> ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId, missingBookies), diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java index 6ec9f4918a7..b0bbe47d95d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java @@ -30,6 +30,7 @@ public interface ReplicationStats { String AUDITOR_SCOPE = "auditor"; String ELECTION_ATTEMPTS = "election_attempts"; String NUM_UNDER_REPLICATED_LEDGERS = "NUM_UNDER_REPLICATED_LEDGERS"; + String UNDER_REPLICATED_LEDGERS_TOTAL_SIZE = "UNDER_REPLICATED_LEDGERS_TOTAL_SIZE"; String URL_PUBLISH_TIME_FOR_LOST_BOOKIE = "URL_PUBLISH_TIME_FOR_LOST_BOOKIE"; String BOOKIE_TO_LEDGERS_MAP_CREATION_TIME = "BOOKIE_TO_LEDGERS_MAP_CREATION_TIME"; String CHECK_ALL_LEDGERS_TIME = "CHECK_ALL_LEDGERS_TIME"; @@ -58,6 +59,8 @@ public interface ReplicationStats { String NUM_BYTES_READ = "NUM_BYTES_READ"; String NUM_ENTRIES_WRITTEN = "NUM_ENTRIES_WRITTEN"; String NUM_BYTES_WRITTEN = "NUM_BYTES_WRITTEN"; + String READ_DATA_LATENCY = "READ_DATA_LATENCY"; + String WRITE_DATA_LATENCY = "WRITE_DATA_LATENCY"; String REPLICATE_EXCEPTION = "exceptions"; String NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER = "NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER"; String NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION = "NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION"; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java new file mode 100644 index 00000000000..0f86a2d2ec7 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.tools.cli.commands.autorecovery; + +import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory; +import com.beust.jcommander.Parameter; +import com.google.common.util.concurrent.UncheckedExecutionException; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import lombok.Setter; +import lombok.experimental.Accessors; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; +import org.apache.bookkeeper.meta.UnderreplicatedLedger; +import org.apache.bookkeeper.replication.ReplicationException; +import org.apache.bookkeeper.tools.cli.helpers.BookieCommand; +import org.apache.bookkeeper.tools.framework.CliFlags; +import org.apache.bookkeeper.tools.framework.CliSpec; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Command to Query current auto recovery status. + */ +public class QueryAutoRecoveryStatusCommand + extends BookieCommand { + static final Logger LOG = LoggerFactory. + getLogger(QueryAutoRecoveryStatusCommand.class); + private static final String NAME = "queryautorecoverystatus"; + private static final String DESC = "Query autorecovery status."; + + public QueryAutoRecoveryStatusCommand() { + super(CliSpec.newBuilder() + .withName(NAME) + .withDescription(DESC) + .withFlags(new QFlags()) + .build()); + } + + @Override + public boolean apply(ServerConfiguration conf, QFlags cmdFlags) { + try { + return handler(conf, cmdFlags); + } catch (Exception e) { + throw new UncheckedExecutionException(e.getMessage(), e); + } + } + + /** + * Flags for list under replicated command. + */ + @Accessors(fluent = true) + @Setter + public static class QFlags extends CliFlags{ + @Parameter(names = {"-v", "--verbose"}, description = "list recovering detailed ledger info") + private Boolean verbose = false; + } + + private static class LedgerRecoverInfo { + Long ledgerId; + String bookieId; + LedgerRecoverInfo(Long ledgerId, String bookieId) { + this.ledgerId = ledgerId; + this.bookieId = bookieId; + } + } + + /* + Print Message format is like this: + + CurrentRecoverLedgerInfo: + LedgerId: BookieId: LedgerSize:(detail) + LedgerId: BookieId: LedgerSize:(detail) + */ + public boolean handler(ServerConfiguration conf, QFlags flag) throws Exception { + runFunctionWithLedgerManagerFactory(conf, mFactory -> { + LedgerUnderreplicationManager underreplicationManager; + LedgerManager ledgerManager = mFactory.newLedgerManager(); + List ledgerList = new LinkedList<>(); + try { + underreplicationManager = mFactory.newLedgerUnderreplicationManager(); + } catch (KeeperException | ReplicationException.CompatibilityException e) { + throw new UncheckedExecutionException("Failed to new ledger underreplicated manager", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedExecutionException("Interrupted on newing ledger underreplicated manager", e); + } + Iterator iter = underreplicationManager.listLedgersToRereplicate(null); + while (iter.hasNext()) { + UnderreplicatedLedger underreplicatedLedger = iter.next(); + long urLedgerId = underreplicatedLedger.getLedgerId(); + try { + String replicationWorkerId = underreplicationManager + .getReplicationWorkerIdRereplicatingLedger(urLedgerId); + if (replicationWorkerId != null) { + ledgerList.add(new LedgerRecoverInfo(urLedgerId, replicationWorkerId)); + } + } catch (ReplicationException.UnavailableException e) { + LOG.error("Failed to get ReplicationWorkerId rereplicating ledger {} -- {}", urLedgerId, + e.getMessage()); + } + } + + LOG.info("CurrentRecoverLedgerInfo:"); + if (!flag.verbose) { + for (int i = 0; i < ledgerList.size(); i++) { + LOG.info("\tLedgerId:{}\tBookieId:{}", ledgerList.get(i).ledgerId, ledgerList.get(i).bookieId); + } + } else { + for (int i = 0; i < ledgerList.size(); i++) { + LedgerRecoverInfo info = ledgerList.get(i); + ledgerManager.readLedgerMetadata(info.ledgerId).whenComplete((metadata, exception) -> { + if (exception == null) { + LOG.info("\tLedgerId:{}\tBookieId:{}\tLedgerSize:{}", + info.ledgerId, info.bookieId, metadata.getValue().getLength()); + } else { + LOG.error("Unable to read the ledger: {} information", info.ledgerId); + throw new UncheckedExecutionException(exception); + } + }); + } + } + if (ledgerList.size() == 0) { + // NO ledger is being auto recovering + LOG.info("\t No Ledger is being recovered."); + } + return null; + }); + return true; + } +} diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommandTest.java new file mode 100644 index 00000000000..ba92c8bd60b --- /dev/null +++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommandTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.tools.cli.commands.autorecovery; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import com.google.common.collect.Lists; +import java.lang.reflect.Constructor; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerMetadataBuilder; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.meta.LedgerManagerFactory; +import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; +import org.apache.bookkeeper.meta.MetadataDrivers; +import org.apache.bookkeeper.meta.UnderreplicatedLedger; +import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.BookieAddressResolver; +import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase; +import org.apache.bookkeeper.tools.cli.helpers.CommandHelpers; +import org.apache.bookkeeper.versioning.LongVersion; +import org.apache.bookkeeper.versioning.Versioned; +import org.apache.bookkeeper.zookeeper.ZooKeeperClient; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + + + +/** + * Unit test for {@link QueryAutoRecoveryStatusCommand}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({ QueryAutoRecoveryStatusCommand.class, ZKMetadataDriverBase.class, ZooKeeperClient.class, + CommandHelpers.class, MetadataDrivers.class +}) +public class QueryAutoRecoveryStatusCommandTest extends BookieCommandTestBase { + public QueryAutoRecoveryStatusCommandTest() { + super(3, 0); + } + LedgerUnderreplicationManager underreplicationManager; + + @Override + public void setup() throws Exception { + super.setup(); + BookieId bookieId = BookieId.parse(UUID.randomUUID().toString()); + LedgerManagerFactory ledgerManagerFactory = mock(LedgerManagerFactory.class); + + PowerMockito.mockStatic(MetadataDrivers.class); + PowerMockito.doAnswer(invocationOnMock -> { + Function function = invocationOnMock.getArgument(1); + function.apply(ledgerManagerFactory); + return true; + }).when(MetadataDrivers.class, "runFunctionWithLedgerManagerFactory", any(ServerConfiguration.class), + any(Function.class)); + + LedgerManager ledgerManager = mock(LedgerManager.class); + underreplicationManager = mock(LedgerUnderreplicationManager.class); + + when(ledgerManagerFactory.newLedgerManager()).thenReturn(ledgerManager); + when(ledgerManagerFactory.newLedgerUnderreplicationManager()).thenReturn(underreplicationManager); + + List ensemble = Lists.newArrayList(new BookieSocketAddress("192.0.2.1", 1234).toBookieId(), + new BookieSocketAddress("192.0.2.2", 1234).toBookieId(), + new BookieSocketAddress("192.0.2.3", 1234).toBookieId()); + LedgerMetadata metadata = LedgerMetadataBuilder.create() + .withId(11112233) + .withClosedState() + .withLength(100000999) + .withLastEntryId(2000011) + .withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(2) + .withPassword("passwd".getBytes()) + .withDigestType(BookKeeper.DigestType.CRC32.toApiDigestType()) + .newEnsembleEntry(0L, ensemble).build(); + CompletableFuture> promise = new CompletableFuture<>(); + Versioned vmeta = new Versioned(metadata, new LongVersion(1000)); + promise.complete(vmeta); + + when(ledgerManager.readLedgerMetadata(1)).thenReturn(promise); + when(ledgerManager.readLedgerMetadata(33232)).thenReturn(promise); + + Constructor constructor = UnderreplicatedLedger.class. + getDeclaredConstructor(long.class); + constructor.setAccessible(true); + final Queue queue = new LinkedList(); + queue.add("1111"); + Iterator iter = new Iterator() { + @Override + public boolean hasNext() { + if (queue.size() > 0) { + queue.remove(); + try { + curBatch.add(constructor.newInstance(1)); + curBatch.add(constructor.newInstance(33232)); + } catch (Exception e) { + } + } + + if (curBatch.size() > 0) { + return true; + } + return false; + } + + @Override + public UnderreplicatedLedger next() { + return curBatch.remove(); + } + + final Queue curBatch = new LinkedList(); + }; + + when(underreplicationManager.listLedgersToRereplicate(any())).thenReturn(iter); + + PowerMockito.mockStatic(CommandHelpers.class); + PowerMockito.when(CommandHelpers + .getBookieSocketAddrStringRepresentation( + eq(bookieId), any(BookieAddressResolver.class))).thenReturn(""); + } + + @Test(timeout = 30000) + public void testQueryRecoverStatusCommand() { + try { + when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(1)).thenReturn("192.168.0.103"); + when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(33232)).thenReturn("192.168.0.103"); + } catch (Exception e) { + } + QueryAutoRecoveryStatusCommand cmd = new QueryAutoRecoveryStatusCommand(); + Assert.assertTrue(cmd.apply(bkFlags, new String[] { "" })); + } + + @Test(timeout = 30000) + public void testQueryRecoverStatusCommandWithDetail() { + try { + when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(1)).thenReturn("192.168.0.103"); + when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(33232)).thenReturn("192.168.0.103"); + } catch (Exception e) { + } + QueryAutoRecoveryStatusCommand cmd = new QueryAutoRecoveryStatusCommand(); + Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-v" })); + } + + @Test(timeout = 3000) + public void testNoLedgerIsBeingRecovered() { + QueryAutoRecoveryStatusCommand cmd = new QueryAutoRecoveryStatusCommand(); + Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-v" })); + } +}