feat(connector): add minio file scan type and enhance test #19950

Open · wants to merge 10 commits into base: main
55 changes: 55 additions & 0 deletions e2e_test/s3/file_sink.py
@@ -62,6 +62,46 @@ def do_test(config, file_num, item_num_per_file, prefix):
def _table():
return 's3_test_parquet'

print("test table function file scan")
cur.execute(f'''
SELECT
id,
name,
sex,
mark,
test_int,
test_int8,
test_uint8,
test_uint16,
test_uint32,
test_uint64,
test_float_16,
test_real,
test_double_precision,
test_varchar,
test_bytea,
test_date,
test_time,
test_timestamp_s,
test_timestamp_ms,
test_timestamp_us,
test_timestamp_ns,
test_timestamptz_s,
test_timestamptz_ms,
test_timestamptz_us,
test_timestamptz_ns
FROM file_scan(
'parquet',
'minio',
'custom',
'hummockadmin',
'hummockadmin',
's3://hummock001/test_file_scan/test_file_scan.parquet'
);''')
result = cur.fetchone()
assert result[0] == 0, f'file scan assertion failed: the first column is {result[0]}, expect 0.'

print("file scan test pass")
# Execute a SELECT statement
cur.execute(f'''CREATE TABLE {_table()}(
id bigint primary key,
@@ -491,6 +531,21 @@ def _assert_greater(field, got, expect):
_s3(idx),
_local(idx)
)
# upload a parquet file to test the file_scan table function
if data:
first_file_data = data[0]
first_table = pa.Table.from_pandas(pd.DataFrame(first_file_data))

first_file_name = f"test_file_scan.parquet"
first_file_path = f"test_file_scan/{first_file_name}"

pq.write_table(first_table, "data_0.parquet")

client.fput_object(
"hummock001",
first_file_path,
"data_0.parquet"
)

# do test
do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id)
1 change: 1 addition & 0 deletions proto/batch_plan.proto
@@ -88,6 +88,7 @@ message FileScanNode {
enum StorageType {
STORAGE_TYPE_UNSPECIFIED = 0;
S3 = 1;
MINIO = 2;
}

repeated plan_common.ColumnDesc columns = 1;
14 changes: 13 additions & 1 deletion src/batch/executors/src/executor/s3_file_scan.rs
@@ -41,6 +41,7 @@ pub struct S3FileScanExecutor {
batch_size: usize,
schema: Schema,
identity: String,
is_minio: bool,
}

impl Executor for S3FileScanExecutor {
@@ -67,6 +68,7 @@ impl S3FileScanExecutor {
batch_size: usize,
schema: Schema,
identity: String,
is_minio: bool,
) -> Self {
Self {
file_format,
@@ -77,6 +79,7 @@
batch_size,
schema,
identity,
is_minio,
}
}

@@ -90,6 +93,7 @@
self.s3_access_key.clone(),
self.s3_secret_key.clone(),
bucket.clone(),
self.is_minio,
)?;
let chunk_stream =
read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?;
@@ -115,7 +119,14 @@ impl BoxedExecutorBuilder for FileScanExecutorBuilder {
NodeBody::FileScan
)?;

assert_eq!(file_scan_node.storage_type, StorageType::S3 as i32);
let storage_type = file_scan_node.storage_type;
let is_minio = if storage_type == (StorageType::S3 as i32) {
false
} else if storage_type == (StorageType::Minio as i32) {
true
} else {
todo!()
};

Ok(Box::new(S3FileScanExecutor::new(
match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
@@ -129,6 +140,7 @@ impl BoxedExecutorBuilder for FileScanExecutorBuilder {
source.context().get_config().developer.chunk_size,
Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
source.plan_node().get_identity().clone(),
is_minio,
)))
}
}
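The storage-type dispatch added above falls back to todo!() when the proto enum carries an unexpected value. A minimal alternative sketch, assuming the prost-generated StorageType enum from batch_plan.proto (the import path, helper name, and String error type are illustrative, not the PR's actual code):

```rust
use risingwave_pb::batch_plan::file_scan_node::StorageType;

// Sketch only: decode the proto storage type once and reject unknown or
// unspecified values with an error instead of todo!(). The helper name and
// the String error type are illustrative, not the builder's real signature.
fn is_minio_storage(storage_type: i32) -> Result<bool, String> {
    match StorageType::try_from(storage_type) {
        Ok(StorageType::S3) => Ok(false),
        Ok(StorageType::Minio) => Ok(true),
        _ => Err(format!("unsupported file scan storage type: {storage_type}")),
    }
}
```

Either way, the result ends up as the is_minio boolean passed into S3FileScanExecutor::new.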
44 changes: 16 additions & 28 deletions src/connector/src/source/iceberg/parquet_file_handler.rs
@@ -109,16 +109,23 @@ pub fn new_s3_operator(
s3_access_key: String,
s3_secret_key: String,
bucket: String,
is_minio: bool,
) -> ConnectorResult<Operator> {
// Create s3 builder.
let mut builder = S3::default().bucket(&bucket).region(&s3_region);
builder = builder.secret_access_key(&s3_access_key);
builder = builder.secret_access_key(&s3_secret_key);
builder = builder.endpoint(&format!(
"https://{}.s3.{}.amazonaws.com",
bucket, s3_region
));

let mut builder = S3::default();
builder = builder
.region(&s3_region)
.access_key_id(&s3_access_key)
.secret_access_key(&s3_secret_key)
.bucket(&bucket);
builder = match is_minio {
true => builder.endpoint(&format!("http://{}.127.0.0.1:9301", bucket)),
Review thread on the hardcoded minio endpoint:

Contributor: Should we allow users to provide the S3 endpoint instead of hardcoding the endpoint?

Contributor Author: I think minio is only used for tests, so I hardcoded it. IMO, we shouldn't expose file_scan(minio) to users. What do you think?

Contributor Author (@wcy-fdu, Dec 27, 2024), replying to the question above: Allowing a user-provided endpoint would also require another parameter in file_scan, which is a bit redundant. I prefer to give users only the file scan backends that are consistent with Cloud (s3, gcs, azblob).

Contributor: Can we reuse the s3 region parameter position to let users set the endpoint as well, i.e. (region or endpoint)?

Contributor Author (@wcy-fdu, Dec 27, 2024): region is a required field. If we change region to endpoint, the user's endpoint would have to include the region. Isn't that a bit strange?

Contributor Author: When testing against minio, the region is custom and the endpoint is http://bucket/127.0.0.1:9301; the two have no intersection.

Contributor: If an endpoint is specified, the region field is useless (correct?). I mean file_scan('parquet', 's3', region_or_endpoint, ...): the region_or_endpoint parameter could be either a region or an endpoint, and we can detect which one simply by the http prefix.

Contributor Author: Let me give it a try.

false => builder.endpoint(&format!(
"https://{}.s3.{}.amazonaws.com",
bucket, s3_region
)),
};
builder = builder.disable_config_load();
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
@@ -143,29 +150,10 @@ pub fn extract_bucket_and_file_name(location: &str) -> ConnectorResult<(String,
Ok((bucket, file_name))
}

pub async fn list_s3_directory(
s3_region: String,
s3_access_key: String,
s3_secret_key: String,
dir: String,
) -> Result<Vec<String>, anyhow::Error> {
pub async fn list_s3_directory(op: Operator, dir: String) -> Result<Vec<String>, anyhow::Error> {
let (bucket, file_name) = extract_bucket_and_file_name(&dir)?;
let prefix = format!("s3://{}/", bucket);
if dir.starts_with(&prefix) {
let mut builder = S3::default();
builder = builder
.region(&s3_region)
.access_key_id(&s3_access_key)
.secret_access_key(&s3_secret_key)
.bucket(&bucket);
builder = builder.endpoint(&format!(
"https://{}.s3.{}.amazonaws.com",
bucket, s3_region
));
let op = Operator::new(builder)?
.layer(RetryLayer::default())
.finish();

op.list(&file_name)
.await
.map_err(|e| anyhow!(e))
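The "region or endpoint" idea discussed in the review thread above could look roughly like the following. This is a sketch under stated assumptions: it reuses opendal's S3 builder the same way new_s3_operator does, while the helper name, the placeholder region, and the prefix check are illustrative rather than part of this PR:

```rust
use opendal::services::S3;

// Sketch of the reviewer's "region or endpoint" suggestion: reuse the region
// argument slot and treat a value with an http(s) prefix as a custom endpoint
// (e.g. a local minio deployment). The helper name is hypothetical.
fn apply_region_or_endpoint(builder: S3, region_or_endpoint: &str, bucket: &str) -> S3 {
    if region_or_endpoint.starts_with("http://") || region_or_endpoint.starts_with("https://") {
        // Custom endpoint: keep a placeholder region so the S3 signer has a value.
        builder.region("custom").endpoint(region_or_endpoint)
    } else {
        // Plain region: fall back to the virtual-hosted-style AWS endpoint,
        // matching the format new_s3_operator already uses.
        builder.region(region_or_endpoint).endpoint(&format!(
            "https://{}.s3.{}.amazonaws.com",
            bucket, region_or_endpoint
        ))
    }
}
```

With something like this, file_scan('parquet', 's3', region_or_endpoint, ...) could target a local minio deployment without a dedicated 'minio' storage type, which is the trade-off the thread leaves open.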
30 changes: 14 additions & 16 deletions src/frontend/src/expr/table_function.rs
@@ -133,7 +133,9 @@ impl TableFunction {
.into());
}

if !"s3".eq_ignore_ascii_case(&eval_args[1]) {
if !"s3".eq_ignore_ascii_case(&eval_args[1])
&& !"minio".eq_ignore_ascii_case(&eval_args[1])
{
return Err(BindError(
"file_scan function only accepts 's3' or 'minio' as storage type".to_owned(),
)
@@ -148,21 +150,23 @@

#[cfg(not(madsim))]
{
let (bucket, _) = extract_bucket_and_file_name(&eval_args[5].clone())?;

let op = new_s3_operator(
eval_args[2].clone(),
eval_args[3].clone(),
eval_args[4].clone(),
bucket.clone(),
"minio".eq_ignore_ascii_case(&eval_args[1]),
)?;
let files = if eval_args[5].ends_with('/') {
let files = tokio::task::block_in_place(|| {
FRONTEND_RUNTIME.block_on(async {
let files = list_s3_directory(
eval_args[2].clone(),
eval_args[3].clone(),
eval_args[4].clone(),
eval_args[5].clone(),
)
.await?;
let files = list_s3_directory(op.clone(), eval_args[5].clone()).await?;

Ok::<Vec<String>, anyhow::Error>(files)
})
})?;

if files.is_empty() {
return Err(BindError(
"file_scan function only accepts non-empty directory".to_owned(),
@@ -181,13 +185,7 @@
Some(files) => files[0].clone(),
None => eval_args[5].clone(),
};
let (bucket, file_name) = extract_bucket_and_file_name(&location)?;
let op = new_s3_operator(
eval_args[2].clone(),
eval_args[3].clone(),
eval_args[4].clone(),
bucket.clone(),
)?;
let (_, file_name) = extract_bucket_and_file_name(&location)?;

let fields = get_parquet_fields(op, file_name).await?;

1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_file_scan.rs
@@ -89,6 +89,7 @@ impl ToBatchPb for BatchFileScan {
},
storage_type: match self.core.storage_type {
generic::StorageType::S3 => StorageType::S3 as i32,
generic::StorageType::Minio => StorageType::Minio as i32,
},
s3_region: self.core.s3_region.clone(),
s3_access_key: self.core.s3_access_key.clone(),
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/generic/file_scan.rs
@@ -27,6 +27,7 @@ pub enum FileFormat {
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum StorageType {
S3,
Minio,
}

#[derive(Debug, Clone, Educe)]
12 changes: 9 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_file_scan.rs
@@ -50,12 +50,18 @@ impl LogicalFileScan {
file_location: Vec<String>,
) -> Self {
assert!("parquet".eq_ignore_ascii_case(&file_format));
assert!("s3".eq_ignore_ascii_case(&storage_type));

assert!(
"s3".eq_ignore_ascii_case(&storage_type) || "minio".eq_ignore_ascii_case(&storage_type)
);
let storage_type = if "s3".eq_ignore_ascii_case(&storage_type) {
generic::StorageType::S3
} else {
generic::StorageType::Minio
};
let core = generic::FileScan {
schema,
file_format: generic::FileFormat::Parquet,
storage_type: generic::StorageType::S3,
storage_type,
s3_region,
s3_access_key,
s3_secret_key,
@@ -58,7 +58,10 @@ impl Rule for TableFunctionToFileScanRule {
}
}
assert!("parquet".eq_ignore_ascii_case(&eval_args[0]));
assert!("s3".eq_ignore_ascii_case(&eval_args[1]));
assert!(
("s3".eq_ignore_ascii_case(&eval_args[1])
|| "minio".eq_ignore_ascii_case(&eval_args[1]))
);
let s3_region = eval_args[2].clone();
let s3_access_key = eval_args[3].clone();
let s3_secret_key = eval_args[4].clone();
@@ -69,7 +72,7 @@
logical_table_function.ctx(),
schema,
"parquet".to_owned(),
"s3".to_owned(),
eval_args[1].to_owned(),
s3_region,
s3_access_key,
s3_secret_key,
1 change: 0 additions & 1 deletion src/object_store/src/object/opendal_engine/opendal_s3.rs
@@ -78,7 +78,6 @@ impl OpendalObjectStore {
"http://"
};
let (address, bucket) = rest.split_once('/').unwrap();

let builder = S3::default()
.bucket(bucket)
.region("custom")