blockstore: Make WriteBatch operations go through LedgerColumn #3687

Merged: 10 commits, Nov 19, 2024
ledger/src/blockstore.rs: 177 changes (95 additions, 82 deletions)
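Before this PR, call sites wrote typed operations directly on the batch, e.g. `write_batch.put::<cf::SlotMeta>(slot, &meta)`; the diff routes each of these through the owning column handle instead, e.g. `self.meta_cf.put_in_batch(&mut write_batch, slot, &meta)` (and likewise `put_bytes_in_batch`, `delete_in_batch`, and `delete_range_in_batch`). The sketch below is a minimal toy model of that shape, not the actual agave types or signatures; only the method names mirror the diff, and everything else (the `name` and `ops` fields, the string-based batch) is invented for illustration.

```rust
// Toy model of the pattern this PR applies; the real `LedgerColumn<C>` and
// `WriteBatch` in the agave ledger crate are RocksDB-backed, while this sketch
// only records operations as strings.

/// Stand-in for a RocksDB write batch: it just buffers operation descriptions.
#[derive(Default)]
struct WriteBatch {
    ops: Vec<String>,
}

/// Stand-in for `LedgerColumn<C>`: the handle knows which column it targets,
/// so callers no longer name the column type at every call site.
struct LedgerColumn {
    name: &'static str,
}

impl LedgerColumn {
    /// Buffer a put for this column into the given batch.
    fn put_in_batch(&self, batch: &mut WriteBatch, key: u64, value: &[u8]) {
        batch
            .ops
            .push(format!("put cf={} key={} ({} bytes)", self.name, key, value.len()));
    }

    /// Buffer a delete for this column into the given batch.
    fn delete_in_batch(&self, batch: &mut WriteBatch, key: u64) {
        batch.ops.push(format!("delete cf={} key={}", self.name, key));
    }
}

fn main() {
    let meta_cf = LedgerColumn { name: "meta" };
    let orphans_cf = LedgerColumn { name: "orphans" };
    let mut batch = WriteBatch::default();

    // Before this PR (conceptually): write_batch.put::<cf::SlotMeta>(slot, &meta)?;
    // After this PR:                 self.meta_cf.put_in_batch(&mut write_batch, slot, &meta)?;
    meta_cf.put_in_batch(&mut batch, 42, b"serialized slot meta");
    orphans_cf.delete_in_batch(&mut batch, 41);

    // In the real Blockstore the buffered batch is then committed atomically
    // with `self.db.write(write_batch)`, as several hunks below show.
    assert_eq!(batch.ops.len(), 2);
    for op in &batch.ops {
        println!("{op}");
    }
}
```

The diff also drops the lifetime parameter from `WriteBatch<'_>` in the touched signatures, so helpers that accept a batch now take a plain `&mut WriteBatch`.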
@@ -1141,11 +1141,8 @@ impl Blockstore {
metrics.chaining_elapsed_us += start.as_us();

let mut start = Measure::start("Commit Working Sets");
let (should_signal, newly_completed_slots) = commit_slot_meta_working_set(
&slot_meta_working_set,
&self.completed_slots_senders.lock().unwrap(),
&mut write_batch,
)?;
let (should_signal, newly_completed_slots) =
self.commit_slot_meta_working_set(&slot_meta_working_set, &mut write_batch)?;

for (erasure_set, working_erasure_meta) in erasure_metas.iter() {
if !working_erasure_meta.should_write() {
@@ -1212,7 +1209,8 @@ impl Blockstore {
continue;
}
let (slot, fec_set_index) = erasure_set.store_key();
write_batch.put::<cf::ErasureMeta>(
self.erasure_meta_cf.put_in_batch(
&mut write_batch,
(slot, u64::from(fec_set_index)),
working_erasure_meta.as_ref(),
)?;
@@ -1223,15 +1221,20 @@
// No need to rewrite the column
continue;
}
write_batch.put::<cf::MerkleRootMeta>(
self.merkle_root_meta_cf.put_in_batch(
&mut write_batch,
erasure_set.store_key(),
working_merkle_root_meta.as_ref(),
)?;
}

for (&slot, index_working_set_entry) in index_working_set.iter() {
if index_working_set_entry.did_insert_occur {
write_batch.put::<cf::Index>(slot, &index_working_set_entry.index)?;
self.index_cf.put_in_batch(
&mut write_batch,
slot,
&index_working_set_entry.index,
)?;
}
}
start.stop();
@@ -1658,7 +1661,9 @@ impl Blockstore {
{} is not full, marking slot dead",
shred_index, slot_meta.received, slot
);
write_batch.put::<cf::DeadSlots>(slot, &true).unwrap();
self.dead_slots_cf
.put_in_batch(write_batch, slot, &true)
.unwrap();
}

if !self.should_insert_data_shred(
@@ -1731,7 +1736,8 @@ impl Blockstore {

// Commit step: commit all changes to the mutable structures at once, or none at all.
// We don't want only a subset of these changes going through.
write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), shred.payload())?;
self.code_shred_cf
.put_bytes_in_batch(write_batch, (slot, shred_index), shred.payload())?;
index_meta.coding_mut().insert(shred_index);

Ok(())
@@ -2194,7 +2200,11 @@ impl Blockstore {

// Commit step: commit all changes to the mutable structures at once, or none at all.
// We don't want only a subset of these changes going through.
write_batch.put_bytes::<cf::ShredData>((slot, index), shred.bytes_to_store())?;
self.data_shred_cf.put_bytes_in_batch(
write_batch,
(slot, index),
shred.bytes_to_store(),
)?;
data_index.insert(index);
let newly_completed_data_sets = update_slot_meta(
last_in_slot,
@@ -2940,7 +2950,7 @@ impl Blockstore {
keys_with_writable: impl Iterator<Item = (&'a Pubkey, bool)>,
status: TransactionStatusMeta,
transaction_index: usize,
db_write_batch: &mut WriteBatch<'_>,
db_write_batch: &mut WriteBatch,
) -> Result<()> {
self.write_transaction_status_helper(
slot,
@@ -2949,7 +2959,8 @@
status,
transaction_index,
|address, slot, tx_index, signature, writeable| {
db_write_batch.put::<cf::AddressSignatures>(
self.address_signatures_cf.put_in_batch(
db_write_batch,
(*address, slot, tx_index, signature),
&AddressSignatureMeta { writeable },
)
@@ -2989,9 +3000,10 @@ impl Blockstore {
signature: &Signature,
slot: Slot,
memos: String,
db_write_batch: &mut WriteBatch<'_>,
db_write_batch: &mut WriteBatch,
) -> Result<()> {
db_write_batch.put::<cf::TransactionMemos>((*signature, slot), &memos)
self.transaction_memos_cf
.put_in_batch(db_write_batch, (*signature, slot), &memos)
}

/// Acquires the `lowest_cleanup_slot` lock and returns a tuple of the held lock
@@ -3989,7 +4001,8 @@ impl Blockstore {
frozen_hash,
is_duplicate_confirmed: true,
});
write_batch.put::<cf::BankHash>(slot, &data)?;
self.bank_hash_cf
.put_in_batch(&mut write_batch, slot, &data)?;
}

self.db.write(write_batch)?;
@@ -4001,7 +4014,7 @@
let mut max_new_rooted_slot = 0;
for slot in rooted_slots {
max_new_rooted_slot = std::cmp::max(max_new_rooted_slot, *slot);
write_batch.put::<cf::Root>(*slot, &true)?;
self.roots_cf.put_in_batch(&mut write_batch, *slot, &true)?;
}

self.db.write(write_batch)?;
@@ -4306,7 +4319,8 @@ impl Blockstore {
// slot match the flags of slots that become connected the typical way.
root_meta.set_parent_connected();
root_meta.set_connected();
write_batch.put::<cf::SlotMeta>(root_meta.slot, &root_meta)?;
self.meta_cf
.put_in_batch(&mut write_batch, root_meta.slot, &root_meta)?;

let mut next_slots = VecDeque::from(root_meta.next_slots);
while !next_slots.is_empty() {
@@ -4318,7 +4332,8 @@
if meta.set_parent_connected() {
next_slots.extend(meta.next_slots.iter());
}
write_batch.put::<cf::SlotMeta>(meta.slot, &meta)?;
self.meta_cf
.put_in_batch(&mut write_batch, meta.slot, &meta)?;
}

self.db.write(write_batch)?;
@@ -4360,7 +4375,7 @@ impl Blockstore {
// Write all the newly changed slots in new_chained_slots to the write_batch
for (slot, meta) in new_chained_slots.iter() {
let meta: &SlotMeta = &RefCell::borrow(meta);
write_batch.put::<cf::SlotMeta>(*slot, meta)?;
self.meta_cf.put_in_batch(write_batch, *slot, meta)?;
}
Ok(())
}
@@ -4436,14 +4451,15 @@ impl Blockstore {
// If the parent of `slot` is a newly inserted orphan, insert it into the orphans
// column family
if RefCell::borrow(&*prev_slot_meta).is_orphan() {
write_batch.put::<cf::Orphans>(prev_slot, &true)?;
self.orphans_cf
.put_in_batch(write_batch, prev_slot, &true)?;
}
}
}

// At this point this slot has received a parent, so it's no longer an orphan
if was_orphan_slot {
write_batch.delete::<cf::Orphans>(slot)?;
self.orphans_cf.delete_in_batch(write_batch, slot)?;
}
}

@@ -4506,6 +4522,52 @@ impl Blockstore {
Ok(())
}

/// For each slot in the slot_meta_working_set which has any change, include
/// corresponding updates to cf::SlotMeta via the specified `write_batch`.
/// The `write_batch` will later be atomically committed to the blockstore.
///
/// Arguments:
/// - `slot_meta_working_set`: a map that maintains slot-id to its `SlotMeta`
/// mapping.
/// - `write_batch`: the write batch which includes all the updates of the
/// current write and ensures their atomicity.
///
/// Completed-slot signaling is driven by `self.completed_slots_senders`,
/// which this method locks internally.
///
/// On success, the function returns an Ok result with a <should_signal,
/// newly_completed_slots> pair where:
/// - `should_signal`: a boolean flag indicating whether to send a signal.
/// - `newly_completed_slots`: a subset of slot_meta_working_set which are
/// newly completed.
fn commit_slot_meta_working_set(
&self,
slot_meta_working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
) -> Result<(bool, Vec<u64>)> {
let mut should_signal = false;
let mut newly_completed_slots = vec![];
let completed_slots_senders = self.completed_slots_senders.lock().unwrap();

// Check if any metadata was changed, if so, insert the new version of the
// metadata into the write batch
for (slot, slot_meta_entry) in slot_meta_working_set.iter() {
// Any slot that wasn't written to should have been filtered out by now.
assert!(slot_meta_entry.did_insert_occur);
let meta: &SlotMeta = &RefCell::borrow(&*slot_meta_entry.new_slot_meta);
let meta_backup = &slot_meta_entry.old_slot_meta;
if !completed_slots_senders.is_empty() && is_newly_completed_slot(meta, meta_backup) {
newly_completed_slots.push(*slot);
}
// Check if the working copy of the metadata has changed
if Some(meta) != meta_backup.as_ref() {
should_signal = should_signal || slot_has_updates(meta, meta_backup);
self.meta_cf.put_in_batch(write_batch, *slot, meta)?;
}
}

Ok((should_signal, newly_completed_slots))
}

/// Obtain the SlotMeta from the in-memory slot_meta_working_set or load
/// it from the database if it does not exist in slot_meta_working_set.
///
@@ -4625,7 +4687,7 @@ impl Blockstore {
res
}

pub fn get_write_batch(&self) -> std::result::Result<WriteBatch<'_>, BlockstoreError> {
pub fn get_write_batch(&self) -> std::result::Result<WriteBatch, BlockstoreError> {
self.db.batch()
}

@@ -4751,51 +4813,6 @@ fn send_signals(
}
}

/// For each slot in the slot_meta_working_set which has any change, include
/// corresponding updates to cf::SlotMeta via the specified `write_batch`.
/// The `write_batch` will later be atomically committed to the blockstore.
///
/// Arguments:
/// - `slot_meta_working_set`: a map that maintains slot-id to its `SlotMeta`
/// mapping.
/// - `completed_slot_senders`: the units which are responsible for sending
/// signals for completed slots.
/// - `write_batch`: the write batch which includes all the updates of the
/// the current write and ensures their atomicity.
///
/// On success, the function returns an Ok result with <should_signal,
/// newly_completed_slots> pair where:
/// - `should_signal`: a boolean flag indicating whether to send signal.
/// - `newly_completed_slots`: a subset of slot_meta_working_set which are
/// newly completed.
fn commit_slot_meta_working_set(
slot_meta_working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
completed_slots_senders: &[Sender<Vec<u64>>],
write_batch: &mut WriteBatch,
) -> Result<(bool, Vec<u64>)> {
let mut should_signal = false;
let mut newly_completed_slots = vec![];

// Check if any metadata was changed, if so, insert the new version of the
// metadata into the write batch
for (slot, slot_meta_entry) in slot_meta_working_set.iter() {
// Any slot that wasn't written to should have been filtered out by now.
assert!(slot_meta_entry.did_insert_occur);
let meta: &SlotMeta = &RefCell::borrow(&*slot_meta_entry.new_slot_meta);
let meta_backup = &slot_meta_entry.old_slot_meta;
if !completed_slots_senders.is_empty() && is_newly_completed_slot(meta, meta_backup) {
newly_completed_slots.push(*slot);
}
// Check if the working copy of the metadata has changed
if Some(meta) != meta_backup.as_ref() {
should_signal = should_signal || slot_has_updates(meta, meta_backup);
write_batch.put::<cf::SlotMeta>(*slot, meta)?;
}
}

Ok((should_signal, newly_completed_slots))
}

/// Returns the `SlotMeta` of the specified `slot` from the two cached states:
/// `working_set` and `chained_slots`. If both contain the `SlotMeta`, then
/// the latest one from the `working_set` will be returned.
@@ -7522,11 +7539,9 @@ pub mod tests {
);

for (erasure_set, working_merkle_root_meta) in merkle_root_metas {
write_batch
.put::<cf::MerkleRootMeta>(
erasure_set.store_key(),
working_merkle_root_meta.as_ref(),
)
blockstore
.merkle_root_meta_cf
.put(erasure_set.store_key(), working_merkle_root_meta.as_ref())
.unwrap();
}
blockstore.db.write(write_batch).unwrap();
@@ -7721,11 +7736,9 @@ pub mod tests {
);

for (erasure_set, working_merkle_root_meta) in merkle_root_metas {
write_batch
.put::<cf::MerkleRootMeta>(
erasure_set.store_key(),
working_merkle_root_meta.as_ref(),
)
blockstore
.merkle_root_meta_cf
.put(erasure_set.store_key(), working_merkle_root_meta.as_ref())
.unwrap();
}
blockstore.db.write(write_batch).unwrap();
@@ -11913,8 +11926,8 @@ pub mod tests {
.unwrap();
let mut write_batch = blockstore.db.batch().unwrap();
blockstore
.db
.delete_range_cf::<cf::MerkleRootMeta>(&mut write_batch, slot, slot)
.merkle_root_meta_cf
.delete_range_in_batch(&mut write_batch, slot, slot)
.unwrap();
blockstore.db.write(write_batch).unwrap();
assert!(blockstore
@@ -11979,8 +11992,8 @@ pub mod tests {
// an older version.
let mut write_batch = blockstore.db.batch().unwrap();
blockstore
.db
.delete_range_cf::<cf::MerkleRootMeta>(&mut write_batch, slot, slot)
.merkle_root_meta_cf
.delete_range_in_batch(&mut write_batch, slot, slot)
.unwrap();
blockstore.db.write(write_batch).unwrap();
assert!(blockstore