From f7a4a5dfa9459e117d09fd0246644a87318c5b3d Mon Sep 17 00:00:00 2001 From: Winter Zhang Date: Fri, 20 Dec 2024 16:07:16 +0800 Subject: [PATCH] fix(query): double panic if broken state in block builder (#17091) * fix(query): fix call double finish in transform accumulating * fix(query): fix DataBlock corrupted, column length mismatch panic * fix(query): remove useless file --- .../src/processors/transforms/transform_accumulating.rs | 1 + .../src/processors/transforms/transform_sort_merge.rs | 1 + .../stage/src/read/row_based/processors/block_builder.rs | 8 ++++++-- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating.rs index 4c95bd160994..1ffc9dcbcf2e 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating.rs @@ -66,6 +66,7 @@ impl Drop for AccumulatingTransformer { fn drop(&mut self) { drop_guard(move || { if !self.called_on_finish { + self.called_on_finish = true; self.inner.on_finish(false).unwrap(); } }) diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs index c37c8c27eaae..884c725a0aae 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs @@ -223,5 +223,6 @@ pub fn sort_merge( for block in data_blocks { processor.transform(block)?; } + processor.on_finish(true) } diff --git a/src/query/storages/stage/src/read/row_based/processors/block_builder.rs b/src/query/storages/stage/src/read/row_based/processors/block_builder.rs index c127b2034c4d..e150f1e177d2 100644 --- a/src/query/storages/stage/src/read/row_based/processors/block_builder.rs +++ b/src/query/storages/stage/src/read/row_based/processors/block_builder.rs @@ -148,7 +148,11 @@ impl AccumulatingTransform for BlockBuilder { Ok(blocks) } - fn on_finish(&mut self, _output: bool) -> Result> { - self.flush_block(true) + fn on_finish(&mut self, output: bool) -> Result> { + if output { + return self.flush_block(true); + } + + Ok(vec![]) } }