fix(ci/cd): provision benchmark collection
0xNineteen committed Jan 9, 2025
1 parent 62a2da4 commit 4e41e2b
Showing 9 changed files with 122 additions and 141 deletions.
6 changes: 2 additions & 4 deletions scripts/collect_benchmarks.sh
@@ -1,8 +1,5 @@
#!/usr/bin/env bash

# crontab -e
# 0 5 * * * bash /home/ubuntu/benchmarks/sig/scripts/collect_benchmarks.sh

# now in the scripts/ dir
cd "$(dirname "$0")"
# now in the sig dir
@@ -22,7 +19,8 @@ if ls $result_file 1> /dev/null 2>&1; then
echo "Results for commit $git_commit already exist. Skipping benchmark."
else
# Run the benchmark only if the result file doesn't exist
zig build -Doptimize=ReleaseSafe benchmark -- --metrics all
zig build -Doptimize=ReleaseSafe -Dno-run benchmark
./zig-out/bin/benchmark --metrics -e -f all

mv results/output.json "${result_dir}/output-${git_commit}-${timestamp}.json"
echo "Benchmark results saved to ${result_dir}/output-${git_commit}-${timestamp}.json"
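For reference, a minimal sketch of reproducing one collection pass by hand, under the same assumptions the script makes (a sig checkout with zig on PATH; the path is hypothetical, and the flag meanings are inferred from the src/benchmarks.zig parsing further down):

#!/usr/bin/env bash
# Sketch: one manual benchmark-collection pass (hypothetical checkout path).
cd path/to/sig

# Build the benchmark binary without running it through `zig build`.
zig build -Doptimize=ReleaseSafe -Dno-run benchmark

# Run it directly: --metrics collects metrics, -e includes expensive
# benchmarks, -f forces fresh state, and `all` is the benchmark filter.
./zig-out/bin/benchmark --metrics -e -f all

# The run writes results/output.json, which collect_benchmarks.sh then
# renames to output-<commit>-<timestamp>.json.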
8 changes: 8 additions & 0 deletions scripts/setup_cron.sh
@@ -0,0 +1,8 @@
SCRIPT_DIR=$(dirname "$(readlink -f "$0")")

(crontab -l; echo "\
0 6 * * * . $HOME/.bashrc; (bash $SCRIPT_DIR/collect_benchmarks.sh) 2>&1 | logger -t sig_bench \
") | crontab

echo "Cron job added. Current crontab:"
crontab -l
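
Because the job's output is piped to `logger -t sig_bench`, it lands in syslog rather than in a file. A quick sketch for verifying the installation, assuming a systemd host for the journalctl form:

# Confirm the cron entry was installed.
crontab -l | grep collect_benchmarks

# Follow the job's output under the sig_bench syslog tag (systemd hosts).
journalctl -t sig_bench -f

# Non-systemd fallback: search the classic syslog file.
grep sig_bench /var/log/syslog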
108 changes: 78 additions & 30 deletions src/accountsdb/db.zig
@@ -291,25 +291,23 @@ pub const AccountsDB = struct {
var timer = try sig.time.Timer.start();
var fastload_dir = try self.snapshot_dir.makeOpenPath("fastload_state", .{});
defer fastload_dir.close();
self.logger.info().log("fast loading accountsdb...");
try self.fastload(fastload_dir, collapsed_manifest.accounts_db_fields);
self.logger.info().logf("loaded from snapshot in {s}", .{timer.read()});
self.logger.info().logf("fastload: total time: {s}", .{timer.read()});
} else {
const load_duration = try self.loadFromSnapshot(
collapsed_manifest.accounts_db_fields,
n_threads,
allocator,
accounts_per_file_estimate,
);
self.logger.info().logf("loaded from snapshot in {s}", .{load_duration});
self.logger.info().logf("loadFromSnapshot: total time: {s}", .{load_duration});
}

// no need to re-save if we just loaded from a fastload
if (!should_fastload and save_index) {
var fastload_dir = try self.snapshot_dir.makeOpenPath("fastload_state", .{});
defer fastload_dir.close();

try self.account_index.saveToDisk(fastload_dir);
if (save_index and !should_fastload) {
var timer = try sig.time.Timer.start();
_ = try self.saveStateForFastload();
self.logger.info().logf("saveStateForFastload: total time: {s}", .{timer.read()});
}

if (validate) {
@@ -331,17 +329,28 @@ pub const AccountsDB = struct {
.capitalization = inc_persistence.incremental_capitalization,
} else null,
});
self.logger.info().logf("validated from snapshot in {s}", .{validate_timer.read()});
self.logger.info().logf("validateLoadFromSnapshot: total time: {s}", .{validate_timer.read()});
}

return collapsed_manifest;
}

pub fn saveStateForFastload(
self: *Self,
) !void {
self.logger.info().log("running saveStateForFastload...");
var fastload_dir = try self.snapshot_dir.makeOpenPath("fastload_state", .{});
defer fastload_dir.close();
try self.account_index.saveToDisk(fastload_dir);
}

pub fn fastload(
self: *Self,
dir: std.fs.Dir,
snapshot_manifest: AccountsDbFields,
) !void {
self.logger.info().log("running fastload...");

var accounts_dir = try self.snapshot_dir.openDir("accounts", .{});
defer accounts_dir.close();

@@ -383,7 +392,6 @@ pub const AccountsDB = struct {
}

// NOTE: index loading was the most expensive part which we fastload here
self.logger.info().log("loading account index");
try self.account_index.loadFromDisk(dir);
}

@@ -397,7 +405,7 @@ pub const AccountsDB = struct {
per_thread_allocator: std.mem.Allocator,
accounts_per_file_estimate: u64,
) !sig.time.Duration {
self.logger.info().log("loading from snapshot...");
self.logger.info().log("running loadFromSnapshot...");

// used to read account files
const n_parse_threads = n_threads;
@@ -472,12 +480,11 @@ pub const AccountsDB = struct {
try geyser_writer.writePayloadToPipe(end_of_snapshot);
}

self.logger.info().logf("[{d} threads]: merging thread indexes...", .{n_combine_threads});
var merge_timer = try sig.time.Timer.start();
try self.mergeMultipleDBs(loading_threads, n_combine_threads);
self.logger.debug().logf("merging thread indexes took: {}", .{merge_timer.read()});
self.logger.debug().logf("mergeMultipleDBs: total time: {}", .{merge_timer.read()});

self.logger.debug().logf("total time: {s}", .{timer.read()});
self.logger.debug().logf("loadFromSnapshot: total time: {s}", .{timer.read()});
return timer.read();
}

@@ -768,6 +775,8 @@ pub const AccountsDB = struct {
thread_dbs: []AccountsDB,
n_threads: usize,
) !void {
self.logger.info().logf("[{d} threads]: running mergeMultipleDBs...", .{n_threads});

var merge_indexes_wg: std.Thread.WaitGroup = .{};
defer merge_indexes_wg.wait();
try spawnThreadTasks(mergeThreadIndexesMultiThread, .{
@@ -928,8 +937,8 @@ pub const AccountsDB = struct {
) !struct { Hash, u64 } {
var timer = try sig.time.Timer.start();
// TODO: make cli arg
const n_threads = @as(u32, @truncate(try std.Thread.getCpuCount())) * 2;
// const n_threads = 1;
// const n_threads = @as(u32, @truncate(try std.Thread.getCpuCount()));
const n_threads = 4;

// alloc the result
const hashes = try self.allocator.alloc(std.ArrayListUnmanaged(Hash), n_threads);
@@ -944,7 +953,10 @@ pub const AccountsDB = struct {
@memset(lamports, 0);

// split processing the bins over multiple threads
self.logger.info().logf("collecting hashes from accounts...", .{});
self.logger.info().logf(
"collecting hashes from accounts using {} threads...",
.{n_threads},
);
if (n_threads == 1) {
try getHashesFromIndex(
self,
@@ -1044,17 +1056,17 @@ pub const AccountsDB = struct {

if (params.expected_full.accounts_hash.order(&accounts_hash) != .eq) {
self.logger.err().logf(
\\ incorrect accounts hash
\\ expected vs calculated: {d} vs {d}
, .{ params.expected_full.accounts_hash, accounts_hash });
"incorrect accounts hash: expected vs calculated: {d} vs {d}",
.{ params.expected_full.accounts_hash, accounts_hash },
);
return error.IncorrectAccountsHash;
}

if (params.expected_full.capitalization != total_lamports) {
self.logger.err().logf(
\\ incorrect total lamports
\\ expected vs calculated: {d} vs {d}
, .{ params.expected_full.capitalization, total_lamports });
"incorrect total lamports: expected vs calculated: {d} vs {d}",
.{ params.expected_full.capitalization, total_lamports },
);
return error.IncorrectTotalLamports;
}

@@ -1093,17 +1105,17 @@ pub const AccountsDB = struct {

if (expected_incremental.capitalization != incremental_lamports) {
self.logger.err().logf(
\\ incorrect incremental lamports
\\ expected vs calculated: {d} vs {d}
, .{ expected_incremental.capitalization, incremental_lamports });
"incorrect incremental lamports: expected vs calculated: {d} vs {d}",
.{ expected_incremental.capitalization, incremental_lamports },
);
return error.IncorrectIncrementalLamports;
}

if (expected_incremental.accounts_hash.order(&accounts_delta_hash) != .eq) {
self.logger.err().logf(
\\ incorrect accounts delta hash
\\ expected vs calculated: {d} vs {d}
, .{ expected_incremental.accounts_hash, accounts_delta_hash });
"incorrect accounts delta hash: expected vs calculated: {d} vs {d}",
.{ expected_incremental.accounts_hash, accounts_delta_hash },
);
return error.IncorrectAccountsDeltaHash;
}

@@ -4425,13 +4437,18 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
pub fn loadAndVerifySnapshot(units: BenchTimeUnit, bench_args: BenchArgs) !struct {
load_time: u64,
validate_time: u64,
fastload_save_time: u64,
fastload_time: u64,
} {
const allocator = std.heap.c_allocator;
var print_logger = sig.trace.DirectPrintLogger.init(allocator, .debug);
const logger = print_logger.logger();

// unpack the snapshot
var snapshot_dir = std.fs.cwd().openDir(SNAPSHOT_DIR_PATH, .{ .iterate = true }) catch {
var snapshot_dir = std.fs.cwd().openDir(
SNAPSHOT_DIR_PATH ++ sig.ACCOUNTS_DB_SUBDIR,
.{ .iterate = true },
) catch {
// no snapshot -> early exit
std.debug.print(
"need to setup a snapshot in {s} for this benchmark...\n",
@@ -4441,6 +4458,8 @@
return .{
.load_time = zero_duration.asNanos(),
.validate_time = zero_duration.asNanos(),
.fastload_save_time = zero_duration.asNanos(),
.fastload_time = zero_duration.asNanos(),
};
};
defer snapshot_dir.close();
@@ -4474,6 +4493,12 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
try getAccountPerFileEstimateFromCluster(bench_args.cluster),
);

const fastload_save_duration = blk: {
var timer = try sig.time.Timer.start();
try accounts_db.saveStateForFastload();
break :blk timer.read();
};

const full_snapshot = full_inc_manifest.full;
var validate_timer = try sig.time.Timer.start();
try accounts_db.validateLoadFromSnapshot(.{
@@ -4489,9 +4514,32 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
});
const validate_duration = validate_timer.read();

const fastload_duration = blk: {
var fastload_accounts_db = try AccountsDB.init(.{
.allocator = allocator,
.logger = logger,
.snapshot_dir = snapshot_dir,
.geyser_writer = null,
.gossip_view = null,
.index_allocation = if (bench_args.use_disk) .disk else .ram,
.number_of_index_shards = 32,
.lru_size = null,
});
defer fastload_accounts_db.deinit();

var fastload_dir = try snapshot_dir.makeOpenPath("fastload_state", .{});
defer fastload_dir.close();

var fastload_timer = try sig.time.Timer.start();
try fastload_accounts_db.fastload(fastload_dir, collapsed_manifest.accounts_db_fields);
break :blk fastload_timer.read();
};

return .{
.load_time = units.convertDuration(loading_duration),
.validate_time = units.convertDuration(validate_duration),
.fastload_save_time = units.convertDuration(fastload_save_duration),
.fastload_time = units.convertDuration(fastload_duration),
};
}
};
3 changes: 1 addition & 2 deletions src/accountsdb/download.zig
@@ -596,8 +596,7 @@ pub fn getOrDownloadAndUnpackSnapshot(

// download a new snapshot if required
const snapshot_exists = blk: {
_ = SnapshotFiles.find(allocator, snapshot_dir) catch |err| {
std.debug.print("failed to find snapshot files: {}\n", .{err});
_ = SnapshotFiles.find(allocator, snapshot_dir) catch {
break :blk false;
};
break :blk true;
2 changes: 1 addition & 1 deletion src/accountsdb/index.zig
@@ -311,7 +311,7 @@ pub const AccountIndex = struct {
// manager must be empty
std.debug.assert(self.reference_manager.capacity == 0);

self.logger.info().log("loading state from disk...");
self.logger.info().log("running account_index.loadFromDisk");
const reference_file = try dir.openFile("index.bin", .{});
const size = (try reference_file.stat()).size;
const index_memory = try std.posix.mmap(
17 changes: 14 additions & 3 deletions src/benchmarks.zig
@@ -107,7 +107,6 @@ pub fn main() !void {
continue;
} else if (std.mem.startsWith(u8, arg, "-e")) {
run_expensive_benchmarks = true;
collect_metrics = true; // by default collect metrics when running expensive benchmarks
continue;
} else if (std.mem.startsWith(u8, arg, "-f")) {
force_fresh_state = true;
@@ -130,6 +129,17 @@
} else {
logger.info().logf("running benchmark with filter: {s}", .{@tagName(filter)});
}
if (collect_metrics) {
logger.info().log("collecting metrics");
}
if (run_expensive_benchmarks) {
logger.info().log("running expensive benchmarks");
}
if (force_fresh_state) {
logger.info().log("forcing fresh state for expensive benchmarks");
} else {
logger.info().log("re-using state for expensive benchmarks");
}

const max_time_per_bench = Duration.fromSecs(5); // !!
const run_all_benchmarks = filter == .all;
@@ -197,12 +207,13 @@ pub fn main() !void {
if (download_new_snapshot) {
// delete existing snapshot dir
if (test_snapshot_exists) {
std.debug.print("deleting snapshot dir...\n", .{});
logger.info().log("deleting snapshot dir...");
std.fs.cwd().deleteTreeMinStackSize(BENCH_SNAPSHOT_DIR_PATH) catch |err| {
std.debug.print("failed to delete snapshot dir ('{s}'): {}\n", .{
logger.err().logf("failed to delete snapshot dir ('{s}'): {}", .{
BENCH_SNAPSHOT_DIR_PATH,
err,
});
return err;
};
}

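A sketch of how the reworked flags compose, inferred from the parsing and log lines above (note that this commit drops the rule that -e implied metric collection, so --metrics must now be passed explicitly, as collect_benchmarks.sh does):

# Cheap benchmarks only, re-using any existing state.
./zig-out/bin/benchmark all

# Include expensive benchmarks; logs "running expensive benchmarks".
./zig-out/bin/benchmark -e all

# Also collect metrics and force fresh state; logs "collecting metrics"
# and "forcing fresh state for expensive benchmarks".
./zig-out/bin/benchmark --metrics -e -f all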
6 changes: 3 additions & 3 deletions src/cmd/cmd.zig
@@ -292,9 +292,9 @@ pub fn run() !void {
.long_name = "accounts-per-file-estimate",
.short_alias = 'a',
.help =
\\number of accounts to estimate inside of account files (used for pre-allocation).
\\Safer to set it larger than smaller.
\\(approx values we found work well testnet/devnet: 1_500, mainnet: 3_000)"
\\number of accounts to estimate inside of account files (used for pre-allocation).
\\Safer to set it larger than smaller.
\\(approx values we found work well testnet/devnet: 300, mainnet: TODO?)"
,
.value_ref = cli.mkRef(&config.current.accounts_db.accounts_per_file_estimate),
.required = false,
16 changes: 11 additions & 5 deletions src/gossip/helpers.zig
@@ -31,7 +31,6 @@ pub fn initGossipFromCluster(
const socket_addr = try resolveSocketAddr(allocator, entrypoint_str);
try entrypoints.append(socket_addr);
}
logger.info().logf("using predefined entrypoints: {any}", .{entrypoints});

// create contact info
const echo_data = try getShredAndIPFromEchoServer(
@@ -40,20 +39,27 @@
entrypoints.items,
);
const my_shred_version = echo_data.shred_version orelse 0;
logger.info().logf("my shred version: {d}", .{my_shred_version});
const my_ip = echo_data.ip orelse IpAddr.newIpv4(127, 0, 0, 1);
logger.info().logf("my ip: {any}", .{my_ip});

const default_config = sig.cmd.config.GossipConfig{};
const my_port = default_config.port; // default port
// NOTE: we dont use the default port to avoid port collisions with other gossip
// services running on the same machine
const my_port = default_config.port + 5;
const my_keypair = try getOrInitIdentity(allocator, logger);
logger.info().logf("gossip_port: {d}", .{my_port});

const my_pubkey = Pubkey.fromPublicKey(&my_keypair.public_key);
var contact_info = ContactInfo.init(allocator, my_pubkey, getWallclockMs(), 0);
try contact_info.setSocket(.gossip, SocketAddr.init(my_ip, my_port));
contact_info.shred_version = my_shred_version;

logger.info()
.field("my_pubkey", my_pubkey)
.field("my_ip", my_ip)
.field("my_shred_version", my_shred_version)
.field("gossip_port", my_port)
.field("entrypoints", entrypoints.items)
.log("setting up gossip");

// create gossip
return try GossipService.create(
allocator,