Skip to content

Commit

Permalink
fix: add CLI argument to setup the thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
gabyx committed Jul 12, 2024
1 parent 60453c6 commit 96868c0
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 20 deletions.
28 changes: 27 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ use std::path::PathBuf;
struct Cli {
#[command(subcommand)]
command: Subcommands,

/// Parallelize the convertion loops.
/// Specifies the number of threads to use.
/// A number of `1` will use all threads.
/// By default the loop is not parallelized.
#[arg(short, long, default_value = "0")]
parallel: usize,
}

#[derive(Args, Debug)]
Expand Down Expand Up @@ -75,14 +82,33 @@ enum Subcommands {
Pseudo(PseudoArgs),
}

fn install_thread_pool(threads: usize) {
if threads == 0 {
return;
}

let num_threads = if threads == 1 {
rayon::max_num_threads()
} else {
threads
};

rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.build_global()
.expect("Could not setup thread pool.");
}

fn main() {
let log = create_logger(false);
let cli = Cli::parse();

install_thread_pool(cli.parallel);

match cli.command {
Subcommands::Index(args) => {
info!(log, "Args: {:?}", args);
create_type_map(&args.input, &args.output)
create_type_map(&args.input, &args.output, cli.parallel != 0)
}
Subcommands::Pseudo(args) => {
info!(log, "Args: {:?}", args);
Expand Down
54 changes: 35 additions & 19 deletions src/pass_first.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,43 @@ fn index_triple(t: Triple, out: &mut impl Write) {
}
}

pub fn create_type_map(input: &Path, output: &Path) {
pub fn create_type_map(input: &Path, output: &Path, parallel: bool) {
let buf_in = io::get_reader(input);
let buf_out = Mutex::new(io::get_writer(output));
let triples = io::parse_ntriples(buf_in);
let mut buf_out = io::get_writer(output);
let mut triples = io::parse_ntriples(buf_in);

// Make a parallel triple iterator over `rdf_types::Triple`.
// We have to wrap the `buf_out` with a `Mutex` to make it
// writable by multiple threads.
// NOTE: Weird `rio_api::into_iter` implementation, why does it use a full-blown
// `Vec<T>`, this could be simpler.
//
let it = triples
.into_iter(|t: TripleView| Result::<Triple, TurtleError>::Ok(t.into()))
.par_bridge();
if parallel {
// Make a parallel triple iterator over `rdf_types::Triple`.
// We have to wrap the `buf_out` with a `Mutex` to make it
// writable by multiple threads.
// NOTE: Weird `rio_api::into_iter` implementation, why does it use a full-blown
// `Vec<T>`, this could be simpler.
//
let buf_out = Mutex::new(io::get_writer(output));
let it = triples
.into_iter(|t: TripleView| Result::<Triple, TurtleError>::Ok(t.into()))
.par_bridge();

// Iterate in parallel over the triples.
it.for_each(|t| match t {
Ok(t) => {
let mut guard = buf_out.lock().unwrap();
index_triple(t, guard.by_ref())
// Iterate in parallel over the triples.
it.for_each(|t| match t {
Ok(t) => {
let mut guard = buf_out.lock().unwrap();
index_triple(t, guard.by_ref())
}
Err(t) => panic!("Parsing error occured: {t}"),
});
} else {
// Run the loop single-threaded.
while !triples.is_end() {
let _ = triples
.parse_step(&mut |t: TripleView| {
index_triple(t.into(), &mut buf_out);

Result::<(), TurtleError>::Ok(())
})
.inspect_err(|e| {
panic!("Parsing error occured: {e}");
});
}
Err(t) => panic!("Parsing error occured: {t}"),
})
}
}

0 comments on commit 96868c0

Please sign in to comment.