Parallel Directory Walking: Code Companion

Reference code for the Parallel Directory Walking lecture. Sections correspond to the lecture document.


Section 1: The Directory Entry Abstraction

/// A directory entry with a possible error attached.
///
/// The error typically refers to a problem parsing ignore files in a
/// particular directory.
#[derive(Clone, Debug)]
pub struct DirEntry {
    dent: DirEntryInner,
    err: Option<Error>,  // Error attachment pattern - errors travel with entries
}

/// DirEntryInner is the implementation of DirEntry.
///
/// It specifically represents three distinct sources of directory entries:
///
/// 1. From the walkdir crate.
/// 2. Special entries that represent things like stdin.
/// 3. From a path.
#[derive(Clone, Debug)]
enum DirEntryInner {
    Stdin,                      // Pseudo-entry for stdin input
    Walkdir(walkdir::DirEntry), // From sequential iteration
    Raw(DirEntryRaw),           // From parallel iteration
}

impl DirEntryInner {
    fn path(&self) -> &Path {
        use self::DirEntryInner::*;
        match *self {
            Stdin => Path::new("<stdin>"),
            Walkdir(ref x) => x.path(),
            Raw(ref x) => x.path(),
        }
    }

    fn metadata(&self) -> Result<Metadata, Error> {
        use self::DirEntryInner::*;
        match *self {
            Stdin => {
                // Stdin has no real metadata - synthesize an error
                let err = Error::Io(io::Error::new(
                    io::ErrorKind::Other,
                    "<stdin> has no metadata",
                ));
                Err(err.with_path("<stdin>"))
            }
            Walkdir(ref x) => x.metadata().map_err(|err| {
                Error::Io(io::Error::from(err)).with_path(x.path())
            }),
            Raw(ref x) => x.metadata(),
        }
    }

    fn is_dir(&self) -> bool {
        self.file_type().map(|ft| ft.is_dir()).unwrap_or(false)
    }
}

The three-variant enum provides a unified interface regardless of entry source. The err field enables the "error attachment" pattern where parsing errors don't abort traversal.
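
A minimal sketch of how a consumer observes attached errors through the crate's public API (the walked directory here is arbitrary): DirEntry::error() exposes any non-fatal error that traveled with the entry.

use ignore::WalkBuilder;

fn main() {
    for result in WalkBuilder::new(".").build() {
        match result {
            Ok(entry) => {
                // A *successful* entry may still carry a non-fatal error,
                // e.g. a malformed .gitignore in the entry's directory.
                if let Some(err) = entry.error() {
                    eprintln!("warning: {err}");
                }
                println!("{}", entry.path().display());
            }
            Err(err) => eprintln!("error: {err}"),
        }
    }
}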


Section 2: Platform-Specific Raw Entries

/// DirEntryRaw is essentially copied from the walkdir crate so that we can
/// build `DirEntry`s from whole cloth in the parallel iterator.
#[derive(Clone)]
struct DirEntryRaw {
    path: PathBuf,
    ty: FileType,
    follow_link: bool,
    depth: usize,
    #[cfg(unix)]
    ino: u64,              // Unix: inode comes cheap from readdir
    #[cfg(windows)]
    metadata: fs::Metadata, // Windows: full metadata comes free
}

impl DirEntryRaw {
    // Platform-specific construction from directory entry
    #[cfg(unix)]
    fn from_entry_os(
        depth: usize,
        ent: &fs::DirEntry,
        ty: fs::FileType,
    ) -> Result<DirEntryRaw, Error> {
        use std::os::unix::fs::DirEntryExt;

        Ok(DirEntryRaw {
            path: ent.path(),
            ty,
            follow_link: false,
            depth,
            ino: ent.ino(),  // Cheap on Unix - no stat required
        })
    }

    #[cfg(windows)]
    fn from_entry_os(
        depth: usize,
        ent: &fs::DirEntry,
        ty: fs::FileType,
    ) -> Result<DirEntryRaw, Error> {
        let md = ent.metadata().map_err(|err| {
            let err = Error::Io(io::Error::from(err)).with_path(ent.path());
            Error::WithDepth { depth, err: Box::new(err) }
        })?;
        Ok(DirEntryRaw {
            path: ent.path(),
            ty,
            follow_link: false,
            depth,
            metadata: md,  // Free on Windows during directory reading
        })
    }

    // Platform-specific metadata access
    #[cfg(windows)]
    fn metadata_internal(&self) -> Result<fs::Metadata, Error> {
        if self.follow_link {
            fs::metadata(&self.path)  // Follow symlink - need fresh stat
        } else {
            Ok(self.metadata.clone()) // Use cached metadata
        }
        .map_err(|err| Error::Io(io::Error::from(err)).with_path(&self.path))
    }

    #[cfg(not(windows))]
    fn metadata_internal(&self) -> Result<fs::Metadata, Error> {
        if self.follow_link {
            fs::metadata(&self.path)
        } else {
            fs::symlink_metadata(&self.path)  // Don't follow symlinks
        }
        .map_err(|err| Error::Io(io::Error::from(err)).with_path(&self.path))
    }
}

The #[cfg(unix)] and #[cfg(windows)] attributes select entirely different struct layouts and implementations at compile time. This avoids runtime branching and stores only what's cheap to obtain on each platform.
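
The same pattern in miniature, with hypothetical names (CheapEntry, from_dir_entry): one struct, two layouts, each constructor storing only what its platform hands out for free.

use std::fs;
use std::io;
use std::path::PathBuf;

struct CheapEntry {
    path: PathBuf,
    #[cfg(unix)]
    ino: u64, // readdir yields the inode without a stat call
    #[cfg(windows)]
    metadata: fs::Metadata, // the directory read yields full metadata
}

impl CheapEntry {
    #[cfg(unix)]
    fn from_dir_entry(ent: &fs::DirEntry) -> io::Result<CheapEntry> {
        use std::os::unix::fs::DirEntryExt;
        Ok(CheapEntry { path: ent.path(), ino: ent.ino() })
    }

    #[cfg(windows)]
    fn from_dir_entry(ent: &fs::DirEntry) -> io::Result<CheapEntry> {
        Ok(CheapEntry { path: ent.path(), metadata: ent.metadata()? })
    }
}

fn main() -> io::Result<()> {
    for ent in fs::read_dir(".")? {
        println!("{}", CheapEntry::from_dir_entry(&ent?)?.path.display());
    }
    Ok(())
}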


Section 3: WalkBuilder and the Builder Pattern

#[derive(Clone)]
pub struct WalkBuilder {
    paths: Vec<PathBuf>,
    ig_builder: IgnoreBuilder,
    max_depth: Option<usize>,
    min_depth: Option<usize>,
    max_filesize: Option<u64>,
    follow_links: bool,
    same_file_system: bool,
    sorter: Option<Sorter>,
    threads: usize,
    skip: Option<Arc<Handle>>,
    filter: Option<Filter>,
    /// Lazy initialization - only query CWD if needed, and only once
    global_gitignores_relative_to:
        OnceLock<Result<PathBuf, Arc<std::io::Error>>>,
}

impl WalkBuilder {
    /// Create a new builder for a recursive directory iterator
    pub fn new<P: AsRef<Path>>(path: P) -> WalkBuilder {
        WalkBuilder {
            paths: vec![path.as_ref().to_path_buf()],
            ig_builder: IgnoreBuilder::new(),
            max_depth: None,
            // ... other defaults
            threads: 0,  // 0 means auto-detect
            global_gitignores_relative_to: OnceLock::new(),
        }
    }

    /// Build a new `Walk` iterator (sequential)
    pub fn build(&self) -> Walk {
        // ... configuration applied to walkdir
    }

    /// Build a new `WalkParallel` traversal.
    /// Note: returns a type that does NOT implement Iterator
    pub fn build_parallel(&self) -> WalkParallel {
        let ig_root = self
            .get_or_set_current_dir()
            .map(|cwd| self.ig_builder.build_with_cwd(Some(cwd.to_path_buf())))
            .unwrap_or_else(|| self.ig_builder.build());
        WalkParallel {
            paths: self.paths.clone().into_iter(),
            ig_root,
            // ... other configuration
        }
    }

    /// Lazy CWD initialization - called once, result cached
    fn get_or_set_current_dir(&self) -> Option<&Path> {
        let result = self.global_gitignores_relative_to.get_or_init(|| {
            let result = std::env::current_dir().map_err(Arc::new);
            match result {
                Ok(ref path) => {
                    log::trace!(
                        "automatically discovered CWD: {}",
                        path.display()
                    );
                }
                Err(ref err) => {
                    log::debug!(
                        "failed to find CWD \
                         (global gitignores will be ignored): {err}"
                    );
                }
            }
            result
        });
        result.as_ref().ok().map(|path| &**path)
    }
}

The OnceLock ensures the potentially expensive CWD lookup happens at most once. The builder can produce two fundamentally different types: Walk (implements Iterator) and WalkParallel (callback-based).
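
Typical usage of both build paths (directory and thread count are arbitrary): build() yields an Iterator, while build_parallel() returns a WalkParallel whose run() takes a per-thread visitor factory.

use ignore::{WalkBuilder, WalkState};

fn main() {
    // Sequential: Walk implements Iterator.
    for result in WalkBuilder::new(".").build() {
        if let Ok(entry) = result {
            println!("{}", entry.path().display());
        }
    }

    // Parallel: callback-based. The outer closure is a factory,
    // invoked once per worker thread.
    WalkBuilder::new(".").threads(4).build_parallel().run(|| {
        Box::new(|result| {
            if let Ok(entry) = result {
                println!("{}", entry.path().display());
            }
            WalkState::Continue
        })
    });
}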


Section 4: The Single-Threaded Walk Iterator

pub struct Walk {
    its: std::vec::IntoIter<(PathBuf, Option<WalkEventIter>)>,
    it: Option<WalkEventIter>,
    ig_root: Ignore,
    ig: Ignore,           // Current ignore state - changes as we traverse
    max_filesize: Option<u64>,
    skip: Option<Arc<Handle>>,
    filter: Option<Filter>,
}

impl Walk {
    fn skip_entry(&self, ent: &DirEntry) -> Result<bool, Error> {
        if ent.depth() == 0 {
            return Ok(false);  // Never skip root entries
        }
        // CRITICAL: Trivial skipping BEFORE expensive filesystem operations
        // This prevents unnecessary file downloads on remote filesystems
        if should_skip_entry(&self.ig, ent) {
            return Ok(true);
        }
        if let Some(ref stdout) = self.skip {
            if path_equals(ent, stdout)? {
                return Ok(true);
            }
        }
        // File size check requires metadata - do it last
        if self.max_filesize.is_some() && !ent.is_dir() {
            return Ok(skip_filesize(
                self.max_filesize.unwrap(),
                ent.path(),
                &ent.metadata().ok(),
            ));
        }
        Ok(false)
    }
}

impl Iterator for Walk {
    type Item = Result<DirEntry, Error>;

    fn next(&mut self) -> Option<Result<DirEntry, Error>> {
        loop {
            let ev = match self.it.as_mut().and_then(|it| it.next()) {
                Some(ev) => ev,
                None => {
                    // Move to next root path
                    match self.its.next() {
                        None => return None,
                        Some((_, None)) => {
                            return Some(Ok(DirEntry::new_stdin()));
                        }
                        Some((path, Some(it))) => {
                            self.it = Some(it);
                            if path.is_dir() {
                                // Add parent ignore rules for new root
                                let (ig, err) = self.ig_root.add_parents(path);
                                self.ig = ig;
                                if let Some(err) = err {
                                    return Some(Err(err));
                                }
                            }
                        }
                    }
                    continue;
                }
            };
            match ev {
                Err(err) => return Some(Err(Error::from_walkdir(err))),
                Ok(WalkEvent::Exit) => {
                    // Pop ignore rules when leaving directory
                    self.ig = self.ig.parent().unwrap();
                }
                Ok(WalkEvent::Dir(ent)) => {
                    // Push new ignore rules when entering directory
                    let (igtmp, err) = self.ig.add_child(ent.path());
                    self.ig = igtmp;
                    // ... return directory entry
                }
                Ok(WalkEvent::File(ent)) => {
                    // ... handle file entry
                }
            }
        }
    }
}

The ig field maintains the current ignore matcher state. It grows when entering directories (via add_child) and shrinks when exiting (via parent()). The skip_entry method orders checks from cheapest to most expensive.
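
A minimal sketch of that grow/shrink behavior, assuming an Arc-linked parent chain (the real Ignore type is far richer but shares structure the same way; Ig, IgInner, and the string rules are hypothetical stand-ins).

use std::sync::Arc;

#[derive(Clone)]
struct Ig(Arc<IgInner>);

struct IgInner {
    parent: Option<Ig>,
    rules: Vec<String>, // stand-in for parsed ignore rules
}

impl Ig {
    fn root() -> Ig {
        Ig(Arc::new(IgInner { parent: None, rules: vec![] }))
    }

    /// Entering a directory pushes that directory's rules.
    fn add_child(&self, rules: Vec<String>) -> Ig {
        Ig(Arc::new(IgInner { parent: Some(self.clone()), rules }))
    }

    /// Exiting a directory pops back to the parent's rules.
    fn parent(&self) -> Option<Ig> {
        self.0.parent.clone()
    }
}

fn main() {
    let root = Ig::root();
    let child = root.add_child(vec!["target/".to_string()]);
    let back = child.parent().unwrap();
    assert!(Arc::ptr_eq(&back.0, &root.0)); // cheap structural sharing
}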


Section 5: WalkEvent and Directory Structure

/// Transforms flat walkdir stream into structured enter/exit events
struct WalkEventIter {
    depth: usize,
    it: walkdir::IntoIter,
    next: Option<Result<walkdir::DirEntry, walkdir::Error>>,  // Lookahead buffer
}

#[derive(Debug)]
enum WalkEvent {
    Dir(walkdir::DirEntry),   // Entering a directory
    File(walkdir::DirEntry),  // A file (non-directory)
    Exit,                     // Leaving a directory
}

impl Iterator for WalkEventIter {
    type Item = walkdir::Result<WalkEvent>;

    fn next(&mut self) -> Option<walkdir::Result<WalkEvent>> {
        // Take from lookahead buffer, or get next from underlying iterator
        let dent = self.next.take().or_else(|| self.it.next());

        let depth = match dent {
            None => 0,
            Some(Ok(ref dent)) => dent.depth(),
            Some(Err(ref err)) => err.depth(),
        };

        // Key insight: if depth decreased, we've finished a directory
        if depth < self.depth {
            self.depth -= 1;
            self.next = dent;  // Stash for next call
            return Some(Ok(WalkEvent::Exit));  // Synthesize exit event
        }

        self.depth = depth;
        match dent {
            None => None,
            Some(Err(err)) => Some(Err(err)),
            Some(Ok(dent)) => {
                if walkdir_is_dir(&dent) {
                    self.depth += 1;
                    Some(Ok(WalkEvent::Dir(dent)))
                } else {
                    Some(Ok(WalkEvent::File(dent)))
                }
            }
        }
    }
}

The lookahead-and-stash pattern: when depth decreases, we save the current entry in self.next, emit a synthetic Exit event, and return the stashed entry on the subsequent call. This transforms the flat stream into a structured tree traversal.
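
A worked example of the depth logic in isolation, using a mock entry stream in place of walkdir (the names and depths are hypothetical).

fn main() {
    // (name, depth, is_dir) in the order walkdir would yield them.
    let entries = [
        ("a", 0, true),
        ("a/b", 1, true),
        ("a/b/c.txt", 2, false),
        ("a/d.txt", 1, false),
    ];
    let mut depth = 0;
    let mut i = 0;
    while i < entries.len() {
        let (name, d, is_dir) = entries[i];
        if d < depth {
            // Depth decreased: synthesize an Exit and re-examine the
            // same entry on the next pass (the "stash").
            depth -= 1;
            println!("Exit");
            continue;
        }
        depth = d;
        if is_dir {
            depth += 1;
            println!("Dir({name})");
        } else {
            println!("File({name})");
        }
        i += 1;
    }
    // End of stream: drain the remaining Exit events.
    while depth > 0 {
        depth -= 1;
        println!("Exit");
    }
    // Prints: Dir(a), Dir(a/b), File(a/b/c.txt), Exit,
    //         File(a/d.txt), Exit
}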


Section 6: WalkState and Parallel Visitor Traits

/// Control flow for parallel traversal
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WalkState {
    Continue,  // Keep walking
    Skip,      // Don't descend into this directory
    Quit,      // Stop all workers as soon as possible
}

/// Builder creates per-thread visitors - no sharing needed
pub trait ParallelVisitorBuilder<'s> {
    fn build(&mut self) -> Box<dyn ParallelVisitor + 's>;
}

/// Each thread gets its own visitor instance
pub trait ParallelVisitor: Send {
    fn visit(&mut self, entry: Result<DirEntry, Error>) -> WalkState;
}

// Convenience wrapper for closure-based visitors
struct FnBuilder<F> {
    builder: F,
}

impl<'s, F: FnMut() -> FnVisitor<'s>> ParallelVisitorBuilder<'s>
    for FnBuilder<F>
{
    fn build(&mut self) -> Box<dyn ParallelVisitor + 's> {
        let visitor = (self.builder)();  // Call factory to create visitor
        Box::new(FnVisitorImp { visitor })
    }
}

type FnVisitor<'s> =
    Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 's>;

The ParallelVisitorBuilder creates separate visitors for each thread, enabling lock-free accumulation of results in thread-local storage. The 's lifetime parameter allows visitors to borrow from their environment.
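
A sketch of a collecting visitor built on these traits (Collector and CollectorBuilder are hypothetical): each thread accumulates into its own Vec and takes the shared lock exactly once, on Drop.

use std::path::PathBuf;
use std::sync::Mutex;

use ignore::{
    DirEntry, Error, ParallelVisitor, ParallelVisitorBuilder,
    WalkBuilder, WalkState,
};

struct Collector<'a> {
    local: Vec<PathBuf>,
    all: &'a Mutex<Vec<PathBuf>>,
}

impl<'a> ParallelVisitor for Collector<'a> {
    fn visit(&mut self, entry: Result<DirEntry, Error>) -> WalkState {
        if let Ok(ent) = entry {
            self.local.push(ent.path().to_path_buf());
        }
        WalkState::Continue
    }
}

impl<'a> Drop for Collector<'a> {
    fn drop(&mut self) {
        // One lock acquisition per thread, not one per entry.
        self.all.lock().unwrap().append(&mut self.local);
    }
}

struct CollectorBuilder<'a> {
    all: &'a Mutex<Vec<PathBuf>>,
}

impl<'s, 'a: 's> ParallelVisitorBuilder<'s> for CollectorBuilder<'a> {
    fn build(&mut self) -> Box<dyn ParallelVisitor + 's> {
        Box::new(Collector { local: Vec::new(), all: self.all })
    }
}

fn main() {
    let all = Mutex::new(Vec::new());
    let mut builder = CollectorBuilder { all: &all };
    WalkBuilder::new(".").build_parallel().visit(&mut builder);
    println!("collected {} paths", all.lock().unwrap().len());
}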


Section 7: The Work-Stealing Stack

/// A work-stealing stack
#[derive(Debug)]
struct Stack {
    index: usize,                      // This thread's index
    deque: Deque<Message>,             // Thread-local work stack
    stealers: Arc<[Stealer<Message>]>, // Access to other threads' work
}

impl Stack {
    /// Create stacks for all threads, distribute initial work
    fn new_for_each_thread(threads: usize, init: Vec<Message>) -> Vec<Stack> {
        // LIFO ensures depth-first traversal - critical for memory usage
        let deques: Vec<Deque<Message>> =
            std::iter::repeat_with(Deque::new_lifo).take(threads).collect();

        let stealers = Arc::<[Stealer<Message>]>::from(
            deques.iter().map(Deque::stealer).collect::<Vec<_>>(),
        );

        let stacks: Vec<Stack> = deques
            .into_iter()
            .enumerate()
            .map(|(index, deque)| Stack {
                index,
                deque,
                stealers: stealers.clone(),
            })
            .collect();

        // Round-robin distribution of initial work
        init.into_iter()
            .rev()  // Reverse to cancel LIFO effect
            .zip(stacks.iter().cycle())
            .for_each(|(m, s)| s.push(m));

        stacks
    }

    fn pop(&self) -> Option<Message> {
        // Try local first, then steal from others
        self.deque.pop().or_else(|| self.steal())
    }

    fn steal(&self) -> Option<Message> {
        // Fairness: steal from index+1, index+2, ..., wrap to 0, 1, ...
        let (left, right) = self.stealers.split_at(self.index);
        let right = &right[1..];  // Don't steal from ourselves

        right
            .iter()
            .chain(left.iter())
            .map(|s| s.steal_batch_and_pop(&self.deque))
            .find_map(|s| s.success())
    }
}

The crossbeam_deque crate provides the work-stealing primitives. Using new_lifo() ensures depth-first traversal, which dramatically reduces memory usage on wide directory trees. The steal operation takes a batch to amortize synchronization overhead.
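
The same primitives in a standalone sketch (the queue contents and the helper name steal_one are hypothetical; the crossbeam_deque types and methods are real).

use crossbeam_deque::{Stealer, Worker};

fn steal_one(local: &Worker<u32>, others: &[Stealer<u32>]) -> Option<u32> {
    others
        .iter()
        .map(|s| s.steal_batch_and_pop(local))
        .find_map(|s| s.success())
}

fn main() {
    let w0: Worker<u32> = Worker::new_lifo();
    let w1: Worker<u32> = Worker::new_lifo();
    let stealers = [w0.stealer()];
    for n in 0..4 {
        w0.push(n);
    }
    // w1's own deque is empty, so it steals a batch from w0 and
    // pops one item from the stolen batch.
    let got = w1.pop().or_else(|| steal_one(&w1, &stealers));
    println!("w1 got {got:?}");
}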


Section 8: Worker Thread Implementation

struct Worker<'s> {
    visitor: Box<dyn ParallelVisitor + 's>,
    stack: Stack,
    quit_now: Arc<AtomicBool>,           // Global quit signal
    active_workers: Arc<AtomicUsize>,    // Termination detection
    max_depth: Option<usize>,
    // ... other configuration
}

impl<'s> Worker<'s> {
    fn run(mut self) {
        while let Some(work) = self.get_work() {
            if let WalkState::Quit = self.run_one(work) {
                self.quit_now();
            }
        }
    }

    fn get_work(&mut self) -> Option<Work> {
        let mut value = self.recv();
        loop {
            // Priority channel simulation: quit beats all other messages
            if self.is_quit_now() {
                value = Some(Message::Quit)
            }
            match value {
                Some(Message::Work(work)) => return Some(work),
                Some(Message::Quit) => {
                    // Propagate quit to wake other threads (domino effect)
                    self.send_quit();
                    return None;
                }
                None => {
                    // No work available - check if we're done
                    if self.deactivate_worker() == 0 {
                        // All workers idle + all deques empty = done
                        self.send_quit();
                        return None;
                    }
                    // Wait for work to appear
                    loop {
                        if let Some(v) = self.recv() {
                            self.activate_worker();
                            value = Some(v);
                            break;
                        }
                        // Sleep to avoid CPU spin
                        std::thread::sleep(std::time::Duration::from_millis(1));
                    }
                }
            }
        }
    }

    /// Deactivates a worker and returns count of still-active workers
    fn deactivate_worker(&self) -> usize {
        self.active_workers.fetch_sub(1, AtomicOrdering::Acquire) - 1
    }

    fn activate_worker(&self) {
        self.active_workers.fetch_add(1, AtomicOrdering::Release);
    }
}

Termination detection uses an atomic counter: when a worker finds no work, it decrements active_workers. If this returns 0, all workers are simultaneously idle with empty deques, meaning work is truly exhausted. The Acquire/Release ordering ensures proper synchronization.
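
The counting protocol in isolation (a simplified sketch: SeqCst is the conservative ordering choice here, where the code above uses weaker orderings).

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

struct ActiveWorkers(Arc<AtomicUsize>);

impl ActiveWorkers {
    /// Mark this worker idle; returns how many are still active.
    fn deactivate(&self) -> usize {
        self.0.fetch_sub(1, Ordering::SeqCst) - 1
    }

    /// Mark this worker busy again after receiving new work.
    fn activate(&self) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    // Two workers, both initially active.
    let active = ActiveWorkers(Arc::new(AtomicUsize::new(2)));
    assert_eq!(active.deactivate(), 1); // one still busy: wait for work
    active.activate(); // work appeared: rejoin
    assert_eq!(active.deactivate(), 1);
    assert_eq!(active.deactivate(), 0); // all idle: broadcast Quit
}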


Section 9: Directory Processing and Work Generation

struct Work {
    dent: DirEntry,
    ignore: Ignore,           // Ignore rules for this directory's parents
    root_device: Option<u64>, // For same_file_system check
}

impl Work {
    fn read_dir(&mut self) -> Result<fs::ReadDir, Error> {
        let readdir = match fs::read_dir(self.dent.path()) {
            Ok(readdir) => readdir,
            Err(err) => {
                return Err(Error::from(err)
                    .with_path(self.dent.path())
                    .with_depth(self.dent.depth()));
            }
        };
        // Add ignore rules from this directory's .gitignore etc
        let (ig, err) = self.ignore.add_child(self.dent.path());
        self.ignore = ig;
        self.dent.err = err;  // Attach parsing error to entry
        Ok(readdir)
    }
}

impl<'s> Worker<'s> {
    fn generate_work(
        &mut self,
        ig: &Ignore,
        depth: usize,
        root_device: Option<u64>,
        result: Result<fs::DirEntry, io::Error>,
    ) -> WalkState {
        let fs_dent = match result {
            Ok(fs_dent) => fs_dent,
            Err(err) => {
                return self.visitor.visit(Err(Error::from(err).with_depth(depth)));
            }
        };

        let mut dent = match DirEntryRaw::from_entry(depth, &fs_dent) {
            Ok(dent) => DirEntry::new_raw(dent, None),
            Err(err) => return self.visitor.visit(Err(err)),
        };

        // Handle symlinks - potentially resolve and check for loops
        if self.follow_links && dent.file_type().map_or(false, |ft| ft.is_symlink()) {
            // ... symlink resolution code
            if dent.is_dir() {
                if let Err(err) = check_symlink_loop(ig, dent.path(), depth) {
                    return self.visitor.visit(Err(err));
                }
            }
        }

        // CRITICAL: Ignore check BEFORE expensive operations
        if should_skip_entry(ig, &dent) {
            return WalkState::Continue;
        }

        // Additional checks: stdout skip, filesize, filter
        // ...

        // Push work for later processing
        self.send(Work { dent, ignore: ig.clone(), root_device });
        WalkState::Continue
    }
}

Each directory entry goes through: raw entry construction, symlink handling (with loop detection), ignore matching, and finally work submission. The ignore check happens early to avoid expensive metadata operations on files that will be skipped anyway.
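
The ordering principle as one small function (a hedged sketch: ignored() and accept() are stand-ins, not the crate's real helpers).

use std::fs;
use std::io;
use std::path::Path;

// Stand-in for the real ignore matcher: it needs only the path (and
// the file type already in hand), never a stat call.
fn ignored(_path: &Path) -> bool {
    false
}

fn accept(path: &Path, max_filesize: Option<u64>) -> io::Result<bool> {
    // Cheap check first: ignore rules cost no extra filesystem work.
    if ignored(path) {
        return Ok(false);
    }
    // Expensive check last: the size test forces a metadata (stat)
    // call, and only runs for entries that survived the cheap checks.
    if let Some(max) = max_filesize {
        if fs::metadata(path)?.len() > max {
            return Ok(false);
        }
    }
    Ok(true)
}

fn main() -> io::Result<()> {
    println!("{}", accept(Path::new("."), Some(1 << 20))?);
    Ok(())
}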


Quick Reference

Key Types

Type            Purpose
-------------   --------------------------------------------------
DirEntry        Unified directory entry with attached error
DirEntryInner   Three-variant enum: Stdin, Walkdir, Raw
DirEntryRaw     Platform-specific raw entry for parallel iteration
WalkBuilder     Configurable builder for both iterator types
Walk            Sequential iterator (implements Iterator)
WalkParallel    Parallel traversal (callback-based, not Iterator)
WalkState       Control flow: Continue, Skip, Quit
Stack           Work-stealing deque wrapper
Worker          Per-thread traversal worker
Work            Unit of work: entry + ignore rules + device

Platform Differences

Unix:
  - DirEntryRaw stores: path, type, depth, inode
  - Inode from readdir (cheap)
  - Metadata requires stat call

Windows:
  - DirEntryRaw stores: path, type, depth, full metadata
  - Metadata from directory read (free)
  - No inode concept

Per-Entry Check Order

1. Glob overrides (whitelist/ignore)
2. Ignore files (.ignore > .gitignore > .git/info/exclude > global)
3. File type matcher
4. Hidden file check
5. File size limit
6. Entry yielded to caller

Work-Stealing Algorithm

1. Pop from the local deque (LIFO order keeps traversal depth-first).
2. If the local deque is empty, steal from neighbors in round-robin
   order using steal_batch_and_pop, which takes a batch to amortize
   synchronization overhead.