multithreaded scan

This commit is contained in:
🪞👃🪞 2025-03-11 15:24:15 +02:00
parent 928d38bfaa
commit a0f1577744
2 changed files with 76 additions and 37 deletions

View file

@ -5,6 +5,8 @@ use std::sync::{Arc, RwLock};
use std::path::{Path, PathBuf};
use std::env::{current_dir, set_current_dir};
use std::fs::read;
use std::thread::{sleep, spawn, JoinHandle};
use std::time::Duration;
use tek_tui::*;
use tek_tui::tek_output::*;
@ -12,7 +14,7 @@ use tek_tui::tek_input::*;
use crate::crossterm::event::{Event, KeyEvent, KeyCode, KeyModifiers, KeyEventState, KeyEventKind};
use clap::{arg, command, value_parser};
use walkdir::WalkDir;
use walkdir::{WalkDir, DirEntry};
use xxhash_rust::xxh3::xxh3_64;
use file_type::FileType;
@ -29,9 +31,9 @@ fn cli () -> clap::Command {
command!()
.arg(arg!([path] "Path to root directory")
.value_parser(value_parser!(PathBuf)))
//.arg(arg!(-j --threads <N> "Number of indexing threads")
//.required(false)
//.value_parser(value_parser!(usize)))
.arg(arg!(-j --threads <N> "Number of indexing threads")
.required(false)
.value_parser(value_parser!(usize)))
}
fn main () -> Usually<()> {
@ -41,7 +43,60 @@ fn main () -> Usually<()> {
} else {
current_dir()?
};
let threads = args.get_one::<usize>("threads").map(|x|*x).unwrap_or_default().max(1);
set_current_dir(&path)?;
let state = Arc::new(RwLock::new(Taggart::new(&path)?));
Tui::new()?.run(&state)
let results = collect(&path, threads)?;
if let Ok(results) = Arc::try_unwrap(results) {
let mut results = results.into_inner()?;
results.sort();
let state = Arc::new(RwLock::new(Taggart::new(&path, results)?));
return Tui::new()?.run(&state)
} else {
panic!("read did not finish")
}
Ok(())
}
fn collect (root: &impl AsRef<Path>, thread_count: usize) -> Usually<Arc<RwLock<Vec<Entry>>>> {
let results = Arc::new(RwLock::new(vec![]));
let entries = Arc::new(RwLock::new(WalkDir::new(&root).into_iter()
.filter_entry(|e|!e.file_name().to_str().map(|s|s.starts_with(".")).unwrap_or(false))));
let mut threads: Vec<JoinHandle<()>> = vec![];
for thread_id in (0..thread_count).map(|x|x+1) {
threads.push({
let root = root.as_ref().to_path_buf().clone();
let results = results.clone();
let entries = entries.clone();
spawn(move || loop {
let (path, depth) = {
let entry = entries.write().unwrap().next();
if let Some(entry) = entry {
let entry = entry.expect("failed to walk entry");
let path = entry.path().to_path_buf();
let depth = entry.depth();
(path, depth)
} else {
break
}
};
if depth > 0 {
let short_path = path.strip_prefix(root.as_path())
.expect("failed to strip prefix");
println!("(thread {thread_id}) {}", short_path.display());
if let Ok(Some(entry)) = Entry::new(&root, &path, depth) {
results.write().unwrap().push(entry);
}
}
})
});
}
let timer = Duration::from_millis(100);
spawn(move || loop {
if threads.iter().all(|x|x.is_finished()) {
break
} else {
sleep(timer)
}
}).join();
Ok(results)
}