diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating.rs index 4c95bd160994..1ffc9dcbcf2e 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating.rs @@ -66,6 +66,7 @@ impl Drop for AccumulatingTransformer { fn drop(&mut self) { drop_guard(move || { if !self.called_on_finish { + self.called_on_finish = true; self.inner.on_finish(false).unwrap(); } }) diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs index c37c8c27eaae..884c725a0aae 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs @@ -223,5 +223,6 @@ pub fn sort_merge( for block in data_blocks { processor.transform(block)?; } + processor.on_finish(true) } diff --git a/src/query/storages/stage/src/read/row_based/processors/block_builder.rs b/src/query/storages/stage/src/read/row_based/processors/block_builder.rs index c127b2034c4d..e150f1e177d2 100644 --- a/src/query/storages/stage/src/read/row_based/processors/block_builder.rs +++ b/src/query/storages/stage/src/read/row_based/processors/block_builder.rs @@ -148,7 +148,11 @@ impl AccumulatingTransform for BlockBuilder { Ok(blocks) } - fn on_finish(&mut self, _output: bool) -> Result> { - self.flush_block(true) + fn on_finish(&mut self, output: bool) -> Result> { + if output { + return self.flush_block(true); + } + + Ok(vec![]) } }