Skip to content

Commit

Permalink
feat: use data container wrapper for mutable metadata (#559)
Browse files Browse the repository at this point in the history
* feat: use data container wrapper for mutable metadata

Existing MID events have a header on data payloads. This data is mutable
metadata. We needed a mechanism to represent this kind of data within
the conclusion events and doc states. We added a simple wrapper type
with metadata and data fields.

* fix: update tests after rebase

* fix: treat metadata as transitive

Instead of expecting each data event header to repeat the metadata we
carry forward the metadata from the previous state and only if its
present do we apply a change to the metadata. Tests have been updated to
reflect this new behavior.

* update test

* refactor: small code reorg for clarity
  • Loading branch information
nathanielc authored Oct 11, 2024
1 parent 052196f commit aad49de
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 118 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 46 additions & 11 deletions event-svc/src/event/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::HashMap,
collections::{BTreeMap, HashMap},
sync::{Arc, Mutex, MutexGuard},
};

Expand Down Expand Up @@ -349,7 +349,7 @@ impl EventService {
} = event;
let stream_cid = event.stream_cid();
let init_event = self.get_event_by_cid(stream_cid).await?;
let init = ConclusionInit::try_from(init_event).map_err(|e| {
let init = ConclusionInit::try_from(&init_event).map_err(|e| {
Error::new_app(anyhow::anyhow!(
"Malformed event found in the database: {}",
e
Expand All @@ -372,7 +372,12 @@ impl EventService {
event_cid,
init,
previous: vec![*data.prev()],
data: data.data().to_json_bytes().map_err(|e| {
data: MIDDataContainer::new_with_should_index(
data.header().and_then(|header| header.should_index()),
Some(data.data()),
)
.to_json_bytes()
.map_err(|e| {
Error::new_app(anyhow::anyhow!(
"Failed to serialize IPLD data: {}",
e
Expand All @@ -386,7 +391,12 @@ impl EventService {
event_cid,
init,
previous: vec![],
data: init_event.data().to_json_bytes().map_err(|e| {
data: MIDDataContainer::new_with_should_index(
Some(init_event.header().should_index()),
init_event.data(),
)
.to_json_bytes()
.map_err(|e| {
Error::new_app(anyhow::anyhow!(
"Failed to serialize IPLD data: {}",
e
Expand All @@ -402,13 +412,14 @@ impl EventService {
event_cid,
init,
previous: vec![],
data: unsigned_event
.payload()
.data()
.to_json_bytes()
.map_err(|e| {
Error::new_app(anyhow::anyhow!("Failed to serialize IPLD data: {}", e))
})?,
data: MIDDataContainer::new_with_should_index(
Some(unsigned_event.payload().header().should_index()),
unsigned_event.payload().data(),
)
.to_json_bytes()
.map_err(|e| {
Error::new_app(anyhow::anyhow!("Failed to serialize IPLD data: {}", e))
})?,
index: delivered as u64,
}))
}
Expand Down Expand Up @@ -471,6 +482,30 @@ impl EventService {
}
}

// Small wrapper container around the data field to hold other mutable metadata for the
// event.
// This is Model Instance Document specific. When we have other generic types of ceramic events
// we will need to determine if/how to generalize this container.
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
struct MIDDataContainer<'a> {
metadata: BTreeMap<String, Ipld>,
data: Option<&'a Ipld>,
}

impl<'a> MIDDataContainer<'a> {
fn new_with_should_index(should_index: Option<bool>, data: Option<&'a Ipld>) -> Self {
Self {
metadata: should_index
.map(|should_index| {
BTreeMap::from([("shouldIndex".to_string(), should_index.into())])
})
.unwrap_or_default(),
data,
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationError {
InvalidFormat {
Expand Down
52 changes: 26 additions & 26 deletions event-svc/src/tests/event.rs

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion event-svc/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,13 @@ async fn data_event(
prev: Cid,
data: Ipld,
signer: &signed::JwkSigner,
should_index: Option<bool>,
) -> signed::Event<Ipld> {
let commit = unvalidated::Builder::data()
.with_id(init_id)
.with_prev(prev)
.with_data(data)
.with_should_index(should_index)
.build();

signed::Event::from_payload(unvalidated::Payload::Data(commit), signer.to_owned()).unwrap()
Expand Down Expand Up @@ -220,7 +222,7 @@ async fn get_init_plus_n_events_with_model(
"raw": data.as_slice(),
});

let data = data_event(init_cid, prev, data, &signer).await;
let data = data_event(init_cid, prev, data, &signer, None).await;
let (data_id, data_car) = (
build_event_id(data.envelope_cid(), &init_cid, model),
data.encode_car().unwrap(),
Expand Down Expand Up @@ -288,6 +290,7 @@ pub(crate) async fn generate_signed_stream_data() -> Vec<(EventId, unvalidated::
"stream_1" : "data_1"
}),
&signer,
None,
)
.await;

Expand All @@ -298,6 +301,7 @@ pub(crate) async fn generate_signed_stream_data() -> Vec<(EventId, unvalidated::
"stream_1" : "data_2"
}),
&signer,
None,
)
.await;

Expand Down Expand Up @@ -332,6 +336,7 @@ pub(crate) async fn generate_unsigned_stream_data_anchored(
"stream2" : "data_1"
}),
&signer,
Some(false),
)
.await;
let time = time_event(
Expand Down
4 changes: 2 additions & 2 deletions event/src/unvalidated/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ impl DataBuilder<DataBuilderWithPrev> {

impl<D> DataBuilder<DataBuilderWithData<D>> {
/// Specify should_index.
pub fn with_should_index(mut self, should_index: bool) -> Self {
self.state.should_index = Some(should_index);
pub fn with_should_index(mut self, should_index: Option<bool>) -> Self {
self.state.should_index = should_index;
self
}

Expand Down
4 changes: 2 additions & 2 deletions event/src/unvalidated/payload/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl Header {
}

/// Signal to indexers whether this stream should be indexed
pub fn should_index(&self) -> bool {
self.should_index.unwrap_or(true)
pub fn should_index(&self) -> Option<bool> {
self.should_index
}
}
4 changes: 2 additions & 2 deletions flight/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ pub struct ConclusionTime {
//TODO Add temporal conclusions, i.e the block timestamp of this event
}

impl TryFrom<unvalidated::Event<Ipld>> for ConclusionInit {
impl<'a> TryFrom<&'a unvalidated::Event<Ipld>> for ConclusionInit {
type Error = anyhow::Error;

fn try_from(event: unvalidated::Event<Ipld>) -> Result<Self> {
fn try_from(event: &'a unvalidated::Event<Ipld>) -> Result<Self> {
// Extract the init payload from the event
let init_payload = event
.init_payload()
Expand Down
1 change: 1 addition & 0 deletions olap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ names.workspace = true
object_store = { version = "0.11", features = ["aws"] }
prometheus-client.workspace = true
serde_json.workspace = true
serde.workspace = true
signal-hook = "0.3.17"
signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
tokio = { workspace = true, features = ["fs", "rt-multi-thread"] }
Expand Down
38 changes: 29 additions & 9 deletions olap/src/aggregator/ceramic_patch.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};

use arrow::{
array::{Array as _, ArrayBuilder as _, ArrayRef, StringBuilder},
Expand All @@ -13,6 +13,7 @@ use datafusion::{
PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF, WindowUDFImpl,
},
};
use json_patch::PatchOperation;

/// Applies a Ceramic data event to a document state returning the new document state.
#[derive(Debug)]
Expand Down Expand Up @@ -60,19 +61,40 @@ impl WindowUDFImpl for CeramicPatch {
Ok(Box::new(CeramicPatchEvaluator))
}
}
// Small wrapper container around the data/state fields to hold
// other mutable metadata for the event.
// This is specific to Model Instance Documents.
// Metadata is considered to be mutable from event to event and a overriting merge is performed
// with the previous metadata to the current metadata.
// This means if a metadata key is missing it is propogated forward until a new data event changes
// its value.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct MIDDataContainer<D> {
metadata: BTreeMap<String, serde_json::Value>,
data: D,
}

type MIDDataContainerPatch = MIDDataContainer<Vec<PatchOperation>>;
type MIDDataContainerState = MIDDataContainer<serde_json::Value>;

#[derive(Debug)]
struct CeramicPatchEvaluator;

impl CeramicPatchEvaluator {
fn apply_patch(patch: &str, previous_state: &str) -> Result<String> {
let patch: Vec<json_patch::PatchOperation> = serde_json::from_str(patch)
let patch: MIDDataContainerPatch = serde_json::from_str(patch)
.map_err(|err| exec_datafusion_err!("Error parsing patch: {err}"))?;
let mut new_state: serde_json::Value = serde_json::from_str(previous_state)
let mut state: MIDDataContainerState = serde_json::from_str(previous_state)
.map_err(|err| exec_datafusion_err!("Error parsing previous state: {err}"))?;
json_patch::patch(&mut new_state, &patch)
// If the state is null use an empty object in order to apply the patch to a valid object.
if serde_json::Value::Null == state.data {
state.data = serde_json::Value::Object(serde_json::Map::default());
}
state.metadata.extend(patch.metadata);
json_patch::patch(&mut state.data, &patch.data)
.map_err(|err| exec_datafusion_err!("Error applying JSON patch: {err}"))?;
serde_json::to_string(&new_state)
serde_json::to_string(&state)
.map_err(|err| exec_datafusion_err!("Error JSON encoding: {err}"))
}
}
Expand Down Expand Up @@ -143,10 +165,8 @@ impl PartitionEvaluator for CeramicPatchEvaluator {
} else {
//Init event, patch value is the initial state
if patches.is_null(i) {
// If we have an init event without data use an empty object as the initial
// state. This feels like a leaky abstraction is this expected based on the
// Ceramic spec?
new_states.append_value("{}");
// We have an init event without data
new_states.append_null();
} else {
new_states.append_value(patches.value(i));
}
Expand Down
Loading

0 comments on commit aad49de

Please sign in to comment.