Skip to content

Commit

Permalink
feat: support view in memory (#837)
Browse files Browse the repository at this point in the history
This PR is a part of #796, adds support for creating, querying and
dropping views in memory.

The key implementations are: 
1. When creating a view, bind the query and store the logical plan with
the view in catalog.
2. When querying from a view, build executors for all views and then
build other plan nodes on top of them. Given that a view can be consumed
by multiple downstream nodes, we introduce `StreamSubscriber` to allow
multiple consumers of a stream.

Limitations:
1. We don't persist views in disk storage.
2. We don't support inferring schema from the query. Columns must be
defined explicitly when creating a view.
3. We don't maintain dependency relationship between tables and views.

---------

Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Mar 19, 2024
1 parent 41d545f commit d492a45
Show file tree
Hide file tree
Showing 28 changed files with 630 additions and 207 deletions.
51 changes: 50 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ python = ["pyo3", "pyo3-build-config"]
[dependencies]
ahash = "0.8"
anyhow = "1"
async-broadcast = "0.7"
async-recursion = "1"
async-stream = "0.3"
async-trait = "0.1"
Expand Down
16 changes: 13 additions & 3 deletions src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ macro_rules! impl_array_builder {
(Self::$Abc(a), DataValue::$Value(v)) => a.push(Some(v)),
(Self::$Abc(a), DataValue::Null) => a.push(None),
)*
_ => panic!("failed to push value: type mismatch"),
(b, v) => panic!("failed to push value: type mismatch. builder: {}, value: {:?}", b.type_string(), v),
}
}

Expand All @@ -419,7 +419,7 @@ macro_rules! impl_array_builder {
(Self::$Abc(a), DataValue::$Value(v)) => a.push_n(n, Some(v)),
(Self::$Abc(a), DataValue::Null) => a.push_n(n, None),
)*
_ => panic!("failed to push value: type mismatch"),
(b, v) => panic!("failed to push value: type mismatch. builder: {}, value: {:?}", b.type_string(), v),
}
}

Expand Down Expand Up @@ -450,7 +450,17 @@ macro_rules! impl_array_builder {
$(
(Self::$Abc(builder), ArrayImpl::$Abc(arr)) => builder.append(arr),
)*
_ => panic!("failed to push value: type mismatch"),
(b, a) => panic!("failed to append array: type mismatch. builder: {}, array: {}", b.type_string(), a.type_string()),
}
}

/// Return a string describing the type of this array.
fn type_string(&self) -> &'static str {
match self {
Self::Null(_) => "NULL",
$(
Self::$Abc(_) => stringify!($Abc),
)*
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/binder/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Binder {
} => (table_name, columns),
CopySource::Query(_) => return Err(BindError::Todo("copy from query".into())),
};
let (table, _) = self.bind_table_id(&table_name)?;
let (table, is_system, is_view) = self.bind_table_id(&table_name)?;

let cols = self.bind_table_columns(&table_name, &columns)?;

Expand All @@ -81,6 +81,11 @@ impl Binder {
self.egraph.add(Node::CopyTo([ext_source, scan]))
} else {
// COPY <dest_table> FROM <source_file>
if is_system {
return Err(BindError::CopyTo("system table".into()));
} else if is_view {
return Err(BindError::CopyTo("view".into()));
}
let types = self.type_(cols)?;
let types = self.egraph.add(Node::Type(types));
let copy = self.egraph.add(Node::CopyFrom([ext_source, types]));
Expand Down
1 change: 0 additions & 1 deletion src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ impl Binder {
.collect();

for &index in &ordered_pk_ids {
columns[index as usize].set_primary(true);
columns[index as usize].set_nullable(false);
}

Expand Down
61 changes: 61 additions & 0 deletions src/binder/create_view.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::collections::HashSet;

use super::*;
use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnId};

impl Binder {
pub(super) fn bind_create_view(
&mut self,
name: ObjectName,
columns: Vec<Ident>,
query: Query,
) -> Result {
let name = lower_case_name(&name);
let (schema_name, table_name) = split_name(&name)?;
let schema = self
.catalog
.get_schema_by_name(schema_name)
.ok_or_else(|| BindError::InvalidSchema(schema_name.into()))?;
if schema.get_table_by_name(table_name).is_some() {
return Err(BindError::TableExists(table_name.into()));
}

// check duplicated column names
let mut set = HashSet::new();
for col in &columns {
if !set.insert(col.value.to_lowercase()) {
return Err(BindError::ColumnExists(col.value.to_lowercase()));
}
}

let (query, _) = self.bind_query(query)?;
let query_type = self.type_(query)?;
let output_types = query_type.as_struct();

// TODO: support inferring column names from query
if columns.len() != output_types.len() {
return Err(BindError::ViewAliasesMismatch);
}

let columns: Vec<ColumnCatalog> = columns
.into_iter()
.zip(output_types)
.enumerate()
.map(|(idx, (name, ty))| {
ColumnCatalog::new(
idx as ColumnId,
ColumnDesc::new(name.value, ty.clone(), true),
)
})
.collect();

let table = self.egraph.add(Node::CreateTable(CreateTable {
schema_id: schema.id(),
table_name: table_name.into(),
columns,
ordered_pk_ids: vec![],
}));
let create_view = self.egraph.add(Node::CreateView([table, query]));
Ok(create_view)
}
}
6 changes: 3 additions & 3 deletions src/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ impl Binder {
let TableFactor::Table { name, alias, .. } = &from[0].relation else {
return Err(BindError::Todo(format!("delete from {from:?}")));
};
let (table_id, is_internal) = self.bind_table_id(name)?;
if is_internal {
return Err(BindError::NotSupportedOnInternalTable);
let (table_id, is_system, is_view) = self.bind_table_id(name)?;
if is_system || is_view {
return Err(BindError::CanNotDelete);
}
let scan = self.bind_table_def(name, alias.clone(), true)?;
let cond = self.bind_where(selection)?;
Expand Down
86 changes: 19 additions & 67 deletions src/binder/drop.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,7 @@
// Copyright 2024 RisingLight Project Authors. Licensed under Apache-2.0.

use std::result::Result as RawResult;
use std::str::FromStr;

use pretty_xmlish::helper::delegate_fmt;
use pretty_xmlish::Pretty;
use serde::{Deserialize, Serialize};

use super::*;

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Clone, Serialize, Deserialize)]
pub struct BoundDrop {
pub object: Object,
pub if_exists: bool,
pub cascade: bool,
}

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Clone, Serialize, Deserialize)]
pub enum Object {
Table(TableRefId),
}

