Skip to content

Commit

Permalink
feat: databend-meta transaction support generic bool-expression and e…
Browse files Browse the repository at this point in the history
…lse-if chain

Since this commit, application is allowed to specify a complex bool
expressions as the transaction predicate.

For example, the transaction will execute as if running the following
pseudo codes:
```
if (a == 1 || b == 2) && (x == 3 || y == 4) { ops1 }
else if (x == 2 || y == 1) { ops2 }
else if (y == 3 && z == 4) { ops3 }
else { ops4 }
```

```rust
let eq = |key: &str, val: &str| TxnCondition::eq_value(sample(key), b(val));

TxnRequest{
    operations: vec![
                BoolExpression::new(
                    Some(eq("a", 1).or(eq("b", 2))
                         .and(eq("x", 3).or(eq("y", 4)))),
                    ops1),
                BoolExpression::new(
                    Some(eq("x", 2).or(eq("y", 1))),
                    ops2),
            ],

    condition: vec![eq("y", 3), eq("z", 4)],
    if_then: ops3,
    else_then: ops4,
}
```

For backward compatibility, both already existing `condition` and the new
`operations` will be evaluated: transaction handler evaluate the
`operations` first. If there is a met condition, execute and return.
Otherwise, it evaluate `condition` and then execute `if_then` branch or
`else_then` branch.

TxnReply changes:

Add field `execution_path` to indicate the executed branch, which is one
of:
- `"operation:<index>"`, operation at `index` is executed.
- `"then"`: `if_then` is executed.
- `"else"`: `else_then` is executed.

`TxnReply.success` is set to `false` only when `else` is executed.
  • Loading branch information
drmingdrmer committed Dec 20, 2024
1 parent f7a4a5d commit e092f8e
Show file tree
Hide file tree
Showing 21 changed files with 955 additions and 292 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

117 changes: 49 additions & 68 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,18 +439,17 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
db_meta.drop_on = None;

let txn_req = TxnRequest {
condition: vec![
let txn_req = TxnRequest::new(
vec![
txn_cond_seq(name_key, Eq, 0),
txn_cond_seq(&dbid_idlist, Eq, db_id_list_seq),
txn_cond_seq(&dbid, Eq, db_meta_seq),
],
if_then: vec![
vec![
txn_op_put(name_key, serialize_u64(db_id)?), // (tenant, db_name) -> db_id
txn_op_put(&dbid, serialize_struct(&db_meta)?), // (db_id) -> db_meta
],
else_then: vec![],
};
);

let (succ, _responses) = send_txn(self, txn_req).await?;

Expand Down Expand Up @@ -594,11 +593,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
), /* __fd_database_id_to_name/<db_id> -> (tenant,db_name) */
];

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};
let txn_req = TxnRequest::new(condition, if_then);

let (succ, _responses) = send_txn(self, txn_req).await?;

Expand Down Expand Up @@ -1203,19 +1198,19 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_cond_seq(&save_key_table_id_list, Eq, tb_id_list_seq),
]);

txn.if_then.extend( vec![
// Changing a table in a db has to update the seq of db_meta,
// to block the batch-delete-tables when deleting a db.
txn_op_put(&key_dbid, serialize_struct(&db_meta.data)?), /* (db_id) -> db_meta */
txn_op_put(
key_table_id,
serialize_struct(&req.table_meta)?,
), /* (tenant, db_id, tb_id) -> tb_meta */
txn_op_put(&save_key_table_id_list, serialize_struct(&tb_id_list)?), /* _fd_table_id_list/db_id/table_name -> tb_id_list */
// This record does not need to assert `table_id_to_name_key == 0`,
// Because this is a reverse index for db_id/table_name -> table_id, and it is unique.
txn_op_put(&key_table_id_to_name, serialize_struct(&key_dbid_tbname)?), /* __fd_table_id_to_name/db_id/table_name -> DBIdTableName */
]);
txn.if_then.extend(vec![
// Changing a table in a db has to update the seq of db_meta,
// to block the batch-delete-tables when deleting a db.
txn_op_put(&key_dbid, serialize_struct(&db_meta.data)?), /* (db_id) -> db_meta */
txn_op_put(
key_table_id,
serialize_struct(&req.table_meta)?,
), /* (tenant, db_id, tb_id) -> tb_meta */
txn_op_put(&save_key_table_id_list, serialize_struct(&tb_id_list)?), /* _fd_table_id_list/db_id/table_name -> tb_id_list */
// This record does not need to assert `table_id_to_name_key == 0`,
// Because this is a reverse index for db_id/table_name -> table_id, and it is unique.
txn_op_put(&key_table_id_to_name, serialize_struct(&key_dbid_tbname)?), /* __fd_table_id_to_name/db_id/table_name -> DBIdTableName */
]);

