ripgrep crates/cli/src/process.rs: Code Companion

Reference code for the Process Management lecture. Sections correspond to the lecture document.


Section 1: The Custom Error Type Strategy

use std::io::{self, Read};
use std::process;

/// An error that can occur while running a command and reading its output.
///
/// This error can be seamlessly converted to an `io::Error` via a `From`
/// implementation.
#[derive(Debug)]
pub struct CommandError {
    kind: CommandErrorKind,  // Private field - users can't match on variants
}

#[derive(Debug)]
enum CommandErrorKind {
    Io(io::Error),           // System-level I/O failures
    Stderr(Vec<u8>),         // Process stderr output (raw bytes, may not be UTF-8)
}

impl CommandError {
    /// Create an error from an I/O error.
    pub(crate) fn io(ioerr: io::Error) -> CommandError {
        CommandError { kind: CommandErrorKind::Io(ioerr) }
    }

    /// Create an error from the contents of stderr (which may be empty).
    pub(crate) fn stderr(bytes: Vec<u8>) -> CommandError {
        CommandError { kind: CommandErrorKind::Stderr(bytes) }
    }

    /// Returns true if and only if this error has empty data from stderr.
    pub(crate) fn is_empty(&self) -> bool {
        match self.kind {
            CommandErrorKind::Stderr(ref bytes) => bytes.is_empty(),
            _ => false,
        }
    }
}

impl std::error::Error for CommandError {}

The pub(crate) constructors restrict creation to this crate while the struct itself stays public, and the private kind field keeps callers from matching on variants. The is_empty method matters later: close() (Section 7) uses it to distinguish expected pipe errors from real failures.
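A minimal sketch of the same opaque-error layout, using hypothetical names (errors, ToolError, ToolErrorKind) that do not appear in ripgrep. It only illustrates how a public struct can hide a private kind enum behind crate-visible constructors:

```rust
mod errors {
    /// Opaque error: the struct is public, the kind enum is not.
    #[derive(Debug)]
    pub struct ToolError {
        kind: ToolErrorKind, // private, so outside code cannot match on it
    }

    #[derive(Debug)]
    enum ToolErrorKind {
        Missing(String),
        Output(Vec<u8>),
    }

    impl ToolError {
        /// Crate-only constructor for the Missing variant.
        pub(crate) fn missing(name: &str) -> ToolError {
            ToolError { kind: ToolErrorKind::Missing(name.to_string()) }
        }

        /// Crate-only constructor for the Output variant.
        pub(crate) fn output(bytes: Vec<u8>) -> ToolError {
            ToolError { kind: ToolErrorKind::Output(bytes) }
        }

        /// True only for the Output variant with no bytes.
        pub fn is_empty(&self) -> bool {
            match self.kind {
                ToolErrorKind::Output(ref bytes) => bytes.is_empty(),
                _ => false,
            }
        }
    }
}
```

Code outside the errors module can call errors::ToolError::output(...) and is_empty(), but an expression such as err.kind does not compile there, which is the property the private field buys.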


Section 2: Error Display Formatting for Human Readability

impl std::fmt::Display for CommandError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self.kind {
            // I/O errors delegate to their own Display implementation
            CommandErrorKind::Io(ref e) => e.fmt(f),
            CommandErrorKind::Stderr(ref bytes) => {
                // Lossy conversion handles non-UTF-8 stderr gracefully
                let msg = String::from_utf8_lossy(bytes);
                if msg.trim().is_empty() {
                    // Explicit message when process wrote nothing useful
                    write!(f, "<stderr is empty>")
                } else {
                    // Visual box separates external tool output from ripgrep messages
                    let div = "-".repeat(79);
                    write!(
                        f,
                        "\n{div}\n{msg}\n{div}",
                        div = div,
                        msg = msg.trim()  // Clean up whitespace for readability
                    )
                }
            }
        }
    }
}

The from_utf8_lossy function replaces invalid UTF-8 sequences with the Unicode replacement character, ensuring the error message is always displayable even if the external tool wrote binary data to stderr.
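To see the lossy conversion in isolation, here is a small sketch. The helper name render_stderr is hypothetical; it copies the formatting logic of the Stderr arm into a free function so it can be exercised on raw bytes:

```rust
/// Hypothetical helper mirroring the Stderr arm of the Display impl.
fn render_stderr(bytes: &[u8]) -> String {
    // Invalid UTF-8 sequences become U+FFFD instead of causing an error.
    let lossy = String::from_utf8_lossy(bytes);
    let msg = lossy.trim();
    if msg.is_empty() {
        "<stderr is empty>".to_string()
    } else {
        // Same 79-dash divider as the original formatting code.
        let div = "-".repeat(79);
        format!("\n{div}\n{msg}\n{div}", div = div, msg = msg)
    }
}
```

Calling render_stderr(b"bad \xFF byte") yields a message containing "bad \u{FFFD} byte", while whitespace-only input falls through to the "<stderr is empty>" branch.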


Section 3: Bidirectional Error Conversion

// Converting io::Error → CommandError (wrapping)
impl From<io::Error> for CommandError {
    fn from(ioerr: io::Error) -> CommandError {
        CommandError { kind: CommandErrorKind::Io(ioerr) }
    }
}

// Converting CommandError → io::Error (unwrapping or wrapping)
impl From<CommandError> for io::Error {
    fn from(cmderr: CommandError) -> io::Error {
        match cmderr.kind {
            // I/O errors unwrap cleanly - no information lost
            CommandErrorKind::Io(ioerr) => ioerr,
            // Stderr errors become "Other", carrying the CommandError as the
            // inner error (retrievable via get_ref() or into_inner())
            CommandErrorKind::Stderr(_) => {
                io::Error::new(io::ErrorKind::Other, cmderr)
            }
        }
    }
}

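These implementations let the ? operator cross between the two error types, because ? passes the error through From::from. A function returning Result<T, CommandError> can use ? on an io::Result, and a function returning io::Result<T> can use ? on a Result<T, CommandError>.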
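The round trip can be sketched with a stripped-down stand-in. ToolError, probe, run, and search are hypothetical names, and the stand-in has only a wrapped I/O variant, so this shows the ? mechanics rather than the full CommandError behavior:

```rust
use std::fmt;
use std::io;

/// Hypothetical stand-in for CommandError with a single wrapped variant.
#[derive(Debug)]
struct ToolError(io::Error);

impl fmt::Display for ToolError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

impl std::error::Error for ToolError {}

impl From<io::Error> for ToolError {
    fn from(e: io::Error) -> ToolError {
        ToolError(e)
    }
}

impl From<ToolError> for io::Error {
    fn from(e: ToolError) -> io::Error {
        e.0 // unwrap cleanly, nothing is lost
    }
}

/// Lowest layer: fails with a plain io::Error.
fn probe() -> io::Result<u32> {
    Err(io::Error::new(io::ErrorKind::NotFound, "no such tool"))
}

/// Middle layer: `?` converts io::Error into ToolError.
fn run() -> Result<u32, ToolError> {
    Ok(probe()? + 1)
}

/// Top layer: `?` converts ToolError back into io::Error.
fn search() -> io::Result<u32> {
    Ok(run()? * 2)
}
```

Because the wrapped error is returned unchanged on the way back out, search() reports the original ErrorKind::NotFound.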


Section 4: The Builder Pattern for Command Configuration

/// Configures and builds a streaming reader for process output.
#[derive(Clone, Debug, Default)]
pub struct CommandReaderBuilder {
    async_stderr: bool,
}

impl CommandReaderBuilder {
    /// Create a new builder with the default configuration.
    pub fn new() -> CommandReaderBuilder {
        CommandReaderBuilder::default()
    }

    /// Build a new streaming reader for the given command's output.
    pub fn build(
        &self,
        command: &mut process::Command,
    ) -> Result<CommandReader, CommandError> {
        // Override stdout/stderr to piped - we need to capture both
        let mut child = command
            .stdout(process::Stdio::piped())
            .stderr(process::Stdio::piped())
            .spawn()?;  // ? converts io::Error to CommandError via From

        // Take ownership of stderr from the child process
        let stderr = if self.async_stderr {
            StderrReader::r#async(child.stderr.take().unwrap())
        } else {
            StderrReader::sync(child.stderr.take().unwrap())
        };
        Ok(CommandReader { child, stderr, eof: false })
    }
}

Each take() call is Option::take on child.stderr, which has type Option<ChildStderr>: it moves the handle out of the child, leaves None behind, and gives ownership to StderrReader. The unwrap() cannot fail here because spawn() succeeded with stderr set to Stdio::piped().
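The builder shape used in this section (setters taking &mut self and returning &mut Self, plus a build(&self) that leaves the builder reusable) can be sketched on its own. ReaderConfig, ReaderConfigBuilder, and the capacity field are hypothetical and exist only for illustration:

```rust
/// Hypothetical settings type produced by the builder.
#[derive(Debug, PartialEq)]
struct ReaderConfig {
    async_stderr: bool,
    capacity: usize,
}

/// Hypothetical builder following the same shape as CommandReaderBuilder.
#[derive(Clone, Debug, Default)]
struct ReaderConfigBuilder {
    async_stderr: bool,
    capacity: usize,
}

impl ReaderConfigBuilder {
    /// Starts from the derived defaults (false and 0).
    fn new() -> ReaderConfigBuilder {
        ReaderConfigBuilder::default()
    }

    /// Setter returns `&mut Self` so calls can be chained.
    fn async_stderr(&mut self, yes: bool) -> &mut ReaderConfigBuilder {
        self.async_stderr = yes;
        self
    }

    fn capacity(&mut self, n: usize) -> &mut ReaderConfigBuilder {
        self.capacity = n;
        self
    }

    /// Borrows the builder, so it can be reused for further builds.
    fn build(&self) -> ReaderConfig {
        ReaderConfig { async_stderr: self.async_stderr, capacity: self.capacity }
    }
}
```

A chained call such as ReaderConfigBuilder::new().async_stderr(true).capacity(4096).build() works in one expression. Note that #[derive(Default)] initializes a bool field to false, so in this sketch the flag starts off until a setter turns it on.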


Section 5: Understanding the Deadlock Problem

impl CommandReaderBuilder {
    /// When enabled, the reader will asynchronously read the contents of the
    /// command's stderr output. When disabled, stderr is only read after the
    /// stdout stream has been exhausted (or if the process quits with an error
    /// code).
    ///
    /// Note that when enabled, this may require launching an additional
    /// thread in order to read stderr. This is done so that the process being
    /// executed is never blocked from writing to stdout or stderr. If this is
    /// disabled, then it is possible for the process to fill up the stderr
    /// buffer and deadlock.
    ///
    /// This is enabled by default.
    pub fn async_stderr(&mut self, yes: bool) -> &mut CommandReaderBuilder {
        self.async_stderr = yes;
        self
    }
}

The deadlock scenario:

┌───────────────┐     ┌───────────────┐
│    Process    │     │    Reader     │
├───────────────┤     ├───────────────┤
│ write(stdout) │ ←── │ read(stdout)  │  ✓ flowing
│ write(stderr) │     │               │
│       ↓       │     │               │
│ [buffer full] │     │               │  ✗ blocked
│ [blocked]     │     │ [waiting]     │  ✗ deadlock!
└───────────────┘     └───────────────┘
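
One way to avoid the scenario above is to drain stderr on a helper thread while the main thread reads stdout. The following is a sketch under the assumption that both streams are anything implementing Read; drain_in_background and read_both are hypothetical helpers, not part of ripgrep:

```rust
use std::io::{self, Read};
use std::thread;

/// Spawns a thread that reads `source` to the end and returns the bytes.
fn drain_in_background<R>(mut source: R) -> thread::JoinHandle<io::Result<Vec<u8>>>
where
    R: Read + Send + 'static,
{
    thread::spawn(move || -> io::Result<Vec<u8>> {
        let mut bytes = Vec::new();
        source.read_to_end(&mut bytes)?;
        Ok(bytes)
    })
}

/// Reads both streams fully. The second stream is consumed concurrently,
/// so a writer on that side is never left waiting on a full buffer.
fn read_both<O, E>(mut stdout: O, stderr: E) -> io::Result<(Vec<u8>, Vec<u8>)>
where
    O: Read,
    E: Read + Send + 'static,
{
    let handle = drain_in_background(stderr);
    let mut out = Vec::new();
    stdout.read_to_end(&mut out)?;
    let err = handle.join().expect("stderr thread panicked")?;
    Ok((out, err))
}
```

With a real child process, the two arguments would be the ChildStdout and ChildStderr handles taken from the child; in-memory readers such as io::Cursor work the same way for experimentation.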

Section 6: The CommandReader Core Structure

/// A streaming reader for a command's output.
#[derive(Debug)]
pub struct CommandReader {
    child: process::Child,    // Owns the spawned process (including stdout)
    stderr: StderrReader,     // Handles stderr via sync or async reading
    /// This is set to true once 'read' returns zero bytes. When this isn't
    /// set and we close the reader, then we anticipate a pipe error when
    /// reaping the child process and silence it.
    eof: bool,
}

impl CommandReader {
    /// Create a new streaming reader for the given command using the default
    /// configuration.
    pub fn new(
        cmd: &mut process::Command,
    ) -> Result<CommandReader, CommandError> {
        CommandReaderBuilder::new().build(cmd)
    }
}

The Child struct from std::process contains stdout: Option<ChildStdout>. The Option allows taking ownership of stdout later (via take()), which is how the close protocol works.
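The role of the Option can be shown with a tiny stand-in. Holder and release are hypothetical names; the String plays the part of the ChildStdout handle:

```rust
/// Hypothetical stand-in for `process::Child`: the handle lives in an Option.
struct Holder {
    handle: Option<String>,
}

impl Holder {
    /// Moves the handle out through `&mut self`, leaving `None` behind.
    fn release(&mut self) -> Option<String> {
        self.handle.take()
    }
}
```

The first release() returns Some(handle) and every later call returns None, which is what lets a close method detect that it has already run.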


Section 7: The Close Protocol

impl CommandReader {
    /// Closes the CommandReader, freeing any resources used by its underlying
    /// child process. If the child process exits with a nonzero exit code, the
    /// returned Err value will include its stderr.
    pub fn close(&mut self) -> io::Result<()> {
        // Step 1: Take stdout (if already taken, close() was called before)
        let stdout = match self.child.stdout.take() {
            None => return Ok(()),  // Idempotent: already closed
            Some(stdout) => stdout,
        };

        // Step 2: Drop stdout to close our read end of the pipe. The child's
        // next write to stdout fails with a broken pipe, prompting it to exit.
        drop(stdout);

        // Step 3: Wait for child and check exit status
        if self.child.wait()?.success() {
            Ok(())
        } else {
            // Step 4: On failure, collect stderr for error message
            let err = self.stderr.read_to_end();

            // Step 5: Handle the early-close case
            // If we didn't read to EOF and stderr is empty, 
            // the "failure" is just a broken pipe - expected behavior
            if !self.eof && err.is_empty() {
                return Ok(());
            }
            Err(io::Error::from(err))
        }
    }
}

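The drop(stdout) closes the read end of the pipe. The child does not see EOF; instead, its next write to stdout fails with a broken-pipe error (EPIPE, or SIGPIPE on Unix). A well-behaved process exits when this happens, often with a nonzero status, which is why close() treats a failure with empty stderr before EOF as expected rather than as an error.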
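The decision made in steps 3 through 5 can be restated as a pure function. This is a sketch only: close_outcome is a hypothetical helper, and it takes the exit status, the eof flag, and the collected stderr as plain arguments instead of reading them from a process:

```rust
/// Hypothetical restatement of steps 3-5 of the close protocol.
fn close_outcome(exit_ok: bool, eof: bool, stderr: &[u8]) -> Result<(), String> {
    if exit_ok {
        return Ok(());
    }
    if !eof && stderr.is_empty() {
        // Closed before EOF with nothing on stderr: presumed to be the
        // expected broken-pipe exit, so it is silenced.
        return Ok(());
    }
    // Any other failure is reported along with whatever stderr contained.
    Err(String::from_utf8_lossy(stderr).into_owned())
}
```

The four interesting inputs are: success (Ok), early close with empty stderr (Ok), failure after EOF (Err, even with empty stderr), and failure with stderr content (Err carrying that content).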


Section 8: The Drop Safety Net

impl Drop for CommandReader {
    fn drop(&mut self) {
        if let Err(error) = self.close() {
            // Log warning instead of panicking - Drop should never panic
            log::warn!("{}", error);
        }
    }
}

The Drop implementation ensures resource cleanup even if the caller forgets to call close(). Using log::warn! instead of panicking follows Rust's guideline that Drop implementations should not panic.
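A self-contained sketch of the same safety net, with a hypothetical Guarded type whose close() can be told to fail. An in-memory log (an assumption standing in for log::warn!) records what Drop would have reported:

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical resource with a fallible, idempotent close().
struct Guarded {
    open: bool,
    fail_on_close: bool,
    log: Rc<RefCell<Vec<String>>>, // stand-in for a logging backend
}

impl Guarded {
    fn close(&mut self) -> Result<(), String> {
        if !self.open {
            return Ok(()); // already closed: nothing to do
        }
        self.open = false;
        if self.fail_on_close {
            Err("close failed".to_string())
        } else {
            Ok(())
        }
    }
}

impl Drop for Guarded {
    fn drop(&mut self) {
        // Record the failure instead of panicking.
        if let Err(e) = self.close() {
            self.log.borrow_mut().push(e);
        }
    }
}
```

If the caller already invoked close() explicitly, the call inside drop() takes the already-closed branch and logs nothing, so the error is reported at most once.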


Section 9: The Read Implementation

impl io::Read for CommandReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Handle already-closed reader
        let stdout = match self.child.stdout {
            None => return Ok(0),  // Signals EOF to caller
            Some(ref mut stdout) => stdout,
        };

        // Delegate to the underlying stdout pipe
        let nread = stdout.read(buf)?;

        if nread == 0 {
            // Reached natural EOF - set flag and trigger cleanup
            self.eof = true;
            self.close().map(|_| 0)  // Return 0 to signal EOF
        } else {
            Ok(nread)
        }
    }
}

The ref mut pattern borrows stdout mutably without taking ownership. This is necessary because we need to read from it while keeping it inside the Option for later cleanup.
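The same borrowing pattern works for any inner reader. Below is a sketch with a hypothetical EofTracking wrapper; unlike CommandReader it simply discards the inner reader at EOF instead of running a close protocol:

```rust
use std::io::{self, Read};

/// Hypothetical wrapper that remembers whether the inner reader hit EOF.
struct EofTracking<R> {
    inner: Option<R>,
    eof: bool,
}

impl<R: Read> Read for EofTracking<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Borrow the reader in place; `None` means it was already released.
        let inner = match self.inner {
            None => return Ok(0),
            Some(ref mut inner) => inner,
        };
        let n = inner.read(buf)?;
        if n == 0 {
            // Natural EOF: set the flag and release the inner reader.
            self.eof = true;
            self.inner = None;
        }
        Ok(n)
    }
}
```

Wrapping an io::Cursor and calling read_to_end leaves eof set to true and inner set to None, and any later read returns Ok(0).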


Section 10: Async vs Sync Stderr Handling

/// A reader that encapsulates the asynchronous or synchronous reading of
/// stderr.
#[derive(Debug)]
enum StderrReader {
    Async(Option<std::thread::JoinHandle<CommandError>>),
    Sync(process::ChildStderr),
}

impl StderrReader {
    /// Create a reader for stderr that reads contents asynchronously.
    fn r#async(mut stderr: process::ChildStderr) -> StderrReader {
        // Spawn thread that reads stderr and converts to CommandError
        let handle =
            std::thread::spawn(move || stderr_to_command_error(&mut stderr));
        StderrReader::Async(Some(handle))
    }

    /// Create a reader for stderr that reads contents synchronously.
    fn sync(stderr: process::ChildStderr) -> StderrReader {
        StderrReader::Sync(stderr)
    }

    /// Consumes all of stderr on to the heap and returns it as an error.
    fn read_to_end(&mut self) -> CommandError {
        match *self {
            StderrReader::Async(ref mut handle) => {
                let handle = handle
                    .take()
                    .expect("read_to_end cannot be called more than once");
                // join() blocks until thread completes, returns its result
                handle.join().expect("stderr reading thread does not panic")
            }
            StderrReader::Sync(ref mut stderr) => {
                stderr_to_command_error(stderr)
            }
        }
    }
}

fn stderr_to_command_error(stderr: &mut process::ChildStderr) -> CommandError {
    let mut bytes = vec![];
    match stderr.read_to_end(&mut bytes) {
        Ok(_) => CommandError::stderr(bytes),
        Err(err) => CommandError::io(err),
    }
}

The r#async syntax is a raw identifier, needed because async is a reserved keyword as of the Rust 2018 edition. The move keyword makes the closure take ownership of stderr, which thread::spawn requires so the handle can outlive the calling function.
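Both points fit in a few lines. This sketch uses a free function named r#async (a made-up example, unrelated to StderrReader) that moves a buffer into a thread and returns the handle:

```rust
use std::thread;

/// `r#async` lets a function use the keyword `async` as its name.
fn r#async(data: Vec<u8>) -> thread::JoinHandle<usize> {
    // `move` transfers ownership of `data` into the spawned thread.
    thread::spawn(move || data.len())
}
```

Callers must spell the name as r#async as well, for example r#async(vec![1, 2, 3]).join(), which yields the length computed on the other thread.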


Quick Reference

Type Summary

Type                  Purpose
CommandError          Error type capturing I/O errors or stderr content
CommandReaderBuilder  Configures process reading behavior
CommandReader         Streaming reader implementing the Read trait
StderrReader          Internal enum for sync/async stderr handling

Key Methods

Method                                Returns                              Purpose
CommandReader::new()                  Result<CommandReader, CommandError>  Spawn process with defaults
CommandReader::close()                io::Result<()>                       Clean up process, return stderr on failure
CommandReaderBuilder::build()         Result<CommandReader, CommandError>  Spawn with custom config
CommandReaderBuilder::async_stderr()  &mut Self                            Enable/disable async stderr reading

Process Lifecycle

Command::new() → CommandReader::new() → read() loops → close()/drop()
      │                  │                  │              │
      │                  ▼                  ▼              ▼
      │            spawn child        read stdout    drop stdout
      │            pipe stdout/err                   wait for exit
      │            start stderr thread               collect stderr
      ▼                                              return error
  configure args,
  env, cwd

Error Conversion Flow

io::Error ──From──► CommandError ──From──► io::Error
                         │                     │
                    Io variant            unwraps cleanly
                    Stderr variant        wraps as ErrorKind::Other