Skip to content

Commit

Permalink
Refactor Range<PartitionKey> into PCell
Browse files Browse the repository at this point in the history
Signed-off-by: shuming.li <[email protected]>
  • Loading branch information
LiShuMing committed Dec 9, 2024
1 parent ad5973a commit d952f0b
Show file tree
Hide file tree
Showing 33 changed files with 640 additions and 703 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import com.starrocks.analysis.DescriptorTable.ReferencedPartitionInfo;
Expand Down Expand Up @@ -73,6 +72,7 @@
import com.starrocks.sql.analyzer.Scope;
import com.starrocks.sql.analyzer.SelectAnalyzer;
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.sql.common.PCell;
import com.starrocks.sql.common.PRangeCell;
import com.starrocks.sql.optimizer.CachingMvPlanContextBuilder;
import com.starrocks.sql.optimizer.MvRewritePreprocessor;
Expand Down Expand Up @@ -1540,13 +1540,13 @@ public Map<Table, List<Column>> getRefBaseTablePartitionColumns() {
* we also need to consider other ref table partitions(p0); otherwise, the mv's final result will lose data.
*/
public boolean isCalcPotentialRefreshPartition(List<TableWithPartitions> baseChangedPartitionNames,
Map<Table, Map<String, Range<PartitionKey>>> refBaseTableRangePartitionMap,
Map<Table, Map<String, PCell>> refBaseTableRangePartitionMap,
Set<String> mvPartitions,
Map<String, Range<PartitionKey>> mvPartitionNameToRangeMap) {
Map<String, ? extends PCell> mvPartitionNameToRangeMap) {
List<PRangeCell> mvSortedPartitionRanges =
TableWithPartitions.getSortedPartitionRanges(mvPartitionNameToRangeMap, mvPartitions);
for (TableWithPartitions baseTableWithPartition : baseChangedPartitionNames) {
Map<String, Range<PartitionKey>> baseRangePartitionMap =
Map<String, PCell> baseRangePartitionMap =
refBaseTableRangePartitionMap.get(baseTableWithPartition.getTable());
List<PRangeCell> baseSortedPartitionRanges =
baseTableWithPartition.getSortedPartitionRanges(baseRangePartitionMap);
Expand Down Expand Up @@ -2134,4 +2134,6 @@ public List<Column> getBaseSchema() {
}
return schema;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class MvBaseTableUpdateInfo {
// The partition names of base table that have been updated
private final Set<String> toRefreshPartitionNames = Sets.newHashSet();
// The mapping of partition name to partition range
private final Map<String, PCell> nameToPartKeys = Maps.newHashMap();
private final Map<String, PCell> partitonToCells = Maps.newHashMap();

// If the base table is a mv, needs to record the mapping of mv partition name to partition range
private final Map<String, PCell> mvPartitionNameToCellMap = Maps.newHashMap();
Expand All @@ -52,8 +52,8 @@ public Set<String> getToRefreshPartitionNames() {
return toRefreshPartitionNames;
}

public Map<String, PCell> getNameToPartKeys() {
return nameToPartKeys;
public Map<String, PCell> getPartitonToCells() {
return partitonToCells;
}

/**
Expand All @@ -71,22 +71,22 @@ public void addToRefreshPartitionNames(Set<String> toRefreshPartitionNames) {
*/
public void addRangePartitionKeys(String partitionName,
Range<PartitionKey> rangePartitionKey) {
nameToPartKeys.put(partitionName, new PRangeCell(rangePartitionKey));
partitonToCells.put(partitionName, new PRangeCell(rangePartitionKey));
}

/**
* Add partition name that needs to be refreshed and its associated list partition key
*/
public void addListPartitionKeys(Map<String, PListCell> listPartitionKeys) {
nameToPartKeys.putAll(listPartitionKeys);
public void addPartitionCells(Map<String, PCell> cells) {
partitonToCells.putAll(cells);
}

/**
* Get the partition name with its associated range partition key when the mv is range partitioned.
*/
public Map<String, Range<PartitionKey>> getPartitionNameWithRanges() {
Map<String, Range<PartitionKey>> result = Maps.newHashMap();
for (Map.Entry<String, PCell> e : nameToPartKeys.entrySet()) {
for (Map.Entry<String, PCell> e : partitonToCells.entrySet()) {
Preconditions.checkState(e.getValue() instanceof PRangeCell);
PRangeCell rangeCell = (PRangeCell) e.getValue();
result.put(e.getKey(), rangeCell.getRange());
Expand All @@ -99,7 +99,7 @@ public Map<String, Range<PartitionKey>> getPartitionNameWithRanges() {
*/
public Map<String, PListCell> getPartitionNameWithLists() {
Map<String, PListCell> result = Maps.newHashMap();
for (Map.Entry<String, PCell> e : nameToPartKeys.entrySet()) {
for (Map.Entry<String, PCell> e : partitonToCells.entrySet()) {
Preconditions.checkState(e.getValue() instanceof PListCell);
PListCell listCell = (PListCell) e.getValue();
result.put(e.getKey(), listCell);
Expand All @@ -111,7 +111,7 @@ public Map<String, PListCell> getPartitionNameWithLists() {
public String toString() {
return "BaseTableRefreshInfo{" +
", toRefreshPartitionNames=" + toRefreshPartitionNames +
", nameToPartKeys=" + nameToPartKeys +
", nameToPartKeys=" + partitonToCells +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.starrocks.catalog.mv.MVTimelinessRangePartitionArbiter;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.sql.common.PListCell;
import com.starrocks.sql.common.PCell;
import com.starrocks.sql.common.UnsupportedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -34,8 +34,8 @@
import java.util.Optional;
import java.util.Set;

import static com.starrocks.connector.PartitionUtil.getMVPartitionNameWithList;
import static com.starrocks.connector.PartitionUtil.getMVPartitionNameWithRange;
import static com.starrocks.connector.PartitionUtil.getMVPartitionToCells;
import static com.starrocks.sql.optimizer.OptimizerTraceUtil.logMVPrepare;

/**
Expand Down Expand Up @@ -198,9 +198,9 @@ public static MvBaseTableUpdateInfo getMvBaseTableUpdateInfo(MaterializedView mv
List<Column> refPartitionColumns = refBaseTablePartitionColumns.get(baseTable);
PartitionInfo mvPartitionInfo = mv.getPartitionInfo();
if (mvPartitionInfo.isListPartition()) {
Map<String, PListCell> mvPartitionNameWithList = getMVPartitionNameWithList(baseTable,
Map<String, PCell> mvPartitionNameWithList = getMVPartitionToCells(baseTable,
refPartitionColumns, updatedPartitionNamesList);
baseTableUpdateInfo.addListPartitionKeys(mvPartitionNameWithList);
baseTableUpdateInfo.addPartitionCells(mvPartitionNameWithList);
baseTableUpdateInfo.addToRefreshPartitionNames(mvPartitionNameWithList.keySet());
} else if (mvPartitionInfo.isRangePartition()) {
Preconditions.checkArgument(refPartitionColumns.size() == 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public TableProperty.QueryRewriteConsistencyMode getQueryRewriteConsistencyMode(
return queryRewriteConsistencyMode;
}

public void addMVPartitionNameToCellMap(Map<String, PCell> m) {
public void addMVPartitionNameToCellMap(Map<String, ? extends PCell> m) {
mvPartitionNameToCellMap.putAll(m);
}

Expand Down
23 changes: 23 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@
import com.starrocks.sql.ast.IndexDef.IndexType;
import com.starrocks.sql.ast.PartitionValue;
import com.starrocks.sql.common.MetaUtils;
import com.starrocks.sql.common.PCell;
import com.starrocks.sql.common.PListCell;
import com.starrocks.sql.common.PRangeCell;
import com.starrocks.sql.common.SyncPartitionUtils;
import com.starrocks.sql.optimizer.rule.mv.MVUtils;
import com.starrocks.sql.optimizer.statistics.IDictManager;
Expand Down Expand Up @@ -3677,4 +3679,25 @@ public long getLastCollectProfileTime() {
/**
 * Stamps {@code lastCollectProfileTime} with the current value of {@link System#currentTimeMillis()}.
 */
public void updateLastCollectProfileTime() {
    lastCollectProfileTime = System.currentTimeMillis();
}

/**
 * Exposes this table's partitions as a map of partition name to {@link PCell}.
 *
 * <p>For a range-partitioned table every range from {@code getRangePartitionMap()} is wrapped in a
 * {@link PRangeCell}; for a list-partitioned table the {@link PListCell} values from
 * {@code getListPartitionItems()} are copied into the result as-is.
 *
 * @return a newly collected map of partition name to cell, or {@code null} when the table is
 *         unpartitioned, when the underlying range/list map is {@code null}, or when the partition
 *         type is neither range nor list
 */
public Map<String, PCell> getPartitionCells() {
    final PartitionInfo info = getPartitionInfo();
    if (info.isUnPartitioned()) {
        return null;
    }

    if (info.isRangePartition()) {
        final Map<String, Range<PartitionKey>> ranges = getRangePartitionMap();
        if (ranges == null) {
            return null;
        }
        return ranges.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> new PRangeCell(entry.getValue())));
    }

    if (info.isListPartition()) {
        final Map<String, PListCell> items = getListPartitionItems();
        if (items == null) {
            return null;
        }
        return items.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // Neither range nor list partitioned: nothing to expose.
    return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.starrocks.catalog.mv;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.starrocks.catalog.BaseTableInfo;
Expand All @@ -22,22 +23,28 @@
import com.starrocks.catalog.MvBaseTableUpdateInfo;
import com.starrocks.catalog.MvUpdateInfo;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.TableProperty;
import com.starrocks.common.AnalysisException;
import com.starrocks.sql.common.ListPartitionDiffer;
import com.starrocks.sql.common.PCell;
import com.starrocks.sql.common.PartitionDiff;
import com.starrocks.sql.common.PartitionDiffResult;
import com.starrocks.sql.common.RangePartitionDiffer;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static com.starrocks.catalog.MvRefreshArbiter.getMvBaseTableUpdateInfo;
import static com.starrocks.catalog.MvRefreshArbiter.needsToRefreshTable;
import static com.starrocks.sql.optimizer.OptimizerTraceUtil.logMVPrepare;

/**
* {@link MVTimelinessArbiter} is the base class of all materialized view timeliness arbiters which is used to determine the mv's
Expand Down Expand Up @@ -77,10 +84,11 @@ public MVTimelinessArbiter(MaterializedView mv, boolean isQueryRewrite) {
* @return : partitioned materialized view's all need updated partition names.
*/
public MvUpdateInfo getMVTimelinessUpdateInfo(TableProperty.QueryRewriteConsistencyMode mode) throws AnalysisException {
if (mode == TableProperty.QueryRewriteConsistencyMode.LOOSE) {
return getMVTimelinessUpdateInfoInLoose();
} else {
return getMVTimelinessUpdateInfoInChecked();
switch (mode) {
case LOOSE:
return getMVTimelinessUpdateInfoInLoose();
default:
return getMVTimelinessUpdateInfoInChecked();
}
}

Expand All @@ -90,12 +98,6 @@ public MvUpdateInfo getMVTimelinessUpdateInfo(TableProperty.QueryRewriteConsiste
*/
protected abstract MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisException;

/**
* In Loose mode, do not need to check mv partition's data is consistent with base table's partition's data.
* Only need to check the mv partition existence.
*/
protected abstract MvUpdateInfo getMVTimelinessUpdateInfoInLoose();

/**
* Determine the refresh type of the materialized view.
* @param refBaseTablePartitionCols ref base table partition infos
Expand Down Expand Up @@ -175,39 +177,24 @@ protected Map<Table, Set<String>> collectBaseTableUpdatePartitionNames(Map<Table
return baseChangedPartitionNames;
}

protected void collectBaseTableUpdatePartitionNamesInLoose(MvUpdateInfo mvUpdateInfo) {
Map<Table, List<Column>> refBaseTableAndColumns = mv.getRefBaseTablePartitionColumns();
// collect & update mv's to refresh partitions based on base table's partition changes
collectBaseTableUpdatePartitionNames(refBaseTableAndColumns, mvUpdateInfo);
Set<Table> refBaseTables = mv.getRefBaseTablePartitionColumns().keySet();
MaterializedView.AsyncRefreshContext context = mv.getRefreshScheme().getAsyncRefreshContext();
for (Table table : refBaseTables) {
Map<String, MaterializedView.BasePartitionInfo> mvBaseTableVisibleVersionMap =
context.getBaseTableVisibleVersionMap()
.computeIfAbsent(table.getId(), k -> Maps.newHashMap());
for (String partitionName : mvBaseTableVisibleVersionMap.keySet()) {
if (mvUpdateInfo.getBaseTableToRefreshPartitionNames(table) != null) {
// in loose mode, ignore partition that both exists in baseTable and mv
mvUpdateInfo.getBaseTableToRefreshPartitionNames(table).remove(partitionName);
}
}
}
}

/**
* If a base table is itself a materialized view, merge its partition-name-to-cell mapping into the base table
* partition mapping; otherwise the base table (mv) may lose the partition names of the real base table's changed partitions.
* @param baseTableUpdateInfoMap base table update info from MvTimelinessInfo
* @return the base table to its changed partition and cell map if it's mv, empty else
*/
protected void collectExtraBaseTableChangedPartitions(
Map<Table, MvBaseTableUpdateInfo> baseTableUpdateInfoMap,
Consumer<Map.Entry<Table, Map<String, PCell>>> consumer) {
protected void collectExtraBaseTableChangedPartitions(Map<Table, MvBaseTableUpdateInfo> baseTableUpdateInfoMap,
Map<Table, Map<String, PCell>> basePartitionNameToRangeMap) {
Map<Table, Map<String, PCell>> extraChangedPartitions = baseTableUpdateInfoMap.entrySet().stream()
.filter(e -> !e.getValue().getMvPartitionNameToCellMap().isEmpty())
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().getMvPartitionNameToCellMap()));
for (Map.Entry<Table, Map<String, PCell>> entry : extraChangedPartitions.entrySet()) {
consumer.accept(entry);
Table baseTable = entry.getKey();
Preconditions.checkState(basePartitionNameToRangeMap.containsKey(baseTable));
Map<String, PCell> refBaseTablePartitionRangeMap = (Map<String, PCell>) basePartitionNameToRangeMap.get(baseTable);
Map<String, PCell> basePartitionNameToRanges = entry.getValue();
basePartitionNameToRanges.entrySet().forEach(e ->
refBaseTablePartitionRangeMap.put(e.getKey(), e.getValue()));
}
}

Expand All @@ -224,4 +211,88 @@ protected void addEmptyPartitionsToRefresh(MvUpdateInfo mvUpdateInfo) {
}
});
}

/**
 * Syncs the partition infos of the mv's ref base tables, choosing the differ by the mv's partition type.
 *
 * @param mv the materialized view whose partition info selects the range or list differ
 * @return base table to (partition name to cell) mapping; {@code null} when the mv is unpartitioned,
 *         when its partition type is neither range nor list, when the range sync result is null or empty,
 *         or when the list sync reports failure
 */
public Map<Table, Map<String, PCell>> syncBaseTablePartitions(MaterializedView mv) {
    final PartitionInfo mvPartitionInfo = mv.getPartitionInfo();
    if (mvPartitionInfo.isUnPartitioned()) {
        return null;
    }

    if (mvPartitionInfo.isRangePartition()) {
        final Map<Table, Map<String, PCell>> synced = RangePartitionDiffer.syncBaseTablePartitionInfos(mv);
        if (CollectionUtils.sizeIsEmpty(synced)) {
            return null;
        }
        // Return a freshly collected map (same keys, same value instances) instead of the differ's own map.
        return synced.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    if (mvPartitionInfo.isListPartition()) {
        // The list differ fills the map passed in and signals success through its return value.
        final Map<Table, Map<String, PCell>> synced = Maps.newHashMap();
        if (!ListPartitionDiffer.syncBaseTablePartitionInfos(mv, synced)) {
            logMVPrepare(mv, "Sync base table partition infos failed");
            return null;
        }
        return synced;
    }

    return null;
}

/**
 * Computes the partition diff between the mv and its ref base tables, choosing the differ by the
 * mv's partition type.
 *
 * @param mv the materialized view to diff
 * @param basePartitionNameToRangeMap base table to (partition name to cell) mapping handed to the differ
 * @return the computed diff, or {@code null} when the mv is unpartitioned, when its partition type is
 *         neither range nor list, when the differ yields no result, or when any exception is thrown
 *         while computing (the exception is logged as a warning and not propagated)
 */
public PartitionDiff getMVChangedPartitionDiff(MaterializedView mv,
                                               Map<Table, Map<String, PCell>> basePartitionNameToRangeMap) {
    PartitionInfo partitionInfo = mv.getPartitionInfo();
    try {
        if (partitionInfo.isUnPartitioned()) {
            return null;
        } else if (partitionInfo.isRangePartition()) {
            PartitionDiffResult differ = RangePartitionDiffer.computeRangePartitionDiff(mv, null,
                    basePartitionNameToRangeMap, isQueryRewrite);
            if (differ == null) {
                // This is the range branch: report a range diff failure (the message used to say "list").
                logMVPrepare(mv, "Partitioned mv compute range diff failed");
                return null;
            }
            return differ.diff;
        } else if (partitionInfo.isListPartition()) {
            Map<String, PCell> allBasePartitionItems =
                    ListPartitionDiffer.collectBasePartitionCells(basePartitionNameToRangeMap);
            PartitionDiffResult result = ListPartitionDiffer.computeListPartitionDiff(mv,
                    basePartitionNameToRangeMap, allBasePartitionItems, isQueryRewrite);
            if (result == null) {
                logMVPrepare(mv, "Partitioned mv compute list diff failed");
                return null;
            }
            return result.diff;
        }
    } catch (Exception e) {
        // Best effort: a failed diff is reported to the caller as null rather than as an exception.
        LOG.warn("Materialized view compute partition difference with base table failed.", e);
    }
    return null;
}

/**
 * In Loose mode, do not need to check mv partition's data is consistent with base table's partition's data.
 * Only need to check the mv partition existence.
 *
 * <p>The partitions reported by the diff as added are marked as to-refresh, then
 * {@code addEmptyPartitionsToRefresh} is applied to the result.
 *
 * @return a PARTIAL/LOOSE update info carrying the mv partitions to refresh, or a FULL update info when
 *         either the base table partitions cannot be synced or the partition diff cannot be computed
 */
public MvUpdateInfo getMVTimelinessUpdateInfoInLoose() {
    Map<Table, Map<String, PCell>> refBaseTablePartitionMap = syncBaseTablePartitions(mv);
    if (refBaseTablePartitionMap == null) {
        logMVPrepare(mv, "Sync base table partition infos failed");
        return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
    }

    PartitionDiff diff = getMVChangedPartitionDiff(mv, refBaseTablePartitionMap);
    if (diff == null) {
        // Treat a failed diff like a failed sync: fall back to a full refresh instead of returning null.
        logMVPrepare(mv, "Compute partition diff with base tables failed");
        return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
    }

    MvUpdateInfo mvUpdateInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL,
            TableProperty.QueryRewriteConsistencyMode.LOOSE);
    Map<String, PCell> adds = diff.getAdds();
    if (CollectionUtils.sizeIsEmpty(adds)) {
        return mvUpdateInfo;
    }

    mvUpdateInfo.getMvToRefreshPartitionNames().addAll(adds.keySet());
    addEmptyPartitionsToRefresh(mvUpdateInfo);
    return mvUpdateInfo;
}
}
Loading

0 comments on commit d952f0b

Please sign in to comment.