From d492a4511dad7b4ec615ac10d4edbe56e4832b91 Mon Sep 17 00:00:00 2001
From: Runji Wang
Date: Tue, 19 Mar 2024 13:58:44 +0800
Subject: [PATCH] feat: support view in memory (#837)

This PR is part of #796. It adds support for creating, querying, and
dropping views in memory.

The key changes are:

1. When creating a view, bind the query and store the logical plan with
   the view in the catalog.
2. When querying a view, build executors for all views first, then build
   the other plan nodes on top of them. Since a view can be consumed by
   multiple downstream nodes, we introduce `StreamSubscriber` to allow
   multiple consumers of a single stream (a minimal sketch of this
   pattern is appended after the patch).

Limitations:

1. We don't persist views in disk storage.
2. We don't support inferring the schema from the query; columns must be
   defined explicitly when creating a view.
3. We don't maintain dependency relationships between tables and views.

---------

Signed-off-by: Runji Wang
---
 Cargo.lock                                  |  51 ++++-
 Cargo.toml                                  |   1 +
 src/array/mod.rs                            |  16 +-
 src/binder/copy.rs                          |   7 +-
 src/binder/create_table.rs                  |   1 -
 src/binder/create_view.rs                   |  61 ++++++
 src/binder/delete.rs                        |   6 +-
 src/binder/drop.rs                          |  86 ++------
 src/binder/insert.rs                        |   6 +-
 src/binder/mod.rs                           |  18 +-
 src/binder/table.rs                         |  13 +-
 src/catalog/root.rs                         |  21 +-
 src/catalog/schema.rs                       |  27 ++-
 src/catalog/table.rs                        |  55 +++++-
 src/db.rs                                   |   3 +
 src/executor/copy_from_file.rs              |  11 +-
 src/executor/copy_to_file.rs                |   2 +-
 src/executor/{create.rs => create_table.rs} |  13 +-
 src/executor/create_view.rs                 |  26 +++
 src/executor/drop.rs                        |  13 +-
 src/executor/error.rs                       |  94 +++++++++
 src/executor/mod.rs                         | 205 +++++++++++++-------
 src/executor/system_table_scan.rs           |   5 +-
 src/planner/explain.rs                      |   9 +-
 src/planner/mod.rs                          |  14 +-
 src/storage/memory/mod.rs                   |   1 -
 src/storage/secondary/manifest.rs           |   1 -
 tests/sql/view.slt                          |  71 +++++++
 28 files changed, 630 insertions(+), 207 deletions(-)
 create mode 100644 src/binder/create_view.rs
 rename src/executor/{create.rs => create_table.rs} (68%)
 create mode 100644 src/executor/create_view.rs
 create mode 100644 src/executor/error.rs
 create mode 100644 tests/sql/view.slt

diff --git a/Cargo.lock b/Cargo.lock
index 6fdb0a97d..73b528893 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -131,13 +131,25 @@ version = "0.7.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
 
+[[package]]
+name = "async-broadcast"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "258b52a1aa741b9f09783b2d86cf0aeeb617bbf847f6933340a39644227acbdb"
+dependencies = [
+ "event-listener 5.2.0",
+ "event-listener-strategy",
+ "futures-core",
+ "pin-project-lite",
+]
+
 [[package]]
 name = "async-lock"
 version = "2.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b"
 dependencies = [
- "event-listener",
+ "event-listener 2.5.3",
 ]
 
 [[package]]
@@ -591,6 +603,15 @@ version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "120133d4db2ec47efe2e26502ee984747630c67f51974fca0b6c1340cf2368d3"
 
+[[package]]
+name = "concurrent-queue"
+version = "2.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "console"
 version = "0.15.8"
@@ -995,6 +1016,27 @@ version = "2.5.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "event-listener" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b5fb89194fa3cad959b833185b3063ba881dbfc7030680b314250779fb4cc91" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "feedafcaa9b749175d5ac357452a9d41ea2911da598fde46ce1fe02c37751291" +dependencies = [ + "event-listener 5.2.0", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -1879,6 +1921,12 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" @@ -2535,6 +2583,7 @@ version = "0.2.0" dependencies = [ "ahash 0.8.11", "anyhow", + "async-broadcast", "async-recursion", "async-stream", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 319505bf3..391232727 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ python = ["pyo3", "pyo3-build-config"] [dependencies] ahash = "0.8" anyhow = "1" +async-broadcast = "0.7" async-recursion = "1" async-stream = "0.3" async-trait = "0.1" diff --git a/src/array/mod.rs b/src/array/mod.rs index d0a7439d6..b874ab85e 100644 --- a/src/array/mod.rs +++ b/src/array/mod.rs @@ -407,7 +407,7 @@ macro_rules! impl_array_builder { (Self::$Abc(a), DataValue::$Value(v)) => a.push(Some(v)), (Self::$Abc(a), DataValue::Null) => a.push(None), )* - _ => panic!("failed to push value: type mismatch"), + (b, v) => panic!("failed to push value: type mismatch. builder: {}, value: {:?}", b.type_string(), v), } } @@ -419,7 +419,7 @@ macro_rules! impl_array_builder { (Self::$Abc(a), DataValue::$Value(v)) => a.push_n(n, Some(v)), (Self::$Abc(a), DataValue::Null) => a.push_n(n, None), )* - _ => panic!("failed to push value: type mismatch"), + (b, v) => panic!("failed to push value: type mismatch. builder: {}, value: {:?}", b.type_string(), v), } } @@ -450,7 +450,17 @@ macro_rules! impl_array_builder { $( (Self::$Abc(builder), ArrayImpl::$Abc(arr)) => builder.append(arr), )* - _ => panic!("failed to push value: type mismatch"), + (b, a) => panic!("failed to append array: type mismatch. builder: {}, array: {}", b.type_string(), a.type_string()), + } + } + + /// Return a string describing the type of this array. 
+ fn type_string(&self) -> &'static str { + match self { + Self::Null(_) => "NULL", + $( + Self::$Abc(_) => stringify!($Abc), + )* } } } diff --git a/src/binder/copy.rs b/src/binder/copy.rs index d574ac87a..35c602bbc 100644 --- a/src/binder/copy.rs +++ b/src/binder/copy.rs @@ -62,7 +62,7 @@ impl Binder { } => (table_name, columns), CopySource::Query(_) => return Err(BindError::Todo("copy from query".into())), }; - let (table, _) = self.bind_table_id(&table_name)?; + let (table, is_system, is_view) = self.bind_table_id(&table_name)?; let cols = self.bind_table_columns(&table_name, &columns)?; @@ -81,6 +81,11 @@ impl Binder { self.egraph.add(Node::CopyTo([ext_source, scan])) } else { // COPY FROM + if is_system { + return Err(BindError::CopyTo("system table".into())); + } else if is_view { + return Err(BindError::CopyTo("view".into())); + } let types = self.type_(cols)?; let types = self.egraph.add(Node::Type(types)); let copy = self.egraph.add(Node::CopyFrom([ext_source, types])); diff --git a/src/binder/create_table.rs b/src/binder/create_table.rs index 7b8a0e2b9..f86a2e957 100644 --- a/src/binder/create_table.rs +++ b/src/binder/create_table.rs @@ -116,7 +116,6 @@ impl Binder { .collect(); for &index in &ordered_pk_ids { - columns[index as usize].set_primary(true); columns[index as usize].set_nullable(false); } diff --git a/src/binder/create_view.rs b/src/binder/create_view.rs new file mode 100644 index 000000000..2e350f0e2 --- /dev/null +++ b/src/binder/create_view.rs @@ -0,0 +1,61 @@ +use std::collections::HashSet; + +use super::*; +use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; + +impl Binder { + pub(super) fn bind_create_view( + &mut self, + name: ObjectName, + columns: Vec, + query: Query, + ) -> Result { + let name = lower_case_name(&name); + let (schema_name, table_name) = split_name(&name)?; + let schema = self + .catalog + .get_schema_by_name(schema_name) + .ok_or_else(|| BindError::InvalidSchema(schema_name.into()))?; + if schema.get_table_by_name(table_name).is_some() { + return Err(BindError::TableExists(table_name.into())); + } + + // check duplicated column names + let mut set = HashSet::new(); + for col in &columns { + if !set.insert(col.value.to_lowercase()) { + return Err(BindError::ColumnExists(col.value.to_lowercase())); + } + } + + let (query, _) = self.bind_query(query)?; + let query_type = self.type_(query)?; + let output_types = query_type.as_struct(); + + // TODO: support inferring column names from query + if columns.len() != output_types.len() { + return Err(BindError::ViewAliasesMismatch); + } + + let columns: Vec = columns + .into_iter() + .zip(output_types) + .enumerate() + .map(|(idx, (name, ty))| { + ColumnCatalog::new( + idx as ColumnId, + ColumnDesc::new(name.value, ty.clone(), true), + ) + }) + .collect(); + + let table = self.egraph.add(Node::CreateTable(CreateTable { + schema_id: schema.id(), + table_name: table_name.into(), + columns, + ordered_pk_ids: vec![], + })); + let create_view = self.egraph.add(Node::CreateView([table, query])); + Ok(create_view) + } +} diff --git a/src/binder/delete.rs b/src/binder/delete.rs index 701531a6f..28048ceb1 100644 --- a/src/binder/delete.rs +++ b/src/binder/delete.rs @@ -14,9 +14,9 @@ impl Binder { let TableFactor::Table { name, alias, .. 
} = &from[0].relation else { return Err(BindError::Todo(format!("delete from {from:?}"))); }; - let (table_id, is_internal) = self.bind_table_id(name)?; - if is_internal { - return Err(BindError::NotSupportedOnInternalTable); + let (table_id, is_system, is_view) = self.bind_table_id(name)?; + if is_system || is_view { + return Err(BindError::CanNotDelete); } let scan = self.bind_table_def(name, alias.clone(), true)?; let cond = self.bind_where(selection)?; diff --git a/src/binder/drop.rs b/src/binder/drop.rs index 3521e285d..1f74e29a3 100644 --- a/src/binder/drop.rs +++ b/src/binder/drop.rs @@ -1,59 +1,7 @@ // Copyright 2024 RisingLight Project Authors. Licensed under Apache-2.0. -use std::result::Result as RawResult; -use std::str::FromStr; - -use pretty_xmlish::helper::delegate_fmt; -use pretty_xmlish::Pretty; -use serde::{Deserialize, Serialize}; - use super::*; -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Clone, Serialize, Deserialize)] -pub struct BoundDrop { - pub object: Object, - pub if_exists: bool, - pub cascade: bool, -} - -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Clone, Serialize, Deserialize)] -pub enum Object { - Table(TableRefId), -} - -impl std::fmt::Display for BoundDrop { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let explainer = Pretty::childless_record("Drop", self.pretty_table()); - delegate_fmt(&explainer, f, String::with_capacity(1000)) - } -} - -impl BoundDrop { - pub fn pretty_table<'a>(&self) -> Vec<(&'a str, Pretty<'a>)> { - vec![ - ("object", Pretty::display(&self.object)), - ("if_exists", Pretty::display(&self.if_exists)), - ("cascade", Pretty::display(&self.cascade)), - ] - } -} - -impl std::fmt::Display for Object { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Object::Table(table_id_ref) => write!(f, "table {}", table_id_ref), - } - } -} - -impl FromStr for BoundDrop { - type Err = (); - - fn from_str(_s: &str) -> RawResult { - Err(()) - } -} - impl Binder { pub(super) fn bind_drop( &mut self, @@ -62,22 +10,26 @@ impl Binder { names: Vec, cascade: bool, ) -> Result { - match object_type { - ObjectType::Table => { - let name = lower_case_name(&names[0]); - let (schema_name, table_name) = split_name(&name)?; - let table_ref_id = self - .catalog - .get_table_id_by_name(schema_name, table_name) - .ok_or_else(|| BindError::InvalidTable(table_name.into()))?; - - Ok(self.egraph.add(Node::Drop(BoundDrop { - object: Object::Table(table_ref_id), - if_exists, - cascade, - }))) + if !matches!(object_type, ObjectType::Table | ObjectType::View) { + return Err(BindError::Todo(format!("drop {object_type:?}"))); + } + if cascade { + return Err(BindError::Todo("cascade drop".into())); + } + let mut table_ids = Vec::with_capacity(names.len()); + for name in names { + let name = lower_case_name(&name); + let (schema_name, table_name) = split_name(&name)?; + let result = self.catalog.get_table_id_by_name(schema_name, table_name); + if if_exists && result.is_none() { + continue; } - _ => Err(BindError::UnsupportedObjectName(object_type)), + let table_id = result.ok_or_else(|| BindError::InvalidTable(table_name.into()))?; + let id = self.egraph.add(Node::Table(table_id)); + table_ids.push(id); } + let list = self.egraph.add(Node::List(table_ids.into())); + let drop = self.egraph.add(Node::Drop(list)); + Ok(drop) } } diff --git a/src/binder/insert.rs b/src/binder/insert.rs index 379b413c1..cae147be4 100644 --- a/src/binder/insert.rs +++ b/src/binder/insert.rs @@ -10,9 +10,9 @@ 
impl Binder { columns: Vec, source: Box, ) -> Result { - let (table, is_internal) = self.bind_table_id(&table_name)?; - if is_internal { - return Err(BindError::NotSupportedOnInternalTable); + let (table, is_internal, is_view) = self.bind_table_id(&table_name)?; + if is_internal || is_view { + return Err(BindError::CanNotInsert); } let cols = self.bind_table_columns(&table_name, &columns)?; let source = self.bind_query(*source)?.0; diff --git a/src/binder/mod.rs b/src/binder/mod.rs index c4321cbca..1de55cbfb 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -17,6 +17,7 @@ use crate::planner::{Expr as Node, RecExpr, TypeError, TypeSchemaAnalysis}; pub mod copy; mod create_function; mod create_table; +mod create_view; mod delete; mod drop; mod expr; @@ -26,7 +27,6 @@ mod table; pub use self::create_function::*; pub use self::create_table::*; -pub use self::drop::*; pub type Result = std::result::Result; @@ -83,14 +83,20 @@ pub enum BindError { ColumnNotInAgg(String), #[error("ORDER BY items must appear in the select list if DISTINCT is specified")] OrderKeyNotInDistinct, - #[error("operation on internal table is not supported")] - NotSupportedOnInternalTable, #[error("{0:?} is not an aggregate function")] NotAgg(String), #[error("unsupported object name: {0:?}")] UnsupportedObjectName(ObjectType), #[error("not supported yet: {0}")] Todo(String), + #[error("can not copy to {0}")] + CopyTo(String), + #[error("can only insert into table")] + CanNotInsert, + #[error("can only delete from table")] + CanNotDelete, + #[error("VIEW aliases mismatch query result")] + ViewAliasesMismatch, } /// The binder resolves all expressions referring to schema objects such as @@ -284,6 +290,12 @@ impl Binder { constraints, .. } => self.bind_create_table(name, &columns, &constraints), + Statement::CreateView { + name, + columns, + query, + .. + } => self.bind_create_view(name, columns, *query), Statement::CreateFunction { name, args, diff --git a/src/binder/table.rs b/src/binder/table.rs index 8b196dc41..37c59a3fd 100644 --- a/src/binder/table.rs +++ b/src/binder/table.rs @@ -247,11 +247,11 @@ impl Binder { Ok(id) } - /// Returns a [`Table`](Node::Table) node. + /// Returns a [`Table`](Node::Table) node, `is_system` flag, and `is_view` flag. 
/// /// # Example /// - `bind_table_id(t)` => `$1` - pub(super) fn bind_table_id(&mut self, table_name: &ObjectName) -> Result<(Id, bool)> { + pub(super) fn bind_table_id(&mut self, table_name: &ObjectName) -> Result<(Id, bool, bool)> { let name = lower_case_name(table_name); let (schema_name, table_name) = split_name(&name)?; @@ -259,8 +259,13 @@ impl Binder { .catalog .get_table_id_by_name(schema_name, table_name) .ok_or_else(|| BindError::InvalidTable(table_name.into()))?; + let table = self.catalog.get_table(&table_ref_id).unwrap(); let id = self.egraph.add(Node::Table(table_ref_id)); - Ok((id, table_ref_id.schema_id == RootCatalog::SYSTEM_SCHEMA_ID)) + Ok(( + id, + table_ref_id.schema_id == RootCatalog::SYSTEM_SCHEMA_ID, + table.is_view(), + )) } } @@ -278,7 +283,7 @@ mod tests { let catalog = Arc::new(RootCatalog::new()); let col_catalog = ColumnCatalog::new(0, ColumnDesc::new("a", DataType::Int32, false)); catalog - .add_table(1, "t".into(), vec![col_catalog], false, vec![]) + .add_table(1, "t".into(), vec![col_catalog], vec![]) .unwrap(); let stmts = parse("select x.b from (select a as b from t) as x").unwrap(); diff --git a/src/catalog/root.rs b/src/catalog/root.rs index 50950c035..cc9dc7d78 100644 --- a/src/catalog/root.rs +++ b/src/catalog/root.rs @@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex}; use super::function::FunctionCatalog; use super::*; use crate::parser; +use crate::planner::RecExpr; /// The root of all catalogs. pub struct RootCatalog { @@ -78,12 +79,23 @@ impl RootCatalog { schema_id: SchemaId, name: String, columns: Vec, - is_materialized_view: bool, ordered_pk_ids: Vec, ) -> Result { let mut inner = self.inner.lock().unwrap(); let schema = inner.schemas.get_mut(&schema_id).unwrap(); - schema.add_table(name, columns, is_materialized_view, ordered_pk_ids) + schema.add_table(name, columns, ordered_pk_ids) + } + + pub fn add_view( + &self, + schema_id: SchemaId, + name: String, + columns: Vec, + query: RecExpr, + ) -> Result { + let mut inner = self.inner.lock().unwrap(); + let schema = inner.schemas.get_mut(&schema_id).unwrap(); + schema.add_view(name, columns, query) } pub fn drop_table(&self, table_ref_id: TableRefId) { @@ -170,7 +182,6 @@ impl Inner { column }) .collect(), - false, vec![], ) .expect("failed to add system table"); @@ -235,9 +246,7 @@ mod tests { assert_eq!(schema_catalog2.name(), RootCatalog::DEFAULT_SCHEMA_NAME); let col = ColumnCatalog::new(0, ColumnDesc::new("a", DataType::Int32, false)); - let table_id = catalog - .add_table(1, "t".into(), vec![col], false, vec![]) - .unwrap(); + let table_id = catalog.add_table(1, "t".into(), vec![col], vec![]).unwrap(); assert_eq!(table_id, 0); } } diff --git a/src/catalog/schema.rs b/src/catalog/schema.rs index 8a01815a1..b33547d31 100644 --- a/src/catalog/schema.rs +++ b/src/catalog/schema.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use super::function::FunctionCatalog; use super::*; +use crate::planner::RecExpr; /// The catalog of a schema. 
#[derive(Clone)] @@ -34,7 +35,6 @@ impl SchemaCatalog { &mut self, name: String, columns: Vec, - is_materialized_view: bool, ordered_pk_ids: Vec, ) -> Result { if self.table_idxs.contains_key(&name) { @@ -46,7 +46,6 @@ impl SchemaCatalog { table_id, name.clone(), columns, - is_materialized_view, ordered_pk_ids, )); self.table_idxs.insert(name, table_id); @@ -54,6 +53,28 @@ impl SchemaCatalog { Ok(table_id) } + pub(super) fn add_view( + &mut self, + name: String, + columns: Vec, + query: RecExpr, + ) -> Result { + if self.table_idxs.contains_key(&name) { + return Err(CatalogError::Duplicated("view", name)); + } + let table_id = self.next_table_id; + self.next_table_id += 1; + let table_catalog = Arc::new(TableCatalog::new_view( + table_id, + name.clone(), + columns, + query, + )); + self.table_idxs.insert(name, table_id); + self.tables.insert(table_id, table_catalog); + Ok(table_id) + } + pub(super) fn delete_table(&mut self, id: TableId) { let catalog = self.tables.remove(&id).unwrap(); self.table_idxs.remove(catalog.name()).unwrap(); @@ -130,7 +151,7 @@ mod tests { assert_eq!(schema_catalog.name(), "test"); let table_id = schema_catalog - .add_table("t".into(), col_catalogs, false, vec![]) + .add_table("t".into(), col_catalogs, vec![]) .unwrap(); assert_eq!(table_id, 0); diff --git a/src/catalog/table.rs b/src/catalog/table.rs index 1d33af41b..451f5466c 100644 --- a/src/catalog/table.rs +++ b/src/catalog/table.rs @@ -3,6 +3,7 @@ use std::collections::{BTreeMap, HashMap}; use super::*; +use crate::planner::RecExpr; /// The catalog of a table. pub struct TableCatalog { @@ -12,11 +13,15 @@ pub struct TableCatalog { column_idxs: HashMap, columns: BTreeMap, - #[allow(dead_code)] - is_materialized_view: bool, + kind: TableKind, next_column_id: ColumnId, - #[allow(dead_code)] - ordered_pk_ids: Vec, + primary_key: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum TableKind { + Table, + View(RecExpr), } impl TableCatalog { @@ -24,17 +29,35 @@ impl TableCatalog { id: TableId, name: String, columns: Vec, - is_materialized_view: bool, - ordered_pk_ids: Vec, + primary_key: Vec, + ) -> TableCatalog { + Self::new_(id, name, columns, TableKind::Table, primary_key) + } + + pub fn new_view( + id: TableId, + name: String, + columns: Vec, + query: RecExpr, + ) -> TableCatalog { + Self::new_(id, name, columns, TableKind::View(query), vec![]) + } + + fn new_( + id: TableId, + name: String, + columns: Vec, + kind: TableKind, + primary_key: Vec, ) -> TableCatalog { let mut table_catalog = TableCatalog { id, name, column_idxs: HashMap::new(), columns: BTreeMap::new(), - is_materialized_view, + kind, next_column_id: 0, - ordered_pk_ids, + primary_key, }; table_catalog .add_column(ColumnCatalog::new( @@ -102,7 +125,19 @@ impl TableCatalog { } pub fn primary_keys(&self) -> Vec { - self.ordered_pk_ids.clone() + self.primary_key.clone() + } + + pub fn is_view(&self) -> bool { + matches!(self.kind, TableKind::View(_)) + } + + /// Returns the query if it is a view. 
+ pub fn query(&self) -> Option<&RecExpr> { + match &self.kind { + TableKind::Table => None, + TableKind::View(query) => Some(query), + } } } @@ -116,7 +151,7 @@ mod tests { let col1 = ColumnCatalog::new(1, ColumnDesc::new("b", DataType::Bool, false)); let col_catalogs = vec![col0, col1]; - let table_catalog = TableCatalog::new(0, "t".into(), col_catalogs, false, vec![]); + let table_catalog = TableCatalog::new(0, "t".into(), col_catalogs, vec![]); assert!(!table_catalog.contains_column("c")); assert!(table_catalog.contains_column("a")); diff --git a/src/db.rs b/src/db.rs index d21c232a7..94ba4aa5d 100644 --- a/src/db.rs +++ b/src/db.rs @@ -126,6 +126,9 @@ impl Database { continue; } for table in schema.all_tables().values() { + if table.is_view() { + continue; + } let table_id = TableRefId::new(schema.id(), table.id()); let table = storage.get_table(table_id)?; let txn = table.read().await?; diff --git a/src/executor/copy_from_file.rs b/src/executor/copy_from_file.rs index 63699ab29..b77aa5a7e 100644 --- a/src/executor/copy_from_file.rs +++ b/src/executor/copy_from_file.rs @@ -46,7 +46,7 @@ impl CopyFromFileExecutor { /// Read records from file using blocking IO. /// /// The read data chunks will be sent through `tx`. - fn read_file_blocking(self, tx: Sender) -> Result<(), ExecutorError> { + fn read_file_blocking(self, tx: Sender) -> Result<()> { let file = File::open(self.source.path)?; let file_size = file.metadata()?.len(); let mut buf_reader = BufReader::new(file); @@ -91,10 +91,7 @@ impl CopyFromFileExecutor { if !(record.len() == column_count || record.len() == column_count + 1 && record.get(column_count) == Some("")) { - return Err(ExecutorError::LengthMismatch { - expected: column_count, - actual: record.len(), - }); + return Err(Error::length_mismatch(column_count, record.len())); } size_count += record.as_slice().as_bytes().len(); @@ -102,12 +99,12 @@ impl CopyFromFileExecutor { // push a raw str row and send it if necessary if let Some(chunk) = chunk_builder.push_str_row(record.iter())? { bar.set_position(size_count as u64); - tx.blocking_send(chunk).map_err(|_| ExecutorError::Abort)?; + tx.blocking_send(chunk).map_err(|_| Error::aborted())?; } } // send left chunk if let Some(chunk) = chunk_builder.take() { - tx.blocking_send(chunk).map_err(|_| ExecutorError::Abort)?; + tx.blocking_send(chunk).map_err(|_| Error::aborted())?; } bar.finish(); Ok(()) diff --git a/src/executor/copy_to_file.rs b/src/executor/copy_to_file.rs index 55b57c68b..d257c1cab 100644 --- a/src/executor/copy_to_file.rs +++ b/src/executor/copy_to_file.rs @@ -40,7 +40,7 @@ impl CopyToFileExecutor { path: PathBuf, format: FileFormat, mut recver: mpsc::Receiver, - ) -> Result { + ) -> Result { let file = File::create(path)?; let mut writer = match format { FileFormat::Csv { diff --git a/src/executor/create.rs b/src/executor/create_table.rs similarity index 68% rename from src/executor/create.rs rename to src/executor/create_table.rs index bcf9b6e39..8adefffa1 100644 --- a/src/executor/create.rs +++ b/src/executor/create_table.rs @@ -8,7 +8,7 @@ use crate::storage::Storage; /// The executor of `create table` statement. 
pub struct CreateTableExecutor { - pub plan: CreateTable, + pub table: CreateTable, pub storage: Arc, } @@ -17,14 +17,13 @@ impl CreateTableExecutor { pub async fn execute(self) { self.storage .create_table( - self.plan.schema_id, - &self.plan.table_name, - &self.plan.columns, - &self.plan.ordered_pk_ids, + self.table.schema_id, + &self.table.table_name, + &self.table.columns, + &self.table.ordered_pk_ids, ) .await?; - let chunk = DataChunk::single(1); - yield chunk + yield DataChunk::single(1); } } diff --git a/src/executor/create_view.rs b/src/executor/create_view.rs new file mode 100644 index 000000000..b7bfbd097 --- /dev/null +++ b/src/executor/create_view.rs @@ -0,0 +1,26 @@ +// Copyright 2024 RisingLight Project Authors. Licensed under Apache-2.0. + +use super::*; +use crate::binder::CreateTable; +use crate::catalog::RootCatalogRef; + +/// The executor of `create view` statement. +pub struct CreateViewExecutor { + pub table: CreateTable, + pub query: RecExpr, + pub catalog: RootCatalogRef, +} + +impl CreateViewExecutor { + #[try_stream(boxed, ok = DataChunk, error = ExecutorError)] + pub async fn execute(self) { + self.catalog.add_view( + self.table.schema_id, + self.table.table_name, + self.table.columns, + self.query, + )?; + + yield DataChunk::single(1); + } +} diff --git a/src/executor/drop.rs b/src/executor/drop.rs index 0637f3e69..dfbc1c94f 100644 --- a/src/executor/drop.rs +++ b/src/executor/drop.rs @@ -3,20 +3,25 @@ use std::sync::Arc; use super::*; -use crate::binder::{BoundDrop, Object}; +use crate::catalog::{RootCatalogRef, TableRefId}; use crate::storage::Storage; /// The executor of `drop` statement. pub struct DropExecutor { - pub plan: BoundDrop, + pub tables: Vec, + pub catalog: RootCatalogRef, pub storage: Arc, } impl DropExecutor { #[try_stream(boxed, ok = DataChunk, error = ExecutorError)] pub async fn execute(self) { - match self.plan.object { - Object::Table(id) => self.storage.drop_table(id).await?, + for table in self.tables { + if self.catalog.get_table(&table).unwrap().is_view() { + self.catalog.drop_table(table); + } else { + self.storage.drop_table(table).await?; + } } yield DataChunk::single(1); } diff --git a/src/executor/error.rs b/src/executor/error.rs new file mode 100644 index 000000000..7984380a5 --- /dev/null +++ b/src/executor/error.rs @@ -0,0 +1,94 @@ +// Copyright 2024 RisingLight Project Authors. Licensed under Apache-2.0. + +use std::sync::Arc; + +use crate::catalog::CatalogError; +use crate::storage::TracedStorageError; +use crate::types::ConvertError; + +/// The result type of execution. +pub type Result = std::result::Result; + +/// The error type of execution. 
+#[derive(thiserror::Error, Debug, Clone)] +#[error(transparent)] +pub struct Error { + inner: Arc, +} + +#[derive(thiserror::Error, Debug)] +enum Inner { + #[error("storage error: {0}")] + Storage( + #[from] + #[backtrace] + TracedStorageError, + ), + #[error("catalog error: {0}")] + Catalog(#[from] CatalogError), + #[error("conversion error: {0}")] + Convert(#[from] ConvertError), + #[error("io error: {0}")] + Io(#[from] std::io::Error), + #[error("csv error: {0}")] + Csv(#[from] csv::Error), + #[error("tuple length mismatch: expected {expected} but got {actual}")] + LengthMismatch { expected: usize, actual: usize }, + #[error("exceed char/varchar length limit: item length {length} > char/varchar width {width}")] + ExceedLengthLimit { length: u64, width: u64 }, + #[error("value can not be null")] + NotNullable, + #[error("abort")] + Aborted, +} + +impl From for Error { + fn from(e: Inner) -> Self { + Error { inner: Arc::new(e) } + } +} + +impl From for Error { + fn from(e: TracedStorageError) -> Self { + Inner::from(e).into() + } +} + +impl From for Error { + fn from(e: CatalogError) -> Self { + Inner::from(e).into() + } +} + +impl From for Error { + fn from(e: ConvertError) -> Self { + Inner::from(e).into() + } +} + +impl From for Error { + fn from(e: std::io::Error) -> Self { + Inner::from(e).into() + } +} + +impl From for Error { + fn from(e: csv::Error) -> Self { + Inner::from(e).into() + } +} + +impl Error { + pub fn length_mismatch(expected: usize, actual: usize) -> Self { + Inner::LengthMismatch { expected, actual }.into() + } + pub fn not_nullable() -> Self { + Inner::NotNullable.into() + } + pub fn exceed_length_limit(length: u64, width: u64) -> Self { + Inner::ExceedLengthLimit { length, width }.into() + } + pub fn aborted() -> Self { + Inner::Aborted.into() + } +} diff --git a/src/executor/mod.rs b/src/executor/mod.rs index 6a502b71b..889ae951d 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -12,19 +12,24 @@ //! //! 
[`try_stream`]: async_stream::try_stream +use std::collections::HashMap; use std::sync::Arc; use egg::{Id, Language}; -use futures::stream::{BoxStream, Stream, StreamExt}; +use futures::stream::{BoxStream, StreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; // use minitrace::prelude::*; use self::copy_from_file::*; use self::copy_to_file::*; -use self::create::*; +use self::create_function::*; +use self::create_table::*; +use self::create_view::*; use self::delete::*; use self::drop::*; +pub use self::error::Error as ExecutorError; +use self::error::*; use self::evaluator::*; use self::explain::*; use self::filter::*; @@ -46,16 +51,16 @@ use self::top_n::TopNExecutor; use self::values::*; use self::window::*; use crate::array::DataChunk; -use crate::catalog::RootCatalog; -use crate::executor::create_function::CreateFunctionExecutor; +use crate::catalog::{RootCatalog, RootCatalogRef, TableRefId}; use crate::planner::{Expr, ExprAnalysis, Optimizer, RecExpr, TypeSchemaAnalysis}; -use crate::storage::{Storage, TracedStorageError}; -use crate::types::{ColumnIndex, ConvertError, DataType}; +use crate::storage::Storage; +use crate::types::{ColumnIndex, DataType}; mod copy_from_file; mod copy_to_file; -mod create; mod create_function; +mod create_table; +mod create_view; mod delete; mod drop; mod evaluator; @@ -69,6 +74,7 @@ mod nested_loop_join; mod order; mod system_table_scan; // mod perfect_hash_agg; +mod error; mod merge_join; mod projection; mod simple_agg; @@ -78,40 +84,6 @@ mod top_n; mod values; mod window; -/// The error type of execution. -#[derive(thiserror::Error, Debug)] -pub enum ExecutorError { - #[error("storage error: {0}")] - Storage( - #[from] - #[backtrace] - #[source] - TracedStorageError, - ), - #[error("conversion error: {0}")] - Convert(#[from] ConvertError), - #[error("tuple length mismatch: expected {expected} but got {actual}")] - LengthMismatch { expected: usize, actual: usize }, - #[error("io error")] - Io( - #[from] - #[source] - std::io::Error, - ), - #[error("csv error")] - Csv( - #[from] - #[source] - csv::Error, - ), - #[error("value can not be null")] - NotNullable, - #[error("exceed char/varchar length limit: item length {length} > char/varchar width {width}")] - ExceedLengthLimit { length: u64, width: u64 }, - #[error("abort")] - Abort, -} - /// The maximum chunk length produced by executor at a time. const PROCESSING_WINDOW_SIZE: usize = 1024; @@ -121,7 +93,7 @@ const PROCESSING_WINDOW_SIZE: usize = 1024; /// /// It consumes one or more streams from its child executors, /// and produces a stream to its parent. -pub type BoxedExecutor = BoxStream<'static, Result>; +pub type BoxedExecutor = BoxStream<'static, Result>; pub fn build(optimizer: Optimizer, storage: Arc, plan: &RecExpr) -> BoxedExecutor { Builder::new(optimizer, storage, plan).build() @@ -133,6 +105,9 @@ struct Builder { optimizer: Optimizer, egraph: egg::EGraph, root: Id, + /// For scans on views, we prebuild their executors and store them here. + /// Multiple scans on the same view will share the same executor. 
+ views: HashMap, } impl Builder { @@ -142,15 +117,31 @@ impl Builder { catalog: optimizer.catalog().clone(), }); let root = egraph.add_expr(plan); + + // recursively build for all views + let mut views = HashMap::new(); + for node in plan.as_ref() { + if let Expr::Table(tid) = node + && let Some(query) = optimizer.catalog().get_table(tid).unwrap().query() + { + let builder = Self::new(optimizer.clone(), storage.clone(), query); + let subscriber = builder.build_subscriber(); + views.insert(*tid, subscriber); + } + } + Builder { storage, optimizer, egraph, root, + views, } } + /// Get the node from id. fn node(&self, id: Id) -> &Expr { + // each e-class has exactly one node since there is no rewrite or union. &self.egraph[id].nodes[0] } @@ -179,22 +170,64 @@ impl Builder { }) } + /// Returns the catalog. + fn catalog(&self) -> &RootCatalogRef { + self.optimizer.catalog() + } + + /// Builds the executor. fn build(self) -> BoxedExecutor { self.build_id(self.root) } + /// Builds the executor and returns its subscriber. + fn build_subscriber(self) -> StreamSubscriber { + self.build_id_subscriber(self.root) + } + + /// Builds the executor for the given id. fn build_id(&self, id: Id) -> BoxedExecutor { + self.build_id_subscriber(id).subscribe() + } + + /// Builds the executor for the given id and returns its subscriber. + fn build_id_subscriber(&self, id: Id) -> StreamSubscriber { use Expr::*; let stream = match self.node(id).clone() { Scan([table, list, filter]) => { let table_id = self.node(table).as_table(); let columns = (self.node(list).as_list().iter()) .map(|id| self.node(*id).as_column()) - .collect(); + .collect_vec(); + // analyze range filter + let filter = { + let mut egraph = egg::EGraph::new(ExprAnalysis::default()); + let root = egraph.add_expr(&self.recexpr(filter)); + egraph[root].data.range.clone().map(|(_, r)| r) + }; + + if let Some(subscriber) = self.views.get(&table_id) { + // scan a view + assert!( + filter.is_none(), + "range filter is not supported in view scan" + ); + + // resolve column index + // child schema: [$v.0, $v.1, ...] 
+ let mut projs = RecExpr::default(); + let lists = columns + .iter() + .map(|c| { + projs.add(ColumnIndex(crate::types::ColumnIndex(c.column_id as _))) + }) + .collect(); + projs.add(List(lists)); - if table_id.schema_id == RootCatalog::SYSTEM_SCHEMA_ID { + ProjectionExecutor { projs }.execute(subscriber.subscribe()) + } else if table_id.schema_id == RootCatalog::SYSTEM_SCHEMA_ID { SystemTableScan { - catalog: self.optimizer.catalog().clone(), + catalog: self.catalog().clone(), storage: self.storage.clone(), table_id, columns, @@ -204,12 +237,7 @@ impl Builder { TableScanExecutor { table_id, columns, - filter: { - // analyze range for the filter - let mut egraph = egg::EGraph::new(ExprAnalysis::default()); - let root = egraph.add_expr(&self.recexpr(filter)); - egraph[root].data.range.clone().map(|(_, r)| r) - }, + filter, storage: self.storage.clone(), } .execute() @@ -309,20 +337,30 @@ impl Builder { } .execute(self.build_id(child)), - CreateTable(plan) => CreateTableExecutor { - plan, + CreateTable(table) => CreateTableExecutor { + table, storage: self.storage.clone(), } .execute(), + CreateView([table, query]) => CreateViewExecutor { + table: self.node(table).as_create_table(), + query: self.recexpr(query), + catalog: self.catalog().clone(), + } + .execute(), + CreateFunction(f) => CreateFunctionExecutor { f, catalog: self.optimizer.catalog().clone(), } .execute(), - Drop(plan) => DropExecutor { - plan, + Drop(tables) => DropExecutor { + tables: (self.node(tables).as_list().iter()) + .map(|id| self.node(*id).as_table()) + .collect(), + catalog: self.catalog().clone(), storage: self.storage.clone(), } .execute(), @@ -390,34 +428,57 @@ impl Builder { } /// Spawn a new task to execute the given stream. -fn spawn(name: &str, mut stream: BoxedExecutor) -> BoxedExecutor { - let (tx, rx) = tokio::sync::mpsc::channel(16); +fn spawn(name: &str, mut stream: BoxedExecutor) -> StreamSubscriber { + let (tx, rx) = async_broadcast::broadcast(16); let handle = tokio::task::Builder::default() .name(name) .spawn(async move { while let Some(item) = stream.next().await { - if tx.send(item).await.is_err() { + if tx.broadcast(item).await.is_err() { + // all receivers are dropped, stop the task. return; } } }) .expect("failed to spawn task"); - use std::pin::Pin; - use std::task::{Context, Poll}; - struct SpawnedStream { - rx: tokio::sync::mpsc::Receiver>, - handle: tokio::task::JoinHandle<()>, + + StreamSubscriber { + rx: rx.deactivate(), + handle: Arc::new(AbortOnDropHandle(handle)), } - impl Stream for SpawnedStream { - type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.rx.poll_recv(cx) +} + +/// A subscriber of an executor's output stream. +/// +/// New streams can be created by calling `subscribe`. +struct StreamSubscriber { + rx: async_broadcast::InactiveReceiver>, + handle: Arc, +} + +impl StreamSubscriber { + /// Subscribes an output stream from the executor. + fn subscribe(&self) -> BoxedExecutor { + #[try_stream(boxed, ok = DataChunk, error = ExecutorError)] + async fn to_stream( + rx: async_broadcast::Receiver>, + handle: Arc, + ) { + #[for_await] + for chunk in rx { + yield chunk?; + } + drop(handle); } + to_stream(self.rx.activate_cloned(), self.handle.clone()) } - impl Drop for SpawnedStream { - fn drop(&mut self) { - self.handle.abort(); - } +} + +/// A wrapper over `JoinHandle` that aborts the task when dropped. 
+struct AbortOnDropHandle(tokio::task::JoinHandle<()>); + +impl Drop for AbortOnDropHandle { + fn drop(&mut self) { + self.0.abort(); } - Box::pin(SpawnedStream { rx, handle }) } diff --git a/src/executor/system_table_scan.rs b/src/executor/system_table_scan.rs index c9b1c74c2..03273ea84 100644 --- a/src/executor/system_table_scan.rs +++ b/src/executor/system_table_scan.rs @@ -172,10 +172,7 @@ fn pg_attribute(catalog: RootCatalogRef) -> DataChunk { } /// Returns `pg_stat` table. -async fn pg_stat( - catalog: RootCatalogRef, - storage: &impl Storage, -) -> Result { +async fn pg_stat(catalog: RootCatalogRef, storage: &impl Storage) -> Result { // let mut schema_id = I32ArrayBuilder::new(); // let mut table_id = I32ArrayBuilder::new(); // let mut column_id = I32ArrayBuilder::new(); diff --git a/src/planner/explain.rs b/src/planner/explain.rs index c75d57ba2..4d32d402e 100644 --- a/src/planner/explain.rs +++ b/src/planner/explain.rs @@ -338,12 +338,17 @@ impl<'a> Explain<'a> { let fields = t.pretty_table().with(cost, rows); Pretty::childless_record("CreateTable", fields) } + CreateView([table, query]) => Pretty::simple_record( + "CreateView", + vec![("table", self.expr(table).pretty())].with(cost, rows), + vec![self.expr(query).pretty()], + ), CreateFunction(f) => { let v = f.pretty_function(); Pretty::childless_record("CreateFunction", v) } - Drop(t) => { - let fields = t.pretty_table().with(cost, rows); + Drop(tables) => { + let fields = vec![("objects", self.expr(tables).pretty())].with(cost, rows); Pretty::childless_record("Drop", fields) } Insert([table, cols, child]) => Pretty::simple_record( diff --git a/src/planner/mod.rs b/src/planner/mod.rs index 1ca63f8d2..ea29762d2 100644 --- a/src/planner/mod.rs +++ b/src/planner/mod.rs @@ -3,7 +3,7 @@ use egg::{define_language, Id, Symbol}; use crate::binder::copy::ExtSource; -use crate::binder::{BoundDrop, CreateFunction, CreateTable}; +use crate::binder::{CreateFunction, CreateTable}; use crate::catalog::{ColumnRefId, TableRefId}; use crate::parser::{BinaryOperator, UnaryOperator}; use crate::types::{ColumnIndex, DataType, DataValue, DateTimeField}; @@ -31,7 +31,6 @@ define_language! { Column(ColumnRefId), // $1.2, $2.1, ... Table(TableRefId), // $1, $2, ... ColumnIndex(ColumnIndex), // #0, #1, ... - ExtSource(ExtSource), // utilities "ref" = Ref(Id), // (ref expr) @@ -117,11 +116,13 @@ define_language! { // output = child || exprs CreateTable(CreateTable), CreateFunction(CreateFunction), - Drop(BoundDrop), + "create_view" = CreateView([Id; 2]), // (create_view create_table child) + "drop" = Drop(Id), // (drop [table..]) "insert" = Insert([Id; 3]), // (insert table [column..] 
child) "delete" = Delete([Id; 2]), // (delete table child) "copy_from" = CopyFrom([Id; 2]), // (copy_from dest types) "copy_to" = CopyTo([Id; 2]), // (copy_to dest child) + ExtSource(ExtSource), "explain" = Explain(Id), // (explain child) // internal functions @@ -181,6 +182,13 @@ impl Expr { t } + pub fn as_create_table(&self) -> CreateTable { + let Self::CreateTable(v) = self else { + panic!("not a create table: {self}") + }; + v.clone() + } + pub fn as_ext_source(&self) -> ExtSource { let Self::ExtSource(v) = self else { panic!("not an external source: {self}") diff --git a/src/storage/memory/mod.rs b/src/storage/memory/mod.rs index 5a741a819..3d508400d 100644 --- a/src/storage/memory/mod.rs +++ b/src/storage/memory/mod.rs @@ -84,7 +84,6 @@ impl Storage for InMemoryStorage { schema_id, table_name.into(), column_descs.to_vec(), - false, ordered_pk_ids.to_vec(), ) .map_err(|_| StorageError::Duplicated("table", table_name.into()))?; diff --git a/src/storage/secondary/manifest.rs b/src/storage/secondary/manifest.rs index 65ec26c73..dfe60b4af 100644 --- a/src/storage/secondary/manifest.rs +++ b/src/storage/secondary/manifest.rs @@ -205,7 +205,6 @@ impl SecondaryStorage { schema_id, table_name.clone(), column_descs.to_vec(), - false, ordered_pk_ids.clone(), ) .map_err(|_| TracedStorageError::duplicated("table", table_name))?; diff --git a/tests/sql/view.slt b/tests/sql/view.slt new file mode 100644 index 000000000..8feed3973 --- /dev/null +++ b/tests/sql/view.slt @@ -0,0 +1,71 @@ +statement ok +CREATE TABLE persons ( + id INT, + name VARCHAR, + gender VARCHAR -- M or F +); + +# FIXME: creating a view without column aliases is not supported +# statement ok +# CREATE VIEW males AS +# SELECT id, name +# FROM persons +# WHERE gender = 'M'; + +statement ok +CREATE VIEW males(id, name) AS +SELECT id, name +FROM persons +WHERE gender = 'M'; + +statement ok +CREATE VIEW females(id, name) AS +SELECT id, name +FROM persons +WHERE gender = 'F'; + +statement ok +CREATE VIEW male_females(mname, fname) AS +SELECT m.name, f.name +FROM males m, females f; + +query T +SELECT name FROM males; +---- + +query T +SELECT name FROM females; +---- + +query TT +SELECT * FROM male_females; +---- + +statement ok +INSERT INTO persons (id, name, gender) VALUES +(1, 'John', 'M'), +(2, 'Jane', 'F'), +(3, 'Mike', 'M'); + +query T +SELECT name FROM males; +---- +John +Mike + +query T +SELECT name FROM females; +---- +Jane + +query TT rowsort +SELECT * FROM male_females; +---- +John Jane +Mike Jane + +statement ok +DROP VIEW male_females, males, females; + +statement ok +DROP TABLE persons;
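
--
Appendix (editorial sketch, not part of the patch): the `StreamSubscriber`
added in src/executor/mod.rs wraps an `async_broadcast::InactiveReceiver`
plus an abort-on-drop task handle; every `subscribe()` call activates a
clone of the receiver, so one spawned executor can feed any number of
downstream scans. Below is a minimal, self-contained sketch of that
pattern using the same `async-broadcast` crate. The `i32` payload and all
names (`subscriber`, `chunk`, `consume_*`) are illustrative stand-ins for
`DataChunk` streams, not code from this PR, and the dependency lines are
assumptions:

// [dependencies]  -- assumed for this sketch
// async-broadcast = "0.7"
// tokio = { version = "1", features = ["full"] }

#[tokio::main]
async fn main() {
    // A bounded broadcast channel: `broadcast` waits for buffer space, so
    // the producer (the view's executor) is back-pressured by its slowest
    // consumer instead of buffering the whole result in memory.
    let (tx, rx) = async_broadcast::broadcast::<i32>(4);

    // Like `StreamSubscriber`, hold an *inactive* receiver: it keeps the
    // channel open without consuming, and hands out live receivers on demand.
    let subscriber = rx.deactivate();

    // Two independent consumers of the same stream, e.g. two scans of the
    // same view inside one query plan. Activate both before producing so
    // neither misses a chunk.
    let mut a = subscriber.activate_cloned();
    let mut b = subscriber.activate_cloned();

    let producer = tokio::spawn(async move {
        for chunk in 0..4 {
            // Every active receiver gets a clone of each message.
            if tx.broadcast(chunk).await.is_err() {
                return; // all receivers are gone; stop producing
            }
        }
        // `tx` is dropped here, closing the channel and ending both streams.
    });

    let consume_a = tokio::spawn(async move {
        let mut out = Vec::new();
        while let Ok(chunk) = a.recv().await {
            out.push(chunk);
        }
        out
    });
    let consume_b = tokio::spawn(async move {
        let mut out = Vec::new();
        while let Ok(chunk) = b.recv().await {
            out.push(chunk);
        }
        out
    });

    producer.await.unwrap();
    // Both subscribers saw every chunk, in order.
    assert_eq!(consume_a.await.unwrap(), consume_b.await.unwrap());
}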