diff --git a/src/subscriber.rs b/src/subscriber.rs index 342a9fb1b..b2d1e0d24 100644 --- a/src/subscriber.rs +++ b/src/subscriber.rs @@ -133,21 +133,6 @@ impl Subscriber { }; } } - - fn poll_oneshot( - self: &mut Pin<&mut Self>, - mut oneshot: OneShot>, - cx: &mut Context<'_>, - ) -> Option>> { - match Future::poll(Pin::new(&mut oneshot), cx) { - Poll::Ready(Some(event)) => Some(Poll::Ready(event)), - Poll::Ready(None) => None, - Poll::Pending => { - self.existing = Some(oneshot); - Some(Poll::Pending) - } - } - } } impl Future for Subscriber { @@ -158,20 +143,25 @@ impl Future for Subscriber { cx: &mut Context<'_>, ) -> Poll { loop { - if let Some(future_rx) = self.existing.take() { - if let Some(poll) = self.poll_oneshot(future_rx, cx) { - return poll; - } - } - - match self.rx.try_recv() { - Ok(future_rx) => { - if let Some(poll) = self.poll_oneshot(future_rx, cx) { - return poll; + let mut future_rx = if let Some(future_rx) = self.existing.take() { + future_rx + } else { + match self.rx.try_recv() { + Ok(future_rx) => future_rx, + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + return Poll::Ready(None) } } - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => return Poll::Ready(None), + }; + + match Future::poll(Pin::new(&mut future_rx), cx) { + Poll::Ready(Some(event)) => return Poll::Ready(event), + Poll::Ready(None) => continue, + Poll::Pending => { + self.existing = Some(future_rx); + return Poll::Pending; + } } } let mut home = self.home.write();