Skip to content

Commit

Permalink
Remove token from ContainerIO read_loop
Browse files Browse the repository at this point in the history
This allows the read loop to finish one there is nothing more to read.

Signed-off-by: Sascha Grunert <[email protected]>
  • Loading branch information
saschagrunert committed Sep 21, 2022
1 parent 8e4cec9 commit 814c50a
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 83 deletions.
112 changes: 52 additions & 60 deletions conmon-rs/server/src/container_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,74 +235,17 @@ impl ContainerIO {
logger: SharedContainerLog,
message_tx: UnboundedSender<Message>,
mut attach: SharedContainerAttach,
token: CancellationToken,
) -> Result<()>
where
T: AsyncRead + Unpin,
{
let mut buf = vec![0; 1024];

loop {
// In situations we're processing output directly from the I/O streams
// we need a mechanism to figure out when to stop that doesn't involve reading the
// number of bytes read.
// Thus, we need to select on the cancellation token saved in the child.
// While this could result in a data race, as select statements are racy,
// we won't interleve these two futures, as one ends execution.
select! {
n = reader.read(&mut buf) => {
match n {
Ok(n) if n > 0 => {
debug!("Read {} bytes", n);
let data = &buf[..n];

let mut locked_logger = logger.write().await;
locked_logger
.write(pipe, data)
.await
.context("write to log file")?;
match reader.read(&mut buf).await {
Ok(n) if n == 0 => {
debug!("Nothing more to read");

attach
.write(Message::Data(data.into(), pipe))
.await
.context("write to attach endpoints")?;

if !message_tx.is_closed() {
message_tx
.send(Message::Data(data.into(), pipe))
.context("send data message")?;
}
}
Err(e) => match Errno::from_i32(e.raw_os_error().context("get OS error")?) {
Errno::EIO => {
debug!("Stopping read loop");
attach
.write(Message::Done)
.await
.context("write to attach endpoints")?;

message_tx
.send(Message::Done)
.context("send done message")?;
return Ok(());
}
Errno::EBADF => {
return Err(Errno::EBADFD.into());
}
Errno::EAGAIN => {
continue;
}
_ => error!(
"Unable to read from file descriptor: {} {}",
e,
e.raw_os_error().context("get OS error")?
),
},
_ => {}
}
}
_ = token.cancelled() => {
debug!("Sending done because token cancelled");
attach
.write(Message::Done)
.await
Expand All @@ -311,8 +254,57 @@ impl ContainerIO {
message_tx
.send(Message::Done)
.context("send done message")?;

return Ok(());
}

Ok(n) => {
debug!("Read {} bytes", n);
let data = &buf[..n];

let mut locked_logger = logger.write().await;
locked_logger
.write(pipe, data)
.await
.context("write to log file")?;

attach
.write(Message::Data(data.into(), pipe))
.await
.context("write to attach endpoints")?;

if !message_tx.is_closed() {
message_tx
.send(Message::Data(data.into(), pipe))
.context("send data message")?;
}
}

Err(e) => match Errno::from_i32(e.raw_os_error().context("get OS error")?) {
Errno::EIO => {
debug!("Stopping read loop");
attach
.write(Message::Done)
.await
.context("write to attach endpoints")?;

message_tx
.send(Message::Done)
.context("send done message")?;
return Ok(());
}
Errno::EBADF => {
return Err(Errno::EBADFD.into());
}
Errno::EAGAIN => {
continue;
}
_ => error!(
"Unable to read from file descriptor: {} {}",
e,
e.raw_os_error().context("get OS error")?
),
},
}
}
}
Expand Down
28 changes: 7 additions & 21 deletions conmon-rs/server/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,11 @@ impl Streams {
let attach = self.attach().clone();
let message_tx = self.message_tx_stdout().clone();

let token_clone = token.clone();
if let Some(stdin) = stdin {
task::spawn(
async move {
if let Err(e) =
ContainerIO::read_loop_stdin(stdin.as_raw_fd(), attach, token_clone).await
ContainerIO::read_loop_stdin(stdin.as_raw_fd(), attach, token).await
{
error!("Stdin read loop failure: {:#}", e);
}
Expand All @@ -80,19 +79,12 @@ impl Streams {
}

let attach = self.attach().clone();
let token_clone = token.clone();
if let Some(stdout) = stdout {
task::spawn(
async move {
if let Err(e) = ContainerIO::read_loop(
stdout,
Pipe::StdOut,
logger,
message_tx,
attach,
token_clone,
)
.await
if let Err(e) =
ContainerIO::read_loop(stdout, Pipe::StdOut, logger, message_tx, attach)
.await
{
error!("Stdout read loop failure: {:#}", e);
}
Expand All @@ -107,15 +99,9 @@ impl Streams {
if let Some(stderr) = stderr {
task::spawn(
async move {
if let Err(e) = ContainerIO::read_loop(
stderr,
Pipe::StdErr,
logger,
message_tx,
attach,
token,
)
.await
if let Err(e) =
ContainerIO::read_loop(stderr, Pipe::StdErr, logger, message_tx, attach)
.await
{
error!("Stderr read loop failure: {:#}", e);
}
Expand Down
2 changes: 0 additions & 2 deletions conmon-rs/server/src/terminal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ impl Terminal {
let logger_clone = self.logger.clone();
let (message_tx, message_rx) = mpsc::unbounded_channel();
self.message_rx = Some(message_rx);
let token_clone = token.clone();

task::spawn(
async move {
Expand All @@ -125,7 +124,6 @@ impl Terminal {
logger_clone,
message_tx,
attach_clone,
token_clone,
)
.await
{
Expand Down

0 comments on commit 814c50a

Please sign in to comment.