Skip to content

Commit

Permalink
Fix mv union rewrite and partition prune bug
Browse files Browse the repository at this point in the history
Signed-off-by: shuming.li <[email protected]>
  • Loading branch information
LiShuMing committed Dec 25, 2024
1 parent 33da390 commit 04ef3f9
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,8 @@ protected OptExpression createUnion(OptExpression queryInput,
List<ColumnRefOperator> originalOutputColumns = new ArrayList<>(queryColumnRefMap.keySet());
// rewrite query
OptExpressionDuplicator duplicator = new OptExpressionDuplicator(materializationContext);
// reset original partition predicates to prune partitions/tablets again
OptExpression newQueryInput = duplicator.duplicate(queryInput, true);
// don't reset selected partition ids for query input, because query's partition ranges should not be extended.
OptExpression newQueryInput = duplicator.duplicate(queryInput, false);
List<ColumnRefOperator> newQueryOutputColumns = duplicator.getMappedColumns(originalOutputColumns);

Projection projection = getMvOptExprProjection(viewInput);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2230,11 +2230,10 @@ protected OptExpression createUnion(OptExpression queryInput, OptExpression view
// keys of queryColumnRefMap and mvColumnRefMap are the same
List<ColumnRefOperator> originalOutputColumns =
queryColumnRefMap.keySet().stream().collect(Collectors.toList());

// rewrite query
OptExpressionDuplicator duplicator = new OptExpressionDuplicator(materializationContext);
// NOTE: selected partitions and tablets should be deduced again.
OptExpression newQueryInput = duplicator.duplicate(queryInput, true);
// don't reset selected partition ids for query input, because query's partition ranges should not be extended.
OptExpression newQueryInput = duplicator.duplicate(queryInput, false);
List<ColumnRefOperator> newQueryOutputColumns = duplicator.getMappedColumns(originalOutputColumns);

// rewrite viewInput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1168,4 +1168,65 @@ public void testViewBaseMvRewriteWithPartitionExpr() throws Exception {
}
connectContext.getSessionVariable().setEnableViewBasedMvRewrite(false);
}

@Test
public void testMVPartitionRefreshRewrite() throws Exception {
connectContext.getSessionVariable().setEnableMaterializedViewTransparentUnionRewrite(false);
sql("CREATE TABLE test_base_table1(\n" +
" `col0` int(11) NULL,\n" +
" `col2` date NULL,\n" +
" `col3` varchar(32) NULL,\n" +
" `id` bigint(20) NULL,\n" +
" `col1` bigint(20) NULL\n" +
") DUPLICATE KEY(col0, col2, col3)\n" +
" PARTITION BY RANGE(col2)(\n" +
" START (\"2022-04-01\") END (\"2022-04-10\") EVERY (INTERVAL 1 day))\n" +
" DISTRIBUTED BY HASH(col0)\n" +
" PROPERTIES(\n" +
" \"replication_num\" = \"1\"\n" +
");");
sql("INSERT INTO test_base_table1 (col0, col2, col3) VALUES " +
"(987654321, '2022-04-01', 'Fujian1')," +
"(987654321, '2022-04-02', 'Fujian2')," +
"(987654321, '2022-04-03', 'Guangdong')," +
"(987654321, '2022-04-04', 'Fujian');");
sql("CREATE MATERIALIZED VIEW test_mv1 \n" +
"partition by (col2)\n" +
"REFRESH DEFERRED MANUAL\n" +
"AS\n" +
"SELECT * from (select col2,col3,col0,id,col1 FROM test_base_table1 " +
"where col3 = 'Guangdong' and col0 = 123456789) tmp;\n");

sql("refresh materialized view test_mv1 partition start('2022-04-01') end ('2022-04-04') with sync mode;");

String query = "select col1, col2, 'server' source_meta_type, count(1) " +
"from test_base_table1 where col2 between '2022-04-01' and '2022-04-05' group by col1, col2;\n";
{
String plan = getFragmentPlan(query);
PlanTestBase.assertContains(plan, "UNION");
// TODO: This should be rewritten since the partition range is not changed but it increased union operator,
// TODO: so the rewritten plan's performance is not better than the original plan.
// input query's partition range is [2022-04-01, 2022-04-05] and should not be changed.
PlanTestBase.assertContains(plan, " TABLE: test_base_table1\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: ((13: col0 != 123456789) OR (14: col2 < '2022-04-01')) " +
"OR ((14: col2 >= '2022-04-04') OR (15: col3 != 'Guangdong'))\n" +
" partitions=5/9");
}

{
sql("INSERT INTO test_base_table1 partition('p20220405') VALUES (123456789, '2022-04-05', 'Guangdong', 1, 10001)");
String plan = getFragmentPlan(query);
PlanTestBase.assertContains(plan, "UNION");
// input query's partition range is [2022-04-01, 2022-04-05] and should not be changed.
PlanTestBase.assertContains(plan, " TABLE: test_base_table1\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: ((13: col0 != 123456789) OR (14: col2 < '2022-04-01')) " +
"OR ((14: col2 >= '2022-04-04') OR (15: col3 != 'Guangdong'))\n" +
" partitions=5/9");
}
connectContext.getSessionVariable().setEnableMaterializedViewTransparentUnionRewrite(true);
starRocksAssert.dropTable("test_base_table1");
starRocksAssert.dropMaterializedView("test_mv1");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public static void beforeClass() throws Exception {
connectContext = UtFrameUtils.createDefaultCtx();
starRocksAssert = new StarRocksAssert(connectContext);
starRocksAssert.withDatabase(DB_NAME).useDatabase(DB_NAME);
connectContext.setDatabase(DB_NAME);

// set default config for async mvs
UtFrameUtils.setDefaultConfigForAsyncMVTest(connectContext);
Expand Down Expand Up @@ -245,6 +246,10 @@ public static void executeInsertSql(ConnectContext connectContext, String sql) t
StmtExecutor.newInternalExecutor(connectContext, statement).execute();
}

public static void sql(String sql) throws Exception {
cluster.runSql(DB_NAME, sql);
}

/**
* Add list partition with one value
* @param tbl table name
Expand Down

0 comments on commit 04ef3f9

Please sign in to comment.