From a2fccc57159f16bbd3ed08016b1197d6f056c8a0 Mon Sep 17 00:00:00 2001 From: shannon data ai Date: Thu, 6 Jun 2024 10:37:33 +0800 Subject: [PATCH] feat(shannon): parallel query processing ii part ii of parallel query processing. --- mysql-test/r/mysqld--help-notwin.result | 19 + sql/field.cc | 8 +- sql/field.h | 2 +- sql/item.cc | 73 ++-- sql/item.h | 21 +- sql/item_func.cc | 1 + sql/item_strfunc.h | 1 + sql/item_sum.cc | 13 +- sql/item_sum.h | 3 +- sql/item_timefunc.h | 5 +- sql/mysqld.cc | 16 + sql/mysqld.h | 3 + sql/query_result.cc | 20 +- sql/sql_class.h | 3 +- sql/sql_lex.cc | 7 +- sql/sql_lex.h | 17 +- sql/sql_parallel.h | 3 +- sql/sql_pq_condition.cc | 528 ++++++++++++++++++++++-- sql/sql_pq_condition.h | 6 +- sql/sql_prepare.cc | 5 +- sql/sql_resolver.cc | 1 + sql/sql_tmp_table.cc | 31 ++ sql/sys_vars.cc | 49 +++ 23 files changed, 731 insertions(+), 104 deletions(-) diff --git a/mysql-test/r/mysqld--help-notwin.result b/mysql-test/r/mysqld--help-notwin.result index 4b77e03af..afa7820df 100644 --- a/mysql-test/r/mysqld--help-notwin.result +++ b/mysql-test/r/mysqld--help-notwin.result @@ -400,6 +400,8 @@ The following options may be given as the first argument: --flush Flush MyISAM tables to disk between SQL commands --flush-time=# A dedicated thread is created to flush all tables at the given interval + --force-parallel-execute + force parallel execute in session --ft-boolean-syntax=name List of operators for MATCH ... AGAINST ( ... IN BOOLEAN MODE) @@ -842,6 +844,17 @@ The following options may be given as the first argument: Maximum allowed cumulated size of stored optimizer traces --optimizer-trace-offset=# Offset of first optimizer trace to show; see manual + --parallel-cost-threshold=# + Cost threshold for parallel query. + --parallel-default-dop=# + default degree of parallel query. + --parallel-max-threads=# + max running threads of parallel query. + --parallel-memory-limit=# + upper limit memory size that parallel query can use + --parallel-queue-timeout=# + queue timeout for parallel query when resource is not + enough .the unit is microseconds --parser-max-mem-size=# Maximum amount of memory available to the parser --partial-revokes Access of database objects can be restricted, even if @@ -1754,6 +1767,7 @@ explicit-defaults-for-timestamp TRUE external-locking FALSE flush FALSE flush-time 0 +force-parallel-execute FALSE ft-boolean-syntax + -><()~*:""&| ft-max-word-len 84 ft-min-word-len 4 @@ -1881,6 +1895,11 @@ optimizer-trace-features greedy_search=on,range_optimizer=on,dynamic_range=on,re optimizer-trace-limit 1 optimizer-trace-max-mem-size 1048576 optimizer-trace-offset -1 +parallel-cost-threshold 1000 +parallel-default-dop 4 +parallel-max-threads 64 +parallel-memory-limit 104857600 +parallel-queue-timeout 0 parser-max-mem-size 18446744073709551615 partial-revokes #### password-history 0 diff --git a/sql/field.cc b/sql/field.cc index c92514a16..577c43d66 100644 --- a/sql/field.cc +++ b/sql/field.cc @@ -2757,9 +2757,11 @@ Field_new_decimal::Field_new_decimal(uint32 len_arg, bool is_nullable_arg, bin_size = my_decimal_get_binary_size(precision, dec); } -Field *Field_new_decimal::create_from_item(const Item *item) { +Field *Field_new_decimal::create_from_item(const Item *item, MEM_ROOT *root) { + MEM_ROOT *pq_check_root = root ? root : *THR_MALLOC; + uint8 dec = item->decimals; - const uint8 intg = item->decimal_precision() - dec; + uint8 intg = item->decimal_precision() - dec; uint32 len = item->max_char_length(); assert(item->result_type() == DECIMAL_RESULT); @@ -2793,7 +2795,7 @@ Field *Field_new_decimal::create_from_item(const Item *item) { /* Corrected value fits. */ len = required_length; } - return new (*THR_MALLOC) + return new (pq_check_root) Field_new_decimal(len, item->is_nullable(), item->item_name.ptr(), dec, item->unsigned_flag); } diff --git a/sql/field.h b/sql/field.h index bdf6c75e6..75b0ce8eb 100644 --- a/sql/field.h +++ b/sql/field.h @@ -2167,7 +2167,7 @@ class Field_new_decimal : public Field_num { return new (mem_root) Field_new_decimal(*this); } const uchar *unpack(uchar *to, const uchar *from, uint param_data) final; - static Field *create_from_item(const Item *item); + static Field *create_from_item(const Item *item, MEM_ROOT *root = nullptr); bool send_to_protocol(Protocol *protocol) const final; void set_keep_precision(bool arg) { m_keep_precision = arg; } }; diff --git a/sql/item.cc b/sql/item.cc index 41300095b..59ef5523b 100644 --- a/sql/item.cc +++ b/sql/item.cc @@ -6467,19 +6467,21 @@ bool Item::eq_by_collation(Item *item, bool binary_cmp, @param table Table for which the field is created */ -Field *Item::make_string_field(TABLE *table) const { +Field *Item::make_string_field(TABLE *table, MEM_ROOT *root) const { Field *field; + MEM_ROOT *pq_check_root = root ? root : *THR_MALLOC; + assert(collation.collation); if (data_type() == MYSQL_TYPE_JSON) field = - new (*THR_MALLOC) Field_json(max_length, m_nullable, item_name.ptr()); + new (pq_check_root) Field_json(max_length, m_nullable, item_name.ptr()); else if (data_type() == MYSQL_TYPE_GEOMETRY) { - field = new (*THR_MALLOC) + field = new (pq_check_root) Field_geom(max_length, m_nullable, item_name.ptr(), Field::GEOM_GEOMETRY, std::optional()); } else if (max_length / collation.collation->mbmaxlen > CONVERT_IF_BIGGER_TO_BLOB) - field = new (*THR_MALLOC) Field_blob( + field = new (pq_check_root) Field_blob( max_length, m_nullable, item_name.ptr(), collation.collation, true); /* Item_type_holder holds the exact type, do not change it */ else if (max_length > 0 && @@ -6502,70 +6504,67 @@ Field *Item::make_string_field(TABLE *table) const { @return Created field @retval NULL error */ - -Field *Item::tmp_table_field_from_field_type(TABLE *table, - bool fixed_length) const { - /* - The field functions defines a field to be not null if null_ptr is not 0 - */ +Field *Item::tmp_table_field_from_field_type(TABLE *table, bool fixed_length, + MEM_ROOT *root) const { + /*The field functions defines a field to be not null if null_ptr is not 0*/ Field *field; + MEM_ROOT *pq_check_root = root ? root : *THR_MALLOC; switch (data_type()) { case MYSQL_TYPE_DECIMAL: case MYSQL_TYPE_NEWDECIMAL: - field = Field_new_decimal::create_from_item(this); + field = Field_new_decimal::create_from_item(this, root); break; case MYSQL_TYPE_TINY: - field = new (*THR_MALLOC) + field = new (pq_check_root) Field_tiny(max_length, m_nullable, item_name.ptr(), unsigned_flag); break; case MYSQL_TYPE_SHORT: - field = new (*THR_MALLOC) + field = new (pq_check_root) Field_short(max_length, m_nullable, item_name.ptr(), unsigned_flag); break; case MYSQL_TYPE_LONG: - field = new (*THR_MALLOC) + field = new (pq_check_root) Field_long(max_length, m_nullable, item_name.ptr(), unsigned_flag); break; case MYSQL_TYPE_LONGLONG: - field = new (*THR_MALLOC) Field_longlong(max_length, m_nullable, - item_name.ptr(), unsigned_flag); + field = new (pq_check_root) Field_longlong( + max_length, m_nullable, item_name.ptr(), unsigned_flag); break; case MYSQL_TYPE_FLOAT: - field = new (*THR_MALLOC) Field_float( + field = new (pq_check_root) Field_float( max_length, m_nullable, item_name.ptr(), decimals, unsigned_flag); break; case MYSQL_TYPE_DOUBLE: - field = new (*THR_MALLOC) Field_double( + field = new (pq_check_root) Field_double( max_length, m_nullable, item_name.ptr(), decimals, unsigned_flag); break; case MYSQL_TYPE_INT24: - field = new (*THR_MALLOC) + field = new (pq_check_root) Field_medium(max_length, m_nullable, item_name.ptr(), unsigned_flag); break; case MYSQL_TYPE_DATE: case MYSQL_TYPE_NEWDATE: - field = new (*THR_MALLOC) Field_newdate(m_nullable, item_name.ptr()); + field = new (pq_check_root) Field_newdate(m_nullable, item_name.ptr()); break; case MYSQL_TYPE_TIME: - field = - new (*THR_MALLOC) Field_timef(m_nullable, item_name.ptr(), decimals); + field = new (pq_check_root) + Field_timef(m_nullable, item_name.ptr(), decimals); break; case MYSQL_TYPE_TIMESTAMP: - field = new (*THR_MALLOC) + field = new (pq_check_root) Field_timestampf(m_nullable, item_name.ptr(), decimals); break; case MYSQL_TYPE_DATETIME: - field = new (*THR_MALLOC) + field = new (pq_check_root) Field_datetimef(m_nullable, item_name.ptr(), decimals); break; case MYSQL_TYPE_YEAR: assert(max_length == 4); // Field_year is only for length 4. - assert(decimal_precision() == 4); - field = new (*THR_MALLOC) Field_year(m_nullable, item_name.ptr()); + field = new (pq_check_root) Field_year(m_nullable, item_name.ptr()); break; case MYSQL_TYPE_BIT: - field = new (*THR_MALLOC) + field = new (pq_check_root) Field_bit_as_char(max_length, m_nullable, item_name.ptr()); break; case MYSQL_TYPE_INVALID: @@ -6578,7 +6577,7 @@ Field *Item::tmp_table_field_from_field_type(TABLE *table, case MYSQL_TYPE_STRING: case MYSQL_TYPE_NULL: if (fixed_length && max_length <= CONVERT_IF_BIGGER_TO_BLOB) { - field = new (*THR_MALLOC) Field_string( + field = new (pq_check_root) Field_string( max_length, m_nullable, item_name.ptr(), collation.collation); break; } @@ -6588,26 +6587,28 @@ Field *Item::tmp_table_field_from_field_type(TABLE *table, case MYSQL_TYPE_SET: case MYSQL_TYPE_VAR_STRING: case MYSQL_TYPE_VARCHAR: - return make_string_field(table); + field = make_string_field(table, root); + // if (field) field->set_pseudo(is_pseudo); + return field; case MYSQL_TYPE_TINY_BLOB: case MYSQL_TYPE_MEDIUM_BLOB: case MYSQL_TYPE_LONG_BLOB: case MYSQL_TYPE_BLOB: if (this->type() == Item::TYPE_HOLDER) - field = new (*THR_MALLOC) Field_blob( + field = new (pq_check_root) Field_blob( max_length, m_nullable, item_name.ptr(), collation.collation, true); else - field = new (*THR_MALLOC) + field = new (pq_check_root) Field_blob(max_length, m_nullable, item_name.ptr(), collation.collation, false); break; // Blob handled outside of case case MYSQL_TYPE_GEOMETRY: - field = new (*THR_MALLOC) Field_geom( + field = new (pq_check_root) Field_geom( max_length, m_nullable, item_name.ptr(), get_geometry_type(), {}); break; case MYSQL_TYPE_JSON: - field = - new (*THR_MALLOC) Field_json(max_length, m_nullable, item_name.ptr()); + field = new (pq_check_root) + Field_json(max_length, m_nullable, item_name.ptr()); } if (field) field->init(table); return field; @@ -11026,4 +11027,8 @@ bool AllItemsAreEqual(const Item *const *a, const Item *const *b, int num_items, return true; } +void set_has_notsupported_func_true(void) { + current_thd->lex->has_notsupported_func = true; +} + #include "sql/sql_pq_clone_item.inc" \ No newline at end of file diff --git a/sql/item.h b/sql/item.h index 5b5323980..9494ce0ab 100644 --- a/sql/item.h +++ b/sql/item.h @@ -1239,7 +1239,8 @@ class Item : public Parse_tree_node { */ virtual void notify_removal() {} virtual void make_field(Send_field *field); - virtual Field *make_string_field(TABLE *table) const; + virtual Field *make_string_field(TABLE *table, + MEM_ROOT *root = nullptr) const; virtual bool fix_fields(THD *, Item **); /** Fix after tables have been moved from one query_block level to the parent @@ -3092,7 +3093,9 @@ class Item : public Parse_tree_node { // used in row subselects to get value of elements virtual void bring_value() {} - Field *tmp_table_field_from_field_type(TABLE *table, bool fixed_length) const; + Field *tmp_table_field_from_field_type(TABLE *table, bool fixed_length, + MEM_ROOT *root = nullptr) const; + virtual Item_field *field_for_view_update() { return nullptr; } /** Informs an item that it is wrapped in a truth test, in case it wants to @@ -5870,16 +5873,15 @@ class Item_ref : public Item_ident { }; PQ_copy_type copy_type; + + /// Indirect pointer to the referenced item. + Item **m_ref_item{nullptr}; private: /// True if referenced item has been unlinked, used during item tree removal bool m_unlinked{false}; Field *result_field{nullptr}; /* Save result here */ - protected: - /// Indirect pointer to the referenced item. - Item **m_ref_item{nullptr}; - public: Item_ref(Name_resolution_context *context_arg, const char *db_name_arg, const char *table_name_arg, const char *field_name_arg) @@ -5911,8 +5913,8 @@ class Item_ref : public Item_ident { /* Constructor need to process subselect with temporary tables (see Item) */ Item_ref(THD *thd, Item_ref *item) : Item_ident(thd, item), - result_field(item->result_field), - m_ref_item(item->m_ref_item) {} + m_ref_item(item->m_ref_item), + result_field(item->result_field) {} /// @returns the item referenced by this object Item *ref_item() const { return *m_ref_item; } @@ -6097,7 +6099,8 @@ class Item_ref : public Item_ident { return ref_item()->check_column_in_group_by(arg); } bool collect_item_field_or_ref_processor(uchar *arg) override; - Item *pq_clone(THD *thd, Query_block *select) override; + + Item *pq_clone(THD *thd, Query_block *select) override; }; /** diff --git a/sql/item_func.cc b/sql/item_func.cc index a8a45c16e..2c44f1e5a 100644 --- a/sql/item_func.cc +++ b/sql/item_func.cc @@ -8036,6 +8036,7 @@ bool Item_func_sp::do_itemize(Parse_context *pc, Item **res) { context = lex->current_context(); lex->safe_to_cache_query = false; + lex->has_sp = true; if (m_name->m_db.str == nullptr) { if (thd->lex->copy_db_to(&m_name->m_db.str, &m_name->m_db.length)) { diff --git a/sql/item_strfunc.h b/sql/item_strfunc.h index d2980fa7f..cd6042172 100644 --- a/sql/item_strfunc.h +++ b/sql/item_strfunc.h @@ -786,6 +786,7 @@ class Item_func_make_set final : public Item_str_func { void print(const THD *thd, String *str, enum_query_type query_type) const override; Item *pq_clone(THD *thd, Query_block *select) override; + Item* get_item() { return item; } }; class Item_func_format final : public Item_str_ascii_func { diff --git a/sql/item_sum.cc b/sql/item_sum.cc index 4a6571f3a..5af5db8fe 100644 --- a/sql/item_sum.cc +++ b/sql/item_sum.cc @@ -1,4 +1,5 @@ /* Copyright (c) 2000, 2023, Oracle and/or its affiliates. + Copyright (c) 2021, Huawei Technologies Co., Ltd. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, @@ -18,7 +19,9 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + + Copyright (c) 2023, Shannon Data AI and/or its affiliates. */ /** @file @@ -4314,8 +4317,10 @@ void Item_func_group_concat::cleanup() { row_count = 0; } -Field *Item_func_group_concat::make_string_field(TABLE *table_arg) const { +Field *Item_func_group_concat::make_string_field(TABLE *table_arg, + MEM_ROOT *root) const { Field *field; + MEM_ROOT *pq_check_root = root ? root : *THR_MALLOC; assert(collation.collation); /* Use mbminlen to determine maximum number of characters. @@ -4335,11 +4340,11 @@ Field *Item_func_group_concat::make_string_field(TABLE *table_arg) const { UINT_MAX32); if (max_characters > CONVERT_IF_BIGGER_TO_BLOB) - field = new (*THR_MALLOC) + field = new (pq_check_root) Field_blob(field_length, is_nullable(), item_name.ptr(), collation.collation, true); else - field = new (*THR_MALLOC) + field = new (pq_check_root) Field_varstring(field_length, is_nullable(), item_name.ptr(), table_arg->s, collation.collation); diff --git a/sql/item_sum.h b/sql/item_sum.h index e7017e65a..76c1065e2 100644 --- a/sql/item_sum.h +++ b/sql/item_sum.h @@ -2207,7 +2207,8 @@ class Item_func_group_concat final : public Item_sum { enum Sumfunctype sum_func() const override { return GROUP_CONCAT_FUNC; } const char *func_name() const override { return "group_concat"; } Item_result result_type() const override { return STRING_RESULT; } - Field *make_string_field(TABLE *table_arg) const override; + Field *make_string_field(TABLE *table_arg, + MEM_ROOT *root = nullptr) const override; void clear() override; bool add() override; void reset_field() override { assert(0); } // not used diff --git a/sql/item_timefunc.h b/sql/item_timefunc.h index f0ddde31c..1823f6d11 100644 --- a/sql/item_timefunc.h +++ b/sql/item_timefunc.h @@ -2,6 +2,7 @@ #define ITEM_TIMEFUNC_INCLUDED /* Copyright (c) 2000, 2023, Oracle and/or its affiliates. + Copyright (c) 2021, Huawei Technologies Co., Ltd. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, @@ -21,7 +22,9 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + + Copyright (c) 2023, Shannon Data AI and/or its affiliates. */ /* Function items used by mysql */ diff --git a/sql/mysqld.cc b/sql/mysqld.cc index c380332ba..414823b40 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -810,6 +810,7 @@ MySQL clients support the protocol: #include "sql/options_mysqld.h" // OPT_THREAD_CACHE_SIZE #include "sql/partitioning/partition_handler.h" // partitioning_init #include "sql/persisted_variable.h" // Persisted_variables_cache +#include "sql/sql_parallel.h" #include "sql/plugin_table.h" #include "sql/protocol.h" #include "sql/psi_memory_key.h" // key_memory_MYSQL_RELAY_LOG_index @@ -9720,6 +9721,14 @@ static int show_telemetry_traces_support(THD * /*unused*/, SHOW_VAR *var, return 0; } +static int show_pq_memory(THD *, SHOW_VAR *var, char *buff) { + var->type = SHOW_INT; + var->value = buff; + unsigned int *value = reinterpret_cast(buff); + *value = get_pq_memory_total(); + return 0; +} + SHOW_VAR status_vars[] = { {"Aborted_clients", (char *)&aborted_threads, SHOW_LONG, SHOW_SCOPE_GLOBAL}, {"Aborted_connects", (char *)&show_aborted_connects, SHOW_FUNC, @@ -10082,6 +10091,13 @@ SHOW_VAR status_vars[] = { SHOW_FUNC, SHOW_SCOPE_GLOBAL}, {"Tls_sni_server_name", (char *)&show_ssl_get_tls_sni_servername, SHOW_FUNC, SHOW_SCOPE_SESSION}, + {"PQ_threads_refused", (char *)¶llel_threads_refused, SHOW_INT, + SHOW_SCOPE_GLOBAL}, + {"PQ_memory_refused", (char *)¶llel_memory_refused, SHOW_INT, + SHOW_SCOPE_GLOBAL}, + {"PQ_threads_running", (char *)¶llel_threads_running, SHOW_INT, + SHOW_SCOPE_GLOBAL}, + {"PQ_memory_used", (char *)&show_pq_memory, SHOW_FUNC, SHOW_SCOPE_GLOBAL}, {NullS, NullS, SHOW_LONG, SHOW_SCOPE_ALL}}; void add_terminator(vector *options) { diff --git a/sql/mysqld.h b/sql/mysqld.h index 089d88466..ca059cd47 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -185,6 +185,9 @@ extern Rpl_acf_configuration_handler *rpl_acf_configuration_handler; extern Source_IO_monitor *rpl_source_io_monitor; extern int32_t opt_regexp_time_limit; extern int32_t opt_regexp_stack_limit; +extern uint parallel_threads_running; +extern uint parallel_threads_refused; +extern uint parallel_memory_refused; #ifdef _WIN32 extern bool opt_no_monitor; #endif // _WIN32 diff --git a/sql/query_result.cc b/sql/query_result.cc index 2c3c0074a..2280bcc68 100644 --- a/sql/query_result.cc +++ b/sql/query_result.cc @@ -155,8 +155,8 @@ bool Query_result_mq::send_result_set_metadata( mq_fields_data = new (thd->pq_mem_root) Field_raw_data[send_fields_size]{}; mq_fields_null_array = new (thd->pq_mem_root) bool[2 * field_size]; - mq_fields_null_flag = new ( - thd->pq_mem_root) char[field_size / MQ_FIELDS_DATA_HEADER_LENGTH + 2]; + mq_fields_null_flag = + new (thd->pq_mem_root) char[field_size / MQ_FIELDS_DATA_HEADER_LENGTH + 2]; if (!mq_fields_data || !mq_fields_null_array || !mq_fields_null_flag) { return true; @@ -192,20 +192,8 @@ bool Query_result_mq::send_data( continue; } -// c2: check Item_copy. In the original execution plan, const_item will be -// transformed into Item_copy in tmp_table (or ORDERED_GROUP_BY) -#if 0 - if (item->type() == Item::COPY_STR_ITEM) { - Item *orig_item = down_cast(item)->get_item(); - assert(orig_item && !orig_item->skip_create_tmp_table); - if (orig_item->const_item() || - orig_item->basic_const_item()) { - pq_build_mq_item(orig_item, &mq_fields_data[fields_idx], - mq_fields_null_array,null_num, total_copy_bytes); - continue; - } - } -#endif + // c2: check Item_copy. In the original execution plan, const_item will be + // transformed into Item_copy in tmp_table (or ORDERED_GROUP_BY) // c3: check item_result_field and item_field result_field = item->get_result_field(); diff --git a/sql/sql_class.h b/sql/sql_class.h index fbd3efdf1..184a69085 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -20,7 +20,7 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA - + Copyright (c) 2023, Shannon Data AI and/or its affiliates. */ #ifndef SQL_CLASS_INCLUDED @@ -2111,6 +2111,7 @@ class THD : public MDL_context_owner, Attachable_trx *m_attachable_trx; public: + Attachable_trx *get_attachable_trx() { return m_attachable_trx; } Transaction_ctx *get_transaction() { return m_transaction.get(); } const Transaction_ctx *get_transaction() const { return m_transaction.get(); } diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc index e8ee443fc..c49d23078 100644 --- a/sql/sql_lex.cc +++ b/sql/sql_lex.cc @@ -1,5 +1,6 @@ /* Copyright (c) 2000, 2023, Oracle and/or its affiliates. + Copyright (c) 2021, Huawei Technologies Co., Ltd. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, @@ -19,7 +20,9 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + + Copyright (c) 2023, Shannon Data AI and/or its affiliates. */ /* A lexical scanner on a temporary buffer with a yacc interface */ @@ -530,6 +533,8 @@ void lex_end(LEX *lex) { sp_head::destroy(lex->sphead); lex->sphead = nullptr; + lex->has_sp = false; + lex->has_notsupported_func = false; } void LEX::release_plugins() { diff --git a/sql/sql_lex.h b/sql/sql_lex.h index cf9b172ce..eb87be1ae 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -1,4 +1,5 @@ /* Copyright (c) 2000, 2023, Oracle and/or its affiliates. + Copyright (c) 2021, Huawei Technologies Co., Ltd. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, @@ -18,7 +19,9 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + + Copyright (c) 2023, Shannon Data AI and/or its affiliates. */ /** @defgroup GROUP_PARSER Parser @@ -1773,6 +1776,8 @@ class Query_block : public Query_term { bool right_joins() const { return m_right_joins; } void set_right_joins() { m_right_joins = true; } + bool has_foj() const { return m_has_foj; } + void set_has_foj(bool val) { m_has_foj = val; } /// Lookup for Query_block type enum_explain_type type() const; @@ -2218,6 +2223,12 @@ class Query_block : public Query_term { /// How many expressions are part of the order by but not select list. int hidden_order_field_count{0}; + /** + Intrusive linked list of all query blocks within the same + Windows function maybe be optimized, so we save this value to determine + whether support parallel query. */ + uint saved_windows_elements{0}; + private: friend class Query_expression; friend class Condition_context; @@ -2393,6 +2404,7 @@ class Query_block : public Query_term { bool has_sj_nests{false}; bool has_aj_nests{false}; ///< @see has_sj_nests; counts antijoin nests. bool m_right_joins{false}; ///< True if query block has right joins + bool m_has_foj{false}; ///< True if query block has full outer joins /// Allow merge of immediate unnamed derived tables bool allow_merge_derived{true}; @@ -4065,8 +4077,11 @@ struct LEX : public Query_tables_list { bool sp_lex_in_use; /* Keep track on lex usage in SPs for error handling */ bool all_privileges; bool contains_plaintext_password; + bool in_execute_ps{false}; enum_keep_diagnostics keep_diagnostics; uint32 next_binlog_file_nr; + bool has_sp{false}; // Item_func_sp, create function with no nosame option. + bool has_notsupported_func{false}; // true: not support pq,false: support pq private: bool m_broken; ///< see mark_broken() diff --git a/sql/sql_parallel.h b/sql/sql_parallel.h index 4d8e7d9b9..db46ed3ed 100644 --- a/sql/sql_parallel.h +++ b/sql/sql_parallel.h @@ -26,6 +26,7 @@ Copyright (c) 2023, Shannon Data AI and/or its affiliates. */ +#include "my_alloc.h" #include "sql/iterators/basic_row_iterators.h" #include "sql/sql_base.h" #include "sql/sql_lex.h" @@ -178,7 +179,7 @@ extern void add_to_list(SQL_I_List &list, ORDER *order); extern ulonglong parallel_memory_limit; extern ulong parallel_max_threads; -extern uint pq_memory_used[16]; +extern uint pq_memory_used[PQ_MEMORY_USED_BUCKET]; extern uint pq_memory_total_used; extern uint parallel_threads_running; diff --git a/sql/sql_pq_condition.cc b/sql/sql_pq_condition.cc index 2529247d5..ff57e9a79 100644 --- a/sql/sql_pq_condition.cc +++ b/sql/sql_pq_condition.cc @@ -20,6 +20,7 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + Copyright (c) 2023, Shannon Data AI and/or its affiliates. */ #include "sql/sql_pq_condition.h" @@ -31,6 +32,7 @@ #include "sql/range_optimizer/range_optimizer.h" #include "sql/sql_lex.h" #include "sql/sql_optimizer.h" +#include "sql/sql_parallel.h" #include "sql/sql_tmp_table.h" const enum_field_types NO_PQ_SUPPORTED_FIELD_TYPES[] = { @@ -51,8 +53,7 @@ const Item_sum::Sumfunctype NO_PQ_SUPPORTED_AGG_FUNC_TYPES[] = { const Item_func::Functype NO_PQ_SUPPORTED_FUNC_TYPES[] = { Item_func::FT_FUNC, Item_func::MATCH_FUNC, Item_func::SUSERVAR_FUNC, Item_func::FUNC_SP, Item_func::SUSERVAR_FUNC, Item_func::UDF_FUNC, - Item_func::NOT_ALL_FUNC -}; + Item_func::NOT_ALL_FUNC }; const char *NO_PQ_SUPPORTED_FUNC_ARGS[] = { "rand", @@ -225,8 +226,64 @@ bool check_pq_support_fieldtype_of_field_item(Item *item, } bool check_pq_support_fieldtype_of_func_item(Item *item, bool having) { - assert(item && having); - assert(false); + Item_func *func = static_cast(item); + assert(func); + + // check func type + if (pq_not_support_func(func)) { + return false; + } + + // the case of Item_func_make_set + if (!strcmp(func->func_name(), "make_set")) { + Item *arg_item = down_cast(func)->get_item(); + if (arg_item && !check_pq_support_fieldtype(arg_item, having)) { + return false; + } + } + + // check func args type + for (uint i = 0; i < func->arg_count; i++) { + // c: args contain unsupported fields + Item *arg_item = func->arguments()[i]; + if (arg_item == nullptr || + !check_pq_support_fieldtype(arg_item, having)) { // c + return false; + } + } + + // the case of Item_equal + if (func->functype() == Item_func::MULT_EQUAL_FUNC) { + Item_equal *item_equal = down_cast(item); + assert(item_equal); + + // check const_item + Item *const_item = item_equal->const_arg(); + if (const_item && + (const_item->type() == Item::SUM_FUNC_ITEM || // c1 + !check_pq_support_fieldtype(const_item, having))) { // c2 + return false; + } + + // check fields + /* + Item *field_item = nullptr; + List fields = item_equal->get_fields(); + List_iterator_fast it(fields); + for (size_t i = 0; (field_item = it++); i++) { + if (!check_pq_support_fieldtype(field_item, having)) { + return false; + } + } + */ + + for (Item_field &field : item_equal->get_fields()) { + if (!check_pq_support_fieldtype(&field, having)) { + return false; + } + } + } + return true; } @@ -275,7 +332,15 @@ bool check_pq_support_fieldtype_of_sum_func_item(Item *item, bool having) { } bool check_pq_support_fieldtype_of_ref_item(Item *item, bool having) { - assert(item && having); + Item_ref *item_ref = down_cast(item); + if (item_ref == nullptr || pq_not_support_ref(item_ref, having)) { + return false; + } + + if (!check_pq_support_fieldtype(item_ref->m_ref_item[0], having)) { + return false; + } + return true; } @@ -339,8 +404,7 @@ PQ_CHECK_ITEM_TYPE g_check_item_type[] = { {Item::XPATH_NODESET_CMP, nullptr}, {Item::VIEW_FIXER_ITEM, nullptr}, {Item::FIELD_BIT_ITEM, nullptr}, - {Item::VALUES_COLUMN_ITEM, nullptr} - }; + {Item::VALUES_COLUMN_ITEM, nullptr}}; /** * check item is supported by Parallel Query or not @@ -371,8 +435,6 @@ bool check_pq_support_fieldtype(Item *item, bool having) { bool check_pq_sort_aggregation(const ORDER_with_src &order_list) { if (order_list.order == nullptr) { return false; - } else{ - return true; } ORDER *tmp = nullptr; @@ -399,8 +461,133 @@ bool pq_create_result_fields(THD *thd, Temp_table_param *param, mem_root_deque &fields, bool save_sum_fields, ulonglong select_options, MEM_ROOT *root) { - assert(thd && param && root && fields.size() - && save_sum_fields && select_options); + const bool not_all_columns = !(select_options & TMP_TABLE_ALL_COLUMNS); + long hidden_field_count = param->hidden_field_count; + Field *from_field = nullptr; + Field **tmp_from_field = &from_field; + Field **default_field = &from_field; + + bool force_copy_fields = false; + TABLE_SHARE s; + TABLE table; + table.s = &s; + + uint copy_func_count = param->func_count; + if (param->precomputed_group_by) { + copy_func_count += param->sum_func_count; + } + + Func_ptr_array *copy_func = new (root) Func_ptr_array(root); + if (copy_func == nullptr) { + return true; + } + + copy_func->reserve(copy_func_count); + for (Item *item : fields) { + Item::Type type = item->type(); + const bool is_sum_func = + type == Item::SUM_FUNC_ITEM && !item->m_is_window_function; + + if (not_all_columns && item != nullptr) { + if (item->has_aggregation() && type != Item::SUM_FUNC_ITEM) { + if (item->is_outer_reference()) item->update_used_tables(); + if (type == Item::SUBSELECT_ITEM || + (item->used_tables() & ~OUTER_REF_TABLE_BIT)) { + param->using_outer_summary_function = 1; + goto update_hidden; + } + } + + if (item->m_is_window_function) { + if (!param->m_window || param->m_window_frame_buffer) { + goto update_hidden; + } + + if (param->m_window != down_cast(item)->window()) { + goto update_hidden; + } + } else if (item->has_wf()) { + if (param->m_window == nullptr || !param->m_window->is_last()) { + goto update_hidden; + } + } + + if (item->const_item()) continue; + } + + if (is_sum_func && !save_sum_fields) { + /* Can't calc group yet */ + } else { + Field *new_field = nullptr; + if (param->schema_table) { + new_field = + item ? create_tmp_field_for_schema(item, &table, root) : nullptr; + } else { + new_field = + item ? create_tmp_field(thd, &table, item, type, copy_func, + tmp_from_field, default_field, false, //(1) + !force_copy_fields && not_all_columns, + item->marker == Item::MARKER_BIT || + param->bit_fields_as_long, //(2) + force_copy_fields) + : nullptr; + } + + if (new_field == nullptr) { + assert(thd->is_fatal_error()); + return true; + } + + if (not_all_columns && type == Item::SUM_FUNC_ITEM) { + ((Item_sum *)item)->set_result_field(new_field); + } + + s.fields++; + } + + update_hidden: + if (!--hidden_field_count) { + param->hidden_field_count = 0; + } + } // end of while ((item=li++)). + + if (s.fields == 0) return true; + + Field *result_field = nullptr; + + for (Item *item : fields) { + // c1: const_item will not produce field in the first rewritten table + if (item->const_item() || item->basic_const_item()) { + continue; + } + + if (item->has_aggregation() && item->type() != Item::SUM_FUNC_ITEM) { + if (item->type() == Item::SUBSELECT_ITEM || + (item->used_tables() & ~OUTER_REF_TABLE_BIT)) { + continue; + } + } + + result_field = item->get_result_field(); + if (result_field) { + enum_field_types field_type = result_field->type(); + // c3: result_field contains unsupported data type + if (pq_not_support_datatype(field_type)) { + return true; + } + } else { + // c4: item is not FIELD_ITEM and it has no result_field + if (item->type() != Item::FIELD_ITEM) { + return true; + } + + result_field = down_cast(item)->result_field; + if (result_field && pq_not_support_datatype(result_field->type())) { + return true; + } + } + } + return false; } @@ -412,7 +599,126 @@ bool pq_create_result_fields(THD *thd, Temp_table_param *param, * false. */ bool check_pq_select_result_fields(JOIN *join) { - assert(join); + DBUG_ENTER("check result fields is suitable for parallel query or not"); + MEM_ROOT *pq_check_root = ::new MEM_ROOT(); + if (pq_check_root == nullptr) { + DBUG_RETURN(false); + } + + init_sql_alloc(key_memory_thd_main_mem_root, pq_check_root, + global_system_variables.query_alloc_block_size); + + bool suit_for_parallel = false; + + mem_root_deque *tmp_all_fields = join->fields; + + join->tmp_table_param.pq_copy(join->saved_tmp_table_param); + join->tmp_table_param.copy_fields.clear(); + + Temp_table_param *tmp_param = new (pq_check_root) + Temp_table_param(pq_check_root, join->tmp_table_param); + + if (tmp_param == nullptr) { + // free the memory + pq_check_root->Clear(); + if (pq_check_root) { + ::delete pq_check_root; + } + DBUG_RETURN(suit_for_parallel); + } + + tmp_param->m_window_frame_buffer = false; + mem_root_deque tmplist(*tmp_all_fields); + tmp_param->hidden_field_count = CountHiddenFields(*tmp_all_fields); + + // create_tmp_table may change the original item's result_field, hence + // we must save it before. + std::vector saved_result_field(tmplist.size(), nullptr); + + int i = 0; + for (Item *tmp_item : *tmp_all_fields) { + if (tmp_item->type() == Item::FIELD_ITEM || + tmp_item->type() == Item::DEFAULT_VALUE_ITEM) { + saved_result_field[i] = down_cast(tmp_item)->result_field; + } else { + saved_result_field[i] = tmp_item->get_result_field(); + } + i++; + } + + if (pq_create_result_fields(join->thd, tmp_param, tmplist, true, + join->query_block->active_options(), + pq_check_root)) { + suit_for_parallel = false; + } else { + suit_for_parallel = true; + } + + // restore result_field + i = 0; + for (Item *tmp_item : *tmp_all_fields) { + if (tmp_item->type() == Item::FIELD_ITEM || + tmp_item->type() == Item::DEFAULT_VALUE_ITEM) { + down_cast(tmp_item)->result_field = saved_result_field[i]; + } else { + tmp_item->set_result_field(saved_result_field[i]); + } + i++; + } + + // free the memory + pq_check_root->Clear(); + if (pq_check_root) { + ::delete pq_check_root; + } + DBUG_RETURN(suit_for_parallel); +} + +/** + * check whether the select fields is suitable for parallel query + * + * @return: + * true, suitable + * false. + */ +bool check_pq_select_fields(JOIN *join) { + for (Item *item : join->query_block->fields) { + if (!check_pq_support_fieldtype(item, false)) { + return false; + } + } + + // check whether contains blob, text, json and geometry field + for (Item *item : *join->fields) { + if (!check_pq_support_fieldtype(item, false)) { + return false; + } + } + + Item *n_where_cond = join->query_block->where_cond(); + Item *n_having_cond = join->query_block->having_cond(); + + if (n_where_cond && !check_pq_support_fieldtype(n_where_cond, false)) { + return false; + } + + /* + * For Having Aggr. function, the having_item will be pushed + * into all_fields in prepare phase. Currently, we have not support this + * operation. + */ + if (n_having_cond && !check_pq_support_fieldtype(n_having_cond, true)) { + return false; + } + + if (check_pq_sort_aggregation(join->order)) { + return false; + } + + if (!check_pq_select_result_fields(join)) { + return false; + } + return true; } @@ -429,12 +735,20 @@ bool check_pq_select_result_fields(JOIN *join) { * false, cann't found a parallel scan table */ bool choose_parallel_scan_table(JOIN *join) { - assert(join); + QEP_TAB *tab = &join->qep_tab[join->const_tables]; + if (tab->is_inner_table_of_outer_join() || + tab->get_qs()->first_sj_inner() >= 0) { + return false; + } + tab->do_parallel_scan = true; return true; } void set_pq_dop(THD *thd) { - assert(thd); + if (!thd->no_pq && thd->variables.force_parallel_execute && + thd->pq_dop == 0) { + thd->pq_dop = thd->variables.parallel_default_dop; + } } /** @@ -453,23 +767,27 @@ void set_pq_condition_status(THD *thd) { } bool suite_for_parallel_query(THD *thd) { - assert(thd); - return true; -} + if (thd->in_sp_trigger != 0 || // store procedure or trigger + thd->get_attachable_trx() || // attachable transaction + thd->tx_isolation ==ISO_SERIALIZABLE) { // seri without snapshot read + return false; + } -bool suite_for_parallel_query(LEX *lex) { - assert(lex); return true; } -bool suite_for_parallel_query(Query_expression *unit) { - if (!unit->is_simple()) { +bool suite_for_parallel_query(LEX *lex) { + if (lex->in_execute_ps || lex->has_sp || lex->has_notsupported_func) { return false; } return true; } +bool suite_for_parallel_query(Query_expression *unit) { + return unit->is_simple() ? true : false; +} + bool suite_for_parallel_query(Table_ref *tbl_list) { if (tbl_list->is_view() || // view tbl_list->lock_descriptor().type > TL_READ || // explicit table lock @@ -490,18 +808,128 @@ bool suite_for_parallel_query(Table_ref *tbl_list) { } bool suite_for_parallel_query(Query_block *select) { - assert(select); + if (select->first_inner_query_expression() != + nullptr || // nesting subquery, including view〝derived + // table〝subquery condition and so on. + select->outer_query_block() != nullptr || // nested subquery + select->is_distinct() || // select distinct + select->has_foj() || // full out join + select->saved_windows_elements) { // windows function + return false; + } + if (select->has_limit()) { + return false; + } + + for (Table_ref *tbl_list = select->get_table_list(); tbl_list != nullptr; + tbl_list = tbl_list->next_local) { + if (!suite_for_parallel_query(tbl_list)) { + return false; + } + } + + for (Table_ref *tbl_list = select->get_table_list(); tbl_list != nullptr; + tbl_list = tbl_list->next_global) { + if (!suite_for_parallel_query(tbl_list)) { + return false; + } + } + + for (Table_ref *tbl_list = select->leaf_tables; tbl_list != nullptr; + tbl_list = tbl_list->next_leaf) { + if (!suite_for_parallel_query(tbl_list)) { + return false; + } + } return true; } bool suite_for_parallel_query(JOIN *join) { - assert(join); + if ((join->best_read < join->thd->variables.parallel_cost_threshold) || + (join->primary_tables == join->const_tables) || + (join->select_distinct || join->select_count) || + (join->fields->size() > MAX_FIELDS) || + (join->rollup_state != JOIN::RollupState::NONE) || + (join->zero_result_cause != nullptr)) { + return false; + } + QEP_TAB *tab = &join->qep_tab[join->const_tables]; + // only support table/index full/range scan + join_type scan_type = tab->type(); + if (scan_type != JT_ALL && scan_type != JT_INDEX_SCAN && + scan_type != JT_REF && + (scan_type != JT_RANGE || !tab->range_scan() || + tab->range_scan()->quick_select_type() != PQ_RANGE_TYPE::PQ_INDEX_RANGE_SCAN)) { + return false; + } + if (tab->range_scan() && + tab->range_scan()->quick_select_type() != PQ_RANGE_TYPE::PQ_INDEX_RANGE_SCAN) { + return false; + } + + if (!check_pq_select_fields(join)) { + return false; + } + return true; } bool check_pq_running_threads(uint dop, ulong timeout_ms) { - assert(dop && timeout_ms); - return true; + bool success = false; + mysql_mutex_lock(&LOCK_pq_threads_running); + if (dop > parallel_max_threads) { + success = false; + } else if (parallel_threads_running + dop > parallel_max_threads) { + if (timeout_ms > 0) { + struct timespec start_ts; + struct timespec end_ts; + struct timespec abstime; + ulong wait_timeout = timeout_ms; + int wait_result; + + start: + set_timespec(&start_ts, 0); + /* Calcuate the waiting period. */ + abstime.tv_sec = start_ts.tv_sec + wait_timeout / TIME_THOUSAND; + abstime.tv_nsec = + start_ts.tv_nsec + (wait_timeout % TIME_THOUSAND) * TIME_MILLION; + if (abstime.tv_nsec >= TIME_BILLION) { + abstime.tv_sec++; + abstime.tv_nsec -= TIME_BILLION; + } + wait_result = mysql_cond_timedwait(&COND_pq_threads_running, + &LOCK_pq_threads_running, &abstime); + if (parallel_threads_running + dop <= parallel_max_threads) { + success = true; + } else { + success = false; + if (!wait_result) { // wait isn't timeout + set_timespec(&end_ts, 0); + ulong difftime = (end_ts.tv_sec - start_ts.tv_sec) * TIME_THOUSAND + + (end_ts.tv_nsec - start_ts.tv_nsec) / TIME_MILLION; + wait_timeout -= difftime; + goto start; + } + } + } + } else { + success = true; + } + + if (success) { + // uint32x2_t v_a = {parallel_threads_running, + // current_thd->pq_threads_running}; + // uint32x2_t v_b = {dop, dop}; + // v_a = vadd_u32(v_a, v_b); + // parallel_threads_running = vget_lane_u32(v_a, 0); + // current_thd->pq_threads_running = vget_lane_u32(v_a, 1); + + parallel_threads_running += dop; + current_thd->pq_threads_running += dop; + } + + mysql_mutex_unlock(&LOCK_pq_threads_running); + return success; } class PQCheck { @@ -644,6 +1072,52 @@ bool check_select_group_and_order_by(Query_block *select_lex) { } bool check_pq_conditions(THD *thd) { - assert(thd); + // max PQ memory size limit + if (get_pq_memory_total() >= parallel_memory_limit) { + atomic_add(parallel_memory_refused, 1); + return false; + } + + // max PQ threads limit + if (!check_pq_running_threads(thd->pq_dop, + thd->variables.parallel_queue_timeout)) { + atomic_add(parallel_threads_refused, 1); + return false; + } + + Query_block *select = thd->lex->unit->first_query_block(); + if (!check_select_group_and_order_by(select)) { + return false; + } + + // RBO limit + if (!suite_for_parallel_query(thd)) { + return false; + } + + if (!suite_for_parallel_query(thd->lex)) { + return false; + } + + if (!suite_for_parallel_query(thd->lex->unit)) { + return false; + } + + if (!suite_for_parallel_query(select)) { + return false; + } + + if (!suite_for_parallel_query(select->join)) { + return false; + } + + if (!check_select_id_and_type(select)) { + return false; + } + + if (!choose_parallel_scan_table(select->join)) { + return false; + } + return true; -} \ No newline at end of file +} diff --git a/sql/sql_pq_condition.h b/sql/sql_pq_condition.h index 3dc084540..68f9c8e99 100644 --- a/sql/sql_pq_condition.h +++ b/sql/sql_pq_condition.h @@ -1,5 +1,5 @@ -#ifndef SQL_PQ_CONDITION_H -#define SQL_PQ_CONDITION_H +#ifndef __SHANNONBASE_SQL_PQ_CONDITION_H__ +#define __SHANNONBASE_SQL_PQ_CONDITION_H__ /* Copyright (c) 2013, 2020, Oracle and/or its affiliates. All rights reserved. Copyright (c) 2021, Huawei Technologies Co., Ltd. @@ -34,4 +34,4 @@ void set_pq_condition_status(THD *thd); bool check_pq_conditions(THD *thd); -#endif /* SQL_PQ_CONDITION_H */ \ No newline at end of file +#endif //__SHANNONBASE_SQL_PQ_CONDITION_H__ \ No newline at end of file diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc index 5f64c3c32..33e43420f 100644 --- a/sql/sql_prepare.cc +++ b/sql/sql_prepare.cc @@ -405,7 +405,9 @@ class Statement_backup { m_query_string = thd->query(); thd->set_query(stmt->m_query_string); - + if (thd->lex != nullptr) { + thd->lex->in_execute_ps = true; + } m_safe_to_display = thd->safe_to_display(); /* Keep the current behaviour of displaying prepared statements always by @@ -431,6 +433,7 @@ class Statement_backup { stmt->m_query_string = thd->query(); thd->set_query(m_query_string); + thd->lex->in_execute_ps = false; } /** diff --git a/sql/sql_resolver.cc b/sql/sql_resolver.cc index 7ce8ec77f..eb4012abc 100644 --- a/sql/sql_resolver.cc +++ b/sql/sql_resolver.cc @@ -188,6 +188,7 @@ bool Query_block::prepare(THD *thd, mem_root_deque *insert_field_list) { if (is_table_value_constructor) return prepare_values(thd); Query_expression *const unit = master_query_expression(); + if (has_windows()) saved_windows_elements = m_windows.elements; if (!m_table_nest.empty()) propagate_nullability(&m_table_nest, false); diff --git a/sql/sql_tmp_table.cc b/sql/sql_tmp_table.cc index dec7efbb4..0f76953cd 100644 --- a/sql/sql_tmp_table.cc +++ b/sql/sql_tmp_table.cc @@ -509,6 +509,37 @@ Field *create_tmp_field(THD *thd, TABLE *table, Item *item, Item::Type type, return result; } +/** + Create field for information schema table. + + @param table Temporary table + @param item Item to create a field for + + @retval + 0 on error + @retval + new_created field +*/ +Field *create_tmp_field_for_schema(Item *item, TABLE *table, MEM_ROOT *root) { + MEM_ROOT *pq_check_root = root ? root : *THR_MALLOC; + if (item->data_type() == MYSQL_TYPE_VARCHAR) { + Field *field; + if (item->max_length > MAX_FIELD_VARCHARLENGTH) + field = new (pq_check_root) + Field_blob(item->max_length, item->is_nullable(), + item->item_name.ptr(), item->collation.collation, false); + else { + field = new (pq_check_root) Field_varstring( + item->max_length, item->is_nullable(), item->item_name.ptr(), + table->s, item->collation.collation); + table->s->db_create_options |= HA_OPTION_PACK_RECORD; + } + if (field) field->init(table); + return field; + } + return item->tmp_table_field_from_field_type(table, false, root); +} + void Temp_table_param::pq_copy(Temp_table_param *orig) { end_write_records = orig->end_write_records; // field_count = orig->field_count; diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 6fcbdd71e..ed7491d2c 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -131,6 +131,7 @@ #include "sql/sp_head.h" // SP_PSI_STATEMENT_INFO_COUNT #include "sql/sql_lex.h" #include "sql/sql_locale.h" // my_locale_by_number +#include "sql/sql_parallel.h" #include "sql/sql_parse.h" // killall_non_super_threads #include "sql/sql_show_processlist.h" // pfs_processlist_enabled #include "sql/sql_tmp_table.h" // internal_tmp_mem_storage_engine_names @@ -1102,6 +1103,54 @@ static Sys_var_bool Sys_password_require_current( "Current password is needed to be specified in order to change it", GLOBAL_VAR(password_require_current), CMD_LINE(OPT_ARG), DEFAULT(false)); +#ifndef NDEBUG +extern bool dbug_pq_worker_stall; + +static Sys_var_bool Sys_Debug_pq_worker_stall( + "debug_pq_worker_stall", + "PQ worker stall while send date to message queue.", + HINT_UPDATEABLE GLOBAL_VAR(dbug_pq_worker_stall), CMD_LINE(OPT_ARG), + DEFAULT(false)); +#endif + +static Sys_var_bool Sys_sql_force_parallel_execute( + "force_parallel_execute", "force parallel execute in session", + HINT_UPDATEABLE SESSION_VAR(force_parallel_execute), CMD_LINE(OPT_ARG), + DEFAULT(0)); + +static Sys_var_ulonglong Sys_parallel_memory_limit( + "parallel_memory_limit", + "upper limit memory size that parallel query can use", + GLOBAL_VAR(parallel_memory_limit), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0, ULONG_MAX), DEFAULT(100 * 1024 * 1024), BLOCK_SIZE(IO_SIZE), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), ON_UPDATE(NULL)); + +static Sys_var_ulong Sys_parallel_max_threads( + "parallel_max_threads", "max running threads of parallel query.", + GLOBAL_VAR(parallel_max_threads), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0, ULONG_MAX), DEFAULT(64), BLOCK_SIZE(1), NO_MUTEX_GUARD, + NOT_IN_BINLOG, ON_CHECK(0), ON_UPDATE(NULL)); + +static Sys_var_ulong Sys_parallel_cost_threshold( + "parallel_cost_threshold", "Cost threshold for parallel query.", + SESSION_VAR(parallel_cost_threshold), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0, ULONG_MAX), DEFAULT(1000), BLOCK_SIZE(1), NO_MUTEX_GUARD, + NOT_IN_BINLOG); + +static Sys_var_ulong Sys_parallel_default_dop( + "parallel_default_dop", "default degree of parallel query.", + SESSION_VAR(parallel_default_dop), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0, 1024), DEFAULT(4), BLOCK_SIZE(1), NO_MUTEX_GUARD, + NOT_IN_BINLOG); + +static Sys_var_ulong Sys_parallel_queue_timeout( + "parallel_queue_timeout", + "queue timeout for parallel query when resource is not enough ." + "the unit is microseconds", + SESSION_VAR(parallel_queue_timeout), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0, ULONG_MAX), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD, + NOT_IN_BINLOG); + /** Checks, if there exists at least a partial revoke on a database at the time