if req.as_dropped {
// To create the table in a "dropped" state,
Expand Down Expand Up @@ -1401,8 +1396,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
tb_id_list.pop();
new_tb_id_list.append(table_id);

let mut txn = TxnRequest {
condition: vec![
let mut txn = TxnRequest::new(
vec![
// db has not to change, i.e., no new table is created.
// Renaming db is OK and does not affect the seq of db_meta.
txn_cond_seq(&seq_db_id.data, Eq, db_meta.seq),
Expand All @@ -1416,7 +1411,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_cond_seq(&new_dbid_tbname_idlist, Eq, new_tb_id_list_seq),
txn_cond_seq(&table_id_to_name_key, Eq, table_id_to_name_seq),
],
if_then: vec![
vec![
txn_op_del(&dbid_tbname), // (db_id, tb_name) -> tb_id
txn_op_put(&newdbid_newtbname, serialize_u64(table_id)?), /* (db_id, new_tb_name) -> tb_id */
// Changing a table in a db has to update the seq of db_meta,
Expand All @@ -1426,8 +1421,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_op_put(&new_dbid_tbname_idlist, serialize_struct(&new_tb_id_list)?), /* _fd_table_id_list/db_id/new_table_name -> tb_id_list */
txn_op_put(&table_id_to_name_key, serialize_struct(&db_id_table_name)?), /* __fd_table_id_to_name/db_id/table_name -> DBIdTableName */
],
else_then: vec![],
};
);

if *seq_db_id.data != *new_seq_db_id.data {
txn.if_then.push(
Expand Down Expand Up @@ -1909,8 +1903,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
tb_meta.drop_on = None;

let txn_req = TxnRequest {
condition: vec![
let txn_req = TxnRequest::new(
vec![
// db has not to change, i.e., no new table is created.
// Renaming db is OK and does not affect the seq of db_meta.
txn_cond_seq(&DatabaseId { db_id }, Eq, db_meta_seq),
Expand All @@ -1921,7 +1915,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_cond_seq(&orphan_dbid_tbname_idlist, Eq, orphan_tb_id_list.seq),
txn_cond_seq(&dbid_tbname_idlist, Eq, tb_id_list.seq),
],
if_then: vec![
vec![
// Changing a table in a db has to update the seq of db_meta,
// to block the batch-delete-tables when deleting a db.
txn_op_put(&DatabaseId { db_id }, serialize_struct(&db_meta)?), /* (db_id) -> db_meta */
Expand All @@ -1931,8 +1925,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_op_del(&orphan_dbid_tbname_idlist), // del orphan table idlist
txn_op_put(&dbid_tbname_idlist, serialize_struct(&tb_id_list.data)?), /* _fd_table_id_list/db_id/table_name -> tb_id_list */
],
else_then: vec![],
};
);

let (succ, _responses) = send_txn(self, txn_req).await?;

Expand Down Expand Up @@ -2061,16 +2054,15 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
// non-changed ones.

for chunk in copied_files.chunks(chunk_size as usize) {
let txn = TxnRequest {
condition: vec![],
if_then: chunk
let txn = TxnRequest::new(
vec![],
chunk
.iter()
.map(|(name, seq_file)| {
TxnOp::delete_exact(name.to_string_key(), Some(seq_file.seq()))
})
.collect(),
else_then: vec![],
};
);

let (_succ, _responses) = send_txn(self, txn).await?;
}
Expand Down Expand Up @@ -2377,16 +2369,15 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}

let mut txn_req = TxnRequest {
condition: vec![
let mut txn_req = TxnRequest::new(
vec![
// table is not changed
txn_cond_seq(&tbid, Eq, seq_meta.seq),
],
if_then: vec![
vec![
txn_op_put(&tbid, serialize_struct(&new_table_meta)?), // tb_id -> tb_meta
],
else_then: vec![],
};
);

let _ = update_mask_policy(self, &req.action, &mut txn_req, &req.tenant, req.table_id)
.await;
Expand Down Expand Up @@ -2485,13 +2476,13 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
};
indexes.insert(req.name.clone(), index);

let txn_req = TxnRequest {
condition: vec![txn_cond_eq_seq(&tbid, tb_meta_seq)],
if_then: vec![
let txn_req = TxnRequest::new(
//
vec![txn_cond_eq_seq(&tbid, tb_meta_seq)],
vec![
txn_op_put_pb(&tbid, &table_meta, None)?, // tb_id -> tb_meta
],
else_then: vec![],
};
);

let (succ, _responses) = send_txn(self, txn_req).await?;

Expand Down Expand Up @@ -2540,16 +2531,15 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
indexes.remove(&req.name);

let txn_req = TxnRequest {
condition: vec![
let txn_req = TxnRequest::new(
vec![
// table is not changed
txn_cond_seq(&tbid, Eq, seq_meta.seq),
],
if_then: vec![
vec![
txn_op_put(&tbid, serialize_struct(&table_meta)?), // tb_id -> tb_meta
],
else_then: vec![],
};
);

let (succ, _responses) = send_txn(self, txn_req).await?;
debug!(id :? =(&tbid),succ = succ;"drop_table_index");
Expand Down Expand Up @@ -2742,11 +2732,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_op_put(&id_generator, b"".to_vec()),
txn_op_put_pb(&key, &lock_meta, Some(req.ttl))?,
];
let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};
let txn_req = TxnRequest::new(condition, if_then);
let (succ, _responses) = send_txn(self, txn_req).await?;

if succ {
Expand Down Expand Up @@ -3053,11 +3039,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
txn_op_put_pb(&new_name_ident, &dict_id.data, None)?, // put new dict name
];

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};
let txn_req = TxnRequest::new(condition, if_then);

let (succ, _responses) = send_txn(self, txn_req).await?;

Expand Down Expand Up @@ -4042,8 +4024,8 @@ async fn handle_undrop_table(
// reset drop on time
seq_table_meta.drop_on = None;

let txn = TxnRequest {
condition: vec![
let txn = TxnRequest::new(
vec![
// db has not to change, i.e., no new table is created.
// Renaming db is OK and does not affect the seq of db_meta.
txn_cond_eq_seq(&DatabaseId { db_id }, seq_db_meta.seq),
Expand All @@ -4052,15 +4034,14 @@ async fn handle_undrop_table(
// table is not changed
txn_cond_eq_seq(&tbid, seq_table_meta.seq),
],
if_then: vec![
vec![
// Changing a table in a db has to update the seq of db_meta,
// to block the batch-delete-tables when deleting a db.
txn_op_put_pb(&DatabaseId { db_id }, &seq_db_meta.data, None)?, /* (db_id) -> db_meta */
txn_op_put(&dbid_tbname, serialize_u64(table_id)?), /* (tenant, db_id, tb_name) -> tb_id */
txn_op_put_pb(&tbid, &seq_table_meta.data, None)?, /* (tenant, db_id, tb_id) -> tb_meta */
],
else_then: vec![],
};
);

let (succ, _responses) = send_txn(kv_api, txn).await?;

Expand Down
6 changes: 1 addition & 5 deletions src/meta/api/src/sequence_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SequenceApi for KV {
txn_op_put_pb(&ident, &sequence_meta, None)?, // name -> meta
];

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};
let txn_req = TxnRequest::new(condition, if_then);

let (succ, _responses) = send_txn(self, txn_req).await?;

Expand Down
6 changes: 1 addition & 5 deletions src/meta/binaries/metabench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,7 @@ async fn benchmark_table_copy_file(
serde_json::from_str(param).unwrap()
};

let mut txn = TxnRequest {
condition: vec![],
if_then: vec![],
else_then: vec![],
};
let mut txn = TxnRequest::default();

for file_index in 0..param.file_cnt {
let copied_file_ident = TableCopiedFileNameIdent {
Expand Down
4 changes: 4 additions & 0 deletions src/meta/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
/// 🖥 server: add `txn_condition::Target::KeysWithPrefix`,
/// to support matching the key count by a prefix.
///
/// - 2024-12-1*: since 1.2.*
/// 🖥 server: add `TxnRequest::condition_tree`,
/// to specify a complex bool expression.
///
///
/// Server feature set:
/// ```yaml
Expand Down
1 change: 1 addition & 0 deletions src/meta/kvapi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ test = true
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
databend-common-base = { workspace = true }
databend-common-meta-types = { workspace = true }
fastrace = { workspace = true }
futures-util = { workspace = true }
Expand Down
Loading

0 comments on commit e092f8e

Please sign in to comment.