Skip to content

Commit

Permalink
Refactor mv partition compensate
Browse files Browse the repository at this point in the history
Signed-off-by: shuming.li <[email protected]>
  • Loading branch information
LiShuMing committed Dec 26, 2024
1 parent b17f426 commit 47f9187
Show file tree
Hide file tree
Showing 24 changed files with 828 additions and 751 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class MvBaseTableUpdateInfo {
// The partition names of base table that have been updated
private final Set<String> toRefreshPartitionNames = Sets.newHashSet();
// The mapping of partition name to partition cell (a range cell or a list cell)
private final Map<String, PCell> partitonToCells = Maps.newHashMap();
private final Map<String, PCell> partitionToCells = Maps.newHashMap();

// If the base table is a mv, we need to record the mapping of mv partition name to partition cell
private final Map<String, PCell> mvPartitionNameToCellMap = Maps.newHashMap();
Expand All @@ -52,8 +52,8 @@ public Set<String> getToRefreshPartitionNames() {
return toRefreshPartitionNames;
}

public Map<String, PCell> getPartitonToCells() {
return partitonToCells;
public Map<String, PCell> getPartitionToCells() {
return partitionToCells;
}

/**
Expand All @@ -71,22 +71,22 @@ public void addToRefreshPartitionNames(Set<String> toRefreshPartitionNames) {
*/
public void addRangePartitionKeys(String partitionName,
Range<PartitionKey> rangePartitionKey) {
partitonToCells.put(partitionName, new PRangeCell(rangePartitionKey));
partitionToCells.put(partitionName, new PRangeCell(rangePartitionKey));
}

/**
* Add partition name that needs to be refreshed and its associated list partition key
*/
public void addPartitionCells(Map<String, PCell> cells) {
partitonToCells.putAll(cells);
partitionToCells.putAll(cells);
}

/**
* Get the partition name with its associated range partition key when the mv is range partitioned.
*/
public Map<String, Range<PartitionKey>> getPartitionNameWithRanges() {
Map<String, Range<PartitionKey>> result = Maps.newHashMap();
for (Map.Entry<String, PCell> e : partitonToCells.entrySet()) {
for (Map.Entry<String, PCell> e : partitionToCells.entrySet()) {
Preconditions.checkState(e.getValue() instanceof PRangeCell);
PRangeCell rangeCell = (PRangeCell) e.getValue();
result.put(e.getKey(), rangeCell.getRange());
Expand All @@ -99,7 +99,7 @@ public Map<String, Range<PartitionKey>> getPartitionNameWithRanges() {
*/
public Map<String, PListCell> getPartitionNameWithLists() {
Map<String, PListCell> result = Maps.newHashMap();
for (Map.Entry<String, PCell> e : partitonToCells.entrySet()) {
for (Map.Entry<String, PCell> e : partitionToCells.entrySet()) {
Preconditions.checkState(e.getValue() instanceof PListCell);
PListCell listCell = (PListCell) e.getValue();
result.put(e.getKey(), listCell);
Expand All @@ -111,7 +111,7 @@ public Map<String, PListCell> getPartitionNameWithLists() {
public String toString() {
return "BaseTableRefreshInfo{" +
", toRefreshPartitionNames=" + toRefreshPartitionNames +
", nameToPartKeys=" + partitonToCells +
", nameToPartKeys=" + partitionToCells +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static boolean needsToRefreshTable(MaterializedView mv, Table table, bool
public static MvUpdateInfo getMVTimelinessUpdateInfo(MaterializedView mv, boolean isQueryRewrite) {
// Skip check for sync materialized view.
if (mv.getRefreshScheme().isSync()) {
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.NO_REFRESH);
return MvUpdateInfo.noRefresh(mv);
}

// check mv's query rewrite consistency mode property only in query rewrite.
Expand All @@ -72,9 +72,9 @@ public static MvUpdateInfo getMVTimelinessUpdateInfo(MaterializedView mv, boolea
if (isQueryRewrite) {
switch (mvConsistencyRewriteMode) {
case DISABLE:
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
return MvUpdateInfo.fullRefresh(mv);
case NOCHECK:
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.NO_REFRESH);
return MvUpdateInfo.noRefresh(mv);
case LOOSE:
case CHECKED:
default:
Expand All @@ -89,7 +89,7 @@ public static MvUpdateInfo getMVTimelinessUpdateInfo(MaterializedView mv, boolea
return timelinessArbiter.getMVTimelinessUpdateInfo(mvConsistencyRewriteMode);
} catch (AnalysisException e) {
logMVPrepare(mv, "Failed to get mv timeliness info: {}", DebugUtil.getStackTrace(e));
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.UNKNOWN);
return MvUpdateInfo.unknown(mv);
}
}

Expand Down
75 changes: 54 additions & 21 deletions fe/fe-core/src/main/java/com/starrocks/catalog/MvUpdateInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* Store the update information of MV used for mv rewrite (mv refresh can use it later).
*/
public class MvUpdateInfo {
private final MaterializedView mv;
// The type of mv refresh later
private final MvToRefreshType mvToRefreshType;
// The partition names of mv to refresh
Expand All @@ -55,16 +56,36 @@ public enum MvToRefreshType {
UNKNOWN // unknown type
}

public MvUpdateInfo(MvToRefreshType mvToRefreshType) {
public MvUpdateInfo(MaterializedView mv, MvToRefreshType mvToRefreshType) {
this.mv = mv;
this.mvToRefreshType = mvToRefreshType;
this.queryRewriteConsistencyMode = TableProperty.QueryRewriteConsistencyMode.CHECKED;
}

public MvUpdateInfo(MvToRefreshType mvToRefreshType, TableProperty.QueryRewriteConsistencyMode queryRewriteConsistencyMode) {
public MvUpdateInfo(MaterializedView mv, MvToRefreshType mvToRefreshType,
TableProperty.QueryRewriteConsistencyMode queryRewriteConsistencyMode) {
this.mv = mv;
this.mvToRefreshType = mvToRefreshType;
this.queryRewriteConsistencyMode = queryRewriteConsistencyMode;
}

/**
 * Builds an update info for the given mv whose refresh type is {@link MvToRefreshType#UNKNOWN}.
 *
 * @param mv the materialized view this update info belongs to
 * @return a new {@code MvUpdateInfo} created through the two-argument constructor
 */
public static MvUpdateInfo unknown(MaterializedView mv) {
    final MvToRefreshType refreshType = MvToRefreshType.UNKNOWN;
    return new MvUpdateInfo(mv, refreshType);
}

/**
 * Builds an update info for the given mv whose refresh type is {@link MvToRefreshType#NO_REFRESH}.
 *
 * @param mv the materialized view this update info belongs to
 * @return a new {@code MvUpdateInfo} created through the two-argument constructor
 */
public static MvUpdateInfo noRefresh(MaterializedView mv) {
    final MvToRefreshType refreshType = MvToRefreshType.NO_REFRESH;
    return new MvUpdateInfo(mv, refreshType);
}

/**
 * Builds an update info for the given mv whose refresh type is {@link MvToRefreshType#FULL}.
 *
 * @param mv the materialized view this update info belongs to
 * @return a new {@code MvUpdateInfo} created through the two-argument constructor
 */
public static MvUpdateInfo fullRefresh(MaterializedView mv) {
    final MvToRefreshType refreshType = MvToRefreshType.FULL;
    return new MvUpdateInfo(mv, refreshType);
}

/**
 * Builds an update info for the given mv whose refresh type is {@link MvToRefreshType#PARTIAL},
 * carrying the supplied query rewrite consistency mode.
 *
 * @param mv                          the materialized view this update info belongs to
 * @param queryRewriteConsistencyMode the consistency mode stored on the created update info
 * @return a new {@code MvUpdateInfo} created through the three-argument constructor
 */
public static MvUpdateInfo partialRefresh(MaterializedView mv,
                                          TableProperty.QueryRewriteConsistencyMode queryRewriteConsistencyMode) {
    final MvToRefreshType refreshType = MvToRefreshType.PARTIAL;
    return new MvUpdateInfo(mv, refreshType, queryRewriteConsistencyMode);
}

/**
 * @return the refresh type this update info was constructed with
 */
public MvToRefreshType getMvToRefreshType() {
    return this.mvToRefreshType;
}
Expand Down Expand Up @@ -109,28 +130,40 @@ public Map<String, PCell> getMvPartitionNameToCellMap() {
return mvPartitionNameToCellMap;
}

/**
 * @return the materialized view this update info was constructed with
 */
public MaterializedView getMv() {
    return mv;
}

@Override
public String toString() {
int maxLength = Config.max_mv_task_run_meta_message_values_length;
return "MvUpdateInfo{" +
"refreshType=" + mvToRefreshType +
", mvToRefreshPartitionNames=" + shrinkToSize(mvToRefreshPartitionNames, maxLength) +
", basePartToMvPartNames=" + shrinkToSize(basePartToMvPartNames, maxLength) +
", mvPartToBasePartNames=" + shrinkToSize(mvPartToBasePartNames, maxLength) +
'}';
}

/**
* @return the detail string of the mv update info
*/
public String toDetailString() {
return "MvUpdateInfo{" +
"refreshType=" + mvToRefreshType +
", mvToRefreshPartitionNames=" + mvToRefreshPartitionNames +
", baseTableUpdateInfos=" + baseTableUpdateInfos +
", basePartToMvPartNames=" + basePartToMvPartNames +
", mvPartToBasePartNames=" + mvPartToBasePartNames +
'}';
StringBuilder sb = new StringBuilder();
sb.append("refreshType=").append(mvToRefreshType);
if (!CollectionUtils.sizeIsEmpty(mvToRefreshPartitionNames)) {
sb.append(", mvToRefreshPartitionNames=").append(shrinkToSize(mvToRefreshPartitionNames, maxLength));
}
if (!CollectionUtils.sizeIsEmpty(basePartToMvPartNames)) {
sb.append(", basePartToMvPartNames=");
for (Map.Entry<Table, Map<String, Set<String>>> entry : basePartToMvPartNames.entrySet()) {
sb.append("[").append(entry.getKey().getName()).append(":");
for (Map.Entry<String, Set<String>> entry1 : shrinkToSize(entry.getValue(), maxLength).entrySet()) {
sb.append(" ").append(entry1.getKey()).append(":").append(shrinkToSize(entry1.getValue(), maxLength));
}
sb.append("]");
}
}
if (!CollectionUtils.sizeIsEmpty(mvPartToBasePartNames)) {
sb.append(", mvPartToBasePartNames=");
for (Map.Entry<String, Map<Table, Set<String>>> entry : mvPartToBasePartNames.entrySet()) {
sb.append("[").append(entry.getKey()).append(":");
for (Map.Entry<Table, Set<String>> entry1 : shrinkToSize(entry.getValue(), maxLength).entrySet()) {
sb.append(" ").append(entry1.getKey().getName()).append(":")
.append(shrinkToSize(entry1.getValue(), maxLength));
}
sb.append("]");
}
}
return sb.toString();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,19 +259,18 @@ public PartitionDiff getChangedPartitionDiff(MaterializedView mv,
* Only need to check the mv partition existence.
*/
public MvUpdateInfo getMVTimelinessUpdateInfoInLoose() {
MvUpdateInfo mvUpdateInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL,
TableProperty.QueryRewriteConsistencyMode.LOOSE);
Map<Table, Map<String, PCell>> refBaseTablePartitionMap = syncBaseTablePartitions(mv);
if (refBaseTablePartitionMap == null) {
logMVPrepare(mv, "Sync base table partition infos failed");
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
return MvUpdateInfo.fullRefresh(mv);
}

PartitionDiff diff = getChangedPartitionDiff(mv, refBaseTablePartitionMap);
if (diff == null) {
return null;
}
Map<String, PCell> adds = diff.getAdds();
MvUpdateInfo mvUpdateInfo = MvUpdateInfo.partialRefresh(mv, TableProperty.QueryRewriteConsistencyMode.LOOSE);
if (!CollectionUtils.sizeIsEmpty(adds)) {
adds.keySet().stream().forEach(mvPartitionName ->
mvUpdateInfo.getMvToRefreshPartitionNames().add(mvPartitionName));
Expand Down Expand Up @@ -318,24 +317,12 @@ public Set<String> getMVRetentionPartitionNames(MaterializedView mv,

/**
* TODO: Optimize performance in loose/force_mv mode
* TODO: in loose mode, ignore partition that both exists in baseTable and mv
*/
protected void collectBaseTableUpdatePartitionNamesInLoose(MvUpdateInfo mvUpdateInfo) {
Map<Table, List<Column>> refBaseTableAndColumns = mv.getRefBaseTablePartitionColumns();
// collect & update mv's to refresh partitions based on base table's partition changes
collectBaseTableUpdatePartitionNames(refBaseTableAndColumns, mvUpdateInfo);
Set<Table> refBaseTables = mv.getRefBaseTablePartitionColumns().keySet();
MaterializedView.AsyncRefreshContext context = mv.getRefreshScheme().getAsyncRefreshContext();
for (Table table : refBaseTables) {
Map<String, MaterializedView.BasePartitionInfo> mvBaseTableVisibleVersionMap =
context.getBaseTableVisibleVersionMap()
.computeIfAbsent(table.getId(), k -> Maps.newHashMap());
for (String partitionName : mvBaseTableVisibleVersionMap.keySet()) {
if (mvUpdateInfo.getBaseTableToRefreshPartitionNames(table) != null) {
// in loose mode, ignore partition that both exists in baseTable and mv
mvUpdateInfo.getBaseTableToRefreshPartitionNames(table).remove(partitionName);
}
}
}
}

/**
Expand All @@ -347,15 +334,12 @@ protected void collectBaseTableUpdatePartitionNamesInLoose(MvUpdateInfo mvUpdate
public MvUpdateInfo getMVTimelinessUpdateInfoInForceMVMode() {
String retentionCondition = mv.getTableProperty().getPartitionRetentionCondition();
if (Strings.isNullOrEmpty(retentionCondition)) {
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.NO_REFRESH);
return MvUpdateInfo.noRefresh(mv);
}
MvUpdateInfo mvUpdateInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL,
TableProperty.QueryRewriteConsistencyMode.FORCE_MV);

Map<Table, Map<String, PCell>> refBaseTablePartitionMap = syncBaseTablePartitions(mv);
if (refBaseTablePartitionMap == null) {
logMVPrepare(mv, "Sync base table partition infos failed");
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
return MvUpdateInfo.fullRefresh(mv);
}

PartitionDiff diff = getChangedPartitionDiff(mv, refBaseTablePartitionMap);
Expand All @@ -366,18 +350,19 @@ public MvUpdateInfo getMVTimelinessUpdateInfoInForceMVMode() {
Map<String, PCell> adds = diff.getAdds();
// no partition added
if (CollectionUtils.sizeIsEmpty(adds)) {
return mvUpdateInfo;
return MvUpdateInfo.noRefresh(mv);
}
Set<String> retentionPartitionNames = getMVRetentionPartitionNames(mv, retentionCondition, adds);
if (retentionPartitionNames == null) {
logMVPrepare(mv, "Get expired partitions by retention condition failed");
return null;
}
MvUpdateInfo mvUpdateInfo = MvUpdateInfo.partialRefresh(mv, TableProperty.QueryRewriteConsistencyMode.FORCE_MV);
adds.keySet().stream()
.filter(mvPartitionName -> !retentionPartitionNames.contains(mvPartitionName))
.forEach(mvPartitionName -> mvUpdateInfo.getMvToRefreshPartitionNames().add(mvPartitionName));
if (CollectionUtils.isEmpty(mvUpdateInfo.getMvToRefreshPartitionNames())) {
return mvUpdateInfo;
return MvUpdateInfo.noRefresh(mv);
}
collectBaseTableUpdatePartitionNamesInLoose(mvUpdateInfo);
// collect base table's partition infos
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.starrocks.catalog.MvUpdateInfo;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.TableProperty;
import com.starrocks.common.AnalysisException;
import com.starrocks.sql.common.ListPartitionDiffer;
import com.starrocks.sql.common.PCell;
Expand Down Expand Up @@ -59,19 +60,19 @@ public MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisExceptio
boolean isRefreshBasedOnNonRefTables = needsRefreshOnNonRefBaseTables(refBaseTablePartitionColumns);
logMVPrepare(mv, "MV refresh based on non-ref base table:{}", isRefreshBasedOnNonRefTables);
if (isRefreshBasedOnNonRefTables) {
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
return MvUpdateInfo.fullRefresh(mv);
}

// update mv's to refresh partitions based on base table's partition changes
MvUpdateInfo mvTimelinessInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL);
MvUpdateInfo mvTimelinessInfo = MvUpdateInfo.partialRefresh(mv, TableProperty.QueryRewriteConsistencyMode.CHECKED);
Map<Table, Set<String>> baseChangedPartitionNames = collectBaseTableUpdatePartitionNames(refBaseTablePartitionColumns,
mvTimelinessInfo);

// collect base table's partition infos
Map<Table, Map<String, PCell>> refBaseTablePartitionMap = syncBaseTablePartitions(mv);
if (refBaseTablePartitionMap == null) {
logMVPrepare(mv, "Sync base table partition infos failed");
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
return MvUpdateInfo.fullRefresh(mv);
}
// If base table is materialized view, add partition name to cell mapping into base table partition mapping,
// otherwise base table(mv) may lose partition names of the real base table changed partitions.
Expand All @@ -80,7 +81,7 @@ public MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisExceptio
PartitionDiff diff = getChangedPartitionDiff(mv, refBaseTablePartitionMap);
if (diff == null) {
logMVPrepare(mv, "Partitioned mv compute list diff failed");
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
return MvUpdateInfo.fullRefresh(mv);
}

// update into mv's to refresh partitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected MvUpdateInfo getMVTimelinessUpdateInfoInChecked() {
// skip check external table if the external does not support rewrite.
if (!table.isNativeTableOrMaterializedView() && isDisableExternalForceQueryRewrite) {
logMVPrepare(mv, "Non-partitioned contains external table, and it's disabled query rewrite");
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
return MvUpdateInfo.fullRefresh(mv);
}

// once mv's base table has updated, refresh the materialized view totally.
Expand All @@ -73,25 +73,25 @@ protected MvUpdateInfo getMVTimelinessUpdateInfoInChecked() {
if (mvBaseTableUpdateInfo != null &&
CollectionUtils.isNotEmpty(mvBaseTableUpdateInfo.getToRefreshPartitionNames())) {
logMVPrepare(mv, "Non-partitioned base table has updated, need refresh totally.");
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
return MvUpdateInfo.fullRefresh(mv);
}
}
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.NO_REFRESH);
return MvUpdateInfo.noRefresh(mv);
}

@Override
public MvUpdateInfo getMVTimelinessUpdateInfoInLoose() {
List<Partition> partitions = Lists.newArrayList(mv.getPartitions());
if (partitions.size() > 0 && partitions.get(0).getDefaultPhysicalPartition().getVisibleVersion() <= 1) {
// the mv is newly created, can not use it to rewrite query.
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
return MvUpdateInfo.fullRefresh(mv);
}
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.NO_REFRESH);
return MvUpdateInfo.noRefresh(mv);
}

@Override
public MvUpdateInfo getMVTimelinessUpdateInfoInForceMVMode() {
// for force mv mode, always no need to refresh for non-partitioned mv.
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.NO_REFRESH);
return MvUpdateInfo.noRefresh(mv);
}
}
Loading

0 comments on commit 47f9187

Please sign in to comment.