Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] improve performance of show materialized views statement #54374

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ public Map<String, List<TaskRunStatus>> listMVRefreshedTaskRunStatus(String dbNa
.forEach(addResult);

// history
taskRunManager.getTaskRunHistory().lookupHistoryByTaskNames(dbName, taskNames)
taskRunManager.getTaskRunHistory().lookupLastJobOfTasks(dbName, taskNames)
.stream()
.filter(taskRunFilter)
.forEach(addResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,20 @@ public List<TaskRunStatus> lookupHistoryByTaskNames(String dbName, Set<String> t
return result;
}

/**
 * Returns the task runs that belong to the LAST JOB of the given tasks.
 * <p>
 * Each task run carries a {@code startTaskRunId} that acts as its job id, so a single job may
 * consist of several task runs.
 * <p>
 * All in-memory history entries accepted by {@code matchByTaskName(dbName, taskNames)} are
 * returned first; when archived history is enabled, the rows returned by
 * {@code historyTable.lookupLastJobOfTasks(dbName, taskNames)} are appended after them.
 *
 * @param dbName    database name passed to the in-memory matcher and to the archive lookup
 * @param taskNames task names passed to the in-memory matcher and to the archive lookup
 * @return the in-memory matches, followed by the archived matches when archiving is enabled
 */
public List<TaskRunStatus> lookupLastJobOfTasks(String dbName, Set<String> taskNames) {
    List<TaskRunStatus> matched = getInMemoryHistory().stream()
            .filter(status -> status.matchByTaskName(dbName, taskNames))
            .collect(Collectors.toList());

    // Without archiving, the in-memory entries are the only source of history.
    if (!isEnableArchiveHistory()) {
        return matched;
    }

    List<TaskRunStatus> archived = historyTable.lookupLastJobOfTasks(dbName, taskNames);
    matched.addAll(archived);
    return matched;
}

public List<TaskRunStatus> lookupHistory(TGetTasksParams params) {
List<TaskRunStatus> result = getInMemoryHistory().stream()
.filter(x -> x.match(params))
murphyatwork marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.util.Strings;
import org.apache.velocity.VelocityContext;
import org.apache.velocity.app.VelocityEngine;

import java.io.StringWriter;
import java.text.MessageFormat;
import java.time.ZoneId;
import java.util.Collections;
Expand Down Expand Up @@ -92,6 +95,13 @@ public class TaskRunHistoryTable {
private static final String LOOKUP =
"SELECT history_content_json " + "FROM " + TABLE_FULL_NAME + " WHERE ";

private static final VelocityEngine DEFAULT_VELOCITY_ENGINE;

static {
DEFAULT_VELOCITY_ENGINE = new VelocityEngine();
DEFAULT_VELOCITY_ENGINE.setProperty(VelocityEngine.RUNTIME_LOG_REFERENCE_LOG_INVALID, false);
}

// Keeper for the history table, constructed from the database name, the table name and the
// CREATE TABLE statement. The supplier divides Config.task_runs_ttl_second by 3600 and then by 24
// (seconds -> days) and never yields less than 1.
// NOTE(review): presumably TableKeeper treats this value as the retention period in days - confirm.
private static final TableKeeper KEEPER =
new TableKeeper(DATABASE_NAME, TABLE_NAME, CREATE_TABLE,
() -> Math.max(1, Config.task_runs_ttl_second / 3600 / 24));
Expand Down Expand Up @@ -226,4 +236,48 @@ public List<TaskRunStatus> lookupByTaskNames(String dbName, Set<String> taskName
Collections.sort(result, TaskRunStatus.COMPARATOR_BY_CREATE_TIME_DESC);
return result;
}

/**
 * Queries the history table for the task runs that belong to the last job of each task.
 * <p>
 * The SQL is rendered from a Velocity template with two variables: {@code $tableName}
 * (set to {@code TABLE_FULL_NAME}) and {@code $taskFilter} (the predicate built below).
 * The query works in two steps:
 * <ol>
 *   <li>The CTE {@code MaxStartRunID} picks, for every {@code task_name} passing the filter,
 *       the row with the greatest {@code create_time} and extracts its {@code startTaskRunId}
 *       from {@code history_content_json}.</li>
 *   <li>The outer query joins the table back on {@code task_name} and {@code startTaskRunId}
 *       and returns {@code history_content_json} of every matching row, newest
 *       {@code create_time} first.</li>
 * </ol>
 * The filter always starts with {@code TRUE}. A {@code dbName} predicate is added when
 * {@code dbName} is non-empty, and a {@code task_name IN (...)} predicate (names sorted) is
 * added when {@code taskNames} is non-empty. Note that the filter is applied only inside the
 * CTE's sub-query.
 * <p>
 * NOTE(review): names are embedded into the SQL text via {@code Strings.quote}; presumably
 * this does not escape quotes inside the value - confirm that database and task names cannot
 * contain a single quote.
 *
 * @param dbName    database name; when non-empty it is expanded with
 *                  {@code ClusterNamespace.getFullName} and compared with the {@code dbName}
 *                  field of the JSON content
 * @param taskNames task names to restrict the lookup to; no restriction when null or empty
 * @return the result of {@code TaskRunStatus.fromResultBatch} applied to the query output
 */
public List<TaskRunStatus> lookupLastJobOfTasks(String dbName, Set<String> taskNames) {
// Velocity template; $tableName and $taskFilter are substituted before execution.
final String template =
"WITH MaxStartRunID AS (" +
" SELECT " +
" task_name, " +
" cast(history_content_json->'startTaskRunId' as string) start_run_id" +
" FROM $tableName " +
" WHERE (task_name, create_time) IN (" +
" SELECT task_name, MAX(create_time)" +
" FROM $tableName" +
" WHERE $taskFilter" +
" GROUP BY task_name" +
" )" +
" )" +
" SELECT t.history_content_json" +
" FROM $tableName t" +
" JOIN MaxStartRunID msr" +
" ON t.task_name = msr.task_name" +
" AND cast(t.history_content_json->'startTaskRunId' as string) = msr.start_run_id" +
" ORDER BY t.create_time DESC";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after this, the max size that history table will output will be task's number, am I right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems reseaonable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes


List<String> predicates = Lists.newArrayList("TRUE");
if (StringUtils.isNotEmpty(dbName)) {
predicates.add(" get_json_string(" + CONTENT_COLUMN + ", 'dbName') = "
+ Strings.quote(ClusterNamespace.getFullName(dbName)));
}
if (CollectionUtils.isNotEmpty(taskNames)) {
String values = taskNames.stream().sorted().map(Strings::quote).collect(Collectors.joining(","));
predicates.add(" task_name IN (" + values + ")");
}
String where = Joiner.on(" AND ").join(predicates);

VelocityContext context = new VelocityContext();
context.put("tableName", TABLE_FULL_NAME);
context.put("taskFilter", where);

StringWriter sw = new StringWriter();
DEFAULT_VELOCITY_ENGINE.evaluate(context, sw, "", template);
String sql = sw.toString();

List<TResultBatch> batch = RepoExecutor.getInstance().executeDQL(sql);
return TaskRunStatus.fromResultBatch(batch);
}
}
murphyatwork marked this conversation as resolved.
Show resolved Hide resolved
41 changes: 40 additions & 1 deletion test/sql/test_materialized_view/R/test_show_materialized_view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,45 @@ AS SELECT `user_tags`.`user_id`, bitmap_union(to_bitmap(`user_tags`.`tag_id`)) A
FROM `test_show_materialized_view`.`user_tags`
GROUP BY `user_tags`.`user_id`;
-- !result
drop database test_show_materialized_view;
refresh materialized view user_tags_mv1 with sync mode;
select
TABLE_NAME,
LAST_REFRESH_STATE,
LAST_REFRESH_ERROR_CODE,
IS_ACTIVE,
INACTIVE_REASON
from information_schema.materialized_views where table_name = 'user_tags_mv1';
-- result:
user_tags_mv1 SUCCESS 0 true
-- !result
set @last_refresh_time = (
select max(last_refresh_start_time)
from information_schema.materialized_views where table_name = 'user_tags_mv1'
);
-- result:
-- !result
refresh materialized view user_tags_mv1 force with sync mode;
select
TABLE_NAME,
LAST_REFRESH_STATE,
LAST_REFRESH_ERROR_CODE,
IS_ACTIVE,
INACTIVE_REASON
from information_schema.materialized_views where table_name = 'user_tags_mv1';
-- result:
user_tags_mv1 SUCCESS 0 true
-- !result
set @this_refresh_time = (
select max(last_refresh_start_time)
from information_schema.materialized_views where table_name = 'user_tags_mv1'
);
-- result:
-- !result
select if(@last_refresh_time != @this_refresh_time,
'refreshed', concat('no refresh after ', @last_refresh_time));
-- result:
refreshed
-- !result
drop database test_show_materialized_view;
-- result:
-- !result
35 changes: 34 additions & 1 deletion test/sql/test_materialized_view/T/test_show_materialized_view
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,43 @@ use test_show_materialized_view;
create table user_tags (time date, user_id int, user_name varchar(20), tag_id int) partition by range (time) (partition p1 values less than MAXVALUE) distributed by hash(time) buckets 3 properties('replication_num' = '1');
create materialized view user_tags_mv1 distributed by hash(user_id) as select user_id, bitmap_union(to_bitmap(tag_id)) from user_tags group by user_id;


show create materialized view user_tags_mv1;
show create table user_tags_mv1;
alter materialized view user_tags_mv1 set ("session.insert_timeout" = "3600");
alter materialized view user_tags_mv1 set ("mv_rewrite_staleness_second" = "3600");
show create materialized view user_tags_mv1;
show create table user_tags_mv1;
drop database test_show_materialized_view;

-- information_schema.materialized_views
refresh materialized view user_tags_mv1 with sync mode;
select
TABLE_NAME,
LAST_REFRESH_STATE,
LAST_REFRESH_ERROR_CODE,
IS_ACTIVE,
INACTIVE_REASON
from information_schema.materialized_views where table_name = 'user_tags_mv1';
set @last_refresh_time = (
select max(last_refresh_start_time)
from information_schema.materialized_views where table_name = 'user_tags_mv1'
);

-- multiple refresh tasks
refresh materialized view user_tags_mv1 force with sync mode;
select
TABLE_NAME,
LAST_REFRESH_STATE,
LAST_REFRESH_ERROR_CODE,
IS_ACTIVE,
INACTIVE_REASON
from information_schema.materialized_views where table_name = 'user_tags_mv1';
set @this_refresh_time = (
select max(last_refresh_start_time)
from information_schema.materialized_views where table_name = 'user_tags_mv1'
);
select if(@last_refresh_time != @this_refresh_time,
'refreshed', concat('no refresh after ', @last_refresh_time));


drop database test_show_materialized_view;
Loading