Improve docs for geoarrow::io
kylebarron committed Dec 20, 2024
1 parent 21264b5 commit 9d8fadc
Showing 8 changed files with 51 additions and 38 deletions.
8 changes: 8 additions & 0 deletions rust/geoarrow/src/io/csv/mod.rs
@@ -1,5 +1,13 @@
//! Read from and write to CSV files.
//!
+ //! The CSV reader implements [`RecordBatchReader`], so you can iterate over the batches of the CSV
+ //! without materializing the entire file in memory.
+ //!
+ //! [`RecordBatchReader`]: arrow_array::RecordBatchReader
+ //!
+ //! Additionally, the CSV writer takes in a [`RecordBatchReader`], so you can write an Arrow
+ //! iterator to CSV without materializing all batches in memory at once.
+ //!
//! # Examples
//!
//! ```
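The streaming behavior described in the new module docs might be exercised as in the sketch below. This is illustrative only: a `read_csv` constructor and a `Default` impl for `CSVReaderOptions` are assumed here, since neither appears in this diff; what the diff does confirm is that the reader implements `arrow_array::RecordBatchReader` and can therefore be iterated batch by batch.

```rust
use std::fs::File;
// Hypothetical import path; this diff only shows the module docs and options struct.
use geoarrow::io::csv::{read_csv, CSVReaderOptions};

fn count_rows(path: &str) -> Result<usize, Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    // Assumed constructor returning a `CSVReader`, which implements
    // `arrow_array::RecordBatchReader` (i.e. an iterator of `RecordBatch`es).
    let reader = read_csv(file, CSVReaderOptions::default())?;
    let mut rows = 0;
    for batch in reader {
        // One batch at a time; the file is never fully materialized in memory.
        rows += batch?.num_rows();
    }
    Ok(rows)
}
```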
6 changes: 6 additions & 0 deletions rust/geoarrow/src/io/csv/reader.rs
@@ -30,6 +30,11 @@ pub struct CSVReaderOptions {
/// When `true`, the first row of the CSV file is treated as a header row
pub has_header: Option<bool>,

+ /// The maximum number of records to read for schema inference.
+ ///
+ /// See [`arrow_csv::reader::Format::infer_schema`].
+ ///
+ /// **By default, all rows are read to infer the CSV schema.**
pub max_records: Option<usize>,

/// Specify a custom delimiter character, defaults to comma `','`
@@ -119,6 +124,7 @@ pub struct CSVReader<R> {
}

impl<R> CSVReader<R> {
+ /// Access the schema of this reader
pub fn schema(&self) -> SchemaRef {
self.output_schema.clone()
}
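A short sketch of the newly documented `max_records` knob, assuming `CSVReaderOptions` implements `Default` (not shown in this diff):

```rust
use geoarrow::io::csv::CSVReaderOptions;

// Inspect at most 1,000 rows during schema inference instead of scanning the
// whole file, which is the documented default.
let options = CSVReaderOptions {
    has_header: Some(true),
    max_records: Some(1000),
    ..Default::default()
};
```

A small `max_records` makes inference cheaper on large files, at the risk of mis-typing a column whose first values are unrepresentative.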
4 changes: 2 additions & 2 deletions rust/geoarrow/src/io/csv/writer.rs
@@ -12,8 +12,8 @@ use std::sync::Arc;

/// Write a Table to CSV
pub fn write_csv<W: Write, S: Into<RecordBatchReader>>(stream: S, writer: W) -> Result<()> {
- let mut stream: RecordBatchReader = stream.into();
- let reader = stream.take().unwrap();
+ let stream: RecordBatchReader = stream.into();
+ let reader = stream.into_inner();

let mut csv_writer = arrow_csv::Writer::new(writer);
for batch in reader {
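The refactored `write_csv` above consumes anything convertible into the crate's `RecordBatchReader` wrapper. A minimal sketch using a boxed `arrow_array::RecordBatchReader`, which the `From` impls in stream.rs below accept:

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, Float64Array, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Float64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef],
    )?;
    // Any boxed `arrow_array::RecordBatchReader` converts into geoarrow's wrapper.
    let reader: Box<dyn RecordBatchReader + Send> =
        Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema));

    let mut out = Vec::new();
    geoarrow::io::csv::write_csv(reader, &mut out)?;
    print!("{}", String::from_utf8(out)?);
    Ok(())
}
```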
10 changes: 5 additions & 5 deletions rust/geoarrow/src/io/flatgeobuf/writer.rs
@@ -20,11 +20,11 @@ pub struct FlatGeobufWriterOptions {
pub detect_type: bool,
/// Convert single to multi geometries, if `geometry_type` is multi type or Unknown
pub promote_to_multi: bool,
- // Dataset title
+ /// Dataset title
pub title: Option<String>,
- // Dataset description (intended for free form long text)
+ /// Dataset description (intended for free form long text)
pub description: Option<String>,
- // Dataset metadata (intended to be application specific and
+ /// Dataset metadata (intended to be application specific and
pub metadata: Option<String>,
/// A method for transforming CRS to WKT
///
@@ -119,7 +119,7 @@ pub fn write_flatgeobuf_with_options<W: Write, S: Into<RecordBatchReader>>(
) -> Result<()> {
let mut stream: RecordBatchReader = stream.into();

- let schema = stream.schema()?;
+ let schema = stream.schema();
let fields = &schema.fields;
let geom_col_idxs = schema.as_ref().geometry_columns();
if geom_col_idxs.len() != 1 {
@@ -133,7 +133,7 @@ pub fn write_flatgeobuf_with_options<W: Write, S: Into<RecordBatchReader>>(
let wkt_crs_str = options.create_wkt_crs(&array_meta)?;
let fgb_options = options.create_fgb_options(geo_data_type, wkt_crs_str.as_deref());

- let geometry_type = infer_flatgeobuf_geometry_type(stream.schema()?.as_ref())?;
+ let geometry_type = infer_flatgeobuf_geometry_type(stream.schema().as_ref())?;

let mut fgb = FgbWriter::create_with_options(name, geometry_type, fgb_options)?;
stream.process(&mut fgb)?;
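A sketch of the now-documented dataset options. The full parameter list of `write_flatgeobuf_with_options` is truncated in this diff, so the `(stream, writer, name, options)` shape below is an assumption, as are the module paths and a `Default` impl for the options struct:

```rust
use geoarrow::error::Result;
use geoarrow::io::flatgeobuf::{write_flatgeobuf_with_options, FlatGeobufWriterOptions};
use geoarrow::table::Table;

fn write_fgb(table: &Table) -> Result<Vec<u8>> {
    let options = FlatGeobufWriterOptions {
        title: Some("Sample dataset".to_string()),
        description: Some("Free-form long text describing the dataset".to_string()),
        ..Default::default()
    };
    let mut buf = Vec::new();
    // Assumed parameter order: (stream, writer, layer name, options).
    // `&Table` converts into the RecordBatchReader wrapper via From<&Table>.
    write_flatgeobuf_with_options(table, &mut buf, "layer", options)?;
    Ok(buf)
}
```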
4 changes: 1 addition & 3 deletions rust/geoarrow/src/io/geozero/table/data_source.rs
@@ -25,9 +25,7 @@ use geozero::{ColumnValue, FeatureProcessor, GeomProcessor, GeozeroDatasource, P

impl GeozeroDatasource for RecordBatchReader {
fn process<P: FeatureProcessor>(&mut self, processor: &mut P) -> Result<(), GeozeroError> {
- let reader = self.take().ok_or(GeozeroError::Dataset(
- "Cannot read from closed RecordBatchReader".to_string(),
- ))?;
+ let reader = self.inner_mut();
let schema = reader.schema();
let geom_indices = schema.as_ref().geometry_columns();
let geometry_column_index = if geom_indices.len() != 1 {
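With `inner_mut`, the `GeozeroDatasource` impl no longer has to consume an `Option`-wrapped state. A sketch of driving the wrapper into a geozero processor; the module paths for `Table` and the wrapper are assumed from this diff's imports:

```rust
use geozero::geojson::GeoJsonWriter;
use geozero::GeozeroDatasource;
use geoarrow::io::stream::RecordBatchReader;
use geoarrow::table::Table;

fn to_geojson(table: Table) -> Result<String, Box<dyn std::error::Error>> {
    // `From<Table>` wraps the table's batches in a RecordBatchIterator.
    let mut reader: RecordBatchReader = table.into();
    let mut out = Vec::new();
    // GeoJsonWriter is a geozero FeatureProcessor; `process` streams each
    // record batch's features into it.
    reader.process(&mut GeoJsonWriter::new(&mut out))?;
    Ok(String::from_utf8(out)?)
}
```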
14 changes: 5 additions & 9 deletions rust/geoarrow/src/io/ipc/writer.rs
@@ -2,15 +2,13 @@ use std::io::Write;

use arrow_ipc::writer::{FileWriter, StreamWriter};

- use crate::error::{GeoArrowError, Result};
+ use crate::error::Result;
use crate::io::stream::RecordBatchReader;

/// Write a Table to an Arrow IPC (Feather v2) file
pub fn write_ipc<W: Write, S: Into<RecordBatchReader>>(stream: S, writer: W) -> Result<()> {
- let inner = stream
- .into()
- .take()
- .ok_or(GeoArrowError::General("Closed stream".to_string()))?;
+ let inner: RecordBatchReader = stream.into();
+ let inner = inner.into_inner();

let schema = inner.schema();
let mut writer = FileWriter::try_new(writer, &schema)?;
@@ -23,10 +21,8 @@ pub fn write_ipc<W: Write, S: Into<RecordBatchReader>>(stream: S, writer: W) ->

/// Write a Table to an Arrow IPC stream
pub fn write_ipc_stream<W: Write, S: Into<RecordBatchReader>>(stream: S, writer: W) -> Result<()> {
- let inner = stream
- .into()
- .take()
- .ok_or(GeoArrowError::General("Closed stream".to_string()))?;
+ let inner: RecordBatchReader = stream.into();
+ let inner = inner.into_inner();

let schema = inner.schema();
let mut writer = StreamWriter::try_new(writer, &schema)?;
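A sketch of both IPC entry points shown above; the module path and a pre-built `table` are assumptions:

```rust
use std::fs::File;
use geoarrow::io::ipc::{write_ipc, write_ipc_stream};
use geoarrow::table::Table;

fn write_both(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
    // Feather v2 / IPC file format: random-access friendly.
    write_ipc(table, File::create("data.arrow")?)?;
    // IPC streaming format: append-style, suitable for pipes and sockets.
    write_ipc_stream(table, File::create("data.arrows")?)?;
    Ok(())
}
```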
2 changes: 1 addition & 1 deletion rust/geoarrow/src/io/mod.rs
@@ -1,7 +1,7 @@
//! Reader and writer implementations of many common geospatial file formats, including
//! interoperability with the `geozero` crate.
- #![allow(missing_docs)] // FIXME
+ // #![allow(missing_docs)] // FIXME

pub mod crs;
#[cfg(feature = "csv")]
41 changes: 23 additions & 18 deletions rust/geoarrow/src/io/stream.rs
@@ -5,33 +5,40 @@ use arrow_schema::SchemaRef;

/// A newtype wrapper around an [`arrow_array::RecordBatchReader`] so that we can implement the
/// [`geozero::GeozeroDatasource`] trait on it.
- pub struct RecordBatchReader(Option<Box<dyn _RecordBatchReader>>);
+ ///
+ /// This allows for exporting Arrow data to a geozero-based consumer even when not all of the Arrow
+ /// data is present in memory at once.
+ pub struct RecordBatchReader(Box<dyn _RecordBatchReader>);

impl RecordBatchReader {
/// Create a new RecordBatchReader from an [`arrow_array::RecordBatchReader`].
pub fn new(reader: Box<dyn _RecordBatchReader>) -> Self {
- Self(Some(reader))
+ Self(reader)
}

- pub fn schema(&self) -> Result<SchemaRef, GeoArrowError> {
- let reader = self
- .0
- .as_ref()
- .ok_or(GeoArrowError::General("Closed stream".to_string()))?;
- Ok(reader.schema())
+ /// Access the schema of this reader.
+ pub fn schema(&self) -> SchemaRef {
+ self.0.schema()
}

- pub fn take(&mut self) -> Option<Box<dyn _RecordBatchReader>> {
- self.0.take()
+ /// Access a mutable reference to the underlying [`arrow_array::RecordBatchReader`].
+ pub fn inner_mut(&mut self) -> &mut Box<dyn _RecordBatchReader> {
+ &mut self.0
}

+ /// Access the underlying [`arrow_array::RecordBatchReader`].
+ pub fn into_inner(self) -> Box<dyn _RecordBatchReader> {
+ self.0
+ }
}

impl From<Table> for RecordBatchReader {
fn from(value: Table) -> Self {
let (batches, schema) = value.into_inner();
- Self(Some(Box::new(RecordBatchIterator::new(
+ Self(Box::new(RecordBatchIterator::new(
batches.into_iter().map(Ok),
schema,
- ))))
+ )))
}
}

@@ -44,23 +51,21 @@ impl From<&Table> for RecordBatchReader {
impl TryFrom<RecordBatchReader> for Table {
type Error = GeoArrowError;

- fn try_from(mut value: RecordBatchReader) -> Result<Self, Self::Error> {
- let reader = value
- .take()
- .ok_or(GeoArrowError::General("Closed stream".to_string()))?;
+ fn try_from(value: RecordBatchReader) -> Result<Self, Self::Error> {
+ let reader = value.0;
let schema = reader.schema();
Table::try_new(reader.collect::<Result<_, _>>()?, schema)
}
}

impl From<Box<dyn _RecordBatchReader>> for RecordBatchReader {
fn from(value: Box<dyn _RecordBatchReader>) -> Self {
- Self(Some(value))
+ Self(value)
}
}

impl From<Box<dyn _RecordBatchReader + Send>> for RecordBatchReader {
fn from(value: Box<dyn _RecordBatchReader + Send>) -> Self {
- Self(Some(value))
+ Self(value)
}
}
