Skip to content

Commit bb4daeb

Browse files
Christopher Neugebauerstuhood
Christopher Neugebauer
authored andcommitted
Adds workunit for interactive processes. (pantsbuild#17544)
While working on pantsbuild#16825, we discovered that parts of the trace for our `experimental_run_shell_command` were not captured. That's because there was no work unit preserved. This adds such a work unit.
1 parent 955cd55 commit bb4daeb

File tree

1 file changed

+141
-135
lines changed

1 file changed

+141
-135
lines changed

src/rust/engine/src/intrinsics.rs

+141-135
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ use rule_graph::DependencyKey;
3636
use stdio::TryCloneAsFile;
3737
use store::{SnapshotOps, SubsetParams};
3838

39+
use workunit_store::{in_workunit, Level};
40+
3941
type IntrinsicFn =
4042
Box<dyn Fn(Context, Vec<Value>) -> BoxFuture<'static, NodeResult<Value>> + Send + Sync>;
4143

@@ -512,147 +514,151 @@ fn interactive_process(
512514
context: Context,
513515
args: Vec<Value>,
514516
) -> BoxFuture<'static, NodeResult<Value>> {
515-
async move {
516-
let types = &context.core.types;
517-
let interactive_process_result = types.interactive_process_result;
518-
519-
let (py_interactive_process, py_process, process_config): (Value, Value, externs::process::PyProcessConfigFromEnvironment) = Python::with_gil(|py| {
520-
let py_interactive_process = (*args[0]).as_ref(py);
521-
let py_process: Value = externs::getattr(py_interactive_process, "process").unwrap();
522-
let process_config = (*args[1])
523-
.as_ref(py)
524-
.extract()
525-
.unwrap();
526-
(py_interactive_process.extract().unwrap(), py_process, process_config)
527-
});
528-
match process_config.execution_strategy {
529-
ProcessExecutionStrategy::Docker(_) | ProcessExecutionStrategy::RemoteExecution(_) => Err("InteractiveProcess should not set docker_image or remote_execution".to_owned()),
530-
_ => Ok(())
531-
}?;
532-
let mut process = ExecuteProcess::lift(&context.core.store(), py_process, process_config).await?.process;
533-
let (run_in_workspace, restartable, keep_sandboxes) = Python::with_gil(|py| {
534-
let py_interactive_process_obj = py_interactive_process.to_object(py);
535-
let py_interactive_process = py_interactive_process_obj.as_ref(py);
536-
let run_in_workspace: bool = externs::getattr(py_interactive_process, "run_in_workspace").unwrap();
537-
let restartable: bool = externs::getattr(py_interactive_process, "restartable").unwrap();
538-
let keep_sandboxes_value: &PyAny = externs::getattr(py_interactive_process, "keep_sandboxes").unwrap();
539-
let keep_sandboxes = KeepSandboxes::from_str(externs::getattr(keep_sandboxes_value, "value").unwrap()).unwrap();
540-
(run_in_workspace, restartable, keep_sandboxes)
541-
});
542-
543-
let session = context.session;
544-
545-
let mut tempdir = create_sandbox(
546-
context.core.executor.clone(),
547-
&context.core.local_execution_root_dir,
548-
"interactive process",
549-
keep_sandboxes,
550-
)?;
551-
prepare_workdir(
552-
tempdir.path().to_owned(),
553-
&process,
554-
process.input_digests.input_files.clone(),
555-
context.core.store(),
556-
context.core.executor.clone(),
557-
&context.core.named_caches,
558-
&context.core.immutable_inputs,
559-
None,
560-
None,
561-
)
562-
.await?;
563-
apply_chroot(tempdir.path().to_str().unwrap(), &mut process);
564-
565-
let p = Path::new(&process.argv[0]);
566-
// TODO: Deprecate this program name calculation, and recommend `{chroot}` replacement in args
567-
// instead.
568-
let program_name = if !run_in_workspace && p.is_relative() {
569-
let mut buf = PathBuf::new();
570-
buf.push(tempdir.path());
571-
buf.push(p);
572-
buf
573-
} else {
574-
p.to_path_buf()
575-
};
576-
577-
let mut command = process::Command::new(program_name);
578-
if !run_in_workspace {
579-
command.current_dir(tempdir.path());
580-
}
581-
for arg in process.argv[1..].iter() {
582-
command.arg(arg);
583-
}
517+
in_workunit!(
518+
"interactive_process",
519+
Level::Debug,
520+
|_workunit| async move {
521+
let types = &context.core.types;
522+
let interactive_process_result = types.interactive_process_result;
523+
524+
let (py_interactive_process, py_process, process_config): (Value, Value, externs::process::PyProcessConfigFromEnvironment) = Python::with_gil(|py| {
525+
let py_interactive_process = (*args[0]).as_ref(py);
526+
let py_process: Value = externs::getattr(py_interactive_process, "process").unwrap();
527+
let process_config = (*args[1])
528+
.as_ref(py)
529+
.extract()
530+
.unwrap();
531+
(py_interactive_process.extract().unwrap(), py_process, process_config)
532+
});
533+
match process_config.execution_strategy {
534+
ProcessExecutionStrategy::Docker(_) | ProcessExecutionStrategy::RemoteExecution(_) => Err("InteractiveProcess should not set docker_image or remote_execution".to_owned()),
535+
_ => Ok(())
536+
}?;
537+
let mut process = ExecuteProcess::lift(&context.core.store(), py_process, process_config).await?.process;
538+
let (run_in_workspace, restartable, keep_sandboxes) = Python::with_gil(|py| {
539+
let py_interactive_process_obj = py_interactive_process.to_object(py);
540+
let py_interactive_process = py_interactive_process_obj.as_ref(py);
541+
let run_in_workspace: bool = externs::getattr(py_interactive_process, "run_in_workspace").unwrap();
542+
let restartable: bool = externs::getattr(py_interactive_process, "restartable").unwrap();
543+
let keep_sandboxes_value: &PyAny = externs::getattr(py_interactive_process, "keep_sandboxes").unwrap();
544+
let keep_sandboxes = KeepSandboxes::from_str(externs::getattr(keep_sandboxes_value, "value").unwrap()).unwrap();
545+
(run_in_workspace, restartable, keep_sandboxes)
546+
});
547+
548+
let session = context.session;
549+
550+
let mut tempdir = create_sandbox(
551+
context.core.executor.clone(),
552+
&context.core.local_execution_root_dir,
553+
"interactive process",
554+
keep_sandboxes,
555+
)?;
556+
prepare_workdir(
557+
tempdir.path().to_owned(),
558+
&process,
559+
process.input_digests.input_files.clone(),
560+
context.core.store(),
561+
context.core.executor.clone(),
562+
&context.core.named_caches,
563+
&context.core.immutable_inputs,
564+
None,
565+
None,
566+
)
567+
.await?;
568+
apply_chroot(tempdir.path().to_str().unwrap(), &mut process);
569+
570+
let p = Path::new(&process.argv[0]);
571+
// TODO: Deprecate this program name calculation, and recommend `{chroot}` replacement in args
572+
// instead.
573+
let program_name = if !run_in_workspace && p.is_relative() {
574+
let mut buf = PathBuf::new();
575+
buf.push(tempdir.path());
576+
buf.push(p);
577+
buf
578+
} else {
579+
p.to_path_buf()
580+
};
581+
582+
let mut command = process::Command::new(program_name);
583+
if !run_in_workspace {
584+
command.current_dir(tempdir.path());
585+
}
586+
for arg in process.argv[1..].iter() {
587+
command.arg(arg);
588+
}
584589

585-
command.env_clear();
586-
command.envs(process.env);
590+
command.env_clear();
591+
command.envs(process.env);
587592

588-
if !restartable {
589-
task_side_effected()?;
590-
}
593+
if !restartable {
594+
task_side_effected()?;
595+
}
591596

592-
let exit_status = session.clone()
593-
.with_console_ui_disabled(async move {
594-
// Once any UI is torn down, grab exclusive access to the console.
595-
let (term_stdin, term_stdout, term_stderr) =
596-
stdio::get_destination().exclusive_start(Box::new(|_| {
597-
// A stdio handler that will immediately trigger logging.
598-
Err(())
599-
}))?;
600-
// NB: Command's stdio methods take ownership of a file-like to use, so we use
601-
// `TryCloneAsFile` here to `dup` our thread-local stdio.
602-
command
603-
.stdin(Stdio::from(
604-
term_stdin
605-
.try_clone_as_file()
606-
.map_err(|e| format!("Couldn't clone stdin: {}", e))?,
607-
))
608-
.stdout(Stdio::from(
609-
term_stdout
610-
.try_clone_as_file()
611-
.map_err(|e| format!("Couldn't clone stdout: {}", e))?,
612-
))
613-
.stderr(Stdio::from(
614-
term_stderr
615-
.try_clone_as_file()
616-
.map_err(|e| format!("Couldn't clone stderr: {}", e))?,
617-
));
618-
let mut subprocess = ManagedChild::spawn(command, context.core.graceful_shutdown_timeout)?;
619-
tokio::select! {
620-
_ = session.cancelled() => {
621-
// The Session was cancelled: attempt to kill the process group / process, and
622-
// then wait for it to exit (to avoid zombies).
623-
if let Err(e) = subprocess.graceful_shutdown_sync() {
624-
// Failed to kill the PGID: try the non-group form.
625-
log::warn!("Failed to kill spawned process group ({}). Will try killing only the top process.\n\
626-
This is unexpected: please file an issue about this problem at \
627-
[https://github.com/pantsbuild/pants/issues/new]", e);
628-
subprocess.kill().map_err(|e| format!("Failed to interrupt child process: {}", e)).await?;
629-
};
630-
subprocess.wait().await.map_err(|e| e.to_string())
631-
}
632-
exit_status = subprocess.wait() => {
633-
// The process exited.
634-
exit_status.map_err(|e| e.to_string())
597+
let exit_status = session.clone()
598+
.with_console_ui_disabled(async move {
599+
// Once any UI is torn down, grab exclusive access to the console.
600+
let (term_stdin, term_stdout, term_stderr) =
601+
stdio::get_destination().exclusive_start(Box::new(|_| {
602+
// A stdio handler that will immediately trigger logging.
603+
Err(())
604+
}))?;
605+
// NB: Command's stdio methods take ownership of a file-like to use, so we use
606+
// `TryCloneAsFile` here to `dup` our thread-local stdio.
607+
command
608+
.stdin(Stdio::from(
609+
term_stdin
610+
.try_clone_as_file()
611+
.map_err(|e| format!("Couldn't clone stdin: {}", e))?,
612+
))
613+
.stdout(Stdio::from(
614+
term_stdout
615+
.try_clone_as_file()
616+
.map_err(|e| format!("Couldn't clone stdout: {}", e))?,
617+
))
618+
.stderr(Stdio::from(
619+
term_stderr
620+
.try_clone_as_file()
621+
.map_err(|e| format!("Couldn't clone stderr: {}", e))?,
622+
));
623+
let mut subprocess = ManagedChild::spawn(command, context.core.graceful_shutdown_timeout)?;
624+
tokio::select! {
625+
_ = session.cancelled() => {
626+
// The Session was cancelled: attempt to kill the process group / process, and
627+
// then wait for it to exit (to avoid zombies).
628+
if let Err(e) = subprocess.graceful_shutdown_sync() {
629+
// Failed to kill the PGID: try the non-group form.
630+
log::warn!("Failed to kill spawned process group ({}). Will try killing only the top process.\n\
631+
This is unexpected: please file an issue about this problem at \
632+
[https://github.com/pantsbuild/pants/issues/new]", e);
633+
subprocess.kill().map_err(|e| format!("Failed to interrupt child process: {}", e)).await?;
634+
};
635+
subprocess.wait().await.map_err(|e| e.to_string())
636+
}
637+
exit_status = subprocess.wait() => {
638+
// The process exited.
639+
exit_status.map_err(|e| e.to_string())
640+
}
635641
}
636-
}
637-
})
638-
.await?;
642+
})
643+
.await?;
639644

640-
let code = exit_status.code().unwrap_or(-1);
641-
if keep_sandboxes == KeepSandboxes::OnFailure && code != 0 {
642-
tempdir.keep("interactive process");
643-
}
645+
let code = exit_status.code().unwrap_or(-1);
646+
if keep_sandboxes == KeepSandboxes::OnFailure && code != 0 {
647+
tempdir.keep("interactive process");
648+
}
644649

645-
let result = {
646-
let gil = Python::acquire_gil();
647-
let py = gil.python();
648-
externs::unsafe_call(
649-
py,
650-
interactive_process_result,
651-
&[externs::store_i64(py, i64::from(code))],
652-
)
653-
};
654-
Ok(result)
655-
}.boxed()
650+
let result = {
651+
let gil = Python::acquire_gil();
652+
let py = gil.python();
653+
externs::unsafe_call(
654+
py,
655+
interactive_process_result,
656+
&[externs::store_i64(py, i64::from(code))],
657+
)
658+
};
659+
Ok(result)
660+
}
661+
).boxed()
656662
}
657663

658664
fn docker_resolve_image(

0 commit comments

Comments
 (0)