refactor: improve FASTA batch serialization, add FASTQ (#520)
tshauck authored May 30, 2024
1 parent c42a129 commit 27642f9
Showing 8 changed files with 170 additions and 37 deletions.
36 changes: 28 additions & 8 deletions exon/exon-core/src/physical_plan/planner/exon_extension_planner.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::{str::FromStr, sync::Arc};

use async_trait::async_trait;
use datafusion::{
@@ -31,10 +31,11 @@ use datafusion::{
},
};
use exon_fasta::FASTASchemaBuilder;
use exon_fastq::new_fastq_schema_builder;

use crate::{
logical_plan::ExonDataSinkLogicalPlanNode, physical_plan::object_store::parse_url,
sinks::FASTADataSink,
datasources::ExonFileType, logical_plan::ExonDataSinkLogicalPlanNode,
physical_plan::object_store::parse_url, sinks::SimpleRecordSink,
};

pub struct ExomeExtensionPlanner {}
@@ -124,24 +125,43 @@ impl ExtensionPlanner for ExomeExtensionPlanner {

let p_file = PartitionedFile::new(path, 0);

let schema = FASTASchemaBuilder::default().build().file_schema().unwrap();
let stored_as = logical_node.stored_as.as_ref().ok_or_else(|| {
datafusion::error::DataFusionError::Plan(
"Stored as option is required for ExonDataSinkLogicalPlanNode".to_string(),
)
})?;
let exon_file_type = ExonFileType::from_str(stored_as)?;

let schema = match ExonFileType::from_str(stored_as)? {
ExonFileType::FASTA => FASTASchemaBuilder::default().build().file_schema().unwrap(),
ExonFileType::FASTQ => new_fastq_schema_builder().build().file_schema().unwrap(),
_ => {
return Err(datafusion::error::DataFusionError::Plan(
"Invalid file type".to_string(),
))
}
};

let file_sink_config = FileSinkConfig {
object_store_url,
file_groups: vec![p_file],
table_paths: vec![],
output_schema: schema,
output_schema: schema.clone(),
table_partition_cols: vec![],
overwrite: false,
};

let compression_type = logical_node
.file_compression_type()?
.unwrap_or(FileCompressionType::UNCOMPRESSED);
let sink = Arc::new(FASTADataSink::new(file_sink_config, compression_type));
let sink_schema = FASTASchemaBuilder::default().build().file_schema().unwrap();

let data_sink = DataSinkExec::new(physical_plan, sink, sink_schema, None);
let sink = Arc::new(SimpleRecordSink::new(
file_sink_config,
compression_type,
exon_file_type,
));

let data_sink = DataSinkExec::new(physical_plan, sink, schema, None);

Ok(Some(Arc::new(data_sink)))
}
6 changes: 4 additions & 2 deletions exon/exon-core/src/sinks.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod fasta_data_sink;
mod columns_from_batch;
mod fasta_serializer;
mod fastq_serializer;
mod simple_record_sink;

pub(crate) use fasta_data_sink::FASTADataSink;
pub(crate) use simple_record_sink::SimpleRecordSink;
35 changes: 35 additions & 0 deletions exon/exon-core/src/sinks/columns_from_batch.rs
@@ -0,0 +1,35 @@
// Copyright 2024 WHERE TRUE Technologies.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) fn get_array_column<'a, T>(
batch: &'a arrow::record_batch::RecordBatch,
column_name: &str,
) -> Result<&'a T, datafusion::error::DataFusionError>
where
T: arrow::array::Array + 'static,
{
if let Some(column) = batch.column_by_name(column_name) {
column.as_any().downcast_ref::<T>().ok_or_else(|| {
datafusion::error::DataFusionError::Execution(format!(
"{} should be a string array",
column_name
))
})
} else {
Err(datafusion::error::DataFusionError::Execution(format!(
"{} column not found",
column_name
)))
}
}
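
For context, a minimal sketch (not part of this commit) of how the new get_array_column helper behaves, assuming it is in scope within the crate; the batch construction uses the arrow API directly:

use std::sync::Arc;

use arrow::array::StringArray;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn demo() -> Result<(), Box<dyn std::error::Error>> {
    // Build a one-column batch with an "id" string column.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(StringArray::from(vec!["seq1", "seq2"]))],
    )?;

    // Named lookup plus typed downcast in one call.
    let ids = get_array_column::<StringArray>(&batch, "id")?;
    assert_eq!(ids.value(0), "seq1");

    // A missing column (or a failed downcast) surfaces as a
    // DataFusionError::Execution instead of a panic, which is the error
    // behavior the serializers below rely on.
    assert!(get_array_column::<StringArray>(&batch, "missing").is_err());

    Ok(())
}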
23 changes: 6 additions & 17 deletions exon/exon-core/src/sinks/fasta_serializer.rs
@@ -12,13 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use arrow::array::StringArray;
use bytes::Bytes;
use datafusion::datasource::file_format::write::BatchSerializer;
use noodles::fasta::{
record::{Definition, Sequence},
Record,
};

use super::columns_from_batch::get_array_column;

#[derive(Debug, Default)]
pub(crate) struct FASTASerializer {}

@@ -28,23 +31,9 @@ impl BatchSerializer for FASTASerializer {
batch: arrow::array::RecordBatch,
_initial: bool,
) -> datafusion::error::Result<bytes::Bytes> {
let ids = batch
.column(0)
.as_any()
.downcast_ref::<arrow::array::StringArray>()
.expect("ids should be a string array");

let descriptions = batch
.column(1)
.as_any()
.downcast_ref::<arrow::array::StringArray>()
.expect("descriptions should be a string array");

let sequences = batch
.column(2)
.as_any()
.downcast_ref::<arrow::array::StringArray>()
.expect("sequences should be a string array");
let ids = get_array_column::<StringArray>(&batch, "id")?;
let descriptions = get_array_column::<StringArray>(&batch, "description")?;
let sequences = get_array_column::<StringArray>(&batch, "sequence")?;

let b = Vec::new();
let mut fasta_writer = noodles::fasta::writer::Writer::new(b);
53 changes: 53 additions & 0 deletions exon/exon-core/src/sinks/fastq_serializer.rs
@@ -0,0 +1,53 @@
// Copyright 2024 WHERE TRUE Technologies.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use arrow::array::StringArray;
use bytes::Bytes;
use datafusion::datasource::file_format::write::BatchSerializer;

use super::columns_from_batch::get_array_column;

#[derive(Debug, Default)]
pub(crate) struct FASTQSerializer {}

impl BatchSerializer for FASTQSerializer {
fn serialize(
&self,
batch: arrow::array::RecordBatch,
_initial: bool,
) -> datafusion::error::Result<bytes::Bytes> {
let names = get_array_column::<StringArray>(&batch, "name")?;
let descriptions = get_array_column::<StringArray>(&batch, "description")?;
let sequences = get_array_column::<StringArray>(&batch, "sequence")?;
let quality_scores = get_array_column::<StringArray>(&batch, "quality_scores")?;

let b = Vec::new();
let mut fastq_writer = noodles::fastq::Writer::new(b);

for i in 0..batch.num_rows() {
let id = names.value(i);
let description = descriptions.value(i);
let sequence = sequences.value(i);
let quality_scores = quality_scores.value(i);

let definition = noodles::fastq::record::Definition::new(id, description);
let record = noodles::fastq::Record::new(definition, sequence, quality_scores);

fastq_writer.write_record(&record)?;
}

// todo benchmark this
Ok(Bytes::from(fastq_writer.get_ref().to_vec()))
}
}
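
A minimal sketch (not part of this commit) of the serializer in use, assuming an in-crate test where the pub(crate) FASTQSerializer is visible; the column names match the FASTQ schema read above (name, description, sequence, quality_scores):

use std::sync::Arc;

use arrow::array::StringArray;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::write::BatchSerializer;

fn serialize_one_read() -> datafusion::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("name", DataType::Utf8, false),
        Field::new("description", DataType::Utf8, false),
        Field::new("sequence", DataType::Utf8, false),
        Field::new("quality_scores", DataType::Utf8, false),
    ]));

    // One FASTQ record: @read1 some description / ACGT / + / IIII
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(vec!["read1"])),
            Arc::new(StringArray::from(vec!["some description"])),
            Arc::new(StringArray::from(vec!["ACGT"])),
            Arc::new(StringArray::from(vec!["IIII"])),
        ],
    )?;

    // serialize consumes the batch and returns the raw FASTQ bytes.
    let bytes = FASTQSerializer::default().serialize(batch, true)?;
    assert!(bytes.starts_with(b"@read1"));

    Ok(())
}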
33 changes: 24 additions & 9 deletions exon/exon-core/src/sinks/{fasta_data_sink.rs → simple_record_sink.rs}
@@ -26,34 +26,39 @@ use datafusion::{
use futures::StreamExt;
use tokio::io::AsyncWriteExt;

use super::fasta_serializer::FASTASerializer;
use crate::datasources::ExonFileType;

pub struct FASTADataSink {
use super::{fasta_serializer::FASTASerializer, fastq_serializer::FASTQSerializer};

pub struct SimpleRecordSink {
file_compression_type: FileCompressionType,
file_sink_config: FileSinkConfig,
exon_file_type: ExonFileType,
}

impl FASTADataSink {
impl SimpleRecordSink {
pub fn new(
file_sink_config: FileSinkConfig,
file_compression_type: FileCompressionType,
exon_file_type: ExonFileType,
) -> Self {
Self {
file_sink_config,
file_compression_type,
exon_file_type,
}
}
}

use std::fmt::Debug;

impl Debug for FASTADataSink {
impl Debug for SimpleRecordSink {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SimpleRecordSink").finish()
}
}

impl DisplayAs for FASTADataSink {
impl DisplayAs for SimpleRecordSink {
fn fmt_as(
&self,
_display_type: DisplayFormatType,
@@ -64,7 +69,7 @@ impl DisplayAs for FASTADataSink {
}

#[async_trait::async_trait]
impl DataSink for FASTADataSink {
impl DataSink for SimpleRecordSink {
fn as_any(&self) -> &dyn Any {
self
}
@@ -94,7 +99,11 @@ impl DataSink for FASTADataSink {
.file_compression_type
.convert_async_writer(buf_writer)?;

let serializer = FASTASerializer::default();
let serializer: Arc<dyn BatchSerializer> = match self.exon_file_type {
ExonFileType::FASTA => Arc::new(FASTASerializer::default()),
ExonFileType::FASTQ => Arc::new(FASTQSerializer::default()),
_ => return Err(DataFusionError::Execution("Invalid file type".to_string())),
};

while let Some(batch) = data.next().await {
let batch = batch?;
@@ -115,7 +124,8 @@
#[cfg(test)]
mod tests {
use crate::datasources::fasta::table_provider::ListingFASTATableOptions;
use crate::sinks::FASTADataSink;
use crate::datasources::ExonFileType;
use crate::sinks::SimpleRecordSink;
use crate::ExonSession;

use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
@@ -153,7 +163,12 @@ mod tests {
overwrite: false,
};

let sink = FASTADataSink::new(file_sink_config, FileCompressionType::UNCOMPRESSED);
let exon_file_type = ExonFileType::FASTA;
let sink = SimpleRecordSink::new(
file_sink_config,
FileCompressionType::UNCOMPRESSED,
exon_file_type,
);

let total_bytes = sink
.write_all(stream, &ctx.session.task_ctx())
6 changes: 5 additions & 1 deletion exon/exon-core/src/sql/parser.rs
@@ -12,11 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str::FromStr;

use datafusion::sql::{
parser::{DFParser, Statement},
sqlparser::{keywords::Keyword, tokenizer::Token},
};

use crate::datasources::ExonFileType;

use super::exon_copy_statement::ExonCopyToStatement;

pub(crate) struct ExonParser<'a> {
@@ -52,7 +56,7 @@ impl ExonParser<'_> {

if let Statement::CopyTo(s) = &df_statement {
match &s.stored_as {
Some(v) if v == "FASTA" => Ok(ExonStatement::ExonCopyTo(
Some(v) if ExonFileType::from_str(v).is_ok() => Ok(ExonStatement::ExonCopyTo(
ExonCopyToStatement::from(s.clone()),
)),
_ => Ok(ExonStatement::DFStatement(Box::from(df_statement))),
15 changes: 15 additions & 0 deletions exon/exon-core/tests/sqllogictests/slt/fastq-copy-tests.slt
@@ -0,0 +1,15 @@
control substitution on

statement ok
CREATE EXTERNAL TABLE fastq_table STORED AS FASTQ LOCATION '$CARGO_MANIFEST_DIR/test-data/datasources/fastq/test.fastq';

statement ok
COPY (SELECT * FROM fastq_table) TO '${__TEST_DIR__}test.fastq' STORED AS FASTQ;

query I
SELECT COUNT(*) FROM fastq_scan('${__TEST_DIR__}test.fastq');
----
2

statement ok
DROP TABLE fastq_table;
