Skip to content

Commit

Permalink
feat(shannon): selective dynamic offload
Browse files Browse the repository at this point in the history
  • Loading branch information
ShannonBase committed Dec 10, 2024
1 parent 2d9f534 commit c2d71a4
Show file tree
Hide file tree
Showing 17 changed files with 218 additions and 39 deletions.
41 changes: 41 additions & 0 deletions sql/handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ enum enum_alter_inplace_result {
HA_ALTER_INPLACE_INSTANT
};

/**
* Used to identify which engine executed a SELECT query.
*/
enum class SelectExecutedIn : bool { kPrimaryEngine, kSecondaryEngine };

/* Bits in table_flags() to show what database can do */

#define HA_NO_TRANSACTIONS (1 << 0) /* Doesn't support transactions */
Expand Down Expand Up @@ -2546,6 +2551,35 @@ typedef void (*se_after_commit_t)(void *arg);
// before_rollback hook. Remove after WL#11320 has been completed.
typedef void (*se_before_rollback_t)(void *arg);

/**
Notify plugins when a SELECT query was executed. The plugins will be notified
only if the query is not considered secondary engine relevant, i.e.:
1. for a query with missing secondary_engine_statement_ctx, its estimated cost
is greater than the currently configured 'secondary_engine_cost_threshold'
2. for queries with secondary_engine_statement_ctx, wherever
secondary_engine_statement_ctx::is_primary_engine_optimal() returns False
indicating secondary engine relevance.
*/
using notify_after_select_t = void (*)(THD *thd, SelectExecutedIn executed_in);

/**
* Notify plugins when a table is created.
*/
using notify_create_table_t = void (*)(struct HA_CREATE_INFO *create_info,
const char *db, const char *table_name);

/**
Secondary engine hook called after PRIMARY_TENTATIVELY optimization is
complete, and decides if secondary engine optimization will be performed, and
comparison of primary engine cost and secondary engine cost will determine
which engine to use for execution.
@param[in] thd current thd.
@return :
@retval true When secondary_engine's prepare hook is to be further called
@retval false When secondary_engine's prepare hook is NOT to be further called
*/
using secondary_engine_pre_prepare_hook_t = bool (*)(THD *thd);
/*
Page Tracking : interfaces to handlerton functions which starts/stops page
tracking, and purges/fetches page tracking information.
Expand Down Expand Up @@ -2901,10 +2935,17 @@ struct handlerton {
secondary_engine_check_optimizer_request_t
secondary_engine_check_optimizer_request;

/* Pointer to a function that is called at the end of the PRIMARY_TENTATIVELY
* optimization stage, which also decides that the statement should be
* attempted offloaded to a secondary storage engine. */
secondary_engine_pre_prepare_hook_t secondary_engine_pre_prepare_hook;

se_before_commit_t se_before_commit;
se_after_commit_t se_after_commit;
se_before_rollback_t se_before_rollback;

notify_after_select_t notify_after_select;

