Skip to content

Commit

Permalink
feat: introduce KillSignal for sending signals to tasks (#131)
Browse files Browse the repository at this point in the history
  • Loading branch information
dsherret authored Nov 28, 2024
1 parent c6ec537 commit af5d82e
Show file tree
Hide file tree
Showing 15 changed files with 575 additions and 123 deletions.
41 changes: 25 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,25 @@ description = "Cross platform scripting for deno task"

[features]
default = ["shell"]
shell = ["futures", "glob", "os_pipe", "path-dedot", "tokio", "tokio-util"]
shell = ["futures", "glob", "nix", "os_pipe", "path-dedot", "tokio"]
serialization = ["serde"]

[dependencies]
anyhow = "1.0.75"
futures = { version = "0.3.29", optional = true }
glob = { version = "0.3.1", optional = true }
path-dedot = { version = "3.1.1", optional = true }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "process", "rt-multi-thread", "sync", "time"], optional = true }
tokio-util = { version = "0.7.10", optional = true }
os_pipe = { version = "1.1.4", optional = true }
serde = { version = "1", features = ["derive"], optional = true }
monch = "0.5.0"
thiserror = "1.0.58"
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "process", "rt-multi-thread", "sync", "time"], optional = true }

[target.'cfg(unix)'.dependencies]
nix = { version = "0.27.1", features = ["signal"], optional = true }

