diff --git a/src/base/system_parameter.c b/src/base/system_parameter.c index 898ec19c86..ecf530d163 100644 --- a/src/base/system_parameter.c +++ b/src/base/system_parameter.c @@ -664,6 +664,8 @@ static const char sysprm_ha_conf_file_name[] = "cubrid_ha.conf"; #define PRM_NAME_CTE_MAX_RECURSIONS "cte_max_recursions" +#define PRM_NAME_MAX_PARALLEL_THREAD "max_parallel_thread" + #define PRM_NAME_DWB_SIZE "double_write_buffer_size" #define PRM_NAME_DWB_BLOCKS "double_write_buffer_blocks" #define PRM_NAME_ENABLE_DWB_FLUSH_THREAD "double_write_buffer_enable_flush_thread" @@ -2159,6 +2161,12 @@ static int prm_cte_max_recursions_upper = 1000000; static int prm_cte_max_recursions_lower = 2; static unsigned int prm_cte_max_recursions_flag = 0; +int PRM_MAX_PARALLEL_THREAD = 1; +static int prm_max_parallel_thread_default = 1; +static int prm_max_parallel_thread_upper = 32; +static int prm_max_parallel_thread_lower = 1; +static unsigned int prm_max_parallel_thread_flag = 0; + bool PRM_JSON_LOG_ALLOCATIONS = false; static bool prm_json_log_allocations_default = false; static unsigned int prm_json_log_allocations_flag = 0; @@ -6448,6 +6456,18 @@ SYSPRM_PARAM prm_Def[] = { (void *) &prm_max_subquery_cache_size_lower, (char *) NULL, (DUP_PRM_FUNC) NULL, + (DUP_PRM_FUNC) NULL}, + {PRM_ID_MAX_PARALLEL_THREAD, + PRM_NAME_MAX_PARALLEL_THREAD, + (PRM_FOR_SERVER | PRM_FOR_CLIENT | PRM_USER_CHANGE | PRM_FOR_SESSION), + PRM_INTEGER, + &prm_max_parallel_thread_flag, + (void *) &prm_max_parallel_thread_default, + (void *) &PRM_MAX_PARALLEL_THREAD, + (void *) &prm_max_parallel_thread_upper, + (void *) &prm_max_parallel_thread_lower, + (char *) NULL, + (DUP_PRM_FUNC) NULL, (DUP_PRM_FUNC) NULL} }; diff --git a/src/base/system_parameter.h b/src/base/system_parameter.h index 2ea92fcbe0..5b9aed2f14 100644 --- a/src/base/system_parameter.h +++ b/src/base/system_parameter.h @@ -486,8 +486,9 @@ enum param_id PRM_ID_ENABLE_MEMORY_MONITORING, PRM_ID_MAX_SUBQUERY_CACHE_SIZE, + 
PRM_ID_MAX_PARALLEL_THREAD, /* change PRM_LAST_ID when adding new system parameters */ - PRM_LAST_ID = PRM_ID_MAX_SUBQUERY_CACHE_SIZE + PRM_LAST_ID = PRM_ID_MAX_PARALLEL_THREAD }; typedef enum param_id PARAM_ID; diff --git a/src/query/list_file.c b/src/query/list_file.c index dc6368e870..b05dae2649 100644 --- a/src/query/list_file.c +++ b/src/query/list_file.c @@ -243,8 +243,8 @@ static void qfile_close_and_free_list_file (THREAD_ENTRY * thread_p, QFILE_LIST_ static QFILE_LIST_ID *qfile_union_list (THREAD_ENTRY * thread_p, QFILE_LIST_ID * list_id1, QFILE_LIST_ID * list_id2, int flag); -static SORT_STATUS qfile_get_next_sort_item (THREAD_ENTRY * thread_p, RECDES * recdes, void *arg); static int qfile_put_next_sort_item (THREAD_ENTRY * thread_p, const RECDES * recdes, void *arg); +static SORT_STATUS qfile_get_next_sort_item (THREAD_ENTRY * thread_p, RECDES * recdes, void *arg); static SORT_INFO *qfile_initialize_sort_info (SORT_INFO * info, QFILE_LIST_ID * listid, SORT_LIST * sort_list); static void qfile_clear_sort_info (SORT_INFO * info); static int qfile_copy_list_pages (THREAD_ENTRY * thread_p, VPID * old_first_vpidp, QMGR_TEMP_FILE * old_tfile_vfidp, @@ -2230,7 +2230,7 @@ qfile_destroy_list (THREAD_ENTRY * thread_p, QFILE_LIST_ID * list_id_p) /* because qmgr_free_list_temp_file() destroy only FILE_TEMP file */ if (!VFID_ISNULL (&list_id_p->temp_vfid)) { - file_temp_retire (thread_p, &list_id_p->temp_vfid); + file_temp_retire (thread_p, &list_id_p->temp_vfid, false); } } @@ -3983,6 +3983,9 @@ qfile_sort_list_with_func (THREAD_ENTRY * thread_p, QFILE_LIST_ID * list_id_p, S int sort_result, estimated_pages; SORT_DUP_OPTION dup_option; + /* The result file must be closed for parallel processing. If not closed, Latch contention may occur. 
*/ + qfile_close_list (thread_p, list_id_p); + srlist_id = qfile_open_list (thread_p, &list_id_p->type_list, sort_list_p, list_id_p->query_id, flag, NULL); if (srlist_id == NULL) { @@ -4005,6 +4008,7 @@ qfile_sort_list_with_func (THREAD_ENTRY * thread_p, QFILE_LIST_ID * list_id_p, S info.s_id = &s_scan_id; info.output_file = srlist_id; + info.input_file = list_id_p; info.extra_arg = extra_arg; if (get_func == NULL) @@ -4037,9 +4041,23 @@ qfile_sort_list_with_func (THREAD_ENTRY * thread_p, QFILE_LIST_ID * list_id_p, S dup_option = ((option == Q_DISTINCT) ? SORT_ELIM_DUP : SORT_DUP); +#if !defined(NDEBUG) + TSC_TICKS start_tick, end_tick; + TSCTIMEVAL tv_diff; + struct timeval orderby_time = { 0, 0 }; /* debug-only accumulator; zeroed because TSC_ADD_TIMEVAL is expected to add into it */ + tsc_getticks (&start_tick); +#endif + sort_result = sort_listfile (thread_p, NULL_VOLID, estimated_pages, get_func, &info, put_func, &info, cmp_func, &info.key_info, - dup_option, limit, srlist_id->tfile_vfid->tde_encrypted); + dup_option, limit, srlist_id->tfile_vfid->tde_encrypted, SORT_ORDER_BY); + +#if !defined(NDEBUG) + tsc_getticks (&end_tick); + tsc_elapsed_time_usec (&tv_diff, end_tick, start_tick); + TSC_ADD_TIMEVAL (orderby_time, tv_diff); + printf ("sort_listfile time: %d\n", TO_MSEC (orderby_time)); +#endif if (sort_result < 0) { diff --git a/src/query/list_file.h b/src/query/list_file.h index e843c68827..c64af3cd39 100644 --- a/src/query/list_file.h +++ b/src/query/list_file.h @@ -230,4 +230,5 @@ extern int qfile_get_list_cache_number_of_entries (int ht_no); extern bool qfile_has_no_cache_entries (); + #endif /* _LIST_FILE_H_ */ diff --git a/src/query/query_executor.c b/src/query/query_executor.c index 3e91c06943..10e8221354 100644 --- a/src/query/query_executor.c +++ b/src/query/query_executor.c @@ -4695,7 +4695,7 @@ qexec_groupby (THREAD_ENTRY * thread_p, XASL_NODE * xasl, XASL_STATE * xasl_stat /* sort and aggregate partial results */ if (sort_listfile (thread_p, NULL_VOLID, estimated_pages, &qexec_hash_gby_get_next, &gbstate, &qexec_hash_gby_put_next, &gbstate, 
cmp_fn, &gbstate.agg_hash_context->sort_key, SORT_DUP, - NO_SORT_LIMIT, gbstate.output_file->tfile_vfid->tde_encrypted) != NO_ERROR) + NO_SORT_LIMIT, gbstate.output_file->tfile_vfid->tde_encrypted, SORT_GROUP_BY) != NO_ERROR) { GOTO_EXIT_ON_ERROR; } @@ -4776,7 +4776,7 @@ qexec_groupby (THREAD_ENTRY * thread_p, XASL_NODE * xasl, XASL_STATE * xasl_stat if (sort_listfile (thread_p, NULL_VOLID, estimated_pages, &qexec_gby_get_next, &gbstate, &qexec_gby_put_next, &gbstate, gbstate.cmp_fn, &gbstate.key_info, SORT_DUP, NO_SORT_LIMIT, - gbstate.output_file->tfile_vfid->tde_encrypted) != NO_ERROR) + gbstate.output_file->tfile_vfid->tde_encrypted, SORT_GROUP_BY) != NO_ERROR) { GOTO_EXIT_ON_ERROR; } @@ -22206,7 +22206,8 @@ qexec_execute_analytic (THREAD_ENTRY * thread_p, XASL_NODE * xasl, XASL_STATE * if (sort_listfile (thread_p, NULL_VOLID, estimated_pages, &qexec_analytic_get_next, &analytic_state, &qexec_analytic_put_next, &analytic_state, analytic_state.cmp_fn, &analytic_state.key_info, - SORT_DUP, NO_SORT_LIMIT, analytic_state.output_file->tfile_vfid->tde_encrypted) != NO_ERROR) + SORT_DUP, NO_SORT_LIMIT, analytic_state.output_file->tfile_vfid->tde_encrypted, + SORT_ANALYTIC) != NO_ERROR) { GOTO_EXIT_ON_ERROR; } diff --git a/src/query/query_manager.c b/src/query/query_manager.c index 4ef4f637db..25f31b33ea 100644 --- a/src/query/query_manager.c +++ b/src/query/query_manager.c @@ -2592,7 +2592,7 @@ qmgr_get_new_page (THREAD_ENTRY * thread_p, VPID * vpid_p, QMGR_TEMP_FILE * tfil if (VFID_ISNULL (&tfile_vfid_p->temp_vfid)) { TDE_ALGORITHM tde_algo = TDE_ALGORITHM_NONE; - if (file_create_temp (thread_p, 1, &tfile_vfid_p->temp_vfid) != NO_ERROR) + if (file_create_temp (thread_p, 1, &tfile_vfid_p->temp_vfid, false) != NO_ERROR) { ASSERT_ERROR (); return NULL; @@ -2607,7 +2607,7 @@ qmgr_get_new_page (THREAD_ENTRY * thread_p, VPID * vpid_p, QMGR_TEMP_FILE * tfil if (file_apply_tde_algorithm (thread_p, &tfile_vfid_p->temp_vfid, tde_algo) != NO_ERROR) { ASSERT_ERROR (); - 
file_temp_retire (thread_p, &tfile_vfid_p->temp_vfid); + file_temp_retire (thread_p, &tfile_vfid_p->temp_vfid, false); VFID_SET_NULL (&tfile_vfid_p->temp_vfid); return NULL; } @@ -2871,7 +2871,7 @@ qmgr_create_result_file (THREAD_ENTRY * thread_p, QUERY_ID query_id) { /* query entry is not found */ er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_QPROC_UNKNOWN_QUERYID, 1, query_id); - file_temp_retire (thread_p, &tfile_vfid_p->temp_vfid); + file_temp_retire (thread_p, &tfile_vfid_p->temp_vfid, false); free_and_init (tfile_vfid_p); return NULL; } @@ -2884,7 +2884,7 @@ qmgr_create_result_file (THREAD_ENTRY * thread_p, QUERY_ID query_id) if (file_apply_tde_algorithm (thread_p, &tfile_vfid_p->temp_vfid, tde_algo) != NO_ERROR) { - file_temp_retire (thread_p, &tfile_vfid_p->temp_vfid); + file_temp_retire (thread_p, &tfile_vfid_p->temp_vfid, false); free_and_init (tfile_vfid_p); return NULL; } @@ -2954,7 +2954,7 @@ qmgr_free_temp_file_list (THREAD_ENTRY * thread_p, QMGR_TEMP_FILE * tfile_vfid_p } else { - fd_ret = file_temp_retire (thread_p, &tfile_vfid_p->temp_vfid); + fd_ret = file_temp_retire (thread_p, &tfile_vfid_p->temp_vfid, false); if (fd_ret != NO_ERROR) { /* set error but continue with the destroy process */ @@ -3095,7 +3095,7 @@ qmgr_free_list_temp_file (THREAD_ENTRY * thread_p, QUERY_ID query_id, QMGR_TEMP_ rc = ER_FAILED; } } - else if (file_temp_retire (thread_p, &tfile_vfid_p->temp_vfid) != NO_ERROR) + else if (file_temp_retire (thread_p, &tfile_vfid_p->temp_vfid, false) != NO_ERROR) { /* stop; return error */ rc = ER_FAILED; diff --git a/src/storage/btree_load.c b/src/storage/btree_load.c index 380a3062c7..3f14767f56 100644 --- a/src/storage/btree_load.c +++ b/src/storage/btree_load.c @@ -3217,7 +3217,7 @@ btree_index_sort (THREAD_ENTRY * thread_p, SORT_ARGS * sort_args, SORT_PUT_FUNC return sort_listfile (thread_p, sort_args->hfids[0].vfid.volid, 0 /* TODO - support parallelism */ , &btree_sort_get_next, sort_args, out_func, out_args, compare_driver, 
sort_args, SORT_DUP, - NO_SORT_LIMIT, includes_tde_class); + NO_SORT_LIMIT, includes_tde_class, SORT_INDEX_LEAF); } /* diff --git a/src/storage/external_sort.c b/src/storage/external_sort.c index 06e17f0835..0634cf03ac 100644 --- a/src/storage/external_sort.c +++ b/src/storage/external_sort.c @@ -50,6 +50,9 @@ #include "server_support.h" #include "thread_entry_task.hpp" #include "thread_manager.hpp" // for thread_get_thread_entry_info and thread_sleep +#include "list_file.h" +#include "query_manager.h" +#include "object_representation.h" #include // XXX: SHOULD BE THE LAST INCLUDE HEADER @@ -64,6 +67,7 @@ * or total output files at each stage of the merging process. */ #define SORT_MAX_HALF_FILES 4 +#define SORT_MAX_TOT_FILES (SORT_MAX_HALF_FILES * 2) /* size of the temp-file arrays; parenthesized so it expands safely inside larger expressions */ /* Lower limit on the half of the total number of the temporary files. * The exact lower limit on total number of temp files is twice this number. @@ -72,6 +76,8 @@ */ #define SORT_MIN_HALF_FILES 2 +#define SORT_MAX_PARALLEL 32 + /* Initial size of the dynamic array that keeps the file contents list */ #define SORT_INITIAL_DYN_ARRAY_SIZE 30 @@ -93,6 +99,24 @@ } \ } while (0) +#define IS_PARALLEL_SORT(t) ((t)->px_max_index > 1) + +enum parallel_type +{ + PX_SINGLE = 0, + PX_MAIN_IN_PARALLEL = 1, + PX_THREAD_IN_PARALLEL +}; +typedef enum parallel_type PARALLEL_TYPE; + +enum px_status +{ + PX_ERR_FAILED = -1, + PX_DONE = 0, + PX_PROGRESS +}; +typedef enum px_status PX_STATUS; + typedef struct file_contents FILE_CONTENTS; struct file_contents { /* node of the file_contents linked list */ @@ -119,36 +143,12 @@ struct vol_list VOL_INFO *vol_info; /* array of volume information */ }; -/* Parallel eXecution and communition node */ -typedef struct px_tree_node PX_TREE_NODE; -struct px_tree_node -{ - int px_id; /* node ID */ -#if defined(SERVER_MODE) - int px_status; /* node status; access through px_mtx */ -#endif /* SERVER_MODE */ - - int px_height; /* tournament tree: node level */ - int px_myself; /* tournament tree: node ID 
*/ - - int px_tran_index; - - void *px_arg; /* operation info */ - - char **px_buff; - char **px_vector; - long px_vector_size; - - char **px_result; /* output */ - long px_result_size; /* output */ -}; - typedef struct sort_param SORT_PARAM; struct sort_param { - VFID temp[2 * SORT_MAX_HALF_FILES]; /* Temporary file identifiers */ + VFID temp[SORT_MAX_TOT_FILES]; /* Temporary file identifiers */ VFID multipage_file; /* Temporary file for multi page sorting records */ - FILE_CONTENTS file_contents[2 * SORT_MAX_HALF_FILES]; /* Contents of each temporary file */ + FILE_CONTENTS file_contents[SORT_MAX_TOT_FILES]; /* Contents of each temporary file */ bool tde_encrypted; /* whether related temp files are encrypted (TDE) or not */ @@ -168,6 +168,10 @@ struct sort_param void *cmp_arg; SORT_DUP_OPTION option; + /* input function to apply on temporary records */ + SORT_GET_FUNC *get_fn; + void *get_arg; + /* output function to apply on temporary records */ SORT_PUT_FUNC *put_fn; void *put_arg; @@ -178,13 +182,19 @@ struct sort_param /* Details about the "limit" clause */ int limit; + /* total number of recordes */ + unsigned int total_numrecs; + /* support parallelism */ + int px_max_index; + PX_STATUS px_status; + int px_result_file_idx; + int px_tran_index; + SORT_PARALLEL_TYPE px_type; #if defined(SERVER_MODE) - pthread_mutex_t px_mtx; /* px_node status mutex */ + pthread_mutex_t *px_mtx; /* px_status mutex */ + pthread_cond_t *complete_cond; /* complete condition */ #endif - int px_height_max; /* px_node tournament tree max level */ - int px_array_size; /* px_node array size */ - PX_TREE_NODE *px_array; /* px_node array */ }; typedef struct sort_rec_list SORT_REC_LIST; @@ -240,27 +250,47 @@ struct sort_stack SRUN *srun; }; +typedef struct result_run RESULT_RUN; +struct result_run +{ + VFID temp_file; + int num_pages; +}; + typedef void FIND_RUN_FN (char **, long *, SORT_STACK *, long, SORT_CMP_FUNC *, void *); typedef void MERGE_RUN_FN (char **, char **, SORT_STACK *, 
SORT_CMP_FUNC *, void *); #if !defined(NDEBUG) static int sort_validate (char **vector, long size, SORT_CMP_FUNC * compare, void *comp_arg); #endif -static PX_TREE_NODE *px_sort_assign (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, int px_id, char **px_buff, - char **px_vector, long px_vector_size, int px_height, int px_myself); -static int px_sort_myself (THREAD_ENTRY * thread_p, PX_TREE_NODE * px_node); -#if defined(SERVER_MODE) -static int px_sort_communicate (PX_TREE_NODE * px_node); -#endif +/* start parallel sort */ +static void sort_listfile_execute (cubthread::entry & thread_ref, SORT_PARAM * sort_param); +static int sort_copy_sort_param (THREAD_ENTRY * thread_p, SORT_PARAM * dest_param, SORT_PARAM * src_param, + int parallel_num); +static int sort_split_input_temp_file (THREAD_ENTRY * thread_p, SORT_PARAM * dest_param, SORT_PARAM * src_param, + int parallel_num); +static int sort_merge_run_for_parallel (THREAD_ENTRY * thread_p, SORT_PARAM * dest_param, SORT_PARAM * src_param, + int parallel_num); +static int sort_merge_nruns (THREAD_ENTRY * thread_p, RESULT_RUN * result_run, SORT_PARAM * sort_param, int first_idx, + int remaining_run, int level); +static int sort_check_parallelism (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param); +static int sort_start_parallelism (THREAD_ENTRY * thread_p, SORT_PARAM * dest_param, SORT_PARAM * src_param, + int parallel_num); +static int sort_end_parallelism (THREAD_ENTRY * thread_p, SORT_PARAM * dest_param, SORT_PARAM * src_param, + int parallel_num); +/* end parallel sort */ + +static int sort_listfile_internal (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param); static int sort_inphase_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, SORT_GET_FUNC * get_next, void *arguments, unsigned int *total_numrecs); static int sort_exphase_merge_elim_dup (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param); static int sort_exphase_merge (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param); +static int 
sort_put_result_from_tmpfile (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param); static int sort_get_avg_numpages_of_nonempty_tmpfile (SORT_PARAM * sort_param); -static void sort_return_used_resources (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param); +static void sort_return_used_resources (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, PARALLEL_TYPE parallel_type); static int sort_add_new_file (THREAD_ENTRY * thread_p, VFID * vfid, int file_pg_cnt_est, bool force_alloc, - bool tde_encrypted); + bool tde_encrypted, bool is_parallel); static int sort_write_area (THREAD_ENTRY * thread_p, VFID * vfid, int first_page, INT32 num_pages, char *area_start, bool tde_encrypted); @@ -1347,19 +1377,21 @@ sort_run_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, char **base, lo int sort_listfile (THREAD_ENTRY * thread_p, INT16 volid, int est_inp_pg_cnt, SORT_GET_FUNC * get_fn, void *get_arg, SORT_PUT_FUNC * put_fn, void *put_arg, SORT_CMP_FUNC * cmp_fn, void *cmp_arg, SORT_DUP_OPTION option, - int limit, bool includes_tde_class) + int limit, bool includes_tde_class, SORT_PARALLEL_TYPE parallel_type) { + int error = NO_ERROR; SORT_PARAM *sort_param = NULL; - bool prm_enable_sort_parallel = false; /* TODO - PRM_SORT_PARALLEL_SORT */ INT32 input_pages; int i; - int file_pg_cnt_est; - unsigned int total_numrecs = 0; + + /* for parallel sort */ + SORT_PARAM px_sort_param[SORT_MAX_PARALLEL]; /* TO_DO : need dynamic alloc */ + int parallel_num = 1; /* TO_DO : depending on the number of pages in the temp file */ #if defined(SERVER_MODE) - int num_cpus; - int rv; -#endif /* SERVER_MODE */ + pthread_mutex_t px_mtx; /* px_status mutex */ + pthread_cond_t complete_cond; /* complete condition */ +#endif thread_set_sort_stats_active (thread_p, true); @@ -1376,22 +1408,31 @@ sort_listfile (THREAD_ENTRY * thread_p, INT16 volid, int est_inp_pg_cnt, SORT_GE } #if defined(SERVER_MODE) - rv = pthread_mutex_init (&(sort_param->px_mtx), NULL); - if (rv != 0) + if (pthread_mutex_init 
(&px_mtx, NULL) != 0) { error = ER_CSS_PTHREAD_MUTEX_INIT; er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 0); - free_and_init (sort_param); - return error; } + if (pthread_cond_init (&complete_cond, NULL) != 0) + { + error = ER_CSS_PTHREAD_COND_INIT; + er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 0); + free_and_init (sort_param); + return error; + } + sort_param->px_mtx = &px_mtx; + sort_param->complete_cond = &complete_cond; #endif /* SERVER_MODE */ sort_param->cmp_fn = cmp_fn; sort_param->cmp_arg = cmp_arg; sort_param->option = option; + sort_param->get_fn = get_fn; + sort_param->get_arg = get_arg; + sort_param->put_fn = put_fn; sort_param->put_arg = put_arg; @@ -1399,14 +1440,12 @@ sort_listfile (THREAD_ENTRY * thread_p, INT16 volid, int est_inp_pg_cnt, SORT_GE sort_param->tot_tempfiles = 0; /* initialize memory allocable fields */ - for (i = 0; i < 2 * SORT_MAX_HALF_FILES; i++) + for (i = 0; i < SORT_MAX_TOT_FILES; i++) { sort_param->temp[i].volid = NULL_VOLID; sort_param->file_contents[i].num_pages = NULL; } sort_param->internal_memory = NULL; - sort_param->px_height_max = sort_param->px_array_size = 0; - sort_param->px_array = NULL; /* initialize temp. overflow file. Real value will be assigned in sort_inphase_sort function, if long size sorting * records are encountered. 
*/ @@ -1452,14 +1491,13 @@ sort_listfile (THREAD_ENTRY * thread_p, INT16 volid, int est_inp_pg_cnt, SORT_GE sort_param->tot_tempfiles = sort_param->half_files << 1; sort_param->in_half = 0; - for (i = 0; i < sort_param->tot_tempfiles; i++) + for (i = 0; i < SORT_MAX_TOT_FILES; i++) { /* Initilize temporary file identifier; real value will be set in "sort_add_new_file () */ sort_param->temp[i].volid = NULL_VOLID; /* Initilize file contents list */ - sort_param->file_contents[i].num_pages = - (int *) db_private_alloc (thread_p, SORT_INITIAL_DYN_ARRAY_SIZE * sizeof (int)); + sort_param->file_contents[i].num_pages = (int *) malloc (SORT_INITIAL_DYN_ARRAY_SIZE * sizeof (int)); if (sort_param->file_contents[i].num_pages == NULL) { sort_param->tot_tempfiles = i; @@ -1477,37 +1515,196 @@ sort_listfile (THREAD_ENTRY * thread_p, INT16 volid, int est_inp_pg_cnt, SORT_GE sort_param->tmp_file_pgs = MAX (1, sort_param->tmp_file_pgs); sort_param->tde_encrypted = includes_tde_class; + sort_param->px_type = parallel_type; + +#if defined(SERVER_MODE) + /* check the number of parallel process */ + parallel_num = sort_check_parallelism (thread_p, sort_param); + + if (parallel_num <= 1) + { + /* single process */ + sort_param->px_max_index = 1; + error = sort_listfile_internal (thread_p, sort_param); + } + else + { + /* parallel process */ + error = sort_start_parallelism (thread_p, px_sort_param, sort_param, parallel_num); + if (error != NO_ERROR) + { + goto cleanup; + } + + /* execute parallel sort */ + // *INDENT-OFF* + for (int i = 0; i < parallel_num; i++) + { + cubthread::entry_callable_task * task = + new cubthread::entry_callable_task (std:: + bind (sort_listfile_execute, std::placeholders::_1, &px_sort_param[i])); + css_push_external_task (css_get_current_conn_entry (), task); + } + // *INDENT-ON* + + /* wait for threads */ + pthread_mutex_lock (sort_param->px_mtx); + while (1) + { + int done = true; + for (int i = 0; i < parallel_num; i++) + { + if (px_sort_param[i].px_status 
== PX_PROGRESS) + { + done = false; + break; + } + else if (px_sort_param[i].px_status == PX_ERR_FAILED) + { + error = ER_FAILED; + } + } + if (done) + { + break; + } + pthread_cond_wait (sort_param->complete_cond, sort_param->px_mtx); + } + pthread_mutex_unlock (sort_param->px_mtx); + if (error != NO_ERROR) + { + goto cleanup; + } - sort_param->px_height_max = 0; /* init */ - sort_param->px_array_size = 1; /* init */ + error = sort_end_parallelism (thread_p, px_sort_param, sort_param, parallel_num); + if (error != NO_ERROR) + { + goto cleanup; + } + } - /* TODO - currently, disable parallelism */ - prm_enable_sort_parallel = false; +#else + /* single process for stand alone mode */ + sort_param->px_max_index = 1; + parallel_num = 1; + error = sort_listfile_internal (thread_p, sort_param); +#endif - tde_er_log ("sort_listfile(): tde_encrypted = %d\n", sort_param->tde_encrypted); +cleanup: +#if defined(ENABLE_SYSTEMTAP) + CUBRID_SORT_END (sort_param->total_numrecs, error); +#endif /* ENABLE_SYSTEMTAP */ #if defined(SERVER_MODE) - if (prm_enable_sort_parallel == true) + pthread_mutex_destroy (&px_mtx); + pthread_cond_destroy (&complete_cond); +#endif + + /* free sort_param */ + if (parallel_num > 1) + { + for (int i = 0; i < parallel_num; i++) + { + sort_return_used_resources (thread_p, &px_sort_param[i], PX_THREAD_IN_PARALLEL); + } + + sort_return_used_resources (thread_p, sort_param, PX_MAIN_IN_PARALLEL); + } + else { - /* TODO - calc n, 2^^n get the number of CPUs TODO - fileio_os_sysconf really get #cores instead of #CPUs - - * NEED MORE CONSIDERATION */ - num_cpus = fileio_os_sysconf (); + sort_return_used_resources (thread_p, sort_param, PX_SINGLE); + } + + thread_set_sort_stats_active (thread_p, false); + + return error; +} - sort_param->px_height_max = (int) sqrt ((double) num_cpus); /* n */ - sort_param->px_array_size = num_cpus; /* 2^^n */ +#if defined(SERVER_MODE) +// *INDENT-OFF* +static void +sort_listfile_execute (cubthread::entry &thread_ref, 
SORT_PARAM * sort_param) +{ + QFILE_LIST_SCAN_ID t_scan_id; + THREAD_ENTRY * thread_p = &thread_ref; + PX_STATUS px_result = PX_DONE; /* final worker status; written to sort_param->px_status only under px_mtx at cleanup */ + + thread_ref.tran_index = sort_param->px_tran_index; + pthread_mutex_unlock (&thread_ref.tran_index_lock); + + thread_p->push_resource_tracks (); + +#if !defined(NDEBUG) + TSC_TICKS start_tick, end_tick; + TSCTIMEVAL tv_diff; + struct timeval orderby_time = { 0, 0 }; /* debug-only accumulator; zeroed because TSC_ADD_TIMEVAL is expected to add into it */ + tsc_getticks (&start_tick); +#endif + + if (sort_param->px_type == SORT_ORDER_BY) + { + SORT_INFO *sort_info_p = (SORT_INFO *) sort_param->get_arg; - assert_release (sort_param->px_array_size == pow ((double) 2, (double) sort_param->px_height_max)); + /* open the split temp file for reading */ + if (qfile_open_list_scan (sort_info_p->input_file, &t_scan_id) != NO_ERROR) + { + px_result = PX_ERR_FAILED; + goto cleanup; + } + sort_info_p->s_id->s_id = &t_scan_id; } -#endif /* SERVER_MODE */ + else + { + /* Not implemented yet */ + px_result = PX_ERR_FAILED; + goto cleanup; + } + + px_result = (sort_listfile_internal (&thread_ref, sort_param) == NO_ERROR) ? PX_DONE : PX_ERR_FAILED; /* a failed sort must not be reported as PX_DONE */ - sort_param->px_array = (PX_TREE_NODE *) malloc (sort_param->px_array_size * sizeof (PX_TREE_NODE)); - if (sort_param->px_array == NULL) + if (sort_param->px_type == SORT_ORDER_BY) { - error = ER_OUT_OF_VIRTUAL_MEMORY; - er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 1, (sort_param->px_array_size * sizeof (PX_TREE_NODE))); + SORT_INFO *sort_info_p = (SORT_INFO *) sort_param->get_arg; + + /* close the scan on the split temp file (also after a failed sort) */ + qfile_close_scan (&thread_ref, sort_info_p->s_id->s_id); + } + else + { + /* Not implemented yet */ + px_result = PX_ERR_FAILED; goto cleanup; } +#if !defined(NDEBUG) + tsc_getticks (&end_tick); + tsc_elapsed_time_usec (&tv_diff, end_tick, start_tick); + TSC_ADD_TIMEVAL (orderby_time, tv_diff); + printf ("thread %d done, time: %d\n",thread_p->index, TO_MSEC (orderby_time)); +#endif + +cleanup: + /* Publish the result under px_mtx and always signal: sort_listfile waits on complete_cond, so a failed worker must wake it too. sort_param is not touched after the unlock. */ + pthread_mutex_lock(sort_param->px_mtx); + sort_param->px_status = px_result; + pthread_cond_signal(sort_param->complete_cond); + 
pthread_mutex_unlock(sort_param->px_mtx); + thread_p->pop_resource_tracks (); +} +// *INDENT-ON* +#endif + +/* + * sort_listfile_internal () - Perform sorting + * return: NO_ERROR on success, otherwise the error code of the failing phase + */ +int +sort_listfile_internal (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param) +{ + int error = NO_ERROR; + int file_pg_cnt_est; + int i; + /* * Don't allocate any temp files yet, since we may not need them. * We'll allocate them on the fly as the need arises. @@ -1516,14 +1713,32 @@ sort_listfile (THREAD_ENTRY * thread_p, INT16 volid, int est_inp_pg_cnt, SORT_GE * space that is going to be needed. */ - error = sort_inphase_sort (thread_p, sort_param, get_fn, get_arg, &total_numrecs); +#if !defined(NDEBUG) + TSC_TICKS start_tick, end_tick; + TSC_TICKS start_tick2, end_tick2; + TSCTIMEVAL tv_diff, tv_diff2; + struct timeval orderby_time = { 0, 0 }, orderby_time2 = { 0, 0 }; /* debug-only accumulators; zeroed because TSC_ADD_TIMEVAL is expected to add into them */ + tsc_getticks (&start_tick); +#endif + + error = sort_inphase_sort (thread_p, sort_param, sort_param->get_fn, sort_param->get_arg, &sort_param->total_numrecs); if (error != NO_ERROR) { - goto cleanup; + return error; } - if (sort_param->tot_runs > 1) +#if !defined(NDEBUG) + tsc_getticks (&end_tick); + tsc_elapsed_time_usec (&tv_diff, end_tick, start_tick); + TSC_ADD_TIMEVAL (orderby_time, tv_diff); + printf ("sort_inphase_sort time: %d\n", TO_MSEC (orderby_time)); + + tsc_getticks (&start_tick2); +#endif + + if (sort_param->tot_runs > 1 || IS_PARALLEL_SORT (sort_param)) { + assert (sort_param->tot_runs > 0); /* Create output temporary files make file and temporary volume page count estimates */ file_pg_cnt_est = sort_get_avg_numpages_of_nonempty_tmpfile (sort_param); file_pg_cnt_est = MAX (1, file_pg_cnt_est); @@ -1531,10 +1746,11 @@ sort_listfile (THREAD_ENTRY * thread_p, INT16 volid, int est_inp_pg_cnt, SORT_GE for (i = sort_param->half_files; i < sort_param->tot_tempfiles; i++) { error = - sort_add_new_file (thread_p, &(sort_param->temp[i]), file_pg_cnt_est, true, sort_param->tde_encrypted); + sort_add_new_file (thread_p, 
&(sort_param->temp[i]), file_pg_cnt_est, true, sort_param->tde_encrypted, + IS_PARALLEL_SORT (sort_param)); if (error != NO_ERROR) { - goto cleanup; + return error; } } @@ -1549,15 +1765,12 @@ sort_listfile (THREAD_ENTRY * thread_p, INT16 volid, int est_inp_pg_cnt, SORT_GE } } /* if (sort_param->tot_runs > 1) */ -cleanup: - -#if defined(ENABLE_SYSTEMTAP) - CUBRID_SORT_END (total_numrecs, error); -#endif /* ENABLE_SYSTEMTAP */ - - sort_return_used_resources (thread_p, sort_param); - sort_param = NULL; - thread_set_sort_stats_active (thread_p, false); +#if !defined(NDEBUG) + tsc_getticks (&end_tick2); + tsc_elapsed_time_usec (&tv_diff2, end_tick2, start_tick2); + TSC_ADD_TIMEVAL (orderby_time2, tv_diff2); + printf ("sort_exphase_merge time: %d\n", TO_MSEC (orderby_time2)); +#endif return error; } @@ -1602,635 +1815,88 @@ sort_validate (char **vector, long size, SORT_CMP_FUNC * compare, void *comp_arg #endif /* - * px_sort_assign() - + * sort_inphase_sort () - Internal sorting phase * return: - * thread_p(in): * sort_param(in): sort parameters - * px_id(in): - * px_buff(in): - * px_vector(in): - * px_vector_size(in): - * px_height(in): - * px_myself(in): + * get_fn(in): user-supplied function: provides the temporary record for + * the given input record + * get_arg(in): arguments for get_fn + * total_numrecs(out): records sorted * - * NOTE: support parallelism */ -static PX_TREE_NODE * -px_sort_assign (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, int px_id, char **px_buff, char **px_vector, - long px_vector_size, int px_height, int px_myself) +static int +sort_inphase_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, SORT_GET_FUNC * get_fn, void *get_arg, + unsigned int *total_numrecs) { - PX_TREE_NODE *px_node; -#if defined(SERVER_MODE) - int rv = NO_ERROR; -#endif - - assert (sort_param != NULL); - - if (px_height < 0 || px_height > sort_param->px_height_max) - { - assert_release (false); - return NULL; - } + /* Variables for the input file */ + 
SORT_STATUS status; - if (px_id < 0 || px_id >= sort_param->px_array_size) - { - assert_release (false); - return NULL; - } + /* Variables for the current output file */ + int out_curfile; + char *output_buffer; + int cur_page[SORT_MAX_HALF_FILES]; - if (px_myself < 0 || px_myself > px_id) - { - assert_release (false); - return NULL; - } + /* Variables for the internal memory */ + RECDES temp_recdes; + RECDES long_recdes; /* Record desc. for reading in long sorting records */ + char *item_ptr; /* Pointer to the first free location of the temp. records region of internal memory */ + long numrecs; /* Number of records kept in the internal memory */ + long sort_numrecs; /* Number of sort records kept in the internal memory */ + bool once_flushed = false; + long saved_numrecs; + char **saved_index_area; + char **index_area; /* Part of internal memory keeping the addresses of records */ + char **index_buff; /* buffer area to sort indexes. */ + int i; + int error = NO_ERROR; - px_node = &(sort_param->px_array[px_id]); +#if defined (SERVER_MODE) + int rv = NO_ERROR; +#endif /* SERVER_MODE */ -#if defined(SERVER_MODE) - rv = pthread_mutex_lock (&(sort_param->px_mtx)); - assert (rv == NO_ERROR); + assert (sort_param->half_files <= SORT_MAX_HALF_FILES); -#if !defined(NDEBUG) - if (px_node->px_status != 0) + /* Initialize the current pages of all temp files to 0 */ + for (i = 0; i < sort_param->half_files; i++) { - assert (false); - pthread_mutex_unlock (&(sort_param->px_mtx)); - return NULL; + cur_page[i] = 0; } -#endif - - px_node->px_status = 0; - - pthread_mutex_unlock (&(sort_param->px_mtx)); -#else /* SERVER_MODE */ - assert (sort_param->px_height_max == 0); - assert (sort_param->px_array_size == 1); - - assert (px_id == 0); - assert (px_height == 0); - assert (px_myself == 0); -#endif /* SERVER_MODE */ - /* set node info */ - - px_node->px_id = px_id; - - px_node->px_height = px_height; - px_node->px_myself = px_myself; - - px_node->px_tran_index = 
LOG_FIND_THREAD_TRAN_INDEX (thread_p); - - /* set operation info */ + sort_param->tot_runs = 0; + out_curfile = sort_param->in_half; - px_node->px_arg = (void *) sort_param; + output_buffer = sort_param->internal_memory + ((long) (sort_param->tot_buffers - 1) * DB_PAGESIZE); + assert (output_buffer > sort_param->internal_memory); - px_node->px_buff = px_buff; - px_node->px_vector = px_vector; - px_node->px_vector_size = px_vector_size; + numrecs = 0; + sort_numrecs = 0; + saved_numrecs = 0; + *total_numrecs = 0; + saved_index_area = NULL; + item_ptr = sort_param->internal_memory + SORT_RECORD_LENGTH_SIZE; + index_area = (char **) (output_buffer - sizeof (char *)); + index_buff = index_area - 1; + temp_recdes.area_size = SORT_MAXREC_LENGTH; + temp_recdes.length = 0; - px_node->px_result = px_node->px_vector; /* init */ - px_node->px_result_size = px_node->px_vector_size; /* init */ + long_recdes.area_size = 0; + long_recdes.data = NULL; - return px_node; -} - -#if defined(SERVER_MODE) -// *INDENT-OFF* -static void -px_sort_myself_execute (cubthread::entry &thread_ref, PX_TREE_NODE * px_node) -{ - (void) px_sort_myself (&thread_ref, px_node); -} - -/* - * px_sort_communicate() - - * return: - * thread_p(in): - * px_node(in): - * - * NOTE: support parallelism - */ -static int -px_sort_communicate (PX_TREE_NODE * px_node) -{ - SORT_PARAM *sort_param; - - assert_release (px_node != NULL); - assert_release (px_node->px_arg != NULL); - - sort_param = (SORT_PARAM *) (px_node->px_arg); - assert_release (px_node->px_height <= sort_param->px_height_max); - assert_release (px_node->px_id < sort_param->px_array_size); - assert_release (px_node->px_vector_size > 1); - - cubthread::entry_callable_task *task = - new cubthread::entry_callable_task (std::bind (px_sort_myself_execute, std::placeholders::_1, px_node)); - css_push_external_task (css_get_current_conn_entry (), task); - - return NO_ERROR; -} -// *INDENT-ON* -#endif /* SERVER_MODE */ - -/* - * px_sort_myself() - - * 
return: - * thread_p(in): - * px_node(in): - * - * NOTE: support parallelism - * - * Partitioned merge logic - * - * The working core: each internal node recurses on this function - * both for its left side and its right side, as nodes one closer to - * the leaf level. It then merges the results into the vector - * - * Leaf level nodes just sort the vector. - */ -static int -px_sort_myself (THREAD_ENTRY * thread_p, PX_TREE_NODE * px_node) -{ -#define SORT_PARTITION_RUN_SIZE_MIN (ONE_M) - - int ret = NO_ERROR; - bool old_check_interrupt; - -#if defined(SERVER_MODE) - int parent; - int child_right = 0; - int child_height; - - int cmp; - - int rv = NO_ERROR; -#endif /* SERVER_MODE */ - - SORT_PARAM *sort_param; - - char **buff; - char **vector; - long vector_size; - - char **result = NULL; - long result_size = -1; - - assert_release (px_node != NULL); - assert_release (px_node->px_id >= 0); - assert_release (px_node->px_arg != NULL); - - if (thread_p == NULL) - { - thread_p = thread_get_thread_entry_info (); - } - - sort_param = (SORT_PARAM *) (px_node->px_arg); - -#if defined(SERVER_MODE) - if (px_node->px_id > 0) - { - /* is new childs */ - thread_p->tran_index = px_node->px_tran_index; - pthread_mutex_unlock (&thread_p->tran_index_lock); - } - -#if !defined(NDEBUG) - rv = pthread_mutex_lock (&(sort_param->px_mtx)); - assert (rv == NO_ERROR); - - assert (px_node->px_status == 0); - - pthread_mutex_unlock (&(sort_param->px_mtx)); -#endif -#endif /* SERVER_MODE */ - - old_check_interrupt = logtb_set_check_interrupt (thread_p, false); - - buff = px_node->px_buff; - vector = px_node->px_vector; - vector_size = px_node->px_vector_size; - - assert_release (px_node->px_result == vector); - assert_release (px_node->px_result_size == vector_size); - - result = px_node->px_result; - result_size = px_node->px_result_size; - - assert_release (vector_size > 0); - -#if defined(SERVER_MODE) - parent = px_node->px_id & ~(1 << px_node->px_height); - child_height = px_node->px_height 
- 1; - if (child_height >= 0) - { - child_right = px_node->px_myself | (1 << child_height); - assert_release (child_right > 0); - } - - if (vector_size <= 1) - { - assert_release (px_node->px_result == vector); - assert_release (px_node->px_result_size == vector_size); - - result = px_node->px_result = vector; - result_size = px_node->px_result_size = vector_size; - - /* mark as finished */ - - rv = pthread_mutex_lock (&(sort_param->px_mtx)); - assert (rv == NO_ERROR); - - assert_release (px_node->px_status == 0); - px_node->px_status = 1; /* done */ - - pthread_mutex_unlock (&(sort_param->px_mtx)); - - goto exit_on_end; - } - - if (px_node->px_height > 0 && vector_size > SORT_PARTITION_RUN_SIZE_MIN) - { - long left_vector_size, right_vector_size; - char **left_vector, **right_vector; - PX_TREE_NODE *left_px_node, *right_px_node; - int i, j, k; /* Used in the merge logic */ - - assert_release (child_right > 0); - - left_vector_size = vector_size / 2; - right_vector_size = vector_size - left_vector_size; - - assert_release (vector_size == left_vector_size + right_vector_size); - - /* do new child first */ - right_vector = vector + left_vector_size; - right_px_node = px_sort_assign (thread_p, sort_param, px_node->px_id + child_right, buff + left_vector_size, - right_vector, right_vector_size, child_height, 0 /* px_myself: set as root */ ); - if (right_px_node == NULL) - { - goto exit_on_error; - } - - if (right_vector_size > 1) - { - /* launch new worker */ - if (px_sort_communicate (right_px_node) != NO_ERROR) - { - goto exit_on_error; - } - } - else - { - /* mark as finished */ - - rv = pthread_mutex_lock (&(sort_param->px_mtx)); - assert (rv == NO_ERROR); - - assert_release (right_px_node->px_status == 0); - right_px_node->px_status = 1; /* done */ - - pthread_mutex_unlock (&(sort_param->px_mtx)); - } - - left_vector = vector; - left_px_node = - px_sort_assign (thread_p, sort_param, px_node->px_id, buff, left_vector, left_vector_size, child_height, - 
px_node->px_myself); - if (left_px_node == NULL) - { - goto exit_on_error; - } - - assert_release (px_node == left_px_node); -#if !defined(NDEBUG) - rv = pthread_mutex_lock (&(sort_param->px_mtx)); - assert (rv == NO_ERROR); - - assert (px_node->px_status == 0); - - pthread_mutex_unlock (&(sort_param->px_mtx)); -#endif - - if (left_vector_size > 1) - { - if (px_sort_myself (thread_p, left_px_node) != NO_ERROR) - { - goto exit_on_error; - } - } - - /* wait for right-child finished */ - do - { - rv = pthread_mutex_lock (&(sort_param->px_mtx)); - assert (rv == NO_ERROR); - - if (right_px_node->px_status != 0) - { - assert (right_px_node->px_status == 1); - pthread_mutex_unlock (&(sort_param->px_mtx)); - break; - } - - pthread_mutex_unlock (&(sort_param->px_mtx)); - thread_sleep (10); /* 10 msec */ - } - while (1); - - assert_release (px_node == left_px_node); -#if !defined(NDEBUG) - rv = pthread_mutex_lock (&(sort_param->px_mtx)); - assert (rv == NO_ERROR); - - assert (right_px_node->px_status == 1); - assert (px_node->px_status == 0); - - pthread_mutex_unlock (&(sort_param->px_mtx)); -#endif - - right_vector = right_px_node->px_result; - right_vector_size = right_px_node->px_result_size; - if (right_vector == NULL || right_vector_size < 0) - { - goto exit_on_error; - } - - left_vector = left_px_node->px_result; - left_vector_size = left_px_node->px_result_size; - if (left_vector == NULL || left_vector_size < 0) - { - goto exit_on_error; - } - - assert_release (vector_size >= left_vector_size + right_vector_size); - - result_size = px_node->px_result_size = left_vector_size + right_vector_size; - if (left_vector < vector) - { - assert_release (left_vector + left_vector_size <= vector); - result = px_node->px_result = vector; - } - else - { - result = px_node->px_result = buff; - } - - /* Merge the two sub-results back into upper result */ - - i = j = k = 0; - - /* STEP 2: check CON conditions if (left_max < right_min) do FORWARD-CON. 
we use '<' instead of '<=' */ - cmp = (*(sort_param->cmp_fn)) (&(left_vector[left_vector_size - 1]), &(right_vector[0]), sort_param->cmp_arg); - - if (cmp == DB_LT || cmp == DB_EQ || cmp == DB_GT) - { - ; /* ok */ - } - else - { - assert_release (cmp == DB_LT || cmp == DB_EQ || cmp == DB_GT); - goto exit_on_error; - } - - if (cmp == DB_LT) - { - while (i < left_vector_size) - { - result[k++] = left_vector[i++]; - } - while (j < right_vector_size) - { - result[k++] = right_vector[j++]; - } - } - else - { - /* STEP 3: check CON conditions if (right_max < left_min) do BACKWARD-CON. we use '<' instead of '<=' */ - cmp = - (*(sort_param->cmp_fn)) (&(right_vector[right_vector_size - 1]), &(left_vector[0]), sort_param->cmp_arg); - - if (cmp == DB_LT || cmp == DB_EQ || cmp == DB_GT) - { - ; /* ok */ - } - else - { - assert_release (cmp == DB_LT || cmp == DB_EQ || cmp == DB_GT); - goto exit_on_error; - } - - if (cmp == DB_LT) - { - while (j < right_vector_size) - { - result[k++] = right_vector[j++]; - } - while (i < left_vector_size) - { - result[k++] = left_vector[i++]; - } - } - else - { - /* STEP 4: do the actual merge */ - while (i < left_vector_size && j < right_vector_size) - { - cmp = (*(sort_param->cmp_fn)) (&(left_vector[i]), &(right_vector[j]), sort_param->cmp_arg); - - if (cmp == DB_LT || cmp == DB_EQ || cmp == DB_GT) - { - ; /* ok */ - } - else - { - assert_release (cmp == DB_LT || cmp == DB_EQ || cmp == DB_GT); - goto exit_on_error; - } - - if (cmp == DB_EQ) - { - if (sort_param->option == SORT_DUP) /* allow duplicate */ - { - sort_append (&(left_vector[i]), &(right_vector[j])); - } - - result[k++] = right_vector[j++]; /* temp */ - i++; /* skip left-side dup */ - } - else if (cmp == DB_GT) - { - result[k++] = right_vector[j++]; - } - else - { - assert_release (cmp == DB_LT); - result[k++] = left_vector[i++]; - } - } - while (i < left_vector_size) - { - result[k++] = left_vector[i++]; - } - while (j < right_vector_size) - { - result[k++] = right_vector[j++]; - } 
- } /* else */ - } /* else */ - - assert_release (result_size >= k); - - result_size = px_node->px_result_size = k; - -#if !defined(NDEBUG) - if (sort_validate (result, result_size, sort_param->cmp_fn, sort_param->cmp_arg) != NO_ERROR) - { - goto exit_on_error; - } -#endif - } - else - { - result = px_node->px_result = sort_run_sort (thread_p, sort_param, vector, vector_size, 0 /* dummy */ , - buff, &(px_node->px_result_size)); - result_size = px_node->px_result_size; - } - -#else /* SERVER_MODE */ - - result = px_node->px_result = sort_run_sort (thread_p, sort_param, vector, vector_size, 0 /* dummy */ , - buff, &(px_node->px_result_size)); - result_size = px_node->px_result_size; - -#endif /* SERVER_MODE */ - - if (result == NULL || result_size < 0) - { - assert_release (false); - goto exit_on_error; - } - -exit_on_end: - - assert_release (result == px_node->px_result); - assert_release (result_size == px_node->px_result_size); - -#if defined(SERVER_MODE) - if (parent != px_node->px_id) - { - /* mark as finished */ - - rv = pthread_mutex_lock (&(sort_param->px_mtx)); - assert (rv == NO_ERROR); - - assert_release (px_node->px_status == 0); - px_node->px_status = 1; /* done */ - - pthread_mutex_unlock (&(sort_param->px_mtx)); - } -#endif /* SERVER_MODE */ - - (void) logtb_set_check_interrupt (thread_p, old_check_interrupt); - - return ret; - -exit_on_error: - - result = px_node->px_result = NULL; - result_size = px_node->px_result_size = -1; - - ret = (ret == NO_ERROR && (ret = er_errid ()) == NO_ERROR) ? 
ER_FAILED : ret; - - goto exit_on_end; -} - -/* - * sort_inphase_sort () - Internal sorting phase - * return: - * sort_param(in): sort parameters - * get_fn(in): user-supplied function: provides the temporary record for - * the given input record - * get_arg(in): arguments for get_fn - * total_numrecs(out): records sorted - * - */ -static int -sort_inphase_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, SORT_GET_FUNC * get_fn, void *get_arg, - unsigned int *total_numrecs) -{ - /* Variables for the input file */ - SORT_STATUS status; - - /* Variables for the current output file */ - int out_curfile; - char *output_buffer; - int cur_page[SORT_MAX_HALF_FILES]; - - /* Variables for the internal memory */ - RECDES temp_recdes; - RECDES long_recdes; /* Record desc. for reading in long sorting records */ - char *item_ptr; /* Pointer to the first free location of the temp. records region of internal memory */ - long numrecs; /* Number of records kept in the internal memory */ - long sort_numrecs; /* Number of sort records kept in the internal memory */ - bool once_flushed = false; - long saved_numrecs; - char **saved_index_area; - char **index_area; /* Part of internal memory keeping the addresses of records */ - char **index_buff; /* buffer area to sort indexes. 
*/ - int i; - int error = NO_ERROR; - - PX_TREE_NODE *px_node; -#if defined (SERVER_MODE) - int rv = NO_ERROR; -#endif /* SERVER_MODE */ - - assert (sort_param->half_files <= SORT_MAX_HALF_FILES); - - assert (sort_param->px_height_max >= 0); - assert (sort_param->px_array_size >= 1); - - /* Initialize the current pages of all temp files to 0 */ - for (i = 0; i < sort_param->half_files; i++) - { - cur_page[i] = 0; - } - - sort_param->tot_runs = 0; - out_curfile = sort_param->in_half; - - output_buffer = sort_param->internal_memory + ((long) (sort_param->tot_buffers - 1) * DB_PAGESIZE); - assert (output_buffer > sort_param->internal_memory); - - numrecs = 0; - sort_numrecs = 0; - saved_numrecs = 0; - *total_numrecs = 0; - saved_index_area = NULL; - item_ptr = sort_param->internal_memory + SORT_RECORD_LENGTH_SIZE; - index_area = (char **) (output_buffer - sizeof (char *)); - index_buff = index_area - 1; - temp_recdes.area_size = SORT_MAXREC_LENGTH; - temp_recdes.length = 0; - - long_recdes.area_size = 0; - long_recdes.data = NULL; - - for (;;) - { - if ((char *) index_buff < item_ptr) - { - /* Internal memory is already full */ - status = SORT_REC_DOESNT_FIT; - } - else - { - /* Internal memory is not full; try to get the next item */ - temp_recdes.data = item_ptr; - if (((int) ((char *) index_buff - item_ptr)) < SORT_MAXREC_LENGTH) - { - temp_recdes.area_size = (int) ((char *) index_buff - item_ptr) - (4 * sizeof (char *)); - } + for (;;) + { + if ((char *) index_buff < item_ptr) + { + /* Internal memory is already full */ + status = SORT_REC_DOESNT_FIT; + } + else + { + /* Internal memory is not full; try to get the next item */ + temp_recdes.data = item_ptr; + if (((int) ((char *) index_buff - item_ptr)) < SORT_MAXREC_LENGTH) + { + temp_recdes.area_size = (int) ((char *) index_buff - item_ptr) - (4 * sizeof (char *)); + } if (temp_recdes.area_size <= SSIZEOF (SORT_REC)) { @@ -2262,46 +1928,9 @@ sort_inphase_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, 
SORT_GET_FU index_area++; - if (sort_numrecs == 0) - { - assert (sort_param->px_height_max >= 0); - assert (sort_param->px_array_size >= 1); -#if defined(SERVER_MODE) - rv = pthread_mutex_lock (&(sort_param->px_mtx)); - assert (rv == NO_ERROR); - - for (i = 0; i < sort_param->px_array_size; i++) - { - sort_param->px_array[i].px_status = 0; /* init */ - } - - pthread_mutex_unlock (&(sort_param->px_mtx)); -#endif /* SERVER_MODE */ - - px_node = px_sort_assign (thread_p, sort_param, 0, index_buff, index_area, numrecs, - sort_param->px_height_max, 0 /* px_myself: set as root */ ); - if (px_node == NULL) - { - error = ER_FAILED; - goto exit_on_error; - } - - error = px_sort_myself (thread_p, px_node); - if (error != NO_ERROR) - { - goto exit_on_error; - } - - index_area = px_node->px_result; - numrecs = px_node->px_result_size; - *total_numrecs += numrecs; - } - else - { - index_area = - sort_run_sort (thread_p, sort_param, index_area, numrecs, sort_numrecs, index_buff, &numrecs); - *total_numrecs += numrecs; - } + index_area = + sort_run_sort (thread_p, sort_param, index_area, numrecs, sort_numrecs, index_buff, &numrecs); + *total_numrecs += numrecs; if (index_area == NULL || numrecs < 0) { @@ -2432,7 +2061,7 @@ sort_inphase_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, SORT_GET_FU /* Create the multipage file */ sort_param->multipage_file.volid = sort_param->temp[0].volid; - error = file_create_temp (thread_p, 1, &sort_param->multipage_file); + error = file_create_temp (thread_p, 1, &sort_param->multipage_file, IS_PARALLEL_SORT (sort_param)); if (error != NO_ERROR) { ASSERT_ERROR (); @@ -2444,7 +2073,7 @@ sort_inphase_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, SORT_GET_FU } if (file_apply_tde_algorithm (thread_p, &sort_param->multipage_file, tde_algo) != NO_ERROR) { - file_temp_retire (thread_p, &sort_param->multipage_file); + file_temp_retire (thread_p, &sort_param->multipage_file, IS_PARALLEL_SORT (sort_param)); ASSERT_ERROR (); goto 
exit_on_error; } @@ -2517,45 +2146,8 @@ sort_inphase_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, SORT_GET_FU index_area++; - if (sort_numrecs == 0) - { - assert (sort_param->px_height_max >= 0); - assert (sort_param->px_array_size >= 1); -#if defined(SERVER_MODE) - rv = pthread_mutex_lock (&(sort_param->px_mtx)); - assert (rv == NO_ERROR); - - for (i = 0; i < sort_param->px_array_size; i++) - { - sort_param->px_array[i].px_status = 0; /* init */ - } - - pthread_mutex_unlock (&(sort_param->px_mtx)); -#endif /* SERVER_MODE */ - - px_node = px_sort_assign (thread_p, sort_param, 0, index_buff, index_area, numrecs, sort_param->px_height_max, - 0 /* px_myself: set as root */ ); - if (px_node == NULL) - { - error = ER_FAILED; - goto exit_on_error; - } - - if (px_sort_myself (thread_p, px_node) != NO_ERROR) - { - error = ER_FAILED; - goto exit_on_error; - } - - index_area = px_node->px_result; - numrecs = px_node->px_result_size; - *total_numrecs += numrecs; - } - else - { - index_area = sort_run_sort (thread_p, sort_param, index_area, numrecs, sort_numrecs, index_buff, &numrecs); - *total_numrecs += numrecs; - } + index_area = sort_run_sort (thread_p, sort_param, index_area, numrecs, sort_numrecs, index_buff, &numrecs); + *total_numrecs += numrecs; if (index_area == NULL || numrecs < 0) { @@ -2563,7 +2155,7 @@ sort_inphase_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, SORT_GET_FU goto exit_on_error; } - if (sort_param->tot_runs > 0) + if (sort_param->tot_runs > 0 || IS_PARALLEL_SORT (sort_param)) { /* There has been other runs produced already */ @@ -2597,7 +2189,7 @@ sort_inphase_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, SORT_GET_FU } } } - else if (sort_param->tot_runs == 1) + else if (sort_param->tot_runs == 1 && !IS_PARALLEL_SORT (sort_param)) { if (once_flushed) { @@ -2694,7 +2286,7 @@ sort_run_flush (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, int out_file, { error = sort_add_new_file (thread_p, &sort_param->temp[out_file], 
sort_param->tmp_file_pgs, false, - sort_param->tde_encrypted); + sort_param->tde_encrypted, IS_PARALLEL_SORT (sort_param)); if (error != NO_ERROR) { return error; @@ -2944,6 +2536,17 @@ sort_exphase_merge_elim_dup (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param) /* OUTER LOOP */ + /* for one temporary file, put result from the temp file instead of merging it. */ + if (!IS_PARALLEL_SORT (sort_param) && sort_get_numpages_of_active_infiles (sort_param) == 1) + { + error = sort_put_result_from_tmpfile (thread_p, sort_param); + if (error != NO_ERROR) + { + ASSERT_ERROR (); + goto bailout; + } + } + /* While there are more than one input files with different runs to merge */ while ((act_infiles = sort_get_numpages_of_active_infiles (sort_param)) > 1) { @@ -3255,6 +2858,12 @@ sort_exphase_merge_elim_dup (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param) /* Initialize the size of next run to zero */ out_runsize = 0; + /* In parallel sort, put_fn will be performed by the parent thread. save last file index. */ + if (very_last_run && IS_PARALLEL_SORT (sort_param)) + { + sort_param->px_result_file_idx = cur_outfile; + } + for (;;) { /* OUTPUT A RECORD */ @@ -3267,7 +2876,7 @@ sort_exphase_merge_elim_dup (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param) { /* we found first unique sort_key record */ - if (very_last_run) + if (very_last_run && !IS_PARALLEL_SORT (sort_param)) { /* OUTPUT THE RECORD */ /* Obtain the output record for this temporary record */ @@ -3553,7 +3162,7 @@ sort_exphase_merge_elim_dup (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param) } } - if (!very_last_run) + if (!(very_last_run && !IS_PARALLEL_SORT (sort_param))) { /* Flush whatever is left on the output section */ out_act_bufno++; /* Since 0 refers to the first active buffer */ @@ -3618,6 +3227,100 @@ sort_exphase_merge_elim_dup (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param) return (error == SORT_PUT_STOP) ? 
NO_ERROR : error; } +/* + * sort_put_result_from_tmpfile () - put result from last temp file + * return: + * sort_param(in): sort parameters + * + */ +static int +sort_put_result_from_tmpfile (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param) +{ + int tot_pages; + int current_pages = 0, read_pages = 0, cur_read_pages = 0; + int slot_num = 0; + char *cur_pgptr; + int result_file_idx = sort_param->px_result_file_idx; + RECDES record = RECDES_INITIALIZER; + RECDES long_record = RECDES_INITIALIZER; + int error = NO_ERROR; + SORT_REC *sort_rec; + int tot_rows = 0; + + tot_pages = sort_param->file_contents[result_file_idx].num_pages[0]; + while (tot_pages > 0) + { + read_pages = (tot_pages > sort_param->tot_buffers) ? sort_param->tot_buffers : tot_pages; + + error = + sort_read_area (thread_p, &sort_param->temp[result_file_idx], current_pages, read_pages, + sort_param->internal_memory); + if (error != NO_ERROR) + { + goto bailout; + } + + cur_pgptr = sort_param->internal_memory; + cur_read_pages = read_pages; + while (cur_read_pages > 0) + { + /* read record from sort temp file */ + slot_num = sort_spage_get_numrecs (cur_pgptr); + tot_rows += slot_num; + for (int i = 0; i < slot_num; i++) + { + if (sort_spage_get_record (cur_pgptr, i, &record, PEEK) != S_SUCCESS) + { + er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_SORT_TEMP_PAGE_CORRUPTED, 0); + error = ER_SORT_TEMP_PAGE_CORRUPTED; + goto bailout; + } + + /* If this is a long record retrieve it */ + if (record.type == REC_BIGONE) + { + if (sort_retrieve_longrec (thread_p, &record, &long_record) == NULL) + { + ASSERT_ERROR (); + error = er_errid (); + goto bailout; + } + } + /* write data by put_fn */ + if (record.type == REC_BIGONE) + { + error = (*sort_param->put_fn) (thread_p, &long_record, sort_param->put_arg); + if (error != NO_ERROR) + { + goto bailout; + } + } + else + { + sort_rec = (SORT_REC *) (record.data); + /* cut-off link used in Internal Sort */ + sort_rec->next = NULL; + error = (*sort_param->put_fn) 
(thread_p, &record, sort_param->put_arg); + if (error != NO_ERROR) + { + goto bailout; + } + } + } + + /* Switch to the next page */ + cur_pgptr += DB_PAGESIZE; + + cur_read_pages--; + current_pages++; + } + tot_pages -= read_pages; + } + +bailout: + return error; +} + /* * sort_exphase_merge () - Merge phase * return: @@ -3724,6 +3427,17 @@ sort_exphase_merge (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param) out_half = 0; } + /* for one temporary file, put result from the temp file instead of merging it. */ + if (!IS_PARALLEL_SORT (sort_param) && sort_get_numpages_of_active_infiles (sort_param) == 1) + { + error = sort_put_result_from_tmpfile (thread_p, sort_param); + if (error != NO_ERROR) + { + ASSERT_ERROR (); + goto bailout; + } + } + /* OUTER LOOP */ /* While there are more than one input files with different runs to merge */ @@ -4025,6 +3739,12 @@ sort_exphase_merge (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param) /* Initialize the size of next run to zero */ out_runsize = 0; + /* In parallel sort, put_fn will be performed by the parent thread. save last file index. 
*/ + if (very_last_run && IS_PARALLEL_SORT (sort_param)) + { + sort_param->px_result_file_idx = cur_outfile; + } + for (;;) { /* OUTPUT A RECORD */ @@ -4032,7 +3752,7 @@ sort_exphase_merge (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param) /* FIND MINIMUM RECORD IN THE INPUT AREA */ min = min_p->rec_pos; - if (very_last_run) + if (very_last_run && !IS_PARALLEL_SORT (sort_param)) { /* OUTPUT THE RECORD */ /* Obtain the output record for this temporary record */ @@ -4301,7 +4021,7 @@ sort_exphase_merge (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param) } } - if (!very_last_run) + if (!(very_last_run && !IS_PARALLEL_SORT (sort_param))) { /* Flush whatever is left on the output section */ @@ -4311,221 +4031,760 @@ sort_exphase_merge (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param) out_sectaddr, sort_param->tde_encrypted); if (error != NO_ERROR) { - goto bailout; + goto bailout; + } + cur_page[cur_outfile] += out_act_bufno; + out_runsize += out_act_bufno; + } + + /* END UP THIS RUN */ + + /* Remove previous first_run nodes of the file_contents lists of the input files */ + for (i = sort_param->in_half; i < sort_param->in_half + sort_param->half_files; i++) + { + sort_run_remove_first (&sort_param->file_contents[i]); + } + + /* Add a new node to the file_contents list of the current output file */ + error = sort_run_add_new (&sort_param->file_contents[cur_outfile], out_runsize); + if (error != NO_ERROR) + { + goto bailout; + } + + /* PRODUCE A NEW RUN */ + + /* Switch to the next out file */ + if (++cur_outfile >= sort_param->half_files + out_half) + { + cur_outfile = out_half; + } + } + + /* Exchange input and output file indices */ + temp = sort_param->in_half; + sort_param->in_half = out_half; + out_half = temp; + } + +bailout: + + for (i = 0; i < sort_param->half_files; i++) + { + if (long_recdes[i].data != NULL) + { + free_and_init (long_recdes[i].data); + } + } + + if (last_long_recdes.data) + { + free_and_init (last_long_recdes.data); + } + + return (error == 
SORT_PUT_STOP) ? NO_ERROR : error; +} + +/* AUXILIARY FUNCTIONS */ + +/* + * sort_get_avg_numpages_of_nonempty_tmpfile () - Return average number of pages + * currently occupied by nonempty + * temporary file + * return: + * sort_param(in): Sort paramater + */ +static int +sort_get_avg_numpages_of_nonempty_tmpfile (SORT_PARAM * sort_param) +{ + int f; + int sum, i; + int nonempty_temp_file_num = 0; + + sum = 0; + for (i = 0; i < sort_param->tot_tempfiles; i++) + { + /* If the list is not empty */ + f = sort_param->file_contents[i].first_run; + if (f > -1) + { + nonempty_temp_file_num++; + for (; f <= sort_param->file_contents[i].last_run; f++) + { + sum += sort_param->file_contents[i].num_pages[f]; + } + } + } + + return (sum / MAX (1, nonempty_temp_file_num)); +} + +/* + * sort_return_used_resources () - Return system resource used for sorting + * return: void + * sort_param(in): Sort paramater + * + * Note: Clear the sort parameter structure by deallocating any allocated + * memory areas and destroying any temporary files and volumes. 
+ */ +static void +sort_return_used_resources (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, PARALLEL_TYPE parallel_type) +{ + int k; +#if defined(SERVER_MODE) + int rv; +#endif /* SERVER_MODE */ + + if (sort_param == NULL) + { + return; /* nop */ + } + + if (sort_param->internal_memory) + { + free_and_init (sort_param->internal_memory); + } + + if (parallel_type == PX_SINGLE || parallel_type == PX_MAIN_IN_PARALLEL) + { + for (k = 0; k < sort_param->tot_tempfiles; k++) + { + if (sort_param->temp[k].volid != NULL_VOLID) + { + (void) file_temp_retire (thread_p, &sort_param->temp[k], false); + } + } + } + + if (sort_param->multipage_file.volid != NULL_VOLID) + { + (void) file_temp_retire (thread_p, &(sort_param->multipage_file), false); + } + + for (k = 0; k < SORT_MAX_TOT_FILES; k++) + { + if (sort_param->file_contents[k].num_pages != NULL) + { + free_and_init (sort_param->file_contents[k].num_pages); + } + } + + if (parallel_type == PX_THREAD_IN_PARALLEL) + { + for (int i = 0; i < sort_param->px_max_index; i++) + { + if (sort_param->get_arg != NULL) + { + SORT_INFO *sort_info_p = (SORT_INFO *) sort_param->get_arg; + if (sort_info_p->s_id != NULL) + { + db_private_free_and_init (thread_p, sort_info_p->s_id); + } + if (sort_info_p->input_file != NULL) + { + db_private_free_and_init (thread_p, sort_info_p->input_file); + } + db_private_free_and_init (thread_p, sort_param->get_arg); + } + } + } + + if (parallel_type == PX_SINGLE || parallel_type == PX_MAIN_IN_PARALLEL) + { + free_and_init (sort_param); + } +} + +/* + * sort_add_new_file () - Create a new temporary file for sorting purposes + * return: NO_ERROR + * vfid(in): Set to the created file identifier + * file_pg_cnt_est(in): Estimated file page count + * force_alloc(in): Allocate file pages now ? 
+ * tde_encrypted(in): whether the file has to be encrypted or not for TDE + */ +static int +sort_add_new_file (THREAD_ENTRY * thread_p, VFID * vfid, int file_pg_cnt_est, bool force_alloc, bool tde_encrypted, + bool is_parallel) +{ + VPID new_vpid; + TDE_ALGORITHM tde_algo = TDE_ALGORITHM_NONE; + int ret = NO_ERROR; + + /* todo: sort file is a case I missed that seems to use file_find_nthpages. I don't know if it can be optimized to + * work without numerable files, that remains to be seen. */ + + ret = file_create_temp_numerable (thread_p, file_pg_cnt_est, vfid, is_parallel); + if (ret != NO_ERROR) + { + ASSERT_ERROR (); + return ret; + } + if (VFID_ISNULL (vfid)) + { + assert_release (false); + return ER_FAILED; + } + if (tde_encrypted) + { + tde_algo = (TDE_ALGORITHM) prm_get_integer_value (PRM_ID_TDE_DEFAULT_ALGORITHM); + } + + ret = file_apply_tde_algorithm (thread_p, vfid, tde_algo); + if (ret != NO_ERROR) + { + ASSERT_ERROR (); + file_temp_retire (thread_p, vfid, is_parallel); + VFID_SET_NULL (vfid); + return ret; + } + + if (force_alloc == false) + { + return NO_ERROR; + } + + /* page allocation force is specified, allocate pages for the file */ + /* todo: we don't have multiple page allocation, but allocation should be fast enough */ + for (; file_pg_cnt_est > 0; file_pg_cnt_est--) + { + ret = file_alloc (thread_p, vfid, NULL, NULL, &new_vpid, NULL); + if (ret != NO_ERROR) + { + ASSERT_ERROR (); + file_temp_retire (thread_p, vfid, is_parallel); + VFID_SET_NULL (vfid); + return ret; + } + } + + return NO_ERROR; +} + +/* + * sort_copy_sort_param () - copy sort param from src_param to dest_param + * return: NO_ERROR + * dest_param(in): + * src_param(in): + * parallel_num(in): + */ +static int +sort_copy_sort_param (THREAD_ENTRY * thread_p, SORT_PARAM * px_sort_param, SORT_PARAM * sort_param, int parallel_num) +{ + int error = NO_ERROR; + int i, j; + + /* copy from origin sort param */ + for (i = 0; i < parallel_num; i++) + { + memcpy (&px_sort_param[i], 
sort_param, sizeof (SORT_PARAM)); + } + + /* init */ + for (i = 0; i < parallel_num; i++) + { + px_sort_param[i].internal_memory = NULL; + for (j = 0; j < SORT_MAX_TOT_FILES; j++) + { + px_sort_param[i].file_contents[j].num_pages = NULL; + } + } + + /* alloc new memory */ + for (i = 0; i < parallel_num; i++) + { + px_sort_param[i].internal_memory = (char *) malloc ((size_t) sort_param->tot_buffers * (size_t) DB_PAGESIZE); + if (px_sort_param[i].internal_memory == NULL) + { + error = ER_OUT_OF_VIRTUAL_MEMORY; + er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 1, (size_t) (sort_param->tot_buffers * DB_PAGESIZE)); + break; + } + for (j = 0; j < SORT_MAX_TOT_FILES; j++) + { + /* Initilize file contents list */ + px_sort_param[i].file_contents[j].num_pages = (int *) malloc (SORT_INITIAL_DYN_ARRAY_SIZE * sizeof (int)); + if (px_sort_param[i].file_contents[j].num_pages == NULL) + { + sort_param->tot_tempfiles = j; + error = ER_OUT_OF_VIRTUAL_MEMORY; + break; + } + + px_sort_param[i].file_contents[j].num_slots = SORT_INITIAL_DYN_ARRAY_SIZE; + px_sort_param[i].file_contents[j].first_run = -1; + px_sort_param[i].file_contents[j].last_run = -1; + } + + /* init px variable */ + px_sort_param[i].px_status = PX_PROGRESS; + px_sort_param[i].px_max_index = parallel_num; + px_sort_param[i].px_result_file_idx = 0; + /* Copy the parent's tran_index. */ + px_sort_param[i].px_tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p); + /* init get_arg. it'll be copied in sort_split_input_temp_file(). TO_DO : put_arg?? 
*/ + px_sort_param[i].get_arg = NULL; + } + + if (error != NO_ERROR) + { + /* free memory */ + for (i = 0; i < parallel_num; i++) + { + if (px_sort_param[i].internal_memory != NULL) + { + free_and_init (px_sort_param[i].internal_memory); + } + for (j = 0; j < SORT_MAX_TOT_FILES; j++) + { + if (px_sort_param[i].file_contents[j].num_pages != NULL) + { + free_and_init (px_sort_param[i].file_contents[j].num_pages); } - cur_page[cur_outfile] += out_act_bufno; - out_runsize += out_act_bufno; } + } + } - /* END UP THIS RUN */ + return error; +} - /* Remove previous first_run nodes of the file_contents lists of the input files */ - for (i = sort_param->in_half; i < sort_param->in_half + sort_param->half_files; i++) - { - sort_run_remove_first (&sort_param->file_contents[i]); - } +/* + * sort_split_input_temp_file () - cut the page chain of the input list file into parallel_num sub-chains + * return: NO_ERROR, or ER_OUT_OF_VIRTUAL_MEMORY when a per-worker SORT_INFO copy cannot be allocated + * thread_p(in): thread entry + * px_sort_param(in/out): per-worker sort parameters; get_arg of each entry receives a private SORT_INFO copy + * sort_param(in): original sort parameter; get_arg is read as the SORT_INFO of the input list file + * parallel_num(in): number of sub-chains to produce + * + * Note: the prev/next page links stored in the input file pages are overwritten in place. + */ +static int +sort_split_input_temp_file (THREAD_ENTRY * thread_p, SORT_PARAM * px_sort_param, SORT_PARAM * sort_param, + int parallel_num) +{ + /* TO_DO : Need a logic to revert it? */ + int error = NO_ERROR; + int i = 0, j = 0, splitted_num_page; /* i: cuts made so far, j: pages walked in the current sub-chain */ + bool is_first_vpid = false; + PAGE_PTR page_p; + VPID next_vpid, prev_vpid, first_vpid[SORT_MAX_PARALLEL], last_vpid[SORT_MAX_PARALLEL]; /* NOTE(review): parallel_num is not checked against SORT_MAX_PARALLEL in this function */ + QFILE_LIST_SCAN_ID *scan_id_p; /* NOTE(review): never referenced in this function */ + SORT_INFO *sort_info_p, *org_sort_info_p; + + /* read the SORT_INFO (input list file) from the original get_arg */ + sort_info_p = (SORT_INFO *) sort_param->get_arg; + + /* init vpid */ + for (i = 0; i < parallel_num; i++) + { + first_vpid[i] = VPID_INITIALIZER; + last_vpid[i] = VPID_INITIALIZER; + } - /* Add a new node to the file_contents list of the current output file */ - error = sort_run_add_new (&sort_param->file_contents[cur_outfile], out_runsize); - if (error != NO_ERROR) + /* page_cnt contains ovfl_page. If the length of tuple is longer than page, split by the number of tuples. 
*/ + splitted_num_page = sort_info_p->input_file->page_cnt / parallel_num; + splitted_num_page = MIN (splitted_num_page, sort_info_p->input_file->tuple_cnt / parallel_num); + + /* walk the page chain from first_vpid and cut it after every splitted_num_page pages, recording the boundary vpids */ + i = 0; + is_first_vpid = false; + prev_vpid = sort_info_p->input_file->first_vpid; + while (true) + { + page_p = qmgr_get_old_page (thread_p, &prev_vpid, sort_info_p->input_file->tfile_vfid); /* NOTE(review): page_p is used below without a NULL check */ + if (is_first_vpid) + { /* this page starts a new sub-chain: clear its prev link */ + QFILE_PUT_PREV_VPID_NULL (page_p); + qmgr_set_dirty_page (thread_p, page_p, DONT_FREE, NULL, sort_info_p->input_file->tfile_vfid); + is_first_vpid = false; + } + QFILE_GET_NEXT_VPID (&next_vpid, page_p); + if (VPID_ISNULL (&next_vpid)) + { /* end of the page chain */ + qmgr_free_old_page_and_init (thread_p, page_p, sort_info_p->input_file->tfile_vfid); + break; + } + else if (++j >= splitted_num_page) + { /* cut here: clear the next link so that this page ends sub-chain i */ + QFILE_PUT_NEXT_VPID_NULL (page_p); + qmgr_set_dirty_page (thread_p, page_p, DONT_FREE, NULL, sort_info_p->input_file->tfile_vfid); + is_first_vpid = true; + first_vpid[i] = next_vpid; + last_vpid[i] = prev_vpid; + i++; + + if (i < parallel_num - 1) { - goto bailout; + j = 0; } - - /* PRODUCE A NEW RUN */ - - /* Switch to the next out file */ - if (++cur_outfile >= sort_param->half_files + out_half) + else { - cur_outfile = out_half; + qmgr_free_old_page_and_init (thread_p, page_p, sort_info_p->input_file->tfile_vfid); /* all parallel_num - 1 cuts are made; the rest of the chain is the last sub-chain */ + + /* clear the prev link of the first page of the last sub-chain */ + page_p = qmgr_get_old_page (thread_p, &next_vpid, sort_info_p->input_file->tfile_vfid); + QFILE_PUT_PREV_VPID_NULL (page_p); + qmgr_set_dirty_page (thread_p, page_p, DONT_FREE, NULL, sort_info_p->input_file->tfile_vfid); + qmgr_free_old_page_and_init (thread_p, page_p, sort_info_p->input_file->tfile_vfid); + break; } } - - /* Exchange input and output file indices */ - temp = sort_param->in_half; - sort_param->in_half = out_half; - out_half = temp; + prev_vpid = next_vpid; + qmgr_free_old_page_and_init (thread_p, page_p, sort_info_p->input_file->tfile_vfid); } -bailout: - - for (i = 0; i < 
sort_param->half_files; i++) + /* build one private SORT_INFO per worker that describes its sub-chain */ + for (i = 0; i < parallel_num; i++) { - if (long_recdes[i].data != NULL) + px_sort_param[i].get_arg = (void *) db_private_alloc (thread_p, sizeof (SORT_INFO)); + if (px_sort_param[i].get_arg == NULL) { - free_and_init (long_recdes[i].data); + error = ER_OUT_OF_VIRTUAL_MEMORY; + goto cleanup; + } + sort_info_p = (SORT_INFO *) px_sort_param[i].get_arg; + org_sort_info_p = (SORT_INFO *) sort_param->get_arg; + memcpy (sort_info_p, org_sort_info_p, sizeof (SORT_INFO)); + /* reset the copied pointers so that the cleanup path only frees memory allocated here */ + sort_info_p->input_file = NULL; + sort_info_p->s_id = NULL; + + sort_info_p->s_id = (QFILE_SORT_SCAN_ID *) db_private_alloc (thread_p, sizeof (QFILE_SORT_SCAN_ID)); + if (sort_info_p->s_id == NULL) + { + error = ER_OUT_OF_VIRTUAL_MEMORY; + goto cleanup; + } + memcpy (sort_info_p->s_id, org_sort_info_p->s_id, sizeof (QFILE_SORT_SCAN_ID)); /* NOTE(review): shallow copy; the inner s_id pointer still refers to the scan id of the original */ + + sort_info_p->input_file = (QFILE_LIST_ID *) db_private_alloc (thread_p, sizeof (QFILE_LIST_ID)); + if (sort_info_p->input_file == NULL) + { + error = ER_OUT_OF_VIRTUAL_MEMORY; + goto cleanup; } + memcpy (sort_info_p->input_file, org_sort_info_p->input_file, sizeof (QFILE_LIST_ID)); + /* tuple_cnt and page_cnt are set to approximate values. */ + /* Exact values could be read from the page headers, but later processing does not need them to be precise. */ + sort_info_p->input_file->tuple_cnt = org_sort_info_p->input_file->tuple_cnt / parallel_num; + sort_info_p->input_file->page_cnt = splitted_num_page; + sort_info_p->input_file->first_vpid = (i == 0) ? org_sort_info_p->input_file->first_vpid : first_vpid[i - 1]; + sort_info_p->input_file->last_vpid = + (i == parallel_num - 1) ? 
org_sort_info_p->input_file->last_vpid : last_vpid[i]; } - if (last_long_recdes.data) +cleanup: + if (error != NO_ERROR) { - free_and_init (last_long_recdes.data); + /* release every SORT_INFO copy allocated so far */ + for (i = 0; i < parallel_num; i++) + { + if (px_sort_param[i].get_arg != NULL) + { + sort_info_p = (SORT_INFO *) px_sort_param[i].get_arg; + if (sort_info_p->s_id != NULL) + { + db_private_free_and_init (thread_p, sort_info_p->s_id); + } + if (sort_info_p->input_file != NULL) + { + db_private_free_and_init (thread_p, sort_info_p->input_file); + } + db_private_free_and_init (thread_p, px_sort_param[i].get_arg); + } + } } - return (error == SORT_PUT_STOP) ? NO_ERROR : error; + return error; } -/* AUXILIARY FUNCTIONS */ - /* - * sort_get_avg_numpages_of_nonempty_tmpfile () - Return average number of pages - * currently occupied by nonempty - * temporary file - * return: - * sort_param(in): Sort paramater + * sort_merge_run_for_parallel () - merge the result runs of the parallel workers into one run + * return: NO_ERROR, ER_FAILED when parallel_num exceeds SORT_MAX_PARALLEL, or the error of sort_merge_nruns + * thread_p(in): thread entry + * px_sort_param(in): per-worker sort parameters; the result file and its first run page count are read + * sort_param(in/out): sort parameter passed on to sort_merge_nruns + * parallel_num(in): number of worker results to merge */ static int -sort_get_avg_numpages_of_nonempty_tmpfile (SORT_PARAM * sort_param) +sort_merge_run_for_parallel (THREAD_ENTRY * thread_p, SORT_PARAM * px_sort_param, SORT_PARAM * sort_param, + int parallel_num) { - int f; - int sum, i; - int nonempty_temp_file_num = 0; + int error = NO_ERROR; + int i = 0, idx = 0, file_pg_cnt_est; /* NOTE(review): file_pg_cnt_est is never used in this function */ + int remaining_run, level, merge_num; + RESULT_RUN result_run[SORT_MAX_PARALLEL]; - sum = 0; - for (i = 0; i < sort_param->tot_tempfiles; i++) + if (parallel_num > SORT_MAX_PARALLEL) { - /* If the list is not empty */ - f = sort_param->file_contents[i].first_run; - if (f > -1) + return ER_FAILED; /* result_run has only SORT_MAX_PARALLEL entries */ + } + + /* collect the result file and the page count of its first run for each worker */ + for (i = 0; i < parallel_num; i++) + { + result_run[i].temp_file = px_sort_param[i].temp[px_sort_param[i].px_result_file_idx]; + result_run[i].num_pages = px_sort_param[i].file_contents[px_sort_param[i].px_result_file_idx].num_pages[0]; + } + + remaining_run = parallel_num; + level = 0; + + 
while (remaining_run > 1) + { /* every pass merges groups of up to SORT_MAX_HALF_FILES runs until a single run is left */ + merge_num = (remaining_run + (SORT_MAX_HALF_FILES - 1)) / SORT_MAX_HALF_FILES; /* number of groups, rounded up */ + + for (i = 0; i < merge_num; i++) { - nonempty_temp_file_num++; - for (; f <= sort_param->file_contents[i].last_run; f++) + idx = i * pow (SORT_MAX_HALF_FILES, level + 1); /* index in result_run of the first run of group i. NOTE(review): pow () returns double; idx relies on the implicit conversion back to int */ + error = sort_merge_nruns (thread_p, result_run, sort_param, idx, remaining_run, level); + if (error != NO_ERROR) { - sum += sort_param->file_contents[i].num_pages[f]; + return error; } } + + remaining_run = merge_num; + level++; } - return (sum / MAX (1, nonempty_temp_file_num)); + return error; } /* - * sort_return_used_resources () - Return system resource used for sorting - * return: void - * sort_param(in): Sort paramater - * - * Note: Clear the sort parameter structure by deallocating any allocated - * memory areas and destroying any temporary files and volumes. + * sort_merge_nruns () - merge one group of up to SORT_MAX_HALF_FILES result runs into a single run + * return: NO_ERROR, ER_FAILED, or the error returned by sort_add_new_file or by the merge phase + * thread_p(in): thread entry + * result_run(in/out): result runs; the merged run is stored back at first_idx + * sort_param(in/out): sort parameter reused as the work area of the merge + * first_idx(in): index in result_run of the first run of the group + * remaining_run(in): number of runs left at this level + * level(in): merge level; runs of one group are pow (SORT_MAX_HALF_FILES, level) entries apart */ -static void -sort_return_used_resources (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param) +static int +sort_merge_nruns (THREAD_ENTRY * thread_p, RESULT_RUN * result_run, SORT_PARAM * sort_param, int first_idx, + int remaining_run, int level) { - int k; -#if defined(SERVER_MODE) - int rv; -#endif /* SERVER_MODE */ + int error = NO_ERROR; + int i = 0, idx = 0, file_pg_cnt_est; + int half_files = MIN (remaining_run - (first_idx / pow (SORT_MAX_HALF_FILES, level)), SORT_MAX_HALF_FILES); /* number of runs merged by this call */ - if (sort_param == NULL) + if (half_files > remaining_run) { - return; /* nop */ + return ER_FAILED; } - if (sort_param->internal_memory) + /* the first half_files temp files are the inputs, the second half_files are the outputs */ + sort_param->px_result_file_idx = 0; + sort_param->half_files = half_files; + sort_param->tot_tempfiles = half_files * 2; + sort_param->in_half = 0; + + /* copy temp file and file contents */ + for (i = 0; i < sort_param->half_files; i++) { - free_and_init (sort_param->internal_memory); + idx = (level == 0) ? 
(i + first_idx) : ((i * pow (SORT_MAX_HALF_FILES, level)) + first_idx); + sort_param->temp[i] = result_run[idx].temp_file; + /* copy the number of pages for one run */ + sort_param->file_contents[i].num_pages[0] = result_run[idx].num_pages; + sort_param->file_contents[i].first_run = 0; + sort_param->file_contents[i].last_run = 0; + } + for (i = sort_param->half_files; i < sort_param->tot_tempfiles; i++) + { + /* init temp file and contents */ + sort_param->temp[i].volid = NULL_VOLID; + sort_param->file_contents[i].first_run = -1; + sort_param->file_contents[i].last_run = -1; } - for (k = 0; k < sort_param->tot_tempfiles; k++) + /* Create output temporary files make file and temporary volume page count estimates */ + file_pg_cnt_est = sort_get_avg_numpages_of_nonempty_tmpfile (sort_param); + file_pg_cnt_est = MAX (1, file_pg_cnt_est); + + for (i = sort_param->half_files; i < sort_param->tot_tempfiles; i++) { - if (sort_param->temp[k].volid != NULL_VOLID) + error = + sort_add_new_file (thread_p, &(sort_param->temp[i]), file_pg_cnt_est, true, sort_param->tde_encrypted, false); + if (error != NO_ERROR) { - (void) file_temp_retire (thread_p, &sort_param->temp[k]); + return error; } } - if (sort_param->multipage_file.volid != NULL_VOLID) +#if !defined(NDEBUG) + TSC_TICKS start_tick, end_tick; + TSCTIMEVAL tv_diff; + struct timeval orderby_time; + tsc_getticks (&start_tick); +#endif + + /* Merge the parallel processed results. */ + sort_param->px_max_index = (remaining_run <= SORT_MAX_HALF_FILES) ? 
1 : 2; + if (sort_param->option == SORT_ELIM_DUP) { - (void) file_temp_retire (thread_p, &(sort_param->multipage_file)); + error = sort_exphase_merge_elim_dup (thread_p, sort_param); } + else + { + /* SORT_DUP */ + error = sort_exphase_merge (thread_p, sort_param); + } + +#if !defined(NDEBUG) + tsc_getticks (&end_tick); + tsc_elapsed_time_usec (&tv_diff, end_tick, start_tick); + TSC_ADD_TIMEVAL (orderby_time, tv_diff); + printf ("n-merge time: %d\n", TO_MSEC (orderby_time)); +#endif - for (k = 0; k < sort_param->tot_tempfiles; k++) + /* save result run */ + result_run[first_idx].temp_file = sort_param->temp[sort_param->px_result_file_idx]; + result_run[first_idx].num_pages = sort_param->file_contents[sort_param->px_result_file_idx].num_pages[0]; + + /* retire temp file */ + for (i = 0; i < sort_param->tot_tempfiles; i++) { - if (sort_param->file_contents[k].num_pages != NULL) + if (sort_param->temp[i].volid != NULL_VOLID && i != sort_param->px_result_file_idx) { - db_private_free_and_init (thread_p, sort_param->file_contents[k].num_pages); + (void) file_temp_retire (thread_p, &sort_param->temp[i], false); + VFID_SET_NULL (&sort_param->temp[i]); } } - if (sort_param->px_array) + return error; +} + +/* + * sort_check_parallelism () - check the number of parallel processes + * return: parallel_num + * sort_parallel_type(in): + * sort_param(in): + */ +static int +sort_check_parallelism (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param) +{ + SORT_INFO *sort_info_p; + int parallel_num = prm_get_integer_value (PRM_ID_MAX_PARALLEL_THREAD); + + if (sort_param->px_type == SORT_ORDER_BY) { - free_and_init (sort_param->px_array); - } - sort_param->px_height_max = sort_param->px_array_size = 0; + /* get scan id of input file */ + sort_info_p = (SORT_INFO *) sort_param->get_arg; -#if defined(SERVER_MODE) - rv = pthread_mutex_destroy (&(sort_param->px_mtx)); - if (rv != 0) + /* Find the number of parallel processes by page_cnt and tuple_cnt */ + if 
(sort_info_p->input_file->page_cnt > parallel_num && sort_info_p->input_file->tuple_cnt > parallel_num) + { + /* TO_DO : need to check the appropriate number of parallels depending on the number of pages */ + return parallel_num; + } + } + else { - er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_PTHREAD_MUTEX_DESTROY, 0); + /* sort types other than SORT_ORDER_BY are not parallelized yet */ + return 1; } -#endif - free_and_init (sort_param); + /* single process */ + return 1; } /* - * sort_add_new_file () - Create a new temporary file for sorting purposes + * sort_start_parallelism () - copy sort_param for each worker and split the ORDER BY input file * return: NO_ERROR - * vfid(in): Set to the created file identifier - * file_pg_cnt_est(in): Estimated file page count - * force_alloc(in): Allocate file pages now ? - * tde_encrypted(in): whether the file has to be encrypted or not for TDE + * thread_p(in): thread entry + * px_sort_param(out): per-worker sort parameters filled from sort_param + * sort_param(in): original sort parameter + * parallel_num(in): number of parallel workers + * + * Note: ER_FAILED is returned when the copy or the split fails, or when px_type is not SORT_ORDER_BY. */ static int -sort_add_new_file (THREAD_ENTRY * thread_p, VFID * vfid, int file_pg_cnt_est, bool force_alloc, bool tde_encrypted) +sort_start_parallelism (THREAD_ENTRY * thread_p, SORT_PARAM * px_sort_param, SORT_PARAM * sort_param, int parallel_num) { - VPID new_vpid; - TDE_ALGORITHM tde_algo = TDE_ALGORITHM_NONE; - int ret = NO_ERROR; - - /* todo: sort file is a case I missed that seems to use file_find_nthpages. I don't know if it can be optimized to - * work without numerable files, that remains to be seen. 
*/ + int error = NO_ERROR; - ret = file_create_temp_numerable (thread_p, file_pg_cnt_est, vfid); - if (ret != NO_ERROR) + /* copy sort_param for parallel sort */ + error = sort_copy_sort_param (thread_p, px_sort_param, sort_param, parallel_num); + if (error != NO_ERROR) { - ASSERT_ERROR (); - return ret; + return ER_FAILED; /* NOTE(review): the specific error code is replaced by ER_FAILED */ } - if (VFID_ISNULL (vfid)) + + /* case of ORDER BY */ + if (sort_param->px_type == SORT_ORDER_BY) { - assert_release (false); - return ER_FAILED; + QFILE_LIST_SCAN_ID *scan_id_p; + SORT_INFO *sort_info_p; + + /* get scan id of input file */ + sort_info_p = (SORT_INFO *) sort_param->get_arg; + scan_id_p = sort_info_p->s_id->s_id; + + /* close input file for read. it'll be opened in parallel */ + qfile_close_scan (thread_p, scan_id_p); + + /* split input temp file. TO_DO : may need to revert the splitted pages */ + error = sort_split_input_temp_file (thread_p, px_sort_param, sort_param, parallel_num); + if (error != NO_ERROR) + { + return ER_FAILED; /* NOTE(review): the scan closed above is not reopened on this path */ + } } - if (tde_encrypted) + else { - tde_algo = (TDE_ALGORITHM) prm_get_integer_value (PRM_ID_TDE_DEFAULT_ALGORITHM); + /* not implemented yet (group by, analytic function, create index) */ + return ER_FAILED; } - ret = file_apply_tde_algorithm (thread_p, vfid, tde_algo); - if (ret != NO_ERROR) + return error; +} + +/* + * sort_end_parallelism () - reopen the scan on the original input file and merge the parallel results + * return: NO_ERROR or ER_FAILED + * thread_p(in): thread entry + * px_sort_param(in): per-worker sort parameters + * sort_param(in): original sort parameter + * parallel_num(in): number of parallel workers + */ +static int +sort_end_parallelism (THREAD_ENTRY * thread_p, SORT_PARAM * px_sort_param, SORT_PARAM * sort_param, int parallel_num) +{ + int error = NO_ERROR; + SORT_INFO *sort_info_p; + QFILE_LIST_SCAN_ID *scan_id_p; + + if (sort_param->px_type == SORT_ORDER_BY) { - ASSERT_ERROR (); - file_temp_retire (thread_p, vfid); - VFID_SET_NULL (vfid); - return ret; - } + sort_info_p = (SORT_INFO *) sort_param->get_arg; + scan_id_p = sort_info_p->s_id->s_id; - if (force_alloc == false) + /* open origin temp file for read */ + if 
(qfile_open_list_scan (sort_info_p->input_file, scan_id_p) != NO_ERROR) + { + return ER_FAILED; + } + } + else { - return NO_ERROR; + /* not implemented yet (group by, analytic fuction, create index) */ + return ER_FAILED; } - /* page allocation force is specified, allocate pages for the file */ - /* todo: we don't have multiple page allocation, but allocation should be fast enough */ - for (; file_pg_cnt_est > 0; file_pg_cnt_est--) +#if !defined(NDEBUG) + TSC_TICKS start_tick, end_tick; + TSCTIMEVAL tv_diff; + struct timeval orderby_time; + tsc_getticks (&start_tick); +#endif + + /* merging temp files from parallel processed */ + error = sort_merge_run_for_parallel (thread_p, px_sort_param, sort_param, parallel_num); + if (error != NO_ERROR) { - ret = file_alloc (thread_p, vfid, NULL, NULL, &new_vpid, NULL); - if (ret != NO_ERROR) - { - ASSERT_ERROR (); - file_temp_retire (thread_p, vfid); - VFID_SET_NULL (vfid); - return ret; - } + return ER_FAILED; } - return NO_ERROR; +#if !defined(NDEBUG) + tsc_getticks (&end_tick); + tsc_elapsed_time_usec (&tv_diff, end_tick, start_tick); + TSC_ADD_TIMEVAL (orderby_time, tv_diff); + printf ("merge time: %d\n", TO_MSEC (orderby_time)); +#endif + + return error; } /* @@ -4783,7 +5042,7 @@ sort_checkalloc_numpages_of_outfiles (THREAD_ENTRY * thread_p, SORT_PARAM * sort /* If there is a file not to be used anymore, destroy it in order to reuse spaces. 
*/ if (!VFID_ISNULL (&sort_param->temp[i])) { - error_code = file_temp_retire (thread_p, &sort_param->temp[i]); + error_code = file_temp_retire (thread_p, &sort_param->temp[i], IS_PARALLEL_SORT (sort_param)); if (error_code != NO_ERROR) { ASSERT_ERROR (); @@ -4893,8 +5152,7 @@ sort_run_add_new (FILE_CONTENTS * file_contents, int num_pages) if (file_contents->last_run >= file_contents->num_slots) { new_total_elements = ((int) (((float) file_contents->num_slots * SORT_EXPAND_DYN_ARRAY_RATIO) + 0.5)); - file_contents->num_pages = - (int *) db_private_realloc (NULL, file_contents->num_pages, new_total_elements * sizeof (int)); + file_contents->num_pages = (int *) realloc (file_contents->num_pages, new_total_elements * sizeof (int)); if (file_contents->num_pages == NULL) { return ER_FAILED; diff --git a/src/storage/external_sort.h b/src/storage/external_sort.h index 0c61dbc2f3..f51915b753 100644 --- a/src/storage/external_sort.h +++ b/src/storage/external_sort.h @@ -56,6 +56,14 @@ typedef enum SORT_DUP /* allow duplicate */ } SORT_DUP_OPTION; +typedef enum +{ + SORT_ORDER_BY, + SORT_GROUP_BY, + SORT_ANALYTIC, + SORT_INDEX_LEAF +} SORT_PARALLEL_TYPE; + typedef SORT_STATUS SORT_GET_FUNC (THREAD_ENTRY * thread_p, RECDES *, void *); typedef int SORT_PUT_FUNC (THREAD_ENTRY * thread_p, const RECDES *, void *); typedef int SORT_CMP_FUNC (const void *, const void *, void *); @@ -135,6 +143,7 @@ struct SORT_INFO SORTKEY_INFO key_info; /* All of the interesting key information. */ QFILE_SORT_SCAN_ID *s_id; /* A SCAN_ID for the input list file. This is stateful, and records the current * location of the scan between calls to ls_sort_get_next(). */ + QFILE_LIST_ID *input_file; QFILE_LIST_ID *output_file; /* The name of the output file. This is where ls_sort_put_next_*() deposits its stuff. 
*/ RECDES output_recdes; /* A working buffer for output of tuples; used only when we're using @@ -144,6 +153,7 @@ struct SORT_INFO extern int sort_listfile (THREAD_ENTRY * thread_p, INT16 volid, int est_inp_pg_cnt, SORT_GET_FUNC * get_fn, void *get_arg, SORT_PUT_FUNC * put_fn, void *put_arg, SORT_CMP_FUNC * cmp_fn, void *cmp_arg, - SORT_DUP_OPTION option, int limit, bool includes_tde_class); + SORT_DUP_OPTION option, int limit, bool includes_tde_class, + SORT_PARALLEL_TYPE sort_parallel_type); #endif /* _EXTERNAL_SORT_H_ */ diff --git a/src/storage/file_manager.c b/src/storage/file_manager.c index 39fa648ded..fa933ac028 100644 --- a/src/storage/file_manager.c +++ b/src/storage/file_manager.c @@ -67,6 +67,7 @@ // XXX: SHOULD BE THE LAST INCLUDE HEADER #include "memory_wrapper.hpp" + /************************************************************************/ /* Define structures, globals, and macro's */ /************************************************************************/ @@ -490,6 +491,7 @@ struct file_tempcache int ncached_numerable; pthread_mutex_t mutex; + pthread_mutex_t local_mutex; #if !defined (NDEBUG) int owner_mutex; #endif /* !NDEBUG */ @@ -727,7 +729,7 @@ static int file_perm_dealloc (THREAD_ENTRY * thread_p, PAGE_PTR page_fhead, cons static int file_rv_dealloc_internal (THREAD_ENTRY * thread_p, LOG_RCV * rcv, bool compensate_or_run_postpone); STATIC_INLINE int file_create_temp_internal (THREAD_ENTRY * thread_p, int npages, FILE_TYPE ftype, bool is_numerable, - VFID * vfid_out) __attribute__ ((ALWAYS_INLINE)); + VFID * vfid_out, bool with_lock) __attribute__ ((ALWAYS_INLINE)); static int file_sector_map_pages (THREAD_ENTRY * thread_p, const void *data, int index, bool * stop, void *args); static DISK_ISVALID file_table_check (THREAD_ENTRY * thread_p, const VFID * vfid, DISK_VOLMAP_CLONE * disk_map_clone); @@ -769,8 +771,8 @@ static int file_temp_alloc (THREAD_ENTRY * thread_p, PAGE_PTR page_fhead, FILE_A STATIC_INLINE int file_temp_set_type 
(THREAD_ENTRY * thread_p, VFID * vfid, FILE_TYPE ftype) __attribute__ ((ALWAYS_INLINE)); static int file_temp_reset_user_pages (THREAD_ENTRY * thread_p, const VFID * vfid); -STATIC_INLINE int file_temp_retire_internal (THREAD_ENTRY * thread_p, const VFID * vfid, bool was_preserved) - __attribute__ ((ALWAYS_INLINE)); +STATIC_INLINE int file_temp_retire_internal (THREAD_ENTRY * thread_p, const VFID * vfid, bool was_preserved, + bool with_lock) __attribute__ ((ALWAYS_INLINE)); /************************************************************************/ /* Temporary cache section */ @@ -797,6 +799,10 @@ STATIC_INLINE FILE_TEMPCACHE_ENTRY *file_tempcache_pop_tran_file (THREAD_ENTRY * __attribute__ ((ALWAYS_INLINE)); STATIC_INLINE void file_tempcache_push_tran_file (THREAD_ENTRY * thread_p, FILE_TEMPCACHE_ENTRY * entry) __attribute__ ((ALWAYS_INLINE)); +STATIC_INLINE FILE_TEMPCACHE_ENTRY *file_tempcache_pop_tran_file_with_lock (THREAD_ENTRY * thread_p, const VFID * vfid) + __attribute__ ((ALWAYS_INLINE)); +STATIC_INLINE void file_tempcache_push_tran_file_with_lock (THREAD_ENTRY * thread_p, FILE_TEMPCACHE_ENTRY * entry) + __attribute__ ((ALWAYS_INLINE)); STATIC_INLINE void file_tempcache_dump (FILE * fp) __attribute__ ((ALWAYS_INLINE)); /************************************************************************/ @@ -3152,7 +3158,8 @@ file_create_heap (THREAD_ENTRY * thread_p, bool reuse_oid, const OID * class_oid * vfid_out (out) : VFID of file (obtained from cache or created). 
*/ STATIC_INLINE int -file_create_temp_internal (THREAD_ENTRY * thread_p, int npages, FILE_TYPE ftype, bool is_numerable, VFID * vfid_out) +file_create_temp_internal (THREAD_ENTRY * thread_p, int npages, FILE_TYPE ftype, bool is_numerable, VFID * vfid_out, + bool with_lock) { FILE_TABLESPACE tablespace; FILE_TEMPCACHE_ENTRY *tempcache_entry = NULL; @@ -3188,8 +3195,16 @@ file_create_temp_internal (THREAD_ENTRY * thread_p, int npages, FILE_TYPE ftype, *vfid_out = tempcache_entry->vfid; } - /* save to transaction temporary file list */ - file_tempcache_push_tran_file (thread_p, tempcache_entry); + /* save to transaction temporary file list entry */ + if (with_lock) + { + file_tempcache_push_tran_file_with_lock (thread_p, tempcache_entry); + } + else + { + file_tempcache_push_tran_file (thread_p, tempcache_entry); + } + return NO_ERROR; } @@ -3202,9 +3217,9 @@ file_create_temp_internal (THREAD_ENTRY * thread_p, int npages, FILE_TYPE ftype, * vfid (out) : File identifier */ int -file_create_temp (THREAD_ENTRY * thread_p, int npages, VFID * vfid) +file_create_temp (THREAD_ENTRY * thread_p, int npages, VFID * vfid, bool with_lock) { - return file_create_temp_internal (thread_p, npages, FILE_TEMP, false, vfid); + return file_create_temp_internal (thread_p, npages, FILE_TEMP, false, vfid, with_lock); } /* @@ -3216,9 +3231,9 @@ file_create_temp (THREAD_ENTRY * thread_p, int npages, VFID * vfid) * vfid (out) : File identifier */ int -file_create_temp_numerable (THREAD_ENTRY * thread_p, int npages, VFID * vfid) +file_create_temp_numerable (THREAD_ENTRY * thread_p, int npages, VFID * vfid, bool with_lock) { - return file_create_temp_internal (thread_p, npages, FILE_TEMP, true, vfid); + return file_create_temp_internal (thread_p, npages, FILE_TEMP, true, vfid, with_lock); } /* @@ -3231,7 +3246,7 @@ file_create_temp_numerable (THREAD_ENTRY * thread_p, int npages, VFID * vfid) int file_create_query_area (THREAD_ENTRY * thread_p, VFID * vfid) { - return file_create_temp_internal 
(thread_p, 1, FILE_QUERY_AREA, false, vfid); + return file_create_temp_internal (thread_p, 1, FILE_QUERY_AREA, false, vfid, false); } /* @@ -4300,9 +4315,9 @@ file_postpone_destroy (THREAD_ENTRY * thread_p, const VFID * vfid) * vfid (in) : file identifier */ int -file_temp_retire (THREAD_ENTRY * thread_p, const VFID * vfid) +file_temp_retire (THREAD_ENTRY * thread_p, const VFID * vfid, bool with_lock) { - return file_temp_retire_internal (thread_p, vfid, false); + return file_temp_retire_internal (thread_p, vfid, false, with_lock); } /* @@ -4315,7 +4330,7 @@ file_temp_retire (THREAD_ENTRY * thread_p, const VFID * vfid) int file_temp_retire_preserved (THREAD_ENTRY * thread_p, const VFID * vfid) { - return file_temp_retire_internal (thread_p, vfid, true); + return file_temp_retire_internal (thread_p, vfid, true, false); } /* @@ -4328,7 +4343,7 @@ file_temp_retire_preserved (THREAD_ENTRY * thread_p, const VFID * vfid) * transaction list. */ STATIC_INLINE int -file_temp_retire_internal (THREAD_ENTRY * thread_p, const VFID * vfid, bool was_preserved) +file_temp_retire_internal (THREAD_ENTRY * thread_p, const VFID * vfid, bool was_preserved, bool with_lock) { FILE_TEMPCACHE_ENTRY *entry = NULL; int error_code = NO_ERROR; @@ -4354,7 +4369,15 @@ file_temp_retire_internal (THREAD_ENTRY * thread_p, const VFID * vfid, bool was_ } else { - entry = file_tempcache_pop_tran_file (thread_p, vfid); + if (with_lock) + { + entry = file_tempcache_pop_tran_file_with_lock (thread_p, vfid); + } + else + { + entry = file_tempcache_pop_tran_file (thread_p, vfid); + } + assert (entry != NULL); } @@ -9491,9 +9514,56 @@ file_tempcache_cache_or_drop_entries (THREAD_ENTRY * thread_p, FILE_TEMPCACHE_EN STATIC_INLINE FILE_TEMPCACHE_ENTRY * file_tempcache_pop_tran_file (THREAD_ENTRY * thread_p, const VFID * vfid) { - FILE_TEMPCACHE_ENTRY **tran_files_p = &file_Tempcache.tran_files[file_get_tempcache_entry_index (thread_p)]; + FILE_TEMPCACHE_ENTRY **tran_files_p; + FILE_TEMPCACHE_ENTRY *entry = 
NULL, *prev_entry = NULL; + + tran_files_p = &file_Tempcache.tran_files[file_get_tempcache_entry_index (thread_p)]; + + for (entry = *tran_files_p; entry != NULL; entry = entry->next) + { + if (VFID_EQ (&entry->vfid, vfid)) + { + /* remove entry from transaction list */ + if (prev_entry != NULL) + { + prev_entry->next = entry->next; + } + else + { + *tran_files_p = entry->next; + } + entry->next = NULL; + + file_log ("file_tempcache_pop_tran_file", "removed entry " FILE_TEMPCACHE_ENTRY_MSG, + FILE_TEMPCACHE_ENTRY_AS_ARGS (entry)); + + return entry; + } + prev_entry = entry; + } + + /* should have found it */ + assert_release (false); + return NULL; +} + +/* + * file_tempcache_pop_tran_file_with_lock () - pop entry with the given VFID from transaction list + * + * return : popped entry + * thread_p (in) : thread entry + * vfid (in) : file identifier + */ +STATIC_INLINE FILE_TEMPCACHE_ENTRY * +file_tempcache_pop_tran_file_with_lock (THREAD_ENTRY * thread_p, const VFID * vfid) +{ + FILE_TEMPCACHE_ENTRY **tran_files_p; FILE_TEMPCACHE_ENTRY *entry = NULL, *prev_entry = NULL; + pthread_mutex_lock (&file_Tempcache.local_mutex); + + tran_files_p = &file_Tempcache.tran_files[file_get_tempcache_entry_index (thread_p)]; + for (entry = *tran_files_p; entry != NULL; entry = entry->next) { if (VFID_EQ (&entry->vfid, vfid)) @@ -9512,11 +9582,14 @@ file_tempcache_pop_tran_file (THREAD_ENTRY * thread_p, const VFID * vfid) file_log ("file_tempcache_pop_tran_file", "removed entry " FILE_TEMPCACHE_ENTRY_MSG, FILE_TEMPCACHE_ENTRY_AS_ARGS (entry)); + pthread_mutex_unlock (&file_Tempcache.local_mutex); return entry; } prev_entry = entry; } + pthread_mutex_unlock (&file_Tempcache.local_mutex); + /* should have found it */ assert_release (false); return NULL; @@ -9532,8 +9605,9 @@ file_tempcache_pop_tran_file (THREAD_ENTRY * thread_p, const VFID * vfid) STATIC_INLINE void file_tempcache_push_tran_file (THREAD_ENTRY * thread_p, FILE_TEMPCACHE_ENTRY * entry) { - FILE_TEMPCACHE_ENTRY 
**tran_files_p = &file_Tempcache.tran_files[file_get_tempcache_entry_index (thread_p)]; + FILE_TEMPCACHE_ENTRY **tran_files_p; + tran_files_p = &file_Tempcache.tran_files[file_get_tempcache_entry_index (thread_p)]; entry->next = *tran_files_p; *tran_files_p = entry; @@ -9541,6 +9615,30 @@ file_tempcache_push_tran_file (THREAD_ENTRY * thread_p, FILE_TEMPCACHE_ENTRY * e FILE_TEMPCACHE_ENTRY_AS_ARGS (entry)); } +/* + * file_tempcache_push_tran_file_with_lock () - push temporary file entry to transaction list + * while holding file_Tempcache.local_mutex + * + * return : void + * thread_p (in) : thread entry + * entry (in) : temporary cache entry + * + * Note: the entry becomes the new head of the list selected by file_get_tempcache_entry_index (); + * the file_log call is made after the mutex is released. + */ +STATIC_INLINE void +file_tempcache_push_tran_file_with_lock (THREAD_ENTRY * thread_p, FILE_TEMPCACHE_ENTRY * entry) +{ + FILE_TEMPCACHE_ENTRY **tran_files_p; + + pthread_mutex_lock (&file_Tempcache.local_mutex); /* NOTE(review): the return value is not checked */ + + tran_files_p = &file_Tempcache.tran_files[file_get_tempcache_entry_index (thread_p)]; + entry->next = *tran_files_p; + *tran_files_p = entry; + + pthread_mutex_unlock (&file_Tempcache.local_mutex); + + file_log ("file_tempcache_push_tran_file", "pushed entry " FILE_TEMPCACHE_ENTRY_MSG, + FILE_TEMPCACHE_ENTRY_AS_ARGS (entry)); /* the log tag is the name of the variant without lock */ +} + /* * file_get_tran_num_temp_files () - returns the number of temp file entries of the given transaction * diff --git a/src/storage/file_manager.h b/src/storage/file_manager.h index 4314fe8bd0..3f2d71a765 100644 --- a/src/storage/file_manager.h +++ b/src/storage/file_manager.h @@ -159,8 +159,8 @@ extern int file_create (THREAD_ENTRY * thread_p, FILE_TYPE file_type, FILE_TABLE extern int file_create_with_npages (THREAD_ENTRY * thread_p, FILE_TYPE file_type, int npages, FILE_DESCRIPTORS * des, VFID * vfid); extern int file_create_heap (THREAD_ENTRY * thread_p, bool reuse_oid, const OID * class_oid, VFID * vfid); -extern int file_create_temp (THREAD_ENTRY * thread_p, int npages, VFID * vfid); -extern int file_create_temp_numerable (THREAD_ENTRY * thread_p, int npages, VFID * vfid); +extern int file_create_temp (THREAD_ENTRY * 
thread_p, int npages, VFID * vfid, bool with_lock); +extern int file_create_temp_numerable (THREAD_ENTRY * thread_p, int npages, VFID * vfid, bool with_lock); extern int file_create_query_area (THREAD_ENTRY * thread_p, VFID * vfid); extern int file_create_ehash (THREAD_ENTRY * thread_p, int npages, bool is_tmp, FILE_EHASH_DES * des_ehash, VFID * vfid); @@ -169,7 +169,7 @@ extern int file_create_ehash_dir (THREAD_ENTRY * thread_p, int npages, bool is_t extern void file_postpone_destroy (THREAD_ENTRY * thread_p, const VFID * vfid); extern int file_destroy (THREAD_ENTRY * thread_p, const VFID * vfid, bool is_temp); -extern int file_temp_retire (THREAD_ENTRY * thread_p, const VFID * vfid); +extern int file_temp_retire (THREAD_ENTRY * thread_p, const VFID * vfid, bool with_lock); extern int file_temp_retire_preserved (THREAD_ENTRY * thread_p, const VFID * vfid); extern int file_init_page_type (THREAD_ENTRY * thread_p, PAGE_PTR page, void *args);