Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: databend-meta transaction support generic bool-expression and else-if chain #17064

Merged
merged 1 commit into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

117 changes: 49 additions & 68 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,18 +439,17 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
db_meta.drop_on = None;

let txn_req = TxnRequest {
condition: vec![
let txn_req = TxnRequest::new(
vec![
txn_cond_seq(name_key, Eq, 0),
txn_cond_seq(&dbid_idlist, Eq, db_id_list_seq),
txn_cond_seq(&dbid, Eq, db_meta_seq),
],
if_then: vec![
vec![
txn_op_put(name_key, serialize_u64(db_id)?), // (tenant, db_name) -> db_id
txn_op_put(&dbid, serialize_struct(&db_meta)?), // (db_id) -> db_meta
],
else_then: vec![],
};
);

let (succ, _responses) = send_txn(self, txn_req).await?;

Expand Down Expand Up @@ -594,11 +593,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
), /* __fd_database_id_to_name/<db_id> -> (tenant,db_name) */
];

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};
let txn_req = TxnRequest::new(condition, if_then);

let (succ, _responses) = send_txn(self, txn_req).await?;

Expand Down Expand Up @@ -1203,19 +1198,19 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_cond_seq(&save_key_table_id_list, Eq, tb_id_list_seq),
]);

txn.if_then.extend( vec![
// Changing a table in a db has to update the seq of db_meta,
// to block the batch-delete-tables when deleting a db.
txn_op_put(&key_dbid, serialize_struct(&db_meta.data)?), /* (db_id) -> db_meta */
txn_op_put(
key_table_id,
serialize_struct(&req.table_meta)?,
), /* (tenant, db_id, tb_id) -> tb_meta */
txn_op_put(&save_key_table_id_list, serialize_struct(&tb_id_list)?), /* _fd_table_id_list/db_id/table_name -> tb_id_list */
// This record does not need to assert `table_id_to_name_key == 0`,
// Because this is a reverse index for db_id/table_name -> table_id, and it is unique.
txn_op_put(&key_table_id_to_name, serialize_struct(&key_dbid_tbname)?), /* __fd_table_id_to_name/db_id/table_name -> DBIdTableName */
]);
txn.if_then.extend(vec![
// Changing a table in a db has to update the seq of db_meta,
// to block the batch-delete-tables when deleting a db.
txn_op_put(&key_dbid, serialize_struct(&db_meta.data)?), /* (db_id) -> db_meta */
txn_op_put(
key_table_id,
serialize_struct(&req.table_meta)?,
), /* (tenant, db_id, tb_id) -> tb_meta */
txn_op_put(&save_key_table_id_list, serialize_struct(&tb_id_list)?), /* _fd_table_id_list/db_id/table_name -> tb_id_list */
// This record does not need to assert `table_id_to_name_key == 0`,
// Because this is a reverse index for db_id/table_name -> table_id, and it is unique.
txn_op_put(&key_table_id_to_name, serialize_struct(&key_dbid_tbname)?), /* __fd_table_id_to_name/db_id/table_name -> DBIdTableName */
]);

