do not change cargo.lock
wcy-fdu committed Dec 13, 2024
1 parent dcf3360 commit a65ff03
Showing 3 changed files with 37 additions and 38 deletions.
12 changes: 8 additions & 4 deletions src/batch/src/executor/s3_file_scan.rs
@@ -15,7 +15,9 @@
use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use risingwave_common::catalog::{Field, Schema};
use risingwave_connector::source::iceberg::{new_s3_operator, read_parquet_file};
use risingwave_connector::source::iceberg::{
extract_bucket_and_file_name, new_s3_operator, read_parquet_file,
};
use risingwave_pb::batch_plan::file_scan_node;
use risingwave_pb::batch_plan::file_scan_node::StorageType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
@@ -81,13 +83,15 @@ impl S3FileScanExecutor {
async fn do_execute(self: Box<Self>) {
assert_eq!(self.file_format, FileFormat::Parquet);
for file in self.file_location {
let (bucket, file_name) = extract_bucket_and_file_name(&file)?;
let op = new_s3_operator(
self.s3_region.clone(),
self.s3_access_key.clone(),
self.s3_secret_key.clone(),
file.clone(),
bucket.clone(),
)?;
let chunk_stream = read_parquet_file(op, file, None, None, self.batch_size, 0).await?;
let chunk_stream =
read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?;
#[for_await]
for stream_chunk in chunk_stream {
let stream_chunk = stream_chunk?;
@@ -126,4 +130,4 @@ impl BoxedExecutorBuilder for FileScanExecutorBuilder {
source.plan_node().get_identity().clone(),
)))
}
}
}
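
The executor change above stops handing the whole s3:// location to new_s3_operator: the location is split first, the operator is built per bucket, and only the object key is passed to the parquet reader. Below is a minimal sketch of that per-file pattern, assuming the connector APIs shown in this diff (extract_bucket_and_file_name, new_s3_operator, read_parquet_file); the two None arguments and the trailing 0 simply mirror the call above, and the anyhow error plumbing is an assumption, not the executor's actual error type.

use futures_util::{pin_mut, StreamExt};
use risingwave_connector::source::iceberg::{
    extract_bucket_and_file_name, new_s3_operator, read_parquet_file,
};

// Sketch only: read one Parquet object and drain its chunk stream.
async fn read_one_parquet_file(
    region: String,
    access_key: String,
    secret_key: String,
    location: String, // hypothetical, e.g. "s3://my-bucket/data/part-0.parquet"
    batch_size: usize,
) -> anyhow::Result<()> {
    // Split "s3://bucket/key" into the bucket and the object key.
    let (bucket, file_name) = extract_bucket_and_file_name(&location)?;
    // The operator is scoped to the bucket; only the key goes to the reader.
    let op = new_s3_operator(region, access_key, secret_key, bucket)?;
    let stream = read_parquet_file(op, file_name, None, None, batch_size, 0).await?;
    pin_mut!(stream);
    while let Some(chunk) = stream.next().await {
        let _chunk = chunk?; // real code hands each chunk to the caller
    }
    Ok(())
}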
39 changes: 19 additions & 20 deletions src/connector/src/source/iceberg/parquet_file_handler.rs
@@ -108,10 +108,9 @@ pub fn new_s3_operator(
s3_region: String,
s3_access_key: String,
s3_secret_key: String,
location: String,
bucket: String,
) -> ConnectorResult<Operator> {
// Create s3 builder.
let bucket = extract_bucket(&location);
let mut builder = S3::default().bucket(&bucket).region(&s3_region);
builder = builder.secret_access_key(&s3_access_key);
builder = builder.secret_access_key(&s3_secret_key);
@@ -130,13 +129,20 @@
Ok(op)
}

fn extract_bucket(location: &str) -> String {
let prefix = "s3://";
let start = prefix.len();
let end = location[start..]
.find('/')
.unwrap_or(location.len() - start);
location[start..start + end].to_string()
pub fn extract_bucket_and_file_name(location: &str) -> ConnectorResult<(String, String)> {
let url = Url::parse(location)?;
let bucket = url
.host_str()
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Invalid s3 url: {}, missing bucket", location),
)
})?
.to_owned();
let prefix = format!("s3://{}/", bucket);
let file_name = location[prefix.len()..].to_string();
Ok((bucket, file_name))
}

pub async fn list_s3_directory(
@@ -145,27 +151,20 @@ pub async fn list_s3_directory(
s3_secret_key: String,
dir: String,
) -> Result<Vec<String>, anyhow::Error> {
let url = Url::parse(&dir)?;
let bucket = url.host_str().ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Invalid s3 url: {}, missing bucket", dir),
)
})?;

let (bucket, file_name) = extract_bucket_and_file_name(&dir)?;
let prefix = format!("s3://{}/", bucket);
if dir.starts_with(&prefix) {
let mut builder = S3::default();
builder = builder
.region(&s3_region)
.access_key_id(&s3_access_key)
.secret_access_key(&s3_secret_key)
.bucket(bucket);
.bucket(&bucket);
let op = Operator::new(builder)?
.layer(RetryLayer::default())
.finish();

op.list(&dir[prefix.len()..])
op.list(&file_name)
.await
.map_err(|e| anyhow!(e))
.map(|list| {
@@ -386,4 +385,4 @@ fn is_parquet_schema_match_source_schema(
| (ArrowDateType::Struct(_), RwDataType::Struct(_))
| (ArrowDateType::Map(_, _), RwDataType::Map(_))
)
}
}
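
Based on the implementation above, the new helper takes the URL host as the bucket and everything after the "s3://{bucket}/" prefix as the file name. A test-style sketch of that expectation, meant to sit next to the helper in the same module (the location is a made-up example):

#[test]
fn splits_bucket_and_file_name() {
    // "my-bucket" is the URL host; the rest of the path is the object key.
    let (bucket, file_name) =
        extract_bucket_and_file_name("s3://my-bucket/dir/part-0.parquet").unwrap();
    assert_eq!(bucket, "my-bucket");
    assert_eq!(file_name, "dir/part-0.parquet");
}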
24 changes: 10 additions & 14 deletions src/frontend/src/expr/table_function.rs
@@ -21,7 +21,7 @@ use mysql_async::prelude::*;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::types::{DataType, ScalarImpl, StructType};
use risingwave_connector::source::iceberg::{
get_parquet_fields, list_s3_directory, new_s3_operator,
extract_bucket_and_file_name, get_parquet_fields, list_s3_directory, new_s3_operator,
};
pub use risingwave_pb::expr::table_function::PbType as TableFunctionType;
use risingwave_pb::expr::PbTableFunction;
@@ -177,23 +177,19 @@ impl TableFunction {

let schema = tokio::task::block_in_place(|| {
FRONTEND_RUNTIME.block_on(async {
let location = match files.as_ref() {
Some(files) => files[0].clone(),
None => eval_args[5].clone(),
};
let (bucket, file_name) = extract_bucket_and_file_name(&location)?;
let op = new_s3_operator(
eval_args[2].clone(),
eval_args[3].clone(),
eval_args[4].clone(),
match files.as_ref() {
Some(files) => files[0].clone(),
None => eval_args[5].clone(),
},
bucket.clone(),
)?;
let fields = get_parquet_fields(
op,
match files.as_ref() {
Some(files) => files[0].clone(),
None => eval_args[5].clone(),
},
)
.await?;

let fields = get_parquet_fields(op, file_name).await?;

let mut rw_types = vec![];
for field in &fields {
@@ -564,4 +560,4 @@
fn to_expr_proto(&self) -> risingwave_pb::expr::ExprNode {
unreachable!("Table function should not be converted to ExprNode")
}
}
}
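
The frontend change above resolves the location once, splits it into bucket and file name, and builds the operator before asking for the Parquet schema, instead of recomputing the location at each call site. A rough end-to-end sketch of how the pieces used above fit together; the credentials and directory are hypothetical placeholders, and anyhow is used for error plumbing as an assumption:

use risingwave_connector::source::iceberg::{
    extract_bucket_and_file_name, get_parquet_fields, list_s3_directory, new_s3_operator,
};

// Sketch only: derive the Parquet schema of the first file under an s3:// directory.
async fn first_file_parquet_fields(
    region: String,
    access_key: String,
    secret_key: String,
    dir: String, // hypothetical, e.g. "s3://my-bucket/events/"
) -> anyhow::Result<()> {
    let files =
        list_s3_directory(region.clone(), access_key.clone(), secret_key.clone(), dir).await?;
    let location = files
        .first()
        .cloned()
        .ok_or_else(|| anyhow::anyhow!("directory is empty"))?;
    let (bucket, file_name) = extract_bucket_and_file_name(&location)?;
    let op = new_s3_operator(region, access_key, secret_key, bucket)?;
    let fields = get_parquet_fields(op, file_name).await?;
    for field in &fields {
        // Arrow field name and type; the planner maps these to RisingWave types.
        println!("{}: {}", field.name(), field.data_type());
    }
    Ok(())
}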
