Commit: add code fixes

Ubuntu authored and 0xNineteen committed Mar 24, 2024
1 parent 688975c commit 0c31738
Showing 1 changed file with 92 additions and 55 deletions.
147 changes: 92 additions & 55 deletions src/gossip/service.zig
@@ -131,6 +131,14 @@ pub const GossipStats = struct {
     handle_batch_prune_time: StatU64 = .{},
     handle_trim_table_time: StatU64 = .{},
 
+    push_message_n_values: StatU64 = .{},
+    push_message_n_failed_inserts: StatU64 = .{},
+    push_message_n_invalid_values: StatU64 = .{},
+    push_messages_time_to_insert: StatU64 = .{},
+    push_messages_time_build_prune: StatU64 = .{},
+
+    packets_verified_loop_time: StatU64 = .{},
+
     table_n_values: StatU64 = .{},
     table_n_pubkeys: StatU64 = .{},
 };
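These counters are fed with the timer pattern used later in this diff: start a std.time.Timer around the measured section and add the elapsed nanoseconds into the stat on the way out. A minimal sketch, assuming StatU64 is an atomic counter with an add method (the stand-in definition below is illustrative, not the repo's):

const std = @import("std");

// Stand-in for the repo's StatU64 (assumed shape), matching how it is
// used in this diff: an atomic counter with an `add` method.
const StatU64 = struct {
    value: std.atomic.Atomic(u64) = std.atomic.Atomic(u64).init(0),

    pub fn add(self: *StatU64, delta: u64) void {
        _ = self.value.fetchAdd(delta, std.atomic.Ordering.Monotonic);
    }
};

pub fn main() !void {
    var push_messages_time_to_insert = StatU64{};

    var timer = try std.time.Timer.start();
    // defer records the elapsed time even on early returns, as the
    // defer blocks added in this diff do
    defer push_messages_time_to_insert.add(timer.read());

    // ... the work being measured ...
    std.time.sleep(1 * std.time.ns_per_ms);
}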
@@ -358,48 +366,52 @@ pub const GossipService = struct {
 
     const VerifyMessageTask = struct {
         allocator: std.mem.Allocator,
-        packet: *const Packet,
+        packet_batch: ArrayList(Packet),
         verified_incoming_channel: *Channel(GossipMessageWithEndpoint),
         logger: Logger,
 
         task: Task,
-        done: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
+        done: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true),
 
         pub fn callback(task: *Task) void {
             var self = @fieldParentPtr(@This(), "task", task);
             std.debug.assert(!self.done.load(std.atomic.Ordering.Acquire));
             defer self.done.store(true, std.atomic.Ordering.Release);
+            defer self.packet_batch.deinit();
 
-            var message = bincode.readFromSlice(
-                self.allocator,
-                GossipMessage,
-                self.packet.data[0..self.packet.size],
-                bincode.Params.standard,
-            ) catch {
-                self.logger.errf("gossip: packet_verify: failed to deserialize", .{});
-                return;
-            };
+            for (self.packet_batch.items) |*packet| {
+                var message = bincode.readFromSlice(
+                    self.allocator,
+                    GossipMessage,
+                    packet.data[0..packet.size],
+                    bincode.Params.standard,
+                ) catch {
+                    self.logger.errf("gossip: packet_verify: failed to deserialize", .{});
+                    continue;
+                };
 
-            message.sanitize() catch {
-                self.logger.errf("gossip: packet_verify: failed to sanitize", .{});
-                bincode.free(self.allocator, message);
-                return;
-            };
+                message.sanitize() catch {
+                    self.logger.errf("gossip: packet_verify: failed to sanitize", .{});
+                    bincode.free(self.allocator, message);
+                    continue;
+                };
 
-            message.verifySignature() catch |e| {
-                self.logger.errf(
-                    "gossip: packet_verify: failed to verify signature: {} from {}",
-                    .{ e, self.packet.addr },
-                );
-                bincode.free(self.allocator, message);
-                return;
-            };
+                message.verifySignature() catch |e| {
+                    self.logger.errf(
+                        "gossip: packet_verify: failed to verify signature: {} from {}",
+                        .{ e, packet.addr },
+                    );
+                    bincode.free(self.allocator, message);
+                    continue;
+                };
 
-            const msg = GossipMessageWithEndpoint{
-                .from_endpoint = self.packet.addr,
-                .message = message,
-            };
-            self.verified_incoming_channel.send(msg) catch unreachable;
+                const msg = GossipMessageWithEndpoint{
+                    .from_endpoint = packet.addr,
+                    .message = message,
+                };
+
+                self.verified_incoming_channel.send(msg) catch unreachable;
+            }
         }
 
     /// waits for the task to be done, then resets the done state to false
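For context, the done flag above implements a free/busy handshake: a task now starts out free (done == true), the scheduler claims it by storing false before scheduling it, and the worker callback releases it when it finishes. The Release store pairs with the Acquire load so writes to the task's fields made by one side are visible to the other. A self-contained sketch of the same handshake (simplified; not the repo's code):

const std = @import("std");

const Slot = struct {
    // true means "free": safe for the scheduler to claim and reuse
    done: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(true),
    payload: u64 = 0,
};

fn worker(slot: *Slot) void {
    // the scheduler must have claimed the slot before handing it over
    std.debug.assert(!slot.done.load(std.atomic.Ordering.Acquire));
    slot.payload *= 2; // stand-in for real work
    // publish results and mark the slot free again
    slot.done.store(true, std.atomic.Ordering.Release);
}

pub fn main() !void {
    var slot = Slot{};

    // claim: free -> busy, then fill in the work description
    std.debug.assert(slot.done.load(std.atomic.Ordering.Acquire));
    slot.done.store(false, std.atomic.Ordering.Release);
    slot.payload = 21;

    var thread = try std.Thread.spawn(.{}, worker, .{&slot});
    thread.join();

    std.debug.assert(slot.done.load(std.atomic.Ordering.Acquire));
    std.debug.print("payload = {}\n", .{slot.payload}); // 42
}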
@@ -421,10 +433,11 @@ pub const GossipService = struct {
         // pre-allocate all the tasks
         for (tasks) |*task| {
             task.* = VerifyMessageTask{
-                .allocator = self.allocator,
+                // .allocator = std.heap.page_allocator, // TODO: swap out with arg
+                .allocator = self.allocator, // TODO: swap out with arg
                 .task = .{ .callback = VerifyMessageTask.callback },
                 .verified_incoming_channel = self.verified_incoming_channel,
-                .packet = &Packet.default(),
+                .packet_batch = undefined,
                 .logger = self.logger,
             };
         }
@@ -437,9 +450,9 @@ pub const GossipService = struct {
 
             const packet_batches = maybe_packets.?;
             defer {
-                for (packet_batches) |*packet_batch| {
-                    packet_batch.deinit();
-                }
+                // for (packet_batches) |*packet_batch| {
+                //     packet_batch.deinit();
+                // }
                 self.packet_incoming_channel.allocator.free(packet_batches);
             }
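Note where the cleanup went: the consumer no longer deinits each batch here, because ownership of the ArrayList now travels with the task, which frees it in its callback (the defer self.packet_batch.deinit() added above). A minimal sketch of that ownership hand-off, with hypothetical names:

const std = @import("std");

const Job = struct {
    batch: std.ArrayList(u8),

    fn run(self: *Job) void {
        // the job owns the batch and frees it when it is done
        defer self.batch.deinit();
        for (self.batch.items) |byte| {
            _ = byte; // stand-in for real processing
        }
    }
};

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    var batch = std.ArrayList(u8).init(gpa.allocator());
    try batch.appendSlice("hello");

    // ownership moves into the job; the producer must not deinit
    var job = Job{ .batch = batch };
    job.run(); // batch memory is released here
}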

@@ -451,27 +464,28 @@ pub const GossipService = struct {
             self.stats.gossip_packets_received.add(n_packets_drained);
 
             // verify in parallel using the threadpool
-            var count: usize = 0;
-            for (packet_batches) |*packet_batch| {
-                for (packet_batch.items) |*packet| {
-                    var task = &tasks[count % socket_utils.PACKETS_PER_BATCH];
-                    if (count >= socket_utils.PACKETS_PER_BATCH) {
-                        task.awaitAndReset();
-                    }
-                    task.packet = packet;
-
-                    const batch = Batch.from(&task.task);
-                    self.thread_pool.schedule(batch);
-
-                    count += 1;
-                }
-            }
-
-            for (tasks[0..@min(count, socket_utils.PACKETS_PER_BATCH)]) |*task| {
-                task.awaitAndReset();
-            }
+            // PERF: investigate CPU pinning
+            var task_i: usize = 0;
+            var n_tasks = tasks.len;
+            for (packet_batches) |packet_batch| {
+                // find a free task
+                var task_ptr = &tasks[task_i];
+                while (!task_ptr.done.load(std.atomic.Ordering.Acquire)) {
+                    task_i = (task_i + 1) % n_tasks;
+                    task_ptr = &tasks[task_i];
+                }
+
+                // schedule it
+                task_ptr.done.store(false, std.atomic.Ordering.Release);
+                task_ptr.packet_batch = packet_batch;
+
+                const batch = Batch.from(&task_ptr.task);
+                self.thread_pool.schedule(batch);
+            }
+
+            // for (tasks) |*task| {
+            //     task.awaitAndReset();
+            // }
             self.logger.debugf("verify_packets loop closed", .{});
         }
 
@@ -1587,16 +1601,29 @@ pub const GossipService = struct {
 
         // insert values and track the failed origins per pubkey
         {
+            var timer = try std.time.Timer.start();
+
             var gossip_table_lock = self.gossip_table_rw.write();
-            defer gossip_table_lock.unlock();
+            var gossip_table: *GossipTable = gossip_table_lock.mut();
+
+            defer {
+                gossip_table_lock.unlock();
+                self.stats.push_messages_time_to_insert.add(timer.read());
+            }
+
+            var n_gossip_data: usize = 0;
+            var n_failed_inserts: usize = 0;
+            var n_invalid_data: usize = 0;
 
             for (batch_push_messages.items) |*push_message| {
-                var gossip_table: *GossipTable = gossip_table_lock.mut();
+                n_gossip_data += push_message.gossip_values.len;
 
                 const valid_len = self.filterBasedOnShredVersion(
                     gossip_table,
                     push_message.gossip_values,
                     push_message.from_pubkey.*,
                 );
+                n_invalid_data += push_message.gossip_values.len - valid_len;
 
                 var result = try gossip_table.insertValues(
                     push_message.gossip_values[0..valid_len],
@@ -1606,6 +1633,7 @@ pub const GossipService = struct {
                 );
                 const failed_insert_indexs = result.failed.?;
                 defer failed_insert_indexs.deinit();
+                n_failed_inserts += failed_insert_indexs.items.len;
 
                 self.logger
                     .field("n_values", valid_len)
@@ -1646,10 +1674,18 @@ pub const GossipService = struct {
                     try failed_origins.put(origin, {});
                 }
             }
+
+            self.stats.push_message_n_values.add(n_gossip_data);
+            self.stats.push_message_n_failed_inserts.add(n_failed_inserts);
+            self.stats.push_message_n_invalid_values.add(n_invalid_data);
         }
 
         // build prune packets
         const now = getWallclockMs();
+        var timer = try std.time.Timer.start();
+        defer {
+            self.stats.push_messages_time_build_prune.add(timer.read());
+        }
         var pubkey_to_failed_origins_iter = pubkey_to_failed_origins.iterator();
 
         var n_packets = pubkey_to_failed_origins_iter.len;
@@ -1885,13 +1921,14 @@ pub const GossipService = struct {
     ) usize {
         // we use swap remove, which just reorders the array
         // (order doesn't matter), so we only track the new len -- ie, no allocations/frees
-        var gossip_values_array = ArrayList(SignedGossipData).fromOwnedSlice(self.allocator, gossip_values);
         const my_shred_version = self.my_shred_version.load(.Monotonic);
         if (my_shred_version == 0) {
-            return gossip_values_array.items.len;
+            return gossip_values.len;
         }
-        var i: usize = 0;
 
+        var gossip_values_array = ArrayList(SignedGossipData).fromOwnedSlice(self.allocator, gossip_values);
         const sender_matches = gossip_table.checkMatchingShredVersion(from_pubkey, my_shred_version);
+        var i: usize = 0;
         while (i < gossip_values_array.items.len) {
             const gossip_value = &gossip_values[i];
             switch (gossip_value.data) {
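The comment above describes the swap-remove idiom: an element that fails the filter is overwritten by the last live element in O(1), order is not preserved, nothing is allocated or freed, and the caller keeps only the prefix up to the new length. A standalone sketch of the pattern with hypothetical names:

const std = @import("std");

// keep only even values in-place; returns the new logical length
fn keepEvens(values: []u32) usize {
    var len: usize = values.len;
    var i: usize = 0;
    while (i < len) {
        if (values[i] % 2 != 0) {
            // swap-remove: overwrite with the last live element and
            // shrink the logical length; do not advance i, since the
            // swapped-in element has not been checked yet
            len -= 1;
            values[i] = values[len];
        } else {
            i += 1;
        }
    }
    return len;
}

pub fn main() void {
    var values = [_]u32{ 1, 2, 3, 4, 5, 6 };
    const new_len = keepEvens(&values);
    // prints the kept prefix (order not preserved)
    std.debug.print("{any}\n", .{values[0..new_len]});
}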
