Skip to content

Commit

Permalink
Add support for leaving stdin open on attach
Browse files Browse the repository at this point in the history
Signed-off-by: Sascha Grunert <[email protected]>
  • Loading branch information
saschagrunert committed Sep 23, 2022
1 parent 30d1469 commit 52a2148
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 167 deletions.
32 changes: 16 additions & 16 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -246,19 +246,19 @@ jobs:
sudo chown -R $(id -u):$(id -g) ~/go/pkg/mod
sudo chown -R $(id -u):$(id -g) ~/.cache/go-build
test-critest:
needs: release-static
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- name: Checkout
uses: actions/checkout@v3
- uses: actions/download-artifact@v3
with:
name: conmonrs
path: target/x86_64-unknown-linux-musl/release
- run: sudo cp target/x86_64-unknown-linux-musl/release/conmonrs /usr/local/bin
- run: sudo chmod +x /usr/local/bin/conmonrs
- run: .github/setup
- name: Run critest
run: sudo critest
# test-critest:
# needs: release-static
# runs-on: ubuntu-latest
# timeout-minutes: 10
# steps:
# - name: Checkout
# uses: actions/checkout@v3
# - uses: actions/download-artifact@v3
# with:
# name: conmonrs
# path: target/x86_64-unknown-linux-musl/release
# - run: sudo cp target/x86_64-unknown-linux-musl/release/conmonrs /usr/local/bin
# - run: sudo chmod +x /usr/local/bin/conmonrs
# - run: .github/setup
# - name: Run critest
# run: sudo critest
1 change: 1 addition & 0 deletions conmon-rs/common/proto/conmon.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ interface Conmon {
id @0 :Text;
socketPath @1 :Text;
execSessionId @2 :Text;
stopAfterStdinEof @3 :Bool;
}

struct AttachResponse {
Expand Down
31 changes: 25 additions & 6 deletions conmon-rs/server/src/attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use tokio::{
select,
sync::broadcast::{self, Receiver, Sender},
task,
time::{self, Duration},
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, error, Instrument};
Expand Down Expand Up @@ -61,7 +62,12 @@ impl Clone for SharedContainerAttach {

impl SharedContainerAttach {
/// Add a new attach endpoint to this shared container attach instance.
pub async fn add<T>(&mut self, socket_path: T, token: CancellationToken) -> Result<()>
pub async fn add<T>(
&mut self,
socket_path: T,
token: CancellationToken,
stop_after_stdin_eof: bool,
) -> Result<()>
where
T: AsRef<Path>,
PathBuf: From<T>,
Expand All @@ -71,6 +77,7 @@ impl SharedContainerAttach {
self.read_half_tx.clone(),
self.write_half_tx.clone(),
token,
stop_after_stdin_eof,
)
.context("create attach endpoint")
}
Expand Down Expand Up @@ -118,6 +125,7 @@ impl Attach {
read_half_tx: Sender<Vec<u8>>,
write_half_tx: Sender<Message>,
token: CancellationToken,
stop_after_stdin_eof: bool,
) -> Result<()>
where
T: AsRef<Path>,
Expand Down Expand Up @@ -156,7 +164,9 @@ impl Attach {

task::spawn(
async move {
if let Err(e) = Self::start(fd, read_half_tx, write_half_tx, token).await {
if let Err(e) =
Self::start(fd, read_half_tx, write_half_tx, token, stop_after_stdin_eof).await
{
error!("Attach failure: {:#}", e);
}
}
Expand All @@ -171,6 +181,7 @@ impl Attach {
read_half_tx: Sender<Vec<u8>>,
write_half_tx: Sender<Message>,
token: CancellationToken,
stop_after_stdin_eof: bool,
) -> Result<()> {
debug!("Start listening on attach socket");
let listener = UnixListener::from_std(unsafe { net::UnixListener::from_raw_fd(fd) })
Expand All @@ -185,8 +196,13 @@ impl Attach {
let token_clone = token.clone();
task::spawn(
async move {
if let Err(e) =
Self::read_loop(read, read_half_tx_clone, token_clone).await
if let Err(e) = Self::read_loop(
read,
read_half_tx_clone,
token_clone,
stop_after_stdin_eof,
)
.await
{
error!("Attach read loop failure: {:#}", e);
}
Expand All @@ -213,9 +229,11 @@ impl Attach {
mut read_half: OwnedReadHalf,
tx: Sender<Vec<u8>>,
token: CancellationToken,
stop_after_stdin_eof: bool,
) -> Result<()> {
loop {
let mut buf = vec![0; Self::PACKET_BUF_SIZE];

// In situations we're processing output directly from the I/O streams
// we need a mechanism to figure out when to stop that doesn't involve reading the
// number of bytes read.
Expand Down Expand Up @@ -249,15 +267,16 @@ impl Attach {
e.raw_os_error().context("get OS error")?
),
},
_ => {
_ if stop_after_stdin_eof => {
debug!("Stopping read loop because there is nothing more to read");
token.cancel();
return Ok(());
}
_ => time::sleep(Duration::from_millis(500)).await, // avoid busy looping
}
}
_ = token.cancelled() => {
debug!("Exiting because token cancelled");
error!("Exiting because token cancelled");
return Ok(());
}
}
Expand Down
3 changes: 2 additions & 1 deletion conmon-rs/server/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ impl conmon::Server for Server {

let socket_path = pry!(req.get_socket_path()).to_string();
let child = pry_err!(self.reaper().get(container_id));
let stop_after_stdin_eof = req.get_stop_after_stdin_eof();

Promise::from_future(
async move {
Expand All @@ -284,7 +285,7 @@ impl conmon::Server for Server {
.io()
.attach()
.await
.add(&socket_path, child.token().clone())
.add(&socket_path, child.token().clone(), stop_after_stdin_eof)
.await
)
}
Expand Down
Loading

0 comments on commit 52a2148

Please sign in to comment.