Skip to content

Commit

Permalink
Remove token from ContainerIO read_loop
Browse files Browse the repository at this point in the history
This allows the read loop to finish one there is nothing more to read.

Signed-off-by: Sascha Grunert <[email protected]>
  • Loading branch information
saschagrunert committed Sep 21, 2022
1 parent 8e4cec9 commit 0107f94
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 88 deletions.
122 changes: 59 additions & 63 deletions conmon-rs/server/src/container_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,84 +235,80 @@ impl ContainerIO {
logger: SharedContainerLog,
message_tx: UnboundedSender<Message>,
mut attach: SharedContainerAttach,
token: CancellationToken,
) -> Result<()>
where
T: AsyncRead + Unpin,
{
let mut buf = vec![0; 1024];
let mut buf = vec![];

loop {
// In situations we're processing output directly from the I/O streams
// we need a mechanism to figure out when to stop that doesn't involve reading the
// number of bytes read.
// Thus, we need to select on the cancellation token saved in the child.
// While this could result in a data race, as select statements are racy,
// we won't interleve these two futures, as one ends execution.
select! {
n = reader.read(&mut buf) => {
match n {
Ok(n) if n > 0 => {
debug!("Read {} bytes", n);
let data = &buf[..n];

let mut locked_logger = logger.write().await;
locked_logger
.write(pipe, data)
.await
.context("write to log file")?;
match reader.read(&mut buf).await {
Ok(n) if n == 0 => {
debug!("Nothing more to read");

attach
.write(Message::Data(data.into(), pipe))
.await
.context("write to attach endpoints")?;
attach
.write(Message::Done)
.await
.context("write to attach endpoints")?;

if !message_tx.is_closed() {
message_tx
.send(Message::Data(data.into(), pipe))
.context("send data message")?;
}
}
Err(e) => match Errno::from_i32(e.raw_os_error().context("get OS error")?) {
Errno::EIO => {
debug!("Stopping read loop");
attach
.write(Message::Done)
.await
.context("write to attach endpoints")?;

message_tx
.send(Message::Done)
.context("send done message")?;
return Ok(());
}
Errno::EBADF => {
return Err(Errno::EBADFD.into());
}
Errno::EAGAIN => {
continue;
}
_ => error!(
"Unable to read from file descriptor: {} {}",
e,
e.raw_os_error().context("get OS error")?
),
},
_ => {}
if !message_tx.is_closed() {
message_tx
.send(Message::Done)
.context("send done message")?;
}

return Ok(());
}
_ = token.cancelled() => {
debug!("Sending done because token cancelled");

Ok(n) => {
debug!("Read {} bytes", n);
let data = &buf[..n];

let mut locked_logger = logger.write().await;
locked_logger
.write(pipe, data)
.await
.context("write to log file")?;

attach
.write(Message::Done)
.write(Message::Data(data.into(), pipe))
.await
.context("write to attach endpoints")?;

message_tx
.send(Message::Done)
.context("send done message")?;
return Ok(());
if !message_tx.is_closed() {
message_tx
.send(Message::Data(data.into(), pipe))
.context("send data message")?;
}
}

Err(e) => match Errno::from_i32(e.raw_os_error().context("get OS error")?) {
Errno::EIO => {
debug!("Stopping read loop");
attach
.write(Message::Done)
.await
.context("write to attach endpoints")?;

if !message_tx.is_closed() {
message_tx
.send(Message::Done)
.context("send done message")?;
}
return Ok(());
}
Errno::EBADF => {
return Err(Errno::EBADFD.into());
}
Errno::EAGAIN => {
continue;
}
_ => error!(
"Unable to read from file descriptor: {} {}",
e,
e.raw_os_error().context("get OS error")?
),
},
}
}
}
Expand Down
28 changes: 7 additions & 21 deletions conmon-rs/server/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,11 @@ impl Streams {
let attach = self.attach().clone();
let message_tx = self.message_tx_stdout().clone();

let token_clone = token.clone();
if let Some(stdin) = stdin {
task::spawn(
async move {
if let Err(e) =
ContainerIO::read_loop_stdin(stdin.as_raw_fd(), attach, token_clone).await
ContainerIO::read_loop_stdin(stdin.as_raw_fd(), attach, token).await
{
error!("Stdin read loop failure: {:#}", e);
}
Expand All @@ -80,19 +79,12 @@ impl Streams {
}

let attach = self.attach().clone();
let token_clone = token.clone();
if let Some(stdout) = stdout {
task::spawn(
async move {
if let Err(e) = ContainerIO::read_loop(
stdout,
Pipe::StdOut,
logger,
message_tx,
attach,
token_clone,
)
.await
if let Err(e) =
ContainerIO::read_loop(stdout, Pipe::StdOut, logger, message_tx, attach)
.await
{
error!("Stdout read loop failure: {:#}", e);
}
Expand All @@ -107,15 +99,9 @@ impl Streams {
if let Some(stderr) = stderr {
task::spawn(
async move {
if let Err(e) = ContainerIO::read_loop(
stderr,
Pipe::StdErr,
logger,
message_tx,
attach,
token,
)
.await
if let Err(e) =
ContainerIO::read_loop(stderr, Pipe::StdErr, logger, message_tx, attach)
.await
{
error!("Stderr read loop failure: {:#}", e);
}
Expand Down
2 changes: 0 additions & 2 deletions conmon-rs/server/src/terminal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ impl Terminal {
let logger_clone = self.logger.clone();
let (message_tx, message_rx) = mpsc::unbounded_channel();
self.message_rx = Some(message_rx);
let token_clone = token.clone();

task::spawn(
async move {
Expand All @@ -125,7 +124,6 @@ impl Terminal {
logger_clone,
message_tx,
attach_clone,
token_clone,
)
.await
{
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ var _ = Describe("ConmonClient", func() {
})

It(testName("should execute cleanup command when container exits", terminal), func() {
filepath := fmt.Sprintf("%s/conmon-client-test%s", os.TempDir(), tr.ctrID)
tr = newTestRunner()
filepath := fmt.Sprintf("%s/conmon-client-test%s", os.TempDir(), tr.ctrID)
tr.createRuntimeConfig(terminal)
sut = tr.configGivenEnv()
tr.createContainerWithConfig(sut, &client.CreateContainerConfig{
Expand Down Expand Up @@ -312,7 +312,7 @@ var _ = Describe("ConmonClient", func() {
rssAfter := vmRSSGivenPID(pid)
GinkgoWriter.Printf("VmRSS after: %d\n", rssAfter)
GinkgoWriter.Printf("VmRSS diff: %d\n", rssAfter-rssBefore)
Expect(rssAfter - rssBefore).To(BeNumerically("<", 1000))
Expect(rssAfter - rssBefore).To(BeNumerically("<", 1200))
})
}
})
Expand Down

0 comments on commit 0107f94

Please sign in to comment.