Skip to content

Commit

Permalink
Follow @rrichardson's lead and only poll oneshots in one place
Browse files Browse the repository at this point in the history
  • Loading branch information
asonix authored and spacejam committed Sep 12, 2021
1 parent 7fdd900 commit c8a31e3
Showing 1 changed file with 17 additions and 27 deletions.
44 changes: 17 additions & 27 deletions src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,21 +133,6 @@ impl Subscriber {
};
}
}

fn poll_oneshot(
self: &mut Pin<&mut Self>,
mut oneshot: OneShot<Option<Event>>,
cx: &mut Context<'_>,
) -> Option<Poll<Option<Event>>> {
match Future::poll(Pin::new(&mut oneshot), cx) {
Poll::Ready(Some(event)) => Some(Poll::Ready(event)),
Poll::Ready(None) => None,
Poll::Pending => {
self.existing = Some(oneshot);
Some(Poll::Pending)
}
}
}
}

impl Future for Subscriber {
Expand All @@ -158,20 +143,25 @@ impl Future for Subscriber {
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
loop {
if let Some(future_rx) = self.existing.take() {
if let Some(poll) = self.poll_oneshot(future_rx, cx) {
return poll;
}
}

match self.rx.try_recv() {
Ok(future_rx) => {
if let Some(poll) = self.poll_oneshot(future_rx, cx) {
return poll;
let mut future_rx = if let Some(future_rx) = self.existing.take() {
future_rx
} else {
match self.rx.try_recv() {
Ok(future_rx) => future_rx,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
return Poll::Ready(None)
}
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => return Poll::Ready(None),
};

match Future::poll(Pin::new(&mut future_rx), cx) {
Poll::Ready(Some(event)) => return Poll::Ready(event),
Poll::Ready(None) => continue,
Poll::Pending => {
self.existing = Some(future_rx);
return Poll::Pending;
}
}
}
let mut home = self.home.write();
Expand Down

0 comments on commit c8a31e3

Please sign in to comment.