How to safely re-enqueue an aborted job? #485

Open
sashkent3 opened this issue Nov 9, 2024 · 3 comments

sashkent3 commented Nov 9, 2024

I need to perform something similar to:

await Job("id", pool).abort()
await pool.enqueue_job("func", _job_id="id")

However, this sometimes aborts the freshly enqueued job. From my observations, it always happens when the job being aborted is not found. Waiting until the key in the abort queue expires (1 minute, not configurable) seems to help. My questions are:

  1. Is waiting for arq.constants.abort_job_max_age guaranteed to be enough for the freshly enqueued job to not be aborted?
  2. Is it possible to skip the abort when the job is not found? Simply checking job.status() first would introduce a race condition.

@sashkent3 (Author)

I believe I've found the cause of my problem. Calling await Job("id", pool).abort() is never safe. If the job isn't found, the specified job_id remains in abort_jobs_ss until a job with that id is run. At that point, the job is "aborted before start".
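
The sequence described above can be sketched with a toy in-memory model. This is only an illustration of the reported behaviour, not arq's implementation: the class `ToyAbortModel` and its methods are hypothetical, and a plain dict stands in for the abort_jobs_ss sorted set.

```python
import time


def timestamp_ms() -> int:
    """Current time in milliseconds (hypothetical stand-in for the helper of the same name)."""
    return int(time.time() * 1000)


class ToyAbortModel:
    """Toy model of the behaviour reported in this issue; NOT arq's code."""

    def __init__(self) -> None:
        # job_id -> score in ms; stands in for the abort_jobs_ss sorted set
        self.abort_jobs: dict[str, int] = {}
        self.queued: set[str] = set()

    def abort(self, job_id: str) -> None:
        # The id is recorded whether or not a job with that id exists.
        self.abort_jobs[job_id] = timestamp_ms()

    def enqueue(self, job_id: str) -> None:
        self.queued.add(job_id)

    def run(self, job_id: str) -> str:
        self.queued.discard(job_id)
        if job_id in self.abort_jobs:
            # The stale abort request is consumed by whichever job runs first.
            del self.abort_jobs[job_id]
            return "aborted before start"
        return "complete"


model = ToyAbortModel()
model.abort("id")            # no job with this id exists
model.enqueue("id")          # fresh job reuses the id
first = model.run("id")      # picks up the stale abort request
model.enqueue("id")
second = model.run("id")     # the abort entry is gone by now
print(first, "/", second)
```

In this model the first run after the stray abort is the one that gets cancelled, which matches the observation that the problem appears when the aborted job is not found.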
The comment here is incorrect: items in abort_jobs_ss older than abort_job_max_age are not deleted. The line here is rather confusing. The worker removes items from abort_jobs_ss whose score is at least abort_job_max_age (60 ms) in the future. Such items can only appear if Job.abort was called immediately after the above line but before the pipeline is executed.
If the only purpose of that line was to do what the abort_job_max_age comment promises, it should probably look like this:

pipe.zremrangebyscore(abort_jobs_ss, min=0, max=timestamp_ms() - abort_job_max_age)
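
To make the difference between the two score ranges concrete, here is a small sketch that mimics ZREMRANGEBYSCORE on a dict. The helper `zremrangebyscore`, the sample entries, and the constant `ABORT_JOB_MAX_AGE_MS` (assumed here to be 60 seconds expressed in milliseconds) are illustrative assumptions, not code from arq.

```python
def zremrangebyscore(zset: dict, min_score: float, max_score: float) -> int:
    """Remove members with min_score <= score <= max_score (inclusive, as in Redis)."""
    doomed = [member for member, score in zset.items() if min_score <= score <= max_score]
    for member in doomed:
        del zset[member]
    return len(doomed)


ABORT_JOB_MAX_AGE_MS = 60_000   # assumption: 60 s expressed in ms
now = 1_700_000_000_000         # fixed "current time" in ms, for reproducibility

entries = {
    "stale": now - 5 * 60_000,  # abort requested 5 minutes ago
    "recent": now - 1_000,      # abort requested 1 second ago
}

# Range as described in the comment: scores at least max-age in the future.
as_described = dict(entries)
removed_as_described = zremrangebyscore(
    as_described, now + ABORT_JOB_MAX_AGE_MS, float("inf")
)

# Proposed range: scores older than max-age.
proposed = dict(entries)
removed_proposed = zremrangebyscore(proposed, 0, now - ABORT_JOB_MAX_AGE_MS)

print(removed_as_described, sorted(as_described))
print(removed_proposed, sorted(proposed))
```

Under these assumptions the first range never matches an entry written in the past, so the stale request survives, while the proposed range drops it and keeps the recent one.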

I'm willing to submit a PR if the issue is confirmed and the proposed resolution is accepted by the maintainers.
Also, it seems like the value of the abort_job_max_age constant was specified in milliseconds by mistake, and the 60-second max-age was intended.
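
The unit mismatch can be illustrated with a little arithmetic. The value 60 is taken from the description above; the function `is_expired` and the sample timestamps are hypothetical and only show how the same number behaves when compared against millisecond scores.

```python
ABORT_JOB_MAX_AGE = 60  # value as described above; the intended unit is the open question


def is_expired(score_ms: int, now_ms: int, max_age_ms: int) -> bool:
    """True if an abort request with the given ms score is older than max_age_ms."""
    return now_ms - score_ms > max_age_ms


now_ms = 1_700_000_000_000
abort_requested_ms = now_ms - 5_000  # abort requested 5 seconds ago

# Treating 60 as milliseconds: a 5-second-old request already counts as expired.
expired_if_ms = is_expired(abort_requested_ms, now_ms, ABORT_JOB_MAX_AGE)

# Treating 60 as seconds and converting to ms: the request is still valid.
expired_if_s = is_expired(abort_requested_ms, now_ms, ABORT_JOB_MAX_AGE * 1000)

print(expired_if_ms, expired_if_s)
```

If the proposed pruning line were adopted with the constant left at 60, abort requests would be dropped after 60 ms, so the constant would likely need to be converted to milliseconds as well.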

drizzt commented Nov 23, 2024

hi, did you see if your line fixes the problem?

@sashkent3 (Author)

> hi, did you see if your line fixes the problem?

I did very limited testing, so take it for what it's worth, but yes, the fix seems to work for me.
