Skip to content

Commit

Permalink
feat: databend-meta transaction support generic bool-expression
Browse files Browse the repository at this point in the history
Since this commit, application is allowed to specify a complex bool
expressions as the transaction predicate.

For example, the following transaction will be processed if:
`(a == 1 || b == 2) && (x == 3 or y == 4)`

```rust
let eq = |key: &str, val: &str| TxnCondition::eq_value(sample(key), b(val));

TxnRequest{
    condition: vec![],
    condition_tree: Some(
            eq("a", 1).or(eq("b", 2))
            .and(
                eq("x", 3).or(eq("y", 4))
            ))
    // ...
}
```

For backward compatibility, both already existing `condition` and the new
`condition_tree` will be evaluated: transaction will be processed only
when both of them evaluated to be `true`.
  • Loading branch information
drmingdrmer committed Dec 17, 2024
1 parent 77e9775 commit 2e0001b
Show file tree
Hide file tree
Showing 13 changed files with 492 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_cond_seq(&dbid_idlist, Eq, db_id_list_seq),
txn_cond_seq(&dbid, Eq, db_meta_seq),
],
condition_tree: None,
if_then: vec![
txn_op_put(name_key, serialize_u64(db_id)?), // (tenant, db_name) -> db_id
txn_op_put(&dbid, serialize_struct(&db_meta)?), // (db_id) -> db_meta
Expand Down Expand Up @@ -596,6 +597,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

let txn_req = TxnRequest {
condition,
condition_tree: None,
if_then,
else_then: vec![],
};
Expand Down Expand Up @@ -1416,6 +1418,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_cond_seq(&new_dbid_tbname_idlist, Eq, new_tb_id_list_seq),
txn_cond_seq(&table_id_to_name_key, Eq, table_id_to_name_seq),
],
condition_tree: None,
if_then: vec![
txn_op_del(&dbid_tbname), // (db_id, tb_name) -> tb_id
txn_op_put(&newdbid_newtbname, serialize_u64(table_id)?), /* (db_id, new_tb_name) -> tb_id */
Expand Down Expand Up @@ -1921,6 +1924,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_cond_seq(&orphan_dbid_tbname_idlist, Eq, orphan_tb_id_list.seq),
txn_cond_seq(&dbid_tbname_idlist, Eq, tb_id_list.seq),
],
condition_tree: None,
if_then: vec![
// Changing a table in a db has to update the seq of db_meta,
// to block the batch-delete-tables when deleting a db.
Expand Down Expand Up @@ -2063,6 +2067,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
for chunk in copied_files.chunks(chunk_size as usize) {
let txn = TxnRequest {
condition: vec![],
condition_tree: None,
if_then: chunk
.iter()
.map(|(name, seq_file)| {
Expand Down Expand Up @@ -2382,6 +2387,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
// table is not changed
txn_cond_seq(&tbid, Eq, seq_meta.seq),
],
condition_tree: None,
if_then: vec![
txn_op_put(&tbid, serialize_struct(&new_table_meta)?), // tb_id -> tb_meta
],
Expand Down Expand Up @@ -2487,6 +2493,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

let txn_req = TxnRequest {
condition: vec![txn_cond_eq_seq(&tbid, tb_meta_seq)],
condition_tree: None,
if_then: vec![
txn_op_put_pb(&tbid, &table_meta, None)?, // tb_id -> tb_meta
],
Expand Down Expand Up @@ -2545,6 +2552,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
// table is not changed
txn_cond_seq(&tbid, Eq, seq_meta.seq),
],
condition_tree: None,
if_then: vec![
txn_op_put(&tbid, serialize_struct(&table_meta)?), // tb_id -> tb_meta
],
Expand Down Expand Up @@ -2744,6 +2752,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
];
let txn_req = TxnRequest {
condition,
condition_tree: None,
if_then,
else_then: vec![],
};
Expand Down Expand Up @@ -3055,6 +3064,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

let txn_req = TxnRequest {
condition,
condition_tree: None,
if_then,
else_then: vec![],
};
Expand Down Expand Up @@ -4052,6 +4062,7 @@ async fn handle_undrop_table(
// table is not changed
txn_cond_eq_seq(&tbid, seq_table_meta.seq),
],
condition_tree: None,
if_then: vec![
// Changing a table in a db has to update the seq of db_meta,
// to block the batch-delete-tables when deleting a db.
Expand Down
1 change: 1 addition & 0 deletions src/meta/api/src/sequence_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SequenceApi for KV {

let txn_req = TxnRequest {
condition,
condition_tree: None,
if_then,
else_then: vec![],
};
Expand Down
4 changes: 4 additions & 0 deletions src/meta/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
/// 🖥 server: add `txn_condition::Target::KeysWithPrefix`,
/// to support matching the key count by a prefix.
///
/// - 2024-12-1*: since 1.2.*
/// 🖥 server: add `TxnRequest::condition_tree`,
/// to specify a complex bool expression.
///
///
/// Server feature set:
/// ```yaml
Expand Down
1 change: 1 addition & 0 deletions src/meta/kvapi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ test = true
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
databend-common-base = { workspace = true }
databend-common-meta-types = { workspace = true }
fastrace = { workspace = true }
futures-util = { workspace = true }
Expand Down
Loading

0 comments on commit 2e0001b

Please sign in to comment.