Skip to content

Commit

Permalink
fix bugs
Browse files Browse the repository at this point in the history
Signed-off-by: shuming.li <[email protected]>
  • Loading branch information
LiShuMing committed Dec 11, 2024
1 parent 1233be0 commit 122f317
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ protected Set<String> getMVToRefreshPartitionNames(
* @param refBaseTableAndColumns ref base table and columns of mv
* @return ref base table's changed partition names
*/
protected Map<Table, Set<String>> collectMVToBaseTablePartitionNames(Map<Table, List<Column>> refBaseTableAndColumns,
MvUpdateInfo mvUpdateInfo) {
protected Map<Table, Set<String>> collectBaseTableUpdatePartitionNames(Map<Table, List<Column>> refBaseTableAndColumns,
MvUpdateInfo mvUpdateInfo) {
Map<Table, Set<String>> baseChangedPartitionNames = Maps.newHashMap();
for (Table baseTable : refBaseTableAndColumns.keySet()) {
MvBaseTableUpdateInfo mvBaseTableUpdateInfo = getMvBaseTableUpdateInfo(mv, baseTable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisExceptio

// update mv's to refresh partitions based on base table's partition changes
MvUpdateInfo mvTimelinessInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL);
Map<Table, Set<String>> baseChangedPartitionNames = collectMVToBaseTablePartitionNames(refBaseTablePartitionColumns,
Map<Table, Set<String>> baseChangedPartitionNames = collectBaseTableUpdatePartitionNames(refBaseTablePartitionColumns,
mvTimelinessInfo);

// collect base table's partition infos
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisExcep
// record the relation of partitions between materialized view and base partition table
MvUpdateInfo mvTimelinessInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL);
// collect & update mv's to refresh partitions based on base table's partition changes
Map<Table, Set<String>> baseChangedPartitionNames = collectMVToBaseTablePartitionNames(refBaseTablePartitionColumns,
Map<Table, Set<String>> baseChangedPartitionNames = collectBaseTableUpdatePartitionNames(refBaseTablePartitionColumns,
mvTimelinessInfo);

// collect all ref base table's partition range map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.starrocks.catalog.FunctionSet;
import com.starrocks.catalog.HivePartitionKey;
import com.starrocks.catalog.IcebergTable;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.PrimitiveType;
import com.starrocks.catalog.Table;
Expand Down Expand Up @@ -743,6 +744,42 @@ public static Map<String, PCell> getMVPartitionToCells(Table table,
return partitionListMap;
}

/**
* Get MV's partition column indexes in ref base table's partition columns.
* NOTE:MV's partition columns may not be the same with ref base table's partition columns which may be less than ref base
* table's partition columns or not in the same order.
*/
public static List<Integer> getRefBaseTablePartitionColumIndexes(MaterializedView mv,
Table refBaseTable) {
Map<Table, List<Column>> mvRefBaseTablePartitionColumns = mv.getRefBaseTablePartitionColumns();
if (!mvRefBaseTablePartitionColumns.containsKey(refBaseTable)) {
return null;
}
List<Column> mvRefBaseTablePartitionCols = mvRefBaseTablePartitionColumns.get(refBaseTable);
if (mvRefBaseTablePartitionCols.size() > refBaseTable.getPartitionColumns().size()) {
return null;
}
List<Column> refBaseTablePartitionColumns = refBaseTable.getPartitionColumns();
return mvRefBaseTablePartitionCols.stream()
.map(col -> refBaseTablePartitionColumns.indexOf(col))
.collect(Collectors.toList());
}

/**
* Return the partition key of the selected partition columns. colIndexes is the index of selected partition columns.
*/
public static PartitionKey getSelectedPartitionKey(PartitionKey partitionKey,
List<Integer> colIndexes) {
if (partitionKey.getKeys().size() <= 1 || colIndexes == null) {
return partitionKey;
}
List<LiteralExpr> newPartitionKeys =
colIndexes.stream().map(partitionKey.getKeys()::get).collect(Collectors.toList());
List<PrimitiveType> newPartitionTypes =
colIndexes.stream().map(partitionKey.getTypes()::get).collect(Collectors.toList());
return new PartitionKey(newPartitionKeys, newPartitionTypes);
}

private static List<List<String>> generateMVPartitionList(PartitionKey partitionKey) {
List<List<String>> partitionKeyList = Lists.newArrayList();
List<String> partitionItem = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,14 +327,15 @@ private MVCompensation getMVCompensationForExternal(Table refBaseTable,
Set<String> refTablePartitionNamesToRefresh,
LogicalScanOperator refScanOperator) {
SessionVariable sessionVariable = mvContext.getOptimizerContext().getSessionVariable();
MaterializedView mv = mvContext.getMv();
try {
ScanOperatorPredicates scanOperatorPredicates = refScanOperator.getScanOperatorPredicates();
Collection<Long> selectPartitionIds = scanOperatorPredicates.getSelectedPartitionIds();
List<PartitionKey> selectPartitionKeys = scanOperatorPredicates.getSelectedPartitionKeys();
// For scan operator which support prune partitions with OptExternalPartitionPruner,
// we could only compensate partitions which selected partitions need to refresh.
if (SUPPORTED_PARTITION_PRUNE_EXTERNAL_SCAN_TYPES.contains(refScanOperator.getOpType())) {
if (Objects.isNull(selectPartitionIds) || selectPartitionIds.isEmpty()) {
if (CollectionUtils.isEmpty(selectPartitionIds)) {
// see OptExternalPartitionPruner#computePartitionInfo:
// it's not the same meaning when selectPartitionIds is null and empty for hive and other tables
if (refScanOperator.getOpType() == OperatorType.LOGICAL_HIVE_SCAN) {
Expand All @@ -344,9 +345,18 @@ private MVCompensation getMVCompensationForExternal(Table refBaseTable,
}
}

if (selectPartitionKeys.stream()
// NOTE: ref base table's partition keys may contain multi columns, but mv may only contain one column.
List<Integer> colIndexes = PartitionUtil.getRefBaseTablePartitionColumIndexes(mv, refBaseTable);
if (colIndexes == null) {
return MVCompensation.createUnkownState(sessionVariable);
}
List<PartitionKey> newPartitionKeys = selectPartitionKeys.stream()
.map(partitionKey -> PartitionUtil.getSelectedPartitionKey(partitionKey, colIndexes))
.collect(Collectors.toList());
Set<String> selectPartitionNames = newPartitionKeys.stream()
.map(PartitionUtil::generateMVPartitionName)
.noneMatch(refTablePartitionNamesToRefresh::contains)) {
.collect(Collectors.toSet());
if (selectPartitionNames.stream().noneMatch(refTablePartitionNamesToRefresh::contains)) {
return MVCompensation.createNoCompensateState(sessionVariable);
}
}
Expand Down Expand Up @@ -409,7 +419,6 @@ private List<PartitionKey> getMVCompensatePartitionsOfExternal(Table refBaseTabl
return getMVCompensatePartitionsOfExternalWithoutPartitionPruner(refBaseTable, refTablePartitionNamesToRefresh);
}
}

private List<PartitionKey> getMVCompensatePartitionsOfExternalWithPartitionPruner(
Set<String> refTablePartitionNamesToRefresh,
LogicalScanOperator refScanOperator) {
Expand All @@ -428,10 +437,16 @@ private List<PartitionKey> getMVCompensatePartitionsOfExternalWithPartitionPrune
if (selectPartitionKeys.isEmpty() && refScanOperator.getOpType() != OperatorType.LOGICAL_HIVE_SCAN) {
return null;
}
Table refBaseTable = refScanOperator.getTable();
List<Integer> colIndexes = PartitionUtil.getRefBaseTablePartitionColumIndexes(mvContext.getMv(), refBaseTable);
if (colIndexes == null) {
return null;
}
for (PartitionKey partitionKey : selectPartitionKeys) {
String partitionName = generateMVPartitionName(partitionKey);
PartitionKey newPartitionKey = PartitionUtil.getSelectedPartitionKey(partitionKey, colIndexes);
String partitionName = generateMVPartitionName(newPartitionKey);
if (refTablePartitionNamesToRefresh.contains(partitionName)) {
refTableCompensatePartitionKeys.add(partitionKey);
refTableCompensatePartitionKeys.add(newPartitionKey);
}
}
return refTableCompensatePartitionKeys;
Expand All @@ -444,7 +459,7 @@ private List<PartitionKey> getMVCompensatePartitionsOfExternalWithoutPartitionPr
if (baseTableUpdateInfo == null) {
return null;
}

// use update info's partition to cells since it's accurate.
Map<String, PCell> nameToPartitionKeys = baseTableUpdateInfo.getPartitonToCells();
List<PartitionKey> partitionKeys = Lists.newArrayList();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableSet;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.MvUpdateInfo;
import com.starrocks.common.FeConstants;
import com.starrocks.sql.optimizer.Utils;
import com.starrocks.sql.optimizer.operator.ScanOperatorPredicates;
import com.starrocks.sql.optimizer.operator.logical.LogicalScanOperator;
Expand Down Expand Up @@ -347,6 +348,8 @@ public void testPartitionedHiveMVWithLooseMode_MultiColumn() throws Exception {
"WHERE l_shipdate='1998-01-03'\n" +
"GROUP BY " +
"`l_orderkey`, `l_suppkey`, `l_shipdate`;").explainContains(mvName);

FeConstants.enablePruneEmptyOutputScan = true;
starRocksAssert.query("SELECT `l_orderkey`, `l_suppkey`, `l_shipdate`, sum(l_orderkey) " +
"FROM `hive0`.`partitioned_db`.`lineitem_mul_par3` as a \n " +
"WHERE l_shipdate='1998-01-01'\n" +
Expand All @@ -357,6 +360,7 @@ public void testPartitionedHiveMVWithLooseMode_MultiColumn() throws Exception {
"WHERE l_shipdate='1998-01-05'\n" +
"GROUP BY " +
"`l_orderkey`, `l_suppkey`, `l_shipdate`;").explainWithout(mvName);
FeConstants.enablePruneEmptyOutputScan = false;

dropMv("test", "hive_partitioned_mv");
}
Expand Down

0 comments on commit 122f317

Please sign in to comment.