Skip to content

Commit

Permalink
feat: add vcf region filter (#201)
Browse files: browse the repository at this point in the history
* feat: add vcf region filter
* fix: fix sql test
  • Loading branch information
tshauck authored Oct 3, 2023
1 parent e66b8a2 commit 05f2d45
Show file tree
Hide file tree
Showing 16 changed files with 161 additions and 1,094 deletions.
17 changes: 9 additions & 8 deletions exon-benchmarks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use datafusion::{
prelude::{col, lit, SessionContext},
};
use exon::{new_exon_config, ExonRuntimeEnvExt, ExonSessionExt};
use noodles::core::Region;
use tracing::Level;
use tracing_subscriber::FmtSubscriber;

Expand Down Expand Up @@ -100,22 +99,24 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
match &cli.command {
Some(Commands::VCFQuery { path, region }) => {
let path = path.as_str();
let region: Region = region.parse().unwrap();

let ctx = SessionContext::new_exon();
ctx.runtime_env()
.exon_register_object_store_uri(path)
.await
.unwrap();

ctx.register_vcf_file("vcf_file", path).await?;

let chrom = region.name();
let start = region.interval().start().unwrap();
let end = region.interval().end().unwrap();
ctx.sql(
format!(
"CREATE EXTERNAL TABLE vcf_file STORED AS INDEXED_VCF COMPRESSION TYPE GZIP LOCATION '{}';",
path
)
.as_str(),
)
.await?;

let df = ctx
.sql(format!("SELECT chrom, pos, array_to_string(id, ':') AS id FROM vcf_file WHERE chrom = '{}' and pos BETWEEN {} and {}", chrom, start, end).as_str())
.sql(format!("SELECT chrom, pos, array_to_string(id, ':') AS id FROM vcf_file WHERE vcf_region_filter('{}', chrom, pos) = true;", region).as_str())
.await?;

let cnt = df.count().await?;
Expand Down
21 changes: 9 additions & 12 deletions exon/src/context/exon_session_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ use crate::{
ExonFileType, ExonListingTableFactory,
},
new_exon_config,
physical_optimizer::region_between_rewriter::RegionBetweenRule,
physical_optimizer::{
file_repartitioner::ExonRoundRobin, interval_optimizer_rule::ExonIntervalOptimizer,
physical_optimizer::file_repartitioner::ExonRoundRobin,
udfs::{
bam_region_filter::register_bam_region_filter_udf,
vcf::vcf_region_filter::register_vcf_region_filter_udf,
},
udfs::bam_region_filter::register_bam_udf,
};

/// Extension trait for [`SessionContext`] that adds Exon-specific functionality.
Expand Down Expand Up @@ -104,7 +104,10 @@ pub trait ExonSessionExt {
}

// Register BAM region filter UDF
register_bam_udf(&ctx);
register_bam_region_filter_udf(&ctx);

// Register VCF region filter UDF
register_vcf_region_filter_udf(&ctx);

ctx
}
Expand All @@ -126,15 +129,9 @@ pub trait ExonSessionExt {
/// Create a new Exon based [`SessionContext`] with the given config and runtime.
fn with_config_rt_exon(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> SessionContext {
let round_robin_optimizer = ExonRoundRobin::default();
let region_between_optimizer = RegionBetweenRule::default();
let interval_region_optimizer = ExonIntervalOptimizer::default();

let mut state = SessionState::with_config_rt(config, runtime)
.with_physical_optimizer_rules(vec![
Arc::new(round_robin_optimizer),
Arc::new(region_between_optimizer),
Arc::new(interval_region_optimizer),
]);
.with_physical_optimizer_rules(vec![Arc::new(round_robin_optimizer)]);

let sources = vec![
"BAM",
Expand Down
4 changes: 2 additions & 2 deletions exon/src/datasources/bam/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@ impl TableProvider for ListingBAMTable {

if regions.len() > 1 {
return Err(DataFusionError::Execution(
"Multiple regions are not supported".to_string(),
"Multiple regions are not supported yet".to_string(),
));
}

if regions.is_empty() && self.options.indexed {
return Err(DataFusionError::Execution(
"Indexed BAM requires a region filter".to_string(),
"INDEXED_BAM table type requires a region filter".to_string(),
));
}

Expand Down
Loading

0 comments on commit 05f2d45

Please sign in to comment.