From 2e0001bd8644692a43dc312d3beba8dd229444ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Mon, 16 Dec 2024 22:25:47 +0800 Subject: [PATCH] feat: databend-meta transaction support generic bool-expression Since this commit, application is allowed to specify a complex bool expressions as the transaction predicate. For example, the following transaction will be processed if: `(a == 1 || b == 2) && (x == 3 or y == 4)` ```rust let eq = |key: &str, val: &str| TxnCondition::eq_value(sample(key), b(val)); TxnRequest{ condition: vec![], condition_tree: Some( eq("a", 1).or(eq("b", 2)) .and( eq("x", 3).or(eq("y", 4)) )) // ... } ``` For backward compatibility, both already existing `condition` and the new `condition_tree` will be evaluated: transaction will be processed only when both of them evaluated to be `true`. --- Cargo.lock | 1 + src/meta/api/src/schema_api_impl.rs | 11 ++ src/meta/api/src/sequence_api_impl.rs | 1 + src/meta/client/src/lib.rs | 4 + src/meta/kvapi/Cargo.toml | 1 + src/meta/kvapi/src/kvapi/test_suite.rs | 110 +++++++++++++ src/meta/process/src/kv_processor.rs | 1 + src/meta/raft-store/src/applier.rs | 53 ++++++ src/meta/types/build.rs | 8 + src/meta/types/proto/meta.proto | 28 ++++ src/meta/types/src/cmd/mod.rs | 1 + src/meta/types/src/proto_display.rs | 68 +++++++- src/meta/types/src/proto_ext/txn_ext.rs | 208 ++++++++++++++++++++++++ 13 files changed, 492 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5982a33dc2fc..15ee9db2e4ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3659,6 +3659,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "databend-common-base", "databend-common-meta-types", "fastrace", "futures-util", diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index 3fc0baff53eb..ec38c72f6efe 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -445,6 +445,7 @@ impl + ?Sized> SchemaApi for KV { txn_cond_seq(&dbid_idlist, Eq, db_id_list_seq), txn_cond_seq(&dbid, Eq, db_meta_seq), ], + condition_tree: None, if_then: vec![ txn_op_put(name_key, serialize_u64(db_id)?), // (tenant, db_name) -> db_id txn_op_put(&dbid, serialize_struct(&db_meta)?), // (db_id) -> db_meta @@ -596,6 +597,7 @@ impl + ?Sized> SchemaApi for KV { let txn_req = TxnRequest { condition, + condition_tree: None, if_then, else_then: vec![], }; @@ -1416,6 +1418,7 @@ impl + ?Sized> SchemaApi for KV { txn_cond_seq(&new_dbid_tbname_idlist, Eq, new_tb_id_list_seq), txn_cond_seq(&table_id_to_name_key, Eq, table_id_to_name_seq), ], + condition_tree: None, if_then: vec![ txn_op_del(&dbid_tbname), // (db_id, tb_name) -> tb_id txn_op_put(&newdbid_newtbname, serialize_u64(table_id)?), /* (db_id, new_tb_name) -> tb_id */ @@ -1921,6 +1924,7 @@ impl + ?Sized> SchemaApi for KV { txn_cond_seq(&orphan_dbid_tbname_idlist, Eq, orphan_tb_id_list.seq), txn_cond_seq(&dbid_tbname_idlist, Eq, tb_id_list.seq), ], + condition_tree: None, if_then: vec![ // Changing a table in a db has to update the seq of db_meta, // to block the batch-delete-tables when deleting a db. @@ -2063,6 +2067,7 @@ impl + ?Sized> SchemaApi for KV { for chunk in copied_files.chunks(chunk_size as usize) { let txn = TxnRequest { condition: vec![], + condition_tree: None, if_then: chunk .iter() .map(|(name, seq_file)| { @@ -2382,6 +2387,7 @@ impl + ?Sized> SchemaApi for KV { // table is not changed txn_cond_seq(&tbid, Eq, seq_meta.seq), ], + condition_tree: None, if_then: vec![ txn_op_put(&tbid, serialize_struct(&new_table_meta)?), // tb_id -> tb_meta ], @@ -2487,6 +2493,7 @@ impl + ?Sized> SchemaApi for KV { let txn_req = TxnRequest { condition: vec![txn_cond_eq_seq(&tbid, tb_meta_seq)], + condition_tree: None, if_then: vec![ txn_op_put_pb(&tbid, &table_meta, None)?, // tb_id -> tb_meta ], @@ -2545,6 +2552,7 @@ impl + ?Sized> SchemaApi for KV { // table is not changed txn_cond_seq(&tbid, Eq, seq_meta.seq), ], + condition_tree: None, if_then: vec![ txn_op_put(&tbid, serialize_struct(&table_meta)?), // tb_id -> tb_meta ], @@ -2744,6 +2752,7 @@ impl + ?Sized> SchemaApi for KV { ]; let txn_req = TxnRequest { condition, + condition_tree: None, if_then, else_then: vec![], }; @@ -3055,6 +3064,7 @@ impl + ?Sized> SchemaApi for KV { let txn_req = TxnRequest { condition, + condition_tree: None, if_then, else_then: vec![], }; @@ -4052,6 +4062,7 @@ async fn handle_undrop_table( // table is not changed txn_cond_eq_seq(&tbid, seq_table_meta.seq), ], + condition_tree: None, if_then: vec![ // Changing a table in a db has to update the seq of db_meta, // to block the batch-delete-tables when deleting a db. diff --git a/src/meta/api/src/sequence_api_impl.rs b/src/meta/api/src/sequence_api_impl.rs index 6183d0cfd5e8..cf5a55375d17 100644 --- a/src/meta/api/src/sequence_api_impl.rs +++ b/src/meta/api/src/sequence_api_impl.rs @@ -144,6 +144,7 @@ impl + ?Sized> SequenceApi for KV { let txn_req = TxnRequest { condition, + condition_tree: None, if_then, else_then: vec![], }; diff --git a/src/meta/client/src/lib.rs b/src/meta/client/src/lib.rs index 7b6ffc037afc..5ed6d5f191f8 100644 --- a/src/meta/client/src/lib.rs +++ b/src/meta/client/src/lib.rs @@ -117,6 +117,10 @@ pub static METACLI_COMMIT_SEMVER: LazyLock = LazyLock::new(|| { /// 🖥 server: add `txn_condition::Target::KeysWithPrefix`, /// to support matching the key count by a prefix. /// +/// - 2024-12-1*: since 1.2.* +/// 🖥 server: add `TxnRequest::condition_tree`, +/// to specify a complex bool expression. +/// /// /// Server feature set: /// ```yaml diff --git a/src/meta/kvapi/Cargo.toml b/src/meta/kvapi/Cargo.toml index 666be11aea23..db292b95b9bc 100644 --- a/src/meta/kvapi/Cargo.toml +++ b/src/meta/kvapi/Cargo.toml @@ -15,6 +15,7 @@ test = true [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +databend-common-base = { workspace = true } databend-common-meta-types = { workspace = true } fastrace = { workspace = true } futures-util = { workspace = true } diff --git a/src/meta/kvapi/src/kvapi/test_suite.rs b/src/meta/kvapi/src/kvapi/test_suite.rs index a5b113f35d6f..2cef7937e10a 100644 --- a/src/meta/kvapi/src/kvapi/test_suite.rs +++ b/src/meta/kvapi/src/kvapi/test_suite.rs @@ -14,7 +14,10 @@ use std::time::Duration; +use databend_common_base::display::display_option::DisplayOptionExt; +use databend_common_base::display::display_slice::DisplaySliceExt; use databend_common_meta_types::protobuf as pb; +use databend_common_meta_types::protobuf::BooleanExpression; use databend_common_meta_types::seq_value::KVMeta; use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::txn_condition; @@ -94,6 +97,8 @@ impl kvapi::TestSuite { .await?; self.kv_transaction_condition_keys_with_prefix(&builder.build().await) .await?; + self.kv_transaction_complex_conditions(&builder.build().await) + .await?; self.kv_transaction_delete_match_seq_some_not_match(&builder.build().await) .await?; self.kv_transaction_delete_match_seq_some_match(&builder.build().await) @@ -512,6 +517,7 @@ impl kvapi::TestSuite { let txn = TxnRequest { condition: conditions, + condition_tree: None, if_then, else_then, }; @@ -570,6 +576,7 @@ impl kvapi::TestSuite { let else_then: Vec = vec![]; let txn = TxnRequest { condition, + condition_tree: None, if_then, else_then, }; @@ -616,6 +623,7 @@ impl kvapi::TestSuite { }]; let txn = TxnRequest { condition, + condition_tree: None, if_then, else_then, }; @@ -666,6 +674,7 @@ impl kvapi::TestSuite { let else_then: Vec = vec![]; let txn = TxnRequest { condition, + condition_tree: None, if_then, else_then, }; @@ -711,6 +720,7 @@ impl kvapi::TestSuite { let else_then: Vec = vec![]; let txn = TxnRequest { condition, + condition_tree: None, if_then, else_then, }; @@ -782,6 +792,7 @@ impl kvapi::TestSuite { let else_then: Vec = vec![]; let txn = TxnRequest { condition, + condition_tree: None, if_then, else_then, }; @@ -877,6 +888,7 @@ impl kvapi::TestSuite { let txn = TxnRequest { condition: vec![TxnCondition::match_value(k1, ConditionResult::Eq, b("v10"))], + condition_tree: None, if_then: vec![TxnOp::put(k1, b("v2")), TxnOp::get(k1)], else_then: vec![TxnOp::get(k1)], }; @@ -893,6 +905,7 @@ impl kvapi::TestSuite { let txn = TxnRequest { condition: vec![TxnCondition::match_value(k1, ConditionResult::Eq, b("v1"))], + condition_tree: None, if_then: vec![TxnOp::put(k1, b("v2")), TxnOp::get(k1)], else_then: vec![], }; @@ -911,6 +924,7 @@ impl kvapi::TestSuite { let txn = TxnRequest { condition: vec![TxnCondition::match_value(k1, ConditionResult::Lt, b("v2"))], + condition_tree: None, if_then: vec![TxnOp::put(k1, b("v3")), TxnOp::get(k1)], else_then: vec![TxnOp::get(k1)], }; @@ -927,6 +941,7 @@ impl kvapi::TestSuite { let txn = TxnRequest { condition: vec![TxnCondition::match_value(k1, ConditionResult::Lt, b("v3"))], + condition_tree: None, if_then: vec![TxnOp::put(k1, b("v3")), TxnOp::get(k1)], else_then: vec![TxnOp::get(k1)], }; @@ -945,6 +960,7 @@ impl kvapi::TestSuite { let txn = TxnRequest { condition: vec![TxnCondition::match_value(k1, ConditionResult::Le, b("v0"))], + condition_tree: None, if_then: vec![TxnOp::put(k1, b("v4")), TxnOp::get(k1)], else_then: vec![TxnOp::get(k1)], }; @@ -961,6 +977,7 @@ impl kvapi::TestSuite { let txn = TxnRequest { condition: vec![TxnCondition::match_value(k1, ConditionResult::Le, b("v3"))], + condition_tree: None, if_then: vec![TxnOp::put(k1, b("v4")), TxnOp::get(k1)], else_then: vec![TxnOp::get(k1)], }; @@ -979,6 +996,7 @@ impl kvapi::TestSuite { let txn = TxnRequest { condition: vec![TxnCondition::match_value(k1, ConditionResult::Gt, b("v5"))], + condition_tree: None, if_then: vec![TxnOp::put(k1, b("v5")), TxnOp::get(k1)], else_then: vec![TxnOp::get(k1)], }; @@ -995,6 +1013,7 @@ impl kvapi::TestSuite { let txn = TxnRequest { condition: vec![TxnCondition::match_value(k1, ConditionResult::Gt, b("v3"))], + condition_tree: None, if_then: vec![TxnOp::put(k1, b("v5")), TxnOp::get(k1)], else_then: vec![TxnOp::get(k1)], }; @@ -1013,6 +1032,7 @@ impl kvapi::TestSuite { let txn = TxnRequest { condition: vec![TxnCondition::match_value(k1, ConditionResult::Ge, b("v6"))], + condition_tree: None, if_then: vec![TxnOp::put(k1, b("v6")), TxnOp::get(k1)], else_then: vec![TxnOp::get(k1)], }; @@ -1029,6 +1049,7 @@ impl kvapi::TestSuite { let txn = TxnRequest { condition: vec![TxnCondition::match_value(k1, ConditionResult::Ge, b("v5"))], + condition_tree: None, if_then: vec![TxnOp::put(k1, b("v6")), TxnOp::get(k1)], else_then: vec![TxnOp::get(k1)], }; @@ -1054,6 +1075,7 @@ impl kvapi::TestSuite { let txn = TxnRequest { condition: vec![], + condition_tree: None, if_then: vec![TxnOp::put_with_ttl( "k1", b("v1"), @@ -1107,6 +1129,7 @@ impl kvapi::TestSuite { op, n, )], + condition_tree: None, if_then: vec![TxnOp::put(&positive, b(format!("{op:?}")))], else_then: vec![TxnOp::put(&negative, b(format!("{op:?}")))], }; @@ -1152,6 +1175,90 @@ impl kvapi::TestSuite { Ok(()) } + pub async fn kv_transaction_complex_conditions( + &self, + kv: &KV, + ) -> anyhow::Result<()> { + let prefix = func_name!(); + + let sample = |suffix: &str| format!("{}/{}", prefix, suffix); + let positive = format!("{prefix}/positive"); + let negative = format!("{prefix}/negative"); + + kv.upsert_kv(UpsertKV::update(sample("a"), &b("a"))).await?; + kv.upsert_kv(UpsertKV::update(sample("b"), &b("b"))).await?; + kv.upsert_kv(UpsertKV::update(sample("c"), &b("c"))).await?; + + // Build a simple equal-value condition + let eq = |key: &str, val: &str| TxnCondition::eq_value(sample(key), b(val)); + + // A transaction that set positive key if succeeded, + // otherwise set the negative key. + let txn = |conditions: Vec, tree: Option| TxnRequest { + condition: conditions, + condition_tree: tree, + if_then: vec![TxnOp::put(&positive, b("T"))], + else_then: vec![TxnOp::put(&negative, b("T"))], + }; + + for (conditions, tree, expected) in [ + (vec![eq("a", "a")], None, true), + (vec![eq("a", "a"), eq("b", "b")], None, true), + ( + vec![eq("a", "a"), eq("b", "b")], + Some(eq("c", "c").or(eq("x", "x"))), + true, + ), + (vec![], Some(eq("a", "a").and(eq("b", "b"))), true), + (vec![], Some(eq("a", "a").and(eq("b", "c"))), false), + ( + vec![eq("a", "a")], + Some(eq("a", "a").and(eq("b", "c"))), + false, + ), + ( + vec![eq("a", "a")], + Some(eq("a", "a").or(eq("b", "c"))), + true, + ), + ( + vec![], + Some( + eq("a", "a") + .or(eq("x", "x")) + .and(eq("b", "b").or(eq("y", "y"))), + ), + true, + ), + ] { + kv.upsert_kv(UpsertKV::update(&positive, &b(""))).await?; + kv.upsert_kv(UpsertKV::update(&negative, &b(""))).await?; + + let resp = kv + .transaction(txn(conditions.clone(), tree.clone())) + .await?; + assert_eq!( + resp.success, + expected, + "case: {} {}, expected: {expected}", + conditions.display(), + tree.display() + ); + + let expected_key = if expected { &positive } else { &negative }; + let got = kv.get_kv(expected_key).await?.unwrap().data; + assert_eq!( + got, + b("T"), + "case: {} {}, expected: {expected}", + conditions.display(), + tree.display() + ); + } + + Ok(()) + } + /// If `TxnDeleteRequest.match_seq` is not set, /// the delete operation will always be executed. pub async fn kv_transaction_delete_match_seq_none( @@ -1166,6 +1273,7 @@ impl kvapi::TestSuite { let txn = TxnRequest { condition: vec![], + condition_tree: None, if_then: vec![TxnOp::delete(key())], else_then: vec![], }; @@ -1200,6 +1308,7 @@ impl kvapi::TestSuite { let txn = TxnRequest { condition: vec![], + condition_tree: None, if_then: vec![TxnOp::delete_exact(key(), Some(100))], else_then: vec![], }; @@ -1238,6 +1347,7 @@ impl kvapi::TestSuite { let txn = TxnRequest { condition: vec![], + condition_tree: None, if_then: vec![TxnOp::delete_exact(key(), Some(1))], else_then: vec![], }; diff --git a/src/meta/process/src/kv_processor.rs b/src/meta/process/src/kv_processor.rs index cad80935c285..f1d56c1bb775 100644 --- a/src/meta/process/src/kv_processor.rs +++ b/src/meta/process/src/kv_processor.rs @@ -166,6 +166,7 @@ where F: Fn(&str, Vec) -> Result, anyhow::Error> time_ms: log_entry.time_ms, cmd: Cmd::Transaction(TxnRequest { condition, + condition_tree: None, if_then, else_then, }), diff --git a/src/meta/raft-store/src/applier.rs b/src/meta/raft-store/src/applier.rs index 6bfa2dbd40d6..d8142053d8ea 100644 --- a/src/meta/raft-store/src/applier.rs +++ b/src/meta/raft-store/src/applier.rs @@ -18,6 +18,8 @@ use std::time::Duration; use databend_common_base::display::display_unix_epoch::DisplayUnixTimeStampExt; use databend_common_meta_types::protobuf as pb; +use databend_common_meta_types::protobuf::boolean_expression::CombiningOperator; +use databend_common_meta_types::protobuf::BooleanExpression; use databend_common_meta_types::raft_types::Entry; use databend_common_meta_types::raft_types::EntryPayload; use databend_common_meta_types::raft_types::StoredMembership; @@ -50,6 +52,7 @@ use databend_common_meta_types::TxnRequest; use databend_common_meta_types::UpsertKV; use databend_common_meta_types::With; use futures::stream::TryStreamExt; +use futures_util::future::BoxFuture; use futures_util::StreamExt; use log::debug; use log::error; @@ -248,6 +251,15 @@ where SM: StateMachineApi + 'static debug!(txn :% =(req); "apply txn cmd"); let success = self.eval_txn_conditions(&req.condition).await?; + let success = if success { + if let Some(expr) = &req.condition_tree { + self.eval_condition_tree(expr).await? + } else { + success + } + } else { + false + }; let ops = if success { &req.if_then @@ -284,6 +296,47 @@ where SM: StateMachineApi + 'static Ok(true) } + fn eval_condition_tree<'x>( + &'x mut self, + tree: &'x BooleanExpression, + ) -> BoxFuture<'x, Result> { + let op = tree.operator(); + + let fu = async move { + match op { + CombiningOperator::And => { + for expr in tree.sub_expressions.iter() { + if !self.eval_condition_tree(expr).await? { + return Ok(false); + } + } + + for cond in tree.conditions.iter() { + if !self.eval_one_condition(cond).await? { + return Ok(false); + } + } + } + CombiningOperator::Or => { + for expr in tree.sub_expressions.iter() { + if self.eval_condition_tree(expr).await? { + return Ok(true); + } + } + + for cond in tree.conditions.iter() { + if self.eval_one_condition(cond).await? { + return Ok(true); + } + } + } + } + Ok(true) + }; + + Box::pin(fu) + } + #[fastrace::trace] async fn eval_one_condition(&self, cond: &TxnCondition) -> Result { debug!(cond :% =(cond); "txn_execute_one_condition"); diff --git a/src/meta/types/build.rs b/src/meta/types/build.rs index 3f5a4e4d1185..8f18b84fc1a3 100644 --- a/src/meta/types/build.rs +++ b/src/meta/types/build.rs @@ -83,6 +83,14 @@ fn build_proto() { "TxnCondition", "#[derive(Eq, serde::Serialize, serde::Deserialize, deepsize::DeepSizeOf)]", ) + .type_attribute( + "BooleanExpression", + "#[derive(Eq, serde::Serialize, serde::Deserialize, deepsize::DeepSizeOf)]", + ) + .type_attribute( + "BooleanExpression.CombiningOperator", + "#[derive(serde::Serialize, serde::Deserialize, deepsize::DeepSizeOf)]", + ) .type_attribute( "TxnOp", "#[derive(Eq, serde::Serialize, serde::Deserialize, deepsize::DeepSizeOf)]", diff --git a/src/meta/types/proto/meta.proto b/src/meta/types/proto/meta.proto index ade20569ec7a..1235121a15f5 100644 --- a/src/meta/types/proto/meta.proto +++ b/src/meta/types/proto/meta.proto @@ -130,6 +130,28 @@ message TxnCondition { ConditionResult expected = 4; } +// BooleanExpression represents a tree of transaction conditions combined with logical operators. +// It enables complex condition checking by allowing both simple conditions and nested expressions. +message BooleanExpression { + // Logical operator to combine multiple conditions, including sub compound conditions or simple conditions. + enum CombiningOperator { + AND = 0; + OR = 1; + } + + // Operator determining how child expressions and conditions are combined + CombiningOperator operator = 1; + + // Nested boolean expressions, allowing tree-like structure + // Example: (A AND B) OR (C AND D) where A,B,C,D are conditions + repeated BooleanExpression sub_expressions = 2; + + // Leaf-level transaction conditions + // These are the actual checks performed against the state. + repeated TxnCondition conditions = 3; +} + + message TxnOp { oneof request { TxnGetRequest get = 1; @@ -154,6 +176,12 @@ message TxnRequest { // otherwise `else_then` op will be executed. repeated TxnCondition condition = 1; + // Tree of conditions combined with logical operators + // Provides flexible AND/OR combinations of conditions. + // + // For backward compatibility, both `condition` and `condition_tree` must be satisfied to proceed. + BooleanExpression condition_tree = 4; + // `if_then` is a list of operations will be executed when all condition // evaluates to true. repeated TxnOp if_then = 2; diff --git a/src/meta/types/src/cmd/mod.rs b/src/meta/types/src/cmd/mod.rs index 8aa4c90ab2fb..19e1d7f3fa0b 100644 --- a/src/meta/types/src/cmd/mod.rs +++ b/src/meta/types/src/cmd/mod.rs @@ -133,6 +133,7 @@ mod tests { // Transaction let cmd = super::Cmd::Transaction(TxnRequest { condition: vec![TxnCondition::eq_value("k", b("v"))], + condition_tree: None, if_then: vec![TxnOp::put_with_ttl( "k", b("v"), diff --git a/src/meta/types/src/proto_display.rs b/src/meta/types/src/proto_display.rs index 17bd20d69d91..f2892b0b6331 100644 --- a/src/meta/types/src/proto_display.rs +++ b/src/meta/types/src/proto_display.rs @@ -19,6 +19,8 @@ use std::time::Duration; use databend_common_base::display::display_unix_epoch::DisplayUnixTimeStampExt; use num_traits::FromPrimitive; +use crate::protobuf::boolean_expression::CombiningOperator; +use crate::protobuf::BooleanExpression; use crate::txn_condition::Target; use crate::txn_op; use crate::txn_op::Request; @@ -98,13 +100,23 @@ impl Display for VecDisplay<'_, T> { impl Display for TxnRequest { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "TxnRequest{{ if:",)?; + write!(f, "{}", VecDisplay::new_at_most(&self.condition, 5),)?; + if let Some(t) = &self.condition_tree { + if !self.condition.is_empty() { + write!(f, " AND ")?; + } + write!(f, "{}", t)?; + } + write!( f, - "TxnRequest{{ if:{} then:{} else:{} }}", - VecDisplay::new_at_most(&self.condition, 5), + "then:{} else:{}", VecDisplay::new_at_most(&self.if_then, 5), VecDisplay::new_at_most(&self.else_then, 5), - ) + )?; + + write!(f, "}}",) } } @@ -287,8 +299,40 @@ impl Display for TxnDeleteByPrefixResponse { } } +impl Display for BooleanExpression { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let op = self.operator(); + let op = match op { + CombiningOperator::And => "AND", + CombiningOperator::Or => "OR", + }; + + let mut printed = false; + + for expr in self.sub_expressions.iter() { + if printed { + write!(f, " {} ", op)?; + } + write!(f, "({})", expr)?; + printed = true; + } + + for cond in self.conditions.iter() { + if printed { + write!(f, " {} ", op)?; + } + write!(f, "{}", cond)?; + printed = true; + } + Ok(()) + } +} + #[cfg(test)] mod tests { + use crate::protobuf::BooleanExpression; + use crate::protobuf::TxnCondition; + #[test] fn test_vec_display() { assert_eq!( @@ -311,4 +355,22 @@ mod tests { "[...]" ); } + + #[test] + fn test_tx_display_with_bool_expression() { + let expr = BooleanExpression::from_conditions_and([ + TxnCondition::eq_seq("k1", 1), + TxnCondition::eq_seq("k2", 2), + ]) + .and(BooleanExpression::from_conditions_or([ + TxnCondition::eq_seq("k3", 3), + TxnCondition::eq_seq("k4", 4), + TxnCondition::keys_with_prefix("k5", 10), + ])); + + assert_eq!( + format!("{}", expr), + "(k3 == seq(3) OR k4 == seq(4) OR k5 == keys_with_prefix(10)) AND k1 == seq(1) AND k2 == seq(2)" + ); + } } diff --git a/src/meta/types/src/proto_ext/txn_ext.rs b/src/meta/types/src/proto_ext/txn_ext.rs index 9ef3c5bde9f7..928c1edaee5b 100644 --- a/src/meta/types/src/proto_ext/txn_ext.rs +++ b/src/meta/types/src/proto_ext/txn_ext.rs @@ -14,6 +14,7 @@ use std::time::Duration; +use pb::boolean_expression::CombiningOperator; use pb::txn_condition::ConditionResult; use pb::txn_condition::Target; @@ -27,12 +28,104 @@ impl TxnRequest { pub fn unconditional(ops: Vec) -> Self { Self { condition: vec![], + condition_tree: None, if_then: ops, else_then: vec![], } } } +#[derive(derive_more::From)] +pub enum ExpressionOrCondition { + Expression(#[from] pb::BooleanExpression), + Condition(#[from] pb::TxnCondition), +} + +impl pb::BooleanExpression { + pub fn from_conditions_and(conditions: impl IntoIterator) -> Self { + Self::from_conditions(CombiningOperator::And, conditions) + } + + pub fn from_conditions_or(conditions: impl IntoIterator) -> Self { + Self::from_conditions(CombiningOperator::Or, conditions) + } + + fn from_conditions( + op: CombiningOperator, + conditions: impl IntoIterator, + ) -> Self { + Self { + conditions: conditions.into_iter().collect(), + operator: op as i32, + sub_expressions: vec![], + } + } + + pub fn and(self, expr_or_condition: impl Into) -> Self { + self.merge(CombiningOperator::And, expr_or_condition) + } + + pub fn or(self, expr_or_condition: impl Into) -> Self { + self.merge(CombiningOperator::Or, expr_or_condition) + } + + pub fn and_many(self, others: impl IntoIterator) -> Self { + self.merge_expressions(CombiningOperator::And, others) + } + + pub fn or_many(self, others: impl IntoIterator) -> Self { + self.merge_expressions(CombiningOperator::Or, others) + } + + fn merge( + self, + op: CombiningOperator, + expr_or_condition: impl Into, + ) -> Self { + let x = expr_or_condition.into(); + match x { + ExpressionOrCondition::Expression(expr) => self.merge_expressions(op, [expr]), + ExpressionOrCondition::Condition(cond) => self.merge_conditions(op, [cond]), + } + } + + fn merge_conditions( + mut self, + op: CombiningOperator, + condition: impl IntoIterator, + ) -> Self { + if self.operator == op as i32 { + self.conditions.extend(condition); + self + } else { + pb::BooleanExpression { + operator: op as i32, + sub_expressions: vec![self], + conditions: condition.into_iter().collect(), + } + } + } + + fn merge_expressions( + mut self, + op: CombiningOperator, + other: impl IntoIterator, + ) -> Self { + if self.operator == op as i32 { + self.sub_expressions.extend(other); + self + } else { + let mut expressions = vec![self]; + expressions.extend(other); + Self { + conditions: vec![], + operator: op as i32, + sub_expressions: expressions, + } + } + } +} + impl pb::TxnCondition { /// Create a txn condition that checks if the `seq` matches. pub fn eq_seq(key: impl ToString, seq: u64) -> Self { @@ -77,6 +170,14 @@ impl pb::TxnCondition { target: Some(Target::KeysWithPrefix(count)), } } + + pub fn and(self, other: pb::TxnCondition) -> pb::BooleanExpression { + pb::BooleanExpression::from_conditions_and([self, other]) + } + + pub fn or(self, other: pb::TxnCondition) -> pb::BooleanExpression { + pb::BooleanExpression::from_conditions_or([self, other]) + } } impl pb::TxnOp { @@ -183,3 +284,110 @@ impl pb::TxnGetResponse { } } } + +#[cfg(test)] +mod tests { + use crate::protobuf::BooleanExpression; + use crate::TxnCondition; + + #[test] + fn test_bool_expression() { + use BooleanExpression as Expr; + + let cond = |k: &str, seq| TxnCondition::eq_seq(k, seq); + + // from_conditions_and + let expr = Expr::from_conditions_and([cond("a", 1), cond("b", 2)]); + assert_eq!(expr.to_string(), "a == seq(1) AND b == seq(2)"); + + // from_conditions_or + let expr = Expr::from_conditions_or([cond("a", 1), cond("b", 2)]); + assert_eq!(expr.to_string(), "a == seq(1) OR b == seq(2)"); + + // and_condition + { + let expr = Expr::from_conditions_or([cond("a", 1), cond("b", 2)]).and(cond("c", 3)); + assert_eq!( + expr.to_string(), + "(a == seq(1) OR b == seq(2)) AND c == seq(3)" + ); + } + { + let expr = Expr::from_conditions_or([cond("a", 1), cond("b", 2)]).and(cond("e", 5)); + assert_eq!( + expr.to_string(), + "(a == seq(1) OR b == seq(2)) AND e == seq(5)" + ); + } + + // or_condition + { + let expr = Expr::from_conditions_or([cond("a", 1), cond("b", 2)]).or(cond("c", 3)); + assert_eq!( + expr.to_string(), + "a == seq(1) OR b == seq(2) OR c == seq(3)" + ); + } + { + let expr = Expr::from_conditions_and([cond("a", 1), cond("b", 2)]).or(cond("e", 5)); + assert_eq!( + expr.to_string(), + "(a == seq(1) AND b == seq(2)) OR e == seq(5)" + ); + } + + // and + { + let expr = Expr::from_conditions_or([cond("a", 1), cond("b", 2)]) + .and(Expr::from_conditions_or([cond("c", 3), cond("d", 4)])); + assert_eq!( + expr.to_string(), + "(a == seq(1) OR b == seq(2)) AND (c == seq(3) OR d == seq(4))" + ); + } + // or + { + let expr = Expr::from_conditions_or([cond("a", 1), cond("b", 2)]) + .or(Expr::from_conditions_or([cond("c", 3), cond("d", 4)])); + assert_eq!( + expr.to_string(), + "(c == seq(3) OR d == seq(4)) OR a == seq(1) OR b == seq(2)" + ); + } + // and_many + { + let expr = Expr::from_conditions_or([cond("a", 1), cond("b", 2)]).and_many([ + Expr::from_conditions_or([cond("c", 3), cond("d", 4)]), + Expr::from_conditions_or([cond("e", 5), cond("f", 6)]), + ]); + assert_eq!( + expr.to_string(), + "(a == seq(1) OR b == seq(2)) AND (c == seq(3) OR d == seq(4)) AND (e == seq(5) OR f == seq(6))" + ); + } + // or_many + { + let expr = Expr::from_conditions_or([cond("a", 1), cond("b", 2)]).or_many([ + Expr::from_conditions_or([cond("c", 3), cond("d", 4)]), + Expr::from_conditions_or([cond("e", 5), cond("f", 6)]), + ]); + assert_eq!( + expr.to_string(), + "(c == seq(3) OR d == seq(4)) OR (e == seq(5) OR f == seq(6)) OR a == seq(1) OR b == seq(2)" + ); + } + // complex + { + let expr = cond("a", 1) + .or(cond("b", 2)) + .and(cond("c", 3).or(cond("d", 4))) + .or(cond("e", 5) + .or(cond("f", 6)) + .and(cond("g", 7).or(cond("h", 8)))); + assert_eq!( + expr.to_string(), + "((a == seq(1) OR b == seq(2)) AND (c == seq(3) OR d == seq(4))) OR ((e == seq(5) OR f == seq(6)) AND (g == seq(7) OR h == seq(8)))" + ); + } + } +}