From 5552e4bbbff5026db318dceb0ae58537b238a7b1 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 10 Dec 2024 08:58:56 -0500 Subject: [PATCH 1/4] Add example of interacting with a remote catalog --- datafusion-examples/README.md | 1 + .../examples/remote_catalog.rs | 404 ++++++++++++++++++ datafusion/catalog/src/catalog.rs | 13 +- .../core/src/execution/session_state.rs | 2 +- 4 files changed, 414 insertions(+), 6 deletions(-) create mode 100644 datafusion-examples/examples/remote_catalog.rs diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 528e7dd857e5..aca600e50e4f 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -77,6 +77,7 @@ cargo run --example dataframe - [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3 - [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files vi HTTP - [`regexp.rs`](examples/regexp.rs): Examples of using regular expression functions +- [`remote_catalog.rs`](examples/regexp.rs): Examples of interfacing with a remote catalog (e.g. over a network) - [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF) - [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF) - [`simple_udfw.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF) diff --git a/datafusion-examples/examples/remote_catalog.rs b/datafusion-examples/examples/remote_catalog.rs new file mode 100644 index 000000000000..84edc4f6c679 --- /dev/null +++ b/datafusion-examples/examples/remote_catalog.rs @@ -0,0 +1,404 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// This example shows how to implement the DataFusion [`CatalogProvider`] API +/// for catalogs that are remote (require network access) and/or offer only +/// asynchronous APIs such as [Polaris], [Unity], and [Hive]. +/// +/// Integrating with this catalogs is a bit more complex than with local +/// catalogs because calls like `ctx.sql("SELECT * FROM db.schm.tbl")` may need +/// to perform remote network requests, but many Catalog APIs are synchronous. +/// See the documentation on [`CatalogProvider`] for more details. +/// +/// [`CatalogProvider`]: datafusion_catalog::CatalogProvider +/// +/// [Polaris]: https://github.com/apache/polaris +/// [Unity]: https://github.com/unitycatalog/unitycatalog +/// [Hive]: https://hive.apache.org/ +use arrow::array::record_batch; +use arrow_schema::{Field, Fields, Schema, SchemaRef}; +use async_trait::async_trait; +use datafusion::catalog::{SchemaProvider, TableProvider}; +use datafusion::common::DataFusionError; +use datafusion::common::Result; +use datafusion::execution::SendableRecordBatchStream; +use datafusion::physical_plan::memory::MemoryExec; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::{DataFrame, SessionContext}; +use datafusion_catalog::Session; +use datafusion_common::{ + assert_batches_eq, internal_datafusion_err, plan_err, TableReference, +}; +use datafusion_expr::{Expr, TableType}; +use futures::TryStreamExt; +use std::any::Any; +use std::sync::{Arc, Mutex}; + +#[tokio::main] +async fn main() -> Result<()> { + // As always, we create a session context to interact with DataFusion + let ctx = SessionContext::new(); + + // Make a connection to the remote catalog, asynchronously, and configure it + let remote_catalog_interface = RemoteCatalogInterface::connect().await?; + + // Register a SchemaProvider for tables in a schema named "remote_schema". + // + // This will let DataFusion query tables such as + // `datafusion.remote_schema.remote_table` + let remote_schema: Arc = + Arc::new(RemoteSchema::new(Arc::new(remote_catalog_interface))); + ctx.catalog("datafusion") + .ok_or_else(|| internal_datafusion_err!("default catalog was not installed"))? + .register_schema("remote_schema", Arc::clone(&remote_schema))?; + + // Here is a query that selects data from a table in the remote catalog. + let sql = "SELECT * from remote_schema.remote_table"; + + // While the `SessionContext::sql` interface is async, but it does not + // support asynchronous access to catalogs, so the following query errors. + let results = ctx.sql(sql).await; + assert_eq!( + results.unwrap_err().to_string(), + "Error during planning: table 'datafusion.remote_schema.remote_table' not found" + ); + + // Instead, to use a remote catalog, we must use lower level APIs on + // SessionState (what `SessionContext::sql` does internally). + let state = ctx.state(); + + // First, parse the SQL (but don't plan it / resolve any table references) + let dialect = state.config().options().sql_parser.dialect.as_str(); + let statement = state.sql_to_statement(sql, dialect)?; + + // Find all `TableReferences` in the parsed queries. These correspond to the + // tables referred to by the query (in this case + // `remote_schema.remote_table`) + let references = state.resolve_table_references(&statement)?; + + // Call `load_tables` to load information from the remote catalog for each + // of the referenced tables. Best practice is to fetch the the information + // for all tables required by the query once (rather than one per table) to + // minimize network overhead + let table_names = references.iter().filter_map(|r| { + if refers_to_schema("datafusion", "remote_schema", r) { + Some(r.table()) + } else { + None + } + }); + remote_schema + .as_any() + .downcast_ref::() + .expect("correct types") + .load_tables(table_names) + .await?; + + // Now continue planing the query after having fetched the remote table and + // it can run as normal + let plan = state.statement_to_plan(statement).await?; + let results = DataFrame::new(state, plan).collect().await?; + assert_batches_eq!( + [ + "+----+-------+", + "| id | name |", + "+----+-------+", + "| 1 | alpha |", + "| 2 | beta |", + "| 3 | gamma |", + "+----+-------+", + ], + &results + ); + + Ok(()) +} + +/// This is an example of an API that interacts with a remote catalog. +/// +/// Specifically, its APIs are all `async` and thus can not be used by +/// [`SchemaProvider`] or [`TableProvider`] directly. +#[derive(Debug)] +struct RemoteCatalogInterface {} + +impl RemoteCatalogInterface { + /// Establish a connection to the remote catalog + pub async fn connect() -> Result { + // In a real implementation this method might connect to a remote + // catalog, validate credentials, cache basic information, etc + Ok(Self {}) + } + + /// Fetches information for a specific table + pub async fn table_info(&self, name: &str) -> Result { + if name != "remote_table" { + return plan_err!("Remote table not found: {}", name); + } + + // In this example, we'll model a remote table with columns "id" and + // "name" + // + // A real remote catalog would make a network call to fetch this + // information from a remote source. + let schema = Schema::new(Fields::from(vec![ + Field::new("id", arrow::datatypes::DataType::Int32, false), + Field::new("name", arrow::datatypes::DataType::Utf8, false), + ])); + Ok(Arc::new(schema)) + } + + /// Fetches data for a table from a remote data source + pub async fn read_data(&self, name: &str) -> Result { + if name != "remote_table" { + return plan_err!("Remote table not found: {}", name); + } + + // In a real remote catalog this call would likely perform network IO to + // open and begin reading from a remote datasource, prefetching + // information, etc. + // + // In this example we are just demonstrating how the API works so simply + // return back some static data as a stream. + let batch = record_batch!( + ("id", Int32, [1, 2, 3]), + ("name", Utf8, ["alpha", "beta", "gamma"]) + ) + .unwrap(); + let schema = batch.schema(); + + let stream = futures::stream::iter([Ok(batch)]); + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) + } +} + +/// Implements the DataFusion Catalog API interface for tables +/// stored in a remote catalog. +#[derive(Debug)] +struct RemoteSchema { + /// Connection with the remote catalog + remote_catalog_interface: Arc, + /// Local cache of tables that have been preloaded from the remote + /// catalog + tables: Mutex>>, +} + +impl RemoteSchema { + /// Create a new RemoteSchema + pub fn new(remote_catalog_interface: Arc) -> Self { + Self { + remote_catalog_interface, + tables: Mutex::new(vec![]), + } + } + + /// Load information for the specified tables from the remote source into + /// the local cached copy. + pub async fn load_tables( + &self, + references: impl IntoIterator, + ) -> Result<()> { + for table_name in references { + if !self.table_exist(table_name) { + // Fetch information about the table from the remote catalog + // + // Note that a real remote catalog interface could return more + // information, but at the minimum, DataFusion requires the + // table's schema for planing. + let schema = self.remote_catalog_interface.table_info(table_name).await?; + let remote_table = RemoteTable::new( + Arc::clone(&self.remote_catalog_interface), + table_name, + schema, + ); + + // Add the table to our local cached list + self.tables + .lock() + .expect("mutex invalid") + .push(Arc::new(remote_table)); + }; + } + Ok(()) + } +} + +/// Implement the DataFusion Catalog API for [`RemoteSchema`] +#[async_trait] +impl SchemaProvider for RemoteSchema { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + // Note this API is not async so we can't directly call the RemoteCatalogInterface + // instead we use the cached list of loaded tables + self.tables + .lock() + .expect("mutex valid") + .iter() + .map(|remote_table| { + // note it is possible to downcast to RemoteTable and call methods on it + remote_table + .as_any() + .downcast_ref::() + .expect("downcast to RemoteTable") + .name() + .to_string() + }) + .collect() + } + + // While this API is actually `async` and thus could consult a remote + // catalog directly it is more efficient to use a local cached copy instead, + // which is what we model in this example + async fn table( + &self, + name: &str, + ) -> Result>, DataFusionError> { + // Look for any pre-loaded tables + let table = self + .tables + .lock() + .expect("mutex valid") + .iter() + .find(|remote_table| { + // note it is possible to downcast to RemoteTable and call methods on it + remote_table + .as_any() + .downcast_ref::() + .expect("downcast to RemoteTable") + .name() + == name + }) + .map(Arc::clone); + Ok(table) + } + + fn table_exist(&self, name: &str) -> bool { + // Look for any pre-loaded tables, note this function is also `async` + self.tables + .lock() + .expect("mutex valid") + .iter() + .any(|remote_table| { + // note it is possible to downcast to RemoteTable and call methods on it + remote_table + .as_any() + .downcast_ref::() + .expect("downcast to RemoteTable") + .name() + == name + }) + } +} + +/// Represents the information about a table retrieved from the remote catalog +#[derive(Debug)] +struct RemoteTable { + /// connection to the remote catalog + remote_catalog_interface: Arc, + name: String, + schema: SchemaRef, +} + +impl RemoteTable { + pub fn new( + remote_catalog_interface: Arc, + name: impl Into, + schema: SchemaRef, + ) -> Self { + Self { + remote_catalog_interface, + name: name.into(), + schema, + } + } + + /// Return the name of this table + pub fn name(&self) -> &str { + &self.name + } +} + +/// Implement the DataFusion Catalog API for [`RemoteTable`] +#[async_trait] +impl TableProvider for RemoteTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + // Note that `scan` is called once the plan begin execution, and thus is + // async. When interacting with remote data sources, this is the place + // to begin establishing the remote connections and interacting with the + // remote storage system. + // + // As this example is just modeling the catalog API interface, we buffer + // the results locally in memory for simplicity. + let batches = self + .remote_catalog_interface + .read_data(&self.name) + .await? + .try_collect() + .await?; + Ok(Arc::new(MemoryExec::try_new( + &[batches], + self.schema.clone(), + projection.cloned(), + )?)) + } +} + +/// Return true if this `table_reference` might be for a table in the specified +/// catalog and schema. +fn refers_to_schema( + catalog_name: &str, + schema_name: &str, + table_reference: &TableReference, +) -> bool { + // Check the references are in the correct catalog and schema + // references like foo.bar.baz + if let Some(catalog) = table_reference.catalog() { + if catalog != catalog_name { + return false; + } + } + // references like bar.baz + if let Some(schema) = table_reference.schema() { + if schema != schema_name { + return false; + } + } + + true +} diff --git a/datafusion/catalog/src/catalog.rs b/datafusion/catalog/src/catalog.rs index 85f2dede2f27..c7cb90d5912a 100644 --- a/datafusion/catalog/src/catalog.rs +++ b/datafusion/catalog/src/catalog.rs @@ -52,12 +52,16 @@ use datafusion_common::Result; /// /// # Implementing "Remote" catalogs /// +/// See [`remote_catalog`] for an end to end example of how to implement a +/// remote catalog. +/// /// Sometimes catalog information is stored remotely and requires a network call /// to retrieve. For example, the [Delta Lake] table format stores table /// metadata in files on S3 that must be first downloaded to discover what /// schemas and tables exist. /// /// [Delta Lake]: https://delta.io/ +/// [`remote_catalog`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/remote_catalog.rs /// /// The [`CatalogProvider`] can support this use case, but it takes some care. /// The planning APIs in DataFusion are not `async` and thus network IO can not @@ -72,15 +76,14 @@ use datafusion_common::Result; /// batch access to the remote catalog to retrieve multiple schemas and tables /// in a single network call. /// -/// Note that [`SchemaProvider::table`] is an `async` function in order to +/// Note that [`SchemaProvider::table`] **is** an `async` function in order to /// simplify implementing simple [`SchemaProvider`]s. For many table formats it /// is easy to list all available tables but there is additional non trivial /// access required to read table details (e.g. statistics). /// /// The pattern that DataFusion itself uses to plan SQL queries is to walk over -/// the query to find all table references, -/// performing required remote catalog in parallel, and then plans the query -/// using that snapshot. +/// the query to find all table references, performing required remote catalog +/// in parallel, and then plans the query using that snapshot. /// /// # Example Catalog Implementations /// @@ -150,7 +153,7 @@ pub trait CatalogProvider: Debug + Sync + Send { /// Represent a list of named [`CatalogProvider`]s. /// -/// Please see the documentation on `CatalogProvider` for details of +/// Please see the documentation on [`CatalogProvider`] for details of /// implementing a custom catalog. pub trait CatalogProviderList: Debug + Sync + Send { /// Returns the catalog list as [`Any`] diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 4ccad5ffd323..5e248968ffc9 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -296,7 +296,7 @@ impl SessionState { } /// Retrieve the [`SchemaProvider`] for a specific [`TableReference`], if it - /// esists. + /// exsists. pub fn schema_for_ref( &self, table_ref: impl Into, From 772bf051f979100e497f304c7c70bbe34ceff489 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 11 Dec 2024 06:51:22 -0500 Subject: [PATCH 2/4] Update datafusion/core/src/execution/session_state.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com> --- datafusion/core/src/execution/session_state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 5e248968ffc9..706f075f4ea0 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -296,7 +296,7 @@ impl SessionState { } /// Retrieve the [`SchemaProvider`] for a specific [`TableReference`], if it - /// exsists. + /// exists. pub fn schema_for_ref( &self, table_ref: impl Into, From 5b288712c487337e6b6d1551b3ed237c80cbb7c6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 15 Dec 2024 17:16:32 -0500 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: Jonah Gao Co-authored-by: Weston Pace --- datafusion-examples/examples/remote_catalog.rs | 3 +-- datafusion/catalog/src/catalog.rs | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion-examples/examples/remote_catalog.rs b/datafusion-examples/examples/remote_catalog.rs index 84edc4f6c679..9cebe888efc6 100644 --- a/datafusion-examples/examples/remote_catalog.rs +++ b/datafusion-examples/examples/remote_catalog.rs @@ -1,5 +1,4 @@ // Licensed to the Apache Software Foundation (ASF) under one -// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file @@ -71,7 +70,7 @@ async fn main() -> Result<()> { // Here is a query that selects data from a table in the remote catalog. let sql = "SELECT * from remote_schema.remote_table"; - // While the `SessionContext::sql` interface is async, but it does not + // The `SessionContext::sql` interface is async, but it does not // support asynchronous access to catalogs, so the following query errors. let results = ctx.sql(sql).await; assert_eq!( diff --git a/datafusion/catalog/src/catalog.rs b/datafusion/catalog/src/catalog.rs index c7cb90d5912a..71b9eccf9d65 100644 --- a/datafusion/catalog/src/catalog.rs +++ b/datafusion/catalog/src/catalog.rs @@ -83,7 +83,8 @@ use datafusion_common::Result; /// /// The pattern that DataFusion itself uses to plan SQL queries is to walk over /// the query to find all table references, performing required remote catalog -/// in parallel, and then plans the query using that snapshot. +/// lookups in parallel, storing the results in a cached snapshot, and then plans +/// the query using that snapshot. /// /// # Example Catalog Implementations /// From 69723c47a67129402d1b4cbe3226f89624495ab8 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 15 Dec 2024 17:28:54 -0500 Subject: [PATCH 4/4] Use HashMap to hold tables --- .../examples/remote_catalog.rs | 50 +++---------------- 1 file changed, 8 insertions(+), 42 deletions(-) diff --git a/datafusion-examples/examples/remote_catalog.rs b/datafusion-examples/examples/remote_catalog.rs index 9cebe888efc6..206b7ba9c4be 100644 --- a/datafusion-examples/examples/remote_catalog.rs +++ b/datafusion-examples/examples/remote_catalog.rs @@ -42,7 +42,7 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::{DataFrame, SessionContext}; use datafusion_catalog::Session; use datafusion_common::{ - assert_batches_eq, internal_datafusion_err, plan_err, TableReference, + assert_batches_eq, internal_datafusion_err, plan_err, HashMap, TableReference, }; use datafusion_expr::{Expr, TableType}; use futures::TryStreamExt; @@ -194,7 +194,7 @@ struct RemoteSchema { remote_catalog_interface: Arc, /// Local cache of tables that have been preloaded from the remote /// catalog - tables: Mutex>>, + tables: Mutex>>, } impl RemoteSchema { @@ -202,7 +202,7 @@ impl RemoteSchema { pub fn new(remote_catalog_interface: Arc) -> Self { Self { remote_catalog_interface, - tables: Mutex::new(vec![]), + tables: Mutex::new(HashMap::new()), } } @@ -230,7 +230,7 @@ impl RemoteSchema { self.tables .lock() .expect("mutex invalid") - .push(Arc::new(remote_table)); + .insert(table_name.to_string(), Arc::new(remote_table)); }; } Ok(()) @@ -250,16 +250,8 @@ impl SchemaProvider for RemoteSchema { self.tables .lock() .expect("mutex valid") - .iter() - .map(|remote_table| { - // note it is possible to downcast to RemoteTable and call methods on it - remote_table - .as_any() - .downcast_ref::() - .expect("downcast to RemoteTable") - .name() - .to_string() - }) + .keys() + .cloned() .collect() } @@ -275,35 +267,14 @@ impl SchemaProvider for RemoteSchema { .tables .lock() .expect("mutex valid") - .iter() - .find(|remote_table| { - // note it is possible to downcast to RemoteTable and call methods on it - remote_table - .as_any() - .downcast_ref::() - .expect("downcast to RemoteTable") - .name() - == name - }) + .get(name) .map(Arc::clone); Ok(table) } fn table_exist(&self, name: &str) -> bool { // Look for any pre-loaded tables, note this function is also `async` - self.tables - .lock() - .expect("mutex valid") - .iter() - .any(|remote_table| { - // note it is possible to downcast to RemoteTable and call methods on it - remote_table - .as_any() - .downcast_ref::() - .expect("downcast to RemoteTable") - .name() - == name - }) + self.tables.lock().expect("mutex valid").contains_key(name) } } @@ -328,11 +299,6 @@ impl RemoteTable { schema, } } - - /// Return the name of this table - pub fn name(&self) -> &str { - &self.name - } } /// Implement the DataFusion Catalog API for [`RemoteTable`]