Skip to content

Commit

Permalink
feat: copy to FASTA file (#514)
Browse files Browse the repository at this point in the history
  • Loading branch information
tshauck authored May 27, 2024
1 parent ded22a7 commit ab5cda4
Show file tree
Hide file tree
Showing 33 changed files with 1,059 additions and 362 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build-r.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ on:
- main
tags-ignore:
- '*'
paths:
- 'exon-r/**'

jobs:
build:
Expand Down
67 changes: 39 additions & 28 deletions exon-benchmarks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
use clap::{Parser, Subcommand};
use datafusion::{
datasource::file_format::file_compression_type::FileCompressionType,
prelude::{col, lit, SessionContext},
prelude::{col, lit},
};
use exon::{
datasources::{
fasta::table_provider::ListingFASTATableOptions,
mzml::table_provider::ListingMzMLTableOptions,
},
new_exon_config, ExonRuntimeEnvExt, ExonSessionExt,
new_exon_config, ExonRuntimeEnvExt, ExonSession,
};
use tracing::Level;
use tracing_subscriber::FmtSubscriber;
Expand Down Expand Up @@ -111,12 +111,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Some(Commands::VCFQuery { path, region }) => {
let path = path.as_str();

let ctx = SessionContext::new_exon();
ctx.runtime_env()
let ctx = ExonSession::new_exon();
ctx.session
.runtime_env()
.exon_register_object_store_uri(path)
.await?;

ctx.sql(
ctx.session.sql(
format!(
"CREATE EXTERNAL TABLE vcf_file STORED AS INDEXED_VCF COMPRESSION TYPE GZIP LOCATION '{}';",
path
Expand All @@ -126,29 +127,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;

let df = ctx
.sql(format!("SELECT chrom, pos, array_to_string(id, ':') AS id FROM vcf_file WHERE vcf_region_filter('{}', chrom, pos) = true;", region).as_str())
.session.sql(format!("SELECT chrom, pos, array_to_string(id, ':') AS id FROM vcf_file WHERE vcf_region_filter('{}', chrom, pos) = true;", region).as_str())
.await?;

let cnt = df.count().await?;
eprintln!("Count: {}", cnt);
}
Some(Commands::BAMScan { path }) => {
let path = path.as_str();
let ctx = SessionContext::new_exon();
ctx.runtime_env()
let ctx = ExonSession::new_exon();
ctx.session
.runtime_env()
.exon_register_object_store_uri(path)
.await?;

ctx.sql(
format!(
"CREATE EXTERNAL TABLE bam STORED AS BAM LOCATION '{}';",
path
ctx.session
.sql(
format!(
"CREATE EXTERNAL TABLE bam STORED AS BAM LOCATION '{}';",
path
)
.as_str(),
)
.as_str(),
)
.await?;
.await?;

let df = ctx.sql("SELECT COUNT(*) FROM bam").await?.collect().await?;
let df = ctx
.session
.sql("SELECT COUNT(*) FROM bam")
.await?
.collect()
.await?;

assert!(df.len() == 1);

Expand All @@ -158,21 +166,24 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let path = path.as_str();
let region = region.as_str();

let ctx = SessionContext::new_exon();
ctx.runtime_env()
let ctx = ExonSession::new_exon();
ctx.session
.runtime_env()
.exon_register_object_store_uri(path)
.await?;

ctx.sql(
format!(
"CREATE EXTERNAL TABLE bam STORED AS INDEXED_BAM LOCATION '{}';",
path
ctx.session
.sql(
format!(
"CREATE EXTERNAL TABLE bam STORED AS INDEXED_BAM LOCATION '{}';",
path
)
.as_str(),
)
.as_str(),
)
.await?;
.await?;

let df = ctx
.session
.sql(
format!(
"SELECT reference FROM bam WHERE bam_region_filter('{}', reference, start, end) = true;",
Expand All @@ -189,7 +200,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Some(Commands::FASTACodonScan { path, compression }) => {
let options = ListingFASTATableOptions::new(compression.unwrap());

let ctx = SessionContext::new_exon();
let ctx = ExonSession::new_exon();

let df = ctx.read_fasta(path, options).await?;

Expand All @@ -199,7 +210,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
Some(Commands::FASTAScanParallel { path, workers }) => {
let exon_config = new_exon_config().with_target_partitions(*workers);
let ctx = SessionContext::with_config_exon(exon_config);
let ctx = ExonSession::with_config_exon(exon_config);

let options = ListingFASTATableOptions::default();

Expand All @@ -216,7 +227,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let compression = compression.unwrap_or(FileCompressionType::UNCOMPRESSED);
let options = ListingMzMLTableOptions::new(compression);

let ctx = SessionContext::new_exon();
let ctx = ExonSession::new_exon();

let df = ctx.read_mzml(path, options).await?;
let count = df.count().await?;
Expand Down
7 changes: 4 additions & 3 deletions exon-examples/examples/gff_annotation_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,23 @@
use arrow::util::pretty::pretty_format_batches;
use datafusion::error::DataFusionError;
use datafusion::prelude::*;
use exon::ExonSessionExt;
use exon::ExonSession;

#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
let config = SessionConfig::new()
.with_target_partitions(4)
.with_repartition_file_scans(true);

let ctx = SessionContext::with_config_exon(config);
let ctx = ExonSession::with_config_exon(config);

let path = "./exon-examples/data/Ga0604745_crt.gff";
let sql = format!("CREATE EXTERNAL TABLE gff STORED AS GFF LOCATION '{path}';",);

ctx.sql(&sql).await?;
ctx.session.sql(&sql).await?;

let df = ctx
.session
.sql(
r#"SELECT crispr.seqname, crispr.start, crispr.end, repeat.start, repeat.end
FROM (SELECT * FROM gff WHERE type = 'CRISPR') AS crispr
Expand Down
8 changes: 4 additions & 4 deletions exon-examples/examples/mzml_querying.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@
use arrow::util::pretty::pretty_format_batches;
use datafusion::error::DataFusionError;
use datafusion::prelude::*;
use exon::ExonSessionExt;
use exon::ExonSession;

#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
let ctx = SessionContext::new_exon();
let ctx = ExonSession::new_exon();

// From GNPS, create a table.
let path = "./exon-examples/data/GNPS00002_A3_p.mzML";
let sql = format!("CREATE EXTERNAL TABLE mzml STORED AS MZML LOCATION '{path}';",);
ctx.sql(&sql).await?;
ctx.session.sql(&sql).await?;

// Query the table, select the scan id where the spectrum contains a peak of interest.
let df = ctx
.session
.sql(
r#"SELECT id
FROM mzml
Expand Down
2 changes: 1 addition & 1 deletion exon-r/exonr/src/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ pub struct ExonSessionContext {
impl Default for ExonSessionContext {
fn default() -> Self {
Self {
ctx: SessionContext::new_exon(),
ctx: ExonSession::new_exon(),
runtime: Runtime::new().unwrap(),
}
}
Expand Down
11 changes: 5 additions & 6 deletions exon/exon-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@

use clap::Parser;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionContext;
use datafusion_cli::exec;
use datafusion_cli::print_format::PrintFormat;
use datafusion_cli::print_options::{MaxRows, PrintOptions};
use exon::{new_exon_config, ExonSessionExt};
use exon::{new_exon_config, ExonSession};
use tracing_subscriber::{EnvFilter, FmtSubscriber};

#[derive(Debug, Parser, PartialEq)]
Expand Down Expand Up @@ -63,7 +62,7 @@ pub async fn main() -> Result<()> {
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");

let config = new_exon_config();
let mut ctx = SessionContext::with_config_exon(config);
let mut ctx = ExonSession::with_config_exon(config);

let mut print_options = PrintOptions {
format: args.format,
Expand All @@ -76,17 +75,17 @@ pub async fn main() -> Result<()> {
let files = args.file;

if commands.is_empty() && files.is_empty() {
return exec::exec_from_repl(&mut ctx, &mut print_options)
return exec::exec_from_repl(&mut ctx.session, &mut print_options)
.await
.map_err(|e| DataFusionError::External(Box::new(e)));
}

if !commands.is_empty() {
exec::exec_from_commands(&mut ctx, commands, &print_options).await?;
exec::exec_from_commands(&mut ctx.session, commands, &print_options).await?;
}

if !files.is_empty() {
exec::exec_from_files(&mut ctx, files, &print_options).await?;
exec::exec_from_files(&mut ctx.session, files, &print_options).await?;
}

Ok(())
Expand Down
24 changes: 12 additions & 12 deletions exon/exon-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ impl ConfigExtension for ExonConfigExtension {

#[cfg(test)]
mod tests {
use datafusion::prelude::SessionContext;

use crate::{config::ExonConfigExtension, new_exon_config, ExonSessionExt};
use crate::{config::ExonConfigExtension, new_exon_config, ExonSession};

#[tokio::test]
async fn test_config_set_with_defaults() -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -157,19 +155,21 @@ mod tests {

#[tokio::test]
async fn test_setting_config_through_sql() -> Result<(), Box<dyn std::error::Error>> {
let ctx = SessionContext::new_exon();
let ctx = ExonSession::new_exon();

ctx.sql("SET exon.vcf_parse_info = true").await?;
ctx.sql("SET exon.vcf_parse_formats = true").await?;
ctx.sql("SET exon.fasta_sequence_buffer_capacity = 1024")
ctx.session.sql("SET exon.vcf_parse_info = true").await?;
ctx.session.sql("SET exon.vcf_parse_formats = true").await?;
ctx.session
.sql("SET exon.fasta_sequence_buffer_capacity = 1024")
.await?;
ctx.sql("SET exon.sam_parse_tags = true").await?;
ctx.sql("SET exon.bam_parse_tags = true").await?;
ctx.sql("SET exon.cram_parse_tags = true").await?;
ctx.sql("SET exon.fasta_sequence_data_type = 'large_utf8'")
ctx.session.sql("SET exon.sam_parse_tags = true").await?;
ctx.session.sql("SET exon.bam_parse_tags = true").await?;
ctx.session.sql("SET exon.cram_parse_tags = true").await?;
ctx.session
.sql("SET exon.fasta_sequence_data_type = 'large_utf8'")
.await?;

let state = ctx.state();
let state = ctx.session.state();
let exon_config = state
.config()
.options()
Expand Down
15 changes: 7 additions & 8 deletions exon/exon-core/src/datasources/exon_listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,13 +400,12 @@ impl TableProviderFactory for ExonListingTableFactory {
mod tests {
use std::{path::PathBuf, sync::Arc};

use datafusion::{
catalog::{listing_schema::ListingSchemaProvider, CatalogProvider, MemoryCatalogProvider},
prelude::SessionContext,
use datafusion::catalog::{
listing_schema::ListingSchemaProvider, CatalogProvider, MemoryCatalogProvider,
};
use object_store::local::LocalFileSystem;

use crate::{datasources::ExonListingTableFactory, ExonSessionExt};
use crate::{datasources::ExonListingTableFactory, ExonSession};

#[tokio::test]
async fn test_in_catalog() -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -435,12 +434,12 @@ mod tests {
// let session_config = SessionConfig::from_env()?;
// let runtime_env = create_runtime_env()?;
// let ctx = SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env));
let ctx = SessionContext::new_exon();
let ctx = ExonSession::new_exon();

ctx.register_catalog("exon", Arc::new(mem_catalog));
ctx.refresh_catalogs().await?;
ctx.session.register_catalog("exon", Arc::new(mem_catalog));
ctx.session.refresh_catalogs().await?;

let gotten_catalog = ctx.catalog("exon").ok_or("No catalog found")?;
let gotten_catalog = ctx.session.catalog("exon").ok_or("No catalog found")?;
let schema_names = gotten_catalog.schema_names();
assert_eq!(schema_names, vec!["exon"]);

Expand Down
13 changes: 5 additions & 8 deletions exon/exon-core/src/datasources/fastq/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,21 +229,18 @@ impl<T: ExonListingOptions + 'static> TableProvider for ListingFASTQTable<T> {
mod tests {
use std::collections::HashMap;

use datafusion::{
datasource::file_format::file_compression_type::FileCompressionType,
prelude::SessionContext,
};
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use exon_test::test_listing_table_url;

use crate::{
datasources::{ExonFileType, ExonListingTableFactory},
ExonSessionExt,
ExonSession,
};

#[tokio::test]
async fn test_table_scan() -> Result<(), Box<dyn std::error::Error>> {
let ctx = SessionContext::new_exon();
let session_state = ctx.state();
let ctx = ExonSession::new_exon();
let session_state = ctx.session.state();

let table_path = test_listing_table_url("fastq");
let table = ExonListingTableFactory::new()
Expand All @@ -257,7 +254,7 @@ mod tests {
)
.await?;

let df = ctx.read_table(table)?;
let df = ctx.session.read_table(table)?;

let mut row_cnt = 0;
let bs = df.collect().await?;
Expand Down
13 changes: 5 additions & 8 deletions exon/exon-core/src/datasources/fcs/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,16 @@ impl ExecutionPlan for FCSScan {
mod tests {
use std::collections::HashMap;

use crate::{datasources::ExonListingTableFactory, ExonSessionExt};
use crate::{datasources::ExonListingTableFactory, ExonSession};

use datafusion::{
datasource::file_format::file_compression_type::FileCompressionType,
prelude::SessionContext,
};
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;

use exon_test::test_listing_table_url;

#[tokio::test]
async fn test_fcs_read() -> Result<(), Box<dyn std::error::Error>> {
let ctx = SessionContext::new_exon();
let session_state = ctx.state();
let ctx = ExonSession::new_exon();
let session_state = ctx.session.state();

let table_path = test_listing_table_url("fcs");

Expand All @@ -186,7 +183,7 @@ mod tests {
)
.await?;

let df = ctx.read_table(table)?;
let df = ctx.session.read_table(table)?;

let mut row_cnt = 0;
let bs = df.collect().await?;
Expand Down
Loading

0 comments on commit ab5cda4

Please sign in to comment.