Skip to content

Commit

Permalink
feat: add fcs method
Browse files Browse the repository at this point in the history
  • Loading branch information
tshauck committed Apr 23, 2024
1 parent cbda9e5 commit c9fe44f
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ impl ExonListingTableFactory {

let table_schema = options.infer_schema(state, &table_path).await?;

let config = ListingFCSTableConfig::new(table_path).with_options(options);
let config = ListingFCSTableConfig::new(table_path, options);
let table = ListingFCSTable::try_new(config, table_schema)?;

Ok(Arc::new(table))
Expand Down
41 changes: 16 additions & 25 deletions exon/exon-core/src/datasources/fcs/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,15 @@ use super::scanner::FCSScan;
pub struct ListingFCSTableConfig {
inner: ListingTableConfig,

options: Option<ListingFCSTableOptions>,
options: ListingFCSTableOptions,
}

impl ListingFCSTableConfig {
/// Create a new VCF listing table configuration
pub fn new(table_path: ListingTableUrl) -> Self {
pub fn new(table_path: ListingTableUrl, options: ListingFCSTableOptions) -> Self {
Self {
inner: ListingTableConfig::new(table_path),
options: None,
}
}

/// Set the options for the VCF listing table
pub fn with_options(self, options: ListingFCSTableOptions) -> Self {
Self {
options: Some(options),
..self
options,
}
}
}
Expand Down Expand Up @@ -174,22 +166,17 @@ impl ListingFCSTableOptions {
#[derive(Debug, Clone)]
/// A FCS listing table
pub struct ListingFCSTable {
table_paths: Vec<ListingTableUrl>,

table_schema: TableSchema,

options: ListingFCSTableOptions,
config: ListingFCSTableConfig,
}

impl ListingFCSTable {
/// Create a new FCS listing table
pub fn try_new(config: ListingFCSTableConfig, table_schema: TableSchema) -> Result<Self> {
Ok(Self {
table_paths: config.inner.table_paths,
table_schema,
options: config
.options
.ok_or_else(|| DataFusionError::Internal(String::from("Options must be set")))?,
config,
})
}
}
Expand All @@ -214,7 +201,7 @@ impl TableProvider for ListingFCSTable {
) -> Result<Vec<TableProviderFilterPushDown>> {
Ok(filters
.iter()
.map(|f| filter_matches_partition_cols(f, &self.options.table_partition_cols))
.map(|f| filter_matches_partition_cols(f, &self.config.options.table_partition_cols))
.collect())
}

Expand All @@ -225,7 +212,7 @@ impl TableProvider for ListingFCSTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let object_store_url = if let Some(url) = self.table_paths.first() {
let object_store_url = if let Some(url) = self.config.inner.table_paths.first() {
url.object_store()
} else {
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
Expand All @@ -236,10 +223,10 @@ impl TableProvider for ListingFCSTable {
let file_list = pruned_partition_list(
state,
&object_store,
&self.table_paths[0],
&self.config.inner.table_paths[0],
filters,
self.options.file_extension.as_str(),
&self.options.table_partition_cols,
self.config.options.file_extension.as_str(),
&self.config.options.table_partition_cols,
)
.await?
.try_collect::<Vec<_>>()
Expand All @@ -249,11 +236,15 @@ impl TableProvider for ListingFCSTable {
let file_scan_config =
FileScanConfigBuilder::new(object_store_url.clone(), file_schema, vec![file_list])
.projection_option(projection.cloned())
.table_partition_cols(self.options.table_partition_cols.clone())
.table_partition_cols(self.config.options.table_partition_cols.clone())
.limit_option(limit)
.build();

let plan = self.options.create_physical_plan(file_scan_config).await?;
let plan = self
.config
.options
.create_physical_plan(file_scan_config)
.await?;

Ok(plan)
}
Expand Down
7 changes: 4 additions & 3 deletions exon/exon-core/src/datasources/fcs/udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ impl TableFunctionImpl for FCSScanFunction {
Ok::<TableSchema, datafusion::error::DataFusionError>(schema)
})?;

let listing_table_config =
ListingFCSTableConfig::new(listing_scan_function.listing_table_url)
.with_options(listing_table_options);
let listing_table_config = ListingFCSTableConfig::new(
listing_scan_function.listing_table_url,
listing_table_options,
);

let listing_table = ListingFCSTable::try_new(listing_table_config, schema)?;

Expand Down
52 changes: 52 additions & 0 deletions exon/exon-core/src/session_context/exon_context_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,14 @@ pub trait ExonSessionExt {
options: ListingCRAMTableOptions,
) -> Result<DataFrame, ExonError>;

/// Read an FCS file.
#[cfg(feature = "fcs")]
async fn read_fcs(
&self,
table_path: &str,
options: crate::datasources::fcs::table_provider::ListingFCSTableOptions,
) -> Result<DataFrame, ExonError>;

/// Read a GENBANK file.
#[cfg(feature = "genbank")]
async fn read_genbank(
Expand Down Expand Up @@ -370,6 +378,29 @@ impl ExonSessionExt for SessionContext {
Ok(table)
}

#[cfg(feature = "fcs")]
async fn read_fcs(
&self,
table_path: &str,
options: crate::datasources::fcs::table_provider::ListingFCSTableOptions,
) -> Result<DataFrame, ExonError> {
use crate::datasources::fcs::table_provider::ListingFCSTableConfig;

let table_path = ListingTableUrl::parse(table_path)?;

let table_schema = options.infer_schema(&self.state(), &table_path).await?;

let config = ListingFCSTableConfig::new(table_path, options);
let table = crate::datasources::fcs::table_provider::ListingFCSTable::try_new(
config,
table_schema,
)?;

let table = self.read_table(Arc::new(table))?;

Ok(table)
}

async fn read_cram(
&self,
table_path: &str,
Expand Down Expand Up @@ -889,6 +920,27 @@ mod tests {
Ok(())
}

#[cfg(feature = "fcs")]
#[tokio::test]
async fn test_read_fcs() -> Result<(), Box<dyn std::error::Error>> {
let ctx = SessionContext::new_exon();

let fcs_path = exon_test::test_path("fcs", "Guava Muse.fcs");

let df = ctx
.read_fcs(
fcs_path.to_str().unwrap(),
crate::datasources::fcs::table_provider::ListingFCSTableOptions::new(
FileCompressionType::UNCOMPRESSED,
),
)
.await?;

assert_eq!(df.count().await?, 108);

Ok(())
}

#[tokio::test]
async fn test_read_fasta_gzip() -> Result<(), Box<dyn std::error::Error>> {
let ctx = SessionContext::new_exon();
Expand Down

0 comments on commit c9fe44f

Please sign in to comment.