diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 21362beba..db830fa3c 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -145,7 +145,6 @@ jobs: include: - store: "memstore" - store: "rocksstore" - - store: "rocksstore-compat07" - store: "sledstore" steps: diff --git a/Cargo.toml b/Cargo.toml index 9dbd6fb86..fd1397d78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,7 +51,6 @@ members = [ "macros", "tests", "rocksstore", - "rocksstore-compat07", "sledstore", ] exclude = [ diff --git a/Makefile b/Makefile index fbc324882..a0f0ee24f 100644 --- a/Makefile +++ b/Makefile @@ -66,7 +66,7 @@ unused_dep: typos: # cargo install typos-cli - typos --write-changes openraft/ tests/ memstore/ rocksstore rocksstore-compat07/ examples/raft-kv-memstore/ examples/raft-kv-rocksdb/ + typos --write-changes openraft/ tests/ memstore/ rocksstore examples/raft-kv-memstore/ examples/raft-kv-rocksdb/ # typos clean: diff --git a/openraft/Cargo.toml b/openraft/Cargo.toml index 4b3ad2d4b..8bce47a90 100644 --- a/openraft/Cargo.toml +++ b/openraft/Cargo.toml @@ -74,10 +74,6 @@ single-term-leader = [] # Provide basic compatible types compat = [] -# Turn on to let openraft provide additional data types to build v0.7 compatible RaftStorage. -compat-07 = ["compat", "serde", "dep:or07", "compat-07-testing"] -compat-07-testing = ["dep:tempfile", "anyhow", "dep:serde_json"] - # Allows an application to implement a custom the v2 storage API. # See `openraft::storage::v2` for more details. # V2 API are unstable and may change in the future. diff --git a/openraft/src/compat/compat07.rs b/openraft/src/compat/compat07.rs deleted file mode 100644 index 21a3feedf..000000000 --- a/openraft/src/compat/compat07.rs +++ /dev/null @@ -1,702 +0,0 @@ -//! This mod provides types that can be deserialized from data written by either v0.7 or -//! the latest openraft. -//! -//! This mod is enabled by feature flag `compat-07`. -//! See: [feature-flag-compat-07](`crate::docs::feature_flags#feature-flag-compat-07`). -//! Ap application does not needs to enable this feature if it chooses -//! to manually upgrade v0.7 on-disk data. -//! -//! In v0.7 compatible mode, openraft enables feature flags: -//! - `serde`: it adds `serde` implementation to types such as `LogId`. -//! Since generic type `NodeId` and `Node` are introduced in v0.8, the v0.7 compatible mode only -//! works when `NodeId=u64` and `Node=EmptyNode`. -//! -//! The compatible mode still need an application to upgrade codes, but it does not require -//! any manual upgrade to the on-disk data. -//! -//! An application that tries to upgrade from v0.7 can use types in this mod to replace the -//! corresponding ones in a `RaftStorage` implementation, so that v0.7 data and v0.8 data can both -//! be read. -//! [rocksstore-compat07](https://github.com/datafuselabs/openraft/tree/main/rocksstore-compat07) -//! is an example using these type to implement an upgraded RaftStorage -//! -//! This mod also provides a testing suite [`testing::Suite07`] to ensure old data will be correctly -//! read. -//! An application should ensure its storage -//! to pass test suite like [rocksstore-compat07/compatibility_test.rs](https://github.com/datafuselabs/openraft/blob/main/rocksstore-compat07/src/compatibility_test.rs) -//! does: -//! ```ignore -//! use openraft::compat; -//! -//! struct Builder07; -//! struct BuilderLatest; -//! -//! impl compat::testing::StoreBuilder07 for Builder07 { -//! type D = rocksstore07::RocksRequest; -//! type R = rocksstore07::RocksResponse; -//! type S = Arc; -//! -//! async fn build(&self, p: &Path) -> Arc { -//! rocksstore07::RocksStore::new(p).await -//! } -//! -//! fn sample_app_data(&self) -> Self::D { -//! rocksstore07::RocksRequest::Set { key: s("foo"), value: s("bar") } -//! } -//! } -//! -//! impl compat::testing::StoreBuilder for BuilderLatest { -//! type C = crate::Config; -//! type S = Arc; -//! -//! async fn build(&self, p: &Path) -> Arc { -//! crate::RocksStore::new(p).await -//! } -//! -//! fn sample_app_data(&self) -> <::C as openraft::RaftTypeConfig>::D { -//! crate::RocksRequest::Set { key: s("foo"), value: s("bar") } -//! } -//! } -//! -//! #[tokio::test] -//! async fn test_compatibility_with_rocksstore_07() -> anyhow::Result<()> { -//! compat::testing::Suite07 { -//! builder07: Builder07, -//! builder_latest: BuilderLatest, -//! }.test_all().await?; -//! Ok(()) -//! } -//! -//! fn s(v: impl ToString) -> String { -//! v.to_string() -//! } -//! ``` - -mod entry; -mod entry_payload; -mod log_id; -mod membership; -mod snapshot_meta; -mod stored_membership; -mod vote; - -pub use entry::Entry; -pub use entry_payload::EntryPayload; -pub use log_id::LogId; -pub use membership::Membership; -pub use snapshot_meta::SnapshotMeta; -pub use stored_membership::StoredMembership; -pub use vote::Vote; - -pub mod testing { - use std::path::Path; - - use macros::add_async_trait; - use maplit::btreemap; - use maplit::btreeset; - - use crate::compat; - use crate::compat::compat07; - use crate::entry::RaftPayload; - use crate::log_id::RaftLogId; - - /// Build a v0.7 `RaftStorage` implementation for compatibility test. - #[add_async_trait] - pub trait StoreBuilder07 { - type D: or07::AppData; - type R: or07::AppDataResponse; - type S: or07::RaftStorage; - - /// Build a store that is backed by data stored on file system. - async fn build(&self, p: &Path) -> Self::S; - - /// Build an `AppData` for testing. It has to always produce the same data. - fn sample_app_data(&self) -> Self::D; - } - - /// A test suite that ensures data written by an older version `RaftStorage` implementation can - /// be read correctly by a newer version `RaftStorage` implementation. - #[allow(unused_qualifications)] - pub struct Suite07 - where - B07: compat07::testing::StoreBuilder07, - BLatest: compat::testing::StoreBuilder, - { - pub builder07: B07, - pub builder_latest: BLatest, - } - - #[allow(unused_qualifications)] - impl Suite07 - where - B07: compat07::testing::StoreBuilder07, - BLatest: compat::testing::StoreBuilder, - { - pub async fn test_all(&self) -> anyhow::Result<()> { - self.test_v07_hard_state_voted_for_none().await?; - self.test_v07_hard_state_voted_for_some().await?; - self.test_v08_vote().await?; - self.test_v07_append_log().await?; - self.test_v07_truncate_purge().await?; - self.test_v07_apply().await?; - self.test_v07_snapshot().await?; - - Ok(()) - } - - async fn test_v07_hard_state_voted_for_none(&self) -> anyhow::Result<()> { - let td = tmp_dir(); - { - let s7 = self.builder07.build(td.path()).await; - or07::RaftStorage::save_hard_state(&s7, &or07::HardState { - current_term: 1, - voted_for: None, - }) - .await?; - } - { - let mut s8 = self.builder_latest.build(td.path()).await; - let got = crate::RaftStorage::read_vote(&mut s8).await?; - assert_eq!(Some(crate::Vote::new(1, 0)), got); - } - - Ok(()) - } - - async fn test_v07_hard_state_voted_for_some(&self) -> anyhow::Result<()> { - let td = tmp_dir(); - { - let s7 = self.builder07.build(td.path()).await; - or07::RaftStorage::save_hard_state(&s7, &or07::HardState { - current_term: 1, - voted_for: Some(3), - }) - .await?; - } - { - let mut s8 = self.builder_latest.build(td.path()).await; - let got = crate::RaftStorage::read_vote(&mut s8).await?; - assert_eq!(Some(crate::Vote::new(1, 3)), got); - } - - Ok(()) - } - - async fn test_v08_vote(&self) -> anyhow::Result<()> { - let td = tmp_dir(); - { - let mut s8 = self.builder_latest.build(td.path()).await; - crate::RaftStorage::save_vote(&mut s8, &crate::Vote::new(3, 5)).await?; - } - { - let mut s8 = self.builder_latest.build(td.path()).await; - let got = crate::RaftStorage::read_vote(&mut s8).await?; - assert_eq!(Some(crate::Vote::new(3, 5)), got); - } - - Ok(()) - } - - async fn test_v07_append_log(&self) -> anyhow::Result<()> { - let td = tmp_dir(); - { - let s7 = self.builder07.build(td.path()).await; - let entries = vec![ - or07::Entry { - log_id: or07::LogId { term: 1, index: 5 }, - payload: or07::EntryPayload::Blank, - }, - or07::Entry { - log_id: or07::LogId { term: 1, index: 6 }, - payload: or07::EntryPayload::Membership(or07::Membership::new_single(btreeset! {1,2})), - }, - or07::Entry { - log_id: or07::LogId { term: 1, index: 7 }, - payload: or07::EntryPayload::Normal(self.builder07.sample_app_data()), - }, - ]; - - or07::RaftStorage::append_to_log(&s7, entries.iter().collect::>().as_slice()).await?; - } - { - let mut s8 = self.builder_latest.build(td.path()).await; - let got = crate::RaftLogReader::try_get_log_entries(&mut s8, 4..9).await?; - let want: Vec> = vec![ - crate::Entry { - log_id: crate::LogId::new(crate::CommittedLeaderId::new(1, 0), 5), - payload: crate::EntryPayload::Blank, - }, - crate::Entry { - log_id: crate::LogId::new(crate::CommittedLeaderId::new(1, 0), 6), - payload: crate::EntryPayload::Membership(crate::Membership::new( - vec![btreeset! {1,2}], - btreemap! {1=> crate::EmptyNode{}, 2=>crate::EmptyNode{}}, - )), - }, - crate::Entry { - log_id: crate::LogId::new(crate::CommittedLeaderId::new(1, 0), 7), - payload: crate::EntryPayload::Normal(self.builder_latest.sample_app_data()), - }, - ]; - - assert_eq!(3, got.len()); - assert_eq!(want[0].log_id, *got[0].get_log_id()); - assert_eq!(want[1].log_id, *got[1].get_log_id()); - assert_eq!(want[2].log_id, *got[2].get_log_id()); - - assert!(got[0].is_blank()); - if let Some(m) = got[1].get_membership() { - assert_eq!( - &crate::Membership::::new( - vec![btreeset! {1,2}], - btreemap! {1=> crate::EmptyNode{}, 2=>crate::EmptyNode{}}, - ), - m - ) - } else { - unreachable!("expect Membership"); - } - - let s = serde_json::to_string(&got[2])?; - let want = serde_json::to_string(&crate::Entry:: { - log_id: crate::LogId::new(crate::CommittedLeaderId::new(1, 0), 7), - payload: crate::EntryPayload::Normal(self.builder_latest.sample_app_data()), - })?; - assert_eq!(want, s); - } - - Ok(()) - } - - async fn test_v07_truncate_purge(&self) -> anyhow::Result<()> { - let td = tmp_dir(); - { - let s7 = self.builder07.build(td.path()).await; - let entries = vec![ - or07::Entry { - log_id: or07::LogId { term: 1, index: 5 }, - payload: or07::EntryPayload::Blank, - }, - or07::Entry { - log_id: or07::LogId { term: 1, index: 6 }, - payload: or07::EntryPayload::Membership(or07::Membership::new_single(btreeset! {1,2})), - }, - or07::Entry { - log_id: or07::LogId { term: 1, index: 7 }, - payload: or07::EntryPayload::Normal(self.builder07.sample_app_data()), - }, - ]; - - or07::RaftStorage::append_to_log(&s7, entries.iter().collect::>().as_slice()).await?; - or07::RaftStorage::delete_conflict_logs_since(&s7, or07::LogId { term: 1, index: 7 }).await?; - or07::RaftStorage::purge_logs_upto(&s7, or07::LogId { term: 1, index: 5 }).await?; - } - { - let mut s8 = self.builder_latest.build(td.path()).await; - - let got = crate::RaftLogReader::try_get_log_entries(&mut s8, 4..9).await?; - let want: Vec> = vec![crate::Entry { - log_id: crate::LogId::new(crate::CommittedLeaderId::new(1, 0), 6), - payload: crate::EntryPayload::Membership(crate::Membership::new( - vec![btreeset! {1,2}], - btreemap! {1=> crate::EmptyNode{}, 2=>crate::EmptyNode{}}, - )), - }]; - - assert_eq!(1, got.len()); - assert_eq!(want[0].log_id, *got[0].get_log_id()); - - if let Some(m) = got[0].get_membership() { - assert_eq!( - &crate::Membership::::new( - vec![btreeset! {1,2}], - btreemap! {1=> crate::EmptyNode{}, 2=>crate::EmptyNode{}}, - ), - m - ) - } else { - unreachable!("expect Membership"); - } - - // get_log_state - let got = crate::RaftStorage::get_log_state(&mut s8).await?; - assert_eq!( - crate::LogState { - last_purged_log_id: Some(crate::LogId::new(crate::CommittedLeaderId::new(1, 0), 5)), - last_log_id: Some(crate::LogId::new(crate::CommittedLeaderId::new(1, 0), 6)), - }, - got - ); - } - - Ok(()) - } - - async fn test_v07_apply(&self) -> anyhow::Result<()> { - let td = tmp_dir(); - { - let s7 = self.builder07.build(td.path()).await; - let entries = vec![ - or07::Entry { - log_id: or07::LogId { term: 1, index: 5 }, - payload: or07::EntryPayload::Blank, - }, - or07::Entry { - log_id: or07::LogId { term: 1, index: 6 }, - payload: or07::EntryPayload::Membership(or07::Membership::new_single(btreeset! {1,2})), - }, - or07::Entry { - log_id: or07::LogId { term: 1, index: 7 }, - payload: or07::EntryPayload::Normal(self.builder07.sample_app_data()), - }, - ]; - - or07::RaftStorage::apply_to_state_machine(&s7, entries.iter().collect::>().as_slice()).await?; - } - { - let mut s8 = self.builder_latest.build(td.path()).await; - - let (last_log_id, last_membership) = crate::RaftStorage::last_applied_state(&mut s8).await?; - assert_eq!( - Some(crate::LogId::new(crate::CommittedLeaderId::new(1, 0), 7),), - last_log_id - ); - assert_eq!( - crate::StoredMembership::new( - Some(crate::LogId::new(crate::CommittedLeaderId::new(1, 0), 6),), - crate::Membership::new( - vec![btreeset! {1,2}], - btreemap! {1=>crate::EmptyNode{}, 2=>crate::EmptyNode{}} - ) - ), - last_membership - ); - } - - Ok(()) - } - - async fn test_v07_snapshot(&self) -> anyhow::Result<()> { - let td = tmp_dir(); - { - let s7 = self.builder07.build(td.path()).await; - let entries = vec![ - or07::Entry { - log_id: or07::LogId { term: 1, index: 5 }, - payload: or07::EntryPayload::Blank, - }, - or07::Entry { - log_id: or07::LogId { term: 1, index: 6 }, - payload: or07::EntryPayload::Membership(or07::Membership::new_single(btreeset! {1,2})), - }, - or07::Entry { - log_id: or07::LogId { term: 1, index: 7 }, - payload: or07::EntryPayload::Normal(self.builder07.sample_app_data()), - }, - ]; - - or07::RaftStorage::apply_to_state_machine(&s7, entries.iter().collect::>().as_slice()).await?; - or07::RaftStorage::build_snapshot(&s7).await?; - } - { - let mut s8 = self.builder_latest.build(td.path()).await; - - let snapshot = crate::RaftStorage::get_current_snapshot(&mut s8).await?; - assert!( - snapshot.is_none(), - "SnapshotMeta.last_membership is introduced in 0.8, can not use an old snapshot" - ); - } - - Ok(()) - } - } - - fn tmp_dir() -> tempfile::TempDir { - tempfile::TempDir::new().expect("couldn't create temp dir") - } -} - -#[cfg(test)] -mod tests { - use std::io::Cursor; - - use maplit::btreemap; - use maplit::btreeset; - - use crate::compat::Upgrade; - use crate::CommittedLeaderId; - use crate::TokioRuntime; - - #[test] - fn test_serde_log_id() -> anyhow::Result<()> { - use super::LogId; - - let v7 = or07::LogId::new(10, 5); - let want = crate::LogId::new(CommittedLeaderId::new(10, 0), 5); - - let s = serde_json::to_string(&v7)?; - let c: LogId = serde_json::from_str(&s)?; - let v8 = c.upgrade(); - assert_eq!(want, v8); - - let s = serde_json::to_string(&v8)?; - let c: LogId = serde_json::from_str(&s)?; - assert_eq!(want, c.upgrade()); - Ok(()) - } - - #[test] - fn test_serde_vote() -> anyhow::Result<()> { - use super::Vote; - - let v7 = or07::HardState { - current_term: 5, - voted_for: Some(3), - }; - let want = crate::Vote::new(5, 3); - - let s = serde_json::to_string(&v7)?; - let c: Vote = serde_json::from_str(&s)?; - let v8 = c.upgrade(); - assert_eq!(want, v8); - - let s = serde_json::to_string(&v8)?; - let c: Vote = serde_json::from_str(&s)?; - assert_eq!(want, c.upgrade()); - Ok(()) - } - - #[test] - fn test_serde_membership() -> anyhow::Result<()> { - use super::Membership; - - let v7 = or07::Membership::new_single(btreeset! {1,2}); - let want = crate::Membership::new( - vec![btreeset! {1,2}], - btreemap! {1=>crate::EmptyNode{}, 2=>crate::EmptyNode{}}, - ); - - let s = serde_json::to_string(&v7)?; - let c: Membership = serde_json::from_str(&s)?; - let v8 = c.upgrade(); - assert_eq!(want, v8); - - let s = serde_json::to_string(&v8)?; - let c: Membership = serde_json::from_str(&s)?; - assert_eq!(want, c.upgrade()); - Ok(()) - } - - #[test] - fn test_serde_effective_membership() -> anyhow::Result<()> { - use super::StoredMembership; - - let m7 = || or07::Membership::new_single(btreeset! {1,2}); - let m8 = || { - crate::Membership::new( - vec![btreeset! {1,2}], - btreemap! {1=>crate::EmptyNode{}, 2=>crate::EmptyNode{}}, - ) - }; - - let v7 = or07::EffectiveMembership::new(or07::LogId::new(5, 3), m7()); - let want = crate::StoredMembership::new(Some(crate::LogId::new(CommittedLeaderId::new(5, 0), 3)), m8()); - - let s = serde_json::to_string(&v7)?; - let c: StoredMembership = serde_json::from_str(&s)?; - let v8 = c.upgrade(); - assert_eq!(want, v8); - - let s = serde_json::to_string(&v8)?; - let c: StoredMembership = serde_json::from_str(&s)?; - assert_eq!(want, c.upgrade()); - Ok(()) - } - - crate::declare_raft_types!( - pub TestingConfig: - D = u64, R = u64, NodeId = u64, Node = crate::EmptyNode, - Entry = crate::Entry, SnapshotData = Cursor>, - AsyncRuntime = TokioRuntime - ); - - #[test] - fn test_serde_entry_payload_blank() -> anyhow::Result<()> { - use super::EntryPayload; - - let v7 = or07::EntryPayload::::Blank; - let want = crate::EntryPayload::::Blank; - - let s = serde_json::to_string(&v7)?; - let c: EntryPayload = serde_json::from_str(&s)?; - let v8 = c.upgrade(); - assert_eq!(want, v8); - - let s = serde_json::to_string(&v8)?; - let c: EntryPayload = serde_json::from_str(&s)?; - assert_eq!(want, c.upgrade()); - Ok(()) - } - - #[test] - fn test_serde_entry_payload_normal() -> anyhow::Result<()> { - use super::EntryPayload; - - let v7 = or07::EntryPayload::::Normal(3); - let want = crate::EntryPayload::::Normal(3); - - let s = serde_json::to_string(&v7)?; - let c: EntryPayload = serde_json::from_str(&s)?; - let v8 = c.upgrade(); - assert_eq!(want, v8); - - let s = serde_json::to_string(&v8)?; - let c: EntryPayload = serde_json::from_str(&s)?; - assert_eq!(want, c.upgrade()); - Ok(()) - } - - #[test] - fn test_serde_entry_payload_membership() -> anyhow::Result<()> { - use super::EntryPayload; - - let m7 = || or07::Membership::new_single(btreeset! {1,2}); - let m8 = || { - crate::Membership::new( - vec![btreeset! {1,2}], - btreemap! {1=>crate::EmptyNode{}, 2=>crate::EmptyNode{}}, - ) - }; - - let v7 = or07::EntryPayload::::Membership(m7()); - let want = crate::EntryPayload::::Membership(m8()); - - let s = serde_json::to_string(&v7)?; - let c: EntryPayload = serde_json::from_str(&s)?; - let v8 = c.upgrade(); - assert_eq!(want, v8); - - let s = serde_json::to_string(&v8)?; - let c: EntryPayload = serde_json::from_str(&s)?; - assert_eq!(want, c.upgrade()); - Ok(()) - } - - #[test] - fn test_serde_entry_blank() -> anyhow::Result<()> { - use super::Entry; - - let v7 = or07::Entry { - log_id: or07::LogId::new(10, 5), - payload: or07::EntryPayload::::Blank, - }; - let want = crate::Entry { - log_id: crate::LogId::new(crate::CommittedLeaderId::new(10, 0), 5), - payload: crate::EntryPayload::::Blank, - }; - - let s = serde_json::to_string(&v7)?; - let c: Entry = serde_json::from_str(&s)?; - let v8 = c.upgrade(); - assert_eq!(serde_json::to_string(&want)?, serde_json::to_string(&v8)?); - - let s = serde_json::to_string(&v8)?; - let c: Entry = serde_json::from_str(&s)?; - assert_eq!(serde_json::to_string(&want)?, serde_json::to_string(&c.upgrade())?); - Ok(()) - } - - #[test] - fn test_serde_entry_normal() -> anyhow::Result<()> { - use super::Entry; - - let v7 = or07::Entry { - log_id: or07::LogId::new(10, 5), - payload: or07::EntryPayload::::Normal(3), - }; - let want = crate::Entry { - log_id: crate::LogId::new(crate::CommittedLeaderId::new(10, 0), 5), - payload: crate::EntryPayload::::Normal(3), - }; - - let s = serde_json::to_string(&v7)?; - let c: Entry = serde_json::from_str(&s)?; - let v8 = c.upgrade(); - assert_eq!(serde_json::to_string(&want)?, serde_json::to_string(&v8)?); - - let s = serde_json::to_string(&v8)?; - let c: Entry = serde_json::from_str(&s)?; - assert_eq!(serde_json::to_string(&want)?, serde_json::to_string(&c.upgrade())?); - Ok(()) - } - - #[test] - fn test_serde_entry_membership() -> anyhow::Result<()> { - use super::Entry; - - let m7 = || or07::Membership::new_single(btreeset! {1,2}); - let m8 = || { - crate::Membership::new( - vec![btreeset! {1,2}], - btreemap! {1=>crate::EmptyNode{}, 2=>crate::EmptyNode{}}, - ) - }; - let v7 = or07::Entry { - log_id: or07::LogId::new(10, 5), - payload: or07::EntryPayload::::Membership(m7()), - }; - let want = crate::Entry { - log_id: crate::LogId::new(crate::CommittedLeaderId::new(10, 0), 5), - payload: crate::EntryPayload::::Membership(m8()), - }; - - let s = serde_json::to_string(&v7)?; - let c: Entry = serde_json::from_str(&s)?; - let v8 = c.upgrade(); - assert_eq!(serde_json::to_string(&want)?, serde_json::to_string(&v8)?); - - let s = serde_json::to_string(&v8)?; - let c: Entry = serde_json::from_str(&s)?; - assert_eq!(serde_json::to_string(&want)?, serde_json::to_string(&c.upgrade())?); - Ok(()) - } - - #[test] - fn test_serde_snapshot_meta() -> anyhow::Result<()> { - use super::SnapshotMeta; - - let m8 = || { - crate::Membership::new( - vec![btreeset! {1,2}], - btreemap! {1=>crate::EmptyNode{}, 2=>crate::EmptyNode{}}, - ) - }; - let v7 = or07::SnapshotMeta { - last_log_id: Some(or07::LogId::new(10, 5)), - snapshot_id: "a".to_string(), - }; - let want = crate::SnapshotMeta { - last_log_id: Some(crate::LogId::new(crate::CommittedLeaderId::new(10, 0), 5)), - last_membership: crate::StoredMembership::new( - Some(crate::LogId::new(crate::CommittedLeaderId::new(10, 0), 5)), - m8(), - ), - snapshot_id: "a".to_string(), - }; - - let s = serde_json::to_string(&v7)?; - let c: SnapshotMeta = serde_json::from_str(&s)?; - assert!( - c.try_upgrade().is_err(), - "snapshot_meta can not be upgrade because it has no membership" - ); - - let s = serde_json::to_string(&want)?; - let c: SnapshotMeta = serde_json::from_str(&s)?; - let c = c.try_upgrade().unwrap(); - assert_eq!(serde_json::to_string(&want)?, serde_json::to_string(&c)?); - - Ok(()) - } -} diff --git a/openraft/src/compat/compat07/entry.rs b/openraft/src/compat/compat07/entry.rs deleted file mode 100644 index 6e78bfbee..000000000 --- a/openraft/src/compat/compat07/entry.rs +++ /dev/null @@ -1,39 +0,0 @@ -use std::fmt::Debug; - -use super::EntryPayload; -use super::LogId; -use crate::compat::Upgrade; - -/// v0.7 compatible Entry. -/// -/// To load from either v0.7 or the latest format data and upgrade it to the latest type: -/// ```ignore -/// let x:openraft::Entry = serde_json::from_slice::(&serialized_bytes)?.upgrade() -/// ``` -#[derive(serde::Serialize, serde::Deserialize)] -#[serde(bound = "")] -pub struct Entry { - pub log_id: LogId, - pub payload: EntryPayload, -} - -impl Upgrade> for or07::Entry -where - C: crate::RaftTypeConfig, - ::D: or07::AppData + Debug, -{ - fn upgrade(self) -> crate::Entry { - let log_id = self.log_id.upgrade(); - let payload = self.payload.upgrade(); - crate::Entry { log_id, payload } - } -} - -impl> Upgrade> for Entry { - fn upgrade(self) -> crate::Entry { - crate::Entry { - log_id: self.log_id.upgrade(), - payload: self.payload.upgrade(), - } - } -} diff --git a/openraft/src/compat/compat07/entry_payload.rs b/openraft/src/compat/compat07/entry_payload.rs deleted file mode 100644 index 2bd52db1c..000000000 --- a/openraft/src/compat/compat07/entry_payload.rs +++ /dev/null @@ -1,43 +0,0 @@ -use std::fmt::Debug; - -use super::Membership; -use crate::compat::Upgrade; - -/// v0.7 compatible EntryPayload. -/// -/// To load from either v0.7 or the latest format data and upgrade it to the latest type: -/// ```ignore -/// let x:openraft::EntryPayload = serde_json::from_slice::(&serialized_bytes)?.upgrade() -/// ``` -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub enum EntryPayload { - Blank, - Normal(C::D), - Membership(Membership), -} - -impl Upgrade> for or07::EntryPayload -where - C: crate::RaftTypeConfig, - ::D: or07::AppData + Debug, -{ - fn upgrade(self) -> crate::EntryPayload { - match self { - Self::Blank => crate::EntryPayload::Blank, - Self::Membership(m) => crate::EntryPayload::Membership(m.upgrade()), - Self::Normal(d) => crate::EntryPayload::Normal(d), - } - } -} - -impl> Upgrade> - for EntryPayload -{ - fn upgrade(self) -> crate::EntryPayload { - match self { - EntryPayload::Blank => crate::EntryPayload::Blank, - EntryPayload::Normal(d) => crate::EntryPayload::Normal(d), - EntryPayload::Membership(m) => crate::EntryPayload::Membership(m.upgrade()), - } - } -} diff --git a/openraft/src/compat/compat07/log_id.rs b/openraft/src/compat/compat07/log_id.rs deleted file mode 100644 index e2629dea9..000000000 --- a/openraft/src/compat/compat07/log_id.rs +++ /dev/null @@ -1,17 +0,0 @@ -use crate::compat::Compat; -use crate::compat::Upgrade; - -/// v0.7 compatible LogId. -/// -/// To load from either v0.7 or the latest format data and upgrade it to the latest type: -/// ```ignore -/// let x:openraft::LogId = serde_json::from_slice::(&serialized_bytes)?.upgrade() -/// ``` -pub type LogId = Compat>; - -impl Upgrade> for or07::LogId { - fn upgrade(self) -> crate::LogId { - let committed_leader_id = crate::CommittedLeaderId::new(self.term, 0); - crate::LogId::new(committed_leader_id, self.index) - } -} diff --git a/openraft/src/compat/compat07/membership.rs b/openraft/src/compat/compat07/membership.rs deleted file mode 100644 index 3d609fa78..000000000 --- a/openraft/src/compat/compat07/membership.rs +++ /dev/null @@ -1,36 +0,0 @@ -use std::collections::BTreeMap; -use std::collections::BTreeSet; -use std::fmt::Debug; - -use crate::compat::Upgrade; - -/// v0.7 compatible Membership. -/// -/// To load from either v0.7 or the latest format data and upgrade it to the latest type: -/// ```ignore -/// let x:openraft::Membership = serde_json::from_slice::(&serialized_bytes)?.upgrade() -/// ``` -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct Membership { - pub configs: Vec>, - pub nodes: Option>, - pub all_nodes: Option>, -} - -impl Upgrade> for or07::Membership { - fn upgrade(self) -> crate::Membership { - let configs = self.get_configs().clone(); - let nodes = self.all_nodes().iter().map(|nid| (*nid, crate::EmptyNode::new())).collect::>(); - crate::Membership::new(configs, nodes) - } -} - -impl Upgrade> for Membership { - fn upgrade(self) -> crate::Membership { - if let Some(ns) = self.nodes { - crate::Membership::new(self.configs, ns) - } else { - crate::Membership::new(self.configs, self.all_nodes.unwrap()) - } - } -} diff --git a/openraft/src/compat/compat07/snapshot_meta.rs b/openraft/src/compat/compat07/snapshot_meta.rs deleted file mode 100644 index bedfa4061..000000000 --- a/openraft/src/compat/compat07/snapshot_meta.rs +++ /dev/null @@ -1,44 +0,0 @@ -use std::fmt::Debug; - -use super::LogId; -use super::StoredMembership; -use crate::compat::Upgrade; - -/// v0.7 compatible SnapshotMeta. -/// -/// SnapshotMeta can not be upgraded, an old snapshot should be discarded and a new one should be -/// re-built. -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct SnapshotMeta { - pub last_log_id: Option, - - pub last_membership: Option, - - pub snapshot_id: crate::SnapshotId, -} - -impl Upgrade> for or07::SnapshotMeta { - fn upgrade(self) -> crate::SnapshotMeta { - unimplemented!("can not upgrade SnapshotMeta") - } - fn try_upgrade(self) -> Result, (Self, &'static str)> { - Err((self, "v07 snapshot meta does not contain membership to upgrade")) - } -} - -impl Upgrade> for SnapshotMeta { - fn upgrade(self) -> crate::SnapshotMeta { - unimplemented!("can not upgrade SnapshotMeta") - } - fn try_upgrade(self) -> Result, (Self, &'static str)> { - if self.last_membership.is_none() { - Err((self, "v07 snapshot meta does not contain membership to upgrade")) - } else { - Ok(crate::SnapshotMeta { - last_log_id: self.last_log_id.map(|lid| lid.upgrade()), - last_membership: self.last_membership.unwrap().upgrade(), - snapshot_id: self.snapshot_id, - }) - } - } -} diff --git a/openraft/src/compat/compat07/stored_membership.rs b/openraft/src/compat/compat07/stored_membership.rs deleted file mode 100644 index d8ef35952..000000000 --- a/openraft/src/compat/compat07/stored_membership.rs +++ /dev/null @@ -1,36 +0,0 @@ -use std::fmt::Debug; - -use super::LogId; -use super::Membership; -use crate::compat::Upgrade; - -/// v0.7 compatible StoredMembership. -/// -/// To load from either v0.7 or the latest format data and upgrade it to the latest type: -/// ```ignore -/// let x:openraft::StoredMembership = serde_json::from_slice::(&serialized_bytes)?.upgrade() -/// ``` -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct StoredMembership { - pub log_id: Option, - pub membership: Membership, - #[serde(skip)] - pub quorum_set: Option<()>, - #[serde(skip)] - pub voter_ids: Option<()>, -} - -impl Upgrade> for or07::EffectiveMembership { - fn upgrade(self) -> crate::StoredMembership { - let membership = self.membership.upgrade(); - let log_id = self.log_id.upgrade(); - - crate::StoredMembership::new(Some(log_id), membership) - } -} - -impl Upgrade> for StoredMembership { - fn upgrade(self) -> crate::StoredMembership { - crate::StoredMembership::new(self.log_id.map(|lid| lid.upgrade()), self.membership.upgrade()) - } -} diff --git a/openraft/src/compat/compat07/vote.rs b/openraft/src/compat/compat07/vote.rs deleted file mode 100644 index a233fb677..000000000 --- a/openraft/src/compat/compat07/vote.rs +++ /dev/null @@ -1,17 +0,0 @@ -use crate::compat::Compat; -use crate::compat::Upgrade; - -/// v0.7 compatible Vote(in v0.7 the corresponding type is `HardState`). -/// -/// To load from either v0.7 or the latest format data and upgrade it to latest type: -/// ```ignore -/// let x: openraft::Vote = serde_json::from_slice::(&serialized_bytes)?.upgrade() -/// ``` -pub type Vote = Compat>; - -impl Upgrade> for or07::HardState { - fn upgrade(self) -> crate::Vote { - // When it has not yet voted for any node, let it vote for any node won't break the consensus. - crate::Vote::new(self.current_term, self.voted_for.unwrap_or_default()) - } -} diff --git a/openraft/src/compat/mod.rs b/openraft/src/compat/mod.rs index 7b6d8d722..6217a4570 100644 --- a/openraft/src/compat/mod.rs +++ b/openraft/src/compat/mod.rs @@ -1,9 +1,6 @@ //! This mod is a upgrade helper that provides functionalities for a newer openraft application to //! read data written by an older application. -#[cfg(feature = "compat-07")] pub mod compat07; -pub mod testing; - mod upgrade; pub use upgrade::Compat; diff --git a/openraft/src/compat/testing.rs b/openraft/src/compat/testing.rs deleted file mode 100644 index 394b448f8..000000000 --- a/openraft/src/compat/testing.rs +++ /dev/null @@ -1,27 +0,0 @@ -//! This mod provides supporting utilities for compatibility testing. -//! -//! An application that tries to be compatible with an older format data -//! must ensure its `RaftStorage` implementation to pass test suite, -//! just like [rocksstore-compat07/compatibility_test.rs](https://github.com/datafuselabs/openraft/blob/main/rocksstore-compat07/src/compatibility_test.rs) -//! does - -use std::path::Path; - -use macros::add_async_trait; - -/// Build a latest `RaftStorage` implementation for compatibility test. -#[add_async_trait] -pub trait StoreBuilder { - type C: crate::RaftTypeConfig; - type S: crate::RaftStorage; - - /// Build a store that is backed by data stored on file system. - async fn build(&self, p: &Path) -> Self::S; - - /// Build an `AppData` for testing. It has to always produce the same data. - fn sample_app_data(&self) -> <::C as crate::RaftTypeConfig>::D; -} - -#[cfg(feature = "compat-07")] -pub use crate::compat::compat07::testing::StoreBuilder07; -#[cfg(feature = "compat-07")] pub use crate::compat::compat07::testing::Suite07; diff --git a/openraft/src/docs/feature_flags/feature-flags-toc.md b/openraft/src/docs/feature_flags/feature-flags-toc.md index c36ce6bca..08967687e 100644 --- a/openraft/src/docs/feature_flags/feature-flags-toc.md +++ b/openraft/src/docs/feature_flags/feature-flags-toc.md @@ -1,6 +1,5 @@ - [feature-flag `bench`](#feature-flag-bench) - [feature-flag `bt`](#feature-flag-bt) -- [feature-flag `compat-07`](#feature-flag-compat-07) - [feature-flag `generic-snapshot-data`](#feature-flag-generic-snapshot-data) - [feature-flag `loosen-follower-log-revert`](#feature-flag-loosen-follower-log-revert) - [feature-flag `serde`](#feature-flag-serde) diff --git a/openraft/src/docs/feature_flags/feature-flags.md b/openraft/src/docs/feature_flags/feature-flags.md index 29fb35be4..c04bc428c 100644 --- a/openraft/src/docs/feature_flags/feature-flags.md +++ b/openraft/src/docs/feature_flags/feature-flags.md @@ -13,14 +13,9 @@ toolchain, the unstable features have to be enabled explicitly with environment attaches backtrace to generated errors. This feature works ONLY with nightly rust, because it requires unstable feature `error_generic_member_access`. -## feature-flag `compat-07` +## feature-flag `compat` -Provides additional data types to build v0.7 compatible RaftStorage. - - ```toml - compat-07 = ["compat", "single-term-leader", "serde", "dep:or07", "compat-07-testing"] - compat-07-testing = ["dep:tempdir", "anyhow", "dep:serde_json"] - ``` +Enables compatibility supporting types. ## feature-flag `generic-snapshot-data` diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index 583c1b0fc..d50ad81b2 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -37,7 +37,6 @@ mod summary; mod vote; #[cfg(feature = "compat")] pub mod compat; -#[cfg(feature = "compat-07")] pub use or07; pub mod async_runtime; pub mod entry; diff --git a/rocksstore-compat07/Cargo.toml b/rocksstore-compat07/Cargo.toml deleted file mode 100644 index 5969e9fcd..000000000 --- a/rocksstore-compat07/Cargo.toml +++ /dev/null @@ -1,35 +0,0 @@ -[package] -name = "openraft-rocksstore-compat07" -description = "A example v07 compatible implementation of the `openraft::RaftStorage` trait." -documentation = "https://docs.rs/openraft-rocksstore-copmat07" -readme = "README.md" - -version = { workspace = true } -edition = { workspace = true } -authors = { workspace = true } -categories = { workspace = true } -homepage = { workspace = true } -keywords = { workspace = true } -license = { workspace = true } -repository = { workspace = true } - - -[dependencies] -openraft = { path = "../openraft", package = "openraft", version = "0.9.0", features = ["compat-07", "bt"] } - -async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } -byteorder = "1.4.3" -rocksdb = "0.20.1" -serde = { version = "1.0.114", features = ["derive"] } -serde_json = "1.0.57" -tracing = "0.1.29" - -[dev-dependencies] -anyhow = { workspace = true } -tempfile = { version = "3.4.0" } -tokio = { version = "1.25.0" } - -rocksstore07 = { package = "openraft-rocksstore", version = "0.7.4" } - -[package.metadata.docs.rs] -all-features = true diff --git a/rocksstore-compat07/README.md b/rocksstore-compat07/README.md deleted file mode 100644 index e744b3d57..000000000 --- a/rocksstore-compat07/README.md +++ /dev/null @@ -1,6 +0,0 @@ -# openraft-rockcsstore-0708 - -This is an example `RaftStorage` implementation illustrating how to upgrade an application based on [openraft-0.7](https://github.com/datafuselabs/openraft/tree/release-0.7) to [openraft-0.8](https://github.com/datafuselabs/openraft/tree/release-0.8). - -In this example, it assumes there is an on-disk rocksdb dir that backs an 0.7 application, -and the compatible 0708 should be able to read data from this dir and write data to it. diff --git a/rocksstore-compat07/src/compatibility_test.rs b/rocksstore-compat07/src/compatibility_test.rs deleted file mode 100644 index f2dcd67ad..000000000 --- a/rocksstore-compat07/src/compatibility_test.rs +++ /dev/null @@ -1,58 +0,0 @@ -use std::path::Path; -use std::sync::Arc; - -use openraft::compat; - -/// A builder builds openraft-0.7 based rocksstore. -struct Builder07; -/// A builder builds latest openraft based rocksstore. -struct BuilderLatest; - -impl compat::testing::StoreBuilder07 for Builder07 { - type D = rocksstore07::RocksRequest; - type R = rocksstore07::RocksResponse; - type S = Arc; - - async fn build(&self, p: &Path) -> Arc { - rocksstore07::RocksStore::new(p).await - } - - fn sample_app_data(&self) -> Self::D { - rocksstore07::RocksRequest::Set { - key: s("foo"), - value: s("bar"), - } - } -} - -impl compat::testing::StoreBuilder for BuilderLatest { - type C = crate::TypeConfig; - type S = Arc; - - async fn build(&self, p: &Path) -> Arc { - crate::RocksStore::new(p).await - } - - fn sample_app_data(&self) -> <::C as openraft::RaftTypeConfig>::D { - crate::RocksRequest::Set { - key: s("foo"), - value: s("bar"), - } - } -} - -#[tokio::test] -async fn test_compatibility_with_rocksstore_07() -> anyhow::Result<()> { - let suite = compat::testing::Suite07 { - builder07: Builder07, - builder_latest: BuilderLatest, - }; - - suite.test_all().await?; - - Ok(()) -} - -fn s(v: impl ToString) -> String { - v.to_string() -} diff --git a/rocksstore-compat07/src/lib.rs b/rocksstore-compat07/src/lib.rs deleted file mode 100644 index e4dd13ee8..000000000 --- a/rocksstore-compat07/src/lib.rs +++ /dev/null @@ -1,696 +0,0 @@ -#![deny(unused_crate_dependencies)] -#![deny(unused_qualifications)] - -//! This is an example implementation of the [`RaftStorage`] trait for an application that -//! needs to upgrade from openraft v0.7 to v0.8. -//! -//! Openraft v0.8 introduced several changes to the data types related to persistent data. This -//! example demonstrates how to upgrade the implementation of the storage without requiring any -//! modifications to the on-disk data, using the [`openraft::compat`] compatibility layer. -//! -//! This is a modified version of rocksstore that tries to -//! deserialize data into a compatible type, such as [`compat07::LogId`], when reading data from -//! rocksdb, and then upgrade it to the latest format. You can find usages of `compat07::*` that are -//! used in this implementation to provide compatibility with older data. -//! -//! [`RaftStorage`]: RaftStorage -//! [`openraft::compat`]: openraft::compat -//! [`compat07::LogId`]: compat07::LogId - -#[cfg(test)] mod compatibility_test; -#[cfg(test)] mod test; - -use std::collections::BTreeMap; -use std::error::Error; -use std::fmt::Debug; -use std::io::Cursor; -use std::ops::RangeBounds; -use std::path::Path; -use std::sync::Arc; - -use async_std::sync::RwLock; -use byteorder::BigEndian; -use byteorder::ReadBytesExt; -use byteorder::WriteBytesExt; -use openraft::compat::compat07; -use openraft::compat::Upgrade; -use openraft::AnyError; -use openraft::EmptyNode; -use openraft::Entry; -use openraft::EntryPayload; -use openraft::ErrorSubject; -use openraft::ErrorVerb; -use openraft::LogId; -use openraft::LogState; -use openraft::OptionalSend; -use openraft::RaftLogReader; -use openraft::RaftSnapshotBuilder; -use openraft::RaftStorage; -use openraft::RaftTypeConfig; -use openraft::Snapshot; -use openraft::SnapshotMeta; -use openraft::StorageError; -use openraft::StorageIOError; -use openraft::StoredMembership; -use openraft::TokioRuntime; -use openraft::Vote; -use rocksdb::ColumnFamily; -use rocksdb::ColumnFamilyDescriptor; -use rocksdb::Direction; -use rocksdb::Options; -use rocksdb::DB; -use serde::Deserialize; -use serde::Serialize; - -pub type RocksNodeId = u64; - -openraft::declare_raft_types!( - /// Declare the type configuration for `MemStore`. - pub TypeConfig: D = RocksRequest, R = RocksResponse, NodeId = RocksNodeId, Node = EmptyNode, - Entry = Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime -); - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum RocksRequest { - Set { key: String, value: String }, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct RocksResponse { - pub value: Option, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct RocksSnapshot { - pub meta: SnapshotMeta, - pub data: Vec, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct RocksSnapshotCompat { - pub meta: compat07::SnapshotMeta, - pub data: Vec, -} - -#[derive(Serialize, Deserialize, Debug, Default, Clone)] -pub struct SerializableRocksStateMachine { - pub last_applied_log: Option>, - pub last_membership: StoredMembership, - pub data: BTreeMap, -} - -impl From<&RocksStateMachine> for SerializableRocksStateMachine { - fn from(state: &RocksStateMachine) -> Self { - let mut data = BTreeMap::new(); - - let it = state.db.iterator_cf(state.cf_sm_data(), rocksdb::IteratorMode::Start); - - for item in it { - let (key, value) = item.expect("invalid kv record"); - - let key: &[u8] = &key; - let value: &[u8] = &value; - data.insert( - String::from_utf8(key.to_vec()).expect("invalid key"), - String::from_utf8(value.to_vec()).expect("invalid data"), - ); - } - Self { - last_applied_log: state.get_last_applied_log().expect("last_applied_log"), - last_membership: state.get_last_membership().expect("last_membership"), - data, - } - } -} - -#[derive(Debug, Clone)] -pub struct RocksStateMachine { - /// Application data. - pub db: Arc, -} - -fn sm_r_err(e: E) -> StorageError { - StorageIOError::read_state_machine(&e).into() -} -fn sm_w_err(e: E) -> StorageError { - StorageIOError::write_state_machine(&e).into() -} - -impl RocksStateMachine { - fn cf_sm_meta(&self) -> &ColumnFamily { - self.db.cf_handle("sm_meta").unwrap() - } - - fn cf_sm_data(&self) -> &ColumnFamily { - self.db.cf_handle("sm_data").unwrap() - } - - fn get_last_membership(&self) -> StorageResult> { - let bs = self.db.get_cf(self.cf_sm_meta(), b"last_membership").map_err(sm_r_err)?; - let bs = if let Some(x) = bs { - x - } else { - return Ok(StoredMembership::default()); - }; - - let em = serde_json::from_slice::(&bs).map_err(sm_r_err)?; - Ok(em.upgrade()) - } - - fn set_last_membership(&self, membership: StoredMembership) -> StorageResult<()> { - self.db - .put_cf( - self.cf_sm_meta(), - b"last_membership", - serde_json::to_vec(&membership).map_err(sm_w_err)?, - ) - .map_err(sm_w_err) - } - - fn get_last_applied_log(&self) -> StorageResult>> { - let bs = self.db.get_cf(self.cf_sm_meta(), b"last_applied_log").map_err(sm_r_err)?; - let bs = if let Some(x) = bs { - x - } else { - return Ok(None); - }; - - let log_id = serde_json::from_slice::(&bs).map_err(sm_r_err)?; - Ok(Some(log_id.upgrade())) - } - - fn set_last_applied_log(&self, log_id: LogId) -> StorageResult<()> { - self.db - .put_cf( - self.cf_sm_meta(), - "last_applied_log".as_bytes(), - serde_json::to_vec(&log_id).map_err(sm_w_err)?, - ) - .map_err(sm_w_err) - } - - fn from_serializable(sm: SerializableRocksStateMachine, db: Arc) -> StorageResult { - let r = Self { db }; - - let cf = r.cf_sm_data(); - for (key, value) in sm.data { - r.db.put_cf(cf, key.as_bytes(), value.as_bytes()).map_err(sm_w_err)?; - } - - if let Some(log_id) = sm.last_applied_log { - r.set_last_applied_log(log_id)?; - } - - r.set_last_membership(sm.last_membership)?; - - Ok(r) - } - - fn new(db: Arc) -> RocksStateMachine { - Self { db } - } - - fn insert(&self, key: String, value: String) -> StorageResult<()> { - self.db - .put_cf(self.cf_sm_data(), key.as_bytes(), value.as_bytes()) - .map_err(|e| StorageIOError::write(&e).into()) - } - - pub fn get(&self, key: &str) -> StorageResult> { - let key = key.as_bytes(); - self.db - .get_cf(self.cf_sm_data(), key) - .map(|value| value.map(|v| String::from_utf8(v).expect("invalid data"))) - .map_err(|e| StorageIOError::read(&e).into()) - } -} - -#[derive(Debug)] -pub struct RocksStore { - db: Arc, - - /// The Raft state machine. - pub state_machine: RwLock, -} -type StorageResult = Result>; - -/// converts an id to a byte vector for storing in the database. -/// Note that we're using big endian encoding to ensure correct sorting of keys -fn id_to_bin(id: u64) -> Vec { - let mut buf = Vec::with_capacity(8); - buf.write_u64::(id).unwrap(); - buf -} - -fn bin_to_id(buf: &[u8]) -> u64 { - (&buf[0..8]).read_u64::().unwrap() -} - -/// Meta data of a raft-store. -/// -/// In raft, except logs and state machine, the store also has to store several piece of metadata. -/// This sub mod defines the key-value pairs of these metadata. -mod meta { - use openraft::ErrorSubject; - use openraft::LogId; - - use crate::RocksNodeId; - use crate::RocksSnapshot; - - /// Defines metadata key and value - pub(crate) trait StoreMeta { - /// The key used to store in rocksdb - const KEY: &'static str; - - /// The type of the value to store - type Value: serde::Serialize + serde::de::DeserializeOwned; - - /// The subject this meta belongs to, and will be embedded into the returned storage error. - fn subject(v: Option<&Self::Value>) -> ErrorSubject; - } - - pub(crate) struct LastPurged {} - pub(crate) struct SnapshotIndex {} - pub(crate) struct HardState {} - pub(crate) struct Vote {} - pub(crate) struct Snapshot {} - - impl StoreMeta for LastPurged { - const KEY: &'static str = "last_purged_log_id"; - type Value = LogId; - - fn subject(_v: Option<&Self::Value>) -> ErrorSubject { - ErrorSubject::Store - } - } - impl StoreMeta for SnapshotIndex { - const KEY: &'static str = "snapshot_index"; - type Value = u64; - - fn subject(_v: Option<&Self::Value>) -> ErrorSubject { - ErrorSubject::Store - } - } - impl StoreMeta for HardState { - const KEY: &'static str = "hard_state"; - type Value = (); - - fn subject(_v: Option<&Self::Value>) -> ErrorSubject { - ErrorSubject::Vote - } - } - impl StoreMeta for Vote { - // hard_state is renamed to vote, to hold compatibility, store them by the same key. - const KEY: &'static str = "hard_state"; - type Value = openraft::Vote; - - fn subject(_v: Option<&Self::Value>) -> ErrorSubject { - ErrorSubject::Vote - } - } - impl StoreMeta for Snapshot { - const KEY: &'static str = "snapshot"; - type Value = RocksSnapshot; - - fn subject(_v: Option<&Self::Value>) -> ErrorSubject { - ErrorSubject::None - } - } -} - -impl RocksStore { - fn cf_meta(&self) -> &ColumnFamily { - self.db.cf_handle("meta").unwrap() - } - - fn cf_logs(&self) -> &ColumnFamily { - self.db.cf_handle("logs").unwrap() - } - - /// Get a store metadata. - /// - /// It returns `None` if the store does not have such a metadata stored. - fn get_meta(&self) -> Result, StorageError> { - let v = self - .db - .get_cf(self.cf_meta(), M::KEY) - .map_err(|e| StorageIOError::new(M::subject(None), ErrorVerb::Read, AnyError::new(&e)))?; - - let t = match v { - None => None, - Some(bytes) => Some( - serde_json::from_slice(&bytes) - .map_err(|e| StorageIOError::new(M::subject(None), ErrorVerb::Read, AnyError::new(&e)))?, - ), - }; - Ok(t) - } - - fn get_meta_vec(&self) -> Result>, StorageError> { - let v = self - .db - .get_cf(self.cf_meta(), M::KEY) - .map_err(|e| StorageIOError::new(M::subject(None), ErrorVerb::Read, AnyError::new(&e)))?; - - Ok(v) - } - - /// Save a store metadata. - fn put_meta(&self, value: &M::Value) -> Result<(), StorageError> { - let json_value = serde_json::to_vec(value) - .map_err(|e| StorageIOError::new(M::subject(Some(value)), ErrorVerb::Write, AnyError::new(&e)))?; - - self.db - .put_cf(self.cf_meta(), M::KEY, json_value) - .map_err(|e| StorageIOError::new(M::subject(Some(value)), ErrorVerb::Write, AnyError::new(&e)))?; - - Ok(()) - } -} - -impl RaftLogReader for Arc { - async fn try_get_log_entries + Clone + Debug + OptionalSend>( - &mut self, - range: RB, - ) -> StorageResult>> { - let start = match range.start_bound() { - std::ops::Bound::Included(x) => id_to_bin(*x), - std::ops::Bound::Excluded(x) => id_to_bin(*x + 1), - std::ops::Bound::Unbounded => id_to_bin(0), - }; - - let mut res = Vec::new(); - - let it = self.db.iterator_cf(self.cf_logs(), rocksdb::IteratorMode::From(&start, Direction::Forward)); - for item_res in it { - let (id, val) = item_res.map_err(read_logs_err)?; - - let id = bin_to_id(&id); - if !range.contains(&id) { - break; - } - - let entry = serde_json::from_slice::>(&val).map_err(read_logs_err)?; - let entry = entry.upgrade(); - - assert_eq!(id, entry.log_id.index); - - res.push(entry); - } - Ok(res) - } -} - -impl RaftSnapshotBuilder for Arc { - #[tracing::instrument(level = "trace", skip(self))] - async fn build_snapshot(&mut self) -> Result, StorageError> { - let data; - let last_applied_log; - let last_membership; - - { - // Serialize the data of the state machine. - let state_machine = SerializableRocksStateMachine::from(&*self.state_machine.read().await); - data = serde_json::to_vec(&state_machine).map_err(|e| StorageIOError::read_state_machine(&e))?; - - last_applied_log = state_machine.last_applied_log; - last_membership = state_machine.last_membership; - } - - // TODO: we probably want this to be atomic. - let snapshot_idx: u64 = self.get_meta::()?.unwrap_or_default() + 1; - self.put_meta::(&snapshot_idx)?; - - let snapshot_id = if let Some(last) = last_applied_log { - format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx) - } else { - format!("--{}", snapshot_idx) - }; - - let meta = SnapshotMeta { - last_log_id: last_applied_log, - last_membership, - snapshot_id, - }; - - let snapshot = RocksSnapshot { - meta: meta.clone(), - data: data.clone(), - }; - - self.put_meta::(&snapshot)?; - - Ok(Snapshot { - meta, - snapshot: Box::new(Cursor::new(data)), - }) - } -} - -impl RaftStorage for Arc { - type LogReader = Self; - type SnapshotBuilder = Self; - - async fn get_log_state(&mut self) -> StorageResult> { - let last = self.db.iterator_cf(self.cf_logs(), rocksdb::IteratorMode::End).next(); - - let last_log_id = match last { - None => None, - Some(res) => { - let (_log_index, entry_bytes) = res.map_err(read_logs_err)?; - - let ent = serde_json::from_slice::>(&entry_bytes).map_err(read_logs_err)?; - let ent = ent.upgrade(); - Some(ent.log_id) - } - }; - - let last_purged_log_id = self.get_meta_vec::()?; - let last_purged_log_id = match last_purged_log_id { - None => None, - Some(bs) => { - let log_id = serde_json::from_slice::(&bs).map_err(read_logs_err)?; - Some(log_id.upgrade()) - } - }; - - let last_log_id = match last_log_id { - None => last_purged_log_id, - Some(x) => Some(x), - }; - - Ok(LogState { - last_purged_log_id, - last_log_id, - }) - } - - #[tracing::instrument(level = "trace", skip(self))] - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { - self.put_meta::(vote) - } - - async fn read_vote(&mut self) -> Result>, StorageError> { - // TODO: ? - // Read by old key - let bs = self.get_meta_vec::()?; - let bs = if let Some(bs) = bs { - bs - } else { - return Ok(None); - }; - - let hs = serde_json::from_slice::(&bs).map_err(|e| StorageIOError::read_vote(&e))?; - - let vote = hs.upgrade(); - Ok(Some(vote)) - } - - #[tracing::instrument(level = "trace", skip(self, entries))] - async fn append_to_log(&mut self, entries: I) -> StorageResult<()> - where I: IntoIterator> + OptionalSend { - for entry in entries { - let id = id_to_bin(entry.log_id.index); - assert_eq!(bin_to_id(&id), entry.log_id.index); - self.db - .put_cf( - self.cf_logs(), - id, - serde_json::to_vec(&entry).map_err(|e| StorageIOError::write_logs(&e))?, - ) - .map_err(|e| StorageIOError::write_logs(&e))?; - } - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn delete_conflict_logs_since(&mut self, log_id: LogId) -> StorageResult<()> { - tracing::debug!("delete_log: [{:?}, +oo)", log_id); - - let from = id_to_bin(log_id.index); - let to = id_to_bin(0xff_ff_ff_ff_ff_ff_ff_ff); - self.db - .delete_range_cf(self.cf_logs(), &from, &to) - .map_err(|e| StorageIOError::write_logs(&e).into()) - } - - #[tracing::instrument(level = "debug", skip(self))] - async fn purge_logs_upto(&mut self, log_id: LogId) -> Result<(), StorageError> { - tracing::debug!("delete_log: [0, {:?}]", log_id); - - self.put_meta::(&log_id)?; - - let from = id_to_bin(0); - let to = id_to_bin(log_id.index + 1); - self.db - .delete_range_cf(self.cf_logs(), &from, &to) - .map_err(|e| StorageIOError::write_logs(&e).into()) - } - - async fn last_applied_state( - &mut self, - ) -> Result<(Option>, StoredMembership), StorageError> { - let state_machine = self.state_machine.read().await; - Ok(( - state_machine.get_last_applied_log()?, - state_machine.get_last_membership()?, - )) - } - - #[tracing::instrument(level = "trace", skip(self, entries))] - async fn apply_to_state_machine( - &mut self, - entries: &[Entry], - ) -> Result, StorageError> { - let mut res = Vec::with_capacity(entries.len()); - - let sm = self.state_machine.write().await; - - for entry in entries { - tracing::debug!(%entry.log_id, "replicate to sm"); - - sm.set_last_applied_log(entry.log_id)?; - - match entry.payload { - EntryPayload::Blank => res.push(RocksResponse { value: None }), - EntryPayload::Normal(ref req) => match req { - RocksRequest::Set { key, value } => { - sm.insert(key.clone(), value.clone())?; - res.push(RocksResponse { - value: Some(value.clone()), - }) - } - }, - EntryPayload::Membership(ref mem) => { - sm.set_last_membership(StoredMembership::new(Some(entry.log_id), mem.clone()))?; - res.push(RocksResponse { value: None }) - } - }; - } - self.db.flush_wal(true).map_err(|e| StorageIOError::write_logs(&e))?; - Ok(res) - } - - #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot( - &mut self, - ) -> Result::SnapshotData>, StorageError> { - Ok(Box::new(Cursor::new(Vec::new()))) - } - - #[tracing::instrument(level = "trace", skip(self, snapshot))] - async fn install_snapshot( - &mut self, - meta: &SnapshotMeta, - snapshot: Box<::SnapshotData>, - ) -> Result<(), StorageError> { - tracing::info!( - { snapshot_size = snapshot.get_ref().len() }, - "decoding snapshot for installation" - ); - - let new_snapshot = RocksSnapshot { - meta: meta.clone(), - data: snapshot.into_inner(), - }; - - // Update the state machine. - { - let updated_state_machine: SerializableRocksStateMachine = serde_json::from_slice(&new_snapshot.data) - .map_err(|e| { - StorageIOError::new( - ErrorSubject::Snapshot(Some(new_snapshot.meta.signature())), - ErrorVerb::Read, - AnyError::new(&e), - ) - })?; - let mut state_machine = self.state_machine.write().await; - *state_machine = RocksStateMachine::from_serializable(updated_state_machine, self.db.clone())?; - } - - self.put_meta::(&new_snapshot)?; - - Ok(()) - } - - #[tracing::instrument(level = "trace", skip(self))] - async fn get_current_snapshot(&mut self) -> Result>, StorageError> { - let curr_snap = self.get_meta_vec::()?; - let bs = if let Some(x) = curr_snap { - x - } else { - return Ok(None); - }; - - println!("bs: {:?}", String::from_utf8(bs.clone()).unwrap()); - - let curr_snap = serde_json::from_slice::(&bs).map_err(|e| StorageIOError::read(&e))?; - - let d = curr_snap.data; - let meta = if let Ok(meta) = curr_snap.meta.try_upgrade() { - meta - } else { - // SnapshotMeta can not be upgrade. - // It does not have `last_membership` field and can not be installed by a follower. - return Ok(None); - }; - - Ok(Some(Snapshot { - meta, - snapshot: Box::new(Cursor::new(d)), - })) - } - - async fn get_log_reader(&mut self) -> Self::LogReader { - self.clone() - } - - async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { - self.clone() - } -} - -impl RocksStore { - pub async fn new>(db_path: P) -> Arc { - let mut db_opts = Options::default(); - db_opts.create_missing_column_families(true); - db_opts.create_if_missing(true); - - let meta = ColumnFamilyDescriptor::new("meta", Options::default()); - let sm_meta = ColumnFamilyDescriptor::new("sm_meta", Options::default()); - let sm_data = ColumnFamilyDescriptor::new("sm_data", Options::default()); - let logs = ColumnFamilyDescriptor::new("logs", Options::default()); - - let db = DB::open_cf_descriptors(&db_opts, db_path, vec![meta, sm_meta, sm_data, logs]).unwrap(); - - let db = Arc::new(db); - let state_machine = RwLock::new(RocksStateMachine::new(db.clone())); - Arc::new(RocksStore { db, state_machine }) - } -} - -fn read_logs_err(e: impl Error + 'static) -> StorageError { - StorageError::IO { - source: StorageIOError::read_logs(&e), - } -} diff --git a/rocksstore-compat07/src/test.rs b/rocksstore-compat07/src/test.rs deleted file mode 100644 index 6e8e895b1..000000000 --- a/rocksstore-compat07/src/test.rs +++ /dev/null @@ -1,31 +0,0 @@ -use std::sync::Arc; - -use openraft::storage::Adaptor; -use openraft::testing::StoreBuilder; -use openraft::testing::Suite; -use openraft::StorageError; -use tempfile::TempDir; - -use crate::RocksNodeId; -use crate::RocksStore; -use crate::TypeConfig; - -type LogStore = Adaptor>; -type StateMachine = Adaptor>; - -struct RocksBuilder {} - -impl StoreBuilder for RocksBuilder { - async fn build(&self) -> Result<(TempDir, LogStore, StateMachine), StorageError> { - let td = TempDir::new().expect("couldn't create temp dir"); - let store = RocksStore::new(td.path()).await; - let (log_store, sm) = Adaptor::new(store); - Ok((td, log_store, sm)) - } -} - -#[test] -pub fn test_rocksstore() -> Result<(), StorageError> { - Suite::test_all(RocksBuilder {})?; - Ok(()) -}