5

PubSub のようなものが必要ですが、すべてのサブスクライバーにブロードキャストするのではなく、メッセージは 1 つのサブスクライバーにのみ送信されます (サブスクライバーは、受信バッファー内のメッセージの数に基づいて自動的に選択されることが望ましいです。低いほど良い)。

私が試みているのは、制御された数の分散ワーカーを使用して、数十万の http リクエストを送信することです。

4

2 に答える 2

4

これを解決するために私が最初に試みることは、ワーカーにリクエストをプッシュさせるのではなく、リクエストをプルさせることです。

Agentしたがって、リクエストを追加および取得するための API で実行される http リクエストのリストを保持するグローバルに登録されたものがあります。次に、この段階でプールボーイを追加するのではなく、 andをworker(Task, ...)使用してN ワーカーを開始します。各ワーカーは、http リクエストを作成して必要な作業を行った後、正常に終了し、スーパーバイザーによって再起動され、新しい URL を要求するように に要求します。Supervisorone_for_oneAgent

http タスクをエージェントにプッシュするのではなく、エージェントのリストからプルするワーカーは、やるべき仕事がある場合に利用可能なワーカーが常にビジーであることを保証します。

解決策が良さそうなら、プールボーイの追加を検討します。スーパーバイザーのオプションには注意が必要です。そのため、ワーカーがクラッシュする原因となる一連の悪い URL によって、スーパーバイザーが他のすべてを停止することはありません。

于 2015-01-06T12:06:15.693 に答える
2

私のコメントで述べたように、私のアプローチは Poolboy を使用してワーカーを処理することですが、N 個のワーカー (N は要求された URL の数) を要求するだけではプロセスの制限をすぐに超えてしまい、チェックアウト要求がタイムアウト。代わりに、ワーカーが使用可能かどうかを確認し、使用可能であれば URL を非同期に要求するループが必要です。空いているワーカーがない場合は、しばらくスリープしてから再試行する必要があります。

この目的のために、Poolboy には:poolboy.checkout/2関数があり、2 番目のパラメーターを使用して、ブロックするかどうかを指定できます。使用可能なワーカーがない場合は が返され:fullます。それ以外の場合は、ワーカー pid が返されます。

例:

def crawl_parallel(urls) do
  urls
  |> Enum.map(&crawl_task/1)
  |> Enum.map(&Task.await/1)
end

defp crawl_task(url) do
  case :poolboy.checkout Crawler, false do
    :full ->
      # No free workers, wait a bit and retry
      :timer.sleep 100
      crawl_task url
    worker_pid -> 
      # We have a worker, asynchronously crawl the url
      Task.async fn ->
        Crawler.Worker.crawl worker_pid, url
        :poolboy.checkin Crawler, worker_pid
      end
  end
end
于 2015-01-06T13:04:26.080 に答える