Skip to content

Commit

Permalink
Handle batch size
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron committed Dec 23, 2024
1 parent 401180a commit b85aa03
Showing 1 changed file with 95 additions and 51 deletions.
146 changes: 95 additions & 51 deletions rust/geoarrow/src/io/flatgeobuf/reader/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ use geozero::{FeatureProcessor, FeatureProperties};
use std::io::{Read, Seek};
use std::sync::Arc;

/// A builder for [FlatGeobufRecordBatchReader]
///
/// Holds an opened [`FgbReader`] over the underlying source `R`; the builder is
/// created with `open` and consumed by one of the `read*` methods.
pub struct FlatGeobufReaderBuilder<R> {
/// The FlatGeobuf reader wrapping the caller-supplied source.
reader: FgbReader<R>,
}

impl<R: Read> FlatGeobufReaderBuilder<R> {
/// Open a new FlatGeobuf reader
pub fn open(reader: R) -> Result<Self> {
let reader = FgbReader::open(reader)?;
Ok(Self { reader })
Expand Down Expand Up @@ -80,6 +82,7 @@ impl<R: Read> FlatGeobufReaderBuilder<R> {
Ok((data_type, properties_schema, array_metadata))
}

/// Read features sequentially, without using `Seek`
pub fn read_seq(
self,
options: FlatGeobufReaderOptions,
Expand Down Expand Up @@ -112,6 +115,7 @@ impl<R: Read> FlatGeobufReaderBuilder<R> {
}

impl<R: Read + Seek> FlatGeobufReaderBuilder<R> {
/// Read features from a source that also supports `Seek`
pub fn read(
self,
options: FlatGeobufReaderOptions,
Expand Down Expand Up @@ -143,6 +147,9 @@ impl<R: Read + Seek> FlatGeobufReaderBuilder<R> {
}
}

/// An iterator over record batches from a FlatGeobuf file.
///
/// This implements [arrow_array::RecordBatchReader], which you can use to access data.
pub struct FlatGeobufRecordBatchReader<R, S> {
selection: FeatureIter<R, S>,
data_type: NativeType,
Expand Down Expand Up @@ -173,118 +180,155 @@ impl<R, S> FlatGeobufRecordBatchReader<R, S> {
impl<R: Read> FlatGeobufRecordBatchReader<R, NotSeekable> {
fn process_batch(&mut self) -> Result<Option<RecordBatch>> {
let options = self.construct_options();
macro_rules! impl_read {
($builder:ty, $dim:expr) => {{
let mut builder = GeoTableBuilder::<$builder>::new_with_options($dim, options);
while let Some(feature) = self.selection.next()? {
feature.process_properties(&mut builder)?;
builder.properties_end()?;

builder.push_geometry(feature.geometry_trait()?.as_ref())?;
let batch_size = options.batch_size;

builder.feature_end(0)?;
// Fills `$builder` with up to `batch_size` features pulled from `self.selection`
// and returns from the enclosing `process_batch`:
//
// * `Ok(Some(batch))` once `batch_size` rows have been collected, or when the
//   feature iterator is exhausted after at least one row was collected (the
//   final, partial batch);
// * `Ok(None)` when the iterator is exhausted and no rows were collected.
//
// `$builder` must be a mutable place expression naming a table builder that was
// freshly created for this call.
macro_rules! impl_read {
    ($builder:expr) => {{
        let mut row_count = 0;
        loop {
            if row_count < batch_size {
                match self.selection.next()? {
                    Some(feature) => {
                        feature.process_properties(&mut $builder)?;
                        $builder.properties_end()?;

                        $builder.push_geometry(feature.geometry_trait()?.as_ref())?;

                        $builder.feature_end(0)?;
                        row_count += 1;
                        continue;
                    }
                    // Nothing was buffered and nothing is left to read: end of stream.
                    None if row_count == 0 => return Ok(None),
                    // The source ran dry mid-batch. Fall through and emit the rows
                    // collected so far; returning `None` here would silently drop
                    // the trailing partial batch, because the builder is discarded
                    // when `process_batch` returns.
                    None => {}
                }
            }

            let (batches, _schema) = $builder.finish()?.into_inner();
            assert_eq!(batches.len(), 1);
            return Ok(Some(batches.into_iter().next().unwrap()));
        }
    }};
}

let table = match self.data_type {
match self.data_type {
NativeType::Point(_, dim) => {
impl_read!(PointBuilder, dim)
let mut builder = GeoTableBuilder::<PointBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::LineString(_, dim) => {
impl_read!(LineStringBuilder, dim)
let mut builder =
GeoTableBuilder::<LineStringBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::Polygon(_, dim) => {
impl_read!(PolygonBuilder, dim)
let mut builder = GeoTableBuilder::<PolygonBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::MultiPoint(_, dim) => {
impl_read!(MultiPointBuilder, dim)
let mut builder =
GeoTableBuilder::<MultiPointBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::MultiLineString(_, dim) => {
impl_read!(MultiLineStringBuilder, dim)
let mut builder =
GeoTableBuilder::<MultiLineStringBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::MultiPolygon(_, dim) => {
impl_read!(MultiPolygonBuilder, dim)
let mut builder =
GeoTableBuilder::<MultiPolygonBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::Geometry(_) | NativeType::GeometryCollection(_, _) => {
let mut builder = GeoTableBuilder::<MixedGeometryStreamBuilder>::new_with_options(
// TODO: this dimension argument appears to be unused by the mixed-geometry builder; remove it if so.
Dimension::XY,
options,
);
self.selection.process_features(&mut builder)?;
builder.finish()
impl_read!(builder)
}
geom_type => Err(GeoArrowError::NotYetImplemented(format!(
"Parsing FlatGeobuf from {:?} geometry type not yet supported",
geom_type
))),
}?;
let (batches, _schema) = table.into_inner();
assert_eq!(batches.len(), 1);

// TODO: need to propagate when we've reached the end of the fgb iterator
Ok(Some(batches.into_iter().next().unwrap()))
}
}
}

impl<R: Read + Seek> FlatGeobufRecordBatchReader<R, Seekable> {
fn process_batch(&mut self) -> Result<Option<RecordBatch>> {
let options = self.construct_options();
macro_rules! impl_read {
($builder:ty, $dim:expr) => {{
let mut builder = GeoTableBuilder::<$builder>::new_with_options($dim, options);
while let Some(feature) = self.selection.next()? {
feature.process_properties(&mut builder)?;
builder.properties_end()?;
let batch_size = options.batch_size;

builder.push_geometry(feature.geometry_trait()?.as_ref())?;

builder.feature_end(0)?;
// Fills `$builder` with up to `batch_size` features pulled from `self.selection`
// and returns from the enclosing `process_batch`:
//
// * `Ok(Some(batch))` once `batch_size` rows have been collected, or when the
//   feature iterator is exhausted after at least one row was collected (the
//   final, partial batch);
// * `Ok(None)` when the iterator is exhausted and no rows were collected.
//
// `$builder` must be a mutable place expression naming a table builder that was
// freshly created for this call.
macro_rules! impl_read {
    ($builder:expr) => {{
        let mut row_count = 0;
        loop {
            if row_count < batch_size {
                match self.selection.next()? {
                    Some(feature) => {
                        feature.process_properties(&mut $builder)?;
                        $builder.properties_end()?;

                        $builder.push_geometry(feature.geometry_trait()?.as_ref())?;

                        $builder.feature_end(0)?;
                        row_count += 1;
                        continue;
                    }
                    // Nothing was buffered and nothing is left to read: end of stream.
                    None if row_count == 0 => return Ok(None),
                    // The source ran dry mid-batch. Fall through and emit the rows
                    // collected so far; returning `None` here would silently drop
                    // the trailing partial batch, because the builder is discarded
                    // when `process_batch` returns.
                    None => {}
                }
            }

            let (batches, _schema) = $builder.finish()?.into_inner();
            assert_eq!(batches.len(), 1);
            return Ok(Some(batches.into_iter().next().unwrap()));
        }
    }};
}

let table = match self.data_type {
match self.data_type {
NativeType::Point(_, dim) => {
impl_read!(PointBuilder, dim)
let mut builder = GeoTableBuilder::<PointBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::LineString(_, dim) => {
impl_read!(LineStringBuilder, dim)
let mut builder =
GeoTableBuilder::<LineStringBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::Polygon(_, dim) => {
impl_read!(PolygonBuilder, dim)
let mut builder = GeoTableBuilder::<PolygonBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::MultiPoint(_, dim) => {
impl_read!(MultiPointBuilder, dim)
let mut builder =
GeoTableBuilder::<MultiPointBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::MultiLineString(_, dim) => {
impl_read!(MultiLineStringBuilder, dim)
let mut builder =
GeoTableBuilder::<MultiLineStringBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::MultiPolygon(_, dim) => {
impl_read!(MultiPolygonBuilder, dim)
let mut builder =
GeoTableBuilder::<MultiPolygonBuilder>::new_with_options(dim, options);
impl_read!(builder)
}
NativeType::Geometry(_) | NativeType::GeometryCollection(_, _) => {
NativeType::Geometry(_) => {
let mut builder = GeoTableBuilder::<MixedGeometryStreamBuilder>::new_with_options(
// TODO: this dimension argument appears to be unused by the mixed-geometry builder; remove it if so.
Dimension::XY,
options,
);
self.selection.process_features(&mut builder)?;
builder.finish()
impl_read!(builder)
}
// NativeType::GeometryCollection(_, dim) => {
// let mut builder =
// GeoTableBuilder::<GeometryCollectionBuilder>::new_with_options(dim, options);
// impl_read!(builder)
// }
geom_type => Err(GeoArrowError::NotYetImplemented(format!(
"Parsing FlatGeobuf from {:?} geometry type not yet supported",
geom_type
))),
}?;
let (batches, _schema) = table.into_inner();
assert_eq!(batches.len(), 1);

// TODO: need to propagate when we've reached the end of the fgb iterator
Ok(Some(batches.into_iter().next().unwrap()))
}
}
}

Expand Down

0 comments on commit b85aa03

Please sign in to comment.