-
Having just implemented such a solution with sidekiq-batch, I must recommend you create a database-backed locking mechanism. You'd unlock as part of a batch callback when the batch is done. That gives you an unlock status: set it to errored if the batch had failures, or to success if it had none.
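For illustration, a minimal sketch of what such a lock model could look like, assuming Rails with an enum-backed status column; the names are chosen to match the `IntegrationLock` used in the next comment:

```ruby
# One row per in-flight batch. A partial unique index, e.g.
#   add_index :integration_locks, :provider_id, unique: true,
#             where: "status = 0"
# lets the database reject a second concurrent lock.
class IntegrationLock < ApplicationRecord
  enum status: { locked: 0, finished: 1, errored: 2 }

  # The batch callbacks below call finish!/error!; alias the enum
  # updaters to match that vocabulary.
  alias_method :finish!, :finished!
  alias_method :error!, :errored!
end
```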
-
Here is some example code for you. I have one setup like this for every resource. The thing is, due to the rate limit, I won't get to unlock the batch until hours after I started it, and we can't have multiple of these running at the same time. At the same time, I couldn't NOT process these jobs, so I resorted to a database-backed lock.

```ruby
#
# Raised when an integration lock can't be created
#
# @author Mikael Henriksson <[email protected]>
#
class Unlockable < RuntimeError
  CONCURRENT_MESSAGE = "Skipping occurrence due to concurrent locks"

  # Fall back to the canonical message when none is given
  def initialize(message = nil)
    super(message || CONCURRENT_MESSAGE)
  end
end
#
# Handles shared logic to clear the integration lock when the batch is completed.
#
# @author Mikael Henriksson <[email protected]>
#
class IntegrationCallback
  def on_complete(status, options)
    options.deep_symbolize_keys!
    integration_lock = IntegrationLock.locked.find_by(id: options[:integration_lock_id])

    if status.failures.positive?
      integration_lock&.error!
    else
      integration_lock&.finish!
    end
  end

  def on_success(status, options)
    options.deep_symbolize_keys!
    integration_lock = IntegrationLock.locked.find_by(id: options[:integration_lock_id])
    integration_lock&.finish! if status.failures.zero?
  end
end
#
# Collection service that handles a batch of upserts
#
# @author Mikael Henriksson <[email protected]>
#
class DownloadResources
  def self.call(provider)
    new(provider).call
  end

  def initialize(provider)
    @provider = provider
  end

  def call
    # create_lock is an app-specific helper (not shown) that creates an
    # IntegrationLock row and returns nil when the lock can't be created.
    # The site helper is also app-specific.
    raise Unlockable unless (@lock = create_lock("product"))

    self.callback_args = { integration_lock_id: lock.id }
    batch.description = "Download resources to Application (#{site.subdomain})"
    batch.on(:success, IntegrationCallback, callback_args)
    batch.on(:complete, IntegrationCallback, callback_args)
    batch.jobs do
      # Start where we left off
      timestamp = provider.previously_started_at
      # Fallback to synchronizing everything
      timestamp = nil if provider.never_successful?

      enqueue_resources_for_download(1, since: timestamp)
    end
  rescue => ex
    lock&.error!
    raise
  end

  private

  attr_accessor :callback_args
  attr_reader :lock, :provider, :updated_products

  def batch
    @batch ||= Sidekiq::Batch.new
  end

  def client
    provider.client
  end

  def enqueue_resources_for_download(page, since:)
    resources, next_page = client.get_resources(page: page, since: since, limit: 500)
    resources = resources&.fetch("Resources", [])

    resources.in_groups_of(25) do |resource_group|
      worker_args = to_sidekiq_args(resource_group.compact)
      bulk_args = { "class" => UpsertResource::Worker.name, "args" => worker_args }
      # NOTE: We can process around 25 calls per 5 seconds without rate limit but..
      # ... the first 25 can be pushed directly
      bulk_args["at"] = Scheduler.compute_delay(provider.scheduler_key).seconds.from_now.to_f
      Sidekiq::Client.push_bulk(bulk_args)
    end

    enqueue_resources_for_download(next_page, since: since) if next_page
  end

  def to_sidekiq_args(resource_group)
    resource_group.map do |resource|
      next unless resource["WebshopResource"]

      [provider.id, resource["ResourceNumber"]]
    end.compact
  end
end
#
# Handles upsert of a single resource
#
# @author Mikael Henriksson <[email protected]>
#
class UpsertResource
  #
  # Sidekiq worker that delegates to service object
  #
  # @author Mikael Henriksson <[email protected]>
  #
  class Worker
    include Sidekiq::Worker
    sidekiq_options queue: :high, retry: 25

    def perform(provider_id, resource_nr)
      provider = Provider.find(provider_id)
      UpsertResource.call(provider, resource_nr)
    end
  end

  def self.call(provider, resource_nr)
    new(provider, resource_nr).call
  end

  attr_reader :provider, :client
  attr_reader :resource_nr, :resource

  def initialize(provider, resource_nr)
    @client = provider.client
    @provider = provider
    @resource_nr = resource_nr
  end

  def call
    @resource = client.get_resource(resource_nr)
    #
    # ... ... ... PROCESSING ... ... ...
    #
  rescue ActiveRecord::RecordInvalid => ex
    # Report invalid records to the error tracker (Vigil and site are
    # app-specific)
    Vigil.capture(ex, site: site.subdomain)
  end
end
```

The way this then kicks off is:

```ruby
DownloadResources.call(provider)
```

This creates a batch, spreads the batch's workers out over a couple of hours, and finally clears the lock with an appropriate message. Not sure if I can hook into sidekiq-batch and provide uniqueness here, or if it even makes sense. The thing is, I needed to maintain some history of what is going on in the system, and Redis + Sidekiq isn't as good at that. Also, since I track the time between batch start and batch completion via the integration lock, I get some metrics for free.
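Two helpers referenced above aren't in the snippet: `create_lock` and `Scheduler.compute_delay`. Minimal sketches of what they might look like, assuming the `IntegrationLock` model from the first reply and a Redis counter for slot allocation; the `kind` column and key layout are hypothetical:

```ruby
# As a private method on DownloadResources: creates the lock row, or
# returns nil when one is already held (a partial unique index raises
# RecordNotUnique), which makes `raise Unlockable unless ...` fire.
def create_lock(kind)
  IntegrationLock.create!(provider: provider, kind: kind, status: :locked)
rescue ActiveRecord::RecordNotUnique
  nil
end

# Reserves the next 5-second slot for a key, so roughly one group of 25
# jobs runs per 5 seconds, matching the rate limit noted above. The
# counter would need resetting when a new batch starts (not shown).
class Scheduler
  SLOT_SECONDS = 5

  def self.compute_delay(key)
    slot = Sidekiq.redis { |conn| conn.incr("scheduler:#{key}") }
    slot * SLOT_SECONDS
  end
end
```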
-
We have a job that we want a unique lock on, so `UntilAndWhileExecuting` works well for this scenario. But this is a large job, so we want to fan out to a bunch of child jobs. The problem then is that the lock will be unlocked as soon as the `perform` method of the parent job exits.
We really want the lock to be released once all the child jobs are complete. One idea I had was to use sidekiq-batch, because it has an `on_complete` callback: use a custom lock that won't actually unlock at the end of `perform`, and have the callback call unlock manually.
I'm not certain this would actually work, though.
Has anyone else dealt with this scenario before?
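One way this could be wired up is the database-backed alternative from the first reply rather than a custom sidekiq-unique-jobs lock: take a DB lock in the parent, and let the batch's `:complete` callback release it once every child has run. A sketch, reusing `IntegrationLock` and `IntegrationCallback` from the earlier comment; `ParentJob`/`ChildJob` are hypothetical:

```ruby
class ParentJob
  include Sidekiq::Worker

  def perform(provider_id)
    # Take the lock up front; a partial unique index on provider_id
    # WHERE status = locked makes a concurrent parent fail fast.
    lock = IntegrationLock.create!(provider_id: provider_id, status: :locked)

    batch = Sidekiq::Batch.new
    # The lock is only released by the callback, i.e. once every child
    # job has finished (or the batch has recorded failures).
    batch.on(:complete, IntegrationCallback, "integration_lock_id" => lock.id)
    batch.jobs do
      100.times { |i| ChildJob.perform_async(provider_id, i) }
    end
  rescue ActiveRecord::RecordNotUnique
    logger.info("a batch is already running for provider #{provider_id}")
  end
end
```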