diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index 19302559350..78905e3ccfc 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -98,6 +98,13 @@ where }) } +cfg_rt_threaded! { + #[inline(always)] + pub(crate) fn has_budget_remaining() -> bool { + HITS.with(|hits| hits.get() > 0) + } +} + cfg_blocking_impl! { /// Forcibly remove the budgeting constraints early. pub(crate) fn stop() { diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 0419c209d8a..301554280f9 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -152,7 +152,7 @@ where }; match next { - Some(task) => task.run(), + Some(task) => crate::coop::budget(|| task.run()), None => { // Park until the thread is signaled scheduler.park.park().ok().expect("failed to park"); diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs index 233fe45492d..81408135a56 100644 --- a/tokio/src/runtime/queue.rs +++ b/tokio/src/runtime/queue.rs @@ -13,9 +13,6 @@ use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; /// Producer handle. May only be used from a single thread. pub(super) struct Local { inner: Arc>, - - /// LIFO slot. Cannot be stolen. - next: Option>, } /// Consumer handle. May be used from many threads. @@ -96,7 +93,6 @@ pub(super) fn local() -> (Steal, Local) { let local = Local { inner: inner.clone(), - next: None, }; let remote = Steal(inner); @@ -110,26 +106,6 @@ impl Local { !self.inner.is_empty() } - /// Returns true if the queue has an unstealable entry. - pub(super) fn has_unstealable(&self) -> bool { - self.next.is_some() - } - - /// Push a task to the local queue. Returns `true` if a stealer should be - /// notified. - pub(super) fn push(&mut self, task: task::Notified, inject: &Inject) -> bool { - let prev = self.next.take(); - let ret = prev.is_some(); - - if let Some(prev) = prev { - self.push_back(prev, inject); - } - - self.next = Some(task); - - ret - } - /// Pushes a task to the back of the local queue, skipping the LIFO slot. pub(super) fn push_back(&mut self, mut task: task::Notified, inject: &Inject) { let tail = loop { @@ -270,11 +246,6 @@ impl Local { /// Pops a task from the local queue. pub(super) fn pop(&mut self) -> Option> { - // If a task is available in the FIFO slot, return that. - if let Some(task) = self.next.take() { - return Some(task); - } - let mut head = self.inner.head.load(Acquire); let idx = loop { diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index 2092c0aa3bb..573b9f3c9cb 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -160,7 +160,7 @@ impl Core { let waker_ref = waker_ref::(header); let mut cx = Context::from_waker(&*waker_ref); - crate::coop::budget(|| future.poll(&mut cx)) + future.poll(&mut cx) }) }; diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index c07aa0541f7..400e2a938ca 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -34,6 +34,13 @@ struct Core { /// Used to schedule bookkeeping tasks every so often. tick: u8, + /// When a task is scheduled from a worker, it is stored in this slot. The + /// worker will check this slot for a task **before** checking the run + /// queue. This effectively results in the **last** scheduled task to be run + /// next (LIFO). This is an optimization for message passing patterns and + /// helps to reduce latency. + lifo_slot: Option, + /// The worker-local run queue. run_queue: queue::Local>, @@ -128,6 +135,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc, Launch) { cores.push(Box::new(Core { tick: 0, + lifo_slot: None, run_queue, is_searching: false, is_shutdown: false, @@ -296,13 +304,37 @@ impl Context { *self.core.borrow_mut() = Some(core); // Run the task - task.run(); - - // Try to take the core back - match self.core.borrow_mut().take() { - Some(core) => Ok(core), - None => Err(()), - } + crate::coop::budget(|| { + task.run(); + + // As long as there is budget remaining and a task exists in the + // `lifo_slot`, then keep running. + loop { + // Check if we still have the core. If not, the core was stolen + // by another worker. + let mut core = match self.core.borrow_mut().take() { + Some(core) => core, + None => return Err(()), + }; + + // Check for a task in the LIFO slot + let task = match core.lifo_slot.take() { + Some(task) => task, + None => return Ok(core), + }; + + if crate::coop::has_budget_remaining() { + // Run the LIFO task, then loop + *self.core.borrow_mut() = Some(core); + task.run(); + } else { + // Not enough budget left to run the LIFO task, push it to + // the back of the queue and return. + core.run_queue.push_back(task, self.worker.inject()); + return Ok(core); + } + } + }) } fn maintenance(&self, mut core: Box) -> Box { @@ -373,12 +405,16 @@ impl Core { /// Return the next notified task available to this worker. fn next_task(&mut self, worker: &Worker) -> Option { if self.tick % GLOBAL_POLL_INTERVAL == 0 { - worker.inject().pop().or_else(|| self.run_queue.pop()) + worker.inject().pop().or_else(|| self.next_local_task()) } else { - self.run_queue.pop().or_else(|| worker.inject().pop()) + self.next_local_task().or_else(|| worker.inject().pop()) } } + fn next_local_task(&mut self) -> Option { + self.lifo_slot.take().or_else(|| self.run_queue.pop()) + } + fn steal_work(&mut self, worker: &Worker) -> Option { if !self.transition_to_searching(worker) { return None; @@ -444,9 +480,9 @@ impl Core { /// Returns `true` if the transition happened. fn transition_from_parked(&mut self, worker: &Worker) -> bool { - // If there is a non-stealable task, then we must unpark regardless of + // If a task is in the lifo slot, then we must unpark regardless of // being notified - if self.run_queue.has_unstealable() { + if self.lifo_slot.is_some() { worker.shared.idle.unpark_worker_by_id(worker.index); self.is_searching = true; return true; @@ -494,7 +530,7 @@ impl Core { } // Drain the queue - while let Some(_) = self.run_queue.pop() {} + while let Some(_) = self.next_local_task() {} } fn drain_pending_drop(&mut self, worker: &Worker) { @@ -639,7 +675,17 @@ impl Shared { core.run_queue.push_back(task, &self.inject); true } else { - core.run_queue.push(task, &self.inject) + // Push to the LIFO slot + let prev = core.lifo_slot.take(); + let ret = prev.is_some(); + + if let Some(prev) = prev { + core.run_queue.push_back(prev, &self.inject); + } + + core.lifo_slot = Some(task); + + ret }; // Only notify if not currently parked. If `park` is `None`, then the diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index a2d1ceb2490..fcb8c789237 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -398,7 +398,7 @@ impl LocalSet { // task initially. Because `LocalSet` itself is `!Send`, and // `spawn_local` spawns into the `LocalSet` on the current // thread, the invariant is maintained. - Some(task) => task.run(), + Some(task) => crate::coop::budget(|| task.run()), // We have fully drained the queue of notified tasks, so the // local future doesn't need to be notified again — it can wait // until something else wakes a task in the local set. diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 0355a6e7469..ae16721ee65 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -878,4 +878,58 @@ rt_test! { }).await; }); } + + // Tests that the "next task" scheduler optimization is not able to starve + // other tasks. + #[test] + fn ping_pong_saturation() { + use tokio::sync::mpsc; + + const NUM: usize = 100; + + let mut rt = rt(); + + rt.block_on(async { + let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel(); + + // Spawn a bunch of tasks that ping ping between each other to + // saturate the runtime. + for _ in 0..NUM { + let (tx1, mut rx1) = mpsc::unbounded_channel(); + let (tx2, mut rx2) = mpsc::unbounded_channel(); + let spawned_tx = spawned_tx.clone(); + + task::spawn(async move { + spawned_tx.send(()).unwrap(); + + tx1.send(()).unwrap(); + + loop { + rx2.recv().await.unwrap(); + tx1.send(()).unwrap(); + } + }); + + task::spawn(async move { + loop { + rx1.recv().await.unwrap(); + tx2.send(()).unwrap(); + } + }); + } + + for _ in 0..NUM { + spawned_rx.recv().await.unwrap(); + } + + // spawn another task and wait for it to complete + let handle = task::spawn(async { + for _ in 0..5 { + // Yielding forces it back into the local queue. + task::yield_now().await; + } + }); + handle.await.unwrap(); + }); + } }