Skip to content

Commit

Permalink
Replace in-house implementation with smallvec
Browse files Browse the repository at this point in the history
  • Loading branch information
ishitatsuyuki authored and M-Adoo committed Jan 22, 2020
1 parent 7b55752 commit cab8756
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 78 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ license = "MIT"
futures-preview = "=0.3.0-alpha.18"
lazy_static = "1.3.0"
futures-timer = "0.3.0"
smallvec = "1.1.0"

[dev-dependencies]
float-cmp = "0.5.3"
104 changes: 26 additions & 78 deletions src/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::prelude::*;
use crate::util;
use smallvec::SmallVec;
use std::cell::RefCell;
use std::mem::replace;
use std::mem::transmute;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -34,7 +34,7 @@ impl LocalSubscription {
}

impl TearDownSize for LocalSubscription {
fn teardown_size(&self) -> usize { self.0.borrow().teardown.size() }
fn teardown_size(&self) -> usize { self.0.borrow().teardown.len() }
}

pub trait TearDownSize: SubscriptionLike {
Expand All @@ -50,13 +50,12 @@ impl IntoShared for LocalSubscription {
when it referenced by other.",
);

match inner.teardown {
Teardown::None => {}
_ => panic!(
if !inner.teardown.is_empty() {
panic!(
"LocalSubscription already has some teardown work to do,
can not covert to SharedSubscription "
),
};
);
}
let inner: Inner<Box<dyn SubscriptionLike + Send + Sync + 'static>> =
unsafe { transmute(inner) };
SharedSubscription(Arc::new(Mutex::new(Inner {
Expand Down Expand Up @@ -98,7 +97,7 @@ impl SharedSubscription {
}

impl TearDownSize for SharedSubscription {
fn teardown_size(&self) -> usize { self.0.lock().unwrap().teardown.size() }
fn teardown_size(&self) -> usize { self.0.lock().unwrap().teardown.len() }
}

impl SubscriptionLike for SharedSubscription {
Expand Down Expand Up @@ -129,25 +128,9 @@ impl<Item, Err, T> Publisher<Item, Err> for T where
{
}

enum Teardown<T> {
None,
Once(T),
Multi(Vec<T>),
}

impl<T> Teardown<T> {
fn size(&self) -> usize {
match self {
Teardown::None => 0,
Teardown::Once(_) => 1,
Teardown::Multi(ref vec) => vec.len(),
}
}
}

struct Inner<T> {
closed: bool,
teardown: Teardown<T>,
teardown: SmallVec<[T; 1]>,
}

impl<T: SubscriptionLike> Inner<T> {
Expand All @@ -157,12 +140,8 @@ impl<T: SubscriptionLike> Inner<T> {
fn unsubscribe(&mut self) {
if !self.closed {
self.closed = true;
match self.teardown {
Teardown::None => {}
Teardown::Once(ref mut first) => first.unsubscribe(),
Teardown::Multi(ref mut vec) => {
vec.iter_mut().for_each(|v| v.unsubscribe())
}
for v in &mut self.teardown {
v.unsubscribe();
}
}
}
Expand All @@ -171,43 +150,19 @@ impl<T: SubscriptionLike> Inner<T> {
if self.closed {
v.unsubscribe();
}
let teardown = &mut self.teardown;
match teardown {
Teardown::None => *teardown = Teardown::Once(v),
Teardown::Once(_) => {
let first = replace(teardown, Teardown::None);
if let Teardown::Once(first) = first {
*teardown = Teardown::Multi(vec![first, v])
}
}
Teardown::Multi(ref mut vec) => vec.push(v),
}
self.teardown.push(v);
}

fn remove(&mut self, s: &dyn SubscriptionLike) {
let teardown = &mut self.teardown;
match teardown {
Teardown::None => {}
Teardown::Once(ref mut first) => {
if first.inner_addr() == s.inner_addr() {
replace(teardown, Teardown::None);
}
}
Teardown::Multi(ref mut vec) => {
vec.retain(|v| v.inner_addr() != s.inner_addr());
if vec.len() == 1 {
let once = Teardown::Once(vec.pop().unwrap());
replace(teardown, once);
}
}
}
self.teardown.retain(|v| v.inner_addr() != s.inner_addr());
}
}

impl<T> Default for Inner<T> {
fn default() -> Self {
Inner {
closed: false,
teardown: Teardown::None,
teardown: SmallVec::new(),
}
}
}
Expand Down Expand Up @@ -255,31 +210,24 @@ impl<Item, Err> SubscriptionLike
#[cfg(test)]
mod test {
use super::*;
fn teardown_size<T>(td: &Teardown<T>) -> usize {
match td {
Teardown::None => 0,
Teardown::Once(_) => 1,
Teardown::Multi(ref vec) => vec.len(),
}
}
#[test]
fn add_remove_for_local() {
let mut local = LocalSubscription::default();
let l1 = LocalSubscription::default();
let l2 = LocalSubscription::default();
let l3 = LocalSubscription::default();
local.add(l1.clone());
assert_eq!(teardown_size(&local.0.borrow().teardown), 1);
assert_eq!(local.0.borrow().teardown.len(), 1);
local.add(l2.clone());
assert_eq!(teardown_size(&local.0.borrow().teardown), 2);
assert_eq!(local.0.borrow().teardown.len(), 2);
local.add(l3.clone());
assert_eq!(teardown_size(&local.0.borrow().teardown), 3);
assert_eq!(local.0.borrow().teardown.len(), 3);
local.remove(&l1);
assert_eq!(teardown_size(&local.0.borrow().teardown), 2);
assert_eq!(local.0.borrow().teardown.len(), 2);
local.remove(&l2);
assert_eq!(teardown_size(&local.0.borrow().teardown), 1);
assert_eq!(local.0.borrow().teardown.len(), 1);
local.remove(&l3);
assert_eq!(teardown_size(&local.0.borrow().teardown), 0);
assert_eq!(local.0.borrow().teardown.len(), 0);
}

#[test]
Expand All @@ -289,16 +237,16 @@ mod test {
let l2 = SharedSubscription::default();
let l3 = SharedSubscription::default();
local.add(l1.clone());
assert_eq!(teardown_size(&local.0.lock().unwrap().teardown), 1);
assert_eq!(local.0.lock().unwrap().teardown.len(), 1);
local.add(l2.clone());
assert_eq!(teardown_size(&local.0.lock().unwrap().teardown), 2);
assert_eq!(local.0.lock().unwrap().teardown.len(), 2);
local.add(l3.clone());
assert_eq!(teardown_size(&local.0.lock().unwrap().teardown), 3);
assert_eq!(local.0.lock().unwrap().teardown.len(), 3);
local.remove(&l1);
assert_eq!(teardown_size(&local.0.lock().unwrap().teardown), 2);
assert_eq!(local.0.lock().unwrap().teardown.len(), 2);
local.remove(&l2);
assert_eq!(teardown_size(&local.0.lock().unwrap().teardown), 1);
assert_eq!(local.0.lock().unwrap().teardown.len(), 1);
local.remove(&l3);
assert_eq!(teardown_size(&local.0.lock().unwrap().teardown), 0);
assert_eq!(local.0.lock().unwrap().teardown.len(), 0);
}
}

0 comments on commit cab8756

Please sign in to comment.