Skip to content

Commit

Permalink
*: replace rust-protobuf with prost
Browse files Browse the repository at this point in the history
It's time to switch from rust-protobuf to Prost. There are a few factors
in this decsion:

  * The maintainer of rust-protobuf has announced that the project is
    essentially unmaintained for the time being [0].

  * Prost is the more popular Protobuf library across the ecosystem. In
    particular, the Tonic GRPC library uses Prost, and the Tokio tracing
    console uses Prost. These are both extremely important for
    Materialize Platform, which we expect to be built on top of Tonic
    and making heavy use of distributed tracing.

There have been been two major blockers to adopting Prost:

  1. Prost lacked dynamic message support. That was miraculously fixed
     last month with the release of a fantastic prost-reflect crate [1].

  2. Prost embeds `protoc` in its Git repository (!). See
     tokio-rs/prost#575. That is a disaster for supply chain security.
     This commit patches in a Materialize-specific fork of Prost that
     compiles `protoc` from source rather than using upstream's embedded
     binary blobs.

This commit excludes one crucial piece: converting the dynamic Protobuf
decoding in the `interchange` and `testdrive` crates from rust-protobuf
to prost-reflect. I'll do that migration in a future commit to avoid
inflating the size of this PR unnecessarily.

[0]: stepancheg/rust-protobuf#583 (comment)
[1]: https://www.reddit.com/r/rust/comments/rsg6gx/announcing_prostreflect_a_crate_for_protobuf/
  • Loading branch information
benesch committed Jan 18, 2022
1 parent e32c1c4 commit 1c8b453
Show file tree
Hide file tree
Showing 44 changed files with 711 additions and 644 deletions.
236 changes: 204 additions & 32 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,5 @@ debug = 1
headers = { git = "https://github.com/MaterializeInc/headers.git" }
# Until https://github.com/jorgecarleitao/parquet-format-rs/pull/2 is merged and released
parquet-format-async-temp = { git = "https://github.com/MaterializeInc/parquet-format-rs", branch = "main" }
prost = { git = "https://github.com/MaterializeInc/prost.git" }
prost-types = { git = "https://github.com/MaterializeInc/prost.git" }
2 changes: 1 addition & 1 deletion ci/builder/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ ENV RANLIB=$ARCH_GCC-unknown-linux-gnu-ranlib
ENV CPP=$ARCH_GCC-unknown-linux-gnu-cpp
ENV CC=$ARCH_GCC-unknown-linux-gnu-cc
ENV CXX=$ARCH_GCC-unknown-linux-gnu-c++
ENV LDFLAGS=-fuse-ld=lld
ENV LDFLAGS="-fuse-ld=lld -static-libstdc++"
ENV TARGET_CC=$CC
ENV TARGET_CXX=$CXX
ENV PATH=/opt/x-tools/$ARCH_GCC-unknown-linux-gnu/bin:$PATH
Expand Down
3 changes: 2 additions & 1 deletion demo/billing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ hex = "0.4.3"
tracing = "0.1.29"
ore = { path = "../../src/ore" }
postgres-types = { git = "https://github.com/MaterializeInc/rust-postgres", branch = "mz-0.7.2" }
protobuf = { git = "https://github.com/MaterializeInc/rust-protobuf.git" }
prost = "0.9.0"
prost-types = "0.9.0"
rand = "0.8.4"
rand_distr = "0.4.2"
test-util = { path = "../../test/test-util" }
Expand Down
4 changes: 2 additions & 2 deletions demo/billing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::process;
use std::sync::Arc;

use anyhow::Result;
use protobuf::Message;
use prost::Message;
use tokio::time::{self, Duration};

