Skip to content

Forward lines from stderr of child process to stdout on the same thread, instead of spawning a thread #940

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -22,10 +22,10 @@ rust-version = "1.53"
[target.'cfg(unix)'.dependencies]
# Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866
# which is still an issue with `resolver = "1"`.
libc = { version = "0.2.62", default-features = false }
libc = { version = "0.2.62", default-features = false, optional = true }

[features]
parallel = []
parallel = ["libc"]

[dev-dependencies]
tempfile = "3"
3 changes: 1 addition & 2 deletions dev-tools/gen-windows-sys-binding/windows_sys.list
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
Windows.Win32.Foundation.FILETIME
Windows.Win32.Foundation.INVALID_HANDLE_VALUE
Windows.Win32.Foundation.ERROR_NO_MORE_ITEMS
Windows.Win32.Foundation.ERROR_SUCCESS
Windows.Win32.Foundation.SysFreeString
@@ -20,7 +19,7 @@ Windows.Win32.System.Com.COINIT_MULTITHREADED
Windows.Win32.System.Com.CoCreateInstance
Windows.Win32.System.Com.CoInitializeEx

Windows.Win32.System.Pipes.CreatePipe
Windows.Win32.System.Pipes.PeekNamedPipe

Windows.Win32.System.Registry.RegCloseKey
Windows.Win32.System.Registry.RegEnumKeyExW
253 changes: 163 additions & 90 deletions src/command_helpers.rs
Original file line number Diff line number Diff line change
@@ -4,13 +4,12 @@ use std::{
collections::hash_map,
ffi::OsString,
fmt::Display,
fs::{self, File},
fs,
hash::Hasher,
io::{self, BufRead, BufReader, Read, Write},
io::{self, Read, Write},
path::Path,
process::{Child, Command, Stdio},
process::{Child, ChildStderr, Command, Stdio},
sync::Arc,
thread::{self, JoinHandle},
};

use crate::{Error, ErrorKind, Object};
@@ -41,83 +40,164 @@ impl CargoOutput {
}
}

pub(crate) fn print_thread(&self) -> Result<Option<PrintThread>, Error> {
self.warnings.then(PrintThread::new).transpose()
fn stdio_for_warnings(&self) -> Stdio {
if self.warnings {
Stdio::piped()
} else {
Stdio::null()
}
}
}

