Skip to content

Commit

Permalink
Add metrics and internal command for QueryAutoRecoveryStatus, includi…
Browse files Browse the repository at this point in the history
…ng underReplicatedSize metrics,read/write latency, internal command for querying recovering ledgersInfo (#2768)

Motivation:

Current AutoRecovery does not have enough metrics or stat command that would help to monitor and debug. So we need to add metrics and admin stat interface to monitor the process of AutoRecovery.  For example, current recovering ledgerInfo and under replicated size, read/write latency in recovering.

Changes:

And QueryAutoRecoveryStatus command and under replicated size metric , read/write latency metric in recovering

Documentation:

Need doc.
  • Loading branch information
frankxieke authored Sep 6, 2021
1 parent 7c74fbb commit 97818f5
Show file tree
Hide file tree
Showing 6 changed files with 420 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.tools.cli.commands.autorecovery.ListUnderReplicatedCommand;
import org.apache.bookkeeper.tools.cli.commands.autorecovery.LostBookieRecoveryDelayCommand;
import org.apache.bookkeeper.tools.cli.commands.autorecovery.QueryAutoRecoveryStatusCommand;
import org.apache.bookkeeper.tools.cli.commands.autorecovery.ToggleCommand;
import org.apache.bookkeeper.tools.cli.commands.autorecovery.TriggerAuditCommand;
import org.apache.bookkeeper.tools.cli.commands.autorecovery.WhoIsAuditorCommand;
Expand Down Expand Up @@ -155,6 +156,7 @@ public class BookieShell implements Tool {
static final String CMD_CONVERT_TO_INTERLEAVED_STORAGE = "convert-to-interleaved-storage";
static final String CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX = "rebuild-db-ledger-locations-index";
static final String CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE = "regenerate-interleaved-storage-index-file";
static final String CMD_QUERY_AUTORECOVERY_STATUS = "queryrecoverystatus";

// cookie commands
static final String CMD_CREATE_COOKIE = "cookie_create";
Expand Down Expand Up @@ -1344,6 +1346,43 @@ int runCmd(CommandLine cmdLine) throws Exception {
}
}


/**
* Command to query autorecovery status.
*/
class QueryAutoRecoveryStatusCmd extends MyCommand {
Options opts = new Options();

public QueryAutoRecoveryStatusCmd() {
super(CMD_QUERY_AUTORECOVERY_STATUS);
}

@Override
Options getOptions() {
return opts;
}

@Override
String getDescription() {
return "Query the autorecovery status";
}

@Override
String getUsage() {
return "queryautorecoverystatus";
}

@Override
int runCmd(CommandLine cmdLine) throws Exception {
final boolean verbose = cmdLine.hasOption("verbose");
QueryAutoRecoveryStatusCommand.QFlags flags = new QueryAutoRecoveryStatusCommand.QFlags()
.verbose(verbose);
QueryAutoRecoveryStatusCommand cmd = new QueryAutoRecoveryStatusCommand();
cmd.apply(bkConf, flags);
return 0;
}
}

/**
* Setter and Getter for LostBookieRecoveryDelay value (in seconds) in metadata store.
*/
Expand Down Expand Up @@ -2153,6 +2192,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
commands.put(CMD_READJOURNAL, new ReadJournalCmd());
commands.put(CMD_LASTMARK, new LastMarkCmd());
commands.put(CMD_AUTORECOVERY, new AutoRecoveryCmd());
commands.put(CMD_QUERY_AUTORECOVERY_STATUS, new QueryAutoRecoveryStatusCmd());
commands.put(CMD_LISTBOOKIES, new ListBookiesCmd());
commands.put(CMD_LISTFILESONDISC, new ListDiskFilesCmd());
commands.put(CMD_UPDATECOOKIE, new UpdateCookieCmd());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,22 @@
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BYTES_WRITTEN;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_READ;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_WRITTEN;
import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;

import static org.apache.bookkeeper.replication.ReplicationStats.READ_DATA_LATENCY;;
import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;;
import static org.apache.bookkeeper.replication.ReplicationStats.WRITE_DATA_LATENCY;
import io.netty.buffer.Unpooled;

import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.meta.LedgerManager;
Expand All @@ -53,6 +53,7 @@
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.zookeeper.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -90,6 +91,17 @@ public class LedgerFragmentReplicator {
help = "The distribution of size of entries written by the replicator"
)
private final OpStatsLogger numBytesWritten;
@StatsDoc(
name = READ_DATA_LATENCY,
help = "The distribution of latency of read entries by the replicator"
)
private final OpStatsLogger readDataLatency;
@StatsDoc(
name = WRITE_DATA_LATENCY,
help = "The distribution of latency of write entries by the replicator"
)
private final OpStatsLogger writeDataLatency;


public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger) {
this.bkc = bkc;
Expand All @@ -98,6 +110,8 @@ public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger) {
numBytesRead = this.statsLogger.getOpStatsLogger(NUM_BYTES_READ);
numEntriesWritten = this.statsLogger.getCounter(NUM_ENTRIES_WRITTEN);
numBytesWritten = this.statsLogger.getOpStatsLogger(NUM_BYTES_WRITTEN);
readDataLatency = this.statsLogger.getOpStatsLogger(READ_DATA_LATENCY);
writeDataLatency = this.statsLogger.getOpStatsLogger(WRITE_DATA_LATENCY);
}

