Skip to content

Commit

Permalink
Fix partition_retention_condition for iceberg with partition transforms
Browse files Browse the repository at this point in the history
Signed-off-by: shuming.li <[email protected]>
  • Loading branch information
LiShuMing committed Dec 16, 2024
1 parent 884981f commit 740d10d
Show file tree
Hide file tree
Showing 14 changed files with 578 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.IntLiteral;
import com.starrocks.analysis.StringLiteral;
import com.starrocks.analysis.TableName;
Expand Down Expand Up @@ -43,6 +44,7 @@
import com.starrocks.scheduler.TaskBuilder;
import com.starrocks.scheduler.TaskManager;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.analyzer.MaterializedViewAnalyzer;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.analyzer.SetStmtAnalyzer;
import com.starrocks.sql.ast.AlterMaterializedViewStatusClause;
Expand Down Expand Up @@ -106,8 +108,11 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause modifyT
String ttlRetentionCondition = null;
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_CONDITION)) {
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(materializedView.getDbId());
TableName mvTableName = new TableName(db.getFullName(), materializedView.getName());
Map<Expr, Expr> mvPartitionByExprToAdjustMap =
MaterializedViewAnalyzer.getMVPartitionByExprToAdjustMap(mvTableName, materializedView);
ttlRetentionCondition = PropertyAnalyzer.analyzePartitionRetentionCondition(db,
materializedView, properties, true);
materializedView, properties, true, mvPartitionByExprToAdjustMap);
}
int partitionRefreshNumber = INVALID;
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_PARTITION_REFRESH_NUMBER)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2110,30 +2110,4 @@ public synchronized ParseNode getDefineQueryParseNode() {
}
return this.defineQueryParseNode;
}