pub(crate) struct PrintThread {
handle: Option<JoinHandle<()>>,
pipe_writer: Option<File>,
pub(crate) struct StderrForwarder {
inner: Option<(ChildStderr, Vec<u8>)>,
#[cfg(feature = "parallel")]
is_non_blocking: bool,
#[cfg(feature = "parallel")]
bytes_available_failed: bool,
}

impl PrintThread {
pub(crate) fn new() -> Result<Self, Error> {
let (pipe_reader, pipe_writer) = crate::os_pipe::pipe()?;

// Capture the standard error coming from compilation, and write it out
// with cargo:warning= prefixes. Note that this is a bit wonky to avoid
// requiring the output to be UTF-8, we instead just ship bytes from one
// location to another.
let print = thread::spawn(move || {
let mut stderr = BufReader::with_capacity(4096, pipe_reader);
let mut line = Vec::with_capacity(20);
let stdout = io::stdout();

// read_until returns 0 on Eof
while stderr.read_until(b'\n', &mut line).unwrap() != 0 {
{
let mut stdout = stdout.lock();

stdout.write_all(b"cargo:warning=").unwrap();
stdout.write_all(&line).unwrap();
stdout.write_all(b"\n").unwrap();
}
const MIN_BUFFER_CAPACITY: usize = 100;

// read_until does not clear the buffer
line.clear();
}
});
impl StderrForwarder {
pub(crate) fn new(child: &mut Child) -> Self {
Self {
inner: child
.stderr
.take()
.map(|stderr| (stderr, Vec::with_capacity(MIN_BUFFER_CAPACITY))),
#[cfg(feature = "parallel")]
is_non_blocking: false,
#[cfg(feature = "parallel")]
bytes_available_failed: false,
}
}

Ok(Self {
handle: Some(print),
pipe_writer: Some(pipe_writer),
})
#[allow(clippy::uninit_vec)]
fn forward_available(&mut self) -> bool {
if let Some((stderr, buffer)) = self.inner.as_mut() {
loop {
let old_data_end = buffer.len();

// For non-blocking we check to see if there is data available, so we should try to
// read at least that much. For blocking, always read at least the minimum amount.
#[cfg(not(feature = "parallel"))]
let to_reserve = MIN_BUFFER_CAPACITY;
#[cfg(feature = "parallel")]
let to_reserve = if self.is_non_blocking && !self.bytes_available_failed {
match crate::parallel::stderr::bytes_available(stderr) {
#[cfg(windows)]
Ok(0) => return false,
#[cfg(unix)]
Ok(0) => {
// On Unix, depending on the implementation, we may sometimes get 0 in a
// loop (either there is data available or the pipe is broken), so
// continue with the non-blocking read anyway.
MIN_BUFFER_CAPACITY
}
#[cfg(windows)]
Err(_) => {
// On Windows, if we get an error then the pipe is broken, so flush
// the buffer and bail.
if !buffer.is_empty() {
write_warning(&buffer[..]);
}
self.inner = None;
return true;
}
#[cfg(unix)]
Err(_) => {
// On Unix, depending on the implementation, we may get spurious
// errors so make a note not to use bytes_available again and try
// the non-blocking read anyway.
self.bytes_available_failed = true;
MIN_BUFFER_CAPACITY
}
Ok(bytes_available) => MIN_BUFFER_CAPACITY.max(bytes_available),
}
} else {
MIN_BUFFER_CAPACITY
};
buffer.reserve(to_reserve);

// SAFETY: 1) the length is set to the capacity, so we are never using memory beyond
// the underlying buffer and 2) we always call `truncate` below to set the len back
// to the intitialized data.
unsafe {
buffer.set_len(buffer.capacity());
}
match stderr.read(&mut buffer[old_data_end..]) {
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
// No data currently, yield back.
buffer.truncate(old_data_end);
return false;
}
Err(err) if err.kind() == std::io::ErrorKind::Interrupted => {
// Interrupted, try again.
buffer.truncate(old_data_end);
}
Ok(0) | Err(_) => {
// End of stream: flush remaining data and bail.
if old_data_end > 0 {
write_warning(&buffer[..old_data_end]);
}
self.inner = None;
return true;
}
Ok(bytes_read) => {
buffer.truncate(old_data_end + bytes_read);
let mut consumed = 0;
for line in buffer.split_inclusive(|&b| b == b'\n') {
// Only forward complete lines, leave the rest in the buffer.
if let Some((b'\n', line)) = line.split_last() {
consumed += line.len() + 1;
write_warning(line);
}
}
buffer.drain(..consumed);
}
}
}
} else {
true
}
}

/// # Panics
///
/// Will panic if the pipe writer has already been taken.
pub(crate) fn take_pipe_writer(&mut self) -> File {
self.pipe_writer.take().unwrap()
#[cfg(feature = "parallel")]
pub(crate) fn set_non_blocking(&mut self) -> Result<(), Error> {
assert!(!self.is_non_blocking);

if let Some((stderr, _)) = self.inner.as_mut() {
crate::parallel::stderr::set_non_blocking(stderr)?;
}

self.is_non_blocking = true;
Ok(())
}

/// # Panics
///
/// Will panic if the pipe writer has already been taken.
pub(crate) fn clone_pipe_writer(&self) -> Result<File, Error> {
self.try_clone_pipe_writer().map(Option::unwrap)
#[cfg(feature = "parallel")]
fn forward_all(&mut self) {
while !self.forward_available() {}
}

pub(crate) fn try_clone_pipe_writer(&self) -> Result<Option<File>, Error> {
self.pipe_writer
.as_ref()
.map(File::try_clone)
.transpose()
.map_err(From::from)
#[cfg(not(feature = "parallel"))]
fn forward_all(&mut self) {
let forward_result = self.forward_available();
assert!(forward_result, "Should have consumed all data");
}
}

impl Drop for PrintThread {
fn drop(&mut self) {
// Drop pipe_writer first to avoid deadlock
self.pipe_writer.take();

self.handle.take().unwrap().join().unwrap();
}
fn write_warning(line: &[u8]) {
let stdout = io::stdout();
let mut stdout = stdout.lock();
stdout.write_all(b"cargo:warning=").unwrap();
stdout.write_all(line).unwrap();
stdout.write_all(b"\n").unwrap();
}

fn wait_on_child(cmd: &Command, program: &str, child: &mut Child) -> Result<(), Error> {
StderrForwarder::new(child).forward_all();

let status = match child.wait() {
Ok(s) => s,
Err(e) => {
@@ -193,20 +273,13 @@ pub(crate) fn objects_from_files(files: &[Arc<Path>], dst: &Path) -> Result<Vec<
Ok(objects)
}

fn run_inner(cmd: &mut Command, program: &str, pipe_writer: Option<File>) -> Result<(), Error> {
let mut child = spawn(cmd, program, pipe_writer)?;
wait_on_child(cmd, program, &mut child)
}

pub(crate) fn run(
cmd: &mut Command,
program: &str,
print: Option<&PrintThread>,
cargo_output: &CargoOutput,
) -> Result<(), Error> {
let pipe_writer = print.map(PrintThread::clone_pipe_writer).transpose()?;
run_inner(cmd, program, pipe_writer)?;

Ok(())
let mut child = spawn(cmd, program, cargo_output)?;
wait_on_child(cmd, program, &mut child)
}

pub(crate) fn run_output(
@@ -216,12 +289,7 @@ pub(crate) fn run_output(
) -> Result<Vec<u8>, Error> {
cmd.stdout(Stdio::piped());

let mut print = cargo_output.print_thread()?;
let mut child = spawn(
cmd,
program,
print.as_mut().map(PrintThread::take_pipe_writer),
)?;
let mut child = spawn(cmd, program, cargo_output)?;

let mut stdout = vec![];
child
@@ -239,7 +307,7 @@ pub(crate) fn run_output(
pub(crate) fn spawn(
cmd: &mut Command,
program: &str,
pipe_writer: Option<File>,
cargo_output: &CargoOutput,
) -> Result<Child, Error> {
struct ResetStderr<'cmd>(&'cmd mut Command);

@@ -254,10 +322,7 @@ pub(crate) fn spawn(
println!("running: {:?}", cmd);

let cmd = ResetStderr(cmd);
let child = cmd
.0
.stderr(pipe_writer.map_or_else(Stdio::null, Stdio::from))
.spawn();
let child = cmd.0.stderr(cargo_output.stdio_for_warnings()).spawn();
match child {
Ok(child) => Ok(child),
Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
@@ -307,9 +372,14 @@ pub(crate) fn try_wait_on_child(
program: &str,
child: &mut Child,
stdout: &mut dyn io::Write,
stderr_forwarder: &mut StderrForwarder,
) -> Result<Option<()>, Error> {
stderr_forwarder.forward_available();

match child.try_wait() {
Ok(Some(status)) => {
stderr_forwarder.forward_all();

let _ = writeln!(stdout, "{}", status);

if status.success() {
@@ -325,12 +395,15 @@ pub(crate) fn try_wait_on_child(
}
}
Ok(None) => Ok(None),
Err(e) => Err(Error::new(
ErrorKind::ToolExecError,
format!(
"Failed to wait on spawned child process, command {:?} with args {:?}: {}.",
cmd, program, e
),
)),
Err(e) => {
stderr_forwarder.forward_all();
Err(Error::new(
ErrorKind::ToolExecError,
format!(
"Failed to wait on spawned child process, command {:?} with args {:?}: {}.",
cmd, program, e
),
))
}
}
}
Loading