public LedgerFragmentReplicator(BookKeeper bkc) {
Expand Down Expand Up @@ -334,6 +348,8 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Ob
}
}
};

long startReadEntryTime = MathUtils.nowInNano();
/*
* Read the ledger entry using the LedgerHandle. This will allow us to
* read the entry from one of the other replicated bookies other than
Expand All @@ -350,6 +366,10 @@ public void readComplete(int rc, LedgerHandle lh,
ledgerFragmentEntryMcb.processResult(rc, null, null);
return;
}

readDataLatency.registerSuccessfulEvent(MathUtils.elapsedNanos(startReadEntryTime),
TimeUnit.NANOSECONDS);

/*
* Now that we've read the ledger entry, write it to the new
* bookie we've selected.
Expand All @@ -364,10 +384,13 @@ public void readComplete(int rc, LedgerHandle lh,
lh.getLastAddConfirmed(), entry.getLength(),
Unpooled.wrappedBuffer(data, 0, data.length));
for (BookieId newBookie : newBookies) {
long startWriteEntryTime = MathUtils.nowInNano();
bkc.getBookieClient().addEntry(newBookie, lh.getId(),
lh.getLedgerKey(), entryId, ByteBufList.clone(toSend),
multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD,
false, WriteFlag.NONE);
writeDataLatency.registerSuccessfulEvent(
MathUtils.elapsedNanos(startWriteEntryTime), TimeUnit.NANOSECONDS);
}
toSend.release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS;
import static org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.REPLICAS_CHECK_TIME;
import static org.apache.bookkeeper.replication.ReplicationStats.UNDER_REPLICATED_LEDGERS_TOTAL_SIZE;
import static org.apache.bookkeeper.replication.ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE;
import static org.apache.bookkeeper.util.SafeRunnable.safeRun;

Expand Down Expand Up @@ -71,6 +72,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -170,6 +172,12 @@ public class Auditor implements AutoCloseable {
help = "the distribution of num under_replicated ledgers on each auditor run"
)
private final OpStatsLogger numUnderReplicatedLedger;

@StatsDoc(
name = UNDER_REPLICATED_LEDGERS_TOTAL_SIZE,
help = "the distribution of under_replicated ledgers total size on each auditor run"
)
private final OpStatsLogger underReplicatedLedgerTotalSize;
@StatsDoc(
name = URL_PUBLISH_TIME_FOR_LOST_BOOKIE,
help = "the latency distribution of publishing under replicated ledgers for lost bookies"
Expand Down Expand Up @@ -341,6 +349,7 @@ public Auditor(final String bookieIdentifier,
this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry = new AtomicInteger(0);

numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
underReplicatedLedgerTotalSize = this.statsLogger.getOpStatsLogger(UNDER_REPLICATED_LEDGERS_TOTAL_SIZE);
uRLPublishTimeForLostBookies = this.statsLogger
.getOpStatsLogger(ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE);
bookieToLedgersMapCreationTime = this.statsLogger
Expand Down Expand Up @@ -1131,6 +1140,17 @@ private CompletableFuture<?> publishSuspectedLedgersAsync(Collection<String> mis
}
LOG.info("Following ledgers: {} of bookie: {} are identified as underreplicated", ledgers, missingBookies);
numUnderReplicatedLedger.registerSuccessfulValue(ledgers.size());
LongAdder underReplicatedSize = new LongAdder();
FutureUtils.processList(
Lists.newArrayList(ledgers),
ledgerId ->
ledgerManager.readLedgerMetadata(ledgerId).whenComplete((metadata, exception) -> {
if (exception == null) {
underReplicatedSize.add(metadata.getValue().getLength());
}
}), null);
underReplicatedLedgerTotalSize.registerSuccessfulValue(underReplicatedSize.longValue());

return FutureUtils.processList(
Lists.newArrayList(ledgers),
ledgerId -> ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId, missingBookies),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public interface ReplicationStats {
String AUDITOR_SCOPE = "auditor";
String ELECTION_ATTEMPTS = "election_attempts";
String NUM_UNDER_REPLICATED_LEDGERS = "NUM_UNDER_REPLICATED_LEDGERS";
String UNDER_REPLICATED_LEDGERS_TOTAL_SIZE = "UNDER_REPLICATED_LEDGERS_TOTAL_SIZE";
String URL_PUBLISH_TIME_FOR_LOST_BOOKIE = "URL_PUBLISH_TIME_FOR_LOST_BOOKIE";
String BOOKIE_TO_LEDGERS_MAP_CREATION_TIME = "BOOKIE_TO_LEDGERS_MAP_CREATION_TIME";
String CHECK_ALL_LEDGERS_TIME = "CHECK_ALL_LEDGERS_TIME";
Expand Down Expand Up @@ -58,6 +59,8 @@ public interface ReplicationStats {
String NUM_BYTES_READ = "NUM_BYTES_READ";
String NUM_ENTRIES_WRITTEN = "NUM_ENTRIES_WRITTEN";
String NUM_BYTES_WRITTEN = "NUM_BYTES_WRITTEN";
String READ_DATA_LATENCY = "READ_DATA_LATENCY";
String WRITE_DATA_LATENCY = "WRITE_DATA_LATENCY";
String REPLICATE_EXCEPTION = "exceptions";
String NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER = "NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER";
String NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION = "NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.tools.cli.commands.autorecovery;

import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
import com.beust.jcommander.Parameter;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import lombok.Setter;
import lombok.experimental.Accessors;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
import org.apache.bookkeeper.tools.framework.CliFlags;
import org.apache.bookkeeper.tools.framework.CliSpec;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Command to Query current auto recovery status.
*/
public class QueryAutoRecoveryStatusCommand
extends BookieCommand<QueryAutoRecoveryStatusCommand.QFlags> {
static final Logger LOG = LoggerFactory.
getLogger(QueryAutoRecoveryStatusCommand.class);
private static final String NAME = "queryautorecoverystatus";
private static final String DESC = "Query autorecovery status.";

public QueryAutoRecoveryStatusCommand() {
super(CliSpec.<QFlags>newBuilder()
.withName(NAME)
.withDescription(DESC)
.withFlags(new QFlags())
.build());
}

@Override
public boolean apply(ServerConfiguration conf, QFlags cmdFlags) {
try {
return handler(conf, cmdFlags);
} catch (Exception e) {
throw new UncheckedExecutionException(e.getMessage(), e);
}
}

/**
* Flags for list under replicated command.
*/
@Accessors(fluent = true)
@Setter
public static class QFlags extends CliFlags{
@Parameter(names = {"-v", "--verbose"}, description = "list recovering detailed ledger info")
private Boolean verbose = false;
}

private static class LedgerRecoverInfo {
Long ledgerId;
String bookieId;
LedgerRecoverInfo(Long ledgerId, String bookieId) {
this.ledgerId = ledgerId;
this.bookieId = bookieId;
}
}

/*
Print Message format is like this:
CurrentRecoverLedgerInfo:
LedgerId: BookieId: LedgerSize:(detail)
LedgerId: BookieId: LedgerSize:(detail)
*/
public boolean handler(ServerConfiguration conf, QFlags flag) throws Exception {
runFunctionWithLedgerManagerFactory(conf, mFactory -> {
LedgerUnderreplicationManager underreplicationManager;
LedgerManager ledgerManager = mFactory.newLedgerManager();
List<LedgerRecoverInfo> ledgerList = new LinkedList<>();
try {
underreplicationManager = mFactory.newLedgerUnderreplicationManager();
} catch (KeeperException | ReplicationException.CompatibilityException e) {
throw new UncheckedExecutionException("Failed to new ledger underreplicated manager", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new UncheckedExecutionException("Interrupted on newing ledger underreplicated manager", e);
}
Iterator<UnderreplicatedLedger> iter = underreplicationManager.listLedgersToRereplicate(null);
while (iter.hasNext()) {
UnderreplicatedLedger underreplicatedLedger = iter.next();
long urLedgerId = underreplicatedLedger.getLedgerId();
try {
String replicationWorkerId = underreplicationManager
.getReplicationWorkerIdRereplicatingLedger(urLedgerId);
if (replicationWorkerId != null) {
ledgerList.add(new LedgerRecoverInfo(urLedgerId, replicationWorkerId));
}
} catch (ReplicationException.UnavailableException e) {
LOG.error("Failed to get ReplicationWorkerId rereplicating ledger {} -- {}", urLedgerId,
e.getMessage());
}
}

LOG.info("CurrentRecoverLedgerInfo:");
if (!flag.verbose) {
for (int i = 0; i < ledgerList.size(); i++) {
LOG.info("\tLedgerId:{}\tBookieId:{}", ledgerList.get(i).ledgerId, ledgerList.get(i).bookieId);
}
} else {
for (int i = 0; i < ledgerList.size(); i++) {
LedgerRecoverInfo info = ledgerList.get(i);
ledgerManager.readLedgerMetadata(info.ledgerId).whenComplete((metadata, exception) -> {
if (exception == null) {
LOG.info("\tLedgerId:{}\tBookieId:{}\tLedgerSize:{}",
info.ledgerId, info.bookieId, metadata.getValue().getLength());
} else {
LOG.error("Unable to read the ledger: {} information", info.ledgerId);
throw new UncheckedExecutionException(exception);
}
});
}
}
if (ledgerList.size() == 0) {
// NO ledger is being auto recovering
LOG.info("\t No Ledger is being recovered.");
}
return null;
});
return true;
}
}
Loading

0 comments on commit 97818f5

Please sign in to comment.