diff --git a/Cargo.lock b/Cargo.lock index 1f814ea605a2e..869b9e44dd4f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -254,6 +254,15 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "autotools" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ec8e3c65baca58d892ae26d20bbe34713d20d0b02fd0ced4254f256aebbbb80" +dependencies = [ + "cc", +] + [[package]] name = "avro-derive" version = "0.0.0" @@ -622,7 +631,8 @@ dependencies = [ "mz-protoc", "ore", "postgres-types", - "protobuf", + "prost", + "prost-types", "rand", "rand_distr", "test-util", @@ -839,6 +849,16 @@ dependencies = [ "cc", ] +[[package]] +name = "codespan-reporting" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e" +dependencies = [ + "termcolor", + "unicode-width", +] + [[package]] name = "compile-time-run" version = "0.2.12" @@ -1145,6 +1165,50 @@ dependencies = [ "syn", ] +[[package]] +name = "cxx" +version = "1.0.62" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5beeee74e023d146e6fb933296b3b5e07de146ff3c9498c5af6e53cabc986d51" +dependencies = [ + "cc", + "cxxbridge-flags", + "cxxbridge-macro", + "link-cplusplus", +] + +[[package]] +name = "cxx-build" +version = "1.0.62" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b49af8e551e84f85d6def97c42b8d93bc5bb0817cce96b56a472b1b19b5bfc2" +dependencies = [ + "cc", + "codespan-reporting", + "lazy_static", + "proc-macro2", + "quote", + "scratch", + "syn", +] + +[[package]] +name = "cxxbridge-flags" +version = "1.0.62" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d2033701dc9a0cd15c14bb24d4423b6ba57f6eba079b65b7049b95288dad6b3" + +[[package]] +name = "cxxbridge-macro" +version = "1.0.62" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94d7ddcc58b10f23413d3fef111d1bd64b2bfb76af53f31611e02e9d0b6537be" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "darling" version = "0.12.4" @@ -1202,6 +1266,7 @@ dependencies = [ "aws-sdk-sqs", "bincode", "byteorder", + "bytes", "ccsr", "chrono", "crossbeam-channel", @@ -1231,7 +1296,7 @@ dependencies = [ "postgres-protocol", "postgres-util", "prometheus", - "protobuf", + "prost", "pubnub-hyper", "rand", "rdkafka", @@ -1269,6 +1334,7 @@ dependencies = [ "aws-config", "aws-smithy-http", "aws-types", + "bytes", "ccsr", "crossbeam-channel", "enum-iterator", @@ -1388,6 +1454,12 @@ dependencies = [ "syn", ] +[[package]] +name = "diff" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e25ea47919b1560c4e3b7fe0aaab9becf5b84a10325ddf7db0f0ba5e1026499" + [[package]] name = "differential-dataflow" version = "0.12.0" @@ -1746,6 +1818,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "fixedbitset" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e" + [[package]] name = "flatbuffers" version = "2.0.0" @@ -2362,7 +2440,9 @@ dependencies = [ "num-traits", "ordered-float", "ore", + "prost", "protobuf", + "protobuf-native", "regex", "repr", "serde", @@ -2513,6 +2593,15 @@ dependencies = [ "vcpkg", ] +[[package]] 
+name = "link-cplusplus" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8cae2cd7ba2f3f63938b9c724475dfb7b9861b545a90324476324ed21dbc8c8" +dependencies = [ + "cc", +] + [[package]] name = "linked-hash-map" version = "0.5.4" @@ -2810,6 +2899,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "mz-avro" version = "0.6.5" @@ -2883,6 +2978,7 @@ dependencies = [ "anyhow", "clap", "ore", + "prost-build", "protobuf", "protobuf-codegen", "protobuf-parse", @@ -3230,6 +3326,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "output_vt100" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53cdc5b785b7a58c5aad8216b3dfa114df64b0b06ae6e1501cef91df2fbdf8f9" +dependencies = [ + "winapi", +] + [[package]] name = "parking_lot" version = "0.11.1" @@ -3340,6 +3445,7 @@ dependencies = [ "base64", "bincode", "build-info", + "bytes", "criterion", "crossbeam-channel", "differential-dataflow", @@ -3353,7 +3459,7 @@ dependencies = [ "ore", "parquet2", "persist-types", - "protobuf", + "prost", "rand", "semver", "serde", @@ -3368,6 +3474,19 @@ dependencies = [ [[package]] name = "persist-types" version = "0.0.0" +dependencies = [ + "bytes", +] + +[[package]] +name = "petgraph" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a13a2fa9d0b63e5f22328828741e523766fff0ee9e779316902290dff3f824f" +dependencies = [ + "fixedbitset", + "indexmap", +] [[package]] name = "pgrepr" @@ -3686,6 +3805,18 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be91bcc43e73799dc46a6c194a55e7aae1d86cc867c860fd4a436019af21bd8c" +[[package]] +name = "pretty_assertions" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0cfe1b2403f172ba0f234e500906ee0a3e493fb81092dac23ebefe129301cc" +dependencies = [ + "ansi_term", + "ctor", + "diff", + "output_vt100", +] + [[package]] name = "proc-macro-crate" version = "1.0.0" @@ -3792,6 +3923,56 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "prost" +version = "0.9.0" +source = "git+https://github.com/MaterializeInc/prost.git#28bc721d6a74b9ed4628cc8f13527a9573cf082d" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.9.1" +source = "git+https://github.com/MaterializeInc/prost.git#28bc721d6a74b9ed4628cc8f13527a9573cf082d" +dependencies = [ + "bytes", + "heck", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prost", + "prost-types", + "protobuf-src", + "regex", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.9.0" +source = "git+https://github.com/MaterializeInc/prost.git#28bc721d6a74b9ed4628cc8f13527a9573cf082d" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.9.0" +source = "git+https://github.com/MaterializeInc/prost.git#28bc721d6a74b9ed4628cc8f13527a9573cf082d" +dependencies = [ + "bytes", + "prost", +] + [[package]] name = "protobuf" version = "3.0.0-alpha.2" @@ -3810,6 +3991,20 @@ dependencies = [ "thiserror", ] +[[package]] +name = "protobuf-native" +version = "0.2.1+3.19.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "86df76d0f2a6933036e8a9f28f1adc8b48081fa681dba07eaa30ac75663f7f4e" +dependencies = [ + "cxx", + "cxx-build", + "paste", + "pretty_assertions", + "protobuf-src", + "tempfile", +] + [[package]] name = "protobuf-parse" version = "3.0.0-alpha.2" @@ -3822,6 +4017,15 @@ dependencies = [ "thiserror", ] +[[package]] +name = "protobuf-src" +version = "1.0.4+3.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "152161ab96ab8f66c4faeda5179798cc03a6c55c653d314d0e6d2f3ae7a46f1f" +dependencies = [ + "autotools", +] + [[package]] name = "protoc" version = "3.0.0-alpha.2" @@ -4072,6 +4276,7 @@ version = "0.0.0" dependencies = [ "anyhow", "byteorder", + "bytes", "chrono", "chrono-tz", "criterion", @@ -4089,7 +4294,7 @@ dependencies = [ "ore", "persist-types", "proptest", - "protobuf", + "prost", "rand", "regex", "ryu", @@ -4264,6 +4469,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "scratch" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96311ef4a16462c757bb6a39152c40f58f31cd2602a40fceb937e2bc34e6cbab" + [[package]] name = "security-framework" version = "2.0.0" diff --git a/Cargo.toml b/Cargo.toml index acc769ed6e3f5..b35e419f745e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,3 +82,6 @@ debug = 1 headers = { git = "https://github.com/MaterializeInc/headers.git" } # Until https://github.com/jorgecarleitao/parquet-format-rs/pull/2 is merged and released parquet-format-async-temp = { git = "https://github.com/MaterializeInc/parquet-format-rs", branch = "main" } +# Waiting on https://github.com/tokio-rs/prost/pull/576. +prost = { git = "https://github.com/MaterializeInc/prost.git" } +prost-types = { git = "https://github.com/MaterializeInc/prost.git" } diff --git a/ci/builder/Dockerfile b/ci/builder/Dockerfile index 3645178ceaeff..d4bf46a4d28b6 100644 --- a/ci/builder/Dockerfile +++ b/ci/builder/Dockerfile @@ -204,7 +204,7 @@ ENV RANLIB=$ARCH_GCC-unknown-linux-gnu-ranlib ENV CPP=$ARCH_GCC-unknown-linux-gnu-cpp ENV CC=$ARCH_GCC-unknown-linux-gnu-cc ENV CXX=$ARCH_GCC-unknown-linux-gnu-c++ -ENV LDFLAGS=-fuse-ld=lld +ENV LDFLAGS="-fuse-ld=lld -static-libstdc++" ENV TARGET_CC=$CC ENV TARGET_CXX=$CXX ENV PATH=/opt/x-tools/$ARCH_GCC-unknown-linux-gnu/bin:$PATH diff --git a/ci/test/cargo-test/image/.gitignore b/ci/test/cargo-test/image/.gitignore index 28a4a56196271..88130d85cc0d5 100644 --- a/ci/test/cargo-test/image/.gitignore +++ b/ci/test/cargo-test/image/.gitignore @@ -1,4 +1,7 @@ +/materialized +/protobuf-install +/protoc /shlib -/tests /test-binaries.json /testdrive +/tests diff --git a/ci/test/cargo-test/image/Dockerfile b/ci/test/cargo-test/image/Dockerfile index 321a391a1ab53..2e3eea70bf0f6 100644 --- a/ci/test/cargo-test/image/Dockerfile +++ b/ci/test/cargo-test/image/Dockerfile @@ -17,4 +17,8 @@ COPY tests /tests/ COPY run-tests /usr/local/bin COPY shlib /usr/local/share/shlib/ +# Install the Protobuf compiler from protobuf-src for use by mz_protoc's tests. 
+COPY protobuf-install /usr/local/ +ENV PROTOC /usr/local/bin/protoc + WORKDIR /workdir diff --git a/demo/billing/Cargo.toml b/demo/billing/Cargo.toml index d5469435fb432..ff306090fdf56 100644 --- a/demo/billing/Cargo.toml +++ b/demo/billing/Cargo.toml @@ -18,7 +18,8 @@ hex = "0.4.3" tracing = "0.1.29" ore = { path = "../../src/ore" } postgres-types = { git = "https://github.com/MaterializeInc/rust-postgres", branch = "mz-0.7.2" } -protobuf = { git = "https://github.com/MaterializeInc/rust-protobuf.git" } +prost = "0.9.0" +prost-types = "0.9.0" rand = "0.8.4" rand_distr = "0.4.2" test-util = { path = "../../test/test-util" } diff --git a/demo/billing/src/main.rs b/demo/billing/src/main.rs index 5b834b633e84d..ed04a6f8f84fb 100644 --- a/demo/billing/src/main.rs +++ b/demo/billing/src/main.rs @@ -22,7 +22,7 @@ use std::process; use std::sync::Arc; use anyhow::Result; -use protobuf::Message; +use prost::Message; use tokio::time::{self, Duration}; use test_util::kafka::kafka_client; @@ -114,7 +114,7 @@ async fn create_kafka_messages(config: KafkaConfig) -> Result<()> { let messages_to_send = std::cmp::min(config.messages_per_second, messages_remaining); for _ in 0..messages_to_send { let m = randomizer::random_batch(rng, &mut recordstate); - m.write_to_vec(&mut buf)?; + m.encode(&mut buf)?; tracing::trace!("sending: {:?}", m); let res = k_client.send(&config.topic, &buf); match res { diff --git a/demo/billing/src/randomizer.rs b/demo/billing/src/randomizer.rs index 7e827a265bd94..bebb8c945bc6f 100644 --- a/demo/billing/src/randomizer.rs +++ b/demo/billing/src/randomizer.rs @@ -9,8 +9,7 @@ use chrono::prelude::*; use chrono::DateTime; -use protobuf::well_known_types::Timestamp; -use protobuf::MessageField; +use prost_types::Timestamp; use rand::distributions::Distribution; use rand::seq::SliceRandom; use rand::Rng; @@ -30,10 +29,10 @@ pub struct RecordState { } fn protobuf_timestamp(time: DateTime<Utc>) -> Timestamp { - let mut ret = Timestamp::new(); - ret.seconds = time.timestamp(); - ret.nanos = time.timestamp_subsec_nanos() as i32; - ret + Timestamp { + seconds: time.timestamp(), + nanos: time.timestamp_subsec_nanos() as i32, + } } /// Construct a Batch that depends on `state` @@ -56,13 +55,12 @@ pub fn random_batch(rng: &mut impl Rng, state: &mut RecordState) -> Batch { records.push(random_record(rng, interval_start_time, dur_val)); } - let mut batch = Batch::new(); - batch.id = id.to_string(); - batch.interval_start = MessageField::some(interval_start); - batch.interval_end = MessageField::some(interval_end); - batch.records = records; - - batch + Batch { + id: id.to_string(), + interval_start: Some(interval_start), + interval_end: Some(interval_end), + records, + } } fn random_record(rng: &mut impl Rng, start_at: DateTime<Utc>, max_secs: i64) -> Record { @@ -88,15 +86,14 @@ } } - let mut record = Record::new(); - record.id = Uuid::new_v4().to_string(); - record.interval_start = MessageField::some(protobuf_timestamp(interval_start)); - record.interval_end = MessageField::some(protobuf_timestamp(interval_end)); - record.meter = meter; - record.value = val as i32; - record.info = MessageField::some(ResourceInfo::random(rng)); - - record + Record { + id: Uuid::new_v4().to_string(), + interval_start: Some(protobuf_timestamp(interval_start)), + interval_end: Some(protobuf_timestamp(interval_end)), + meter, + value: val as i32, + info: Some(ResourceInfo::random(rng)), + } } impl Randomizer for ResourceInfo { @@ -104,14 
+101,12 @@ impl Randomizer for ResourceInfo { static POSSIBLE_CPUS: &[i32] = &[1, 2]; static POSSIBLE_MEM: &[i32] = &[8, 16]; static POSSIBLE_DISK: &[i32] = &[128]; - - let mut resource_info = ResourceInfo::new(); - resource_info.cpu_num = *POSSIBLE_CPUS.choose(rng).unwrap(); - resource_info.memory_gb = *POSSIBLE_MEM.choose(rng).unwrap(); - resource_info.disk_gb = *POSSIBLE_DISK.choose(rng).unwrap(); - resource_info.client_id = rng.gen_range(1..NUM_CLIENTS as i32); - resource_info.vm_id = rng.gen_range(1000..2000); - - resource_info + ResourceInfo { + cpu_num: *POSSIBLE_CPUS.choose(rng).unwrap(), + memory_gb: *POSSIBLE_MEM.choose(rng).unwrap(), + disk_gb: *POSSIBLE_DISK.choose(rng).unwrap(), + client_id: rng.gen_range(1..NUM_CLIENTS as i32), + vm_id: rng.gen_range(1000..2000), + } } } diff --git a/deny.toml b/deny.toml index 505fe1d56c4ce..642cd371e56b0 100644 --- a/deny.toml +++ b/deny.toml @@ -118,6 +118,9 @@ allow-git = [ # Waiting on https://github.com/stepancheg/rust-protobuf/pull/581. "https://github.com/MaterializeInc/rust-protobuf.git", + # Waiting on https://github.com/tokio-rs/prost/pull/576. + "https://github.com/MaterializeInc/prost.git", + # Waiting on https://github.com/bheisler/criterion.rs/pull/543. "https://github.com/MaterializeInc/criterion.rs.git", diff --git a/doc/developer/mzbuild.md b/doc/developer/mzbuild.md index 80dbd39ed81dc..7d0cbb443087f 100644 --- a/doc/developer/mzbuild.md +++ b/doc/developer/mzbuild.md @@ -399,13 +399,12 @@ publish: true In rare cases, it may be necessary to extract files from the build directory of a dependency. The `extract` key specifies a mapping from a - dependent package to a list of files to copy into the build context. Paths - should be relative and are interpreted relative to that crate's build - directory. Each extracted file will be placed in the root of the build - context with the same name as the original file. Copying directories is not - supported. Note that `extract` is only relevant if the dependency has a - custom Cargo build script, as Rust crates without a build script do not - have a build directory. + dependent package to a mapping from source files and directories to + destination directories. Source paths are interpreted relative to that + crate's build directory while destination paths are interpreted relative to + the build context. Note that `extract` is only relevant if the dependency + has a custom Cargo build script, as Rust crates without a build script do + not have a build directory. * `type: cargo-test` builds a special image that simulates `cargo test`. 
This plugin is very special-cased at the moment, and unlikely to be generally diff --git a/misc/python/materialize/mzbuild.py b/misc/python/materialize/mzbuild.py index ae94ecc4ee025..afb278d1e13a9 100644 --- a/misc/python/materialize/mzbuild.py +++ b/misc/python/materialize/mzbuild.py @@ -284,8 +284,10 @@ def build(self) -> None: if message["reason"] != "build-script-executed": continue package = message["package_id"].split()[0] - for d in self.extract.get(package, []): - shutil.copy(Path(message["out_dir"]) / d, self.path / Path(d).name) + for src, dst in self.extract.get(package, {}).items(): + spawn.runv( + ["cp", "-R", Path(message["out_dir"]) / src, self.path / dst] + ) def run(self) -> None: super().run() @@ -312,6 +314,15 @@ def __init__(self, rd: RepositoryDetails, path: Path, config: Dict[str, Any]): def run(self) -> None: super().run() + CargoBuild( + self.rd, + self.path, + { + "bin": "protoc", + "strip": True, + "extract": {"protobuf-src": {"install": "protobuf-install"}}, + }, + ).build() CargoBuild(self.rd, self.path, {"bin": "testdrive", "strip": True}).build() CargoBuild(self.rd, self.path, {"bin": "materialized", "strip": True}).build() @@ -323,8 +334,10 @@ def run(self) -> None: # error messages would also be sent to the output file in JSON, and the # user would only see a vague "could not compile " error. args = [*self.rd.cargo("test", rustflags=[]), "--locked", "--no-run"] - spawn.runv(args) - output = spawn.capture(args + ["--message-format=json"], unicode=True) + spawn.runv(args, cwd=self.rd.root) + output = spawn.capture( + args + ["--message-format=json"], unicode=True, cwd=self.rd.root + ) tests = [] for line in output.split("\n"): @@ -334,8 +347,6 @@ def run(self) -> None: if message.get("profile", {}).get("test", False): crate_name = message["package_id"].split()[0] target_kind = "".join(message["target"]["kind"]) - # TODO - ask Nikhil if this is long-term correct, - # but it unblocks us for now. if target_kind == "proc-macro": continue slug = crate_name + "." + target_kind @@ -355,7 +366,10 @@ def run(self) -> None: with open(self.path / "tests" / "manifest", "w") as manifest: for (executable, slug, crate_path) in tests: shutil.copy(executable, self.path / "tests" / slug) - spawn.runv([*self.rd.tool("strip"), self.path / "tests" / slug]) + spawn.runv( + [*self.rd.tool("strip"), self.path / "tests" / slug], + cwd=self.rd.root, + ) manifest.write(f"{slug} {crate_path}\n") shutil.move(str(self.path / "materialized"), self.path / "tests") shutil.move(str(self.path / "testdrive"), self.path / "tests") diff --git a/misc/python/materialize/xcompile.py b/misc/python/materialize/xcompile.py index b2556b2d0c1ec..703a77f180732 100644 --- a/misc/python/materialize/xcompile.py +++ b/misc/python/materialize/xcompile.py @@ -79,8 +79,14 @@ def cargo(arch: Arch, subcommand: str, rustflags: List[str]) -> List[str]: _target = target(arch) _target_env = _target.upper().replace("-", "_") - ldflags: List[str] = [] - rustflags += ["-Clink-arg=-Wl,--compress-debug-sections=zlib"] + # `-static-libstdc++` avoids introducing a runtime dependency on the + # extremely new version of libstdc++ that our cross-compiling toolchain + # uses. + ldflags: List[str] = ["-static-libstdc++"] + rustflags += [ + "-Clink-arg=-Wl,--compress-debug-sections=zlib", + "-Clink-arg=-static-libstdc++", + ] if not sys.platform == "darwin": # lld is not yet easy to install on macOS. 
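[Note on the migration pattern] The billing demo hunks above show the whole shape of the rust-protobuf to prost migration: generated messages become plain structs built with struct literals, `MessageField` submessages become ordinary `Option`s, and `write_to_vec`/`parse_from_bytes` become `encode`/`decode`. A minimal sketch of the new pattern; `Batch` here is a hypothetical hand-written illustration, not the prost-build output the demo actually compiles from billing.proto:

```rust
use prost::Message;

// Hypothetical stand-in for a prost-build-generated message.
#[derive(Clone, PartialEq, Message)]
struct Batch {
    #[prost(string, tag = "1")]
    id: String,
    // Optional submessages are plain Option<T> under prost, not MessageField.
    #[prost(message, optional, tag = "2")]
    interval_start: Option<prost_types::Timestamp>,
}

fn roundtrip() -> Result<Batch, prost::DecodeError> {
    // Construction is a plain struct literal instead of new() plus field writes.
    let batch = Batch {
        id: "batch-0".into(),
        interval_start: Some(prost_types::Timestamp { seconds: 0, nanos: 0 }),
    };
    // encode_to_vec replaces write_to_bytes; encode(&mut buf) replaces write_to_vec.
    let buf = batch.encode_to_vec();
    // decode replaces parse_from_bytes and accepts anything implementing bytes::Buf.
    Batch::decode(&buf[..])
}
```

The `m.encode(&mut buf)?` call in main.rs above is the same API: it appends to any `bytes::BufMut`, which is what motivates the persist `Codec` changes later in this diff.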
diff --git a/src/dataflow-types/Cargo.toml b/src/dataflow-types/Cargo.toml index a95a87d7118f5..9d782c291750d 100644 --- a/src/dataflow-types/Cargo.toml +++ b/src/dataflow-types/Cargo.toml @@ -11,6 +11,7 @@ async-trait = "0.1.52" aws-config = { version = "0.4.0", default-features = false, features = ["native-tls"] } aws-types = { version = "0.4.0", features = ["hardcoded-credentials"] } aws-smithy-http = "0.34.0" +bytes = "1.1.0" ccsr = { path = "../ccsr" } crossbeam-channel = "0.5.2" enum-iterator = "0.7.0" diff --git a/src/dataflow-types/src/errors.rs b/src/dataflow-types/src/errors.rs index 0d942ee5855f8..bcaaedc8c5c40 100644 --- a/src/dataflow-types/src/errors.rs +++ b/src/dataflow-types/src/errors.rs @@ -9,6 +9,7 @@ use std::fmt::Display; +use bytes::BufMut; use expr::EvalError; use persist_types::Codec; @@ -26,12 +27,14 @@ impl Codec for DecodeError { "DecodeError".into() } - fn encode<B: for<'a> Extend<&'a u8>>(&self, buf: &mut B) { - let encoded = match serde_json::to_vec(self) { + fn encode<B>(&self, buf: &mut B) + where + B: BufMut, + { + match serde_json::to_writer(buf.writer(), self) { Ok(ok) => ok, Err(e) => panic!("Encoding error, trying to encode {}: {}", self, e), }; - buf.extend(encoded.iter()); } fn decode<'a>(buf: &'a [u8]) -> Result<Self, String> { diff --git a/src/dataflow/Cargo.toml b/src/dataflow/Cargo.toml index a42356166bde7..2c3c81d1e1b4a 100644 --- a/src/dataflow/Cargo.toml +++ b/src/dataflow/Cargo.toml @@ -14,6 +14,7 @@ aws-sdk-s3 = { version = "0.4.0", default-features = false } aws-sdk-sqs = { version = "0.4.0", default-features = false } bincode = "1.3.3" byteorder = "1.4.3" +bytes = "1.1.0" ccsr = { path = "../ccsr" } chrono = { version = "0.4.0", default-features = false, features = ["std"] } crossbeam-channel = "0.5.2" @@ -43,7 +44,7 @@ persist-types = { path = "../persist-types" } postgres-protocol = { git = "https://github.com/MaterializeInc/rust-postgres", branch = "mz-0.7.2" } postgres-util = { path = "../postgres-util" } prometheus = { git = "https://github.com/MaterializeInc/rust-prometheus.git", default-features = false } -protobuf = { git = "https://github.com/MaterializeInc/rust-protobuf.git" } +prost = "0.9.0" pubnub-hyper = { git = "https://github.com/MaterializeInc/pubnub-rust", default-features = false } rand = "0.8.4" rdkafka = { git = "https://github.com/fede1024/rust-rdkafka.git", features = ["cmake-build", "ssl-vendored", "gssapi-vendored", "libz-static", "zstd"] } diff --git a/src/dataflow/src/source/source.proto b/src/dataflow/src/source/source.proto index 8170770dccc5e..744878d44dafc 100644 --- a/src/dataflow/src/source/source.proto +++ b/src/dataflow/src/source/source.proto @@ -13,7 +13,7 @@ syntax = "proto3"; import "google/protobuf/empty.proto"; -package gen; +package source; message ProtoSourceTimestamp { oneof partition_id { diff --git a/src/dataflow/src/source/timestamp.rs b/src/dataflow/src/source/timestamp.rs index 54cdbcf83ea6c..b4a261ebef852 100644 --- a/src/dataflow/src/source/timestamp.rs +++ b/src/dataflow/src/source/timestamp.rs @@ -27,8 +27,9 @@ use std::collections::HashMap; use std::rc::Rc; use std::time::Instant; -use persist_types::{Codec, ExtendWriteAdapter}; -use protobuf::Message; +use bytes::BufMut; +use persist_types::Codec; +use prost::Message; use timely::order::PartialOrder; use timely::progress::frontier::{Antichain, AntichainRef, MutableAntichain}; use timely::progress::{ChangeBatch, Timestamp as TimelyTimestamp}; @@ -711,14 +712,10 @@ impl From<&SourceTimestamp> for ProtoSourceTimestamp { fn from(x: &SourceTimestamp) -> Self { ProtoSourceTimestamp { partition_id: Some(match &x.partition { - PartitionId::Kafka(x) => proto_source_timestamp::Partition_id::kafka(*x), - PartitionId::None => proto_source_timestamp::Partition_id::none( - protobuf::well_known_types::Empty::default(), - ), + PartitionId::Kafka(x) => proto_source_timestamp::PartitionId::Kafka(*x), + PartitionId::None => proto_source_timestamp::PartitionId::None(()), }), mz_offset: x.offset.offset, - unknown_fields: Default::default(), - cached_size: Default::default(), } } } @@ -728,8 +725,8 @@ impl TryFrom<ProtoSourceTimestamp> for SourceTimestamp { fn try_from(x: ProtoSourceTimestamp) -> Result<Self, Self::Error> { let partition = match x.partition_id { - Some(proto_source_timestamp::Partition_id::kafka(x)) => PartitionId::Kafka(x), - Some(proto_source_timestamp::Partition_id::none(_)) => PartitionId::None, + Some(proto_source_timestamp::PartitionId::Kafka(x)) => PartitionId::Kafka(x), + Some(proto_source_timestamp::PartitionId::None(_)) => PartitionId::None, None => return Err("unknown partition_id".into()), }; Ok(SourceTimestamp { @@ -746,14 +743,14 @@ impl Codec for SourceTimestamp { "protobuf[SourceTimestamp]".into() } - fn encode<E: for<'a> Extend<&'a u8>>(&self, buf: &mut E) { + fn encode<B: BufMut>(&self, buf: &mut B) { ProtoSourceTimestamp::from(self) - .write_to_writer(&mut ExtendWriteAdapter(buf)) - .expect("infallible for ExtendWriteAdapter") + .encode(buf) + .expect("provided buffer had sufficient capacity") } fn decode<'a>(buf: &'a [u8]) -> Result<Self, String> { - ProtoSourceTimestamp::parse_from_bytes(buf) + ProtoSourceTimestamp::decode(buf) .map_err(|err| err.to_string())? .try_into() } @@ -761,11 +758,7 @@ impl From<&AssignedTimestamp> for ProtoAssignedTimestamp { fn from(x: &AssignedTimestamp) -> Self { - ProtoAssignedTimestamp { - ts: x.0, - unknown_fields: Default::default(), - cached_size: Default::default(), - } + ProtoAssignedTimestamp { ts: x.0 } } } @@ -782,14 +775,17 @@ impl Codec for AssignedTimestamp { "protobuf[AssignedTimestamp]".into() } - fn encode<E: for<'a> Extend<&'a u8>>(&self, buf: &mut E) { + fn encode<B>(&self, buf: &mut B) + where + B: BufMut, + { ProtoAssignedTimestamp::from(self) .encode(buf) .expect("provided buffer had sufficient capacity") } fn decode<'a>(buf: &'a [u8]) -> Result<Self, String> { - ProtoAssignedTimestamp::parse_from_bytes(buf) + ProtoAssignedTimestamp::decode(buf) .map_err(|err| err.to_string())? .try_into() } diff --git a/src/dataflowd/ci/mzbuild.yml b/src/dataflowd/ci/mzbuild.yml index 9c7bd971f78d3..5b42fcebab226 100644 --- a/src/dataflowd/ci/mzbuild.yml +++ b/src/dataflowd/ci/mzbuild.yml @@ -14,5 +14,5 @@ pre-image: strip: false extract: krb5-src: - - install/bin/kinit - - install/bin/kdestroy + install/bin/kinit: . + install/bin/kdestroy: . 
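[Note on the Codec rework] The `Codec` rework used by the timestamp.rs hunks above, and defined by the persist-types hunks below, swaps the old `for<'a> Extend<&'a u8>` bound for `bytes::BufMut`, so implementations write straight into a byte buffer. A minimal sketch against the reworked trait, with a hypothetical `Offset` newtype modeled on the `KafkaOffset` impl later in this diff (the `codec_name` method name is inferred from the `"protobuf[SourceTimestamp]".into()` context lines):

```rust
use bytes::BufMut;

// The trait as reworked in this diff (see persist-types/src/lib.rs below).
pub trait Codec: Sized + 'static {
    fn codec_name() -> String;
    fn encode<B>(&self, buf: &mut B)
    where
        B: BufMut;
    fn decode<'a>(buf: &'a [u8]) -> Result<Self, String>;
}

// Hypothetical newtype for illustration.
struct Offset(u64);

impl Codec for Offset {
    fn codec_name() -> String {
        "Offset".into()
    }

    fn encode<B: BufMut>(&self, buf: &mut B) {
        // put_slice replaces the old buf.extend(&self.0.to_le_bytes()).
        buf.put_slice(&self.0.to_le_bytes());
    }

    fn decode<'a>(buf: &'a [u8]) -> Result<Self, String> {
        let bytes = <[u8; 8]>::try_from(buf).map_err(|err| err.to_string())?;
        Ok(Offset(u64::from_le_bytes(bytes)))
    }
}
```

Encoding into a `Vec<u8>` still works unchanged, since `BufMut` is implemented for `Vec<u8>`; that is why call sites below only need trait-method disambiguation like `Codec::encode(&meta, &mut encoded)` rather than new buffer types.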
diff --git a/src/interchange/Cargo.toml b/src/interchange/Cargo.toml index b82311b6e865f..3e962c4ad8275 100644 --- a/src/interchange/Cargo.toml +++ b/src/interchange/Cargo.toml @@ -29,6 +29,8 @@ num-traits = "0.2.14" ordered-float = { version = "2.10.0", features = ["serde"] } ore = { path = "../ore" } protobuf = { git = "https://github.com/MaterializeInc/rust-protobuf.git" } +protobuf-native = "0.2.1" +prost = "0.9.0" mz-protoc = { path = "../protoc" } regex = "1.5.4" repr = { path = "../repr" } diff --git a/src/interchange/benches/protobuf.rs b/src/interchange/benches/protobuf.rs index 3dab016445c10..42757780b8f61 100644 --- a/src/interchange/benches/protobuf.rs +++ b/src/interchange/benches/protobuf.rs @@ -9,7 +9,7 @@ use criterion::{black_box, Criterion, Throughput}; use futures::executor::block_on; -use protobuf::{Message, MessageField}; +use prost::Message; use interchange::protobuf::{DecodedDescriptors, Decoder, NormalizedProtobufMessageName}; @@ -20,49 +20,50 @@ mod gen { } pub fn bench_protobuf(c: &mut Criterion) { - let mut value = Value::new(); - value.l_orderkey = 1; - value.l_orderkey = 155_190; - value.l_suppkey = 7706; - value.l_linenumber = 1; - value.l_quantity = 17.0; - value.l_extendedprice = 21168.23; - value.l_discount = 0.04; - value.l_tax = 0.02; - value.l_returnflag = "N".into(); - value.l_linestatus = "O".into(); - value.l_shipdate = 9567; - value.l_commitdate = 9537; - value.l_receiptdate = 9537; - value.l_shipinstruct = "DELIVER IN PERSON".into(); - value.l_shipmode = "TRUCK".into(); - value.l_comment = "egular courts above the".into(); + let value = Value { + l_orderkey: 155_190, + l_suppkey: 7706, + l_linenumber: 1, + l_quantity: 17.0, + l_extendedprice: 21168.23, + l_discount: 0.04, + l_tax: 0.02, + l_returnflag: "N".into(), + l_linestatus: "O".into(), + l_shipdate: 9567, + l_commitdate: 9537, + l_receiptdate: 9537, + l_shipinstruct: "DELIVER IN PERSON".into(), + l_shipmode: "TRUCK".into(), + l_comment: "egular courts above the".into(), + ..Default::default() + }; - let mut connector = Connector::new(); - connector.version = "0.9.5.Final".into(); - connector.connector = "mysql".into(); - connector.name = "tcph".into(); - connector.server_id = 0; - connector.ts_sec = 0; - connector.gtid = "".into(); - connector.file = "binlog.000004".into(); - connector.pos = 951_896_181; - connector.row = 0; - connector.snapshot = true; - connector.thread = 0; - connector.db = "tcph".into(); - connector.table = "lineitem".into(); - connector.query = "".into(); + let connector = Connector { + version: "0.9.5.Final".into(), + connector: "mysql".into(), + name: "tcph".into(), + server_id: 0, + ts_sec: 0, + gtid: "".into(), + file: "binlog.000004".into(), + pos: 951_896_181, + row: 0, + snapshot: true, + thread: 0, + db: "tcph".into(), + table: "lineitem".into(), + query: "".into(), + }; - let mut record = Record::new(); - record.tcph_tcph_lineitem_value = MessageField::some(value); - record.source = MessageField::some(connector); - record.op = "c".into(); - record.ts_ms = 1_560_886_948_093; + let record = Record { + tcph_tcph_lineitem_value: Some(value), + source: Some(connector), + op: "c".into(), + ts_ms: 1_560_886_948_093, + }; - let buf = record - .write_to_bytes() - .expect("record failed to serialize to bytes"); + let buf = record.encode_to_vec(); let len = buf.len() as u64; let mut decoder = Decoder::new( DecodedDescriptors::from_bytes( diff --git a/src/interchange/testdata/benchmark.proto b/src/interchange/testdata/benchmark.proto index f38c712e3afd3..a872764a951b7 
100644 --- a/src/interchange/testdata/benchmark.proto +++ b/src/interchange/testdata/benchmark.proto @@ -9,7 +9,7 @@ syntax = "proto3"; -package bench; +package benchmark; message Value { int32 l_orderkey = 1; diff --git a/src/materialized/ci/coordd/mzbuild.yml b/src/materialized/ci/coordd/mzbuild.yml index 6fde07de0eabd..abf19dcacc7a8 100644 --- a/src/materialized/ci/coordd/mzbuild.yml +++ b/src/materialized/ci/coordd/mzbuild.yml @@ -14,5 +14,5 @@ pre-image: strip: false extract: krb5-src: - - install/bin/kinit - - install/bin/kdestroy + install/bin/kinit: . + install/bin/kdestroy: . diff --git a/src/materialized/ci/mzbuild.yml b/src/materialized/ci/mzbuild.yml index 6a72bf945221d..0e1d9264503c8 100644 --- a/src/materialized/ci/mzbuild.yml +++ b/src/materialized/ci/mzbuild.yml @@ -14,5 +14,5 @@ pre-image: strip: false extract: krb5-src: - - install/bin/kinit - - install/bin/kdestroy + install/bin/kinit: . + install/bin/kdestroy: . diff --git a/src/persist-types/Cargo.toml b/src/persist-types/Cargo.toml index 0e5284af43703..545e3bc4fa151 100644 --- a/src/persist-types/Cargo.toml +++ b/src/persist-types/Cargo.toml @@ -8,3 +8,4 @@ publish = false # NB: This is meant to be a strong, independent abstraction boundary, please # don't leak in deps on other Materialize packages. [dependencies] +bytes = "1.1.0" diff --git a/src/persist-types/src/codec_impls.rs b/src/persist-types/src/codec_impls.rs index 0388e57e67935..aa2b4f4471858 100644 --- a/src/persist-types/src/codec_impls.rs +++ b/src/persist-types/src/codec_impls.rs @@ -9,6 +9,8 @@ //! Implementations of [Codec] for stdlib types. +use bytes::BufMut; + use crate::Codec; impl Codec for () { @@ -16,7 +18,10 @@ impl Codec for () { "()".into() } - fn encode<E: for<'a> Extend<&'a u8>>(&self, _buf: &mut E) { + fn encode<B>(&self, _buf: &mut B) + where + B: BufMut, + { // No-op. } @@ -33,8 +38,11 @@ impl Codec for String { "String".into() } - fn encode<E: for<'a> Extend<&'a u8>>(&self, buf: &mut E) { - buf.extend(self.as_bytes()) + fn encode<B>(&self, buf: &mut B) + where + B: BufMut, + { + buf.put(self.as_bytes()) } fn decode<'a>(buf: &'a [u8]) -> Result<Self, String> { @@ -47,8 +55,11 @@ impl Codec for Vec<u8> { "Vec<u8>".into() } - fn encode<E: for<'a> Extend<&'a u8>>(&self, buf: &mut E) { - buf.extend(self) + fn encode<B>(&self, buf: &mut B) + where + B: BufMut, + { + buf.put(self.as_slice()) } fn decode<'a>(buf: &'a [u8]) -> Result<Self, String> { @@ -63,14 +74,17 @@ impl<T: Codec, E: Codec> Codec for Result<T, E> { "Result".into() } - fn encode<B: for<'a> Extend<&'a u8>>(&self, buf: &mut B) { + fn encode<B>(&self, buf: &mut B) + where + B: BufMut, + { match self { Ok(r) => { - buf.extend(&[RESULT_OK]); + buf.put(&[RESULT_OK][..]); r.encode(buf); } Err(err) => { - buf.extend(&[RESULT_ERR]); + buf.put(&[RESULT_ERR][..]); err.encode(buf); } } diff --git a/src/persist-types/src/lib.rs b/src/persist-types/src/lib.rs index 0a96039009d2d..818e010549050 100644 --- a/src/persist-types/src/lib.rs +++ b/src/persist-types/src/lib.rs @@ -16,7 +16,7 @@ clippy::cast_sign_loss )] -use std::io; +use bytes::BufMut; mod codec_impls; @@ -33,7 +33,9 @@ pub trait Codec: Sized + 'static { /// This must perfectly round-trip Self through [Codec::decode]. If the /// encode function for this codec ever changes, decode must be able to /// handle bytes output by all previous versions of encode. - fn encode<E: for<'a> Extend<&'a u8>>(&self, buf: &mut E); + fn encode<B>(&self, buf: &mut B) + where + B: BufMut; /// Decode a key or value previous encoded with this codec's /// [Codec::encode]. /// @@ -48,20 +50,3 @@ pub trait Codec: Sized + 'static { // without any copies, see if we can make the types work out for that. fn decode<'a>(buf: &'a [u8]) -> Result<Self, String>; } - -/// An adaptor to implement [io::Write] for Extend<&u8>. -/// -/// This is a helper for implementations of Codec that internally need a -/// [io::Write]. Writes and flushes are guaranteed to succeed. -pub struct ExtendWriteAdapter<'e, E>(pub &'e mut E); - -impl<'e, E: for<'a> Extend<&'a u8>> io::Write for ExtendWriteAdapter<'e, E> { - fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> { - self.0.extend(buf); - Ok(buf.len()) - } - - fn flush(&mut self) -> Result<(), io::Error> { - Ok(()) - } -} diff --git a/src/persist/Cargo.toml b/src/persist/Cargo.toml index 90483cfae2457..928a1f118992d 100644 --- a/src/persist/Cargo.toml +++ b/src/persist/Cargo.toml @@ -28,6 +28,7 @@ aws-types = { version = "0.4.0" } base64 = "0.13.0" bincode = "1.3.3" build-info = { path = "../build-info" } +bytes = "1.1.0" crossbeam-channel = "0.5" differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow.git" } fail = { version = "0.5.0", features = ["failpoints"] } @@ -40,7 +41,7 @@ mz-aws-util = { path = "../aws-util", features = ["s3"] } ore = { path = "../ore", default-features = false, features = ["metrics"] } parquet2 = { version = "0.8.1", default-features = false } persist-types = { path = "../persist-types" } -protobuf = { git = "https://github.com/MaterializeInc/rust-protobuf.git" } +prost = "0.9.0" semver = { version = "1.0.4" } serde = { version = "1.0.133", features = ["derive"] } timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false, features = ["bincode"] } diff --git a/src/persist/examples/kafka_upsert.rs b/src/persist/examples/kafka_upsert.rs index 03c502ef40075..53f3bba397ada 100644 --- a/src/persist/examples/kafka_upsert.rs +++ b/src/persist/examples/kafka_upsert.rs @@ -432,6 +432,8 @@ struct AssignedTimestamp(u64); struct SourceTimestamp<P, O>(P, O); mod kafka_offset_impls { + use bytes::BufMut; + use persist_types::Codec; use crate::AssignedTimestamp; @@ -444,8 +446,11 @@ "KafkaPartition".into() } - fn encode<E: for<'a> Extend<&'a u8>>(&self, buf: &mut E) { - buf.extend(&self.0.to_le_bytes()) + fn encode<B>(&self, buf: &mut B) + where + B: BufMut, + { + buf.put_slice(&self.0.to_le_bytes()) } fn decode<'a>(buf: &'a [u8]) -> Result<Self, String> { @@ -460,8 +465,11 @@ "KafkaOffset".into() } - fn encode<E: for<'a> Extend<&'a u8>>(&self, buf: &mut E) { - buf.extend(&self.0.to_le_bytes()) + fn encode<B>(&self, buf: &mut B) + where + B: BufMut, + { + buf.put_slice(&self.0.to_le_bytes()) } fn decode<'a>(buf: &'a [u8]) -> Result<Self, String> { @@ -476,8 +484,11 @@ "AssignedTimestamp".into() } - fn encode<E: for<'a> Extend<&'a u8>>(&self, buf: &mut E) { - buf.extend(&self.0.to_le_bytes()) + fn encode<B>(&self, buf: &mut B) + where + B: BufMut, + { + buf.put_slice(&self.0.to_le_bytes()) } fn decode<'a>(buf: &'a [u8]) -> Result<Self, String> { @@ -492,16 +503,19 @@ "SourceTimestamp".into() } - fn encode<E: for<'a> Extend<&'a u8>>(&self, buf: &mut E) { + fn encode<B>(&self, buf: &mut B) + where + B: BufMut, + { let mut inner_buf = Vec::new(); self.0.encode(&mut inner_buf); - buf.extend(&inner_buf.len().to_le_bytes()); - buf.extend(&inner_buf); + buf.put_slice(&inner_buf.len().to_le_bytes()); + buf.put_slice(&inner_buf); inner_buf.clear(); self.1.encode(&mut inner_buf); - buf.extend(&inner_buf.len().to_le_bytes()); - buf.extend(&inner_buf); + buf.put_slice(&inner_buf.len().to_le_bytes()); + buf.put_slice(&inner_buf); } fn decode<'a>(buf: &'a [u8]) -> Result<Self, String> { diff --git a/src/persist/src/gen.rs b/src/persist/src/gen.rs index 6fadac49a1184..72c8d4f149461 100644 --- a/src/persist/src/gen.rs +++ b/src/persist/src/gen.rs @@ -15,8 +15,9 @@ include!(concat!(env!("OUT_DIR"), "/protobuf/mod.rs")); use std::io::Read; +use bytes::BufMut; use md5::{Digest, Md5}; -use protobuf::Message; +use prost::Message; use crate::error::Error; use crate::gen::persist::ProtoMeta; @@ -75,20 +76,21 @@ impl persist_types::Codec for ProtoMeta { "protobuf+md5[ProtoMeta]".into() } - fn encode<E: for<'a> Extend<&'a u8>>(&self, buf: &mut E) { + fn encode<B>(&self, buf: &mut B) + where + B: BufMut, + { // TODO: Move checksum to be a field on the proto instead. We can encode // the proto, checksum'ing as we go, and then manually append it onto // the end. // // TODO: Regardless of the above TODO, compute the checksum as we go and // avoid this temp Vec. - let temp = self - .write_to_bytes() - .expect("no required fields means no initialization errors"); - buf.extend(&[Self::ENCODING_VERSION]); - buf.extend(temp.iter()); + let temp = self.encode_to_vec(); + buf.put_slice(&[Self::ENCODING_VERSION]); + buf.put_slice(&temp); let checksum = Self::md5_checksum(&temp); - buf.extend(&checksum); + buf.put_slice(&checksum); } fn decode<'a>(buf: &'a [u8]) -> Result<Self, String> { @@ -110,8 +112,7 @@ return Err("checksum mismatch".into()); } - // TODO: Use parse_from_carllerche_bytes to save allocs? - Self::parse_from_bytes(buf).map_err(|err| err.to_string()) + <Self as Message>::decode(buf).map_err(|err| err.to_string()) } } @@ -125,16 +126,16 @@ mod tests { fn checksum() { let meta = ProtoMeta::default(); let mut encoded = Vec::new(); - meta.encode(&mut encoded); + Codec::encode(&meta, &mut encoded); // Intact checksum matches. - assert_eq!(ProtoMeta::decode(&encoded), Ok(meta)); + assert_eq!(<ProtoMeta as Codec>::decode(&encoded), Ok(meta)); // Data has been mutated. let mut bad_data = encoded.clone(); bad_data[1] += 1; assert_eq!( - ProtoMeta::decode(&bad_data), + <ProtoMeta as Codec>::decode(&bad_data), Err("checksum mismatch".into()) ); @@ -142,7 +143,7 @@ let mut bad_checksum = encoded.clone(); *bad_checksum.last_mut().unwrap() += 1; assert_eq!( - ProtoMeta::decode(&bad_checksum), + <ProtoMeta as Codec>::decode(&bad_checksum), Err("checksum mismatch".into()) ); } @@ -156,15 +157,15 @@ ..Default::default() }; let mut encoded = Vec::new(); - meta.encode(&mut encoded); + Codec::encode(&meta, &mut encoded); // Sanity check that we don't just always return errors. - assert_eq!(ProtoMeta::decode(&encoded), Ok(meta)); + assert_eq!(<ProtoMeta as Codec>::decode(&encoded), Ok(meta)); // Every subset that's missing at least one byte should error, not panic // or succeed. for i in 0..encoded.len() - 1 { - assert!(ProtoMeta::decode(&encoded[..i]).is_err()); + assert!(<ProtoMeta as Codec>::decode(&encoded[..i]).is_err()); } } } diff --git a/src/persist/src/indexed/arrangement.rs b/src/persist/src/indexed/arrangement.rs index 89475edb12da9..f9c05c947e005 100644 --- a/src/persist/src/indexed/arrangement.rs +++ b/src/persist/src/indexed/arrangement.rs @@ -948,7 +948,7 @@ mod tests { ) -> UnsealedBatchMeta { UnsealedBatchMeta { key: key.to_string(), - format: ProtoBatchFormat::ParquetKVTD, + format: ProtoBatchFormat::ParquetKvtd, desc: SeqNo(lower)..SeqNo(upper), ts_upper, ts_lower, @@ -1378,14 +1378,14 @@ vec![ TraceBatchMeta { key: "KEY".to_string(), - format: ProtoBatchFormat::ParquetKVTD, + format: ProtoBatchFormat::ParquetKvtd, desc: desc_from(0, 3, 3), level: 1, size_bytes: 0, }, TraceBatchMeta { key: "KEY".to_string(), - format: ProtoBatchFormat::ParquetKVTD, + format: ProtoBatchFormat::ParquetKvtd, desc: desc_from(3, 9, 0), level: 0, size_bytes: 0, @@ -1437,14 +1437,14 @@ vec![ TraceBatchMeta { key: "KEY".to_string(), - format: ProtoBatchFormat::ParquetKVTD, + format: ProtoBatchFormat::ParquetKvtd, desc: desc_from(0, 3, 3), level: 1, size_bytes: 0, }, TraceBatchMeta { key: "KEY".to_string(), - format: ProtoBatchFormat::ParquetKVTD, + format: ProtoBatchFormat::ParquetKvtd, desc: desc_from(3, 10, 10), level: 0, size_bytes: 0, diff --git a/src/persist/src/indexed/background.rs b/src/persist/src/indexed/background.rs index 8dbda31ca232f..14996c1286a81 100644 --- a/src/persist/src/indexed/background.rs +++ b/src/persist/src/indexed/background.rs @@ -241,7 +241,7 @@ mod tests { req: req.clone(), merged: TraceBatchMeta { key: "MERGED_KEY".into(), - format: ProtoBatchFormat::ParquetKVTD, + format: ProtoBatchFormat::ParquetKvtd, desc: desc_from(0, 3, 2), level: 1, size_bytes: 0, diff --git a/src/persist/src/indexed/cache.rs b/src/persist/src/indexed/cache.rs index cf3c6cfbf1c65..59929bbe10070 100644 --- a/src/persist/src/indexed/cache.rs +++ b/src/persist/src/indexed/cache.rs @@ -161,7 +161,7 @@ impl<B: Blob> BlobCache<B> { debug_assert_eq!(batch.validate(), Ok(()), "{:?}", &batch); let mut val = Vec::new(); - let format = ProtoBatchFormat::ParquetKVTD; + let format = ProtoBatchFormat::ParquetKvtd; batch.encode(&mut val); let val_len = u64::cast_from(val.len()); @@ -263,7 +263,7 @@ impl<B: Blob> BlobCache<B> { debug_assert_eq!(batch.validate(), Ok(()), "{:?}", &batch); let mut val = Vec::new(); - let format = ProtoBatchFormat::ParquetKVTD; + let format = ProtoBatchFormat::ParquetKvtd; batch.encode(&mut val); let val_len = u64::cast_from(val.len()); diff --git a/src/persist/src/indexed/columnar/arrow.rs b/src/persist/src/indexed/columnar/arrow.rs index 7d9fdf271c264..1ae1a9baae3de 100644 --- a/src/persist/src/indexed/columnar/arrow.rs +++ b/src/persist/src/indexed/columnar/arrow.rs @@ -79,7 +79,7 @@ pub fn encode_unsealed_arrow<W: Write>(w: &mut W, batch: &BlobUnsealedBatch) -> Result<(), Error> { let mut metadata = HashMap::with_capacity(1); metadata.insert( INLINE_METADATA_KEY.into(), - encode_unsealed_inline_meta(batch, ProtoBatchFormat::ArrowKVTD), + encode_unsealed_inline_meta(batch, ProtoBatchFormat::ArrowKvtd), ); let schema = Schema::new_from(SCHEMA_ARROW_KVTD.fields().clone(), metadata); let options = WriteOptions { compression: None }; @@ -99,7 +99,7 @@ pub fn encode_trace_arrow<W: Write>(w: &mut W, batch: &BlobTraceBatch) -> Result<(), Error> { let mut metadata = HashMap::with_capacity(1); metadata.insert( INLINE_METADATA_KEY.into(), - encode_trace_inline_meta(batch, ProtoBatchFormat::ArrowKVTD), +
encode_trace_inline_meta(batch, ProtoBatchFormat::ArrowKvtd), ); let schema = Schema::new_from(SCHEMA_ARROW_KVTD.fields().clone(), metadata); let options = WriteOptions { compression: None }; @@ -121,9 +121,9 @@ pub fn decode_unsealed_arrow<R: Read + Seek>(r: &mut R) -> Result<BlobUnsealedBatch, Error> { let updates = match format { ProtoBatchFormat::Unknown => return Err("unknown format".into()), - ProtoBatchFormat::ArrowKVTD => decode_arrow_file_kvtd(r, file_meta)?, - ProtoBatchFormat::ParquetKVTD => { - return Err("ParquetKVTD format not supported in arrow".into()) + ProtoBatchFormat::ArrowKvtd => decode_arrow_file_kvtd(r, file_meta)?, + ProtoBatchFormat::ParquetKvtd => { + return Err("ParquetKvtd format not supported in arrow".into()) } }; @@ -146,9 +146,9 @@ pub fn decode_trace_arrow<R: Read + Seek>(r: &mut R) -> Result<BlobTraceBatch, Error> { let updates = match format { ProtoBatchFormat::Unknown => return Err("unknown format".into()), - ProtoBatchFormat::ArrowKVTD => decode_arrow_file_kvtd(r, file_meta)?, - ProtoBatchFormat::ParquetKVTD => { - return Err("ParquetKVTD format not supported in arrow".into()) + ProtoBatchFormat::ArrowKvtd => decode_arrow_file_kvtd(r, file_meta)?, + ProtoBatchFormat::ParquetKvtd => { + return Err("ParquetKvtd format not supported in arrow".into()) } }; @@ -161,7 +161,7 @@ pub fn decode_trace_arrow<R: Read + Seek>(r: &mut R) -> Result<BlobTraceBatch, Error> ( ) -> Result<(), Error> { encode_parquet_kvtd( w, - encode_unsealed_inline_meta(batch, ProtoBatchFormat::ParquetKVTD), + encode_unsealed_inline_meta(batch, ProtoBatchFormat::ParquetKvtd), &batch.updates, ) } @@ -56,7 +56,7 @@ pub fn encode_trace_parquet<W: Write>(w: &mut W, batch: &BlobTraceBatch) -> Result<(), Error> { .collect(); encode_parquet_kvtd( w, - encode_trace_inline_meta(batch, ProtoBatchFormat::ParquetKVTD), + encode_trace_inline_meta(batch, ProtoBatchFormat::ParquetKvtd), &[updates], ) } @@ -72,10 +72,10 @@ pub fn decode_unsealed_parquet<R: Read + Seek>(r: &mut R) -> Result<BlobUnsealedBatch, Error> { let updates = match format { ProtoBatchFormat::Unknown => return Err("unknown format".into()), - ProtoBatchFormat::ArrowKVTD => { - return Err("ArrowKVTD format not supported in parquet".into()) + ProtoBatchFormat::ArrowKvtd => { + return Err("ArrowKvtd format not supported in parquet".into()) } - ProtoBatchFormat::ParquetKVTD => decode_parquet_file_kvtd(r)?, + ProtoBatchFormat::ParquetKvtd => decode_parquet_file_kvtd(r)?, }; let ret = BlobUnsealedBatch { @@ -97,10 +97,10 @@ pub fn decode_trace_parquet<R: Read + Seek>(r: &mut R) -> Result<BlobTraceBatch, Error> { let updates = match format { ProtoBatchFormat::Unknown => return Err("unknown format".into()), - ProtoBatchFormat::ArrowKVTD => { + ProtoBatchFormat::ArrowKvtd => { return Err("ArrowKVTD format not supported in parquet".into()) } - ProtoBatchFormat::ParquetKVTD => decode_parquet_file_kvtd(r)?, + ProtoBatchFormat::ParquetKvtd => decode_parquet_file_kvtd(r)?, }; let updates = updates @@ -112,7 +112,7 @@ pub fn decode_trace_parquet<R: Read + Seek>(r: &mut R) -> Result<BlobTraceBatch, Error> - fn encode<E: for<'a> Extend<&'a u8>>(&self, buf: &mut E) { - encode_unsealed_parquet(&mut ExtendWriteAdapter(buf), &self) - .expect("writes to ExtendWriteAdapter are infallible"); + fn encode<B>(&self, buf: &mut B) + where + B: BufMut, + { + encode_unsealed_parquet(&mut buf.writer(), &self).expect("writes to BufMut are infallible"); } fn decode<'a>(buf: &'a [u8]) -> Result<Self, String> { @@ -624,9 +627,11 @@ impl Codec for BlobTraceBatch { "parquet[TraceBatch]".into() } - fn encode<E: for<'a> Extend<&'a u8>>(&self, buf: &mut E) { - encode_trace_parquet(&mut ExtendWriteAdapter(buf), self) - .expect("writes to ExtendWriteAdapter are infallible"); + fn encode<B>(&self, buf: &mut B) + where + B: BufMut, + { + encode_trace_parquet(&mut buf.writer(), self).expect("writes to BufMut are infallible"); } fn decode<'a>(buf: &'a [u8]) -> Result<Self, String> { @@ -678,11 +683,9 @@ impl From<(u64, ProtoArrangement)> for ArrangementMeta { id: Id(id), seal: x .seal - .into_option() .map_or_else(|| 
Antichain::from_elem(u64::minimum()), |x| x.into()), since: x .since - .into_option() .map_or_else(|| Antichain::from_elem(u64::minimum()), |x| x.into()), unsealed_batches: x.unsealed_batches.into_iter().map(|x| x.into()).collect(), trace_batches: x.trace_batches.into_iter().map(|x| x.into()).collect(), @@ -705,8 +708,8 @@ impl From<(u64, ProtoStreamRegistration)> for StreamRegistration { impl From<ProtoUnsealedBatchMeta> for UnsealedBatchMeta { fn from(x: ProtoUnsealedBatchMeta) -> Self { UnsealedBatchMeta { + format: x.format(), key: x.key, - format: x.format.enum_value_or_default(), desc: SeqNo(x.seqno_lower)..SeqNo(x.seqno_upper), ts_upper: x.ts_upper, ts_lower: x.ts_lower, @@ -718,9 +721,9 @@ impl From<ProtoTraceBatchMeta> for TraceBatchMeta { fn from(x: ProtoTraceBatchMeta) -> Self { TraceBatchMeta { + format: x.format(), key: x.key, - format: x.format.enum_value_or_default(), - desc: x.desc.into_option().map_or_else( + desc: x.desc.map_or_else( || { Description::new( Antichain::from_elem(u64::minimum()), @@ -740,13 +743,10 @@ impl From<ProtoU64Description> for Description<u64> { fn from(x: ProtoU64Description) -> Self { Description::new( x.lower - .into_option() .map_or_else(|| Antichain::from_elem(u64::minimum()), |x| x.into()), x.upper - .into_option() .map_or_else(|| Antichain::from_elem(u64::minimum()), |x| x.into()), x.since - .into_option() .map_or_else(|| Antichain::from_elem(u64::minimum()), |x| x.into()), ) } @@ -767,8 +767,6 @@ impl From<(&BlobMeta, &Version)> for ProtoMeta { id_mapping: x.id_mapping.iter().map(|x| (x.id.0, x.into())).collect(), graveyard: x.graveyard.iter().map(|x| (x.id.0, x.into())).collect(), arrangements: x.arrangements.iter().map(|x| (x.id.0, x.into())).collect(), - unknown_fields: Default::default(), - cached_size: Default::default(), } } } @@ -776,12 +774,10 @@ impl From<&ArrangementMeta> for ProtoArrangement { fn from(x: &ArrangementMeta) -> Self { ProtoArrangement { - since: MessageField::some((&x.since).into()), - seal: MessageField::some((&x.seal).into()), + since: Some((&x.since).into()), + seal: Some((&x.seal).into()), unsealed_batches: x.unsealed_batches.iter().map(|x| x.into()).collect(), trace_batches: x.trace_batches.iter().map(|x| x.into()).collect(), - unknown_fields: Default::default(), - cached_size: Default::default(), } } } @@ -792,8 +788,6 @@ impl From<&StreamRegistration> for ProtoStreamRegistration { name: x.name.clone(), key_codec_name: x.key_codec_name.clone(), val_codec_name: x.val_codec_name.clone(), - unknown_fields: Default::default(), - cached_size: Default::default(), } } } @@ -808,8 +802,6 @@ impl From<&UnsealedBatchMeta> for ProtoUnsealedBatchMeta { ts_upper: x.ts_upper, ts_lower: x.ts_lower, size_bytes: x.size_bytes, - unknown_fields: Default::default(), - cached_size: Default::default(), } } } @@ -819,11 +811,9 @@ impl From<&TraceBatchMeta> for ProtoTraceBatchMeta { ProtoTraceBatchMeta { key: x.key.clone(), format: x.format.into(), - desc: MessageField::some((&x.desc).into()), + desc: Some((&x.desc).into()), level: x.level, size_bytes: x.size_bytes, - unknown_fields: Default::default(), - cached_size: Default::default(), } } } @@ -832,8 +822,6 @@ impl From<&Antichain<u64>> for ProtoU64Antichain { fn from(x: &Antichain<u64>) -> Self { ProtoU64Antichain { elements: x.elements().to_vec(), - unknown_fields: Default::default(), - cached_size: Default::default(), } } } @@ -841,11 +829,9 @@ impl From<&Description<u64>> for ProtoU64Description { fn from(x: &Description<u64>) -> Self { 
ProtoU64Description { - lower: MessageField::some(x.lower().into()), - upper: MessageField::some(x.upper().into()), - since: MessageField::some(x.since().into()), - unknown_fields: Default::default(), - cached_size: Default::default(), + lower: Some(x.lower().into()), + upper: Some(x.upper().into()), + since: Some(x.since().into()), } } } @@ -853,41 +839,29 @@ impl From<&Description> for ProtoU64Description { /// Encodes the inline metadata for an unsealed batch into a base64 string. pub fn encode_unsealed_inline_meta(batch: &BlobUnsealedBatch, format: ProtoBatchFormat) -> String { let inline = ProtoBatchInline { - batch_type: Some(proto_batch_inline::Batch_type::unsealed( + batch_type: Some(proto_batch_inline::BatchType::Unsealed( ProtoUnsealedBatchInline { format: format.into(), seqno_lower: batch.desc.start.0, seqno_upper: batch.desc.end.0, - unknown_fields: Default::default(), - cached_size: Default::default(), }, )), - unknown_fields: Default::default(), - cached_size: Default::default(), }; - let inline_encoded = inline - .write_to_bytes() - .expect("no required fields means no initialization errors"); + let inline_encoded = inline.encode_to_vec(); base64::encode(inline_encoded) } /// Encodes the inline metadata for a trace batch into a base64 string. pub fn encode_trace_inline_meta(batch: &BlobTraceBatch, format: ProtoBatchFormat) -> String { let inline = ProtoBatchInline { - batch_type: Some(proto_batch_inline::Batch_type::trace( + batch_type: Some(proto_batch_inline::BatchType::Trace( ProtoTraceBatchInline { format: format.into(), - desc: MessageField::some((&batch.desc).into()), - unknown_fields: Default::default(), - cached_size: Default::default(), + desc: Some((&batch.desc).into()), }, )), - unknown_fields: Default::default(), - cached_size: Default::default(), }; - let inline_encoded = inline - .write_to_bytes() - .expect("no required fields means no initialization errors"); + let inline_encoded = inline.encode_to_vec(); base64::encode(inline_encoded) } @@ -897,14 +871,11 @@ pub fn decode_unsealed_inline_meta( ) -> Result<(ProtoBatchFormat, ProtoUnsealedBatchInline), Error> { let inline_base64 = inline_base64.ok_or("missing batch metadata")?; let inline_encoded = base64::decode(&inline_base64).map_err(|err| err.to_string())?; - let inline = - ProtoBatchInline::parse_from_bytes(&inline_encoded).map_err(|err| err.to_string())?; + let inline = ProtoBatchInline::decode(&*inline_encoded).map_err(|err| err.to_string())?; match inline.batch_type { - Some(proto_batch_inline::Batch_type::unsealed(x)) => { - let format = x - .format - .enum_value() - .map_err(|x| Error::from(format!("unknown format: {}", x)))?; + Some(proto_batch_inline::BatchType::Unsealed(x)) => { + let format = ProtoBatchFormat::from_i32(x.format) + .ok_or_else(|| Error::from(format!("unknown format: {}", x.format)))?; Ok((format, x)) } x => return Err(format!("incorrect batch type: {:?}", x).into()), @@ -917,14 +888,11 @@ pub fn decode_trace_inline_meta( ) -> Result<(ProtoBatchFormat, ProtoTraceBatchInline), Error> { let inline_base64 = inline_base64.ok_or("missing batch metadata")?; let inline_encoded = base64::decode(&inline_base64).map_err(|err| err.to_string())?; - let inline = - ProtoBatchInline::parse_from_bytes(&inline_encoded).map_err(|err| err.to_string())?; + let inline = ProtoBatchInline::decode(&*inline_encoded).map_err(|err| err.to_string())?; match inline.batch_type { - Some(proto_batch_inline::Batch_type::trace(x)) => { - let format = x - .format - .enum_value() - .map_err(|x| 
Error::from(format!("unknown format: {}", x)))?; + Some(proto_batch_inline::BatchType::Trace(x)) => { + let format = ProtoBatchFormat::from_i32(x.format) + .ok_or_else(|| Error::from(format!("unknown format: {}", x.format)))?; Ok((format, x)) } x => return Err(format!("incorrect batch type: {:?}", x).into()), @@ -1639,7 +1607,7 @@ mod tests { sizes(DataGenerator::new(1_000, record_size_bytes, 1_000)), sizes(DataGenerator::new(1_000, record_size_bytes, 1_000 / 100)), ), - "1/1=(481, 497) 25/1=(2229, 2245) 1000/1=(72468, 72484) 1000/100=(106557, 72484)" + "1/1=(481, 501) 25/1=(2229, 2249) 1000/1=(72468, 72488) 1000/100=(106557, 72488)" ); } } diff --git a/src/persist/src/persist.proto b/src/persist/src/persist.proto index d18e36e8ca4ab..846220f350248 100644 --- a/src/persist/src/persist.proto +++ b/src/persist/src/persist.proto @@ -11,7 +11,7 @@ syntax = "proto3"; -package gen; +package persist; message ProtoMeta { // For data written by some vX.Y.Z of Materialize, we'll support reading it @@ -119,12 +119,12 @@ enum ProtoBatchFormat { // but we probably don't want to pay the cost of converting between the // in-mem `[(K, V, T, D)]` representation and anything else (to keep the hot // path clean). Unsealed batches are also likely to be our smallest. For - // this reason, they'll probably always stay as ParquetKVTD. + // this reason, they'll probably always stay as ParquetKvtd. // // For trace batches, we consolidate them before writing them out, so we're // guaranteed to get nothing from the V level of the trie. For duplicate // keys, we'll probably get a good amount of benefit from column specific // compression, and I'd like to exhaust that direction first before dealing // with a trie-like column structure. - ParquetKVTD = 2; + ParquetKvtd = 2; } diff --git a/src/protoc/Cargo.toml b/src/protoc/Cargo.toml index cbdc02934fc37..85c138df23c58 100644 --- a/src/protoc/Cargo.toml +++ b/src/protoc/Cargo.toml @@ -16,4 +16,5 @@ ore = { path = "../ore" } protobuf = { git = "https://github.com/MaterializeInc/rust-protobuf.git" } protobuf-codegen = { git = "https://github.com/MaterializeInc/rust-protobuf.git" } protobuf-parse = { git = "https://github.com/MaterializeInc/rust-protobuf.git" } +prost-build = { git = "https://github.com/MaterializeInc/prost.git" } tempfile = "3.2.0" diff --git a/src/protoc/src/lib.rs b/src/protoc/src/lib.rs index 375701ceb2df5..cc370da8e7183 100644 --- a/src/protoc/src/lib.rs +++ b/src/protoc/src/lib.rs @@ -28,23 +28,20 @@ //! is to upstream these bugfixes over time. use std::env; -use std::fs::{self, File}; -use std::io::{self, Write}; +use std::fs; +use std::io; use std::path::{Path, PathBuf}; use std::process; use anyhow::{bail, Context}; + use protobuf::descriptor::FileDescriptorSet; -use protobuf::Message; -use protobuf_codegen::Customize; -use protobuf_parse::ParsedAndTypechecked; /// A builder for a protobuf compilation. #[derive(Default, Debug)] pub struct Protoc { includes: Vec, inputs: Vec, - customize: Customize, } impl Protoc { @@ -71,16 +68,6 @@ impl Protoc { self } - fn parse_internal(&self) -> Result { - for input in &self.inputs { - if !input.exists() { - bail!("input protobuf file does not exist: {}", input.display()); - } - } - - protobuf_parse::pure::parse_and_typecheck(&self.includes, &self.inputs) - } - /// Parses the inputs into a file descriptor set. 
diff --git a/src/protoc/Cargo.toml b/src/protoc/Cargo.toml
index cbdc02934fc37..85c138df23c58 100644
--- a/src/protoc/Cargo.toml
+++ b/src/protoc/Cargo.toml
@@ -16,4 +16,5 @@ ore = { path = "../ore" }
 protobuf = { git = "https://github.com/MaterializeInc/rust-protobuf.git" }
 protobuf-codegen = { git = "https://github.com/MaterializeInc/rust-protobuf.git" }
 protobuf-parse = { git = "https://github.com/MaterializeInc/rust-protobuf.git" }
+prost-build = { git = "https://github.com/MaterializeInc/prost.git" }
 tempfile = "3.2.0"
diff --git a/src/protoc/src/lib.rs b/src/protoc/src/lib.rs
index 375701ceb2df5..cc370da8e7183 100644
--- a/src/protoc/src/lib.rs
+++ b/src/protoc/src/lib.rs
@@ -28,23 +28,20 @@
 //! is to upstream these bugfixes over time.
 
 use std::env;
-use std::fs::{self, File};
-use std::io::{self, Write};
+use std::fs;
+use std::io;
 use std::path::{Path, PathBuf};
 use std::process;
 
 use anyhow::{bail, Context};
+
 use protobuf::descriptor::FileDescriptorSet;
-use protobuf::Message;
-use protobuf_codegen::Customize;
-use protobuf_parse::ParsedAndTypechecked;
 
 /// A builder for a protobuf compilation.
 #[derive(Default, Debug)]
 pub struct Protoc {
     includes: Vec<PathBuf>,
     inputs: Vec<PathBuf>,
-    customize: Customize,
 }
 
 impl Protoc {
@@ -71,16 +68,6 @@ impl Protoc {
         self
     }
 
-    fn parse_internal(&self) -> Result<ParsedAndTypechecked, anyhow::Error> {
-        for input in &self.inputs {
-            if !input.exists() {
-                bail!("input protobuf file does not exist: {}", input.display());
-            }
-        }
-
-        protobuf_parse::pure::parse_and_typecheck(&self.includes, &self.inputs)
-    }
-
     /// Parses the inputs into a file descriptor set.
     ///
     /// A [`FileDescriptorSet`] is protobuf's internal representation of a
@@ -91,8 +78,15 @@ impl Protoc {
     ///
     /// Most users will want to call [`Protoc::compile_into`] or
     /// [`Protoc::build_script_exec`] instead.
+    // TODO(benesch): switch this to protobuf_native to avoid the dependency
+    // on rust-protobuf.
     pub fn parse(&self) -> Result<FileDescriptorSet, anyhow::Error> {
-        let parsed = self.parse_internal()?;
+        for input in &self.inputs {
+            if !input.exists() {
+                bail!("input protobuf file does not exist: {}", input.display());
+            }
+        }
+        let parsed = protobuf_parse::pure::parse_and_typecheck(&self.includes, &self.inputs)?;
         let mut fds = FileDescriptorSet::new();
         fds.file = parsed.file_descriptors.into_iter().collect();
         Ok(fds)
@@ -114,56 +108,9 @@ impl Protoc {
         );
     }
 
-        let parsed = self.parse_internal()?;
-
-        protobuf_codegen::gen_and_write::gen_and_write(
-            &parsed.file_descriptors,
-            "mz-protoc",
-            &parsed.relative_paths,
-            &out_dir,
-            &self.customize,
-        )?;
-
-        let mut mods = vec![];
-        for entry in fs::read_dir(out_dir)? {
-            let entry = entry?;
-
-            match entry.path().file_stem() {
-                None => bail!(
-                    "unexpected file in protobuf out directory: {}",
-                    entry.path().display()
-                ),
-                Some(m) => mods.push(m.to_string_lossy().into_owned()),
-            }
-        }
-
-        // Generate a module index that includes all generated modules.
-        // Upstream issue: https://github.com/stepancheg/rust-protobuf/issues/438
-        #[rustfmt::skip]
-        {
-            let mut f = File::create(out_dir.join("mod.rs"))?;
-            writeln!(f, "// Generated by Materialize's protoc crate. Do not edit!")?;
-            writeln!(f, "// @generated")?;
-            writeln!(f)?;
-            writeln!(f)?;
-            for m in &mods {
-                writeln!(f, "pub mod {};", m)?;
-            }
-            writeln!(f)?;
-            writeln!(f, "pub const FILE_DESCRIPTOR_SET_DATA: &[u8] = include_bytes!(\"file_descriptor_set.pb\");")?;
-            writeln!(f)?;
-            writeln!(f, "#[allow(dead_code)]")?;
-            writeln!(f, "pub fn file_descriptor_set() -> &'static protobuf::descriptor::FileDescriptorSet {{")?;
-            writeln!(f, "    static LAZY: protobuf::rt::LazyV2<protobuf::descriptor::FileDescriptorSet> = protobuf::rt::LazyV2::INIT;")?;
-            writeln!(f, "    LAZY.get(|| protobuf::Message::parse_from_bytes(FILE_DESCRIPTOR_SET_DATA).unwrap())")?;
-            writeln!(f, "}}")?;
-
-            let mut f = File::create(out_dir.join("file_descriptor_set.pb"))
-                .context("creating file_descriptor_set.pb")?;
-            let mut fds = FileDescriptorSet::new();
-            fds.file = parsed.file_descriptors.into_iter().collect();
-            fds.write_to_writer(&mut f)?;
-        };
+        prost_build::Config::new()
+            .out_dir(out_dir)
+            .compile_protos(&self.inputs, &self.includes)?;
 
         Ok(())
     }
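For orientation, `compile_into` is the entry point a dependent crate's build script drives. A sketch of such a build.rs, assuming a `Protoc::new` constructor alongside the `include`/`input`/`compile_into` methods exercised by the tests below (paths illustrative):

// build.rs (sketch, illustrative paths): generate Rust from .proto files
// via the builder above, writing into Cargo's OUT_DIR. Assumes
// `Protoc::new()` exists; only include/input/compile_into are shown in
// this patch's tests.
use std::env;
use std::path::PathBuf;

use mz_protoc::Protoc;

fn main() -> Result<(), anyhow::Error> {
    let out_dir = PathBuf::from(env::var_os("OUT_DIR").expect("cargo sets OUT_DIR"));
    Protoc::new()
        .include("src")
        .input("src/row.proto")
        .compile_into(&out_dir)
}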
"message")."#, + ); Ok(()) } @@ -92,7 +96,9 @@ fn simple_success() -> Result<(), anyhow::Error> { #[test] fn well_known_types() -> Result<(), anyhow::Error> { let (temp_dir, proto_path) = build_workspace( - r#"import "google/protobuf/timestamp.proto"; + r#"package mz_protoc; + +import "google/protobuf/timestamp.proto"; message HasWellKnownType { required google.protobuf.Timestamp ts = 1; diff --git a/src/repr/Cargo.toml b/src/repr/Cargo.toml index 15941ac014ffe..2c114cf8f4834 100644 --- a/src/repr/Cargo.toml +++ b/src/repr/Cargo.toml @@ -16,6 +16,7 @@ harness = false [dependencies] anyhow = "1.0.52" byteorder = "1.4.3" +bytes = "1.1.0" chrono = { version = "0.4.0", default-features = false, features = ["serde", "std"] } chrono-tz = { version = "0.6.1", features = ["serde", "case-insensitive"] } dec = "0.4.7" @@ -30,7 +31,7 @@ num-traits = "0.2.14" ordered-float = { version = "2.10.0", features = ["serde"] } ore = { path = "../ore", features = ["bytes", "smallvec"] } persist-types = { path = "../persist-types" } -protobuf = { git = "https://github.com/MaterializeInc/rust-protobuf.git" } +prost = "0.9.0" regex = "1.5.4" ryu = "1.0.9" serde = { version = "1.0.133", features = ["derive"] } diff --git a/src/repr/src/row.proto b/src/repr/src/row.proto index b02b75e626601..94fdf046b4dc6 100644 --- a/src/repr/src/row.proto +++ b/src/repr/src/row.proto @@ -9,7 +9,7 @@ syntax = "proto3"; -package gen; +package row; message ProtoRow { repeated ProtoDatum datums = 1; diff --git a/src/repr/src/row/encoding.rs b/src/repr/src/row/encoding.rs index b0800dc0492b2..5aa2c88f895cc 100644 --- a/src/repr/src/row/encoding.rs +++ b/src/repr/src/row/encoding.rs @@ -11,17 +11,18 @@ //! //! See row.proto for details. +use bytes::BufMut; use chrono::{DateTime, Datelike, NaiveDate, NaiveTime, Timelike, Utc}; use dec::Decimal; use ore::cast::CastFrom; -use persist_types::{Codec, ExtendWriteAdapter}; -use protobuf::{Message, MessageField}; +use persist_types::Codec; +use prost::Message; use uuid::Uuid; use crate::adt::array::ArrayDimension; use crate::adt::interval::Interval; use crate::adt::numeric::Numeric; -use crate::gen::row::proto_datum::Datum_type; +use crate::gen::row::proto_datum::DatumType; use crate::gen::row::{ ProtoArray, ProtoArrayDimension, ProtoDate, ProtoDatum, ProtoDatumOther, ProtoDict, ProtoDictElement, ProtoInterval, ProtoNumeric, ProtoRow, ProtoTime, ProtoTimestamp, @@ -38,9 +39,12 @@ impl Codec for Row { /// This perfectly round-trips through [Row::decode]. It's guaranteed to be /// readable by future versions of Materialize through v(TODO: Figure out /// our policy). - fn encode Extend<&'a u8>>(&self, buf: &mut E) { + fn encode(&self, buf: &mut B) + where + B: BufMut, + { ProtoRow::from(self) - .write_to_writer(&mut ExtendWriteAdapter(buf)) + .encode(buf) .expect("no required fields means no initialization errors"); } @@ -50,7 +54,7 @@ impl Codec for Row { /// encoded by historical versions of Materialize back to v(TODO: Figure out /// our policy). 
diff --git a/src/repr/src/row/encoding.rs b/src/repr/src/row/encoding.rs
index b0800dc0492b2..5aa2c88f895cc 100644
--- a/src/repr/src/row/encoding.rs
+++ b/src/repr/src/row/encoding.rs
@@ -11,17 +11,18 @@
 //!
 //! See row.proto for details.
 
+use bytes::BufMut;
 use chrono::{DateTime, Datelike, NaiveDate, NaiveTime, Timelike, Utc};
 use dec::Decimal;
 use ore::cast::CastFrom;
-use persist_types::{Codec, ExtendWriteAdapter};
-use protobuf::{Message, MessageField};
+use persist_types::Codec;
+use prost::Message;
 use uuid::Uuid;
 
 use crate::adt::array::ArrayDimension;
 use crate::adt::interval::Interval;
 use crate::adt::numeric::Numeric;
-use crate::gen::row::proto_datum::Datum_type;
+use crate::gen::row::proto_datum::DatumType;
 use crate::gen::row::{
     ProtoArray, ProtoArrayDimension, ProtoDate, ProtoDatum, ProtoDatumOther, ProtoDict,
     ProtoDictElement, ProtoInterval, ProtoNumeric, ProtoRow, ProtoTime, ProtoTimestamp,
@@ -38,9 +39,12 @@ impl Codec for Row {
     /// This perfectly round-trips through [Row::decode]. It's guaranteed to be
     /// readable by future versions of Materialize through v(TODO: Figure out
     /// our policy).
-    fn encode<'a, E: Extend<&'a u8>>(&self, buf: &mut E) {
+    fn encode<B>(&self, buf: &mut B)
+    where
+        B: BufMut,
+    {
         ProtoRow::from(self)
-            .write_to_writer(&mut ExtendWriteAdapter(buf))
+            .encode(buf)
             .expect("no required fields means no initialization errors");
     }
 
@@ -50,7 +54,7 @@ impl Codec for Row {
     /// encoded by historical versions of Materialize back to v(TODO: Figure out
     /// our policy).
     fn decode(buf: &[u8]) -> Result<Row, String> {
-        let proto_row = ProtoRow::parse_from_bytes(buf).map_err(|err| err.to_string())?;
+        let proto_row = ProtoRow::decode(buf).map_err(|err| err.to_string())?;
         Row::try_from(&proto_row)
     }
 }
@@ -58,44 +62,36 @@ impl Codec for Row {
 
 impl<'a> From<Datum<'a>> for ProtoDatum {
     fn from(x: Datum<'a>) -> Self {
         let datum_type = match x {
-            Datum::False => Datum_type::other(ProtoDatumOther::False.into()),
-            Datum::True => Datum_type::other(ProtoDatumOther::True.into()),
-            Datum::Int16(x) => Datum_type::int16(x.into()),
-            Datum::Int32(x) => Datum_type::int32(x),
-            Datum::Int64(x) => Datum_type::int64(x),
-            Datum::Float32(x) => Datum_type::float32(x.into_inner()),
-            Datum::Float64(x) => Datum_type::float64(x.into_inner()),
-            Datum::Date(x) => Datum_type::date(ProtoDate {
+            Datum::False => DatumType::Other(ProtoDatumOther::False.into()),
+            Datum::True => DatumType::Other(ProtoDatumOther::True.into()),
+            Datum::Int16(x) => DatumType::Int16(x.into()),
+            Datum::Int32(x) => DatumType::Int32(x),
+            Datum::Int64(x) => DatumType::Int64(x),
+            Datum::Float32(x) => DatumType::Float32(x.into_inner()),
+            Datum::Float64(x) => DatumType::Float64(x.into_inner()),
+            Datum::Date(x) => DatumType::Date(ProtoDate {
                 year: x.year(),
                 ordinal: x.ordinal(),
-                unknown_fields: Default::default(),
-                cached_size: Default::default(),
             }),
-            Datum::Time(x) => Datum_type::time(ProtoTime {
+            Datum::Time(x) => DatumType::Time(ProtoTime {
                 secs: x.num_seconds_from_midnight(),
                 nanos: x.nanosecond(),
-                unknown_fields: Default::default(),
-                cached_size: Default::default(),
             }),
-            Datum::Timestamp(x) => Datum_type::timestamp(ProtoTimestamp {
+            Datum::Timestamp(x) => DatumType::Timestamp(ProtoTimestamp {
                 year: x.date().year(),
                 ordinal: x.date().ordinal(),
                 secs: x.time().num_seconds_from_midnight(),
                 nanos: x.time().nanosecond(),
                 is_tz: false,
-                unknown_fields: Default::default(),
-                cached_size: Default::default(),
             }),
             Datum::TimestampTz(x) => {
                 let date = x.date().naive_utc();
-                Datum_type::timestamp(ProtoTimestamp {
+                DatumType::Timestamp(ProtoTimestamp {
                     year: date.year(),
                     ordinal: date.ordinal(),
                     secs: x.time().num_seconds_from_midnight(),
                     nanos: x.time().nanosecond(),
                     is_tz: true,
-                    unknown_fields: Default::default(),
-                    cached_size: Default::default(),
                 })
             }
             Datum::Interval(x) => {
@@ -103,21 +99,17 @@ impl<'a> From<Datum<'a>> for ProtoDatum {
                 let (mut duration_lo, mut duration_hi) = ([0u8; 8], [0u8; 8]);
                 duration_lo.copy_from_slice(&duration[..8]);
                 duration_hi.copy_from_slice(&duration[8..]);
-                Datum_type::interval(ProtoInterval {
+                DatumType::Interval(ProtoInterval {
                     months: x.months,
                     duration_lo: i64::from_le_bytes(duration_lo),
                     duration_hi: i64::from_le_bytes(duration_hi),
-                    unknown_fields: Default::default(),
-                    cached_size: Default::default(),
                 })
             }
-            Datum::Bytes(x) => Datum_type::bytes(x.to_vec()),
-            Datum::String(x) => Datum_type::string(x.to_owned()),
-            Datum::Array(x) => Datum_type::array(ProtoArray {
-                elements: MessageField::some(ProtoRow {
+            Datum::Bytes(x) => DatumType::Bytes(x.to_vec()),
+            Datum::String(x) => DatumType::String(x.to_owned()),
+            Datum::Array(x) => DatumType::Array(ProtoArray {
+                elements: Some(ProtoRow {
                     datums: x.elements().iter().map(|x| x.into()).collect(),
-                    unknown_fields: Default::default(),
-                    cached_size: Default::default(),
                 }),
                 dims: x
                     .dims()
@@ -125,48 +117,33 @@ impl<'a> From<Datum<'a>> for ProtoDatum {
                     .map(|x| ProtoArrayDimension {
                         lower_bound: u64::cast_from(x.lower_bound),
                         length: u64::cast_from(x.length),
-                        unknown_fields: Default::default(),
-                        cached_size: Default::default(),
                     })
                    .collect(),
-                unknown_fields: Default::default(),
-                cached_size: Default::default(),
             }),
-            Datum::List(x) => Datum_type::list(ProtoRow {
+            Datum::List(x) => DatumType::List(ProtoRow {
                 datums: x.iter().map(|x| x.into()).collect(),
-                unknown_fields: Default::default(),
-                cached_size: Default::default(),
             }),
-            Datum::Map(x) => Datum_type::dict(ProtoDict {
+            Datum::Map(x) => DatumType::Dict(ProtoDict {
                 elements: x
                     .iter()
                     .map(|(k, v)| ProtoDictElement {
                         key: k.to_owned(),
-                        val: MessageField::some(v.into()),
-                        unknown_fields: Default::default(),
-                        cached_size: Default::default(),
+                        val: Some(v.into()),
                     })
                     .collect(),
-                unknown_fields: Default::default(),
-                cached_size: Default::default(),
             }),
             Datum::Numeric(x) => {
                 // TODO: Do we need this defensive clone?
                 let mut x = x.0.clone();
                 if let Some((bcd, scale)) = x.to_packed_bcd() {
-                    Datum_type::numeric(ProtoNumeric {
-                        bcd,
-                        scale,
-                        unknown_fields: Default::default(),
-                        cached_size: Default::default(),
-                    })
+                    DatumType::Numeric(ProtoNumeric { bcd, scale })
                 } else if x.is_nan() {
-                    Datum_type::other(ProtoDatumOther::NumericNaN.into())
+                    DatumType::Other(ProtoDatumOther::NumericNaN.into())
                 } else if x.is_infinite() {
                     if x.is_negative() {
-                        Datum_type::other(ProtoDatumOther::NumericNegInf.into())
+                        DatumType::Other(ProtoDatumOther::NumericNegInf.into())
                     } else {
-                        Datum_type::other(ProtoDatumOther::NumericPosInf.into())
+                        DatumType::Other(ProtoDatumOther::NumericPosInf.into())
                    }
                 } else if x.is_special() {
                     panic!("internal error: unhandled special numeric value: {}", x);
@@ -177,15 +154,13 @@ impl<'a> From<Datum<'a>> for ProtoDatum {
                     )
                 }
             }
-            Datum::JsonNull => Datum_type::other(ProtoDatumOther::JsonNull.into()),
-            Datum::Uuid(x) => Datum_type::uuid(x.as_bytes().to_vec()),
-            Datum::Dummy => Datum_type::other(ProtoDatumOther::Dummy.into()),
-            Datum::Null => Datum_type::other(ProtoDatumOther::Null.into()),
+            Datum::JsonNull => DatumType::Other(ProtoDatumOther::JsonNull.into()),
+            Datum::Uuid(x) => DatumType::Uuid(x.as_bytes().to_vec()),
+            Datum::Dummy => DatumType::Other(ProtoDatumOther::Dummy.into()),
+            Datum::Null => DatumType::Other(ProtoDatumOther::Null.into()),
         };
         ProtoDatum {
             datum_type: Some(datum_type),
-            unknown_fields: Default::default(),
-            cached_size: Default::default(),
         }
     }
 }
@@ -193,41 +168,39 @@ impl<'a> From<Datum<'a>> for ProtoDatum {
 
 impl From<&Row> for ProtoRow {
     fn from(x: &Row) -> Self {
         let datums = x.iter().map(|x| x.into()).collect();
-        ProtoRow {
-            datums,
-            unknown_fields: Default::default(),
-            cached_size: Default::default(),
-        }
+        ProtoRow { datums }
     }
 }
 
 impl Row {
     fn try_push_proto(&mut self, x: &ProtoDatum) -> Result<(), String> {
         match &x.datum_type {
-            Some(Datum_type::other(o)) => match o.enum_value() {
-                Ok(ProtoDatumOther::Unknown) => return Err("unknown datum type".into()),
-                Ok(ProtoDatumOther::Null) => self.push(Datum::Null),
-                Ok(ProtoDatumOther::False) => self.push(Datum::False),
-                Ok(ProtoDatumOther::True) => self.push(Datum::True),
-                Ok(ProtoDatumOther::JsonNull) => self.push(Datum::JsonNull),
-                Ok(ProtoDatumOther::Dummy) => self.push(Datum::Dummy),
-                Ok(ProtoDatumOther::NumericPosInf) => self.push(Datum::from(Numeric::infinity())),
-                Ok(ProtoDatumOther::NumericNegInf) => self.push(Datum::from(-Numeric::infinity())),
-                Ok(ProtoDatumOther::NumericNaN) => self.push(Datum::from(Numeric::nan())),
-                Err(id) => return Err(format!("unknown datum type: {}", id)),
+            Some(DatumType::Other(o)) => match ProtoDatumOther::from_i32(*o) {
+                Some(ProtoDatumOther::Unknown) => return Err("unknown datum type".into()),
+                Some(ProtoDatumOther::Null) => self.push(Datum::Null),
+                Some(ProtoDatumOther::False) => self.push(Datum::False),
+                Some(ProtoDatumOther::True) => self.push(Datum::True),
+                Some(ProtoDatumOther::JsonNull) => self.push(Datum::JsonNull),
+                Some(ProtoDatumOther::Dummy) => self.push(Datum::Dummy),
+                Some(ProtoDatumOther::NumericPosInf) => self.push(Datum::from(Numeric::infinity())),
+                Some(ProtoDatumOther::NumericNegInf) => {
+                    self.push(Datum::from(-Numeric::infinity()))
+                }
+                Some(ProtoDatumOther::NumericNaN) => self.push(Datum::from(Numeric::nan())),
+                None => return Err(format!("unknown datum type: {}", o)),
             },
-            Some(Datum_type::int16(x)) => {
+            Some(DatumType::Int16(x)) => {
                 let x = i16::try_from(*x)
                     .map_err(|_| format!("int16 field stored with out of range value: {}", *x))?;
                 self.push(Datum::Int16(x))
             }
-            Some(Datum_type::int32(x)) => self.push(Datum::Int32(*x)),
-            Some(Datum_type::int64(x)) => self.push(Datum::Int64(*x)),
-            Some(Datum_type::float32(x)) => self.push(Datum::Float32((*x).into())),
-            Some(Datum_type::float64(x)) => self.push(Datum::Float64((*x).into())),
-            Some(Datum_type::bytes(x)) => self.push(Datum::Bytes(x)),
-            Some(Datum_type::string(x)) => self.push(Datum::String(x)),
-            Some(Datum_type::uuid(x)) => {
+            Some(DatumType::Int32(x)) => self.push(Datum::Int32(*x)),
+            Some(DatumType::Int64(x)) => self.push(Datum::Int64(*x)),
+            Some(DatumType::Float32(x)) => self.push(Datum::Float32((*x).into())),
+            Some(DatumType::Float64(x)) => self.push(Datum::Float64((*x).into())),
+            Some(DatumType::Bytes(x)) => self.push(Datum::Bytes(x)),
+            Some(DatumType::String(x)) => self.push(Datum::String(x)),
+            Some(DatumType::Uuid(x)) => {
                 // Uuid internally has a [u8; 16] so we'll have to do at least
                 // one copy, but there's currently an additional one when the
                 // Vec is created. Perhaps the protobuf Bytes support will let
                 // us avoid it.
                 let u = Uuid::from_slice(&x).map_err(|err| err.to_string())?;
                 self.push(Datum::Uuid(u));
             }
-            Some(Datum_type::date(x)) => {
+            Some(DatumType::Date(x)) => {
                 self.push(Datum::Date(NaiveDate::from_yo(x.year, x.ordinal)))
             }
-            Some(Datum_type::time(x)) => self.push(Datum::Time(
+            Some(DatumType::Time(x)) => self.push(Datum::Time(
                 NaiveTime::from_num_seconds_from_midnight(x.secs, x.nanos),
             )),
-            Some(Datum_type::timestamp(x)) => {
+            Some(DatumType::Timestamp(x)) => {
                 let date = NaiveDate::from_yo(x.year, x.ordinal);
                 let time = NaiveTime::from_num_seconds_from_midnight(x.secs, x.nanos);
                 let datetime = date.and_time(time);
@@ -251,7 +224,7 @@ impl Row {
                     self.push(Datum::Timestamp(datetime));
                 }
             }
-            Some(Datum_type::interval(x)) => {
+            Some(DatumType::Interval(x)) => {
                 let mut duration = [0u8; 16];
                 duration[..8].copy_from_slice(&x.duration_lo.to_le_bytes());
                 duration[8..].copy_from_slice(&x.duration_hi.to_le_bytes());
@@ -261,13 +234,13 @@ impl Row {
                     duration,
                 }))
             }
-            Some(Datum_type::list(x)) => self.push_list_with(|row| -> Result<(), String> {
+            Some(DatumType::List(x)) => self.push_list_with(|row| -> Result<(), String> {
                 for d in x.datums.iter() {
                     row.try_push_proto(d)?;
                 }
                 Ok(())
             })?,
-            Some(Datum_type::array(x)) => {
+            Some(DatumType::Array(x)) => {
                 let dims = x
                     .dims
                     .iter()
@@ -287,7 +260,7 @@ impl Row {
                 }
                 .map_err(|err| err.to_string())?
             }
-            Some(Datum_type::dict(x)) => self.push_dict_with(|row| -> Result<(), String> {
+            Some(DatumType::Dict(x)) => self.push_dict_with(|row| -> Result<(), String> {
                 for e in x.elements.iter() {
                     row.push(Datum::from(e.key.as_str()));
                     let val = e
@@ -298,7 +271,7 @@ impl Row {
                 }
                 Ok(())
             })?,
-            Some(Datum_type::numeric(x)) => {
+            Some(DatumType::Numeric(x)) => {
                 // Reminder that special values like NaN, PosInf, and NegInf are
                 // represented as variants of ProtoDatumOther.
                 let n = Decimal::from_packed_bcd(&x.bcd, x.scale).map_err(|err| err.to_string())?;
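End to end, the new `Codec` impl gives `Row` a prost-backed wire format. A usage sketch of the round trip (assumes the `repr` and `persist-types` crates from this tree, with `Row::pack` as the row constructor):

// Sketch: round-tripping a Row through the prost-backed Codec impl above.
use persist_types::Codec;
use repr::{Datum, Row};

fn example() -> Result<(), String> {
    let row = Row::pack(&[Datum::Int64(7), Datum::String("hi")]);
    // `encode` appends to anything implementing `bytes::BufMut`, e.g. Vec<u8>.
    let mut buf = Vec::new();
    row.encode(&mut buf);
    // `decode` rejects bytes that don't parse as a ProtoRow.
    let decoded = Row::decode(&buf)?;
    assert_eq!(row, decoded);
    Ok(())
}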
diff --git a/src/testdrive/ci/mzbuild.yml b/src/testdrive/ci/mzbuild.yml
index 80e2896463087..5065786662534 100644
--- a/src/testdrive/ci/mzbuild.yml
+++ b/src/testdrive/ci/mzbuild.yml
@@ -13,5 +13,5 @@ pre-image:
   bin: testdrive
   extract:
     krb5-src:
-      - install/bin/kinit
-      - install/bin/kdestroy
+      install/bin/kinit: .
+      install/bin/kdestroy: .