PubSub のようなものが必要ですが、すべてのサブスクライバーにブロードキャストするのではなく、メッセージは 1 つのサブスクライバーにのみ送信されます (サブスクライバーは、受信バッファー内のメッセージの数に基づいて自動的に選択されることが望ましいです。低いほど良い)。
私が試みているのは、制御された数の分散ワーカーを使用して、数十万の http リクエストを送信することです。
PubSub のようなものが必要ですが、すべてのサブスクライバーにブロードキャストするのではなく、メッセージは 1 つのサブスクライバーにのみ送信されます (サブスクライバーは、受信バッファー内のメッセージの数に基づいて自動的に選択されることが望ましいです。低いほど良い)。
私が試みているのは、制御された数の分散ワーカーを使用して、数十万の http リクエストを送信することです。
これを解決するために私が最初に試みることは、ワーカーにリクエストをプッシュさせるのではなく、リクエストをプルさせることです。
Agent
したがって、リクエストを追加および取得するための API で実行される http リクエストのリストを保持するグローバルに登録されたものがあります。次に、この段階でプールボーイを追加するのではなく、 andをworker(Task, ...)
使用してN ワーカーを開始します。各ワーカーは、http リクエストを作成して必要な作業を行った後、正常に終了し、スーパーバイザーによって再起動され、新しい URL を要求するように に要求します。Supervisor
one_for_one
Agent
http タスクをエージェントにプッシュするのではなく、エージェントのリストからプルするワーカーは、やるべき仕事がある場合に利用可能なワーカーが常にビジーであることを保証します。
解決策が良さそうなら、プールボーイの追加を検討します。スーパーバイザーのオプションには注意が必要です。そのため、ワーカーがクラッシュする原因となる一連の悪い URL によって、スーパーバイザーが他のすべてを停止することはありません。
私のコメントで述べたように、私のアプローチは Poolboy を使用してワーカーを処理することですが、N 個のワーカー (N は要求された URL の数) を要求するだけではプロセスの制限をすぐに超えてしまい、チェックアウト要求がタイムアウト。代わりに、ワーカーが使用可能かどうかを確認し、使用可能であれば URL を非同期に要求するループが必要です。空いているワーカーがない場合は、しばらくスリープしてから再試行する必要があります。
この目的のために、Poolboy には:poolboy.checkout/2
関数があり、2 番目のパラメーターを使用して、ブロックするかどうかを指定できます。使用可能なワーカーがない場合は が返され:full
ます。それ以外の場合は、ワーカー pid が返されます。
例:
def crawl_parallel(urls) do
urls
|> Enum.map(&crawl_task/1)
|> Enum.map(&Task.await/1)
end
defp crawl_task(url) do
case :poolboy.checkout Crawler, false do
:full ->
# No free workers, wait a bit and retry
:timer.sleep 100
crawl_task url
worker_pid ->
# We have a worker, asynchronously crawl the url
Task.async fn ->
Crawler.Worker.crawl worker_pid, url
:poolboy.checkin Crawler, worker_pid
end
end
end