/** Page tracking interface */
Page_track_t page_track;
};
Expand Down
2 changes: 1 addition & 1 deletion sql/sql_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6755,7 +6755,7 @@ static bool open_secondary_engine_tables(THD *thd, uint flags) {
// future executions of the statement, since these properties will
// not change between executions.
const LEX_CSTRING *secondary_engine =
sql_cmd->eligible_secondary_storage_engine();
sql_cmd->eligible_secondary_storage_engine(thd);
const plugin_ref secondary_engine_plugin =
secondary_engine == nullptr
? nullptr
Expand Down
7 changes: 7 additions & 0 deletions sql/sql_class.cc
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,11 @@ void THD::set_transaction(Transaction_ctx *transaction_ctx) {
m_transaction.reset(transaction_ctx);
}

void THD::set_secondary_engine_statement_context(
std::unique_ptr<Secondary_engine_statement_context> context) {
m_secondary_engine_statement_context = std::move(context);
}

bool THD::set_db(const LEX_CSTRING &new_db) {
bool result;
/*
Expand Down Expand Up @@ -1432,6 +1437,8 @@ THD::~THD() {
DBUG_TRACE;
DBUG_PRINT("info", ("THD dtor, this %p", this));

assert(m_secondary_engine_statement_context == nullptr);

if (!release_resources_done()) release_resources();

clear_next_event_pos();
Expand Down
34 changes: 34 additions & 0 deletions sql/sql_class.h
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,23 @@ using Event_tracking_data =
std::pair<Event_tracking_class, Event_tracking_information *>;
using Event_tracking_data_stack = std::stack<Event_tracking_data>;

/**
Base class for secondary engine statement context objects. Secondary
storage engines may create classes derived from this one which
contain state they need to preserve in lifecycle of this query.
*/
class Secondary_engine_statement_context {
public:
/**
Destructs the secondary engine statement context object. It is
called after the query execution has completed. Secondary engines
may override the destructor in subclasses and add code that
performs cleanup tasks that are needed after query execution.
*/
virtual ~Secondary_engine_statement_context() = default;
virtual bool is_primary_engine_optimal() const { return true; }
};

/**
@class THD
For each client connection we create a separate thread with THD serving as
Expand Down Expand Up @@ -1056,6 +1073,12 @@ class THD : public MDL_context_owner,
*/
String m_rewritten_query;

/**
Current query's secondary engine statement context.
*/
std::unique_ptr<Secondary_engine_statement_context>
m_secondary_engine_statement_context;

public:
/* Used to execute base64 coded binlog events in MySQL server */
Relay_log_info *rli_fake;
Expand All @@ -1081,6 +1104,17 @@ class THD : public MDL_context_owner,
*/
void rpl_detach_engine_ha_data();

/*
Set secondary_engine_statement_context to new context.
This function assumes existing m_secondary_engine_statement_context is empty,
such that there's only context throughout the query's lifecycle.
*/
void set_secondary_engine_statement_context(
std::unique_ptr<Secondary_engine_statement_context> context);
Secondary_engine_statement_context *secondary_engine_statement_context() {
return m_secondary_engine_statement_context.get();
}

/**
When the thread is a binlog or slave applier it reattaches the engine
ha_data associated with it and memorizes the fact of that.
Expand Down
4 changes: 2 additions & 2 deletions sql/sql_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ class Sql_cmd {
the statement is not eligible for execution in a secondary storage
engine
*/
virtual const MYSQL_LEX_CSTRING *eligible_secondary_storage_engine() const {
virtual const MYSQL_LEX_CSTRING *eligible_secondary_storage_engine(
THD *) const {
return nullptr;
}

/**
Disable use of secondary storage engines in this statement. After
a call to this function, the statement will not try to use a
Expand Down
4 changes: 3 additions & 1 deletion sql/sql_cmd_ddl_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ static bool populate_table(THD *thd, LEX *lex) {

if (unit->execute(thd)) return true;

notify_plugins_after_select(thd, lex->m_sql_cmd);

return false;
}

Expand Down Expand Up @@ -462,7 +464,7 @@ bool Sql_cmd_create_table::execute(THD *thd) {
}

const MYSQL_LEX_CSTRING *
Sql_cmd_create_table::eligible_secondary_storage_engine() const {
Sql_cmd_create_table::eligible_secondary_storage_engine(THD *) const {
// Now check if the opened tables are available in a secondary
// storage engine. Only use the secondary tables if all the tables
// have a secondary tables, and they are all in the same secondary
Expand Down
3 changes: 2 additions & 1 deletion sql/sql_cmd_ddl_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class Sql_cmd_create_table final : public Sql_cmd_ddl_table {
return SQLCOM_CREATE_TABLE;
}

const MYSQL_LEX_CSTRING *eligible_secondary_storage_engine() const override;
const MYSQL_LEX_CSTRING *eligible_secondary_storage_engine(
THD *thd) const override;

bool execute(THD *thd) override;
bool prepare(THD *thd) override;
Expand Down
2 changes: 1 addition & 1 deletion sql/sql_cmd_dml.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ class Sql_cmd_dml : public Sql_cmd {
@return nullptr if not eligible or the name of the engine otherwise
*/
const MYSQL_LEX_CSTRING *get_eligible_secondary_engine() const;
const MYSQL_LEX_CSTRING *get_eligible_secondary_engine(THD *thd) const;

protected:
LEX *lex; ///< Pointer to LEX for this statement
Expand Down
3 changes: 2 additions & 1 deletion sql/sql_do.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class Sql_cmd_do final : public Sql_cmd_select {

enum_sql_command sql_command_code() const override { return SQLCOM_DO; }

const MYSQL_LEX_CSTRING *eligible_secondary_storage_engine() const override {
const MYSQL_LEX_CSTRING *eligible_secondary_storage_engine(
THD *) const override {
return nullptr;
}
};
Expand Down
4 changes: 2 additions & 2 deletions sql/sql_insert.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3350,12 +3350,12 @@ bool Sql_cmd_insert_base::accept(THD *thd, Select_lex_visitor *visitor) {
}

const MYSQL_LEX_CSTRING *
Sql_cmd_insert_select::eligible_secondary_storage_engine() const {
Sql_cmd_insert_select::eligible_secondary_storage_engine(THD *thd) const {
// ON DUPLICATE KEY UPDATE cannot be offloaded
if (!update_field_list.empty()) return nullptr;

// Don't use secondary storage engines for REPLACE INTO SELECT statements
if (is_replace) return nullptr;

return get_eligible_secondary_engine();
return get_eligible_secondary_engine(thd);
}
3 changes: 2 additions & 1 deletion sql/sql_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,8 @@ class Sql_cmd_insert_select : public Sql_cmd_insert_base {
enum_sql_command sql_command_code() const override {
return is_replace ? SQLCOM_REPLACE_SELECT : SQLCOM_INSERT_SELECT;
}
const MYSQL_LEX_CSTRING *eligible_secondary_storage_engine() const override;
const MYSQL_LEX_CSTRING *eligible_secondary_storage_engine(
THD *thd) const override;
};

#endif /* SQL_INSERT_INCLUDED */
1 change: 1 addition & 0 deletions sql/sql_parse.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2208,6 +2208,7 @@ bool dispatch_command(THD *thd, const COM_DATA *com_data,

thd->bind_parameter_values = nullptr;
thd->bind_parameter_values_count = 0;
thd->set_secondary_engine_statement_context(nullptr);

/* Need to set error to true for graceful shutdown */
if ((thd->lex->sql_command == SQLCOM_SHUTDOWN) &&
Expand Down
3 changes: 3 additions & 0 deletions sql/sql_prepare.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2999,6 +2999,9 @@ bool Prepared_statement::execute_loop(THD *thd, String *expanded_query,
bool error;
bool reprepared_for_types [[maybe_unused]] = false;

auto scope_guard = create_scope_guard(
[thd] { thd->set_secondary_engine_statement_context(nullptr); });

/* Check if we got an error when sending long data */
if (m_arena.get_state() == Query_arena::STMT_ERROR) {
my_message(m_last_errno, m_last_error, MYF(0));
Expand Down
Loading

0 comments on commit c2d71a4

Please sign in to comment.