impl std::fmt::Display for BoundDrop {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let explainer = Pretty::childless_record("Drop", self.pretty_table());
delegate_fmt(&explainer, f, String::with_capacity(1000))
}
}

impl BoundDrop {
pub fn pretty_table<'a>(&self) -> Vec<(&'a str, Pretty<'a>)> {
vec![
("object", Pretty::display(&self.object)),
("if_exists", Pretty::display(&self.if_exists)),
("cascade", Pretty::display(&self.cascade)),
]
}
}

impl std::fmt::Display for Object {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Object::Table(table_id_ref) => write!(f, "table {}", table_id_ref),
}
}
}

impl FromStr for BoundDrop {
type Err = ();

fn from_str(_s: &str) -> RawResult<Self, Self::Err> {
Err(())
}
}

impl Binder {
pub(super) fn bind_drop(
&mut self,
Expand All @@ -62,22 +10,26 @@ impl Binder {
names: Vec<ObjectName>,
cascade: bool,
) -> Result {
match object_type {
ObjectType::Table => {
let name = lower_case_name(&names[0]);
let (schema_name, table_name) = split_name(&name)?;
let table_ref_id = self
.catalog
.get_table_id_by_name(schema_name, table_name)
.ok_or_else(|| BindError::InvalidTable(table_name.into()))?;

Ok(self.egraph.add(Node::Drop(BoundDrop {
object: Object::Table(table_ref_id),
if_exists,
cascade,
})))
if !matches!(object_type, ObjectType::Table | ObjectType::View) {
return Err(BindError::Todo(format!("drop {object_type:?}")));
}
if cascade {
return Err(BindError::Todo("cascade drop".into()));
}
let mut table_ids = Vec::with_capacity(names.len());
for name in names {
let name = lower_case_name(&name);
let (schema_name, table_name) = split_name(&name)?;
let result = self.catalog.get_table_id_by_name(schema_name, table_name);
if if_exists && result.is_none() {
continue;
}
_ => Err(BindError::UnsupportedObjectName(object_type)),
let table_id = result.ok_or_else(|| BindError::InvalidTable(table_name.into()))?;
let id = self.egraph.add(Node::Table(table_id));
table_ids.push(id);
}
let list = self.egraph.add(Node::List(table_ids.into()));
let drop = self.egraph.add(Node::Drop(list));
Ok(drop)
}
}
6 changes: 3 additions & 3 deletions src/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ impl Binder {
columns: Vec<Ident>,
source: Box<Query>,
) -> Result {
let (table, is_internal) = self.bind_table_id(&table_name)?;
if is_internal {
return Err(BindError::NotSupportedOnInternalTable);
let (table, is_internal, is_view) = self.bind_table_id(&table_name)?;
if is_internal || is_view {
return Err(BindError::CanNotInsert);
}
let cols = self.bind_table_columns(&table_name, &columns)?;
let source = self.bind_query(*source)?.0;
Expand Down
Loading

0 comments on commit d492a45

Please sign in to comment.