Skip to content

Commit

Permalink
Store pending oneshot for subscriber future
Browse files Browse the repository at this point in the history
  • Loading branch information
asonix authored and spacejam committed Sep 12, 2021
1 parent 3bfd469 commit 7fdd900
Showing 1 changed file with 30 additions and 17 deletions.
47 changes: 30 additions & 17 deletions src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type Senders = Map<usize, (Option<Waker>, SyncSender<OneShot<Option<Event>>>)>;
pub struct Subscriber {
id: usize,
rx: Receiver<OneShot<Option<Event>>>,
existing: Option<OneShot<Option<Event>>>,
home: Arc<RwLock<Senders>>,
}

Expand Down Expand Up @@ -132,29 +133,41 @@ impl Subscriber {
};
}
}

fn poll_oneshot(
self: &mut Pin<&mut Self>,
mut oneshot: OneShot<Option<Event>>,
cx: &mut Context<'_>,
) -> Option<Poll<Option<Event>>> {
match Future::poll(Pin::new(&mut oneshot), cx) {
Poll::Ready(Some(event)) => Some(Poll::Ready(event)),
Poll::Ready(None) => None,
Poll::Pending => {
self.existing = Some(oneshot);
Some(Poll::Pending)
}
}
}
}

impl Future for Subscriber {
type Output = Option<Event>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
loop {
if let Some(future_rx) = self.existing.take() {
if let Some(poll) = self.poll_oneshot(future_rx, cx) {
return poll;
}
}

match self.rx.try_recv() {
Ok(mut future_rx) => {
#[allow(unsafe_code)]
let future_rx =
unsafe { std::pin::Pin::new_unchecked(&mut future_rx) };

match Future::poll(future_rx, cx) {
Poll::Ready(Some(event)) => {
return Poll::Ready(event);
}
Poll::Ready(None) => {
continue;
}
Poll::Pending => {
return Poll::Pending;
}
Ok(future_rx) => {
if let Some(poll) = self.poll_oneshot(future_rx, cx) {
return poll;
}
}
Err(TryRecvError::Empty) => break,
Expand Down Expand Up @@ -237,7 +250,7 @@ impl Subscribers {

w_senders.insert(id, (None, tx));

Subscriber { id, rx, home: arc_senders.clone() }
Subscriber { id, rx, existing: None, home: arc_senders.clone() }
}

pub(crate) fn reserve<R: AsRef<[u8]>>(
Expand Down

0 comments on commit 7fdd900

Please sign in to comment.