From 8eafc6a5d1201ce78a20661339d5e8a41d2fa87a Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 20 Dec 2024 16:00:14 +0800 Subject: [PATCH 1/9] save work: need to locate the minio access issue --- Cargo.lock | 60 ++++++++++++------- .../filesystem/opendal_source/s3_source.rs | 4 ++ .../source/iceberg/parquet_file_handler.rs | 30 ++++++++++ src/frontend/src/expr/table_function.rs | 28 ++++++--- .../src/object/opendal_engine/opendal_s3.rs | 3 +- 5 files changed, 95 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f11864048ceb3..1bf647d4b4c4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1349,23 +1349,23 @@ dependencies = [ [[package]] name = "aws-lc-rs" -version = "1.6.2" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df33e4a55b03f8780ba55041bc7be91a2a8ec8c03517b0379d2d6c96d2c30d95" +checksum = "f409eb70b561706bf8abba8ca9c112729c481595893fd06a2dd9af8ed8441148" dependencies = [ "aws-lc-sys", - "mirai-annotations", "paste", "zeroize", ] [[package]] name = "aws-lc-sys" -version = "0.13.3" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37ede3d6e360a48436fee127cb81710834407b1ec0c48a001cc29dec9005f73e" +checksum = "8478a5c29ead3f3be14aff8a202ad965cf7da6856860041bfca271becf8ba48b" dependencies = [ - "bindgen", + "bindgen 0.69.5", + "cc", "cmake", "dunce", "fs_extra", @@ -2081,14 +2081,14 @@ dependencies = [ [[package]] name = "bindgen" -version = "0.69.4" +version = "0.69.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0" +checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" dependencies = [ "bitflags 2.6.0", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.10.5", "lazy_static", "lazycell", "log", @@ -2096,12 +2096,30 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 1.1.0", "shlex", "syn 2.0.87", "which", ] +[[package]] +name = "bindgen" +version = "0.71.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3" +dependencies = [ + "bitflags 2.6.0", + "cexpr", + "clang-sys", + "itertools 0.10.5", + "proc-macro2", + "quote", + "regex", + "rustc-hash 2.1.0", + "shlex", + "syn 2.0.87", +] + [[package]] name = "bit-set" version = "0.8.0" @@ -2993,7 +3011,7 @@ dependencies = [ "hashbrown 0.14.5", "log", "regalloc2", - "rustc-hash", + "rustc-hash 1.1.0", "smallvec", "target-lexicon", ] @@ -4252,7 +4270,7 @@ dependencies = [ "lazy_static", "mintex", "parking_lot 0.12.1", - "rustc-hash", + "rustc-hash 1.1.0", "serde", "serde_json", "thousands", @@ -7643,12 +7661,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "mirai-annotations" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1" - [[package]] name = "mockall" version = "0.11.4" @@ -7834,7 +7846,7 @@ checksum = "8a60cb978c0a1d654edcc1460f8d6092dacf21346ed6017d81fb76a23ef5a8de" dependencies = [ "base64 0.21.7", "bigdecimal 0.4.5", - "bindgen", + "bindgen 0.71.1", "bitflags 2.6.0", "bitvec", "btoi", @@ -9657,7 +9669,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.87", @@ -10155,7 +10167,7 @@ checksum = "ad156d539c879b7a24a363a2016d77961786e71f48f2e2fc8302a92abd2429a6" dependencies = [ "hashbrown 0.13.2", "log", - "rustc-hash", + "rustc-hash 1.1.0", "slice-group-by", "smallvec", ] @@ -12522,6 +12534,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc-hash" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7fb8039b3032c191086b10f11f319a6e99e1e82889c5cc6046f515c9db1d497" + [[package]] name = "rustc_version" version = "0.2.3" diff --git a/src/connector/src/source/filesystem/opendal_source/s3_source.rs b/src/connector/src/source/filesystem/opendal_source/s3_source.rs index 7230e28dfd4f0..14333c868e322 100644 --- a/src/connector/src/source/filesystem/opendal_source/s3_source.rs +++ b/src/connector/src/source/filesystem/opendal_source/s3_source.rs @@ -37,10 +37,12 @@ impl OpendalEnumerator { .region(&s3_properties.region_name); if let Some(endpoint_url) = s3_properties.endpoint_url { + println!("这里 endpoint_url = {:?}", endpoint_url); builder = builder.endpoint(&endpoint_url); } if let Some(access) = s3_properties.access { + println!("这里 access = {:?}", access); builder = builder.access_key_id(&access); } else { tracing::error!( @@ -50,6 +52,7 @@ impl OpendalEnumerator { } if let Some(secret) = s3_properties.secret { + println!("这里 secret = {:?}", secret); builder = builder.secret_access_key(&secret); } else { tracing::error!( @@ -59,6 +62,7 @@ impl OpendalEnumerator { } if let Some(assume_role) = assume_role { + println!("多了一个assume_role = {:?}", assume_role); builder = builder.role_arn(&assume_role); } diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs index 49b6d9a425276..e700fbf6bb3d5 100644 --- a/src/connector/src/source/iceberg/parquet_file_handler.rs +++ b/src/connector/src/source/iceberg/parquet_file_handler.rs @@ -110,6 +110,7 @@ pub fn new_s3_operator( s3_secret_key: String, bucket: String, ) -> ConnectorResult { + println!("这里还在用s3"); // Create s3 builder. let mut builder = S3::default().bucket(&bucket).region(&s3_region); builder = builder.secret_access_key(&s3_access_key); @@ -127,6 +128,32 @@ pub fn new_s3_operator( Ok(op) } +pub fn new_minio_operator( + minio_region: String, + minio_access_key: String, + minio_secret_key: String, + bucket: String, +) -> ConnectorResult { + // Create s3 builder. + println!("minio_region = {} minio_access_key = {} minio_secret_key = {} bucket = {} ", minio_region, minio_access_key, minio_secret_key, bucket); + let mut builder = S3::default().bucket(&bucket).region(&minio_region); + builder = builder.secret_access_key(&minio_access_key); + builder = builder.secret_access_key(&minio_secret_key); + builder = builder.endpoint("http://127.0.0.1:9301"); + + + builder = builder.disable_ec2_metadata(); + builder = builder.disable_config_load(); + builder = builder.allow_anonymous(); + // builder = builder.enable_virtual_host_style(); + + let op: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + Ok(op) +} + pub fn extract_bucket_and_file_name(location: &str) -> ConnectorResult<(String, String)> { let url = Url::parse(location)?; let bucket = url @@ -149,6 +176,7 @@ pub async fn list_s3_directory( s3_secret_key: String, dir: String, ) -> Result, anyhow::Error> { + println!("list"); let (bucket, file_name) = extract_bucket_and_file_name(&dir)?; let prefix = format!("s3://{}/", bucket); if dir.starts_with(&prefix) { @@ -244,6 +272,7 @@ pub async fn read_parquet_file( ) -> ConnectorResult< Pin> + Send>>, > { + println!("这里 file name = {:?}", file_name); let mut reader: tokio_util::compat::Compat = op .reader_with(&file_name) .into_future() // Unlike `rustc`, `try_stream` seems require manual `into_future`. @@ -302,6 +331,7 @@ pub async fn get_parquet_fields( op: Operator, file_name: String, ) -> ConnectorResult { + println!("file_name = {:?}", file_name); let mut reader: tokio_util::compat::Compat = op .reader_with(&file_name) .into_future() // Unlike `rustc`, `try_stream` seems require manual `into_future`. diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index d49b4332b117f..81b11a5bad287 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -21,7 +21,7 @@ use mysql_async::prelude::*; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::types::{DataType, ScalarImpl, StructType}; use risingwave_connector::source::iceberg::{ - extract_bucket_and_file_name, get_parquet_fields, list_s3_directory, new_s3_operator, + extract_bucket_and_file_name, get_parquet_fields, list_s3_directory, new_minio_operator, new_s3_operator }; pub use risingwave_pb::expr::table_function::PbType as TableFunctionType; use risingwave_pb::expr::PbTableFunction; @@ -133,7 +133,7 @@ impl TableFunction { .into()); } - if !"s3".eq_ignore_ascii_case(&eval_args[1]) { + if !"s3".eq_ignore_ascii_case(&eval_args[1]) && !"minio".eq_ignore_ascii_case(&eval_args[1]){ return Err(BindError( "file_scan function only accepts 's3' as storage type".to_owned(), ) @@ -182,12 +182,24 @@ impl TableFunction { None => eval_args[5].clone(), }; let (bucket, file_name) = extract_bucket_and_file_name(&location)?; - let op = new_s3_operator( - eval_args[2].clone(), - eval_args[3].clone(), - eval_args[4].clone(), - bucket.clone(), - )?; + let op = if "s3".eq_ignore_ascii_case(&eval_args[1]){ + new_s3_operator( + eval_args[2].clone(), + eval_args[3].clone(), + eval_args[4].clone(), + bucket.clone(), + )? + }else if "minio".eq_ignore_ascii_case(&eval_args[1]){ + new_minio_operator( + eval_args[2].clone(), + eval_args[3].clone(), + eval_args[4].clone(), + bucket.clone(), + )? + }else{ + unreachable!() + }; + let fields = get_parquet_fields(op, file_name).await?; diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index d44b1f745df75..51f78e055cd4a 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -78,7 +78,8 @@ impl OpendalObjectStore { "http://" }; let (address, bucket) = rest.split_once('/').unwrap(); - + let a = &format!("{}{}", endpoint_prefix, address); + println!("这里 {:?}", a); let builder = S3::default() .bucket(bucket) .region("custom") From 9a13b61e14cd8b6cb5f9524ee9c34df43bdeb324 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 27 Dec 2024 13:53:01 +0800 Subject: [PATCH 2/9] save work --- .../src/source/filesystem/opendal_source/s3_source.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/connector/src/source/filesystem/opendal_source/s3_source.rs b/src/connector/src/source/filesystem/opendal_source/s3_source.rs index 14333c868e322..7230e28dfd4f0 100644 --- a/src/connector/src/source/filesystem/opendal_source/s3_source.rs +++ b/src/connector/src/source/filesystem/opendal_source/s3_source.rs @@ -37,12 +37,10 @@ impl OpendalEnumerator { .region(&s3_properties.region_name); if let Some(endpoint_url) = s3_properties.endpoint_url { - println!("这里 endpoint_url = {:?}", endpoint_url); builder = builder.endpoint(&endpoint_url); } if let Some(access) = s3_properties.access { - println!("这里 access = {:?}", access); builder = builder.access_key_id(&access); } else { tracing::error!( @@ -52,7 +50,6 @@ impl OpendalEnumerator { } if let Some(secret) = s3_properties.secret { - println!("这里 secret = {:?}", secret); builder = builder.secret_access_key(&secret); } else { tracing::error!( @@ -62,7 +59,6 @@ impl OpendalEnumerator { } if let Some(assume_role) = assume_role { - println!("多了一个assume_role = {:?}", assume_role); builder = builder.role_arn(&assume_role); } From 42ea589995f4393aa9c87176830675db05b79cab Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 27 Dec 2024 14:37:04 +0800 Subject: [PATCH 3/9] revert change in cargo.lock --- Cargo.lock | 182 ++++++++---------- .../source/iceberg/parquet_file_handler.rs | 4 +- 2 files changed, 80 insertions(+), 106 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1bf647d4b4c4c..4152238f77019 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1023,8 +1023,8 @@ dependencies = [ "pin-project-lite", "tokio", "xz2", - "zstd 0.13.0", - "zstd-safe 7.0.0", + "zstd 0.13.2", + "zstd-safe 7.2.1", ] [[package]] @@ -1333,39 +1333,32 @@ dependencies = [ [[package]] name = "aws-http" -version = "0.60.1" +version = "0.60.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30e4199d5d62ab09be6a64650c06cc5c4aa45806fed4c74bc4a5c8eaf039a6fa" +checksum = "1eda156637dc4fd310cd05b2a35f963a591684b02b13694bd790df48f5349ee0" dependencies = [ - "aws-smithy-runtime-api", - "aws-smithy-types", - "aws-types", - "bytes", - "http 0.2.9", - "http-body 0.4.5", - "pin-project-lite", - "tracing", + "aws-runtime", ] [[package]] name = "aws-lc-rs" -version = "1.12.0" +version = "1.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f409eb70b561706bf8abba8ca9c112729c481595893fd06a2dd9af8ed8441148" +checksum = "df33e4a55b03f8780ba55041bc7be91a2a8ec8c03517b0379d2d6c96d2c30d95" dependencies = [ "aws-lc-sys", + "mirai-annotations", "paste", "zeroize", ] [[package]] name = "aws-lc-sys" -version = "0.24.0" +version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8478a5c29ead3f3be14aff8a202ad965cf7da6856860041bfca271becf8ba48b" +checksum = "37ede3d6e360a48436fee127cb81710834407b1ec0c48a001cc29dec9005f73e" dependencies = [ - "bindgen 0.69.5", - "cc", + "bindgen", "cmake", "dunce", "fs_extra", @@ -1597,7 +1590,7 @@ dependencies = [ "hex", "hmac", "http 0.2.9", - "http 1.1.0", + "http 1.2.0", "once_cell", "p256 0.11.1", "percent-encoding", @@ -1729,7 +1722,7 @@ dependencies = [ "aws-smithy-types", "bytes", "http 0.2.9", - "http 1.1.0", + "http 1.2.0", "pin-project-lite", "tokio", "tracing", @@ -1747,7 +1740,7 @@ dependencies = [ "bytes-utils", "futures-core", "http 0.2.9", - "http 1.1.0", + "http 1.2.0", "http-body 0.4.5", "http-body 1.0.0", "http-body-util", @@ -1806,7 +1799,7 @@ dependencies = [ "axum-core", "bytes", "futures-util", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.0", "http-body-util", "hyper 1.4.1", @@ -1839,7 +1832,7 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.0", "http-body-util", "mime", @@ -1859,7 +1852,7 @@ checksum = "077959a7f8cf438676af90b483304528eb7e16eadadb7f44e9ada4f9dceb9e62" dependencies = [ "axum-core", "chrono", - "http 1.1.0", + "http 1.2.0", "mime_guess", "rust-embed", "tower-service", @@ -1875,7 +1868,7 @@ dependencies = [ "axum-core", "bytes", "futures-util", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.0", "http-body-util", "mime", @@ -2081,14 +2074,14 @@ dependencies = [ [[package]] name = "bindgen" -version = "0.69.5" +version = "0.69.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" +checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0" dependencies = [ "bitflags 2.6.0", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.12.1", "lazy_static", "lazycell", "log", @@ -2096,30 +2089,12 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash 1.1.0", + "rustc-hash", "shlex", "syn 2.0.87", "which", ] -[[package]] -name = "bindgen" -version = "0.71.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3" -dependencies = [ - "bitflags 2.6.0", - "cexpr", - "clang-sys", - "itertools 0.10.5", - "proc-macro2", - "quote", - "regex", - "rustc-hash 2.1.0", - "shlex", - "syn 2.0.87", -] - [[package]] name = "bit-set" version = "0.8.0" @@ -3011,7 +2986,7 @@ dependencies = [ "hashbrown 0.14.5", "log", "regalloc2", - "rustc-hash 1.1.0", + "rustc-hash", "smallvec", "target-lexicon", ] @@ -3616,7 +3591,7 @@ dependencies = [ "url", "uuid", "xz2", - "zstd 0.13.0", + "zstd 0.13.2", ] [[package]] @@ -4270,7 +4245,7 @@ dependencies = [ "lazy_static", "mintex", "parking_lot 0.12.1", - "rustc-hash 1.1.0", + "rustc-hash", "serde", "serde_json", "thousands", @@ -5248,7 +5223,7 @@ dependencies = [ "thiserror 2.0.3", "tracing", "twox-hash 2.1.0", - "zstd 0.13.0", + "zstd 0.13.2", ] [[package]] @@ -5664,7 +5639,7 @@ checksum = "cae77099e2399aea466bba05f0d23a150b4f34ed7ce535835e71d91399e65b58" dependencies = [ "anyhow", "async-trait", - "http 1.1.0", + "http 1.2.0", "thiserror 1.0.63", "tokio", "tonic", @@ -5747,7 +5722,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3eaaad103912825594d674a4b1e556ccbb05a13a6cac17dcfd871997fb760a" dependencies = [ "google-cloud-token", - "http 1.1.0", + "http 1.2.0", "thiserror 1.0.63", "tokio", "tokio-retry", @@ -5877,7 +5852,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http 1.1.0", + "http 1.2.0", "indexmap 2.7.0", "slab", "tokio", @@ -6076,9 +6051,9 @@ dependencies = [ [[package]] name = "http" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +checksum = "f16ca2af56261c99fba8bac40a10251ce8188205a4c448fbb745a2e4daa76fea" dependencies = [ "bytes", "fnv", @@ -6103,7 +6078,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ "bytes", - "http 1.1.0", + "http 1.2.0", ] [[package]] @@ -6114,7 +6089,7 @@ checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" dependencies = [ "bytes", "futures-core", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.0", "pin-project-lite", ] @@ -6177,7 +6152,7 @@ dependencies = [ "futures-channel", "futures-util", "h2 0.4.7", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.0", "httparse", "httpdate", @@ -6212,7 +6187,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" dependencies = [ "futures-util", - "http 1.1.0", + "http 1.2.0", "hyper 1.4.1", "hyper-util", "rustls 0.22.4", @@ -6273,7 +6248,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.0", "hyper 1.4.1", "pin-project-lite", @@ -6391,7 +6366,7 @@ source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=53f786fb214 dependencies = [ "async-trait", "chrono", - "http 1.1.0", + "http 1.2.0", "iceberg", "itertools 0.13.0", "log", @@ -7383,7 +7358,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "http 1.1.0", + "http 1.2.0", "madsim", "spin 0.9.8", "tracing", @@ -7661,6 +7636,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "mirai-annotations" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1" + [[package]] name = "mockall" version = "0.11.4" @@ -7846,7 +7827,7 @@ checksum = "8a60cb978c0a1d654edcc1460f8d6092dacf21346ed6017d81fb76a23ef5a8de" dependencies = [ "base64 0.21.7", "bigdecimal 0.4.5", - "bindgen 0.71.1", + "bindgen", "bitflags 2.6.0", "bitvec", "btoi", @@ -7875,7 +7856,7 @@ dependencies = [ "thiserror 1.0.63", "time", "uuid", - "zstd 0.13.0", + "zstd 0.13.2", ] [[package]] @@ -8324,7 +8305,7 @@ dependencies = [ "flagset", "futures", "getrandom", - "http 1.1.0", + "http 1.2.0", "log", "md-5", "once_cell", @@ -8356,7 +8337,7 @@ dependencies = [ "flagset", "futures", "getrandom", - "http 1.1.0", + "http 1.2.0", "log", "md-5", "once_cell", @@ -8498,7 +8479,7 @@ checksum = "6b925a602ffb916fb7421276b86756027b37ee708f9dce2dbdcc51739f07e727" dependencies = [ "async-trait", "futures-core", - "http 1.1.0", + "http 1.2.0", "opentelemetry", "opentelemetry-proto", "opentelemetry_sdk", @@ -8796,7 +8777,7 @@ dependencies = [ "thrift", "tokio", "twox-hash 1.6.3", - "zstd 0.13.0", + "zstd 0.13.2", "zstd-sys", ] @@ -8831,7 +8812,7 @@ dependencies = [ "thrift", "tokio", "twox-hash 1.6.3", - "zstd 0.13.0", + "zstd 0.13.2", "zstd-sys", ] @@ -9669,7 +9650,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.87", @@ -10167,7 +10148,7 @@ checksum = "ad156d539c879b7a24a363a2016d77961786e71f48f2e2fc8302a92abd2429a6" dependencies = [ "hashbrown 0.13.2", "log", - "rustc-hash 1.1.0", + "rustc-hash", "slice-group-by", "smallvec", ] @@ -10246,7 +10227,7 @@ dependencies = [ "hex", "hmac", "home", - "http 1.1.0", + "http 1.2.0", "jsonwebtoken", "log", "once_cell", @@ -10318,7 +10299,7 @@ dependencies = [ "futures-core", "futures-util", "h2 0.4.7", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.0", "http-body-util", "hyper 1.4.1", @@ -10365,7 +10346,7 @@ checksum = "a45d100244a467870f6cb763c4484d010a6bed6bd610b3676e3825d93fb4cfbd" dependencies = [ "anyhow", "async-trait", - "http 1.1.0", + "http 1.2.0", "reqwest 0.12.4", "serde", "thiserror 1.0.63", @@ -10757,7 +10738,7 @@ dependencies = [ "governor", "hashbrown 0.14.5", "hex", - "http 1.1.0", + "http 1.2.0", "http-body 0.4.5", "humantime", "hytra", @@ -10874,7 +10855,7 @@ dependencies = [ "easy-ext", "futures", "http 0.2.9", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.0", "hyper 0.14.27", "hyper 1.4.1", @@ -10936,7 +10917,7 @@ dependencies = [ "axum", "axum-extra", "futures", - "http 1.1.0", + "http 1.2.0", "madsim-tokio", "madsim-tonic", "prometheus", @@ -11017,7 +10998,7 @@ dependencies = [ "foyer", "futures", "futures-async-stream", - "http 1.1.0", + "http 1.2.0", "hyper 1.4.1", "itertools 0.13.0", "madsim-tokio", @@ -11409,7 +11390,7 @@ dependencies = [ "tonic", "tracing", "workspace-hack", - "zstd 0.13.0", + "zstd 0.13.2", ] [[package]] @@ -11510,7 +11491,7 @@ dependencies = [ "url", "uuid", "workspace-hack", - "zstd 0.13.0", + "zstd 0.13.2", ] [[package]] @@ -11709,7 +11690,7 @@ dependencies = [ "function_name", "futures", "hex", - "http 1.1.0", + "http 1.2.0", "itertools 0.13.0", "jsonbb", "madsim-tokio", @@ -11975,7 +11956,7 @@ dependencies = [ "either", "futures", "h2 0.4.7", - "http 1.1.0", + "http 1.2.0", "hyper 1.4.1", "itertools 0.13.0", "lru 0.7.6", @@ -12224,7 +12205,7 @@ dependencies = [ "workspace-hack", "xorf", "xxhash-rust", - "zstd 0.13.0", + "zstd 0.13.2", ] [[package]] @@ -12534,12 +12515,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" -[[package]] -name = "rustc-hash" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7fb8039b3032c191086b10f11f319a6e99e1e82889c5cc6046f515c9db1d497" - [[package]] name = "rustc_version" version = "0.2.3" @@ -13790,9 +13765,9 @@ dependencies = [ [[package]] name = "sqllogictest" -version = "0.23.0" +version = "0.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec31dce96f489e2247a165837f49bbce4912b0cbcf127b79b4fdd87503022ae9" +checksum = "2f8b0fbdd5d7cb140384bcf8607d8dc52b9296c64654606be5986aa04526b069" dependencies = [ "async-trait", "educe", @@ -14783,7 +14758,7 @@ dependencies = [ "bytes", "futures-core", "futures-sink", - "http 1.1.0", + "http 1.2.0", "httparse", "rand", "ring 0.17.5", @@ -14877,7 +14852,7 @@ dependencies = [ "bytes", "flate2", "h2 0.4.7", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.0", "http-body-util", "hyper 1.4.1", @@ -14956,7 +14931,7 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http 1.1.0", + "http 1.2.0", "http-body 1.0.0", "http-body-util", "http-range-header", @@ -15852,7 +15827,7 @@ dependencies = [ "sha2", "toml 0.8.12", "windows-sys 0.52.0", - "zstd 0.13.0", + "zstd 0.13.2", ] [[package]] @@ -16744,11 +16719,11 @@ dependencies = [ [[package]] name = "zstd" -version = "0.13.0" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110" +checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" dependencies = [ - "zstd-safe 7.0.0", + "zstd-safe 7.2.1", ] [[package]] @@ -16763,20 +16738,19 @@ dependencies = [ [[package]] name = "zstd-safe" -version = "7.0.0" +version = "7.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e" +checksum = "54a3ab4db68cea366acc5c897c7b4d4d1b8994a9cd6e6f841f8964566a419059" dependencies = [ "zstd-sys", ] [[package]] name = "zstd-sys" -version = "2.0.8+zstd.1.5.5" +version = "2.0.12+zstd.1.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c" +checksum = "0a4e40c320c3cb459d9a9ff6de98cff88f4751ee9275d140e2be94a2b74e4c13" dependencies = [ "cc", - "libc", "pkg-config", ] diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs index e700fbf6bb3d5..9fb18b25aa6a5 100644 --- a/src/connector/src/source/iceberg/parquet_file_handler.rs +++ b/src/connector/src/source/iceberg/parquet_file_handler.rs @@ -142,9 +142,9 @@ pub fn new_minio_operator( builder = builder.endpoint("http://127.0.0.1:9301"); - builder = builder.disable_ec2_metadata(); + // builder = builder.disable_ec2_metadata(); builder = builder.disable_config_load(); - builder = builder.allow_anonymous(); + // builder = builder.allow_anonymous(); // builder = builder.enable_virtual_host_style(); let op: Operator = Operator::new(builder)? From 1fe643987d25cd92c6da03ad936477f280157806 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 27 Dec 2024 16:31:46 +0800 Subject: [PATCH 4/9] add minio file scan --- proto/batch_plan.proto | 1 + .../executors/src/executor/s3_file_scan.rs | 35 ++++++++++--- .../source/iceberg/parquet_file_handler.rs | 49 +++++-------------- src/frontend/src/expr/table_function.rs | 42 ++++++++++------ .../optimizer/plan_node/batch_file_scan.rs | 1 + .../optimizer/plan_node/generic/file_scan.rs | 1 + .../optimizer/plan_node/logical_file_scan.rs | 12 +++-- .../rule/table_function_to_file_scan_rule.rs | 7 ++- .../src/object/opendal_engine/opendal_s3.rs | 2 - 9 files changed, 84 insertions(+), 66 deletions(-) diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 2373b7d483e30..ebe4abced5134 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -88,6 +88,7 @@ message FileScanNode { enum StorageType { STORAGE_TYPE_UNSPECIFIED = 0; S3 = 1; + MINIO = 2; } repeated plan_common.ColumnDesc columns = 1; diff --git a/src/batch/executors/src/executor/s3_file_scan.rs b/src/batch/executors/src/executor/s3_file_scan.rs index 8140011dfcfce..82e68c48879c6 100644 --- a/src/batch/executors/src/executor/s3_file_scan.rs +++ b/src/batch/executors/src/executor/s3_file_scan.rs @@ -17,7 +17,7 @@ use futures_util::stream::StreamExt; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; use risingwave_connector::source::iceberg::{ - extract_bucket_and_file_name, new_s3_operator, read_parquet_file, + extract_bucket_and_file_name, new_minio_operator, new_s3_operator, read_parquet_file, }; use risingwave_pb::batch_plan::file_scan_node; use risingwave_pb::batch_plan::file_scan_node::StorageType; @@ -41,6 +41,7 @@ pub struct S3FileScanExecutor { batch_size: usize, schema: Schema, identity: String, + is_minio: bool, } impl Executor for S3FileScanExecutor { @@ -67,6 +68,7 @@ impl S3FileScanExecutor { batch_size: usize, schema: Schema, identity: String, + is_minio: bool, ) -> Self { Self { file_format, @@ -77,6 +79,7 @@ impl S3FileScanExecutor { batch_size, schema, identity, + is_minio, } } @@ -85,12 +88,20 @@ impl S3FileScanExecutor { assert_eq!(self.file_format, FileFormat::Parquet); for file in self.file_location { let (bucket, file_name) = extract_bucket_and_file_name(&file)?; - let op = new_s3_operator( - self.s3_region.clone(), - self.s3_access_key.clone(), - self.s3_secret_key.clone(), - bucket.clone(), - )?; + let op = match self.is_minio { + true => new_minio_operator( + self.s3_region.clone(), + self.s3_access_key.clone(), + self.s3_secret_key.clone(), + bucket.clone(), + )?, + false => new_s3_operator( + self.s3_region.clone(), + self.s3_access_key.clone(), + self.s3_secret_key.clone(), + bucket.clone(), + )?, + }; let chunk_stream = read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?; #[for_await] @@ -115,7 +126,14 @@ impl BoxedExecutorBuilder for FileScanExecutorBuilder { NodeBody::FileScan )?; - assert_eq!(file_scan_node.storage_type, StorageType::S3 as i32); + let storage_type = file_scan_node.storage_type; + let is_minio = if storage_type == (StorageType::S3 as i32) { + false + } else if storage_type == (StorageType::Minio as i32) { + true + } else { + todo!() + }; Ok(Box::new(S3FileScanExecutor::new( match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() { @@ -129,6 +147,7 @@ impl BoxedExecutorBuilder for FileScanExecutorBuilder { source.context().get_config().developer.chunk_size, Schema::from_iter(file_scan_node.columns.iter().map(Field::from)), source.plan_node().get_identity().clone(), + is_minio, ))) } } diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs index 9fb18b25aa6a5..a621ed88f17c9 100644 --- a/src/connector/src/source/iceberg/parquet_file_handler.rs +++ b/src/connector/src/source/iceberg/parquet_file_handler.rs @@ -110,7 +110,6 @@ pub fn new_s3_operator( s3_secret_key: String, bucket: String, ) -> ConnectorResult { - println!("这里还在用s3"); // Create s3 builder. let mut builder = S3::default().bucket(&bucket).region(&s3_region); builder = builder.secret_access_key(&s3_access_key); @@ -135,22 +134,19 @@ pub fn new_minio_operator( bucket: String, ) -> ConnectorResult { // Create s3 builder. - println!("minio_region = {} minio_access_key = {} minio_secret_key = {} bucket = {} ", minio_region, minio_access_key, minio_secret_key, bucket); - let mut builder = S3::default().bucket(&bucket).region(&minio_region); - builder = builder.secret_access_key(&minio_access_key); - builder = builder.secret_access_key(&minio_secret_key); - builder = builder.endpoint("http://127.0.0.1:9301"); - - - // builder = builder.disable_ec2_metadata(); + let mut builder = S3::default(); + builder = builder + .region(&minio_region) + .access_key_id(&minio_access_key) + .secret_access_key(&minio_secret_key) + .bucket(&bucket); + builder = builder.endpoint(&format!("http://{}.127.0.0.1:9301", bucket)); builder = builder.disable_config_load(); - // builder = builder.allow_anonymous(); - // builder = builder.enable_virtual_host_style(); - let op: Operator = Operator::new(builder)? - .layer(LoggingLayer::default()) - .layer(RetryLayer::default()) - .finish(); + let op: Operator = Operator::new(builder)? + // .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); Ok(op) } @@ -170,30 +166,11 @@ pub fn extract_bucket_and_file_name(location: &str) -> ConnectorResult<(String, Ok((bucket, file_name)) } -pub async fn list_s3_directory( - s3_region: String, - s3_access_key: String, - s3_secret_key: String, - dir: String, -) -> Result, anyhow::Error> { +pub async fn list_s3_directory(op: Operator, dir: String) -> Result, anyhow::Error> { println!("list"); let (bucket, file_name) = extract_bucket_and_file_name(&dir)?; let prefix = format!("s3://{}/", bucket); if dir.starts_with(&prefix) { - let mut builder = S3::default(); - builder = builder - .region(&s3_region) - .access_key_id(&s3_access_key) - .secret_access_key(&s3_secret_key) - .bucket(&bucket); - builder = builder.endpoint(&format!( - "https://{}.s3.{}.amazonaws.com", - bucket, s3_region - )); - let op = Operator::new(builder)? - .layer(RetryLayer::default()) - .finish(); - op.list(&file_name) .await .map_err(|e| anyhow!(e)) @@ -272,7 +249,6 @@ pub async fn read_parquet_file( ) -> ConnectorResult< Pin> + Send>>, > { - println!("这里 file name = {:?}", file_name); let mut reader: tokio_util::compat::Compat = op .reader_with(&file_name) .into_future() // Unlike `rustc`, `try_stream` seems require manual `into_future`. @@ -331,7 +307,6 @@ pub async fn get_parquet_fields( op: Operator, file_name: String, ) -> ConnectorResult { - println!("file_name = {:?}", file_name); let mut reader: tokio_util::compat::Compat = op .reader_with(&file_name) .into_future() // Unlike `rustc`, `try_stream` seems require manual `into_future`. diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index 81b11a5bad287..74dcc52ccd5a9 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -21,7 +21,8 @@ use mysql_async::prelude::*; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::types::{DataType, ScalarImpl, StructType}; use risingwave_connector::source::iceberg::{ - extract_bucket_and_file_name, get_parquet_fields, list_s3_directory, new_minio_operator, new_s3_operator + extract_bucket_and_file_name, get_parquet_fields, list_s3_directory, new_minio_operator, + new_s3_operator, }; pub use risingwave_pb::expr::table_function::PbType as TableFunctionType; use risingwave_pb::expr::PbTableFunction; @@ -133,7 +134,9 @@ impl TableFunction { .into()); } - if !"s3".eq_ignore_ascii_case(&eval_args[1]) && !"minio".eq_ignore_ascii_case(&eval_args[1]){ + if !"s3".eq_ignore_ascii_case(&eval_args[1]) + && !"minio".eq_ignore_ascii_case(&eval_args[1]) + { return Err(BindError( "file_scan function only accepts 's3' as storage type".to_owned(), ) @@ -151,18 +154,30 @@ impl TableFunction { let files = if eval_args[5].ends_with('/') { let files = tokio::task::block_in_place(|| { FRONTEND_RUNTIME.block_on(async { - let files = list_s3_directory( - eval_args[2].clone(), - eval_args[3].clone(), - eval_args[4].clone(), - eval_args[5].clone(), - ) - .await?; + let (bucket, _file_name) = + extract_bucket_and_file_name(&eval_args[5].clone())?; + let op = if "s3".eq_ignore_ascii_case(&eval_args[1]) { + new_s3_operator( + eval_args[2].clone(), + eval_args[3].clone(), + eval_args[4].clone(), + bucket.clone(), + )? + } else if "minio".eq_ignore_ascii_case(&eval_args[1]) { + new_minio_operator( + eval_args[2].clone(), + eval_args[3].clone(), + eval_args[4].clone(), + bucket.clone(), + )? + } else { + unreachable!() + }; + let files = list_s3_directory(op, eval_args[5].clone()).await?; Ok::, anyhow::Error>(files) }) })?; - if files.is_empty() { return Err(BindError( "file_scan function only accepts non-empty directory".to_owned(), @@ -182,24 +197,23 @@ impl TableFunction { None => eval_args[5].clone(), }; let (bucket, file_name) = extract_bucket_and_file_name(&location)?; - let op = if "s3".eq_ignore_ascii_case(&eval_args[1]){ + let op = if "s3".eq_ignore_ascii_case(&eval_args[1]) { new_s3_operator( eval_args[2].clone(), eval_args[3].clone(), eval_args[4].clone(), bucket.clone(), )? - }else if "minio".eq_ignore_ascii_case(&eval_args[1]){ + } else if "minio".eq_ignore_ascii_case(&eval_args[1]) { new_minio_operator( eval_args[2].clone(), eval_args[3].clone(), eval_args[4].clone(), bucket.clone(), )? - }else{ + } else { unreachable!() }; - let fields = get_parquet_fields(op, file_name).await?; diff --git a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs index 649c178855ef9..949286ef47d43 100644 --- a/src/frontend/src/optimizer/plan_node/batch_file_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_file_scan.rs @@ -89,6 +89,7 @@ impl ToBatchPb for BatchFileScan { }, storage_type: match self.core.storage_type { generic::StorageType::S3 => StorageType::S3 as i32, + generic::StorageType::Minio => StorageType::Minio as i32, }, s3_region: self.core.s3_region.clone(), s3_access_key: self.core.s3_access_key.clone(), diff --git a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs index 975151d89c797..eadd74fdd151a 100644 --- a/src/frontend/src/optimizer/plan_node/generic/file_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/file_scan.rs @@ -27,6 +27,7 @@ pub enum FileFormat { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum StorageType { S3, + Minio, } #[derive(Debug, Clone, Educe)] diff --git a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs index abe8e40a8224f..14cfedb845e04 100644 --- a/src/frontend/src/optimizer/plan_node/logical_file_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_file_scan.rs @@ -50,12 +50,18 @@ impl LogicalFileScan { file_location: Vec, ) -> Self { assert!("parquet".eq_ignore_ascii_case(&file_format)); - assert!("s3".eq_ignore_ascii_case(&storage_type)); - + assert!( + "s3".eq_ignore_ascii_case(&storage_type) || "minio".eq_ignore_ascii_case(&storage_type) + ); + let storage_type = if "s3".eq_ignore_ascii_case(&storage_type) { + generic::StorageType::S3 + } else { + generic::StorageType::Minio + }; let core = generic::FileScan { schema, file_format: generic::FileFormat::Parquet, - storage_type: generic::StorageType::S3, + storage_type, s3_region, s3_access_key, s3_secret_key, diff --git a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs index ab538fb223bd7..7a68e9b53a0a0 100644 --- a/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs +++ b/src/frontend/src/optimizer/rule/table_function_to_file_scan_rule.rs @@ -58,7 +58,10 @@ impl Rule for TableFunctionToFileScanRule { } } assert!("parquet".eq_ignore_ascii_case(&eval_args[0])); - assert!("s3".eq_ignore_ascii_case(&eval_args[1])); + assert!( + ("s3".eq_ignore_ascii_case(&eval_args[1]) + || "minio".eq_ignore_ascii_case(&eval_args[1])) + ); let s3_region = eval_args[2].clone(); let s3_access_key = eval_args[3].clone(); let s3_secret_key = eval_args[4].clone(); @@ -69,7 +72,7 @@ impl Rule for TableFunctionToFileScanRule { logical_table_function.ctx(), schema, "parquet".to_owned(), - "s3".to_owned(), + eval_args[1].to_owned(), s3_region, s3_access_key, s3_secret_key, diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 51f78e055cd4a..7a71ada5ca4b8 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -78,8 +78,6 @@ impl OpendalObjectStore { "http://" }; let (address, bucket) = rest.split_once('/').unwrap(); - let a = &format!("{}{}", endpoint_prefix, address); - println!("这里 {:?}", a); let builder = S3::default() .bucket(bucket) .region("custom") From 3cef9a79055eec387d6d79d7290a2b16f99fab80 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 27 Dec 2024 16:39:02 +0800 Subject: [PATCH 5/9] minor refactor --- .../source/iceberg/parquet_file_handler.rs | 5 +- src/frontend/src/expr/table_function.rs | 58 +++++++------------ 2 files changed, 22 insertions(+), 41 deletions(-) diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs index a621ed88f17c9..a3cedd56eb92b 100644 --- a/src/connector/src/source/iceberg/parquet_file_handler.rs +++ b/src/connector/src/source/iceberg/parquet_file_handler.rs @@ -133,7 +133,7 @@ pub fn new_minio_operator( minio_secret_key: String, bucket: String, ) -> ConnectorResult { - // Create s3 builder. + // Create minio builder. let mut builder = S3::default(); builder = builder .region(&minio_region) @@ -144,7 +144,7 @@ pub fn new_minio_operator( builder = builder.disable_config_load(); let op: Operator = Operator::new(builder)? - // .layer(LoggingLayer::default()) + .layer(LoggingLayer::default()) .layer(RetryLayer::default()) .finish(); Ok(op) @@ -167,7 +167,6 @@ pub fn extract_bucket_and_file_name(location: &str) -> ConnectorResult<(String, } pub async fn list_s3_directory(op: Operator, dir: String) -> Result, anyhow::Error> { - println!("list"); let (bucket, file_name) = extract_bucket_and_file_name(&dir)?; let prefix = format!("s3://{}/", bucket); if dir.starts_with(&prefix) { diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index 74dcc52ccd5a9..35ed4dab4d697 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -151,29 +151,28 @@ impl TableFunction { #[cfg(not(madsim))] { + let (bucket, _) = extract_bucket_and_file_name(&eval_args[5].clone())?; + let op = if "s3".eq_ignore_ascii_case(&eval_args[1]) { + new_s3_operator( + eval_args[2].clone(), + eval_args[3].clone(), + eval_args[4].clone(), + bucket.clone(), + )? + } else if "minio".eq_ignore_ascii_case(&eval_args[1]) { + new_minio_operator( + eval_args[2].clone(), + eval_args[3].clone(), + eval_args[4].clone(), + bucket.clone(), + )? + } else { + unreachable!() + }; let files = if eval_args[5].ends_with('/') { let files = tokio::task::block_in_place(|| { FRONTEND_RUNTIME.block_on(async { - let (bucket, _file_name) = - extract_bucket_and_file_name(&eval_args[5].clone())?; - let op = if "s3".eq_ignore_ascii_case(&eval_args[1]) { - new_s3_operator( - eval_args[2].clone(), - eval_args[3].clone(), - eval_args[4].clone(), - bucket.clone(), - )? - } else if "minio".eq_ignore_ascii_case(&eval_args[1]) { - new_minio_operator( - eval_args[2].clone(), - eval_args[3].clone(), - eval_args[4].clone(), - bucket.clone(), - )? - } else { - unreachable!() - }; - let files = list_s3_directory(op, eval_args[5].clone()).await?; + let files = list_s3_directory(op.clone(), eval_args[5].clone()).await?; Ok::, anyhow::Error>(files) }) @@ -196,24 +195,7 @@ impl TableFunction { Some(files) => files[0].clone(), None => eval_args[5].clone(), }; - let (bucket, file_name) = extract_bucket_and_file_name(&location)?; - let op = if "s3".eq_ignore_ascii_case(&eval_args[1]) { - new_s3_operator( - eval_args[2].clone(), - eval_args[3].clone(), - eval_args[4].clone(), - bucket.clone(), - )? - } else if "minio".eq_ignore_ascii_case(&eval_args[1]) { - new_minio_operator( - eval_args[2].clone(), - eval_args[3].clone(), - eval_args[4].clone(), - bucket.clone(), - )? - } else { - unreachable!() - }; + let (_, file_name) = extract_bucket_and_file_name(&location)?; let fields = get_parquet_fields(op, file_name).await?; From 38e155fbb7a23c5a1555a68a811f8eaee0bb6d1c Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 27 Dec 2024 17:08:33 +0800 Subject: [PATCH 6/9] add file scan test --- e2e_test/s3/file_sink.py | 55 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/e2e_test/s3/file_sink.py b/e2e_test/s3/file_sink.py index a64f40d0692df..23129e5631376 100644 --- a/e2e_test/s3/file_sink.py +++ b/e2e_test/s3/file_sink.py @@ -62,6 +62,46 @@ def do_test(config, file_num, item_num_per_file, prefix): def _table(): return 's3_test_parquet' + print("test table function file scan") + cur.execute(f''' + SELECT + id, + name, + sex, + mark, + test_int, + test_int8, + test_uint8, + test_uint16, + test_uint32, + test_uint64, + test_float_16, + test_real, + test_double_precision, + test_varchar, + test_bytea, + test_date, + test_time, + test_timestamp_s, + test_timestamp_ms, + test_timestamp_us, + test_timestamp_ns, + test_timestamptz_s, + test_timestamptz_ms, + test_timestamptz_us, + test_timestamptz_ns + FROM file_scan( + 'parquet', + 'minio', + 'custom', + 'hummockadmin', + 'hummockadmin', + 's3://hummock001/test_file_scan/test_file_scan.parquet' + );''') + result = cur.fetchone() + assert result[0] == 0, f'file scan assertion failed: the first column is {result[0]}, expect 0.' + + print("file scan test pass") # Execute a SELECT statement cur.execute(f'''CREATE TABLE {_table()}( id bigint primary key, @@ -491,7 +531,22 @@ def _assert_greater(field, got, expect): _s3(idx), _local(idx) ) + # put parquet file to test table function file scan + if data: + first_file_data = data[0] + first_table = pa.Table.from_pandas(pd.DataFrame(first_file_data)) + + first_file_name = f"test_file_scan.parquet" + first_file_path = f"test_file_scan/{first_file_name}" + + pq.write_table(first_table, "data_0.parquet") + client.fput_object( + "hummock001", + first_file_path, + "data_0.parquet" + ) + # do test do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id) From b2e7afc432cf00ecc1be46237a288a97a4e0b865 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 27 Dec 2024 17:27:18 +0800 Subject: [PATCH 7/9] refactor --- .../executors/src/executor/s3_file_scan.rs | 23 ++++------- .../source/iceberg/parquet_file_handler.rs | 40 ++++++------------- src/frontend/src/expr/table_function.rs | 28 +++++-------- 3 files changed, 29 insertions(+), 62 deletions(-) diff --git a/src/batch/executors/src/executor/s3_file_scan.rs b/src/batch/executors/src/executor/s3_file_scan.rs index 82e68c48879c6..39df883697249 100644 --- a/src/batch/executors/src/executor/s3_file_scan.rs +++ b/src/batch/executors/src/executor/s3_file_scan.rs @@ -17,7 +17,7 @@ use futures_util::stream::StreamExt; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; use risingwave_connector::source::iceberg::{ - extract_bucket_and_file_name, new_minio_operator, new_s3_operator, read_parquet_file, + extract_bucket_and_file_name, new_s3_operator, read_parquet_file, }; use risingwave_pb::batch_plan::file_scan_node; use risingwave_pb::batch_plan::file_scan_node::StorageType; @@ -88,20 +88,13 @@ impl S3FileScanExecutor { assert_eq!(self.file_format, FileFormat::Parquet); for file in self.file_location { let (bucket, file_name) = extract_bucket_and_file_name(&file)?; - let op = match self.is_minio { - true => new_minio_operator( - self.s3_region.clone(), - self.s3_access_key.clone(), - self.s3_secret_key.clone(), - bucket.clone(), - )?, - false => new_s3_operator( - self.s3_region.clone(), - self.s3_access_key.clone(), - self.s3_secret_key.clone(), - bucket.clone(), - )?, - }; + let op = new_s3_operator( + self.s3_region.clone(), + self.s3_access_key.clone(), + self.s3_secret_key.clone(), + bucket.clone(), + self.is_minio, + )?; let chunk_stream = read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?; #[for_await] diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs index a3cedd56eb92b..b493458f558a2 100644 --- a/src/connector/src/source/iceberg/parquet_file_handler.rs +++ b/src/connector/src/source/iceberg/parquet_file_handler.rs @@ -109,44 +109,28 @@ pub fn new_s3_operator( s3_access_key: String, s3_secret_key: String, bucket: String, + is_minio: bool, ) -> ConnectorResult { // Create s3 builder. - let mut builder = S3::default().bucket(&bucket).region(&s3_region); - builder = builder.secret_access_key(&s3_access_key); - builder = builder.secret_access_key(&s3_secret_key); - builder = builder.endpoint(&format!( - "https://{}.s3.{}.amazonaws.com", - bucket, s3_region - )); - - let op: Operator = Operator::new(builder)? - .layer(LoggingLayer::default()) - .layer(RetryLayer::default()) - .finish(); - - Ok(op) -} - -pub fn new_minio_operator( - minio_region: String, - minio_access_key: String, - minio_secret_key: String, - bucket: String, -) -> ConnectorResult { - // Create minio builder. let mut builder = S3::default(); builder = builder - .region(&minio_region) - .access_key_id(&minio_access_key) - .secret_access_key(&minio_secret_key) + .region(&s3_region) + .access_key_id(&s3_access_key) + .secret_access_key(&s3_secret_key) .bucket(&bucket); - builder = builder.endpoint(&format!("http://{}.127.0.0.1:9301", bucket)); + builder = match is_minio { + true => builder.endpoint(&format!("http://{}.127.0.0.1:9301", bucket)), + false => builder.endpoint(&format!( + "https://{}.s3.{}.amazonaws.com", + bucket, s3_region + )), + }; builder = builder.disable_config_load(); - let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer(RetryLayer::default()) .finish(); + Ok(op) } diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index 35ed4dab4d697..eef50c95e30a8 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -21,8 +21,7 @@ use mysql_async::prelude::*; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::types::{DataType, ScalarImpl, StructType}; use risingwave_connector::source::iceberg::{ - extract_bucket_and_file_name, get_parquet_fields, list_s3_directory, new_minio_operator, - new_s3_operator, + extract_bucket_and_file_name, get_parquet_fields, list_s3_directory, new_s3_operator, }; pub use risingwave_pb::expr::table_function::PbType as TableFunctionType; use risingwave_pb::expr::PbTableFunction; @@ -152,23 +151,14 @@ impl TableFunction { #[cfg(not(madsim))] { let (bucket, _) = extract_bucket_and_file_name(&eval_args[5].clone())?; - let op = if "s3".eq_ignore_ascii_case(&eval_args[1]) { - new_s3_operator( - eval_args[2].clone(), - eval_args[3].clone(), - eval_args[4].clone(), - bucket.clone(), - )? - } else if "minio".eq_ignore_ascii_case(&eval_args[1]) { - new_minio_operator( - eval_args[2].clone(), - eval_args[3].clone(), - eval_args[4].clone(), - bucket.clone(), - )? - } else { - unreachable!() - }; + + let op = new_s3_operator( + eval_args[2].clone(), + eval_args[3].clone(), + eval_args[4].clone(), + bucket.clone(), + "minio".eq_ignore_ascii_case(&eval_args[1]), + )?; let files = if eval_args[5].ends_with('/') { let files = tokio::task::block_in_place(|| { FRONTEND_RUNTIME.block_on(async { From d430cf5572d6045b6229a633759d1a8db007d106 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 27 Dec 2024 17:31:58 +0800 Subject: [PATCH 8/9] revert change in cargo.lock --- Cargo.lock | 281 ++++++++++------------------------------------------- 1 file changed, 54 insertions(+), 227 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4152238f77019..38f0bddbb20bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1331,15 +1331,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "aws-http" -version = "0.60.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1eda156637dc4fd310cd05b2a35f963a591684b02b13694bd790df48f5349ee0" -dependencies = [ - "aws-runtime", -] - [[package]] name = "aws-lc-rs" version = "1.6.2" @@ -1987,26 +1978,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" -[[package]] -name = "bcc" -version = "0.0.33" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce860f38082f1544a557dfa447838143e1b0bfa061c0369e407ebadf640001d1" -dependencies = [ - "bcc-sys", - "bitflags 1.3.2", - "byteorder", - "libc", - "socket2 0.4.9", - "thiserror 1.0.63", -] - -[[package]] -name = "bcc-sys" -version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f40afb3abbf90895dda3ddbc6d8734d24215130a22d646067690f5e318f81bc" - [[package]] name = "beef" version = "0.5.2" @@ -2359,15 +2330,6 @@ dependencies = [ "either", ] -[[package]] -name = "bytesize" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc" -dependencies = [ - "serde", -] - [[package]] name = "bzip2" version = "0.4.4" @@ -3499,19 +3461,6 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7762d17f1241643615821a8455a0b2c3e803784b058693d990b11f2dce25a0ca" -[[package]] -name = "dashmap" -version = "5.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" -dependencies = [ - "cfg-if", - "hashbrown 0.14.5", - "lock_api", - "once_cell", - "parking_lot_core 0.9.8", -] - [[package]] name = "dashmap" version = "6.1.0" @@ -3554,7 +3503,7 @@ dependencies = [ "bytes", "bzip2", "chrono", - "dashmap 6.1.0", + "dashmap", "datafusion-catalog", "datafusion-common", "datafusion-common-runtime", @@ -3647,7 +3596,7 @@ checksum = "799e70968c815b611116951e3dd876aef04bf217da31b72eec01ee6a959336a1" dependencies = [ "arrow 52.2.0", "chrono", - "dashmap 6.1.0", + "dashmap", "datafusion-common", "datafusion-expr", "futures", @@ -4043,7 +3992,7 @@ dependencies = [ "bytes", "cfg-if", "chrono", - "dashmap 6.1.0", + "dashmap", "datafusion", "datafusion-common", "datafusion-expr", @@ -5339,21 +5288,6 @@ dependencies = [ "libc", ] -[[package]] -name = "function_name" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1ab577a896d09940b5fe12ec5ae71f9d8211fff62c919c03a3750a9901e98a7" -dependencies = [ - "function_name-proc-macro", -] - -[[package]] -name = "function_name-proc-macro" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "673464e1e314dd67a0fd9544abc99e8eb28d0c7e3b69b033bcff9b2d00b87333" - [[package]] name = "funty" version = "2.0.0" @@ -5788,14 +5722,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" dependencies = [ "cfg-if", - "dashmap 5.5.3", "futures", "futures-timer", "no-std-compat", "nonzero_ext", "parking_lot 0.12.1", "portable-atomic", - "rand", "smallvec", "spinning_top", ] @@ -5944,7 +5876,6 @@ checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8" dependencies = [ "base64 0.13.1", "byteorder", - "crossbeam-channel", "flate2", "nom", "num-traits", @@ -6298,15 +6229,17 @@ dependencies = [ [[package]] name = "iceberg" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=53f786fb2141b51d10a173cbcb5595edd5aa52a6#53f786fb2141b51d10a173cbcb5595edd5aa52a6" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21#2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21" dependencies = [ "anyhow", "apache-avro 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", "array-init", "arrow-arith 53.0.0", "arrow-array 53.2.0", + "arrow-buffer 53.2.0", "arrow-cast 53.2.0", "arrow-ord 53.0.0", + "arrow-row 53.0.0", "arrow-schema 53.2.0", "arrow-select 53.2.0", "arrow-string 53.0.0", @@ -6345,7 +6278,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=53f786fb2141b51d10a173cbcb5595edd5aa52a6#53f786fb2141b51d10a173cbcb5595edd5aa52a6" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21#2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21" dependencies = [ "anyhow", "async-trait", @@ -6362,7 +6295,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.3.0" -source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=53f786fb2141b51d10a173cbcb5595edd5aa52a6#53f786fb2141b51d10a173cbcb5595edd5aa52a6" +source = "git+https://github.com/risingwavelabs/iceberg-rust.git?rev=2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21#2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21" dependencies = [ "async-trait", "chrono", @@ -7435,7 +7368,7 @@ checksum = "f271a476bbaa9d2139e1e1a5beb869c6119e805a0b67ad2b2857e4a8785b111a" dependencies = [ "prettyplease 0.2.15", "proc-macro2", - "prost-build 0.13.1", + "prost-build 0.13.4", "quote", "syn 2.0.87", "tonic-build", @@ -9587,6 +9520,15 @@ dependencies = [ "prost-derive 0.13.1", ] +[[package]] +name = "prost" +version = "0.13.4" +source = "git+https://github.com/xxchan/prost.git?rev=0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163#0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163" +dependencies = [ + "bytes", + "prost-derive 0.13.4", +] + [[package]] name = "prost-build" version = "0.11.9" @@ -9611,20 +9553,18 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" +version = "0.13.4" +source = "git+https://github.com/xxchan/prost.git?rev=0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163#0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163" dependencies = [ - "bytes", - "heck 0.5.0", - "itertools 0.13.0", + "heck 0.4.1", + "itertools 0.10.5", "log", - "multimap 0.10.0", + "multimap 0.8.3", "once_cell", "petgraph", "prettyplease 0.2.15", - "prost 0.13.1", - "prost-types 0.13.1", + "prost 0.13.4", + "prost-types 0.13.4", "regex", "syn 2.0.87", "tempfile", @@ -9663,7 +9603,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.10.5", + "proc-macro2", + "quote", + "syn 2.0.87", +] + +[[package]] +name = "prost-derive" +version = "0.13.4" +source = "git+https://github.com/xxchan/prost.git?rev=0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163#0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163" +dependencies = [ + "anyhow", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.87", @@ -9712,6 +9664,14 @@ dependencies = [ "prost 0.13.1", ] +[[package]] +name = "prost-types" +version = "0.13.4" +source = "git+https://github.com/xxchan/prost.git?rev=0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163#0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163" +dependencies = [ + "prost 0.13.4", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -10448,7 +10408,6 @@ dependencies = [ "regex", "reqwest 0.12.4", "serde", - "serde_json", "serde_with 3.8.1", "serde_yaml", "sqlx", @@ -10492,7 +10451,6 @@ version = "2.3.0-alpha" dependencies = [ "anyhow", "async-trait", - "bincode 1.3.3", "bytes", "itertools 0.13.0", "parking_lot 0.12.1", @@ -10515,7 +10473,6 @@ dependencies = [ "anyhow", "async-recursion", "async-trait", - "criterion", "either", "futures", "futures-async-stream", @@ -10545,7 +10502,6 @@ dependencies = [ "tempfile", "thiserror 1.0.63", "thiserror-ext", - "tikv-jemallocator", "tokio-postgres", "tokio-stream 0.1.15", "tracing", @@ -10604,40 +10560,19 @@ name = "risingwave_bench" version = "2.3.0-alpha" dependencies = [ "anyhow", - "async-trait", - "aws-config", - "aws-sdk-s3", - "aws-smithy-http", - "aws-smithy-types", - "bcc", - "bytes", - "bytesize", "clap", "futures", "futures-async-stream", - "hdrhistogram", "itertools 0.13.0", - "libc", "madsim-tokio", "nix 0.29.0", - "opentelemetry", - "parking_lot 0.12.1", "plotters", - "prometheus", - "rand", "risingwave_common", "risingwave_connector", - "risingwave_pb", - "risingwave_rt", - "risingwave_storage", "risingwave_stream", "serde", "serde_yaml", "thiserror-ext", - "tokio-stream 0.1.15", - "toml 0.8.12", - "tracing", - "tracing-subscriber", "workspace-hack", ] @@ -10647,7 +10582,6 @@ version = "2.3.0-alpha" dependencies = [ "clap", "madsim-tokio", - "prometheus", "risingwave_batch_executors", "risingwave_common", "risingwave_compactor", @@ -10666,14 +10600,12 @@ dependencies = [ name = "risingwave_cmd_all" version = "2.3.0-alpha" dependencies = [ - "anyhow", "clap", "console", "const-str", "expect-test", "home", "madsim-tokio", - "prometheus", "risingwave_batch_executors", "risingwave_cmd", "risingwave_common", @@ -10687,7 +10619,6 @@ dependencies = [ "shell-words", "strum 0.26.3", "strum_macros 0.26.4", - "tempfile", "thiserror-ext", "tikv-jemallocator", "tracing", @@ -10741,7 +10672,6 @@ dependencies = [ "http 1.2.0", "http-body 0.4.5", "humantime", - "hytra", "itertools 0.13.0", "itoa", "jsonbb", @@ -10822,7 +10752,6 @@ dependencies = [ "ethnum", "fixedbitset 0.5.0", "jsonbb", - "lru 0.7.6", "risingwave_common_proc_macro", "rust_decimal", "serde_json", @@ -10832,7 +10761,6 @@ dependencies = [ name = "risingwave_common_heap_profiling" version = "2.3.0-alpha" dependencies = [ - "anyhow", "chrono", "madsim-tokio", "parking_lot 0.12.1", @@ -10938,21 +10866,14 @@ name = "risingwave_compaction_test" version = "2.3.0-alpha" dependencies = [ "anyhow", - "async-trait", "bytes", "clap", "foyer", - "futures", "madsim-tokio", - "prometheus", - "rand", "risingwave_common", "risingwave_compactor", "risingwave_hummock_sdk", - "risingwave_hummock_test", - "risingwave_meta", "risingwave_meta_node", - "risingwave_object_store", "risingwave_pb", "risingwave_rpc_client", "risingwave_rt", @@ -10971,7 +10892,6 @@ dependencies = [ "jsonbb", "madsim-tokio", "madsim-tonic", - "parking_lot 0.12.1", "prost 0.13.1", "risingwave_common", "risingwave_common_heap_profiling", @@ -10999,7 +10919,6 @@ dependencies = [ "futures", "futures-async-stream", "http 1.2.0", - "hyper 1.4.1", "itertools 0.13.0", "madsim-tokio", "madsim-tonic", @@ -11029,7 +10948,6 @@ dependencies = [ "tokio-stream 0.1.15", "tower 0.5.0", "tracing", - "uuid", "workspace-hack", ] @@ -11069,6 +10987,7 @@ dependencies = [ "deltalake", "duration-str", "easy-ext", + "either", "elasticsearch", "enum-as-inner 0.6.0", "expect-test", @@ -11081,7 +11000,7 @@ dependencies = [ "google-cloud-gax", "google-cloud-googleapis", "google-cloud-pubsub", - "http 0.2.9", + "governor", "iceberg", "iceberg-catalog-glue", "iceberg-catalog-rest", @@ -11089,8 +11008,6 @@ dependencies = [ "indexmap 2.7.0", "itertools 0.13.0", "jni", - "jsonbb", - "jsonwebtoken", "madsim-rdkafka", "madsim-tokio", "madsim-tonic", @@ -11125,7 +11042,6 @@ dependencies = [ "risingwave_common_estimate_size", "risingwave_connector_codec", "risingwave_jni_core", - "risingwave_object_store", "risingwave_pb", "risingwave_rpc_client", "rumqttc", @@ -11185,7 +11101,7 @@ dependencies = [ "madsim-tokio", "num-bigint", "prost 0.13.1", - "prost-build 0.13.1", + "prost-build 0.13.4", "prost-reflect", "prost-types 0.13.1", "protox", @@ -11216,7 +11132,6 @@ dependencies = [ "itertools 0.13.0", "madsim-tokio", "madsim-tonic", - "memcomparable", "prost 0.13.1", "regex", "risingwave_common", @@ -11225,7 +11140,6 @@ dependencies = [ "risingwave_hummock_sdk", "risingwave_meta", "risingwave_meta_model", - "risingwave_meta_model_migration", "risingwave_object_store", "risingwave_pb", "risingwave_rpc_client", @@ -11247,21 +11161,15 @@ name = "risingwave_dml" version = "2.3.0-alpha" dependencies = [ "assert_matches", - "criterion", "futures", "futures-async-stream", "itertools 0.13.0", "madsim-tokio", "parking_lot 0.12.1", "paste", - "rand", "risingwave_common", - "risingwave_connector", - "risingwave_pb", - "rw_futures_util", "tempfile", "thiserror 1.0.63", - "thiserror-ext", "tracing", "workspace-hack", ] @@ -11287,7 +11195,6 @@ version = "2.3.0-alpha" dependencies = [ "anyhow", "bincode 1.3.3", - "bytes", "easy-ext", "madsim-tonic", "serde", @@ -11305,7 +11212,6 @@ dependencies = [ "async-trait", "auto_impl", "await-tree", - "cfg-or-panic", "chrono", "const-currying", "downcast-rs", @@ -11467,7 +11373,6 @@ dependencies = [ "risingwave_expr_impl", "risingwave_frontend_macro", "risingwave_hummock_sdk", - "risingwave_object_store", "risingwave_pb", "risingwave_rpc_client", "risingwave_sqlparser", @@ -11491,7 +11396,6 @@ dependencies = [ "url", "uuid", "workspace-hack", - "zstd 0.13.2", ] [[package]] @@ -11508,16 +11412,13 @@ name = "risingwave_hummock_sdk" version = "2.3.0-alpha" dependencies = [ "bytes", - "easy-ext", "hex", "itertools 0.13.0", "parse-display", - "prost 0.13.1", "risingwave_common", "risingwave_common_estimate_size", "risingwave_pb", "serde", - "serde_bytes", "tracing", "workspace-hack", ] @@ -11549,7 +11450,6 @@ dependencies = [ "risingwave_rpc_client", "risingwave_storage", "risingwave_test_runner", - "serde", "serial_test", "sync-point", "workspace-hack", @@ -11612,21 +11512,14 @@ dependencies = [ "cfg-or-panic", "chrono", "expect-test", - "foyer", "fs-err", "futures", - "itertools 0.13.0", "jni", "madsim-tokio", "paste", "prost 0.13.1", "risingwave_common", - "risingwave_expr", - "risingwave_hummock_sdk", "risingwave_pb", - "rw_futures_util", - "serde", - "serde_json", "thiserror 1.0.63", "thiserror-ext", "tracing", @@ -11651,15 +11544,10 @@ dependencies = [ name = "risingwave_mem_table_spill_test" version = "2.3.0-alpha" dependencies = [ - "async-trait", - "bytes", - "futures", - "futures-async-stream", "madsim-tokio", "risingwave_common", "risingwave_hummock_sdk", "risingwave_hummock_test", - "risingwave_storage", "risingwave_stream", ] @@ -11671,10 +11559,8 @@ dependencies = [ "arc-swap", "assert_matches", "async-trait", - "aws-config", "axum", "base64-url", - "bincode 1.3.3", "bytes", "chrono", "clap", @@ -11686,8 +11572,6 @@ dependencies = [ "enum-as-inner 0.6.0", "expect-test", "fail", - "flate2", - "function_name", "futures", "hex", "http 1.2.0", @@ -11696,9 +11580,6 @@ dependencies = [ "madsim-tokio", "madsim-tonic", "maplit", - "memcomparable", - "mime_guess", - "moka", "notify", "num-integer", "num-traits", @@ -11722,7 +11603,6 @@ dependencies = [ "risingwave_rpc_client", "risingwave_sqlparser", "risingwave_test_runner", - "rw-aws-sdk-ec2", "rw_futures_util", "scopeguard", "sea-orm", @@ -11739,7 +11619,6 @@ dependencies = [ "tower-http", "tracing", "tracing-subscriber", - "url", "uuid", "workspace-hack", ] @@ -11771,7 +11650,6 @@ version = "2.3.0-alpha" dependencies = [ "prost 0.13.1", "risingwave_common", - "risingwave_hummock_sdk", "risingwave_pb", "sea-orm", "serde", @@ -11795,13 +11673,9 @@ dependencies = [ name = "risingwave_meta_node" version = "2.3.0-alpha" dependencies = [ - "anyhow", "clap", "educe", - "either", - "futures", "hex", - "itertools 0.13.0", "madsim-tokio", "madsim-tonic", "otlp-embedded", @@ -11812,12 +11686,10 @@ dependencies = [ "risingwave_common_heap_profiling", "risingwave_common_service", "risingwave_meta", - "risingwave_meta_model_migration", "risingwave_meta_service", "risingwave_pb", "risingwave_rpc_client", "sea-orm", - "serde", "serde_json", "thiserror-ext", "tracing", @@ -11830,12 +11702,10 @@ version = "2.3.0-alpha" dependencies = [ "anyhow", "async-trait", - "either", "futures", "itertools 0.13.0", "madsim-tokio", "madsim-tonic", - "prost 0.13.1", "rand", "regex", "risingwave_common", @@ -11846,7 +11716,6 @@ dependencies = [ "risingwave_pb", "sea-orm", "serde_json", - "sync-point", "thiserror-ext", "tokio-stream 0.1.15", "tracing", @@ -11867,12 +11736,10 @@ dependencies = [ "aws-smithy-types", "bytes", "crc32fast", - "either", "fail", "futures", "hyper 0.14.27", "hyper-rustls 0.24.2", - "hyper-tls 0.5.0", "itertools 0.13.0", "madsim", "madsim-aws-sdk-s3", @@ -11900,10 +11767,11 @@ dependencies = [ "pbjson", "pbjson-build", "prost 0.13.1", - "prost-build 0.13.1", + "prost-build 0.13.4", "prost-helpers", "risingwave_error", "serde", + "static_assertions", "strum 0.26.3", "thiserror 1.0.63", "walkdir", @@ -11955,10 +11823,7 @@ dependencies = [ "easy-ext", "either", "futures", - "h2 0.4.7", "http 1.2.0", - "hyper 1.4.1", - "itertools 0.13.0", "lru 0.7.6", "madsim-tokio", "madsim-tonic", @@ -11977,7 +11842,6 @@ dependencies = [ "tokio-stream 0.1.15", "tower 0.5.0", "tracing", - "url", "workspace-hack", ] @@ -12029,14 +11893,12 @@ dependencies = [ "itertools 0.13.0", "lru 0.7.6", "madsim", - "madsim-aws-sdk-s3", "madsim-rdkafka", "madsim-tokio", "maplit", "paste", "pin-project", "pretty_assertions", - "prometheus", "rand", "rand_chacha", "risingwave_batch_executors", @@ -12052,7 +11914,6 @@ dependencies = [ "risingwave_meta_node", "risingwave_object_store", "risingwave_pb", - "risingwave_rpc_client", "risingwave_sqlparser", "risingwave_sqlsmith", "serde", @@ -12062,7 +11923,6 @@ dependencies = [ "tempfile", "tikv-jemallocator", "tokio-postgres", - "tokio-stream 0.1.15", "tracing", "tracing-subscriber", "uuid", @@ -12107,7 +11967,6 @@ dependencies = [ "risingwave_expr", "risingwave_expr_impl", "risingwave_frontend", - "risingwave_pb", "risingwave_sqlparser", "similar", "thiserror-ext", @@ -12125,7 +11984,6 @@ dependencies = [ "clap", "futures", "madsim-tokio", - "prometheus", "regex", "risingwave_rt", "serde", @@ -12141,7 +11999,6 @@ dependencies = [ name = "risingwave_storage" version = "2.3.0-alpha" dependencies = [ - "ahash 0.8.11", "anyhow", "arc-swap", "async-trait", @@ -12150,9 +12007,7 @@ dependencies = [ "bincode 1.3.3", "bytes", "criterion", - "crossbeam", "darwin-libproc", - "dashmap 6.1.0", "dyn-clone", "either", "enum-as-inner 0.6.0", @@ -12267,7 +12122,6 @@ dependencies = [ "serde_yaml", "smallvec", "static_assertions", - "strum 0.26.3", "strum_macros 0.26.4", "thiserror 1.0.63", "thiserror-ext", @@ -12289,10 +12143,8 @@ dependencies = [ "prost 0.13.1", "reqwest 0.12.4", "risingwave_pb", - "thiserror 1.0.63", "thiserror-ext", "tracing", - "uuid", ] [[package]] @@ -12716,30 +12568,6 @@ dependencies = [ "wait-timeout", ] -[[package]] -name = "rw-aws-sdk-ec2" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80dba3602b267a7f9dcc546ccbf1d05752447773146253c7e344e2a320630b7f" -dependencies = [ - "aws-credential-types", - "aws-http", - "aws-runtime", - "aws-smithy-async", - "aws-smithy-http", - "aws-smithy-json", - "aws-smithy-query", - "aws-smithy-runtime", - "aws-smithy-runtime-api", - "aws-smithy-types", - "aws-smithy-xml", - "aws-types", - "fastrand", - "http 0.2.9", - "regex", - "tracing", -] - [[package]] name = "rw_futures_util" version = "0.0.0" @@ -13765,9 +13593,9 @@ dependencies = [ [[package]] name = "sqllogictest" -version = "0.23.1" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f8b0fbdd5d7cb140384bcf8607d8dc52b9296c64654606be5986aa04526b069" +checksum = "48c03edcabfda1ab894cc63a115b9f014bfc6875916b850ab7498d3cb92daed9" dependencies = [ "async-trait", "educe", @@ -14881,7 +14709,7 @@ checksum = "fe4ee8877250136bd7e3d2331632810a4df4ea5e004656990d8d66d2f5ee8a67" dependencies = [ "prettyplease 0.2.15", "proc-macro2", - "prost-build 0.13.1", + "prost-build 0.13.4", "quote", "syn 2.0.87", ] @@ -16554,7 +16382,6 @@ dependencies = [ name = "with_options" version = "2.3.0-alpha" dependencies = [ - "proc-macro2", "quote", "syn 2.0.87", ] From 4d577820f93504535c82089001b64d5c50a675c4 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 27 Dec 2024 17:56:38 +0800 Subject: [PATCH 9/9] fmt --- e2e_test/s3/file_sink.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/e2e_test/s3/file_sink.py b/e2e_test/s3/file_sink.py index 23129e5631376..d09217ffe09e2 100644 --- a/e2e_test/s3/file_sink.py +++ b/e2e_test/s3/file_sink.py @@ -532,12 +532,12 @@ def _assert_greater(field, got, expect): _local(idx) ) # put parquet file to test table function file scan - if data: + if data: first_file_data = data[0] first_table = pa.Table.from_pandas(pd.DataFrame(first_file_data)) - - first_file_name = f"test_file_scan.parquet" - first_file_path = f"test_file_scan/{first_file_name}" + + first_file_name = f"test_file_scan.parquet" + first_file_path = f"test_file_scan/{first_file_name}" pq.write_table(first_table, "data_0.parquet") @@ -546,7 +546,7 @@ def _assert_greater(field, got, expect): first_file_path, "data_0.parquet" ) - + # do test do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id)