From f2e5edc5d3f9481a9636d967d5cbe7ac191995eb Mon Sep 17 00:00:00 2001 From: Caius Durling Date: Wed, 31 Jan 2024 03:38:12 +0000 Subject: [PATCH] Optimize JobWrapper queueing ActiveJobs (#35) This PR introduces a change to avoid calling `ActiveJob::Base#deserialize` during queueing the job whilst maintaining existing functionality. The change is in `JobWrapper`, which previously only took in a hash of `job_data` and knew how to deserialize it into a job object so we can interrogate it for `queue_name` or `max_attempts`, etc. To avoid the roundtrip through serialization, we change `JobWrapper#initialize` to take in either the job object directly, or a hash as it did previously. When it's a hash (needed when running the job in the worker), it still deserializes it into a job object. When it's a job object just assign it to `@job` directly and continue. We discovered this through having a second-order effect in `#deserialize` using it to trigger some logic as the job comes off the queue, but we were then seeing the logic run multiple times for one job execution. Tracked it back to the `JobClass.perform_later` calling `#deserialize` and being majorly confused why it was deserialising in the web process. This will make enqueueing jobs faster too, because we don't need a roundtrip through `#serialize`, `#deserialize` as well as allocating less objects. In practice this is likely a small effect, but we get it for free avoiding the behaviour above. Looking at all the built-in adapters in ActiveJob, as well as good_job and solid_queue, they all avoid calling `job.serialize` during the enqueuing, even those adapters that support invoking methods on the Job class instance like delayed does. From one angle it's an optimization, from another angle it's a bugfix (: --- CHANGELOG.md | 21 +++++++++++++++++++++ delayed.gemspec | 2 +- lib/delayed/active_job_adapter.rb | 2 +- lib/delayed/job_wrapper.rb | 12 ++++++++++-- spec/delayed/active_job_adapter_spec.rb | 10 ++++++++++ 5 files changed, 43 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 61336ef..8aa7f1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,27 @@ and this project aims to adhere to [Semantic Versioning](http://semver.org/spec/ ### Removed ### Fixed +## [0.5.3] - 2024-01-12 +### Changed + +- Bring ActiveJob queue adapter's enqueuing behaviour inline with delayed_job & + upstream adapters, by stopping it serializing & deserializing the job instance + during the enqueue process. Optimises the enqueueing of jobs slightly too. + +## [0.5.2] - 2023-10-19 +### Fixed + +- Fixed regression for Rails 7.1.1 not executing jobs, due to lazy loading change in Rails. + The `JobWrapper` needs to be loaded before `ActiveJob::Base` is loaded, but this wasn't + happening in the worker processes. Fix by extracting `JobWrapper` into it's own file and + loading when `Delayed` is loaded earlier in the process. + +## [0.5.1] - 2023-10-11 +### Changed + +- Explicitly test Rails 7.1 and Ruby 3.1, Ruby 3.2 to verify compatability. Previous + gems were compatible, but this release is now verified to be compatible. + ## [0.5.0] - 2023-01-20 ### Changed - Reduced handler size by excluding redundant 'job:' key (only 'job_data:' is diff --git a/delayed.gemspec b/delayed.gemspec index 0e4b0b9..e0a046f 100644 --- a/delayed.gemspec +++ b/delayed.gemspec @@ -18,7 +18,7 @@ Gem::Specification.new do |spec| spec.require_paths = ['lib'] spec.summary = 'a multi-threaded, SQL-driven ActiveJob backend used at Betterment to process millions of background jobs per day' - spec.version = '0.5.2' + spec.version = '0.5.3' spec.metadata = { 'changelog_uri' => 'https://github.com/betterment/delayed/blob/main/CHANGELOG.md', 'bug_tracker_uri' => 'https://github.com/betterment/delayed/issues', diff --git a/lib/delayed/active_job_adapter.rb b/lib/delayed/active_job_adapter.rb index 67b72c0..12d8765 100644 --- a/lib/delayed/active_job_adapter.rb +++ b/lib/delayed/active_job_adapter.rb @@ -14,7 +14,7 @@ def _enqueue(job, opts = {}) opts.merge!({ queue: job.queue_name, priority: job.priority }.compact) .merge!(job.provider_attributes || {}) - Delayed::Job.enqueue(JobWrapper.new(job.serialize), opts).tap do |dj| + Delayed::Job.enqueue(JobWrapper.new(job), opts).tap do |dj| job.provider_job_id = dj.id end end diff --git a/lib/delayed/job_wrapper.rb b/lib/delayed/job_wrapper.rb index 09030d5..4cb2b5e 100644 --- a/lib/delayed/job_wrapper.rb +++ b/lib/delayed/job_wrapper.rb @@ -4,8 +4,16 @@ class JobWrapper # rubocop:disable Betterment/ActiveJobPerformable delegate_missing_to :job - def initialize(job_data) - @job_data = job_data + def initialize(job_or_data) + # During enqueue the job instance is passed in directly, saves us deserializing + # it to find out how to queue the job. + # During load from the db, we get a data hash passed in so deserialize lazily. + if job_or_data.is_a?(ActiveJob::Base) + @job = job_or_data + @job_data = job_or_data.serialize + else + @job_data = job_or_data + end end def display_name diff --git a/spec/delayed/active_job_adapter_spec.rb b/spec/delayed/active_job_adapter_spec.rb index 45cea97..53afa01 100644 --- a/spec/delayed/active_job_adapter_spec.rb +++ b/spec/delayed/active_job_adapter_spec.rb @@ -23,6 +23,16 @@ def perform; end ActiveJob::Base.queue_adapter = adapter_was end + it "does not invoke #deserialize during enqueue" do # rubocop:disable RSpec/NoExpectationExample + JobClass.include(Module.new do + def deserialize(*) + raise "uh oh, deserialize called during enqueue!" + end + end) + + JobClass.perform_later + end + it 'serializes a JobWrapper in the handler with expected fields' do Timecop.freeze('2023-01-20T18:52:29Z') do JobClass.perform_later