if req.as_dropped {
// To create the table in a "dropped" state,
Expand Down Expand Up @@ -1401,8 +1396,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
tb_id_list.pop();
new_tb_id_list.append(table_id);

let mut txn = TxnRequest {
condition: vec![
let mut txn = TxnRequest::new(
vec![
// db has not to change, i.e., no new table is created.
// Renaming db is OK and does not affect the seq of db_meta.
txn_cond_seq(&seq_db_id.data, Eq, db_meta.seq),
Expand All @@ -1416,7 +1411,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_cond_seq(&new_dbid_tbname_idlist, Eq, new_tb_id_list_seq),
txn_cond_seq(&table_id_to_name_key, Eq, table_id_to_name_seq),
],
if_then: vec![
vec![
txn_op_del(&dbid_tbname), // (db_id, tb_name) -> tb_id
txn_op_put(&newdbid_newtbname, serialize_u64(table_id)?), /* (db_id, new_tb_name) -> tb_id */
// Changing a table in a db has to update the seq of db_meta,
Expand All @@ -1426,8 +1421,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_op_put(&new_dbid_tbname_idlist, serialize_struct(&new_tb_id_list)?), /* _fd_table_id_list/db_id/new_table_name -> tb_id_list */
txn_op_put(&table_id_to_name_key, serialize_struct(&db_id_table_name)?), /* __fd_table_id_to_name/db_id/table_name -> DBIdTableName */
],
else_then: vec![],
};
);

if *seq_db_id.data != *new_seq_db_id.data {
txn.if_then.push(
Expand Down Expand Up @@ -1909,8 +1903,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
tb_meta.drop_on = None;

let txn_req = TxnRequest {
condition: vec![
let txn_req = TxnRequest::new(
vec![
// db has not to change, i.e., no new table is created.
// Renaming db is OK and does not affect the seq of db_meta.
txn_cond_seq(&DatabaseId { db_id }, Eq, db_meta_seq),
Expand All @@ -1921,7 +1915,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_cond_seq(&orphan_dbid_tbname_idlist, Eq, orphan_tb_id_list.seq),
txn_cond_seq(&dbid_tbname_idlist, Eq, tb_id_list.seq),
],
if_then: vec![
vec![
// Changing a table in a db has to update the seq of db_meta,
// to block the batch-delete-tables when deleting a db.
txn_op_put(&DatabaseId { db_id }, serialize_struct(&db_meta)?), /* (db_id) -> db_meta */
Expand All @@ -1931,8 +1925,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_op_del(&orphan_dbid_tbname_idlist), // del orphan table idlist
txn_op_put(&dbid_tbname_idlist, serialize_struct(&tb_id_list.data)?), /* _fd_table_id_list/db_id/table_name -> tb_id_list */
],
else_then: vec![],
};
);

let (succ, _responses) = send_txn(self, txn_req).await?;

Expand Down Expand Up @@ -2061,16 +2054,15 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
// non-changed ones.

for chunk in copied_files.chunks(chunk_size as usize) {
let txn = TxnRequest {
condition: vec![],
if_then: chunk
let txn = TxnRequest::new(
vec![],
chunk
.iter()
.map(|(name, seq_file)| {
TxnOp::delete_exact(name.to_string_key(), Some(seq_file.seq()))
})
.collect(),
else_then: vec![],
};
);

let (_succ, _responses) = send_txn(self, txn).await?;
}
Expand Down Expand Up @@ -2377,16 +2369,15 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}

let mut txn_req = TxnRequest {
condition: vec![
let mut txn_req = TxnRequest::new(
vec![
// table is not changed
txn_cond_seq(&tbid, Eq, seq_meta.seq),
],
if_then: vec![
vec![
txn_op_put(&tbid, serialize_struct(&new_table_meta)?), // tb_id -> tb_meta
],
else_then: vec![],
};
);

let _ = update_mask_policy(self, &req.action, &mut txn_req, &req.tenant, req.table_id)
.await;
Expand Down Expand Up @@ -2485,13 +2476,13 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
};
indexes.insert(req.name.clone(), index);

let txn_req = TxnRequest {
condition: vec![txn_cond_eq_seq(&tbid, tb_meta_seq)],
if_then: vec![
let txn_req = TxnRequest::new(
//
vec![txn_cond_eq_seq(&tbid, tb_meta_seq)],
vec![
txn_op_put_pb(&tbid, &table_meta, None)?, // tb_id -> tb_meta
],
else_then: vec![],
};
);

let (succ, _responses) = send_txn(self, txn_req).await?;

Expand Down Expand Up @@ -2540,16 +2531,15 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
indexes.remove(&req.name);

let txn_req = TxnRequest {
condition: vec![
let txn_req = TxnRequest::new(
vec![
// table is not changed
txn_cond_seq(&tbid, Eq, seq_meta.seq),
],
if_then: vec![
vec![
txn_op_put(&tbid, serialize_struct(&table_meta)?), // tb_id -> tb_meta
],
else_then: vec![],
};
);

let (succ, _responses) = send_txn(self, txn_req).await?;
debug!(id :? =(&tbid),succ = succ;"drop_table_index");
Expand Down Expand Up @@ -2742,11 +2732,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_op_put(&id_generator, b"".to_vec()),
txn_op_put_pb(&key, &lock_meta, Some(req.ttl))?,
];
let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};
let txn_req = TxnRequest::new(condition, if_then);
let (succ, _responses) = send_txn(self, txn_req).await?;

if succ {
Expand Down Expand Up @@ -3053,11 +3039,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_op_put_pb(&new_name_ident, &dict_id.data, None)?, // put new dict name
];

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};
let txn_req = TxnRequest::new(condition, if_then);

let (succ, _responses) = send_txn(self, txn_req).await?;

Expand Down Expand Up @@ -4042,8 +4024,8 @@ async fn handle_undrop_table(
// reset drop on time
seq_table_meta.drop_on = None;

let txn = TxnRequest {
condition: vec![
let txn = TxnRequest::new(
vec![
// db has not to change, i.e., no new table is created.
// Renaming db is OK and does not affect the seq of db_meta.
txn_cond_eq_seq(&DatabaseId { db_id }, seq_db_meta.seq),
Expand All @@ -4052,15 +4034,14 @@ async fn handle_undrop_table(
// table is not changed
txn_cond_eq_seq(&tbid, seq_table_meta.seq),
],
if_then: vec![
vec![
// Changing a table in a db has to update the seq of db_meta,
// to block the batch-delete-tables when deleting a db.
txn_op_put_pb(&DatabaseId { db_id }, &seq_db_meta.data, None)?, /* (db_id) -> db_meta */
txn_op_put(&dbid_tbname, serialize_u64(table_id)?), /* (tenant, db_id, tb_name) -> tb_id */
txn_op_put_pb(&tbid, &seq_table_meta.data, None)?, /* (tenant, db_id, tb_id) -> tb_meta */
],
else_then: vec![],
};
);

let (succ, _responses) = send_txn(kv_api, txn).await?;

Expand Down
6 changes: 1 addition & 5 deletions src/meta/api/src/sequence_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SequenceApi for KV {
txn_op_put_pb(&ident, &sequence_meta, None)?, // name -> meta
];

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};
let txn_req = TxnRequest::new(condition, if_then);

let (succ, _responses) = send_txn(self, txn_req).await?;

Expand Down
6 changes: 1 addition & 5 deletions src/meta/binaries/metabench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,7 @@ async fn benchmark_table_copy_file(
serde_json::from_str(param).unwrap()
};

let mut txn = TxnRequest {
condition: vec![],
if_then: vec![],
else_then: vec![],
};
let mut txn = TxnRequest::default();

for file_index in 0..param.file_cnt {
let copied_file_ident = TableCopiedFileNameIdent {
Expand Down
4 changes: 4 additions & 0 deletions src/meta/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
/// 🖥 server: add `txn_condition::Target::KeysWithPrefix`,
/// to support matching the key count by a prefix.
///
/// - 2024-12-1*: since 1.2.*
/// 🖥 server: add `TxnRequest::condition_tree`,
/// to specify a complex bool expression.
///
///
/// Server feature set:
/// ```yaml
Expand Down
1 change: 1 addition & 0 deletions src/meta/kvapi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ test = true
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
databend-common-base = { workspace = true }
databend-common-meta-types = { workspace = true }
fastrace = { workspace = true }
futures-util = { workspace = true }
Expand Down
Loading
Loading