/**
* For MV, its schema has relations with defined query's schema.`getBaseSchema` should not return the generated columns
* since they are inner columns and not visible to users.
* For now, the generated columns of mv can be created when mv's partition expression is not slot ref and its partition type
* is LIST.
*/
@Override
public List<Column> getBaseSchema() {
if (!hasGeneratedColumn()) {
return getSchemaByIndexId(baseIndexId);
}

List<Column> schema = Lists.newArrayList(getSchemaByIndexId(baseIndexId));
while (schema.size() > 0) {
// check last column is whether is a generated column or not
if (schema.get(schema.size() - 1).isGeneratedColumn()) {
schema.remove(schema.size() - 1);
} else {
break;
}
}
return schema;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,8 @@ public static int analyzePartitionLiveNumber(Map<String, String> properties, boo
public static String analyzePartitionRetentionCondition(Database db,
OlapTable olapTable,
Map<String, String> properties,
boolean removeProperties) {
boolean removeProperties,
Map<Expr, Expr> exprToAdjustMap) {
String partitionRetentionCondition = "";
if (properties != null && properties.containsKey(PROPERTIES_PARTITION_RETENTION_CONDITION)) {
partitionRetentionCondition = properties.get(PROPERTIES_PARTITION_RETENTION_CONDITION);
Expand All @@ -446,8 +447,9 @@ public static String analyzePartitionRetentionCondition(Database db,
// validate retention condition
TableName tableName = new TableName(db.getFullName(), olapTable.getName());
ConnectContext connectContext = ConnectContext.get() == null ? new ConnectContext(null) : ConnectContext.get();

try {
PartitionSelector.getPartitionIdsByExpr(connectContext, tableName, olapTable, whereExpr, false);
PartitionSelector.getPartitionIdsByExpr(connectContext, tableName, olapTable, whereExpr, false, exprToAdjustMap);
} catch (Exception e) {
throw new SemanticException("Failed to validate retention condition: " + e.getMessage());
}
Expand Down Expand Up @@ -1410,7 +1412,8 @@ public static PeriodDuration analyzeStorageCoolDownTTL(Map<String, String> prope
public static void analyzeMVProperties(Database db,
MaterializedView materializedView,
Map<String, String> properties,
boolean isNonPartitioned) throws DdlException {
boolean isNonPartitioned,
Map<Expr, Expr> exprAdjustedMap) throws DdlException {
try {
// replicated storage
materializedView.setEnableReplicatedStorage(
Expand Down Expand Up @@ -1474,7 +1477,7 @@ public static void analyzeMVProperties(Database db,
+ " is only supported by partitioned materialized-view");
}
String ttlRetentionCondition = PropertyAnalyzer.analyzePartitionRetentionCondition(db, materializedView,
properties, true);
properties, true, exprAdjustedMap);
materializedView.getTableProperty().getProperties()
.put(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_CONDITION, ttlRetentionCondition);
materializedView.getTableProperty().setPartitionRetentionCondition(ttlRetentionCondition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ private InsertStmt generateInsertAst(Set<String> materializedViewPartitions, Mat
// may be different from the defined query's output.
// so set materialized view's defined outputs as target columns.
List<Integer> queryOutputIndexes = materializedView.getQueryOutputIndices();
List<Column> baseSchema = materializedView.getBaseSchema();
List<Column> baseSchema = materializedView.getBaseSchemaWithoutGeneratedColumn();
if (queryOutputIndexes != null && baseSchema.size() == queryOutputIndexes.size()) {
List<String> targetColumnNames = queryOutputIndexes.stream()
.map(baseSchema::get)
Expand Down
18 changes: 10 additions & 8 deletions fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java
Original file line number Diff line number Diff line change
Expand Up @@ -2981,7 +2981,8 @@ public void createMaterializedView(CreateMaterializedViewStatement stmt)

boolean isNonPartitioned = partitionInfo.isUnPartitioned();
DataProperty dataProperty = PropertyAnalyzer.analyzeMVDataProperty(materializedView, properties);
PropertyAnalyzer.analyzeMVProperties(db, materializedView, properties, isNonPartitioned);
PropertyAnalyzer.analyzeMVProperties(db, materializedView, properties, isNonPartitioned,
stmt.getPartitionByExprToAdjustExprMap());
try {
Set<Long> tabletIdSet = new HashSet<>();
// process single partition info
Expand Down Expand Up @@ -3063,12 +3064,8 @@ public static PartitionInfo buildPartitionInfo(CreateMaterializedViewStatement s
for (int i = 0; i < partitionByExprs.size(); i++) {
Expr partitionByExpr = partitionByExprs.get(i);
Column mvPartitionColumn = mvPartitionColumns.get(i);
if (!(partitionByExpr instanceof SlotRef || MvUtils.isFuncCallExpr(partitionByExpr,
FunctionSet.DATE_TRUNC))) {
throw new DdlException("List partition only support partition by slot ref column or date_trunc:"
+ partitionByExpr.toSql());
}
if (!(partitionByExpr instanceof SlotRef)) {
// generate column can be any partition expression.
if (generatedPartitionCols.containsKey(i)) {
Column generatedCol = generatedPartitionCols.get(i);
if (generatedCol == null) {
throw new DdlException("Partition expression for list must be a generated column: "
Expand All @@ -3077,6 +3074,11 @@ public static PartitionInfo buildPartitionInfo(CreateMaterializedViewStatement s
baseSchema.add(generatedCol);
newPartitionColumns.add(generatedCol);
} else {
if (!(partitionByExpr instanceof SlotRef || MvUtils.isFuncCallExpr(partitionByExpr,
FunctionSet.DATE_TRUNC))) {
throw new DdlException("List partition expression can only be ref-base-table's partition " +
"expression but contains: " + partitionByExpr.toSql());
}
newPartitionColumns.add(mvPartitionColumn);
}
}
Expand Down Expand Up @@ -3684,7 +3686,7 @@ private Map<String, Object> validateToBeModifiedProps(Map<String, String> proper
+ "no need to set partition retention condition.");
}
String partitionRetentionCondition = PropertyAnalyzer.analyzePartitionRetentionCondition(
db, table, properties, true);
db, table, properties, true, null);
if (Strings.isNullOrEmpty(partitionRetentionCondition)) {
throw new DdlException("Invalid partition retention condition");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,8 @@ public Table createTable(LocalMetastore metastore, Database db, CreateTableStmt

// analyze partition retention condition
if (properties != null && properties.containsKey(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_CONDITION)) {
String ttlCondition = PropertyAnalyzer.analyzePartitionRetentionCondition(db, table, properties, true);
String ttlCondition = PropertyAnalyzer.analyzePartitionRetentionCondition(db, table, properties,
true, null);
if (ttlCondition == null) {
throw new DdlException("Invalid partition retention condition");
}
Expand Down
Loading

0 comments on commit 740d10d

Please sign in to comment.