IO with Adapters
In First Steps and More with CSP we used toy data for the streaming workflows. In real workflows, we need to access data stored in particular storage formats. To bring data into or out of a csp graph, we use adapters.
csp has several built-in adapters for common data sources and sinks, such as Kafka and Parquet. You can also write your own adapters for any other data format; for reference, see the various "How-to" guides for historical, real-time, and output adapters. I/O adapters form the interface between external data and the time series format used in csp.
In this tutorial, you write to and read from Parquet files on the local file system.
csp has the ParquetWriter and ParquetReader adapters to stream data to and from Parquet files. Check out the complete API in the Reference documentation.
Important: csp graphs can process historical and real-time data with little to no change to the application code.
A csp.Struct is a basic form of structured data in csp where each field can be accessed as its own time series. It is analogous to a dataclass in Python, and its fields must be type-annotated. We will store some example data in a custom struct called Example and then stream the struct to a Parquet file.
import csp
from datetime import datetime, timedelta
from csp.adapters.parquet import ParquetOutputConfig, ParquetWriter, ParquetReader

class Example(csp.Struct):
    int_val: int
    float_val: float
In a graph, create some sample values for Example and use ParquetWriter to stream them to a Parquet file.
- The timestamp_column_name argument is how csp preserves the timestamp of each event. If the timestamp information is not required, you can set this argument to None.
- You can provide optional configuration through ParquetOutputConfig, which can set allow_overwrite, batch_size, compression, and write_arrow_binary.
- We use publish_struct to publish (write) the time series data to disk.
@csp.graph
def write_struct(file_name: str):
    st = datetime(2020, 1, 1)
    # Three Example events, one second apart
    curve = csp.curve(
        Example,
        [
            (st + timedelta(seconds=1), Example(int_val=1, float_val=1.0)),
            (st + timedelta(seconds=2), Example(int_val=2, float_val=2.0)),
            (st + timedelta(seconds=3), Example(int_val=3, float_val=3.0)),
        ],
    )
    # Event timestamps are stored in the "csp_time" column
    writer = ParquetWriter(
        file_name=file_name, timestamp_column_name="csp_time", config=ParquetOutputConfig(allow_overwrite=True)
    )
    # Each struct field is written as its own column
    writer.publish_struct(curve)
You can use ParquetReader with a time_column to read back the data.
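@csp.graph
def read_struct(file_name: str):
    # Use the "csp_time" column as the event timestamps
    struct_reader = ParquetReader(file_name, time_column="csp_time")
    # Columns are mapped onto Example fields by name
    struct_all = struct_reader.subscribe_all(Example)
    csp.print("struct_all", struct_all)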
Go through the complete example at examples/03_using_adapters/parquet/e1_parquet_write_read.py and check out the API reference for more details.
This wiki is autogenerated. To make updates, open a PR against the original source file in docs/wiki.