2

以下の更新された質問を参照してください。

元の質問:

私の現在の Rails プロジェクトでは、大きな xml/csv データ ファイルを解析し、それを mongodb に保存する必要があります。今、私はこの手順を使用します:

  1. ユーザーからアップロードされたファイルを受け取り、データをmongodbに保存します
  2. sidekiq を使用して、mongodb 内のデータの非同期処理を実行します。
  3. 処理が完了したら、生データを削除します。

localhost の小規模および中規模のデータの場合、上記の手順は適切に実行されます。しかし、heroku では、hirefire を使用してワーカー dyno を動的に上下にスケーリングします。ワーカーがまだ大きなデータを処理しているときに、hirefire は空のキューを確認し、ワーカーの dyno をスケールダウンします。これにより、プロセスに kill シグナルが送信され、プロセスが不完全な状態のままになります。

解析を行うためのより良い方法を探しており、解析プロセスがいつでも強制終了され (kill シグナルを受信したときに現在の状態を保存する)、プロセスが再キューイングされるようにしています。

現在、私は Model.delay.parse_file を使用していますが、再キューイングされません。

アップデート

sidekiq wiki を読んだ後、ジョブ制御に関する記事を見つけました。コード、その仕組み、および SIGTERM シグナルを受信して​​ワーカーが再キューイングされたときに状態を保持する方法を説明できる人はいますか?

ジョブの終了を処理し、現在の状態を保存し、最後の位置から続行する別の方法はありますか?

ありがとう、

4

2 に答える 2

6

Might be easier to explain the process and the high level steps, give a sample implementation (a stripped down version of one that I use), and then talk about throw and catch:

  1. Insert the raw csv rows with an incrementing index (to be able to resume from a specific row/index later)
  2. Process the CSV stopping every 'chunk' to check if the job is done by checking if Sidekiq::Fetcher.done? returns true
  3. When the fetcher is done?, store the index of the currently processed item on the user and return so that the job completes and control is returned to sidekiq.
  4. Note that if a job is still running after a short timeout (default 20s) the job will be killed.
  5. Then when the job runs again simply, start where you left off last time (or at 0)

Example:

    class UserCSVImportWorker
      include Sidekiq::Worker

      def perform(user_id)
        user = User.find(user_id)

        items = user.raw_csv_items.where(:index => {'$gte' => user.last_csv_index.to_i})
        items.each_with_index do |item, i|
          if (i+1 % 100) == 0 && Sidekiq::Fetcher.done?
            user.update(last_csv_index: item.index)

            return
          end

          # Process the item as normal
        end
      end
    end

The above class makes sure that each 100 items we check that the fetcher is not done (a proxy for if shutdown has been started), and ends execution of the job. Before the execution ends however we update the user with the last index that has been processed so that we can start where we left off next time.

throw catch is a way to implement this above functionality a little cleaner (maybe) but is a little like using Fibers, nice concept but hard to wrap your head around. Technically throw catch is more like goto than most people are generally comfortable with.

edit

Also you could not make call to Sidekiq::Fetcher.done? and record the last_csv_index on each row or on each chunk of rows processed, that way if your worker is killed without having the opportunity to record the last_csv_index you can still resume 'close' to where you left off.

于 2014-07-08T15:18:46.310 に答える
3

冪等性の概念に対処しようとしています。これは、不完全なサイクルの可能性があるものを複数回処理しても問題が発生しないという考えです。( https://github.com/mperham/sidekiq/wiki/Best-Practices#2-make-your-jobs-idempotent-and-transactional )

可能な前進

  1. ファイルをパーツに分割し、パーツごとのジョブでそれらのパーツを処理します。
  2. ジョブが完全に完了する可能性が高いときにスケーリングされるように、採用のしきい値を引き上げます (10 分)
  3. ジョブの実行中にハイヤーファイアがスケールダウンすることを許可しない (開始時に redis キーを設定し、完了時にクリアする)
  4. 処理中のジョブの進行状況を追跡し、ジョブが強制終了された場合は中断したところから再開します。
于 2014-07-05T04:04:45.397 に答える