use test_util::kafka::kafka_client;
Expand Down Expand Up @@ -114,7 +114,7 @@ async fn create_kafka_messages(config: KafkaConfig) -> Result<()> {
let messages_to_send = std::cmp::min(config.messages_per_second, messages_remaining);
for _ in 0..messages_to_send {
let m = randomizer::random_batch(rng, &mut recordstate);
m.write_to_vec(&mut buf)?;
m.encode(&mut buf)?;
tracing::trace!("sending: {:?}", m);
let res = k_client.send(&config.topic, &buf);
match res {
Expand Down
57 changes: 26 additions & 31 deletions demo/billing/src/randomizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

use chrono::prelude::*;
use chrono::DateTime;
use protobuf::well_known_types::Timestamp;
use protobuf::MessageField;
use prost_types::Timestamp;
use rand::distributions::Distribution;
use rand::seq::SliceRandom;
use rand::Rng;
Expand All @@ -30,10 +29,10 @@ pub struct RecordState {
}

fn protobuf_timestamp(time: DateTime<Utc>) -> Timestamp {
let mut ret = Timestamp::new();
ret.seconds = time.timestamp();
ret.nanos = time.timestamp_subsec_nanos() as i32;
ret
Timestamp {
seconds: time.timestamp(),
nanos: time.timestamp_subsec_nanos() as i32,
}
}

/// Construct a Batch that depends on `state`
Expand All @@ -56,13 +55,12 @@ pub fn random_batch(rng: &mut impl Rng, state: &mut RecordState) -> Batch {
records.push(random_record(rng, interval_start_time, dur_val));
}

let mut batch = Batch::new();
batch.id = id.to_string();
batch.interval_start = MessageField::some(interval_start);
batch.interval_end = MessageField::some(interval_end);
batch.records = records;

batch
Batch {
id: id.to_string(),
interval_start: Some(interval_start),
interval_end: Some(interval_end),
records,
}
}

fn random_record(rng: &mut impl Rng, start_at: DateTime<Utc>, max_secs: i64) -> Record {
Expand All @@ -88,30 +86,27 @@ fn random_record(rng: &mut impl Rng, start_at: DateTime<Utc>, max_secs: i64) ->
}
}

let mut record = Record::new();
record.id = Uuid::new_v4().to_string();
record.interval_start = MessageField::some(protobuf_timestamp(interval_start));
record.interval_end = MessageField::some(protobuf_timestamp(interval_end));
record.meter = meter;
record.value = val as i32;
record.info = MessageField::some(ResourceInfo::random(rng));

record
Record {
id: Uuid::new_v4().to_string(),
interval_start: Some(protobuf_timestamp(interval_start)),
interval_end: Some(protobuf_timestamp(interval_end)),
meter,
value: val as i32,
info: Some(ResourceInfo::random(rng)),
}
}

impl Randomizer for ResourceInfo {
fn random(rng: &mut impl Rng) -> ResourceInfo {
static POSSIBLE_CPUS: &[i32] = &[1, 2];
static POSSIBLE_MEM: &[i32] = &[8, 16];
static POSSIBLE_DISK: &[i32] = &[128];

let mut resource_info = ResourceInfo::new();
resource_info.cpu_num = *POSSIBLE_CPUS.choose(rng).unwrap();
resource_info.memory_gb = *POSSIBLE_MEM.choose(rng).unwrap();
resource_info.disk_gb = *POSSIBLE_DISK.choose(rng).unwrap();
resource_info.client_id = rng.gen_range(1..NUM_CLIENTS as i32);
resource_info.vm_id = rng.gen_range(1000..2000);

resource_info
ResourceInfo {
cpu_num: *POSSIBLE_CPUS.choose(rng).unwrap(),
memory_gb: *POSSIBLE_MEM.choose(rng).unwrap(),
disk_gb: *POSSIBLE_DISK.choose(rng).unwrap(),
client_id: rng.gen_range(1..NUM_CLIENTS as i32),
vm_id: rng.gen_range(1000..2000),
}
}
}
4 changes: 2 additions & 2 deletions misc/python/materialize/xcompile.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ def cargo(arch: Arch, subcommand: str, rustflags: List[str]) -> List[str]:
_target = target(arch)
_target_env = _target.upper().replace("-", "_")

ldflags: List[str] = []
rustflags += ["-Clink-arg=-Wl,--compress-debug-sections=zlib"]
ldflags: List[str] = ["-static-libstdc++"]
rustflags += ["-Clink-arg=-Wl,--compress-debug-sections=zlib", "-Clink-arg=-static-libstdc++"]

if not sys.platform == "darwin":
# lld is not yet easy to install on macOS.
Expand Down
1 change: 0 additions & 1 deletion src/coord/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pgrepr = { path = "../pgrepr" }
postgres-types = { git = "https://github.com/MaterializeInc/rust-postgres", branch = "mz-0.7.2" }
postgres-util = { path = "../postgres-util" }
prometheus = { git = "https://github.com/MaterializeInc/rust-prometheus.git", default-features = false }
protobuf = { git = "https://github.com/MaterializeInc/rust-protobuf.git" }
rand = "0.8.4"
rdkafka = { git = "https://github.com/fede1024/rust-rdkafka.git", features = ["cmake-build", "libz-static"] }
regex = "1.5.4"
Expand Down
69 changes: 16 additions & 53 deletions src/coord/src/catalog/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::fs;
use std::iter;
use std::path::Path;

