Skip to content

Commit

Permalink
feat(rapid):self load framework
Browse files Browse the repository at this point in the history
  • Loading branch information
ShannonBase committed Dec 23, 2024
1 parent 1082d19 commit 5a94176
Show file tree
Hide file tree
Showing 13 changed files with 243 additions and 67 deletions.
4 changes: 2 additions & 2 deletions include/template_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ void my_free_container_pointers(Container_type &container) {
foo *f; bar *b= static_cast<bar*>(static_cast<void*>(f));
*/
template <typename T>
inline T pointer_cast(void *p) {
constexpr T pointer_cast(void *p) {
return static_cast<T>(p);
}

template <typename T>
inline const T pointer_cast(const void *p) {
constexpr T pointer_cast(const void *p) {
return static_cast<T>(p);
}

Expand Down
13 changes: 13 additions & 0 deletions mysql-test/r/mysqld--help-notwin.result
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,17 @@ The following options may be given as the first argument:
--rapid-rapid-pop-buffer-size-max[=#]
Number of memory used for populating the changes in
innodb to rapid engine..
--rapid-self-load-base-relation-fill-percentage[=#]
Percentage of base memory quota above which the self-load
thread rpdserver and rpdmaster. Default value: 70%.
--rapid-self-load-interval-seconds[=#]
Wake-up interval of the Self-Load thread Default value:
86400s (24h). Note that if the interval is changed while
rapid_self_load_enabled=TRUE, the new value might not be
picked up until the next wakeup of the Self-Load Worker.
Therefore, the recommended order of setting the variables
is: 1. rapid_self_load_interval_seconds=<new value>; 2.
rapid_self_load_enabled=TRUE;
--read-buffer-size=#
Each thread that does a sequential scan allocates a
buffer of this size for each table it scans. If you do
Expand Down Expand Up @@ -1961,6 +1972,8 @@ range-alloc-block-size 4096
range-optimizer-max-mem-size 8388608
rapid-rapid-memory-size-max 8589934592
rapid-rapid-pop-buffer-size-max 134217728
rapid-self-load-base-relation-fill-percentage 0.7
rapid-self-load-interval-seconds 86400
read-buffer-size 131072
read-only FALSE
read-rnd-buffer-size 262144
Expand Down
2 changes: 2 additions & 0 deletions mysql-test/r/variables.result
Original file line number Diff line number Diff line change
Expand Up @@ -2067,6 +2067,8 @@ Warning 1231 'pseudo_replica_mode' change was ineffective.
SET SESSION pseudo_thread_id = @@session.pseudo_thread_id;
SET SESSION rand_seed1 = @@session.rand_seed1;
SET SESSION rand_seed2 = @@session.rand_seed2;
SET SESSION rapid_self_load_enabled = @@session.rapid_self_load_enabled;
SET SESSION rapid_self_load_skip_quiet_check = @@session.rapid_self_load_skip_quiet_check;
SET SESSION rapid_use_dynamic_offload = @@session.rapid_use_dynamic_offload;
SET SESSION require_row_format = @@session.require_row_format;
SET SESSION resultset_metadata = @@session.resultset_metadata;
Expand Down
7 changes: 7 additions & 0 deletions mysql-test/suite/sys_vars/r/all_vars.result
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ rapid_rapid_memory_size_max
rapid_rapid_memory_size_max
rapid_rapid_pop_buffer_size_max
rapid_rapid_pop_buffer_size_max
rapid_self_load_base_relation_fill_percentage
rapid_self_load_base_relation_fill_percentage
rapid_self_load_enabled
rapid_self_load_interval_seconds
rapid_self_load_interval_seconds
rapid_self_load_skip_quiet_check
rapid_use_dynamic_offload
regexp_stack_limit
regexp_stack_limit
regexp_time_limit
Expand Down
10 changes: 7 additions & 3 deletions sql/handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5090,12 +5090,16 @@ int handler::ha_create(const char *name, TABLE *form, HA_CREATE_INFO *info,
/**
* Loads a table into its defined secondary storage engine: public interface.
*
* @param table The table to load into the secondary engine. Its read_set tells
* which columns to load.
* @param[in] table - The table to load into the secondary engine. Its read_set
* tells which columns to load.
* @param[out] skip_metadata_update - should the DD metadata be updated for the
* load of this table
*
* @sa handler::load_table()
*/
int handler::ha_load_table(const TABLE &table) { return load_table(table); }
int handler::ha_load_table(const TABLE &table, bool *skip_metadata_update) {
return load_table(table, skip_metadata_update);
}

/**
* Unloads a table from its defined secondary storage engine: public interface.
Expand Down
26 changes: 19 additions & 7 deletions sql/handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -2541,15 +2541,15 @@ const handlerton *SecondaryEngineHandlerton(const THD *thd);

// FIXME: Temporary workaround to enable storage engine plugins to use the
// before_commit hook. Remove after WL#11320 has been completed.
typedef void (*se_before_commit_t)(void *arg);
using se_before_commit_t = void (*)(void *arg);

// FIXME: Temporary workaround to enable storage engine plugins to use the
// after_commit hook. Remove after WL#11320 has been completed.
typedef void (*se_after_commit_t)(void *arg);
using se_after_commit_t = void (*)(void *arg);

// FIXME: Temporary workaround to enable storage engine plugins to use the
// before_rollback hook. Remove after WL#11320 has been completed.
typedef void (*se_before_rollback_t)(void *arg);
using se_before_rollback_t = void (*)(void *arg);

/**
Notify plugins when a SELECT query was executed. The plugins will be notified
Expand All @@ -2568,6 +2568,11 @@ using notify_after_select_t = void (*)(THD *thd, SelectExecutedIn executed_in);
using notify_create_table_t = void (*)(struct HA_CREATE_INFO *create_info,
const char *db, const char *table_name);

/**
* Notify plugins when a table is dropped.
*/
using notify_drop_table_t = void (*)(Table_ref *tab);

/**
Secondary engine hook called after PRIMARY_TENTATIVELY optimization is
complete, and decides if secondary engine optimization will be performed, and
Expand Down Expand Up @@ -2945,6 +2950,8 @@ struct handlerton {
se_before_rollback_t se_before_rollback;

notify_after_select_t notify_after_select;
notify_create_table_t notify_create_table;
notify_drop_table_t notify_drop_table;

/** Page tracking interface */
Page_track_t page_track;
Expand Down Expand Up @@ -3019,8 +3026,10 @@ constexpr const decltype(handlerton::flags) HTON_SUPPORTS_ENGINE_ATTRIBUTE{
1 << 17};

/** Engine supports Generated invisible primary key. */
// clang-format off
constexpr const decltype(
handlerton::flags) HTON_SUPPORTS_GENERATED_INVISIBLE_PK{1 << 18};
// clang-format on

/** Whether the secondary engine supports DDLs. No meaning if the engine is not
* secondary. */
Expand Down Expand Up @@ -4893,7 +4902,7 @@ class handler {
int ha_create(const char *name, TABLE *form, HA_CREATE_INFO *info,
dd::Table *table_def);

int ha_load_table(const TABLE &table);
int ha_load_table(const TABLE &table, bool *skip_metadata_update);

int ha_unload_table(const char *db_name, const char *table_name,
bool error_if_not_loaded);
Expand Down Expand Up @@ -6718,12 +6727,15 @@ class handler {
/**
* Loads a table into its defined secondary storage engine.
*
* @param table Table opened in primary storage engine. Its read_set tells
* which columns to load.
* @param[in] table - Table opened in primary storage engine. Its read_set
* tells which columns to load.
* @param[out] skip_metadata_update - should the DD metadata be updated for
* the load of this table
*
* @return 0 if success, error code otherwise.
*/
virtual int load_table(const TABLE &table [[maybe_unused]]) {
virtual int load_table(const TABLE &table [[maybe_unused]],
bool *skip_metadata_update [[maybe_unused]]) {
/* purecov: begin inspected */
assert(false);
return HA_ERR_WRONG_COMMAND;
Expand Down
132 changes: 84 additions & 48 deletions sql/sql_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,18 @@ static bool rea_create_base_table(
*binlog_to_trx_cache =
(create_info->db_type->flags & HTON_SUPPORTS_ATOMIC_DDL);

std::tuple hook_params{create_info, db, table_name};
plugin_foreach(
thd, ([](THD *, plugin_ref plugin, void *arg) -> bool {
handlerton *hton = plugin_data<handlerton *>(plugin);
if (hton->notify_create_table != nullptr) {
const auto &[c_i, d, t] = *static_cast<
std::tuple<HA_CREATE_INFO *, const char *, const char *> *>(arg);
hton->notify_create_table(c_i, d, t);
}
return false;
}),
MYSQL_STORAGE_ENGINE_PLUGIN, &hook_params);
return false;
}

Expand Down Expand Up @@ -2672,7 +2684,8 @@ static bool validate_secondary_engine_option(THD *thd,
*
* @return True if error, false otherwise.
*/
static bool secondary_engine_load_table(THD *thd, const TABLE &table) {
static bool secondary_engine_load_table(THD *thd, const TABLE &table,
bool *skip_metadata_update) {
assert(thd->mdl_context.owns_equal_or_stronger_lock(
MDL_key::TABLE, table.s->db.str, table.s->table_name.str, MDL_EXCLUSIVE));
assert(table.s->has_secondary_engine());
Expand Down Expand Up @@ -2716,7 +2729,7 @@ static bool secondary_engine_load_table(THD *thd, const TABLE &table) {

// Load table from primary into secondary engine and add to change
// propagation if that is enabled.
if (handler->ha_load_table(table)){
if (handler->ha_load_table(table, skip_metadata_update)){
my_error(ER_SECONDARY_ENGINE, MYF(0),
"secondary storage engine load table failed");
return true;
Expand Down Expand Up @@ -3136,6 +3149,20 @@ static bool drop_base_table(THD *thd, const Drop_tables_ctx &drop_ctx,
if (mdl_locker.ensure_locked(table->db)) return true;
bool result = dd::drop_table(thd, table->db, table->table_name, *table_def);

if (!result) {
// Notify plugins if drop was successful.
plugin_foreach(
thd,
[](THD *, plugin_ref plugin, void *arg) -> bool {
handlerton *ht = plugin_data<handlerton *>(plugin);
if (ht->notify_drop_table != nullptr) {
ht->notify_drop_table(static_cast<Table_ref *>(arg));
}
return false;
},
MYSQL_STORAGE_ENGINE_PLUGIN, table);
}

if (!atomic) result = trans_intermediate_ddl_commit(thd, result);

/*
Expand Down Expand Up @@ -11640,7 +11667,9 @@ bool Sql_cmd_secondary_load_unload::mysql_secondary_load_or_unload(

// Load if SECONDARY_LOAD, unload if SECONDARY_UNLOAD
const bool is_load = m_alter_info->flags & Alter_info::ALTER_SECONDARY_LOAD;

/* If set, secondary_load value will not be updated, and also no bin log
* entries will be recorded. */
bool skip_metadata_update = false;
// Initiate loading into or unloading from secondary engine.
if (is_load) {
DEBUG_SYNC(thd, "before_secondary_engine_load_table");
Expand All @@ -11649,7 +11678,8 @@ bool Sql_cmd_secondary_load_unload::mysql_secondary_load_or_unload(
"Simulated failure of secondary_load()"),
true),
false) ||
secondary_engine_load_table(thd, *table_list->table))
secondary_engine_load_table(thd, *table_list->table,
&skip_metadata_update))
return true;
//start population thread if table loaded successfully.
ShannonBase::Populate::Populator::start_change_populate_threads();
Expand Down Expand Up @@ -11677,55 +11707,61 @@ bool Sql_cmd_secondary_load_unload::mysql_secondary_load_or_unload(
return true;
});

// Update the secondary_load flag based on the current operation.
if (DBUG_EVALUATE_IF("sim_fail_metadata_update",
(my_error(ER_SECONDARY_ENGINE, MYF(0),
"Simulated failure during metadata update"),
true),
false) ||
table_def->options().set("secondary_load", is_load) ||
thd->dd_client()->update(table_def)) {
LogErr(ERROR_LEVEL, ER_SECONDARY_ENGINE_DDL_FAILED, full_tab_name,
(is_load ? "secondary_load" : "secondary_unload"),
"metadata update failed");
return true;
}
if (skip_metadata_update) {
DBUG_PRINT(
"sec_load_unload",
("secondary_load flag update is skipped for table %s", full_tab_name));
} else {
// Update the secondary_load flag based on the current operation.
if (DBUG_EVALUATE_IF("sim_fail_metadata_update",
(my_error(ER_SECONDARY_ENGINE, MYF(0),
"Simulated failure during metadata update"),
true),
false) ||
table_def->options().set("secondary_load", is_load) ||
thd->dd_client()->update(table_def)) {
LogErr(ERROR_LEVEL, ER_SECONDARY_ENGINE_DDL_FAILED, full_tab_name,
(is_load ? "secondary_load" : "secondary_unload"),
"metadata update failed");
return true;
}

DBUG_PRINT("sec_load_unload", ("secondary_load flag %s for table %s",
(is_load ? "set" : "reset"), full_tab_name));
DBUG_PRINT("sec_load_unload", ("secondary_load flag %s for table %s",
(is_load ? "set" : "reset"), full_tab_name));

DBUG_EXECUTE_IF("sim_fail_before_write_bin_log", {
DBUG_PRINT("sec_load_unload", ("Force exit before binlog write"));
my_error(ER_SECONDARY_ENGINE, MYF(0),
"Simulated failure of sec_{un}load before write_bin_log()");
return true;
});
DBUG_EXECUTE_IF("sim_fail_before_write_bin_log", {
DBUG_PRINT("sec_load_unload", ("Force exit before binlog write"));
my_error(ER_SECONDARY_ENGINE, MYF(0),
"Simulated failure of sec_{un}load before write_bin_log()");
return true;
});

/* Write binlog to maintain replication consistency. Read-Replica's may not
* have binlog enabled. write_bin_log API takes care of such cases. */
if (DBUG_EVALUATE_IF("sim_fail_binlog_write",
(my_error(ER_SECONDARY_ENGINE, MYF(0),
"Simulated failure during binlog write"),
true),
false) ||
(write_bin_log(thd, true, thd->query().str, thd->query().length, true) !=
0)) {
LogErr(ERROR_LEVEL, ER_SECONDARY_ENGINE_DDL_FAILED, full_tab_name,
(is_load ? "secondary_load" : "secondary_unload"),
"binlog write failed");
return true;
}
/* Write binlog to maintain replication consistency. Read-Replica's may not
* have binlog enabled. write_bin_log API takes care of such cases. */
if (DBUG_EVALUATE_IF("sim_fail_binlog_write",
(my_error(ER_SECONDARY_ENGINE, MYF(0),
"Simulated failure during binlog write"),
true),
false) ||
(write_bin_log(thd, true, thd->query().str, thd->query().length,
true) != 0)) {
LogErr(ERROR_LEVEL, ER_SECONDARY_ENGINE_DDL_FAILED, full_tab_name,
(is_load ? "secondary_load" : "secondary_unload"),
"binlog write failed");
return true;
}

DBUG_PRINT("sec_load_unload",
("binlog entry added for alter table %s secondary_%s",
full_tab_name, (is_load ? "load" : "unload")));
DBUG_PRINT("sec_load_unload",
("binlog entry added for alter table %s secondary_%s",
full_tab_name, (is_load ? "load" : "unload")));

DBUG_EXECUTE_IF("sim_fail_after_write_bin_log", {
DBUG_PRINT("sec_load_unload", ("Force exit after binlog write"));
my_error(ER_SECONDARY_ENGINE, MYF(0),
"Simulated failure of sec_{un}load after write_bin_log()");
return true;
});
DBUG_EXECUTE_IF("sim_fail_after_write_bin_log", {
DBUG_PRINT("sec_load_unload", ("Force exit after binlog write"));
my_error(ER_SECONDARY_ENGINE, MYF(0),
"Simulated failure of sec_{un}load after write_bin_log()");
return true;
});
}

// Close primary table.
close_all_tables_for_name(thd, table_list->table->s, false, nullptr);
Expand Down
Loading

0 comments on commit 5a94176

Please sign in to comment.