Skip to content

Commit

Permalink
Fixed processing of the squeeze.max_xlock_time parameter.
Browse files Browse the repository at this point in the history
1. So far, the value of the parameter was not passed to the worker at all.

2. The worker coding was such that the parameter was only checked during
logical decoding, but not when applying the data changes.
  • Loading branch information
Antonin Houska committed Jun 13, 2024
1 parent 3f6c271 commit 6e1f4f4
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 9 deletions.
41 changes: 36 additions & 5 deletions concurrent.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ extern PGDLLIMPORT int wal_segment_size;

static void apply_concurrent_changes(DecodingOutputState *dstate,
Relation relation, ScanKey key,
int nkeys, IndexInsertState *iistate);
int nkeys, IndexInsertState *iistate,
struct timeval *must_complete);
static bool processing_time_elapsed(struct timeval *utmost);

static void plugin_startup(LogicalDecodingContext *ctx,
Expand Down Expand Up @@ -64,6 +65,16 @@ process_concurrent_changes(LogicalDecodingContext *ctx,
bool done;

dstate = (DecodingOutputState *) ctx->output_writer_private;

/*
* If some changes could not be applied due to time constraint, make sure
* the tuplestore is empty before we insert new tuples into it.
*/
if (dstate->nchanges > 0)
apply_concurrent_changes(dstate, rel_dst, ident_key,
ident_key_nentries, iistate, NULL);
Assert(dstate->nchanges == 0);

done = false;
while (!done)
{
Expand All @@ -87,7 +98,11 @@ process_concurrent_changes(LogicalDecodingContext *ctx,
* non-trivial.
*/
apply_concurrent_changes(dstate, rel_dst, ident_key,
ident_key_nentries, iistate);
ident_key_nentries, iistate, must_complete);

if (processing_time_elapsed(must_complete))
/* Like above. */
return false;
}

return true;
Expand Down Expand Up @@ -189,7 +204,8 @@ decode_concurrent_changes(LogicalDecodingContext *ctx,
*/
static void
apply_concurrent_changes(DecodingOutputState *dstate, Relation relation,
ScanKey key, int nkeys, IndexInsertState *iistate)
ScanKey key, int nkeys, IndexInsertState *iistate,
struct timeval *must_complete)
{
TupleTableSlot *slot;
#if PG_VERSION_NUM >= 120000
Expand Down Expand Up @@ -240,6 +256,9 @@ apply_concurrent_changes(DecodingOutputState *dstate, Relation relation,
bool isnull[1];
Datum values[1];

Assert(dstate->nchanges > 0);
dstate->nchanges--;

/* Get the change from the single-column tuple. */
#if PG_VERSION_NUM >= 120000
tup_change = ExecFetchSlotHeapTuple(dstate->tsslot, false, &shouldFree);
Expand Down Expand Up @@ -483,10 +502,22 @@ apply_concurrent_changes(DecodingOutputState *dstate, Relation relation,
Assert(shouldFree);
pfree(tup_change);
#endif

/*
* If there is a limit on the time of completion, check it
* now. However, make sure the loop does not break if tup_old was set
* in the previous iteration. In such a case we could not resume the
* processing in the next call.
*/
if (must_complete && tup_old == NULL &&
processing_time_elapsed(must_complete))
/* The next call will process the remaining changes. */
break;
}

tuplestore_clear(dstate->tstore);
dstate->nchanges = 0;
/* If we could not apply all the changes, the next call will do. */
if (dstate->nchanges == 0)
tuplestore_clear(dstate->tstore);

PopActiveSnapshot();

Expand Down
4 changes: 4 additions & 0 deletions pg_squeeze.c
Original file line number Diff line number Diff line change
Expand Up @@ -3193,6 +3193,10 @@ perform_final_merge(Oid relid_src, Oid *indexes_src, int nindexes,
cat_state, rel_dst, ident_key,
ident_key_nentries, iistate,
AccessExclusiveLock, NULL);

/* No time constraint, all changes must have been processed. */
Assert(((DecodingOutputState *)
ctx->output_writer_private)->nchanges == 0);
}

return success;
Expand Down
2 changes: 2 additions & 0 deletions pg_squeeze.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include "utils/resowner.h"
#include "utils/snapmgr.h"

extern int squeeze_max_xlock_time;

typedef enum
{
PG_SQUEEZE_CHANGE_INSERT,
Expand Down
19 changes: 15 additions & 4 deletions worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ typedef struct WorkerTask
NameData relname;
NameData indname; /* clustering index */
NameData tbspname; /* destination tablespace */
int max_xlock_time;

/*
* index destination tablespaces.
Expand Down Expand Up @@ -166,12 +167,14 @@ typedef struct TaskDetails
ArrayType *ind_tbsps;
bool last_try;
bool skip_analyze;
int max_xlock_time;
} TaskDetails;

static void init_task_details(TaskDetails *task, int32 task_id,
Name relschema, Name relname, Name cl_index,
Name rel_tbsp, ArrayType *ind_tbsps,
bool last_try, bool skip_analyze);
bool last_try, bool skip_analyze,
int max_xlock_time);
static void squeeze_handle_error_app(ErrorData *edata, TaskDetails *td);
static void release_task(void);

Expand Down Expand Up @@ -502,6 +505,7 @@ squeeze_table_new(PG_FUNCTION_ARGS)
memcpy(task->ind_tbsps, ind_tbsps, VARSIZE(ind_tbsps));
else
SET_VARSIZE(task->ind_tbsps, 0);
task->max_xlock_time = squeeze_max_xlock_time;

task_id = task->id;
LWLockRelease(task->lock);
Expand Down Expand Up @@ -1046,7 +1050,7 @@ process_tasks(MemoryContext task_cxt)
}

init_task_details(tasks, 0, relschema, relname, cl_index, rel_tbsp,
ind_tbsps, false, false);
ind_tbsps, false, false, task->max_xlock_time);
MemoryContextSwitchTo(oldcxt);

/* No other worker should pick this task. */
Expand Down Expand Up @@ -1176,7 +1180,10 @@ LIMIT %d", TASK_BATCH_SIZE);

init_task_details(&tasks[i], task_id, relschema, relname,
cl_index, rel_tbsp, ind_tbsps, last_try,
skip_analyze);
skip_analyze,
/* XXX Should max_xlock_time be added to
* squeeze.tables ? */
0);

}
MemoryContextSwitchTo(oldcxt);
Expand Down Expand Up @@ -1293,6 +1300,8 @@ LIMIT %d", TASK_BATCH_SIZE);
if (strlen(NameStr(td->rel_tbsp)) > 0)
rel_tbsp = &td->rel_tbsp;

squeeze_max_xlock_time = td->max_xlock_time;

/* Perform the actual work. */
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
Expand Down Expand Up @@ -1529,7 +1538,8 @@ run_command(char *command, int rc)
static void
init_task_details(TaskDetails *task, int32 task_id, Name relschema,
Name relname, Name cl_index, Name rel_tbsp,
ArrayType *ind_tbsps, bool last_try, bool skip_analyze)
ArrayType *ind_tbsps, bool last_try, bool skip_analyze,
int max_xlock_time)
{
memset(task, 0, sizeof(TaskDetails));
task->id = task_id;
Expand All @@ -1542,6 +1552,7 @@ init_task_details(TaskDetails *task, int32 task_id, Name relschema,
task->ind_tbsps = ind_tbsps;
task->last_try = last_try;
task->skip_analyze = skip_analyze;
task->max_xlock_time = max_xlock_time;
}

#define ACTIVE_WORKERS_RES_ATTRS 7
Expand Down

0 comments on commit 6e1f4f4

Please sign in to comment.