use anyhow::{anyhow, bail};
use anyhow::bail;
use futures::executor::block_on;
use lazy_static::lazy_static;
use protobuf::Message;
use regex::Regex;
use repr::strconv;
use semver::Version;
use tempfile;
use tokio::fs::File;

use mz_protoc::Protoc;
use interchange::protobuf::VirtualProtoFile;
use ore::collections::CollectionExt;
use sql::ast::display::AstDisplay;
use sql::ast::visit_mut::{self, VisitMut};
Expand All @@ -32,7 +30,6 @@ use sql::ast::{
Value, ViewDefinition, WithOption, WithOptionValue,
};
use sql::plan::resolve_names_stmt;
use uuid::Uuid;

use crate::catalog::storage::Transaction;
use crate::catalog::{Catalog, ConnCatalog, SerializedCatalogItem};
Expand Down Expand Up @@ -143,52 +140,18 @@ fn ast_rewrite_kafka_protobuf_source_text_to_compiled_0_9_13(
stmt: &mut sql::ast::Statement<Raw>,
) -> Result<(), anyhow::Error> {
fn compile_proto(schema: &str) -> Result<CsrSeedCompiledEncoding, anyhow::Error> {
let temp_schema_name: String = Uuid::new_v4().to_simple().to_string();
let include_dir = tempfile::tempdir()?;
let schema_path = include_dir.path().join(&temp_schema_name);
let schema_bytes = strconv::parse_bytes(schema)?;
fs::write(&schema_path, &schema_bytes)?;

match Protoc::new()
.include(include_dir.path())
.input(schema_path)
.parse()
{
Ok(fds) => {
let message_name = fds
.file
.iter()
.find(|f| f.get_name() == temp_schema_name)
.map(|file| file.message_type.first())
.flatten()
.map(|message| format!(".{}", message.get_name()))
.ok_or_else(|| anyhow!("unable to compile temporary schema"))?;
let mut schema = String::new();
strconv::format_bytes(&mut schema, &fds.write_to_bytes()?);
Ok(CsrSeedCompiledEncoding {
schema,
message_name,
})
}
Err(e) => {
lazy_static! {
static ref MISSING_IMPORT_ERROR: Regex = Regex::new(
r#"protobuf path \\"(?P<reference>.*)\\" is not found in import path"#
)
.unwrap();
}

// Make protobuf import errors more user-friendly.
if let Some(captures) = MISSING_IMPORT_ERROR.captures(&e.to_string()) {
bail!(
"unsupported protobuf schema reference {}",
&captures["reference"]
)
} else {
Err(e)
}
}
}
let primary = VirtualProtoFile {
path: Path::new("migration.proto"),
contents: schema.as_bytes(),
};
let dependencies = iter::empty();
let descriptors = interchange::protobuf::compile_proto(primary, dependencies)?;
let mut schema = String::new();
strconv::format_bytes(&mut schema, &descriptors.file_descriptor_set);
Ok(CsrSeedCompiledEncoding {
schema,
message_name: descriptors.message_name.into_string(),
})
}

fn do_upgrade(seed: &mut CsrSeedCompiledOrLegacy) -> Result<(), anyhow::Error> {
Expand Down
1 change: 1 addition & 0 deletions src/dataflow-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ async-trait = "0.1.52"
aws-config = { version = "0.4.0", default-features = false, features = ["native-tls"] }
aws-types = { version = "0.4.0", features = ["hardcoded-credentials"] }
aws-smithy-http = "0.34.0"
bytes = "1.1.0"
ccsr = { path = "../ccsr" }
crossbeam-channel = "0.5.2"
enum-iterator = "0.7.0"
Expand Down
9 changes: 6 additions & 3 deletions src/dataflow-types/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

use std::fmt::Display;

use bytes::BufMut;
use expr::EvalError;
use persist_types::Codec;

Expand All @@ -26,12 +27,14 @@ impl Codec for DecodeError {
"DecodeError".into()
}

fn encode<B: for<'a> Extend<&'a u8>>(&self, buf: &mut B) {
let encoded = match serde_json::to_vec(self) {
fn encode<B: BufMut>(&self, buf: &mut B)
where
B: BufMut,
{
match serde_json::to_writer(buf.writer(), self) {
Ok(ok) => ok,
Err(e) => panic!("Encoding error, trying to encode {}: {}", self, e),
};
buf.extend(encoded.iter());
}

fn decode<'a>(buf: &'a [u8]) -> Result<Self, String> {
Expand Down
10 changes: 4 additions & 6 deletions src/dataflow-types/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ pub mod sources {
use serde::{Deserialize, Serialize};

use interchange::avro::{self};
use interchange::protobuf::{self, NormalizedProtobufMessageName};
use interchange::protobuf::{self, EncodedDescriptors};
use repr::{ColumnType, RelationDesc, ScalarType};

/// A description of how to interpret data from various sources
Expand Down Expand Up @@ -401,11 +401,10 @@ pub mod sources {
}
DataEncoding::Protobuf(ProtobufEncoding {
descriptors,
message_name,
schema_registry_config: _,
}) => protobuf::DecodedDescriptors::from_bytes(
descriptors,
message_name.to_owned(),
&descriptors.file_descriptor_set,
descriptors.message_name.clone(),
)?
.columns()
.iter()
Expand Down Expand Up @@ -489,8 +488,7 @@ pub mod sources {
/// Encoding in Protobuf format.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ProtobufEncoding {
pub descriptors: Vec<u8>,
pub message_name: NormalizedProtobufMessageName,
pub descriptors: EncodedDescriptors,
pub schema_registry_config: Option<ccsr::ClientConfig>,
}

Expand Down
3 changes: 2 additions & 1 deletion src/dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ aws-sdk-s3 = { version = "0.4.0", default-features = false }
aws-sdk-sqs = { version = "0.4.0", default-features = false }
bincode = "1.3.3"
byteorder = "1.4.3"
bytes = "1.1.0"
ccsr = { path = "../ccsr" }
chrono = { version = "0.4.0", default-features = false, features = ["std"] }
crossbeam-channel = "0.5.2"
Expand Down Expand Up @@ -43,7 +44,7 @@ persist-types = { path = "../persist-types" }
postgres-protocol = { git = "https://github.com/MaterializeInc/rust-postgres", branch = "mz-0.7.2" }
postgres-util = { path = "../postgres-util" }
prometheus = { git = "https://github.com/MaterializeInc/rust-prometheus.git", default-features = false }
protobuf = { git = "https://github.com/MaterializeInc/rust-protobuf.git" }
prost = "0.9.0"
pubnub-hyper = { git = "https://github.com/MaterializeInc/pubnub-rust", default-features = false }
rand = "0.8.4"
rdkafka = { git = "https://github.com/fede1024/rust-rdkafka.git", features = ["cmake-build", "ssl-vendored", "gssapi-vendored", "libz-static", "zstd"] }
Expand Down
11 changes: 7 additions & 4 deletions src/dataflow/src/decode/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// by the Apache License, Version 2.0.

use dataflow_types::{sources::encoding::ProtobufEncoding, DecodeError};
use interchange::protobuf::{DecodedDescriptors, Decoder};
use interchange::protobuf::{DecodedDescriptors, Decoder, EncodedDescriptors};
use repr::Row;

#[derive(Debug)]
Expand All @@ -21,12 +21,15 @@ pub struct ProtobufDecoderState {
impl ProtobufDecoderState {
pub fn new(
ProtobufEncoding {
descriptors,
message_name,
descriptors:
EncodedDescriptors {
file_descriptor_set,
message_name,
},
schema_registry_config,
}: ProtobufEncoding,
) -> Result<Self, anyhow::Error> {
let descriptors = DecodedDescriptors::from_bytes(&descriptors, message_name)
let descriptors = DecodedDescriptors::from_bytes(&file_descriptor_set, message_name)
.expect("descriptors provided to protobuf source are pre-validated");
Ok(ProtobufDecoderState {
decoder: Decoder::new(descriptors, schema_registry_config)?,
Expand Down
2 changes: 1 addition & 1 deletion src/dataflow/src/source/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ syntax = "proto3";

import "google/protobuf/empty.proto";

package gen;
package source;

message ProtoSourceTimestamp {
oneof partition_id {
Expand Down
Loading

0 comments on commit 1c8b453

Please sign in to comment.