rt: cap lifo scheduler slot to avoid starvation (#2349)
The work-stealing scheduler includes an optimization where each worker
has a single slot that stores the **last** scheduled task. The task in
the scheduler's LIFO slot is executed next. This improves throughput and
reduces latency for message passing patterns.

Previously, this optimization was susceptible to starving other tasks in
certain cases. If two tasks ping-pong between each other without ever
yielding, the worker never executes any other task.

An earlier PR (#2160) introduced a form of pre-emption. Each task is
allocated a per-poll operation budget. Tokio resources return ready
until the budget is depleted, at which point they always return
`Pending`.

This patch leverages the operation budget to limit the LIFO scheduler
optimization. When executing a task from the LIFO slot, the budget is
**not** reset. Once the budget reaches zero, the task in the LIFO slot
is pushed to the back of the run queue.
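
For illustration, the interaction can be modeled with a minimal,
self-contained sketch. The names here (`BUDGET`, `consume_budget`,
`run_task`, the simplified `Task`, the `VecDeque` run queue, and the
budget value of 128) are stand-ins for this example only, not Tokio's
internals; the real implementation is in `coop.rs` and
`thread_pool/worker.rs` below.

```rust
use std::cell::Cell;
use std::collections::VecDeque;

thread_local! {
    /// Remaining operation budget for the task currently being polled.
    static BUDGET: Cell<usize> = Cell::new(0);
}

struct Task;

impl Task {
    fn run(&self) {
        // A real task would poll its future here; every operation on a
        // Tokio resource consumes one unit of budget.
        consume_budget();
    }
}

/// Install a fresh budget for the duration of `f` (mirrors `coop::budget`).
fn budget<F: FnOnce() -> R, R>(f: F) -> R {
    BUDGET.with(|b| b.set(128));
    f()
}

/// Mirrors the `coop::has_budget_remaining` added by this commit.
fn has_budget_remaining() -> bool {
    BUDGET.with(|b| b.get() > 0)
}

fn consume_budget() {
    BUDGET.with(|b| b.set(b.get().saturating_sub(1)));
}

/// Simplified worker step: the budget is installed once and is *not*
/// reset when a task is taken from the LIFO slot. Once the budget hits
/// zero, the LIFO task is demoted to the back of the run queue so other
/// tasks get a chance to run.
fn run_task(task: Task, lifo_slot: &mut Option<Task>, run_queue: &mut VecDeque<Task>) {
    budget(|| {
        task.run();

        // In the real scheduler, running a task may place a new task in
        // the LIFO slot; here the slot is simply drained.
        while let Some(next) = lifo_slot.take() {
            if has_budget_remaining() {
                next.run();
            } else {
                run_queue.push_back(next);
                return;
            }
        }
    });
}

fn main() {
    let mut lifo_slot = Some(Task);
    let mut run_queue = VecDeque::new();
    run_task(Task, &mut lifo_slot, &mut run_queue);
    assert!(lifo_slot.is_none());
}
```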
carllerche authored Mar 28, 2020
1 parent 7b2438e commit caa7e18
Showing 7 changed files with 123 additions and 45 deletions.
7 changes: 7 additions & 0 deletions tokio/src/coop.rs
@@ -98,6 +98,13 @@ where
})
}

cfg_rt_threaded! {
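/// Returns `true` if the current task's operation budget is not yet depleted.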
#[inline(always)]
pub(crate) fn has_budget_remaining() -> bool {
HITS.with(|hits| hits.get() > 0)
}
}

cfg_blocking_impl! {
/// Forcibly remove the budgeting constraints early.
pub(crate) fn stop() {
2 changes: 1 addition & 1 deletion tokio/src/runtime/basic_scheduler.rs
@@ -152,7 +152,7 @@ where
};

match next {
Some(task) => task.run(),
Some(task) => crate::coop::budget(|| task.run()),
None => {
// Park until the thread is signaled
scheduler.park.park().ok().expect("failed to park");
29 changes: 0 additions & 29 deletions tokio/src/runtime/queue.rs
@@ -13,9 +13,6 @@ use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
/// Producer handle. May only be used from a single thread.
pub(super) struct Local<T: 'static> {
inner: Arc<Inner<T>>,

/// LIFO slot. Cannot be stolen.
next: Option<task::Notified<T>>,
}

/// Consumer handle. May be used from many threads.
@@ -96,7 +93,6 @@ pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) {

let local = Local {
inner: inner.clone(),
next: None,
};

let remote = Steal(inner);
@@ -110,26 +106,6 @@ impl<T> Local<T> {
!self.inner.is_empty()
}

/// Returns true if the queue has an unstealable entry.
pub(super) fn has_unstealable(&self) -> bool {
self.next.is_some()
}

/// Push a task to the local queue. Returns `true` if a stealer should be
/// notified.
pub(super) fn push(&mut self, task: task::Notified<T>, inject: &Inject<T>) -> bool {
let prev = self.next.take();
let ret = prev.is_some();

if let Some(prev) = prev {
self.push_back(prev, inject);
}

self.next = Some(task);

ret
}

/// Pushes a task to the back of the local queue, skipping the LIFO slot.
pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) {
let tail = loop {
@@ -270,11 +246,6 @@ impl<T> Local<T> {

/// Pops a task from the local queue.
pub(super) fn pop(&mut self) -> Option<task::Notified<T>> {
// If a task is available in the LIFO slot, return that.
if let Some(task) = self.next.take() {
return Some(task);
}

let mut head = self.inner.head.load(Acquire);

let idx = loop {
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/core.rs
@@ -160,7 +160,7 @@ impl<T: Future, S: Schedule> Core<T, S> {
let waker_ref = waker_ref::<T, S>(header);
let mut cx = Context::from_waker(&*waker_ref);

crate::coop::budget(|| future.poll(&mut cx))
future.poll(&mut cx)
})
};

72 changes: 59 additions & 13 deletions tokio/src/runtime/thread_pool/worker.rs
@@ -34,6 +34,13 @@ struct Core {
/// Used to schedule bookkeeping tasks every so often.
tick: u8,

/// When a task is scheduled from a worker, it is stored in this slot. The
/// worker will check this slot for a task **before** checking the run
/// queue. This effectively results in the **last** scheduled task being run
/// next (LIFO). This is an optimization for message passing patterns and
/// helps to reduce latency.
lifo_slot: Option<Notified>,

/// The worker-local run queue.
run_queue: queue::Local<Arc<Worker>>,

@@ -128,6 +135,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {

cores.push(Box::new(Core {
tick: 0,
lifo_slot: None,
run_queue,
is_searching: false,
is_shutdown: false,
@@ -296,13 +304,37 @@ impl Context {
*self.core.borrow_mut() = Some(core);

// Run the task
task.run();

// Try to take the core back
match self.core.borrow_mut().take() {
Some(core) => Ok(core),
None => Err(()),
}
crate::coop::budget(|| {
task.run();

// As long as there is budget remaining and a task exists in the
// `lifo_slot`, then keep running.
loop {
// Check if we still have the core. If not, the core was stolen
// by another worker.
let mut core = match self.core.borrow_mut().take() {
Some(core) => core,
None => return Err(()),
};

// Check for a task in the LIFO slot
let task = match core.lifo_slot.take() {
Some(task) => task,
None => return Ok(core),
};

if crate::coop::has_budget_remaining() {
// Run the LIFO task, then loop
*self.core.borrow_mut() = Some(core);
task.run();
} else {
// Not enough budget left to run the LIFO task, push it to
// the back of the queue and return.
core.run_queue.push_back(task, self.worker.inject());
return Ok(core);
}
}
})
}

fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
@@ -373,12 +405,16 @@ impl Core {
/// Return the next notified task available to this worker.
fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
if self.tick % GLOBAL_POLL_INTERVAL == 0 {
worker.inject().pop().or_else(|| self.run_queue.pop())
worker.inject().pop().or_else(|| self.next_local_task())
} else {
self.run_queue.pop().or_else(|| worker.inject().pop())
self.next_local_task().or_else(|| worker.inject().pop())
}
}

fn next_local_task(&mut self) -> Option<Notified> {
self.lifo_slot.take().or_else(|| self.run_queue.pop())
}

fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
if !self.transition_to_searching(worker) {
return None;
@@ -444,9 +480,9 @@ impl Core {

/// Returns `true` if the transition happened.
fn transition_from_parked(&mut self, worker: &Worker) -> bool {
// If there is a non-stealable task, then we must unpark regardless of
// If a task is in the lifo slot, then we must unpark regardless of
// being notified
if self.run_queue.has_unstealable() {
if self.lifo_slot.is_some() {
worker.shared.idle.unpark_worker_by_id(worker.index);
self.is_searching = true;
return true;
@@ -494,7 +530,7 @@
}

// Drain the queue
while let Some(_) = self.run_queue.pop() {}
while let Some(_) = self.next_local_task() {}
}

fn drain_pending_drop(&mut self, worker: &Worker) {
@@ -639,7 +675,17 @@ impl Shared {
core.run_queue.push_back(task, &self.inject);
true
} else {
core.run_queue.push(task, &self.inject)
// Push to the LIFO slot
let prev = core.lifo_slot.take();
let ret = prev.is_some();

if let Some(prev) = prev {
core.run_queue.push_back(prev, &self.inject);
}

core.lifo_slot = Some(task);

ret
};

// Only notify if not currently parked. If `park` is `None`, then the
2 changes: 1 addition & 1 deletion tokio/src/task/local.rs
@@ -398,7 +398,7 @@ impl LocalSet {
// task initially. Because `LocalSet` itself is `!Send`, and
// `spawn_local` spawns into the `LocalSet` on the current
// thread, the invariant is maintained.
Some(task) => task.run(),
Some(task) => crate::coop::budget(|| task.run()),
// We have fully drained the queue of notified tasks, so the
// local future doesn't need to be notified again — it can wait
// until something else wakes a task in the local set.
54 changes: 54 additions & 0 deletions tokio/tests/rt_common.rs
@@ -878,4 +878,58 @@ rt_test! {
}).await;
});
}

// Tests that the "next task" scheduler optimization is not able to starve
// other tasks.
#[test]
fn ping_pong_saturation() {
use tokio::sync::mpsc;

const NUM: usize = 100;

let mut rt = rt();

rt.block_on(async {
let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();

// Spawn a bunch of tasks that ping-pong with each other to
// saturate the runtime.
for _ in 0..NUM {
let (tx1, mut rx1) = mpsc::unbounded_channel();
let (tx2, mut rx2) = mpsc::unbounded_channel();
let spawned_tx = spawned_tx.clone();

task::spawn(async move {
spawned_tx.send(()).unwrap();

tx1.send(()).unwrap();

loop {
rx2.recv().await.unwrap();
tx1.send(()).unwrap();
}
});

task::spawn(async move {
loop {
rx1.recv().await.unwrap();
tx2.send(()).unwrap();
}
});
}

for _ in 0..NUM {
spawned_rx.recv().await.unwrap();
}

// spawn another task and wait for it to complete
let handle = task::spawn(async {
for _ in 0..5 {
// Yielding forces it back into the local queue.
task::yield_now().await;
}
});
handle.await.unwrap();
});
}
}
