diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java index 90e2225d17391..0a344665f77d2 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java @@ -656,7 +656,7 @@ public Map> listMVRefreshedTaskRunStatus(String dbNa .forEach(addResult); // history - taskRunManager.getTaskRunHistory().lookupHistoryByTaskNames(dbName, taskNames) + taskRunManager.getTaskRunHistory().lookupLastJobOfTasks(dbName, taskNames) .stream() .filter(taskRunFilter) .forEach(addResult); diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistory.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistory.java index 455eaae21dacc..5b690cea95d3a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistory.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistory.java @@ -104,6 +104,20 @@ public List lookupHistoryByTaskNames(String dbName, Set t return result; } + /** + * Return the matching in-memory task runs plus, if archiving is enabled, the archived runs of each task's LAST JOB. + * Each task run has a `startTaskRunId` that serves as its job id; one job may have multiple task runs. 
+ */ + public List lookupLastJobOfTasks(String dbName, Set taskNames) { + List result = getInMemoryHistory().stream() + .filter(x -> x.matchByTaskName(dbName, taskNames)) + .collect(Collectors.toList()); + if (isEnableArchiveHistory()) { + result.addAll(historyTable.lookupLastJobOfTasks(dbName, taskNames)); + } + return result; + } + public List lookupHistory(TGetTasksParams params) { List result = getInMemoryHistory().stream() .filter(x -> x.match(params)) diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistoryTable.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistoryTable.java index 2836030ce512c..9444bf0766165 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistoryTable.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistoryTable.java @@ -32,7 +32,10 @@ import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.util.Strings; +import org.apache.velocity.VelocityContext; +import org.apache.velocity.app.VelocityEngine; +import java.io.StringWriter; import java.text.MessageFormat; import java.time.ZoneId; import java.util.Collections; @@ -92,6 +95,13 @@ public class TaskRunHistoryTable { private static final String LOOKUP = "SELECT history_content_json " + "FROM " + TABLE_FULL_NAME + " WHERE "; + private static final VelocityEngine DEFAULT_VELOCITY_ENGINE; + + static { + DEFAULT_VELOCITY_ENGINE = new VelocityEngine(); + DEFAULT_VELOCITY_ENGINE.setProperty(VelocityEngine.RUNTIME_LOG_REFERENCE_LOG_INVALID, false); + } + private static final TableKeeper KEEPER = new TableKeeper(DATABASE_NAME, TABLE_NAME, CREATE_TABLE, () -> Math.max(1, Config.task_runs_ttl_second / 3600 / 24)); @@ -226,4 +236,48 @@ public List lookupByTaskNames(String dbName, Set taskName Collections.sort(result, TaskRunStatus.COMPARATOR_BY_CREATE_TIME_DESC); return result; } + + public List lookupLastJobOfTasks(String 
dbName, Set taskNames) { + final String template = + "WITH MaxStartRunID AS (" + + " SELECT " + + " task_name, " + + " cast(history_content_json->'startTaskRunId' as string) start_run_id" + + " FROM $tableName " + + " WHERE (task_name, create_time) IN (" + + " SELECT task_name, MAX(create_time)" + + " FROM $tableName" + + " WHERE $taskFilter" + + " GROUP BY task_name" + + " )" + + " )" + + " SELECT t.history_content_json" + + " FROM $tableName t" + + " JOIN MaxStartRunID msr" + + " ON t.task_name = msr.task_name" + + " AND cast(t.history_content_json->'startTaskRunId' as string) = msr.start_run_id" + + " ORDER BY t.create_time DESC"; + + List predicates = Lists.newArrayList("TRUE"); + if (StringUtils.isNotEmpty(dbName)) { + predicates.add(" get_json_string(" + CONTENT_COLUMN + ", 'dbName') = " + + Strings.quote(ClusterNamespace.getFullName(dbName))); + } + if (CollectionUtils.isNotEmpty(taskNames)) { + String values = taskNames.stream().sorted().map(Strings::quote).collect(Collectors.joining(",")); + predicates.add(" task_name IN (" + values + ")"); + } + String where = Joiner.on(" AND ").join(predicates); + + VelocityContext context = new VelocityContext(); + context.put("tableName", TABLE_FULL_NAME); + context.put("taskFilter", where); + + StringWriter sw = new StringWriter(); + DEFAULT_VELOCITY_ENGINE.evaluate(context, sw, "", template); + String sql = sw.toString(); + + List batch = RepoExecutor.getInstance().executeDQL(sql); + return TaskRunStatus.fromResultBatch(batch); + } } diff --git a/test/sql/test_materialized_view/R/test_show_materialized_view b/test/sql/test_materialized_view/R/test_show_materialized_view index 68394a2ecc52c..7fc64fba8e807 100644 --- a/test/sql/test_materialized_view/R/test_show_materialized_view +++ b/test/sql/test_materialized_view/R/test_show_materialized_view @@ -80,6 +80,45 @@ AS SELECT `user_tags`.`user_id`, bitmap_union(to_bitmap(`user_tags`.`tag_id`)) A FROM `test_show_materialized_view`.`user_tags` GROUP BY 
`user_tags`.`user_id`; -- !result -drop database test_show_materialized_view; +refresh materialized view user_tags_mv1 with sync mode; +select + TABLE_NAME, + LAST_REFRESH_STATE, + LAST_REFRESH_ERROR_CODE, + IS_ACTIVE, + INACTIVE_REASON +from information_schema.materialized_views where table_name = 'user_tags_mv1'; +-- result: +user_tags_mv1 SUCCESS 0 true +-- !result +set @last_refresh_time = ( + select max(last_refresh_start_time) + from information_schema.materialized_views where table_name = 'user_tags_mv1' +); -- result: -- !result +refresh materialized view user_tags_mv1 force with sync mode; +select + TABLE_NAME, + LAST_REFRESH_STATE, + LAST_REFRESH_ERROR_CODE, + IS_ACTIVE, + INACTIVE_REASON +from information_schema.materialized_views where table_name = 'user_tags_mv1'; +-- result: +user_tags_mv1 SUCCESS 0 true +-- !result +set @this_refresh_time = ( + select max(last_refresh_start_time) + from information_schema.materialized_views where table_name = 'user_tags_mv1' +); +-- result: +-- !result +select if(@last_refresh_time != @this_refresh_time, + 'refreshed', concat('no refresh after ', @last_refresh_time)); +-- result: +refreshed +-- !result +drop database test_show_materialized_view; +-- result: +-- !result \ No newline at end of file diff --git a/test/sql/test_materialized_view/T/test_show_materialized_view b/test/sql/test_materialized_view/T/test_show_materialized_view index d6d48d83ed856..569a6edce5d45 100644 --- a/test/sql/test_materialized_view/T/test_show_materialized_view +++ b/test/sql/test_materialized_view/T/test_show_materialized_view @@ -5,10 +5,43 @@ use test_show_materialized_view; create table user_tags (time date, user_id int, user_name varchar(20), tag_id int) partition by range (time) (partition p1 values less than MAXVALUE) distributed by hash(time) buckets 3 properties('replication_num' = '1'); create materialized view user_tags_mv1 distributed by hash(user_id) as select user_id, bitmap_union(to_bitmap(tag_id)) from user_tags group by 
user_id; + show create materialized view user_tags_mv1; show create table user_tags_mv1; alter materialized view user_tags_mv1 set ("session.insert_timeout" = "3600"); alter materialized view user_tags_mv1 set ("mv_rewrite_staleness_second" = "3600"); show create materialized view user_tags_mv1; show create table user_tags_mv1; -drop database test_show_materialized_view; + +-- information_schema.materialized_views +refresh materialized view user_tags_mv1 with sync mode; +select + TABLE_NAME, + LAST_REFRESH_STATE, + LAST_REFRESH_ERROR_CODE, + IS_ACTIVE, + INACTIVE_REASON +from information_schema.materialized_views where table_name = 'user_tags_mv1'; +set @last_refresh_time = ( + select max(last_refresh_start_time) + from information_schema.materialized_views where table_name = 'user_tags_mv1' +); + +-- multiple refresh tasks +refresh materialized view user_tags_mv1 force with sync mode; +select + TABLE_NAME, + LAST_REFRESH_STATE, + LAST_REFRESH_ERROR_CODE, + IS_ACTIVE, + INACTIVE_REASON +from information_schema.materialized_views where table_name = 'user_tags_mv1'; +set @this_refresh_time = ( + select max(last_refresh_start_time) + from information_schema.materialized_views where table_name = 'user_tags_mv1' +); +select if(@last_refresh_time != @this_refresh_time, + 'refreshed', concat('no refresh after ', @last_refresh_time)); + + +drop database test_show_materialized_view; \ No newline at end of file