From c0f9b80c1b438d3e17cbbc5d91e1eaa5f9886339 Mon Sep 17 00:00:00 2001 From: ZENOTME <43447882+ZENOTME@users.noreply.github.com> Date: Thu, 12 Oct 2023 17:20:06 +0800 Subject: [PATCH] fix: fix empty ops case in iceberg sink (#12811) Co-authored-by: ZENOTME --- src/connector/src/sink/iceberg.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs index 56dd3a9452c05..1031c5181d81e 100644 --- a/src/connector/src/sink/iceberg.rs +++ b/src/connector/src/sink/iceberg.rs @@ -544,6 +544,7 @@ impl UpsertWriter { } fn partition_ops(ops: &[Op]) -> Vec<(usize, usize)> { + assert!(!ops.is_empty()); let mut res = vec![]; let mut start = 0; let mut prev_op = ops[0]; @@ -560,6 +561,9 @@ impl UpsertWriter { pub async fn write(&mut self, chunk: StreamChunk) -> Result<()> { let (chunk, ops) = chunk.compact().into_parts(); + if ops.len() == 0 { + return Ok(()); + } let chunk = to_record_batch_with_schema(self.schema.clone(), &chunk.compact()) .map_err(|err| SinkError::Iceberg(anyhow!(err)))?; let ranges = Self::partition_ops(&ops);