[dev-dependencies]
deno_unsync = "0.4.1"
parking_lot = "0.12.1"
pretty_assertions = "1"
serde_json = "1.0.111"
Expand Down
4 changes: 2 additions & 2 deletions src/shell/commands/cat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ fn execute_cat(mut context: ShellCommandContext) -> Result<ExecuteResult> {
// in memory
match File::open(context.state.cwd().join(&path)) {
Ok(mut file) => loop {
if context.state.token().is_cancelled() {
return Ok(ExecuteResult::for_cancellation());
if let Some(exit_code) = context.state.kill_signal().aborted_code() {
return Ok(ExecuteResult::from_exit_code(exit_code));
}

let size = file.read(&mut buf)?;
Expand Down
8 changes: 4 additions & 4 deletions src/shell/commands/cp_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl ShellCommand for CpCommand {
async move {
execute_with_cancellation!(
cp_command(context.state.cwd(), context.args, context.stderr),
context.state.token()
context.state.kill_signal()
)
}
.boxed_local()
Expand Down Expand Up @@ -171,7 +171,7 @@ impl ShellCommand for MvCommand {
async move {
execute_with_cancellation!(
mv_command(context.state.cwd(), context.args, context.stderr),
context.state.token()
context.state.kill_signal()
)
}
.boxed_local()
Expand All @@ -184,10 +184,10 @@ async fn mv_command(
mut stderr: ShellPipeWriter,
) -> ExecuteResult {
match execute_mv(cwd, args).await {
Ok(()) => ExecuteResult::Continue(0, Vec::new(), Vec::new()),
Ok(()) => ExecuteResult::from_exit_code(0),
Err(err) => {
let _ = stderr.write_line(&format!("mv: {err}"));
ExecuteResult::Continue(1, Vec::new(), Vec::new())
ExecuteResult::from_exit_code(1)
}
}
}
Expand Down
58 changes: 43 additions & 15 deletions src/shell/commands/executable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ impl ShellCommand for ExecutableCommand {
let display_name = self.display_name.clone();
let command_name = self.command_path.clone();
async move {
// don't spawn if already aborted
if let Some(exit_code) = context.state.kill_signal().aborted_code() {
return ExecuteResult::from_exit_code(exit_code);
}

let mut stderr = context.stderr;
let mut sub_command = tokio::process::Command::new(&command_name);
let child = sub_command
Expand All @@ -48,31 +53,54 @@ impl ShellCommand for ExecutableCommand {
"Error launching '{}': {}",
display_name, err
));
return ExecuteResult::Continue(1, Vec::new(), Vec::new());
return ExecuteResult::from_exit_code(1);
}
};

// avoid deadlock since this is holding onto the pipes
drop(sub_command);

tokio::select! {
result = child.wait() => match result {
Ok(status) => ExecuteResult::Continue(
status.code().unwrap_or(1),
Vec::new(),
Vec::new(),
),
Err(err) => {
let _ = stderr.write_line(&format!("{}", err));
ExecuteResult::Continue(1, Vec::new(), Vec::new())
loop {
tokio::select! {
result = child.wait() => match result {
Ok(status) => return ExecuteResult::Continue(
status.code().unwrap_or(1),
Vec::new(),
Vec::new(),
),
Err(err) => {
let _ = stderr.write_line(&format!("{}", err));
return ExecuteResult::from_exit_code(1);
}
},
signal = context.state.kill_signal().wait_any() => {
if let Some(_id) = child.id() {
#[cfg(unix)]
kill(_id as i32, signal);

if cfg!(not(unix)) && signal.causes_abort() {
let _ = child.start_kill();
let status = child.wait().await.ok();
return ExecuteResult::from_exit_code(
status.and_then(|s| s.code()).unwrap_or(signal.aborted_code()),
);
}
}
}
},
_ = context.state.token().cancelled() => {
let _ = child.kill().await;
ExecuteResult::for_cancellation()
}
}
}
.boxed_local()
}
}

#[cfg(unix)]
pub fn kill(pid: i32, signal: crate::SignalKind) -> Option<()> {
use nix::sys::signal::kill as unix_kill;
use nix::sys::signal::Signal;
use nix::unistd::Pid;
let signo: i32 = signal.into();
let sig = Signal::try_from(signo).ok()?;
unix_kill(Pid::from_raw(pid), Some(sig)).ok()?;
Some(())
}
18 changes: 9 additions & 9 deletions src/shell/commands/head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use anyhow::bail;
use anyhow::Result;
use futures::future::LocalBoxFuture;

use crate::shell::CancellationToken;
use crate::shell::KillSignal;
use crate::ExecuteResult;
use crate::ShellCommand;
use crate::ShellCommandContext;
Expand Down Expand Up @@ -38,23 +38,23 @@ impl ShellCommand for HeadCommand {
fn copy_lines<F: FnMut(&mut [u8]) -> Result<usize>>(
writer: &mut ShellPipeWriter,
max_lines: u64,
cancellation_token: &CancellationToken,
kill_signal: &KillSignal,
mut read: F,
buffer_size: usize,
) -> Result<ExecuteResult> {
let mut written_lines = 0;
let mut buffer = vec![0; buffer_size];
while written_lines < max_lines {
if cancellation_token.is_cancelled() {
return Ok(ExecuteResult::for_cancellation());
if let Some(exit_code) = kill_signal.aborted_code() {
return Ok(ExecuteResult::from_exit_code(exit_code));
}
let read_bytes = read(&mut buffer)?;
if read_bytes == 0 {
break;
}

if cancellation_token.is_cancelled() {
return Ok(ExecuteResult::for_cancellation());
if let Some(exit_code) = kill_signal.aborted_code() {
return Ok(ExecuteResult::from_exit_code(exit_code));
}

let mut written_bytes: usize = 0;
Expand Down Expand Up @@ -85,7 +85,7 @@ fn execute_head(mut context: ShellCommandContext) -> Result<ExecuteResult> {
copy_lines(
&mut context.stdout,
flags.lines,
context.state.token(),
context.state.kill_signal(),
|buf| context.stdin.read(buf),
512,
)
Expand All @@ -95,7 +95,7 @@ fn execute_head(mut context: ShellCommandContext) -> Result<ExecuteResult> {
Ok(mut file) => copy_lines(
&mut context.stdout,
flags.lines,
context.state.token(),
context.state.kill_signal(),
|buf| file.read(buf).map_err(Into::into),
512,
),
Expand Down Expand Up @@ -174,7 +174,7 @@ mod test {
let result = copy_lines(
&mut writer,
2,
&CancellationToken::new(),
&KillSignal::default(),
|buffer| {
if offset >= data.len() {
return Ok(0);
Expand Down
6 changes: 3 additions & 3 deletions src/shell/commands/mkdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl ShellCommand for MkdirCommand {
async move {
execute_with_cancellation!(
mkdir_command(context.state.cwd(), context.args, context.stderr),
context.state.token()
context.state.kill_signal()
)
}
.boxed_local()
Expand All @@ -38,10 +38,10 @@ async fn mkdir_command(
mut stderr: ShellPipeWriter,
) -> ExecuteResult {
match execute_mkdir(cwd, args).await {
Ok(()) => ExecuteResult::Continue(0, Vec::new(), Vec::new()),
Ok(()) => ExecuteResult::from_exit_code(0),
Err(err) => {
let _ = stderr.write_line(&format!("mkdir: {err}"));
ExecuteResult::Continue(1, Vec::new(), Vec::new())
ExecuteResult::from_exit_code(1)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/shell/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ pub trait ShellCommand {
}

macro_rules! execute_with_cancellation {
($result_expr:expr, $token:expr) => {
($result_expr:expr, $kill_signal:expr) => {
tokio::select! {
result = $result_expr => {
result
},
_ = $token.cancelled() => {
ExecuteResult::for_cancellation()
signal = $kill_signal.wait_aborted() => {
ExecuteResult::from_exit_code(signal.aborted_code())
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/shell/commands/rm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl ShellCommand for RmCommand {
async move {
execute_with_cancellation!(
rm_command(context.state.cwd(), context.args, context.stderr),
context.state.token()
context.state.kill_signal()
)
}
.boxed_local()
Expand Down
2 changes: 1 addition & 1 deletion src/shell/commands/sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl ShellCommand for SleepCommand {
async move {
execute_with_cancellation!(
sleep_command(context.args, context.stderr),
context.state.token()
context.state.kill_signal()
)
}
.boxed_local()
Expand Down
Loading

0 comments on commit af5d82e

Please sign in to comment.