diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs index b3054299b7f7..a05b46d22840 100644 --- a/datafusion/physical-plan/src/stream.rs +++ b/datafusion/physical-plan/src/stream.rs @@ -337,7 +337,9 @@ impl RecordBatchReceiverStream { pin_project! { /// Combines a [`Stream`] with a [`SchemaRef`] implementing - /// [`RecordBatchStream`] for the combination + /// [`SendableRecordBatchStream`] for the combination + /// + /// See [`Self::new`] for an example pub struct RecordBatchStreamAdapter { schema: SchemaRef, @@ -347,7 +349,28 @@ pin_project! { } impl RecordBatchStreamAdapter { - /// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream + /// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream. + /// + /// Note to create a [`SendableRecordBatchStream`] you pin the result + /// + /// # Example + /// ``` + /// # use arrow::array::record_batch; + /// # use datafusion_execution::SendableRecordBatchStream; + /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter; + /// // Create stream of Result + /// let batch = record_batch!( + /// ("a", Int32, [1, 2, 3]), + /// ("b", Float64, [Some(4.0), None, Some(5.0)]) + /// ).expect("created batch"); + /// let schema = batch.schema(); + /// let stream = futures::stream::iter(vec![Ok(batch)]); + /// // Convert the stream to a SendableRecordBatchStream + /// let adapter = RecordBatchStreamAdapter::new(schema, stream); + /// // Now you can use the adapter as a SendableRecordBatchStream + /// let batch_stream: SendableRecordBatchStream = Box::pin(adapter); + /// // ... + /// ``` pub fn new(schema: SchemaRef, stream: S) -> Self { Self { schema, stream } }