Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add waker to StreamResource #4293

Merged
merged 13 commits into from
Mar 11, 2020
43 changes: 43 additions & 0 deletions cli/js/tests/net_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,3 +336,46 @@ unitTest(
conn.close();
}
);

unitTest(
{
perms: { net: true }
},
async function netHangsOnClose() {
let acceptedConn: Deno.Conn;
const resolvable = createResolvable();

async function iteratorReq(listener: Deno.Listener): Promise<void> {
const p = new Uint8Array(10);
const conn = await listener.accept();
acceptedConn = conn;

try {
while (true) {
const nread = await conn.read(p);
if (nread === Deno.EOF) {
break;
}
await conn.write(new Uint8Array([1, 2, 3]));
}
} catch (err) {
assert(!!err);
assert(err instanceof Deno.errors.BadResource);
}

resolvable.resolve();
}

const addr = { hostname: "127.0.0.1", port: 4500 };
const listener = Deno.listen(addr);
iteratorReq(listener);
const conn = await Deno.connect(addr);
await conn.write(new Uint8Array([1, 2, 3, 4]));
const buf = new Uint8Array(10);
await conn.read(buf);
conn!.close();
acceptedConn!.close();
listener.close();
await resolvable;
}
);
6 changes: 4 additions & 2 deletions cli/ops/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use super::io::{StreamResource, StreamResourceHolder};
use crate::http_util::{create_http_client, HttpBody};
use crate::op_error::OpError;
use crate::state::State;
Expand Down Expand Up @@ -80,7 +80,9 @@ pub fn op_fetch(
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
"httpBody",
Box::new(StreamResource::HttpBody(Box::new(body))),
Box::new(StreamResourceHolder::new(StreamResource::HttpBody(
Box::new(body),
))),
);

