Skip to content

Commit

Permalink
feat: support alter relation name in catalog controller (#12977)
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Oct 23, 2023
1 parent a3d2c4b commit 903daed
Show file tree
Hide file tree
Showing 14 changed files with 969 additions and 92 deletions.
4 changes: 4 additions & 0 deletions src/common/src/util/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ impl Epoch {
Epoch(time << EPOCH_PHYSICAL_SHIFT_BITS)
}

pub fn from_unix_millis(mi: u64) -> Self {
Epoch((mi - UNIX_RISINGWAVE_DATE_SEC * 1000) << EPOCH_PHYSICAL_SHIFT_BITS)
}

pub fn physical_now() -> u64 {
UNIX_RISINGWAVE_DATE_EPOCH
.elapsed()
Expand Down
161 changes: 150 additions & 11 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
use std::iter;

use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS};
use risingwave_pb::catalog::{
PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
};
use risingwave_pb::meta::relation::{PbRelationInfo, RelationInfo};
use risingwave_pb::meta::relation::PbRelationInfo;
use risingwave_pb::meta::subscribe_response::{
Info as NotificationInfo, Operation as NotificationOperation,
};
Expand All @@ -30,19 +31,21 @@ use sea_orm::{
};
use tokio::sync::RwLock;

use crate::controller::rename::{alter_relation_rename, alter_relation_rename_refs};
use crate::controller::utils::{
check_connection_name_duplicate, check_function_signature_duplicate,
check_relation_name_duplicate, check_schema_name_duplicate, ensure_object_id,
ensure_object_not_refer, ensure_schema_empty, ensure_user_id, list_used_by, PartialObject,
ensure_object_not_refer, ensure_schema_empty, ensure_user_id, get_referring_objects,
get_referring_objects_cascade, PartialObject,
};
use crate::controller::ObjectModel;
use crate::manager::{MetaSrvEnv, NotificationVersion};
use crate::model_v2::connection::PrivateLinkService;
use crate::model_v2::object::ObjectType;
use crate::model_v2::prelude::*;
use crate::model_v2::{
connection, database, function, index, object, object_dependency, schema, table, view,
ConnectionId, DatabaseId, FunctionId, ObjectId, SchemaId, SourceId, TableId, UserId,
connection, database, function, index, object, object_dependency, schema, sink, source, table,
view, ConnectionId, DatabaseId, FunctionId, ObjectId, PrivateLinkService, SchemaId, SourceId,
TableId, UserId,
};
use crate::rpc::ddl_controller::DropMode;
use crate::{MetaError, MetaResult};
Expand Down Expand Up @@ -503,7 +506,7 @@ impl CatalogController {
assert_eq!(obj.obj_type, object_type);

let mut to_drop_objects = match drop_mode {
DropMode::Cascade => list_used_by(object_id, &txn).await?,
DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
DropMode::Restrict => {
ensure_object_not_refer(object_type, object_id, &txn).await?;
vec![]
Expand Down Expand Up @@ -589,39 +592,39 @@ impl CatalogController {
.into_iter()
.map(|obj| match obj.obj_type {
ObjectType::Table => PbRelation {
relation_info: Some(RelationInfo::Table(PbTable {
relation_info: Some(PbRelationInfo::Table(PbTable {
id: obj.oid,
schema_id: obj.schema_id.unwrap(),
database_id: obj.database_id.unwrap(),
..Default::default()
})),
},
ObjectType::Source => PbRelation {
relation_info: Some(RelationInfo::Source(PbSource {
relation_info: Some(PbRelationInfo::Source(PbSource {
id: obj.oid,
schema_id: obj.schema_id.unwrap(),
database_id: obj.database_id.unwrap(),
..Default::default()
})),
},
ObjectType::Sink => PbRelation {
relation_info: Some(RelationInfo::Sink(PbSink {
relation_info: Some(PbRelationInfo::Sink(PbSink {
id: obj.oid,
schema_id: obj.schema_id.unwrap(),
database_id: obj.database_id.unwrap(),
..Default::default()
})),
},
ObjectType::View => PbRelation {
relation_info: Some(RelationInfo::View(PbView {
relation_info: Some(PbRelationInfo::View(PbView {
id: obj.oid,
schema_id: obj.schema_id.unwrap(),
database_id: obj.database_id.unwrap(),
..Default::default()
})),
},
ObjectType::Index => PbRelation {
relation_info: Some(RelationInfo::Index(PbIndex {
relation_info: Some(PbRelationInfo::Index(PbIndex {
id: obj.oid,
schema_id: obj.schema_id.unwrap(),
database_id: obj.database_id.unwrap(),
Expand All @@ -647,6 +650,142 @@ impl CatalogController {
version,
))
}

pub async fn alter_relation_name(
&self,
object_type: ObjectType,
object_id: ObjectId,
object_name: &str,
) -> MetaResult<NotificationVersion> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
let obj: PartialObject = Object::find_by_id(object_id)
.into_partial_model()
.one(&txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
assert_eq!(obj.obj_type, object_type);
check_relation_name_duplicate(
object_name,
obj.database_id.unwrap(),
obj.schema_id.unwrap(),
&txn,
)
.await?;

let mut to_update_relations = vec![];
// rename relation.
macro_rules! rename_relation {
($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
let (mut relation, obj) = $entity::find_by_id($object_id)
.find_also_related(Object)
.one(&txn)
.await?
.unwrap();
let old_name = relation.name.clone();
relation.name = object_name.into();
relation.definition = alter_relation_rename(&relation.definition, object_name);
let active_model = $table::ActiveModel {
$identity: ActiveValue::Set(relation.$identity),
name: ActiveValue::Set(object_name.into()),
definition: ActiveValue::Set(relation.definition.clone()),
..Default::default()
};
active_model.update(&txn).await?;
to_update_relations.push(PbRelation {
relation_info: Some(PbRelationInfo::$entity(
ObjectModel(relation, obj.unwrap()).into(),
)),
});
old_name
}};
}

let old_name = match object_type {
ObjectType::Table => rename_relation!(Table, table, table_id, object_id),
ObjectType::Source => rename_relation!(Source, source, source_id, object_id),
ObjectType::Sink => rename_relation!(Sink, sink, sink_id, object_id),
ObjectType::View => rename_relation!(View, view, view_id, object_id),
ObjectType::Index => {
let (mut index, obj) = Index::find_by_id(object_id)
.find_also_related(Object)
.one(&txn)
.await?
.unwrap();
index.name = object_name.into();
let index_table_id = index.index_table_id;

// the name of index and its associated table is the same.
let active_model = index::ActiveModel {
index_id: ActiveValue::Set(index.index_id),
name: ActiveValue::Set(object_name.into()),
..Default::default()
};
active_model.update(&txn).await?;
to_update_relations.push(PbRelation {
relation_info: Some(PbRelationInfo::Index(
ObjectModel(index, obj.unwrap()).into(),
)),
});
rename_relation!(Table, table, table_id, index_table_id)
}
_ => unreachable!("only relation name can be altered."),
};

// rename referring relation name.
macro_rules! rename_relation_ref {
($entity:ident, $table:ident, $identity:ident, $object_id:expr) => {{
let (mut relation, obj) = $entity::find_by_id($object_id)
.find_also_related(Object)
.one(&txn)
.await?
.unwrap();
relation.definition =
alter_relation_rename_refs(&relation.definition, &old_name, object_name);
let active_model = $table::ActiveModel {
$identity: ActiveValue::Set(relation.$identity),
definition: ActiveValue::Set(relation.definition.clone()),
..Default::default()
};
active_model.update(&txn).await?;
to_update_relations.push(PbRelation {
relation_info: Some(PbRelationInfo::$entity(
ObjectModel(relation, obj.unwrap()).into(),
)),
});
}};
}
let objs = get_referring_objects(object_id, &txn).await?;
for obj in objs {
match obj.obj_type {
ObjectType::Table => rename_relation_ref!(Table, table, table_id, obj.oid),
ObjectType::Sink => rename_relation_ref!(Sink, sink, sink_id, obj.oid),
ObjectType::View => rename_relation_ref!(View, view, view_id, obj.oid),
ObjectType::Index => {
let index_table_id: Option<TableId> = Index::find_by_id(obj.oid)
.select_only()
.column(index::Column::IndexTableId)
.into_tuple()
.one(&txn)
.await?;
rename_relation_ref!(Table, table, table_id, index_table_id.unwrap());
}
_ => bail!("only table, sink, view and index depend on other objects."),
}
}
txn.commit().await?;

let version = self
.notify_frontend(
NotificationOperation::Update,
NotificationInfo::RelationGroup(PbRelationGroup {
relations: to_update_relations,
}),
)
.await;

Ok(version)
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 903daed

Please sign in to comment.