Skip to content

Commit

Permalink
feat(shannon): dynamic offload impl
Browse files Browse the repository at this point in the history
  • Loading branch information
ShannonBase committed Dec 20, 2024
1 parent b97d230 commit a1a9429
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 5 deletions.
5 changes: 5 additions & 0 deletions sql/sql_class.cc
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,11 @@ void Open_tables_state::reset_open_tables_state() {
reset_reprepare_observers();
}

//To cache all info need by secondary engine at RapidPrepareEstimateQueryCosts stage.
void Secondary_engine_statement_context::cache_primary_plan_info(JOIN* join) {
m_primary_plan = join;
}

THD::THD(bool enable_plugins)
: Query_arena(&main_mem_root, STMT_REGULAR_EXECUTION),
mark_used_columns(MARK_COLUMNS_READ),
Expand Down
6 changes: 4 additions & 2 deletions sql/sql_class.h
Original file line number Diff line number Diff line change
Expand Up @@ -946,10 +946,12 @@ class Secondary_engine_statement_context {
*/
virtual ~Secondary_engine_statement_context() = default;
virtual bool is_primary_engine_optimal() const { return true; }
virtual void cache_primary_plan_info(JOIN* join) {
m_primary_plan = join;
virtual void cache_primary_plan_info(JOIN* join);
virtual JOIN* get_cached_primary_plan_info() const {
return m_primary_plan;
}
private:
double m_primary_cost {0};
JOIN* m_primary_plan {nullptr};
};

Expand Down
71 changes: 68 additions & 3 deletions storage/rapid_engine/handler/ha_shannon_rapid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,45 @@ static void rapid_kill_connection(handlerton *hton, /*!< in: innobase handlerto
}
}

// In this function, Dynamic offload combines mysql plan features
// retrieved from rapid_statement_context
// and RAPID info such as rapid base table cardinality,
// dict encoding projection, varlen projection size, rapid queue
// size in to decide if query should be offloaded to RAPID.
// returns true, goes to innodb for execution.
// returns false, goes to next phase for secondary engine execution.
static bool RapidPrepareEstimateQueryCosts(THD *thd, LEX *lex) {
if (thd->variables.use_secondary_engine == SECONDARY_ENGINE_OFF)
return true;
else if (thd->variables.use_secondary_engine == SECONDARY_ENGINE_FORCED)
return false;

// gets the shannon statement context from thd, which stores in SecondaryEnginePrePrepareHook.
if (!thd->variables.rapid_use_dynamic_offload) {
return false;
}

auto shannon_statement_context = thd->secondary_engine_statement_context();
auto primary_plan_info = shannon_statement_context->get_cached_primary_plan_info();

// 1: to check whether the sys_pop_data_sz has too many data to populate.
uint64 too_much_pop_threshold = static_cast<uint64_t>(0.8 * ShannonBase::rpd_pop_buff_sz_max);
if (ShannonBase::Populate::sys_pop_data_sz > too_much_pop_threshold) {
return true;
}

// 2: to check whether there're changes in sys_pop_buff, which will be used for query.
// if there're still do populating, then goes to innodb. and gets cardinality of tables.
for (uint i = primary_plan_info->tables; i < primary_plan_info->tables; i++) {
std::string db_tb;
if (ShannonBase::Populate::Populator::check_populating_status(db_tb)) return true;
}

// 3: checks dict encoding projection, and varlen project size, etc.

return false;
}

static bool PrepareSecondaryEngine(THD *thd, LEX *lex) {
DBUG_EXECUTE_IF("secondary_engine_rapid_prepare_error", {
my_error(ER_SECONDARY_ENGINE_PLUGIN, MYF(0), "");
Expand All @@ -552,7 +591,7 @@ static bool PrepareSecondaryEngine(THD *thd, LEX *lex) {
// optimization. But, it is not to producce an optima count query plan.
lex->add_statement_options(OPTION_NO_CONST_TABLES | OPTION_NO_SUBQUERY_DURING_OPTIMIZATION);

return false;
return RapidPrepareEstimateQueryCosts(thd, lex);
}

// caches primary info.
Expand All @@ -565,11 +604,24 @@ static bool RapidCachePrimaryInfoAtPrimaryTentativelyStep(THD *thd) {
}

auto shannon_statement_context = thd->secondary_engine_statement_context();
shannon_statement_context->cache_primary_plan_info(thd->lex->query_block->join);
Query_expression *const unit = thd->lex->unit;
shannon_statement_context->cache_primary_plan_info(unit->first_query_block()->join);
return false;
}

// If dynamic offload is enabled and query is not "very fast":
// This caches features from mysql plan in rapid_statement_context
// to be used for dynamic offload.
// If dynamic offload is disabled or the query is "very fast":
// This function invokes standary mysql cost threshold classifier,
// which decides if query needs further RAPID optimisation.
// returns true, goes to secondary engine.
// returns false, goes to innodb engine.
bool SecondaryEnginePrePrepareHook(THD *thd) {
if (thd->variables.use_secondary_engine == SECONDARY_ENGINE_OFF)
return false;
else if (thd->variables.use_secondary_engine == SECONDARY_ENGINE_FORCED)
return true;
/**
* If dynamic offload is enabled and query is not "very fast":
This caches features from mysql plan in rapid_statement_context
Expand Down Expand Up @@ -627,6 +679,19 @@ static void AssertSupportedPath(const AccessPath *path) {
assert(path->secondary_engine_data == nullptr);
}

// In this function, Dynamic offload retrieves info from
// rapid_statement_context and additionally looks at Change
// propagation lag to decide if query should be offloaded to rapid
// returns true, goes to innodb engine.
// return false, goes to secondary engine.
static bool RapidOptimize(THD *thd [[maybe_unused]], LEX *lex) {
if (thd->variables.use_secondary_engine == SECONDARY_ENGINE_OFF)
return true;
else if (thd->variables.use_secondary_engine == SECONDARY_ENGINE_FORCED)
return false;

return false;
}
static bool OptimizeSecondaryEngine(THD *thd [[maybe_unused]], LEX *lex) {
// The context should have been set by PrepareSecondaryEngine.
assert(lex->secondary_engine_execution_context() != nullptr);
Expand All @@ -646,7 +711,7 @@ static bool OptimizeSecondaryEngine(THD *thd [[maybe_unused]], LEX *lex) {
});
}

return false;
return RapidOptimize(thd, lex);
}

static bool CompareJoinCost(THD *thd, const JOIN &join, double optimizer_cost, bool *use_best_so_far, bool *cheaper,
Expand Down
3 changes: 3 additions & 0 deletions storage/rapid_engine/populate/populate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ namespace Populate {
std::unordered_map<uint64_t, mtr_log_rec> sys_pop_buff;

std::atomic<bool> sys_pop_started{false};
// how many data was in sys_pop_buff?
std::atomic<ulonglong> sys_pop_data_sz{0};
static ulint sys_rapid_loop_count{0};

Expand Down Expand Up @@ -175,5 +176,7 @@ void Populator::rapid_print_thread_info(FILE *file) { /* in: output stream */
ShannonBase::Populate::sys_pop_started ? "running" : "stopped", ShannonBase::Populate::sys_rapid_loop_count);
}

bool Populator::check_populating_status(std::string &table_name) { return false; }

} // namespace Populate
} // namespace ShannonBase
2 changes: 2 additions & 0 deletions storage/rapid_engine/populate/populate.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ class Populator {
static void end_change_populate_threads();
// to print thread infos.
static void rapid_print_thread_info(FILE *file);
// to check whether the specific table are still do populating.
static bool check_populating_status(std::string &table_name);
};

} // namespace Populate
Expand Down

0 comments on commit a1a9429

Please sign in to comment.