let json_res = json!({
Expand Down
13 changes: 8 additions & 5 deletions cli/ops/fs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
// Some deserializer fields are only used on Unix and Windows build fails without it
use super::dispatch_json::{blocking_json, Deserialize, JsonOp, Value};
use super::io::{FileMetadata, StreamResource};
use super::io::{FileMetadata, StreamResource, StreamResourceHolder};
use crate::fs as deno_fs;
use crate::op_error::OpError;
use crate::ops::dispatch_json::JsonResult;
Expand Down Expand Up @@ -153,7 +153,10 @@ fn op_open(
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
"fsFile",
Box::new(StreamResource::FsFile(fs_file, FileMetadata::default())),
Box::new(StreamResourceHolder::new(StreamResource::FsFile(
fs_file,
FileMetadata::default(),
))),
);
Ok(json!(rid))
};
Expand Down Expand Up @@ -198,12 +201,12 @@ fn op_seek(
};

let state = state.borrow();
let resource = state
let resource_holder = state
.resource_table
.get::<StreamResource>(rid)
.get::<StreamResourceHolder>(rid)
.ok_or_else(OpError::bad_resource_id)?;

let tokio_file = match resource {
let tokio_file = match resource_holder.resource {
StreamResource::FsFile(ref file, _) => file,
_ => return Err(OpError::bad_resource_id()),
};
Expand Down
100 changes: 86 additions & 14 deletions cli/ops/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use deno_core::*;
use futures::future::poll_fn;
use futures::future::FutureExt;
use futures::ready;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Context;
use std::task::Poll;
use tokio::io::{AsyncRead, AsyncWrite};
Expand Down Expand Up @@ -56,15 +58,23 @@ pub fn init(i: &mut Isolate, s: &State) {
);
}

pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) {
let stdin = StreamResource::Stdin(tokio::io::stdin(), TTYMetadata::default());
let stdout = StreamResource::Stdout({
pub fn get_stdio() -> (
StreamResourceHolder,
StreamResourceHolder,
StreamResourceHolder,
) {
let stdin = StreamResourceHolder::new(StreamResource::Stdin(
tokio::io::stdin(),
TTYMetadata::default(),
));
let stdout = StreamResourceHolder::new(StreamResource::Stdout({
let stdout = STDOUT_HANDLE
.try_clone()
.expect("Unable to clone stdout handle");
tokio::fs::File::from_std(stdout)
});
let stderr = StreamResource::Stderr(tokio::io::stderr());
}));
let stderr =
StreamResourceHolder::new(StreamResource::Stderr(tokio::io::stderr()));

(stdin, stdout, stderr)
}
Expand All @@ -87,6 +97,51 @@ pub struct FileMetadata {
pub tty: TTYMetadata,
}

pub struct StreamResourceHolder {
pub resource: StreamResource,
waker: HashMap<usize, futures::task::AtomicWaker>,
waker_counter: AtomicUsize,
}

impl StreamResourceHolder {
pub fn new(resource: StreamResource) -> StreamResourceHolder {
StreamResourceHolder {
resource,
// Atleast one task is expecter for the resource
waker: HashMap::with_capacity(1),
// Tracks wakers Ids
waker_counter: AtomicUsize::new(0),
}
}
}

impl Drop for StreamResourceHolder {
fn drop(&mut self) {
self.wake_tasks();
}
}

impl StreamResourceHolder {
pub fn track_task(&mut self, cx: &Context) -> Result<usize, OpError> {
let waker = futures::task::AtomicWaker::new();
waker.register(cx.waker());
// Its OK if it overflows
let task_waker_id = self.waker_counter.fetch_add(1, Ordering::Relaxed);
self.waker.insert(task_waker_id, waker);
Ok(task_waker_id)
}

pub fn wake_tasks(&mut self) {
for waker in self.waker.values() {
waker.wake();
}
}

pub fn untrack_task(&mut self, task_waker_id: usize) {
self.waker.remove(&task_waker_id);
}
}

pub enum StreamResource {
Stdin(tokio::io::Stdin, TTYMetadata),
Stdout(tokio::fs::File),
Expand Down Expand Up @@ -150,10 +205,27 @@ pub fn op_read(

poll_fn(move |cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let resource = resource_table
.get_mut::<StreamResource>(rid as u32)
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
let nread = ready!(resource.poll_read(cx, &mut buf.as_mut()[..]))?;

let mut task_tracker_id: Option<usize> = None;
let nread = match resource_holder
.resource
.poll_read(cx, &mut buf.as_mut()[..])
.map_err(OpError::from)
{
Poll::Ready(t) => {
if let Some(id) = task_tracker_id {
resource_holder.untrack_task(id);
}
t
}
Poll::Pending => {
task_tracker_id.replace(resource_holder.track_task(cx)?);
return Poll::Pending;
}
}?;
Poll::Ready(Ok(nread as i32))
})
.boxed_local()
Expand Down Expand Up @@ -233,10 +305,10 @@ pub fn op_write(
async move {
let nwritten = poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let resource = resource_table
.get_mut::<StreamResource>(rid as u32)
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
resource.poll_write(cx, &buf.as_ref()[..])
resource_holder.resource.poll_write(cx, &buf.as_ref()[..])
})
.await?;

Expand All @@ -246,10 +318,10 @@ pub fn op_write(
// https://github.com/denoland/deno/issues/3565
poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let resource = resource_table
.get_mut::<StreamResource>(rid as u32)
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
resource.poll_flush(cx)
resource_holder.resource.poll_flush(cx)
})
.await?;

Expand Down
26 changes: 16 additions & 10 deletions cli/ops/net.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::resolve_addr::resolve_addr;
use crate::state::State;
Expand Down Expand Up @@ -78,9 +78,12 @@ fn op_accept(
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state = state_.borrow_mut();
let rid = state
.resource_table
.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
let rid = state.resource_table.add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(
tcp_stream,
))),
);
Ok(json!({
"rid": rid,
"localAddr": {
Expand Down Expand Up @@ -207,9 +210,12 @@ fn op_connect(
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state = state_.borrow_mut();
let rid = state
.resource_table
.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
let rid = state.resource_table.add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(
tcp_stream,
))),
);
Ok(json!({
"rid": rid,
"localAddr": {
Expand Down Expand Up @@ -251,11 +257,11 @@ fn op_shutdown(
};

let mut state = state.borrow_mut();
let resource = state
let resource_holder = state
.resource_table
.get_mut::<StreamResource>(rid)
.get_mut::<StreamResourceHolder>(rid)
.ok_or_else(OpError::bad_resource_id)?;
match resource {
match resource_holder.resource {
StreamResource::TcpStream(ref mut stream) => {
TcpStream::shutdown(stream, shutdown_mode).map_err(OpError::from)?;
}
Expand Down
20 changes: 13 additions & 7 deletions cli/ops/process.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::signal::kill;
use crate::state::State;
Expand All @@ -24,11 +24,11 @@ pub fn init(i: &mut Isolate, s: &State) {

fn clone_file(rid: u32, state: &State) -> Result<std::fs::File, OpError> {
let mut state = state.borrow_mut();
let repr = state
let repr_holder = state
.resource_table
.get_mut::<StreamResource>(rid)
.get_mut::<StreamResourceHolder>(rid)
.ok_or_else(OpError::bad_resource_id)?;
let file = match repr {
let file = match repr_holder.resource {
StreamResource::FsFile(ref mut file, _) => file,
_ => return Err(OpError::bad_resource_id()),
};
Expand Down Expand Up @@ -127,7 +127,9 @@ fn op_run(
Some(child_stdin) => {
let rid = table.add(
"childStdin",
Box::new(StreamResource::ChildStdin(child_stdin)),
Box::new(StreamResourceHolder::new(StreamResource::ChildStdin(
child_stdin,
))),
);
Some(rid)
}
Expand All @@ -138,7 +140,9 @@ fn op_run(
Some(child_stdout) => {
let rid = table.add(
"childStdout",
Box::new(StreamResource::ChildStdout(child_stdout)),
Box::new(StreamResourceHolder::new(StreamResource::ChildStdout(
child_stdout,
))),
);
Some(rid)
}
Expand All @@ -149,7 +153,9 @@ fn op_run(
Some(child_stderr) => {
let rid = table.add(
"childStderr",
Box::new(StreamResource::ChildStderr(child_stderr)),
Box::new(StreamResourceHolder::new(StreamResource::ChildStderr(
child_stderr,
))),
);
Some(rid)
}
Expand Down
10 changes: 7 additions & 3 deletions cli/ops/tls.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::resolve_addr::resolve_addr;
use crate::state::State;
Expand Down Expand Up @@ -85,7 +85,9 @@ pub fn op_connect_tls(
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
"clientTlsStream",
Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
Box::new(tls_stream),
))),
);
Ok(json!({
"rid": rid,
Expand Down Expand Up @@ -318,7 +320,9 @@ fn op_accept_tls(
let mut state = state.borrow_mut();
state.resource_table.add(
"serverTlsStream",
Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream(
Box::new(tls_stream),
))),
)
};
Ok(json!({
Expand Down
Loading