diff --git a/concurrent.c b/concurrent.c
index 60b1719..b833832 100644
--- a/concurrent.c
+++ b/concurrent.c
@@ -25,7 +25,8 @@ extern PGDLLIMPORT int wal_segment_size;
 
 static void apply_concurrent_changes(DecodingOutputState *dstate,
 									 Relation relation, ScanKey key,
-									 int nkeys, IndexInsertState *iistate);
+									 int nkeys, IndexInsertState *iistate,
+									 struct timeval *must_complete);
 static bool processing_time_elapsed(struct timeval *utmost);
 
 static void plugin_startup(LogicalDecodingContext *ctx,
@@ -64,6 +65,16 @@ process_concurrent_changes(LogicalDecodingContext *ctx,
 	bool		done;
 
 	dstate = (DecodingOutputState *) ctx->output_writer_private;
+
+	/*
+	 * If some changes could not be applied due to time constraint, make sure
+	 * the tuplestore is empty before we insert new tuples into it.
+	 */
+	if (dstate->nchanges > 0)
+		apply_concurrent_changes(dstate, rel_dst, ident_key,
+								 ident_key_nentries, iistate, NULL);
+	Assert(dstate->nchanges == 0);
+
 	done = false;
 	while (!done)
 	{
@@ -87,7 +98,11 @@ process_concurrent_changes(LogicalDecodingContext *ctx,
 		 * non-trivial.
 		 */
 		apply_concurrent_changes(dstate, rel_dst, ident_key,
-								 ident_key_nentries, iistate);
+								 ident_key_nentries, iistate, must_complete);
+
+		if (processing_time_elapsed(must_complete))
+			/* Like above. */
+			return false;
 	}
 
 	return true;
@@ -189,7 +204,8 @@ decode_concurrent_changes(LogicalDecodingContext *ctx,
  */
 static void
 apply_concurrent_changes(DecodingOutputState *dstate, Relation relation,
-						 ScanKey key, int nkeys, IndexInsertState *iistate)
+						 ScanKey key, int nkeys, IndexInsertState *iistate,
+						 struct timeval *must_complete)
 {
 	TupleTableSlot *slot;
 #if PG_VERSION_NUM >= 120000
@@ -240,6 +256,9 @@ apply_concurrent_changes(DecodingOutputState *dstate, Relation relation,
 		bool		isnull[1];
 		Datum		values[1];
 
+		Assert(dstate->nchanges > 0);
+		dstate->nchanges--;
+
 		/* Get the change from the single-column tuple. */
 #if PG_VERSION_NUM >= 120000
 		tup_change = ExecFetchSlotHeapTuple(dstate->tsslot, false, &shouldFree);
@@ -483,10 +502,22 @@ apply_concurrent_changes(DecodingOutputState *dstate, Relation relation,
 		Assert(shouldFree);
 		pfree(tup_change);
 #endif
+
+		/*
+		 * If there is a limit on the time of completion, check it
+		 * now. However, make sure the loop does not break if tup_old was set
+		 * in the previous iteration. In such a case we could not resume the
+		 * processing in the next call.
+		 */
+		if (must_complete && tup_old == NULL &&
+			processing_time_elapsed(must_complete))
+			/* The next call will process the remaining changes. */
+			break;
 	}
 
-	tuplestore_clear(dstate->tstore);
-	dstate->nchanges = 0;
+	/* If we could not apply all the changes, the next call will do. */
+	if (dstate->nchanges == 0)
+		tuplestore_clear(dstate->tstore);
 
 	PopActiveSnapshot();
diff --git a/pg_squeeze.c b/pg_squeeze.c
index 9f32089..529c3e6 100644
--- a/pg_squeeze.c
+++ b/pg_squeeze.c
@@ -3193,6 +3193,10 @@ perform_final_merge(Oid relid_src, Oid *indexes_src, int nindexes,
 									cat_state, rel_dst, ident_key,
 									ident_key_nentries, iistate,
 									AccessExclusiveLock, NULL);
+
+		/* No time constraint, all changes must have been processed. */
+		Assert(((DecodingOutputState *)
+				ctx->output_writer_private)->nchanges == 0);
 	}
 
 	return success;
diff --git a/pg_squeeze.h b/pg_squeeze.h
index 7caf1fb..aa59905 100644
--- a/pg_squeeze.h
+++ b/pg_squeeze.h
@@ -33,6 +33,8 @@
 #include "utils/resowner.h"
 #include "utils/snapmgr.h"
 
+extern int	squeeze_max_xlock_time;
+
 typedef enum
 {
 	PG_SQUEEZE_CHANGE_INSERT,
diff --git a/worker.c b/worker.c
index dea2622..3f949c5 100644
--- a/worker.c
+++ b/worker.c
@@ -92,6 +92,7 @@ typedef struct WorkerTask
 	NameData	relname;
 	NameData	indname;		/* clustering index */
 	NameData	tbspname;		/* destination tablespace */
+	int			max_xlock_time;
 
 	/*
 	 * index destination tablespaces.
@@ -166,12 +167,14 @@ typedef struct TaskDetails
 	ArrayType  *ind_tbsps;
 	bool		last_try;
 	bool		skip_analyze;
+	int			max_xlock_time;
 } TaskDetails;
 
 static void init_task_details(TaskDetails *task, int32 task_id,
 							  Name relschema, Name relname, Name cl_index,
 							  Name rel_tbsp, ArrayType *ind_tbsps,
-							  bool last_try, bool skip_analyze);
+							  bool last_try, bool skip_analyze,
+							  int max_xlock_time);
 static void squeeze_handle_error_app(ErrorData *edata, TaskDetails *td);
 
 static void release_task(void);
@@ -502,6 +505,7 @@ squeeze_table_new(PG_FUNCTION_ARGS)
 		memcpy(task->ind_tbsps, ind_tbsps, VARSIZE(ind_tbsps));
 	else
 		SET_VARSIZE(task->ind_tbsps, 0);
+	task->max_xlock_time = squeeze_max_xlock_time;
 	task_id = task->id;
 	LWLockRelease(task->lock);
 
@@ -1046,7 +1050,7 @@ process_tasks(MemoryContext task_cxt)
 		}
 
 		init_task_details(tasks, 0, relschema, relname, cl_index, rel_tbsp,
-						  ind_tbsps, false, false);
+						  ind_tbsps, false, false, task->max_xlock_time);
 		MemoryContextSwitchTo(oldcxt);
 
 		/* No other worker should pick this task. */
@@ -1176,7 +1180,10 @@ LIMIT %d", TASK_BATCH_SIZE);
 
 		init_task_details(&tasks[i], task_id, relschema, relname, cl_index,
 						  rel_tbsp, ind_tbsps, last_try,
-						  skip_analyze);
+						  skip_analyze,
+						  /* XXX Should max_xlock_time be added to
+						   * squeeze.tables ? */
+						  0);
 	}
 	MemoryContextSwitchTo(oldcxt);
 
@@ -1293,6 +1300,8 @@ LIMIT %d", TASK_BATCH_SIZE);
 		if (strlen(NameStr(td->rel_tbsp)) > 0)
 			rel_tbsp = &td->rel_tbsp;
 
+		squeeze_max_xlock_time = td->max_xlock_time;
+
 		/* Perform the actual work. */
 		SetCurrentStatementStartTimestamp();
 		StartTransactionCommand();
@@ -1529,7 +1538,8 @@ run_command(char *command, int rc)
 static void
 init_task_details(TaskDetails *task, int32 task_id, Name relschema,
 				  Name relname, Name cl_index, Name rel_tbsp,
-				  ArrayType *ind_tbsps, bool last_try, bool skip_analyze)
+				  ArrayType *ind_tbsps, bool last_try, bool skip_analyze,
+				  int max_xlock_time)
 {
 	memset(task, 0, sizeof(TaskDetails));
 	task->id = task_id;
@@ -1542,6 +1552,7 @@ init_task_details(TaskDetails *task, int32 task_id, Name relschema,
 	task->ind_tbsps = ind_tbsps;
 	task->last_try = last_try;
 	task->skip_analyze = skip_analyze;
+	task->max_xlock_time = max_xlock_time;
 }
 
 #define ACTIVE_WORKERS_RES_ATTRS 7
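
For readers who want to see the timing contract in isolation: the patch treats squeeze_max_xlock_time as a millisecond budget (with no limit represented by a NULL must_complete pointer) that is turned into an absolute struct timeval deadline, which the apply loop polls between changes via processing_time_elapsed(). The standalone sketch below only illustrates that mechanism. compute_deadline() and the main() driver are made up for the example, and this processing_time_elapsed() merely mimics the semantics implied by the diff (NULL never elapses); it is not the extension's actual code.

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <sys/time.h>

/*
 * Turn a millisecond budget into an absolute deadline.  A budget <= 0 means
 * "no limit" and yields NULL, matching the NULL must_complete argument used
 * in the patch.  (Hypothetical helper, not part of pg_squeeze.)
 */
static struct timeval *
compute_deadline(int max_xlock_time_ms, struct timeval *deadline)
{
	struct timeval now;
	long long	usec;

	if (max_xlock_time_ms <= 0)
		return NULL;

	gettimeofday(&now, NULL);
	usec = now.tv_usec + 1000LL * max_xlock_time_ms;
	deadline->tv_sec = now.tv_sec + usec / 1000000;
	deadline->tv_usec = usec % 1000000;
	return deadline;
}

/* True once the deadline has passed; a NULL deadline never elapses. */
static bool
processing_time_elapsed(struct timeval *utmost)
{
	struct timeval now;

	if (utmost == NULL)
		return false;

	gettimeofday(&now, NULL);
	if (now.tv_sec != utmost->tv_sec)
		return now.tv_sec > utmost->tv_sec;
	return now.tv_usec >= utmost->tv_usec;
}

int
main(void)
{
	struct timeval buf;
	struct timeval *must_complete = compute_deadline(100, &buf);
	long		nchanges = 0;

	/* Stand-in for the apply loop: stop once the 100 ms budget is spent. */
	while (!processing_time_elapsed(must_complete))
		nchanges++;

	printf("applied %ld (simulated) changes within 100 ms\n", nchanges);
	return 0;
}

The design point the sketch is meant to show is the same one the diff relies on: because the deadline is checked between changes rather than mid-change, a caller that runs out of budget can simply return and resume from the tuplestore on the next call, which is why apply_concurrent_changes() now decrements nchanges per change and only clears the tuplestore when it reaches zero.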