3

分散 cronjob 実行用のシステム (いわゆるcron コンピューティング クラスター) を実装しています。アクションの時間になったら、Cronjobs をメッセージ キュー (RabbitMQ) に入れる必要があります。反対側 (クラスターのノード/ワーカー) はAnyEvent::RabbitMQ、メッセージ キューから正確に 1 つの cronjob/タスク/メッセージを受信し、タスクを処理し、メッセージ キューから正確に 1 つの cronjob/タスク/メッセージを要求するために利用する Perl デーモンです。すぐ。

AnyEvent::RabbitMQ私は、RabbitMQが壊れた接続を識別するのを助けるために実装されている RabbitMQ のハートビート機能を使用しています。

ハートビート間隔の実際の値は気にしないでください! また、数日かかる非常に長時間実行されるジョブもあります。したがって、間隔を cronjob にかかる最長の時間に設定することはオプションではありません。

Perl デーモン ワーカー内で実際の cronjob を実行するには、次のスニペットを参照してください。メッセージに対してRabbitMQをDoSしないように、「AnyEvent->timer」内に実装されています。このメソッドは、RabbitMQconsumeが (管理によって) 禁止されていたため使用されました。

sub _timer_tick {

  $rabbitmq_channel->get(
    queue      => 'job_queue',
    on_success => sub {
      my ($amqp_method) = @_;
      if ( not $amqp_method->{empty} ) {
        pause_timer();
        progress_job($amqp_method);
        resume_timer();
      }
    },
    on_failure => sub { $quit_programm->send( 'RABBITMQ_ERROR', @_ ) },
  );

  return;
}

progress_job()メッセージが解析され、ジョブが実行される場所です。pause_timer()をトリガーし、 をresume_timer()制御します。AnyEvent->timer_timer_tick()

use Capture::Tiny 'capture';
sub progress_job {
  my ($amqp_method) = @_;
  my $job = decode_json( $amqp_method->{body}->to_raw_payload() );
  my ( $stdout, $stderr, $exit ) = capture {
    system $job->{execute};
  };
  return;
}

最初の長時間実行ジョブが開始され、システムが「クラッシュ」してさまざまなエラー メッセージが表示されました。'Unknown channel id: 1' をスローすることもあれば、'Channel has already been closed' をスローすることもあります。そこで、「ダム デバッグ」(構成をいじろうとする) を実行したところ、間隔がこれらのエラーheartbeat内の所要時間よりも短い場合にスローされることがわかりました。progress_job()少し考えた後、それは理にかなっています。progress_job()はブロッキング サブルーチンであり、AnyEvent はハートビート パッケージを RabbitMQ に送信できません。

progress_job()ブロッキング ヒートビートの問題を解決するために最初に考えたのは、子プロセスでfork して doすることでした。FORK に関する AnyEvents のドキュメントでforkは、子内で (AnyEvent などを介して) イベント システムにアクセスできない場合に使用するために保存されていることが指摘されています。次の考え: OK、イベント システムにアクセスできないので、fork を実行できます。resume_timer()BUT: タイマーは ( )progress_job()が戻った後に再開する必要があります。理論的には、リターンの後ではなく、resume_timer()直後に呼び出されます。だから私は私の実装を停止しました。fork()progress_job()

私の質問: 最後のビットを解決するにはどうすればよいですか? (つまり、フォークされた子) が戻ったresume_timer()後はどうすればよいですか? progress_job()分岐が原因で子に入れることができずresume_timer()、イベントシステムはスレッドセーフではありません。

4

1 に答える 1