Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add waker to StreamResource #4293

Merged
merged 13 commits into from
Mar 11, 2020
6 changes: 4 additions & 2 deletions cli/ops/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use super::io::{StreamResource, StreamResourceHolder};
use crate::http_util::{create_http_client, HttpBody};
use crate::op_error::OpError;
use crate::state::State;
Expand Down Expand Up @@ -80,7 +80,9 @@ pub fn op_fetch(
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
"httpBody",
Box::new(StreamResource::HttpBody(Box::new(body))),
Box::new(StreamResourceHolder::new(StreamResource::HttpBody(
Box::new(body),
))),
);

let json_res = json!({
Expand Down
13 changes: 8 additions & 5 deletions cli/ops/fs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
// Some deserializer fields are only used on Unix and Windows build fails without it
use super::dispatch_json::{blocking_json, Deserialize, JsonOp, Value};
use super::io::{FileMetadata, StreamResource};
use super::io::{FileMetadata, StreamResource, StreamResourceHolder};
use crate::fs as deno_fs;
use crate::op_error::OpError;
use crate::ops::dispatch_json::JsonResult;
Expand Down Expand Up @@ -152,7 +152,10 @@ fn op_open(
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
"fsFile",
Box::new(StreamResource::FsFile(fs_file, FileMetadata::default())),
Box::new(StreamResourceHolder::new(StreamResource::FsFile(
fs_file,
FileMetadata::default(),
))),
);
Ok(json!(rid))
};
Expand Down Expand Up @@ -197,12 +200,12 @@ fn op_seek(
};

let state = state.borrow();
let resource = state
let resource_holder = state
.resource_table
.get::<StreamResource>(rid)
.get::<StreamResourceHolder>(rid)
.ok_or_else(OpError::bad_resource_id)?;

let tokio_file = match resource {
let tokio_file = match resource_holder.resource {
StreamResource::FsFile(ref file, _) => file,
_ => return Err(OpError::bad_resource_id()),
};
Expand Down
103 changes: 89 additions & 14 deletions cli/ops/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,23 @@ pub fn init(i: &mut Isolate, s: &State) {
);
}

pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) {
let stdin = StreamResource::Stdin(tokio::io::stdin(), TTYMetadata::default());
let stdout = StreamResource::Stdout({
pub fn get_stdio() -> (
StreamResourceHolder,
StreamResourceHolder,
StreamResourceHolder,
) {
let stdin = StreamResourceHolder::new(StreamResource::Stdin(
tokio::io::stdin(),
TTYMetadata::default(),
));
let stdout = StreamResourceHolder::new(StreamResource::Stdout({
let stdout = STDOUT_HANDLE
.try_clone()
.expect("Unable to clone stdout handle");
tokio::fs::File::from_std(stdout)
});
let stderr = StreamResource::Stderr(tokio::io::stderr());
}));
let stderr =
StreamResourceHolder::new(StreamResource::Stderr(tokio::io::stderr()));

(stdin, stdout, stderr)
}
Expand All @@ -87,6 +95,61 @@ pub struct FileMetadata {
pub tty: TTYMetadata,
}

pub struct StreamResourceHolder {
pub resource: StreamResource,
waker: Option<futures::task::AtomicWaker>,
}

impl StreamResourceHolder {
pub fn new(resource: StreamResource) -> StreamResourceHolder {
StreamResourceHolder {
resource,
waker: None,
}
}
}

impl Drop for StreamResourceHolder {
fn drop(&mut self) {
self.wake_task();
}
}

impl StreamResourceHolder {
/// Track the current task so future awaiting for connection
/// can be notified when listener is closed.
///
/// Throws an error if another task is already tracked.
pub fn track_task(&mut self, cx: &Context) -> Result<(), OpError> {
// Currently, we only allow tracking a single accept task for a listener.
// This might be changed in the future with multiple workers.
// Caveat: TcpListener by itself also only tracks an accept task at a time.
// See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
if self.waker.is_some() {
return Err(OpError::other("Another accept task is ongoing".to_string()));
}
let waker = futures::task::AtomicWaker::new();
waker.register(cx.waker());
self.waker.replace(waker);
Ok(())
}

/// Notifies a task when listener is closed so accept future can resolve.
pub fn wake_task(&mut self) {
if let Some(waker) = self.waker.as_ref() {
waker.wake();
}
}

/// Stop tracking a task.
/// Happens when the task is done and thus no further tracking is needed.
pub fn untrack_task(&mut self) {
if self.waker.is_some() {
self.waker.take();
}
}
}

pub enum StreamResource {
Stdin(tokio::io::Stdin, TTYMetadata),
Stdout(tokio::fs::File),
Expand Down Expand Up @@ -150,10 +213,22 @@ pub fn op_read(

poll_fn(move |cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let resource = resource_table
.get_mut::<StreamResource>(rid as u32)
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
let nread = ready!(resource.poll_read(cx, &mut buf.as_mut()[..]))?;
let nread = match resource_holder
.resource
.poll_read(cx, &mut buf.as_mut()[..])
{
Poll::Ready(t) => {
resource_holder.untrack_task();
t
}
Poll::Pending => {
resource_holder.track_task(cx)?;
return Poll::Pending;
}
}?;
Copy link
Member

@bartlomieju bartlomieju Mar 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ry, @piscisaureus and I had a discussion about this solution; it looks good on the surface; but it effectively prevents having multiple ops reading from single resource. Although in most cases we only want to have a single op, there are some cases where having multiple ops reading from single resource is desirable.

Ideally we would track multiple tasks for single resource, but Waker is very simple struct that has no PartialEq or any other trait implementation that would allow to discriminate wakers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll discuss it more tonight

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bartlomieju I pushed some small changes to the StreamResourceWrapper to make it capable of waking multiple tasks. It just a suggestion to help the discussion.

Poll::Ready(Ok(nread as i32))
})
.boxed_local()
Expand Down Expand Up @@ -233,10 +308,10 @@ pub fn op_write(
async move {
let nwritten = poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let resource = resource_table
.get_mut::<StreamResource>(rid as u32)
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
resource.poll_write(cx, &buf.as_ref()[..])
resource_holder.resource.poll_write(cx, &buf.as_ref()[..])
})
.await?;

Expand All @@ -246,10 +321,10 @@ pub fn op_write(
// https://github.com/denoland/deno/issues/3565
poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let resource = resource_table
.get_mut::<StreamResource>(rid as u32)
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
resource.poll_flush(cx)
resource_holder.resource.poll_flush(cx)
})
.await?;

Expand Down
26 changes: 16 additions & 10 deletions cli/ops/net.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::resolve_addr::resolve_addr;
use crate::state::State;
Expand Down Expand Up @@ -78,9 +78,12 @@ fn op_accept(
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state = state_.borrow_mut();
let rid = state
.resource_table
.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
let rid = state.resource_table.add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(
tcp_stream,
))),
);
Ok(json!({
"rid": rid,
"localAddr": {
Expand Down Expand Up @@ -203,9 +206,12 @@ fn op_connect(
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state = state_.borrow_mut();
let rid = state
.resource_table
.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
let rid = state.resource_table.add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(
tcp_stream,
))),
);
Ok(json!({
"rid": rid,
"localAddr": {
Expand Down Expand Up @@ -247,11 +253,11 @@ fn op_shutdown(
};

let mut state = state.borrow_mut();
let resource = state
let resource_holder = state
.resource_table
.get_mut::<StreamResource>(rid)
.get_mut::<StreamResourceHolder>(rid)
.ok_or_else(OpError::bad_resource_id)?;
match resource {
match resource_holder.resource {
StreamResource::TcpStream(ref mut stream) => {
TcpStream::shutdown(stream, shutdown_mode).map_err(OpError::from)?;
}
Expand Down
20 changes: 13 additions & 7 deletions cli/ops/process.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::signal::kill;
use crate::state::State;
Expand All @@ -24,11 +24,11 @@ pub fn init(i: &mut Isolate, s: &State) {

fn clone_file(rid: u32, state: &State) -> Result<std::fs::File, OpError> {
let mut state = state.borrow_mut();
let repr = state
let repr_holder = state
.resource_table
.get_mut::<StreamResource>(rid)
.get_mut::<StreamResourceHolder>(rid)
.ok_or_else(OpError::bad_resource_id)?;
let file = match repr {
let file = match repr_holder.resource {
StreamResource::FsFile(ref mut file, _) => file,
_ => return Err(OpError::bad_resource_id()),
};
Expand Down Expand Up @@ -127,7 +127,9 @@ fn op_run(
Some(child_stdin) => {
let rid = table.add(
"childStdin",
Box::new(StreamResource::ChildStdin(child_stdin)),
Box::new(StreamResourceHolder::new(StreamResource::ChildStdin(
child_stdin,
))),
);
Some(rid)
}
Expand All @@ -138,7 +140,9 @@ fn op_run(
Some(child_stdout) => {
let rid = table.add(
"childStdout",
Box::new(StreamResource::ChildStdout(child_stdout)),
Box::new(StreamResourceHolder::new(StreamResource::ChildStdout(
child_stdout,
))),
);
Some(rid)
}
Expand All @@ -149,7 +153,9 @@ fn op_run(
Some(child_stderr) => {
let rid = table.add(
"childStderr",
Box::new(StreamResource::ChildStderr(child_stderr)),
Box::new(StreamResourceHolder::new(StreamResource::ChildStderr(
child_stderr,
))),
);
Some(rid)
}
Expand Down
10 changes: 7 additions & 3 deletions cli/ops/tls.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::resolve_addr::resolve_addr;
use crate::state::State;
Expand Down Expand Up @@ -85,7 +85,9 @@ pub fn op_connect_tls(
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
"clientTlsStream",
Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
Box::new(tls_stream),
))),
);
Ok(json!({
"rid": rid,
Expand Down Expand Up @@ -318,7 +320,9 @@ fn op_accept_tls(
let mut state = state.borrow_mut();
state.resource_table.add(
"serverTlsStream",
Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream(
Box::new(tls_stream),
))),
)
};
Ok(json!({
Expand Down
Loading