Skip to content

Commit

Permalink
fix: add rayon dependency
Browse files Browse the repository at this point in the history
fix: correct all clippy errors

fix: add CLI argument to setup the thread pool

fix: CLI handling

fix: wrong argument

docs(readme): add copyright notice (#35)

fix: parallelize also second pass

fix: correct output in `write` -> `write_all`

- `write` may fail an return the number of bytes written, which is
  the wrong function.

fix: remove from first-pass since no benefit
  • Loading branch information
gabyx committed Jul 15, 2024
1 parent 0b2dba1 commit 51cd62f
Show file tree
Hide file tree
Showing 12 changed files with 200 additions and 90 deletions.
48 changes: 47 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
bitflags = '2.5.0'
blake3 = '1.5.1'
io-enum = '1.1.3'
rayon = "1.10.0"
rio_api = '0.8.4'
rio_turtle = '0.8.4'
rstest = '0.21.0'
Expand Down
20 changes: 5 additions & 15 deletions src/crypto.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::model::Entity;
use crate::{model::TripleMask, rdf_types::*};
use blake3;

pub trait Pseudonymize {
// Pseudonymize parts of a triple set by its mask
Expand All @@ -26,9 +25,9 @@ pub trait Pseudonymize {

fn pseudo_entity(&self, e: &Entity) -> Entity {
match e {
Entity::Literal(l) => Entity::Literal(self.pseudo_literal(&l)),
Entity::NamedNode(n) => Entity::NamedNode(self.pseudo_named_node(&n)),
Entity::BlankNode(b) => Entity::BlankNode(self.pseudo_blank_node(&b)),
Entity::Literal(l) => Entity::Literal(self.pseudo_literal(l)),
Entity::NamedNode(n) => Entity::NamedNode(self.pseudo_named_node(n)),
Entity::BlankNode(b) => Entity::BlankNode(self.pseudo_blank_node(b)),
}
}
// private methods? Blanket implementations
Expand All @@ -42,17 +41,8 @@ pub trait Pseudonymize {
// return u.clone()
}

pub struct DefaultHasher {
hasher: blake3::Hasher,
}

impl DefaultHasher {
pub fn new() -> Self {
return DefaultHasher {
hasher: blake3::Hasher::new(),
};
}
}
#[derive(Default)]
pub struct DefaultHasher {}

impl Pseudonymize for DefaultHasher {
fn pseudo_named_node(&self, t: &NamedNode) -> NamedNode {
Expand Down
6 changes: 2 additions & 4 deletions src/io.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use crate::rules::Rules;
use rio_turtle::NTriplesParser;
use serde_yml;
use std::{
boxed::Box,
fs::File,
io::{self, stdin, stdout, BufRead, BufReader, BufWriter, Write},
io::{self, stdin, stdout, BufRead, BufReader, BufWriter},
path::Path,
};

Expand Down Expand Up @@ -46,7 +44,7 @@ pub fn parse_ntriples(reader: impl BufRead) -> NTriplesParser<impl BufRead> {

// Parse yaml configuration file.
pub fn parse_config(path: &Path) -> Rules {
return match File::open(&path) {
return match File::open(path) {
Ok(file) => serde_yml::from_reader(file).expect("Error parsing config file."),
Err(e) => panic!("Cannot open file '{:?}': '{}'.", path, e),
};
Expand Down
1 change: 0 additions & 1 deletion src/log.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use slog::{self, o, Drain};
use slog_async;
use std::{io, sync::Arc};

pub type Logger = slog::Logger;
Expand Down
40 changes: 38 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
};

use clap::{Args, Parser, Subcommand};
use log::Logger;
use std::path::PathBuf;

#[derive(Parser)]
Expand All @@ -25,6 +26,12 @@ use std::path::PathBuf;
struct Cli {
#[command(subcommand)]
command: Subcommands,

/// Parallelize the convertion loops.
/// Specifies the number of threads to use.
/// A number of `0` will use all threads.
#[arg(short, long)]
parallel: Option<usize>,
}

#[derive(Args, Debug)]
Expand All @@ -35,7 +42,7 @@ struct IndexArgs {

/// File descriptor to read triples from.
/// Defaults to `stdin`.
#[arg(default_value = "-")]
#[arg(short, long, default_value = "-")]
input: PathBuf,
}

Expand Down Expand Up @@ -75,18 +82,47 @@ enum Subcommands {
Pseudo(PseudoArgs),
}

fn install_thread_pool(log: &Logger, threads: usize) {
let num_threads = if threads == 0 {
std::thread::available_parallelism()
.expect("Could not get available num. threads.")
.get()
} else {
threads
};

info!(log, "Installing thread pool with '{num_threads}' threads.");
rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.build_global()
.expect("Could not setup thread pool.");
}

fn main() {
let log = create_logger(false);
let cli = Cli::parse();

let mut parallelize = false;
if let Some(num_threads) = cli.parallel {
parallelize = true;
install_thread_pool(&log, num_threads);
}

match cli.command {
Subcommands::Index(args) => {
info!(log, "Args: {:?}", args);
create_type_map(&args.input, &args.output)
}
Subcommands::Pseudo(args) => {
info!(log, "Args: {:?}", args);
pseudonymize_graph(&log, &args.input, &args.config, &args.output, &args.index)
pseudonymize_graph(
&log,
&args.input,
&args.config,
&args.output,
&args.index,
parallelize,
)
}
}
}
Expand Down
1 change: 0 additions & 1 deletion src/model.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::hash::Hash;

use crate::rdf_types::*;
use bitflags;

#[derive(Eq, PartialEq, Debug, Clone, Hash)]
pub enum Entity {
Expand Down
38 changes: 23 additions & 15 deletions src/pass_first.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,38 @@
use rio_api::{model::Triple, parser::TriplesParser};
use rio_api::parser::TriplesParser;
use rio_turtle::TurtleError;
use std::{
io::{stdin, BufRead, BufReader, Write},
path::Path,
use std::{io::Write, path::Path};

use crate::{
io,
rdf_types::{Triple, TripleView},
};

use crate::io;
fn index_triple(t: Triple, out: &mut impl Write) {
if t.predicate.iri.as_str() == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type" {
let r = || -> std::io::Result<()> {
out.write_all(t.to_string().as_bytes())?;
out.write_all(b" .\n")
}();

fn index_triple(t: Triple, out: &mut impl Write) -> Result<(), TurtleError> {
match t.predicate.iri {
"http://www.w3.org/1999/02/22-rdf-syntax-ns#type" => {
let _ = out.write(&format!("{} .\n", &t.to_string()).into_bytes());
if let Err(e) = r {
panic!("Error writting to out buffer: {e}");
}
_ => {}
}

Ok(())
}

pub fn create_type_map(input: &Path, output: &Path) {
let buf_in = io::get_reader(input);
let mut buf_out = io::get_writer(output);
let mut triples = io::parse_ntriples(buf_in);

while !triples.is_end() {
triples
.parse_step(&mut |t| index_triple(t, &mut buf_out))
.unwrap();
let _ = triples
.parse_step(&mut |t: TripleView| {
index_triple(t.into(), &mut buf_out);
Result::<(), TurtleError>::Ok(())
})
.inspect_err(|e| {
panic!("Parsing error occured: {e}");
});
}
}
Loading

0 comments on commit 51cd62f

Please sign in to comment.