Skip to content

Commit

Permalink
Merge pull request #204 from Syndica/ink/snapshot-fuzzing
Browse files Browse the repository at this point in the history
feat(accounts-db): snapshot fuzzing
  • Loading branch information
InKryption authored Aug 6, 2024
2 parents 78ffdb1 + 32c3aeb commit cccc96c
Show file tree
Hide file tree
Showing 22 changed files with 1,995 additions and 617 deletions.
1 change: 1 addition & 0 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub fn build(b: *Build) void {
fuzz_exe.root_module.addImport("base58-zig", base58_module);
fuzz_exe.root_module.addImport("zig-network", zig_network_module);
fuzz_exe.root_module.addImport("httpz", httpz_mod);
fuzz_exe.root_module.addImport("zstd", zstd_mod);

const fuzz_exe_run = b.addRunArtifact(fuzz_exe);
fuzz_exe_run.addArgs(b.args orelse &.{});
Expand Down
4 changes: 2 additions & 2 deletions build.zig.zon
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
.hash = "1220b8a918dfcee4fc8326ec337776e2ffd3029511c35f6b96d10aa7be98ca2faf99",
},
.zstd = .{
.url = "https://github.com/Syndica/zstd.zig/archive/20a21798a253ea1f0388ee633f4110e1deb2ddef.tar.gz",
.hash = "1220e27e4fece8ff0cabb2f7439891919e8ed294dc936c2a54f0702a2f0ec96f4b6c",
.url = "https://github.com/Syndica/zstd.zig/archive/b5078c9da8588d4b133ed93b6b13c01288346be5.tar.gz",
.hash = "1220409db8254e909be7277ec0ef0e26bf3f9daf089dcc5f8bb24c473ddc2adc9dfa",
},
.curl = .{
.url = "https://github.com/jiacai2050/zig-curl/archive/8a3f45798a80a5de4c11c6fa44dab8785c421d27.tar.gz",
Expand Down
581 changes: 291 additions & 290 deletions src/accountsdb/db.zig

Large diffs are not rendered by default.

19 changes: 6 additions & 13 deletions src/accountsdb/download.zig
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ pub fn downloadSnapshotsFromGossip(
// if null, then we trust any peer for snapshot download
maybe_trusted_validators: ?[]const Pubkey,
gossip_service: *GossipService,
output_dir: []const u8,
output_dir: std.fs.Dir,
min_mb_per_sec: usize,
) !void {
logger.infof("starting snapshot download with min download speed: {d} MB/s", .{min_mb_per_sec});
Expand Down Expand Up @@ -295,13 +295,12 @@ const DownloadProgress = struct {

pub fn init(
logger: Logger,
output_dir_str: []const u8,
output_dir: std.fs.Dir,
filename: []const u8,
download_size: usize,
min_mb_per_second: ?usize,
) !Self {
var output_dir = try std.fs.cwd().openDir(output_dir_str, .{});
var file = try output_dir.createFile(filename, .{ .read = true });
const file = try output_dir.createFile(filename, .{ .read = true });
defer file.close();

// resize the file
Expand Down Expand Up @@ -406,7 +405,7 @@ pub fn downloadFile(
allocator: std.mem.Allocator,
logger: Logger,
url: [:0]const u8,
output_dir_str: []const u8,
output_dir: std.fs.Dir,
filename: []const u8,
min_mb_per_second: ?usize,
) !void {
Expand All @@ -432,18 +431,12 @@ pub fn downloadFile(
easy.timeout_ms = std.time.ms_per_hour * 5; // 5 hours is probably too long, but it's okay
var download_progress = try DownloadProgress.init(
logger,
output_dir_str,
output_dir,
filename,
download_size,
min_mb_per_second,
);
errdefer {
// NOTE: this shouldn't fail because we open the dir in DownloadProgress.init
const output_dir = std.fs.cwd().openDir(output_dir_str, .{}) catch {
std.debug.panic("failed to open output dir: {s}", .{output_dir_str});
};
output_dir.deleteFile(filename) catch {};
}
errdefer output_dir.deleteFile(filename) catch {};
defer download_progress.deinit();

try setNoBody(easy, false); // full download
Expand Down
175 changes: 140 additions & 35 deletions src/accountsdb/fuzz.zig
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
const std = @import("std");
const sig = @import("../lib.zig");
const zstd = @import("zstd");

const AccountsDB = sig.accounts_db.AccountsDB;
const Logger = sig.trace.Logger;
const Account = sig.core.Account;
const Slot = sig.core.time.Slot;
const Pubkey = sig.core.pubkey.Pubkey;
const Hash = sig.core.Hash;
const BankFields = sig.accounts_db.snapshots.BankFields;
const BankHashInfo = sig.accounts_db.snapshots.BankHashInfo;

pub const TrackedAccount = struct {
pubkey: Pubkey,
Expand Down Expand Up @@ -45,56 +49,88 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
}
};

var gpa_allocator = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa_allocator.allocator();
var prng = std.Random.DefaultPrng.init(seed);
const rand = prng.random();

var gpa_state = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa_state.deinit();
const gpa = gpa_state.allocator();

const logger = Logger.init(allocator, .debug);
const logger = Logger.init(gpa, .debug);
defer logger.deinit();
logger.spawn();

var prng = std.rand.DefaultPrng.init(seed);
const rand = prng.random();

const use_disk = rand.boolean();
const snapshot_dir = "test_data/accountsdb_fuzz";

var test_data_dir = try std.fs.cwd().makeOpenPath("test_data", .{});
defer test_data_dir.close();

const snapshot_dir_name = "accountsdb_fuzz";
var snapshot_dir = try test_data_dir.makeOpenPath(snapshot_dir_name, .{});
defer snapshot_dir.close();
defer {
// NOTE: sometimes this can take a long time so we print when we start and finish
std.debug.print("deleting snapshot dir...\n", .{});
std.fs.cwd().deleteTree(snapshot_dir) catch |err| {
std.debug.print("failed to delete snapshot dir: {}\n", .{err});
test_data_dir.deleteTree(snapshot_dir_name) catch |err| {
std.debug.print("failed to delete snapshot dir ('{s}'): {}\n", .{ sig.utils.fmt.tryRealPath(snapshot_dir, "."), err });
};
std.debug.print("deleted snapshot dir\n", .{});
}
std.debug.print("use disk: {}\n", .{use_disk});

var accounts_db = try AccountsDB.init(allocator, logger, .{
// CONTEXT: we need a separate directory to unpack the snapshot
// generated by the accountsdb that's using the main directory,
// since otherwise the alternate accountsdb may race with the
// main one while reading/writing/deleting account files.
const alternative_snapshot_dir_name = "alt";
var alternative_snapshot_dir = try snapshot_dir.makeOpenPath(alternative_snapshot_dir_name, .{});
defer alternative_snapshot_dir.close();
defer {
// NOTE: sometimes this can take a long time so we print when we start and finish
std.debug.print("deleting snapshot dir...\n", .{});
test_data_dir.deleteTree(alternative_snapshot_dir_name) catch |err| {
std.debug.print("failed to delete snapshot dir ('{s}'): {}\n", .{ sig.utils.fmt.tryRealPath(snapshot_dir, "."), err });
};
std.debug.print("deleted snapshot dir\n", .{});
}

var accounts_db = try AccountsDB.init(gpa, logger, snapshot_dir, .{
.number_of_index_bins = sig.accounts_db.db.ACCOUNT_INDEX_BINS,
.use_disk_index = use_disk,
.snapshot_dir = snapshot_dir,
// TODO: other things we can fuzz (number of bins, ...)
});
defer accounts_db.deinit(true);

const exit = try allocator.create(std.atomic.Value(bool));
const exit = try gpa.create(std.atomic.Value(bool));
defer gpa.destroy(exit);
exit.* = std.atomic.Value(bool).init(false);

const manager_handle = try std.Thread.spawn(.{}, AccountsDB.runManagerLoop, .{
&accounts_db,
exit,
});

var tracked_accounts = std.AutoArrayHashMap(Pubkey, TrackedAccount).init(allocator);
defer {
for (tracked_accounts.keys()) |key| {
tracked_accounts.getEntry(key).?.value_ptr.deinit(allocator);
}
tracked_accounts.deinit();
errdefer {
exit.store(true, .seq_cst);
manager_handle.join();
}

var tracked_accounts = std.AutoArrayHashMap(Pubkey, TrackedAccount).init(gpa);
defer tracked_accounts.deinit();
defer for (tracked_accounts.values()) |*value| {
value.deinit(gpa);
};
try tracked_accounts.ensureTotalCapacity(10_000);

var largest_rooted_slot: usize = 0;
var slot: usize = 0;
var random_bank_fields = try BankFields.random(gpa, rand, 1 << 8);
defer random_bank_fields.deinit(gpa);

const random_bank_hash_info = BankHashInfo.random(rand);

const Actions = enum { put, get };
const zstd_compressor = try zstd.Compressor.init(.{});
defer zstd_compressor.deinit();

var largest_rooted_slot: Slot = 0;
var slot: Slot = 0;

// get/put a bunch of accounts
while (true) {
Expand All @@ -106,18 +142,18 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
}
defer slot += 1;

const action_int = rand.intRangeAtMost(u8, 0, 1);
const action: Actions = @enumFromInt(action_int);

const action = rand.enumValue(enum { put, get });
switch (action) {
.put => {
const N_ACCOUNTS_PER_SLOT = 10;

const accounts = try allocator.alloc(Account, N_ACCOUNTS_PER_SLOT);
const pubkeys = try allocator.alloc(Pubkey, N_ACCOUNTS_PER_SLOT);
var accounts: [N_ACCOUNTS_PER_SLOT]Account = undefined;
var pubkeys: [N_ACCOUNTS_PER_SLOT]Pubkey = undefined;

for (&accounts, &pubkeys, 0..) |*account, *pubkey, i| {
errdefer for (accounts[0..i]) |prev_account| prev_account.deinit(gpa);

for (0..N_ACCOUNTS_PER_SLOT) |i| {
var tracked_account = try TrackedAccount.random(rand, slot, allocator);
var tracked_account = try TrackedAccount.random(rand, slot, gpa);

const existing_pubkey = rand.boolean();
if (existing_pubkey and tracked_accounts.count() > 0) {
Expand All @@ -126,20 +162,21 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
tracked_account.pubkey = key;
}

accounts[i] = try tracked_account.toAccount(allocator);
pubkeys[i] = tracked_account.pubkey;
account.* = try tracked_account.toAccount(gpa);
pubkey.* = tracked_account.pubkey;

const r = try tracked_accounts.getOrPut(tracked_account.pubkey);
if (r.found_existing) {
r.value_ptr.deinit(allocator);
r.value_ptr.deinit(gpa);
}
// always overwrite the old slot
r.value_ptr.* = tracked_account;
}
defer for (accounts) |account| account.deinit(gpa);

try accounts_db.putAccountSlice(
accounts,
pubkeys,
&accounts,
&pubkeys,
slot,
);
},
Expand All @@ -153,7 +190,7 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {

const tracked_account = tracked_accounts.get(key).?;
var account = try accounts_db.getAccount(&tracked_account.pubkey);
defer account.deinit(allocator);
defer account.deinit(gpa);

if (!std.mem.eql(u8, tracked_account.data, account.data)) {
@panic("found accounts with different data");
Expand All @@ -166,6 +203,74 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
largest_rooted_slot = @min(slot, largest_rooted_slot + 2);
accounts_db.largest_root_slot.store(largest_rooted_slot, .seq_cst);
}

const empty_file_map = blk: {
const file_map: *const AccountsDB.FileMap, var file_map_lg = accounts_db.file_map.readWithLock();
defer file_map_lg.unlock();
break :blk file_map.count() == 0;
};

if (!empty_file_map and slot % 500 == 0 and slot != largest_rooted_slot) {
const snapshot_random_hash = Hash.random(rand);
const snap_info: sig.accounts_db.snapshots.FullSnapshotFileInfo = .{
.slot = largest_rooted_slot,
.hash = snapshot_random_hash,
};

std.debug.print("Generating snapshot for slot {}...\n", .{largest_rooted_slot});

const archive_file = try alternative_snapshot_dir.createFile(snap_info.snapshotNameStr().constSlice(), .{ .read = true });
defer archive_file.close();

// write the archive
var zstd_write_buffer: [4096 * 4]u8 = undefined;
const zstd_write_ctx = zstd.writerCtx(archive_file.writer(), &zstd_compressor, &zstd_write_buffer);

random_bank_fields.slot = largest_rooted_slot;
const lamports_per_signature = rand.int(u64);

try accounts_db.writeSnapshotTarFull(
zstd_write_ctx.writer(),
.{ .bank_slot_deltas = &.{} },
random_bank_fields,
lamports_per_signature,
random_bank_hash_info,
0,
);

try zstd_write_ctx.finish();

std.debug.print("Unpacking snapshot for slot {}...\n", .{largest_rooted_slot});
try archive_file.seekTo(0);
try sig.accounts_db.snapshots.parallelUnpackZstdTarBall(
gpa,
logger,
archive_file,
alternative_snapshot_dir,
std.Thread.getCpuCount() catch 1,
true,
);

const snap_files: sig.accounts_db.SnapshotFiles = .{
.full_snapshot = snap_info,
.incremental_snapshot = null,
};

var snap_fields_and_paths = try sig.accounts_db.AllSnapshotFields.fromFiles(gpa, logger, alternative_snapshot_dir, snap_files);
defer snap_fields_and_paths.deinit(gpa);

var alternative_accounts_db = try AccountsDB.init(gpa, logger, alternative_snapshot_dir, accounts_db.config);
defer alternative_accounts_db.deinit(false);

std.debug.print("Validating snapshot for slot {}...\n", .{largest_rooted_slot});

try alternative_accounts_db.loadFromSnapshot(
snap_fields_and_paths.full.accounts_db_fields.file_map,
1,
gpa,
);
std.debug.print("Validated snapshot for slot {d}\n", .{largest_rooted_slot});
}
}

std.debug.print("fuzzing complete\n", .{});
Expand Down
Loading

0 comments on commit cccc96c

Please sign in to comment.