Parallel Directory Walking: Code Companion¶
Reference code for the Parallel Directory Walking lecture. Sections correspond to the lecture document.
Section 1: The Directory Entry Abstraction¶
/// A directory entry with a possible error attached.
///
/// The error typically refers to a problem parsing ignore files in a
/// particular directory.
#[derive(Clone, Debug)]
pub struct DirEntry {
    dent: DirEntryInner,
    err: Option<Error>, // Error attachment pattern - errors travel with entries
}

/// DirEntryInner is the implementation of DirEntry.
///
/// It specifically represents three distinct sources of directory entries:
///
/// 1. From the walkdir crate.
/// 2. Special entries that represent things like stdin.
/// 3. From a path.
#[derive(Clone, Debug)]
enum DirEntryInner {
    Stdin,                      // Pseudo-entry for stdin input
    Walkdir(walkdir::DirEntry), // From sequential iteration
    Raw(DirEntryRaw),           // From parallel iteration
}

impl DirEntryInner {
    fn path(&self) -> &Path {
        use self::DirEntryInner::*;
        match *self {
            Stdin => Path::new("<stdin>"),
            Walkdir(ref x) => x.path(),
            Raw(ref x) => x.path(),
        }
    }

    fn metadata(&self) -> Result<Metadata, Error> {
        use self::DirEntryInner::*;
        match *self {
            Stdin => {
                // Stdin has no real metadata - synthesize an error
                let err = Error::Io(io::Error::new(
                    io::ErrorKind::Other,
                    "<stdin> has no metadata",
                ));
                Err(err.with_path("<stdin>"))
            }
            Walkdir(ref x) => x.metadata().map_err(|err| {
                Error::Io(io::Error::from(err)).with_path(x.path())
            }),
            Raw(ref x) => x.metadata(),
        }
    }

    fn is_dir(&self) -> bool {
        self.file_type().map(|ft| ft.is_dir()).unwrap_or(false)
    }
}
The three-variant enum provides a unified interface regardless of where an entry came from. The err field enables the "error attachment" pattern: ignore-file parsing errors travel with the entry that produced them instead of aborting the traversal.
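A brief usage sketch of the pattern from the caller's side (the ignore crate surfaces attached errors through `DirEntry::error()`): hard I/O failures arrive as `Err`, while ignore-file parse problems ride along on an otherwise usable entry.

```rust
use ignore::WalkBuilder;

fn main() {
    for result in WalkBuilder::new("./").build() {
        match result {
            Ok(entry) => {
                // An attached error is a warning, not a stop signal.
                if let Some(err) = entry.error() {
                    eprintln!("warning (traversal continues): {err}");
                }
                println!("{}", entry.path().display());
            }
            Err(err) => eprintln!("error: {err}"),
        }
    }
}
```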
Section 2: Platform-Specific Raw Entries¶
/// DirEntryRaw is essentially copied from the walkdir crate so that we can
/// build `DirEntry`s from whole cloth in the parallel iterator.
#[derive(Clone)]
struct DirEntryRaw {
    path: PathBuf,
    ty: FileType,
    follow_link: bool,
    depth: usize,
    #[cfg(unix)]
    ino: u64, // Unix: inode comes cheap from readdir
    #[cfg(windows)]
    metadata: fs::Metadata, // Windows: full metadata comes free
}

impl DirEntryRaw {
    // Platform-specific construction from directory entry
    #[cfg(unix)]
    fn from_entry_os(
        depth: usize,
        ent: &fs::DirEntry,
        ty: fs::FileType,
    ) -> Result<DirEntryRaw, Error> {
        use std::os::unix::fs::DirEntryExt;

        Ok(DirEntryRaw {
            path: ent.path(),
            ty,
            follow_link: false,
            depth,
            ino: ent.ino(), // Cheap on Unix - no stat required
        })
    }

    #[cfg(windows)]
    fn from_entry_os(
        depth: usize,
        ent: &fs::DirEntry,
        ty: fs::FileType,
    ) -> Result<DirEntryRaw, Error> {
        let md = ent.metadata().map_err(|err| {
            let err = Error::Io(io::Error::from(err)).with_path(ent.path());
            Error::WithDepth { depth, err: Box::new(err) }
        })?;
        Ok(DirEntryRaw {
            path: ent.path(),
            ty,
            follow_link: false,
            depth,
            metadata: md, // Free on Windows during directory reading
        })
    }

    // Platform-specific metadata access
    #[cfg(windows)]
    fn metadata_internal(&self) -> Result<fs::Metadata, Error> {
        if self.follow_link {
            fs::metadata(&self.path) // Follow symlink - need fresh stat
        } else {
            Ok(self.metadata.clone()) // Use cached metadata
        }
        .map_err(|err| Error::Io(io::Error::from(err)).with_path(&self.path))
    }

    #[cfg(not(windows))]
    fn metadata_internal(&self) -> Result<fs::Metadata, Error> {
        if self.follow_link {
            fs::metadata(&self.path)
        } else {
            fs::symlink_metadata(&self.path) // Don't follow symlinks
        }
        .map_err(|err| Error::Io(io::Error::from(err)).with_path(&self.path))
    }
}
The #[cfg(unix)] and #[cfg(windows)] attributes select completely different struct layouts and implementations at compile time. This avoids runtime branching and stores only what is cheap to obtain on each platform.
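A minimal standalone sketch of the same technique, using a hypothetical CheapId type (not part of the crate): the struct layout and the constructor body both change per target, but callers see a single name and never branch at runtime.

```rust
use std::fs;
use std::io;

#[cfg(unix)]
#[derive(Debug)]
struct CheapId {
    ino: u64, // readdir reports the inode without an extra stat
}

#[cfg(windows)]
#[derive(Debug)]
struct CheapId {
    len: u64, // directory reads on Windows return metadata for free
}

#[cfg(unix)]
fn cheap_id(ent: &fs::DirEntry) -> io::Result<CheapId> {
    use std::os::unix::fs::DirEntryExt;
    Ok(CheapId { ino: ent.ino() })
}

#[cfg(windows)]
fn cheap_id(ent: &fs::DirEntry) -> io::Result<CheapId> {
    Ok(CheapId { len: ent.metadata()?.len() })
}
```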
Section 3: WalkBuilder and the Builder Pattern¶
#[derive(Clone)]
pub struct WalkBuilder {
    paths: Vec<PathBuf>,
    ig_builder: IgnoreBuilder,
    max_depth: Option<usize>,
    min_depth: Option<usize>,
    max_filesize: Option<u64>,
    follow_links: bool,
    same_file_system: bool,
    sorter: Option<Sorter>,
    threads: usize,
    skip: Option<Arc<Handle>>,
    filter: Option<Filter>,
    /// Lazy initialization - only query CWD if needed, and only once
    global_gitignores_relative_to:
        OnceLock<Result<PathBuf, Arc<std::io::Error>>>,
}

impl WalkBuilder {
    /// Create a new builder for a recursive directory iterator
    pub fn new<P: AsRef<Path>>(path: P) -> WalkBuilder {
        WalkBuilder {
            paths: vec![path.as_ref().to_path_buf()],
            ig_builder: IgnoreBuilder::new(),
            max_depth: None,
            // ... other defaults
            threads: 0, // 0 means auto-detect
            global_gitignores_relative_to: OnceLock::new(),
        }
    }

    /// Build a new `Walk` iterator (sequential)
    pub fn build(&self) -> Walk {
        // ... configuration applied to walkdir
    }

    /// Build a new `WalkParallel` iterator
    /// Note: returns a type that does NOT implement Iterator
    pub fn build_parallel(&self) -> WalkParallel {
        let ig_root = self
            .get_or_set_current_dir()
            .map(|cwd| self.ig_builder.build_with_cwd(Some(cwd.to_path_buf())))
            .unwrap_or_else(|| self.ig_builder.build());
        WalkParallel {
            paths: self.paths.clone().into_iter(),
            ig_root,
            // ... other configuration
        }
    }

    /// Lazy CWD initialization - called once, result cached
    fn get_or_set_current_dir(&self) -> Option<&Path> {
        let result = self.global_gitignores_relative_to.get_or_init(|| {
            let result = std::env::current_dir().map_err(Arc::new);
            match result {
                Ok(ref path) => {
                    log::trace!(
                        "automatically discovered CWD: {}",
                        path.display()
                    );
                }
                Err(ref err) => {
                    log::debug!(
                        "failed to find CWD \
                         (global gitignores will be ignored): {err}"
                    );
                }
            }
            result
        });
        result.as_ref().ok().map(|path| &**path)
    }
}
The OnceLock ensures the potentially expensive CWD lookup happens at most once. The builder can produce two fundamentally different types: Walk (implements Iterator) and WalkParallel (callback-based).
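A short usage sketch of the two build paths (builder method names as exposed by the ignore crate):

```rust
use ignore::WalkBuilder;

fn main() {
    let mut builder = WalkBuilder::new("./");
    builder.max_depth(Some(3)).threads(4);

    // Sequential: `build()` yields an ordinary Iterator.
    for result in builder.build() {
        if let Ok(entry) = result {
            println!("{}", entry.path().display());
        }
    }
    // Parallel: `build_parallel()` returns WalkParallel, which is driven
    // through `run()` with a per-thread visitor factory (see Section 6).
}
```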
Section 4: The Single-Threaded Walk Iterator¶
pub struct Walk {
    its: std::vec::IntoIter<(PathBuf, Option<WalkEventIter>)>,
    it: Option<WalkEventIter>,
    ig_root: Ignore,
    ig: Ignore, // Current ignore state - changes as we traverse
    max_filesize: Option<u64>,
    skip: Option<Arc<Handle>>,
    filter: Option<Filter>,
}

impl Walk {
    fn skip_entry(&self, ent: &DirEntry) -> Result<bool, Error> {
        if ent.depth() == 0 {
            return Ok(false); // Never skip root entries
        }
        // CRITICAL: Trivial skipping BEFORE expensive filesystem operations
        // This prevents unnecessary file downloads on remote filesystems
        if should_skip_entry(&self.ig, ent) {
            return Ok(true);
        }
        if let Some(ref stdout) = self.skip {
            if path_equals(ent, stdout)? {
                return Ok(true);
            }
        }
        // File size check requires metadata - do it last
        if self.max_filesize.is_some() && !ent.is_dir() {
            return Ok(skip_filesize(
                self.max_filesize.unwrap(),
                ent.path(),
                &ent.metadata().ok(),
            ));
        }
        Ok(false)
    }
}

impl Iterator for Walk {
    type Item = Result<DirEntry, Error>;

    fn next(&mut self) -> Option<Result<DirEntry, Error>> {
        loop {
            let ev = match self.it.as_mut().and_then(|it| it.next()) {
                Some(ev) => ev,
                None => {
                    // Move to next root path
                    match self.its.next() {
                        None => return None,
                        Some((_, None)) => {
                            return Some(Ok(DirEntry::new_stdin()));
                        }
                        Some((path, Some(it))) => {
                            self.it = Some(it);
                            if path.is_dir() {
                                // Add parent ignore rules for new root
                                let (ig, err) = self.ig_root.add_parents(path);
                                self.ig = ig;
                                if let Some(err) = err {
                                    return Some(Err(err));
                                }
                            }
                        }
                    }
                    continue;
                }
            };
            match ev {
                Err(err) => return Some(Err(Error::from_walkdir(err))),
                Ok(WalkEvent::Exit) => {
                    // Pop ignore rules when leaving directory
                    self.ig = self.ig.parent().unwrap();
                }
                Ok(WalkEvent::Dir(ent)) => {
                    // Push new ignore rules when entering directory
                    let (igtmp, err) = self.ig.add_child(ent.path());
                    self.ig = igtmp;
                    // ... return directory entry
                }
                Ok(WalkEvent::File(ent)) => {
                    // ... handle file entry
                }
            }
        }
    }
}
The ig field maintains the current ignore matcher state. It grows when entering directories (via add_child) and shrinks when exiting (via parent()). The skip_entry method orders checks from cheapest to most expensive.
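A toy model of that push/pop discipline, with invented types rather than the crate's Ignore (which also handles pattern semantics, parent chains, and caching):

```rust
// Each directory level contributes one frame of patterns; entering a
// directory pushes (add_child analogue), leaving pops (parent() analogue).
struct RuleStack {
    frames: Vec<Vec<String>>,
}

impl RuleStack {
    fn enter_dir(&mut self, patterns: Vec<String>) {
        self.frames.push(patterns);
    }

    fn exit_dir(&mut self) {
        self.frames.pop();
    }

    fn is_ignored(&self, name: &str) -> bool {
        // Deeper rules shadow shallower ones, so scan innermost-first.
        self.frames.iter().rev().any(|f| f.iter().any(|p| p == name))
    }
}
```

In the crate, Ignore behaves like a persistent linked list of matchers, so pushing and popping share state rather than copying it.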
Section 5: WalkEvent and Directory Structure¶
/// Transforms flat walkdir stream into structured enter/exit events
struct WalkEventIter {
depth: usize,
it: walkdir::IntoIter,
next: Option<Result<walkdir::DirEntry, walkdir::Error>>, // Lookahead buffer
}
#[derive(Debug)]
enum WalkEvent {
Dir(walkdir::DirEntry), // Entering a directory
File(walkdir::DirEntry), // A file (non-directory)
Exit, // Leaving a directory
}
impl Iterator for WalkEventIter {
type Item = walkdir::Result<WalkEvent>;
fn next(&mut self) -> Option<walkdir::Result<WalkEvent>> {
// Take from lookahead buffer, or get next from underlying iterator
let dent = self.next.take().or_else(|| self.it.next());
let depth = match dent {
None => 0,
Some(Ok(ref dent)) => dent.depth(),
Some(Err(ref err)) => err.depth(),
};
// Key insight: if depth decreased, we've finished a directory
if depth < self.depth {
self.depth -= 1;
self.next = dent; // Stash for next call
return Some(Ok(WalkEvent::Exit)); // Synthesize exit event
}
self.depth = depth;
match dent {
None => None,
Some(Err(err)) => Some(Err(err)),
Some(Ok(dent)) => {
if walkdir_is_dir(&dent) {
self.depth += 1;
Some(Ok(WalkEvent::Dir(dent)))
} else {
Some(Ok(WalkEvent::File(dent)))
}
}
}
}
}
The lookahead-and-stash pattern: when depth decreases, we save the current entry in self.next, emit a synthetic Exit event, and return the stashed entry on the subsequent call. This transforms the flat stream into a structured tree traversal.
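The same trick in miniature, applied to an assumed flat input of (depth, name, is_dir) tuples instead of walkdir's types:

```rust
fn to_events(flat: &[(usize, &str, bool)]) -> Vec<String> {
    let mut out = Vec::new();
    let mut cur = 0usize; // our tracked depth (WalkEventIter::depth)
    let mut it = flat.iter();
    let mut stash: Option<&(usize, &str, bool)> = None;
    loop {
        let dent = stash.take().or_else(|| it.next());
        let depth = dent.map_or(0, |&(d, _, _)| d);
        if depth < cur {
            // Depth dropped: synthesize Exit, replay this item next time.
            cur -= 1;
            stash = dent;
            out.push("Exit".into());
            continue;
        }
        cur = depth;
        match dent {
            None => return out,
            Some(&(_, name, true)) => {
                cur += 1;
                out.push(format!("Dir({name})"));
            }
            Some(&(_, name, false)) => out.push(format!("File({name})")),
        }
    }
}

// to_events(&[(0, "a", true), (1, "b.txt", false), (0, "c.txt", false)])
// yields ["Dir(a)", "File(b.txt)", "Exit", "File(c.txt)"].
```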
Section 6: WalkState and Parallel Visitor Traits¶
/// Control flow for parallel traversal
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WalkState {
    Continue, // Keep walking
    Skip,     // Don't descend into this directory
    Quit,     // Stop all workers as soon as possible
}

/// Builder creates per-thread visitors - no sharing needed
pub trait ParallelVisitorBuilder<'s> {
    fn build(&mut self) -> Box<dyn ParallelVisitor + 's>;
}

/// Each thread gets its own visitor instance
pub trait ParallelVisitor: Send {
    fn visit(&mut self, entry: Result<DirEntry, Error>) -> WalkState;
}

// Convenience wrapper for closure-based visitors
struct FnBuilder<F> {
    builder: F,
}

impl<'s, F: FnMut() -> FnVisitor<'s>> ParallelVisitorBuilder<'s>
    for FnBuilder<F>
{
    fn build(&mut self) -> Box<dyn ParallelVisitor + 's> {
        let visitor = (self.builder)(); // Call factory to create visitor
        Box::new(FnVisitorImp { visitor })
    }
}

type FnVisitor<'s> =
    Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 's>;
The ParallelVisitorBuilder creates separate visitors for each thread, enabling lock-free accumulation of results in thread-local storage. The 's lifetime parameter allows visitors to borrow from their environment.
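A usage sketch of the closure form: WalkParallel::run calls the factory once per worker thread, and an mpsc channel is one simple way to funnel per-thread results back together.

```rust
use std::sync::mpsc;

use ignore::{WalkBuilder, WalkState};

fn main() {
    let (tx, rx) = mpsc::channel();
    WalkBuilder::new("./").build_parallel().run(|| {
        // The factory runs once per worker thread; each visitor owns
        // its own Sender clone, so there is no locking on the hot path.
        let tx = tx.clone();
        Box::new(move |result| {
            if let Ok(entry) = result {
                let _ = tx.send(entry.path().to_path_buf());
            }
            WalkState::Continue
        })
    });
    drop(tx); // close the channel so the drain below terminates
    for path in rx {
        println!("{}", path.display());
    }
}
```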
Section 7: The Work-Stealing Stack¶
/// A work-stealing stack
#[derive(Debug)]
struct Stack {
    index: usize,                      // This thread's index
    deque: Deque<Message>,             // Thread-local work stack
    stealers: Arc<[Stealer<Message>]>, // Access to other threads' work
}

impl Stack {
    /// Create stacks for all threads, distribute initial work
    fn new_for_each_thread(threads: usize, init: Vec<Message>) -> Vec<Stack> {
        // LIFO ensures depth-first traversal - critical for memory usage
        let deques: Vec<Deque<Message>> =
            std::iter::repeat_with(Deque::new_lifo).take(threads).collect();
        let stealers = Arc::<[Stealer<Message>]>::from(
            deques.iter().map(Deque::stealer).collect::<Vec<_>>(),
        );
        let stacks: Vec<Stack> = deques
            .into_iter()
            .enumerate()
            .map(|(index, deque)| Stack {
                index,
                deque,
                stealers: stealers.clone(),
            })
            .collect();
        // Round-robin distribution of initial work
        init.into_iter()
            .rev() // Reverse to cancel LIFO effect
            .zip(stacks.iter().cycle())
            .for_each(|(m, s)| s.push(m));
        stacks
    }

    fn pop(&self) -> Option<Message> {
        // Try local first, then steal from others
        self.deque.pop().or_else(|| self.steal())
    }

    fn steal(&self) -> Option<Message> {
        // Fairness: steal from index+1, index+2, ..., wrap to 0, 1, ...
        let (left, right) = self.stealers.split_at(self.index);
        let right = &right[1..]; // Don't steal from ourselves
        right
            .iter()
            .chain(left.iter())
            .map(|s| s.steal_batch_and_pop(&self.deque))
            .find_map(|s| s.success())
    }
}
The crossbeam_deque crate provides the work-stealing primitives. Using new_lifo() ensures depth-first traversal, which dramatically reduces memory usage on wide directory trees. The steal operation takes a batch to amortize synchronization overhead.
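A minimal demonstration of the crossbeam_deque calls involved (Worker here is crossbeam's deque type, which the crate aliases as Deque):

```rust
use crossbeam_deque::{Steal, Worker};

fn main() {
    let local = Worker::new_lifo(); // this thread's deque
    let other = Worker::new_lifo(); // some busy thread's deque
    let other_stealer = other.stealer();
    other.push("job-1");
    other.push("job-2");

    // Local deque is empty, so steal a batch from `other` and pop one.
    // The batch transfer amortizes synchronization across several items.
    assert!(local.pop().is_none());
    match other_stealer.steal_batch_and_pop(&local) {
        Steal::Success(job) => println!("stole {job}"),
        Steal::Empty | Steal::Retry => println!("nothing to steal"),
    }
}
```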
Section 8: Worker Thread Implementation¶
struct Worker<'s> {
    visitor: Box<dyn ParallelVisitor + 's>,
    stack: Stack,
    quit_now: Arc<AtomicBool>,        // Global quit signal
    active_workers: Arc<AtomicUsize>, // Termination detection
    max_depth: Option<usize>,
    // ... other configuration
}

impl<'s> Worker<'s> {
    fn run(mut self) {
        while let Some(work) = self.get_work() {
            if let WalkState::Quit = self.run_one(work) {
                self.quit_now();
            }
        }
    }

    fn get_work(&mut self) -> Option<Work> {
        let mut value = self.recv();
        loop {
            // Priority channel simulation: quit beats all other messages
            if self.is_quit_now() {
                value = Some(Message::Quit)
            }
            match value {
                Some(Message::Work(work)) => return Some(work),
                Some(Message::Quit) => {
                    // Propagate quit to wake other threads (domino effect)
                    self.send_quit();
                    return None;
                }
                None => {
                    // No work available - check if we're done
                    if self.deactivate_worker() == 0 {
                        // All workers idle + all deques empty = done
                        self.send_quit();
                        return None;
                    }
                    // Wait for work to appear
                    loop {
                        if let Some(v) = self.recv() {
                            self.activate_worker();
                            value = Some(v);
                            break;
                        }
                        // Sleep to avoid CPU spin
                        std::thread::sleep(std::time::Duration::from_millis(1));
                    }
                }
            }
        }
    }

    /// Deactivates a worker and returns count of still-active workers
    fn deactivate_worker(&self) -> usize {
        self.active_workers.fetch_sub(1, AtomicOrdering::Acquire) - 1
    }

    fn activate_worker(&self) {
        self.active_workers.fetch_add(1, AtomicOrdering::Release);
    }
}
Termination detection uses an atomic counter: when a worker finds no work, it decrements active_workers. If the decrement brings the count to zero, every worker is simultaneously idle with an empty deque, so the work is truly exhausted. The Acquire ordering on the decrement pairs with the Release ordering on the increment, keeping the counter updates properly ordered across threads.
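A toy model of the counting protocol (SeqCst for simplicity, and pretending every worker runs out of work immediately): because the decrement is a single atomic operation, exactly one worker observes the count reach zero.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let threads = 4;
    let active = Arc::new(AtomicUsize::new(threads));
    let handles: Vec<_> = (0..threads)
        .map(|i| {
            let active = Arc::clone(&active);
            thread::spawn(move || {
                // Pretend this worker just ran out of work.
                let remaining = active.fetch_sub(1, Ordering::SeqCst) - 1;
                if remaining == 0 {
                    // Only the last deactivation sees zero: it alone
                    // starts the quit domino.
                    println!("worker {i} detected global quiescence");
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}
```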
Section 9: Directory Processing and Work Generation¶
struct Work {
    dent: DirEntry,
    ignore: Ignore,           // Ignore rules for this directory's parents
    root_device: Option<u64>, // For same_file_system check
}

impl Work {
    fn read_dir(&mut self) -> Result<fs::ReadDir, Error> {
        let readdir = match fs::read_dir(self.dent.path()) {
            Ok(readdir) => readdir,
            Err(err) => {
                return Err(Error::from(err)
                    .with_path(self.dent.path())
                    .with_depth(self.dent.depth()));
            }
        };
        // Add ignore rules from this directory's .gitignore etc
        let (ig, err) = self.ignore.add_child(self.dent.path());
        self.ignore = ig;
        self.dent.err = err; // Attach parsing error to entry
        Ok(readdir)
    }
}

impl<'s> Worker<'s> {
    fn generate_work(
        &mut self,
        ig: &Ignore,
        depth: usize,
        root_device: Option<u64>,
        result: Result<fs::DirEntry, io::Error>,
    ) -> WalkState {
        let fs_dent = match result {
            Ok(fs_dent) => fs_dent,
            Err(err) => {
                return self
                    .visitor
                    .visit(Err(Error::from(err).with_depth(depth)));
            }
        };
        let mut dent = match DirEntryRaw::from_entry(depth, &fs_dent) {
            Ok(dent) => DirEntry::new_raw(dent, None),
            Err(err) => return self.visitor.visit(Err(err)),
        };
        // Handle symlinks - potentially resolve and check for loops
        if self.follow_links
            && dent.file_type().map_or(false, |ft| ft.is_symlink())
        {
            // ... symlink resolution code
            if dent.is_dir() {
                if let Err(err) = check_symlink_loop(ig, dent.path(), depth) {
                    return self.visitor.visit(Err(err));
                }
            }
        }
        // CRITICAL: Ignore check BEFORE expensive operations
        if should_skip_entry(ig, &dent) {
            return WalkState::Continue;
        }
        // Additional checks: stdout skip, filesize, filter
        // ...

        // Push work for later processing
        self.send(Work { dent, ignore: ig.clone(), root_device });
        WalkState::Continue
    }
}
Each directory entry goes through: raw entry construction, symlink handling (with loop detection), ignore matching, and finally work submission. The ignore check happens early to avoid expensive metadata operations on files that will be skipped anyway.
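A condensed sketch of that ordering principle with invented helper names: the name-based check short-circuits before any metadata is fetched.

```rust
use std::fs;
use std::path::Path;

// Hypothetical filter: returns true if the entry should be skipped.
fn should_skip(path: &Path, ignored_names: &[&str], max_size: u64) -> bool {
    // Cheap: a string comparison against the file name, no syscalls.
    let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
    if ignored_names.contains(&name) {
        return true; // skipped without ever touching metadata
    }
    // Expensive: a stat call, performed only for surviving entries.
    match fs::metadata(path) {
        Ok(md) => !md.is_dir() && md.len() > max_size,
        Err(_) => false, // let the caller surface the error elsewhere
    }
}
```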
Quick Reference¶
Key Types¶
| Type | Purpose |
|---|---|
| `DirEntry` | Unified directory entry with attached error |
| `DirEntryInner` | Three-variant enum: Stdin, Walkdir, Raw |
| `DirEntryRaw` | Platform-specific raw entry for parallel iteration |
| `WalkBuilder` | Configurable builder for both iterator types |
| `Walk` | Sequential iterator (implements Iterator) |
| `WalkParallel` | Parallel traversal (callback-based, not Iterator) |
| `WalkState` | Control flow: Continue, Skip, Quit |
| `Stack` | Work-stealing deque wrapper |
| `Worker` | Per-thread traversal worker |
| `Work` | Unit of work: entry + ignore rules + device |
Platform Differences¶
Unix:
- DirEntryRaw stores: path, type, depth, inode
- Inode from readdir (cheap)
- Metadata requires stat call
Windows:
- DirEntryRaw stores: path, type, depth, full metadata
- Metadata from directory read (free)
- No inode concept
Per-Entry Check Priority¶
1. Glob overrides (whitelist/ignore)
2. Ignore files (.ignore > .gitignore > .git/info/exclude > global)
3. File type matcher
4. Hidden file check
5. File size limit
6. Otherwise, the entry is yielded to the caller