Skip to content

Commit

Permalink
[ENH] Add blockfile root
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Oct 11, 2024
1 parent f01b8f2 commit 246fb9e
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 231 deletions.
11 changes: 10 additions & 1 deletion rust/blockstore/src/arrow/block/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::cmp::Ordering::{Equal, Greater, Less};
use std::collections::HashMap;
use std::io::SeekFrom;

use crate::arrow::types::{ArrowReadableKey, ArrowReadableValue};
Expand Down Expand Up @@ -377,10 +378,18 @@ impl Block {
}

/// Returns the number of items in the block
pub fn len(&self) -> usize {
pub(crate) fn len(&self) -> usize {
self.data.num_rows()
}

/// Returns a reference to metadata of the block if any is present
/// ### Notes
/// - The metadata is stored in the Arrow RB schema as custom metadata
pub(crate) fn metadata<'me>(&'me self) -> &'me HashMap<String, String> {
let schema = self.data.schema_ref();
schema.metadata()
}

/*
===== Block Serialization =====
*/
Expand Down
112 changes: 53 additions & 59 deletions rust/blockstore/src/arrow/blockfile.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use super::provider::GetError;
use super::provider::{GetError, RootManager};
use super::root::{RootReader, RootWriter};
use super::{block::delta::BlockDelta, provider::BlockManager};
use super::{
block::Block,
flusher::ArrowBlockfileFlusher,
provider::SparseIndexManager,
sparse_index::SparseIndex,
types::{ArrowReadableKey, ArrowReadableValue, ArrowWriteableKey, ArrowWriteableValue},
};
use crate::arrow::root::CURRENT_VERSION;
use crate::key::CompositeKey;
use crate::key::KeyWrapper;
use crate::BlockfileError;
Expand All @@ -23,9 +23,8 @@ use uuid::Uuid;
#[derive(Clone)]
pub struct ArrowBlockfileWriter {
block_manager: BlockManager,
sparse_index_manager: SparseIndexManager,
root_manager: RootManager,
block_deltas: Arc<Mutex<HashMap<Uuid, BlockDelta>>>,
sparse_index: SparseIndex,
root: RootWriter,
id: Uuid,
write_mutex: Arc<tokio::sync::Mutex<()>>,
Expand All @@ -50,57 +49,44 @@ impl ArrowBlockfileWriter {
pub(super) fn new<K: ArrowWriteableKey, V: ArrowWriteableValue>(
id: Uuid,
block_manager: BlockManager,
sparse_index_manager: SparseIndexManager,
root_manager: RootManager,
) -> Self {
let initial_block = block_manager.create::<K, V>();
// TODO: we can update the constructor to take the initial block instead of having a seperate method
let sparse_index = SparseIndex::new(id);
sparse_index.add_initial_block(initial_block.id);

// TODO: remove clone and make this authorative source for sparse index
let root_writer = RootWriter::new(id, sparse_index.clone());
let root_writer = RootWriter::new(CURRENT_VERSION, id, sparse_index);

let block_deltas = Arc::new(Mutex::new(HashMap::new()));
{
let mut block_deltas_map = block_deltas.lock();
block_deltas_map.insert(initial_block.id, initial_block);
}
tracing::debug!(
"Constructed blockfile writer on empty sparse index with id {:?}",
id
);
tracing::debug!("Constructed blockfile writer with id {:?}", id);
Self {
block_manager,
sparse_index_manager,
root_manager,
block_deltas,
sparse_index,
root: root_writer,
id,
write_mutex: Arc::new(tokio::sync::Mutex::new(())),
}
}

// TODO: rename to fork_root and use root to/from instead of sparse index
pub(super) fn from_sparse_index(
pub(super) fn from_root(
id: Uuid,
block_manager: BlockManager,
sparse_index_manager: SparseIndexManager,
new_sparse_index: SparseIndex,
root_manager: RootManager,
new_root: RootWriter,
) -> Self {
tracing::debug!(
"Constructed blockfile writer from existing sparse index id {:?}",
id
);
tracing::debug!("Constructed blockfile writer from existing root {:?}", id);
let block_deltas = Arc::new(Mutex::new(HashMap::new()));
// TODO: remove clone and make this authorative source for sparse index
let root_writer = RootWriter::new(id, new_sparse_index.clone());

Self {
block_manager,
sparse_index_manager,
root_manager,
block_deltas,
sparse_index: new_sparse_index,
root: root_writer,
root: new_root,
id,
write_mutex: Arc::new(tokio::sync::Mutex::new(())),
}
Expand All @@ -115,7 +101,7 @@ impl ArrowBlockfileWriter {
// Skip empty blocks. Also, remove from sparse index.
if delta.len() == 0 {
tracing::info!("Delta with id {:?} is empty", delta.id);
removed = self.sparse_index.remove_block(&delta.id);
removed = self.root.sparse_index.remove_block(&delta.id);
}
if !removed {
let block = self.block_manager.commit::<K, V>(delta);
Expand All @@ -125,9 +111,9 @@ impl ArrowBlockfileWriter {

let flusher = ArrowBlockfileFlusher::new(
self.block_manager,
self.sparse_index_manager,
self.root_manager,
blocks,
self.sparse_index,
self.root,
self.id,
);

Expand All @@ -148,7 +134,7 @@ impl ArrowBlockfileWriter {

// Get the target block id for the key
let search_key = CompositeKey::new(prefix.to_string(), key.clone());
let target_block_id = self.sparse_index.get_target_block_id(&search_key);
let target_block_id = self.root.sparse_index.get_target_block_id(&search_key);

// See if a delta for the target block already exists, if not create a new one and add it to the transaction state
// Creating a delta loads the block entirely into memory
Expand Down Expand Up @@ -177,7 +163,8 @@ impl ArrowBlockfileWriter {
};
let new_id = new_delta.id;
// Blocks can be empty.
self.sparse_index
self.root
.sparse_index
.replace_block(target_block_id, new_delta.id);
{
let mut deltas = self.block_deltas.lock();
Expand All @@ -194,7 +181,7 @@ impl ArrowBlockfileWriter {
if delta.get_size::<K, V>() > self.block_manager.max_block_size_bytes() {
let new_blocks = delta.split::<K, V>(self.block_manager.max_block_size_bytes());
for (split_key, new_delta) in new_blocks {
self.sparse_index.add_block(split_key, new_delta.id);
self.root.sparse_index.add_block(split_key, new_delta.id);
let mut deltas = self.block_deltas.lock();
deltas.insert(new_delta.id, new_delta);
}
Expand All @@ -211,7 +198,7 @@ impl ArrowBlockfileWriter {
let _guard = self.write_mutex.lock().await;
// Get the target block id for the key
let search_key = CompositeKey::new(prefix.to_string(), key.clone());
let target_block_id = self.sparse_index.get_target_block_id(&search_key);
let target_block_id = self.root.sparse_index.get_target_block_id(&search_key);

// TODO: clean this up as its redudant with the set method
let delta = {
Expand All @@ -237,7 +224,8 @@ impl ArrowBlockfileWriter {
}
};
let new_id = new_delta.id;
self.sparse_index
self.root
.sparse_index
.replace_block(target_block_id, new_delta.id);
{
let mut deltas = self.block_deltas.lock();
Expand All @@ -263,23 +251,20 @@ pub struct ArrowBlockfileReader<
V: ArrowReadableValue<'me>,
> {
block_manager: BlockManager,
pub(super) sparse_index: SparseIndex,
// root: RootReader,
root: RootReader,
loaded_blocks: Arc<Mutex<HashMap<Uuid, Box<Block>>>>,
marker: std::marker::PhantomData<(K, V, &'me ())>,
id: Uuid,
}

impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me>>
ArrowBlockfileReader<'me, K, V>
{
pub(super) fn new(id: Uuid, block_manager: BlockManager, sparse_index: SparseIndex) -> Self {
pub(super) fn new(block_manager: BlockManager, root: RootReader) -> Self {
Self {
block_manager,
sparse_index,
root,
loaded_blocks: Arc::new(Mutex::new(HashMap::new())),
marker: std::marker::PhantomData,
id,
}
}

Expand Down Expand Up @@ -350,13 +335,16 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
composite_keys.push(composite_key);
}
}
let target_block_ids = self.sparse_index.get_all_target_block_ids(composite_keys);
let target_block_ids = self
.root
.sparse_index
.get_all_target_block_ids(composite_keys);
self.load_blocks(&target_block_ids).await;
}

pub(crate) async fn get(&'me self, prefix: &str, key: K) -> Result<V, Box<dyn ChromaError>> {
let search_key = CompositeKey::new(prefix.to_string(), key.clone());
let target_block_id = self.sparse_index.get_target_block_id(&search_key);
let target_block_id = self.root.sparse_index.get_target_block_id(&search_key);
let block = self.get_block(target_block_id).await;
let res = match block {
Ok(Some(block)) => block.get(prefix, key.clone()),
Expand Down Expand Up @@ -388,10 +376,10 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
) -> Result<(&'me str, K, V), Box<dyn ChromaError>> {
let mut block_offset = 0;
let mut block = None;
let sparse_index_len = self.sparse_index.data.lock().len();
let sparse_index_len = self.root.sparse_index.data.lock().len();
for i in 0..sparse_index_len {
let uuid = {
let data = self.sparse_index.data.lock();
let data = self.root.sparse_index.data.lock();
*data.forward.iter().nth(i).unwrap().1
};
block = match self.get_block(uuid).await {
Expand Down Expand Up @@ -438,7 +426,7 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
key: K,
) -> Result<Vec<(K, V)>, Box<dyn ChromaError>> {
// Get all block ids that contain keys > key from sparse index for this prefix.
let block_ids = self.sparse_index.get_block_ids_gt(prefix, key.clone());
let block_ids = self.root.sparse_index.get_block_ids_gt(prefix, key.clone());
let mut result: Vec<(K, V)> = vec![];
// Read all the blocks individually to get keys > key.
for block_id in block_ids {
Expand Down Expand Up @@ -470,7 +458,7 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
key: K,
) -> Result<Vec<(K, V)>, Box<dyn ChromaError>> {
// Get all block ids that contain keys < key from sparse index.
let block_ids = self.sparse_index.get_block_ids_lt(prefix, key.clone());
let block_ids = self.root.sparse_index.get_block_ids_lt(prefix, key.clone());
let mut result: Vec<(K, V)> = vec![];
// Read all the blocks individually to get keys < key.
for block_id in block_ids {
Expand Down Expand Up @@ -502,7 +490,10 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
key: K,
) -> Result<Vec<(K, V)>, Box<dyn ChromaError>> {
// Get all block ids that contain keys >= key from sparse index.
let block_ids = self.sparse_index.get_block_ids_gte(prefix, key.clone());
let block_ids = self
.root
.sparse_index
.get_block_ids_gte(prefix, key.clone());
let mut result: Vec<(K, V)> = vec![];
// Read all the blocks individually to get keys >= key.
for block_id in block_ids {
Expand Down Expand Up @@ -534,7 +525,10 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
key: K,
) -> Result<Vec<(K, V)>, Box<dyn ChromaError>> {
// Get all block ids that contain keys <= key from sparse index.
let block_ids = self.sparse_index.get_block_ids_lte(prefix, key.clone());
let block_ids = self
.root
.sparse_index
.get_block_ids_lte(prefix, key.clone());
let mut result: Vec<(K, V)> = vec![];
// Read all the blocks individually to get keys <= key.
for block_id in block_ids {
Expand Down Expand Up @@ -564,7 +558,7 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
&'me self,
prefix: &str,
) -> Result<Vec<(K, V)>, Box<dyn ChromaError>> {
let block_ids = self.sparse_index.get_block_ids_prefix(prefix);
let block_ids = self.root.sparse_index.get_block_ids_prefix(prefix);
let mut result: Vec<(K, V)> = vec![];
for block_id in block_ids {
let block_opt = match self.get_block(block_id).await {
Expand Down Expand Up @@ -595,7 +589,7 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
key: K,
) -> Result<bool, Box<dyn ChromaError>> {
let search_key = CompositeKey::new(prefix.to_string(), key.clone());
let target_block_id = self.sparse_index.get_target_block_id(&search_key);
let target_block_id = self.root.sparse_index.get_target_block_id(&search_key);
let block = match self.get_block(target_block_id).await {
Ok(Some(block)) => block,
Ok(None) => {
Expand All @@ -615,7 +609,7 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
pub(crate) async fn count(&self) -> Result<usize, Box<dyn ChromaError>> {
let mut block_ids: Vec<Uuid> = vec![];
{
let lock_guard = self.sparse_index.data.lock();
let lock_guard = self.root.sparse_index.data.lock();
let curr_iter = lock_guard.forward.iter();
for (_, block_id) in curr_iter {
block_ids.push(*block_id);
Expand All @@ -641,7 +635,7 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
}

pub(crate) fn id(&self) -> Uuid {
self.id
self.root.id
}
}

Expand Down Expand Up @@ -942,8 +936,8 @@ mod tests {
// Sparse index should have 3 blocks
match &reader {
crate::BlockfileReader::ArrowBlockfileReader(reader) => {
assert_eq!(reader.sparse_index.len(), 3);
assert!(reader.sparse_index.is_valid());
assert_eq!(reader.root.sparse_index.len(), 3);
assert!(reader.root.sparse_index.is_valid());
}
_ => panic!("Unexpected reader type"),
}
Expand Down Expand Up @@ -977,8 +971,8 @@ mod tests {
// Sparse index should still have 3 blocks
match &reader {
crate::BlockfileReader::ArrowBlockfileReader(reader) => {
assert_eq!(reader.sparse_index.len(), 3);
assert!(reader.sparse_index.is_valid());
assert_eq!(reader.root.sparse_index.len(), 3);
assert!(reader.root.sparse_index.is_valid());
}
_ => panic!("Unexpected reader type"),
}
Expand Down Expand Up @@ -1010,8 +1004,8 @@ mod tests {
// Sparse index should have 6 blocks
match &reader {
crate::BlockfileReader::ArrowBlockfileReader(reader) => {
assert_eq!(reader.sparse_index.len(), 6);
assert!(reader.sparse_index.is_valid());
assert_eq!(reader.root.sparse_index.len(), 6);
assert!(reader.root.sparse_index.is_valid());
}
_ => panic!("Unexpected reader type"),
}
Expand Down
8 changes: 5 additions & 3 deletions rust/blockstore/src/arrow/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ pub const TEST_MAX_BLOCK_SIZE_BYTES: usize = 16384;
#[derive(Deserialize, Debug, Clone)]
pub struct ArrowBlockfileProviderConfig {
pub block_manager_config: BlockManagerConfig,
pub sparse_index_manager_config: SparseIndexManagerConfig,
#[serde(alias = "sparse_index_manager_config")]
pub root_manager_config: RootManagerConfig,
}

#[derive(Deserialize, Debug, Clone)]
Expand All @@ -17,6 +18,7 @@ pub struct BlockManagerConfig {
}

#[derive(Deserialize, Debug, Clone)]
pub struct SparseIndexManagerConfig {
pub sparse_index_cache_config: CacheConfig,
pub struct RootManagerConfig {
#[serde(alias = "sparse_index_cache_config")]
pub root_cache_config: CacheConfig,
}
Loading

0 comments on commit 246fb9e

Please sign in to comment.