It might be easier to explain the process and the high-level steps, give a sample implementation (a stripped-down version of one that I use), and then talk about throw/catch:
- Insert the raw CSV rows with an incrementing index (to be able to resume from a specific row/index later); see the sketch after this list.
- Process the CSV, stopping every 'chunk' to check whether the job should stop by checking if Sidekiq::Fetcher.done? returns true.
- When the fetcher is done?, store the index of the currently processed item on the user and return, so that the job completes and control is returned to Sidekiq.
- Note that if a job is still running after a short timeout (default 20s), the job will be killed.
- Then, when the job runs again, simply start where you left off last time (or at 0).
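For the first step, here is a minimal sketch of what inserting the raw rows might look like. The raw_csv_items association, its index and data fields, and the store_raw_csv_rows helper are assumptions for illustration; the $gte query in the worker below suggests a Mongo-style store, but any store with an integer index column would do:

    require 'csv'

    # Hypothetical helper: persist each CSV row with an incrementing index so a
    # later job can resume from user.last_csv_index.
    def store_raw_csv_rows(user, csv_path)
      CSV.foreach(csv_path, headers: true).with_index do |row, index|
        user.raw_csv_items.create!(index: index, data: row.to_h)
      end
    end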
Example:
    class UserCSVImportWorker
      include Sidekiq::Worker

      def perform(user_id)
        user = User.find(user_id)
        items = user.raw_csv_items.where(:index => {'$gte' => user.last_csv_index.to_i})

        items.each_with_index do |item, i|
          # Check for shutdown every 100 items, before processing the item
          if ((i + 1) % 100) == 0 && Sidekiq::Fetcher.done?
            user.update(last_csv_index: item.index)
            return
          end

          # Process the item as normal
        end
      end
    end
The above class makes sure that every 100 items we check whether the fetcher is done (a proxy for whether shutdown has started), and if so, ends execution of the job. Before execution ends, however, we update the user with the index we stopped at so that we can start where we left off next time.
throw/catch is a way to implement the above functionality a little more cleanly (maybe), but it is a bit like using Fibers: a nice concept that is hard to wrap your head around. Technically, throw/catch is closer to goto than most people are generally comfortable with.
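For illustration only, here is a sketch of how the same worker might look with throw/catch, under the same assumptions as the example above (the :shutdown symbol is arbitrary). The catch block gives the loop a single, named exit point instead of a bare return:

    class UserCSVImportWorker
      include Sidekiq::Worker

      def perform(user_id)
        user = User.find(user_id)
        items = user.raw_csv_items.where(:index => {'$gte' => user.last_csv_index.to_i})

        # catch/throw lets the loop unwind to a single labelled point
        catch(:shutdown) do
          items.each_with_index do |item, i|
            if ((i + 1) % 100) == 0 && Sidekiq::Fetcher.done?
              user.update(last_csv_index: item.index)
              throw :shutdown
            end

            # Process the item as normal
          end
        end
      end
    end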
Edit:
Alternatively, you could skip the call to Sidekiq::Fetcher.done? and instead record the last_csv_index on each row, or on each chunk of rows processed. That way, if your worker is killed without having the opportunity to record the last_csv_index, you can still resume 'close' to where you left off. A sketch of this variant follows.
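Here is a sketch of that variant, with the same hypothetical models: the worker checkpoints last_csv_index after every chunk rather than checking Sidekiq::Fetcher.done?, so even an abrupt kill loses at most one chunk of progress (the chunk size of 100 is arbitrary):

    class UserCSVImportWorker
      include Sidekiq::Worker

      CHUNK_SIZE = 100

      def perform(user_id)
        user = User.find(user_id)
        items = user.raw_csv_items.where(:index => {'$gte' => user.last_csv_index.to_i})

        items.each_with_index do |item, i|
          # Process the item as normal

          # Checkpoint after each chunk; item.index has already been processed,
          # so store the next index. A mid-chunk kill means at most CHUNK_SIZE
          # rows get reprocessed on resume.
          user.update(last_csv_index: item.index + 1) if ((i + 1) % CHUNK_SIZE) == 0
        end
      end
    end

If you go this route, the per-row processing should be idempotent (or cheap to redo), since rows at the end of a partially completed chunk will be processed again when the job resumes.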