5

潜在的に高いレートでポイントを生成するデータソースがあり、各ポイントで時間のかかる操作を実行したいと思います。ただし、システムが過負荷になったときに、余分なデータポイントを削除することで、システムを適切に劣化させたいと思います。

私の知る限り、gen_eventを使用してもイベントがスキップされることはありません。概念的には、gen_eventで実行したいのは、ハンドラーを再度実行する前に、最新の保留中のイベントを除くすべてを削除することです。

標準のOTPでこれを行う方法はありますか?それとも私がそのように物事を扱うべきではないという正当な理由がありますか?

これまでのところ、私が持っている最善の方法は、gen_serverを使用し、タイムアウトに依存して高価なイベントをトリガーすることです。

-behaviour(gen_server).
init() -> 
    {ok, Pid} = gen_event:start_link(),
    {ok, {Pid, none}}.

handle_call({add, H, A},_From,{Pid,Data}) ->
    {reply, gen_event:add_handler(Pid,H,A), {Pid,Data}}.

handle_cast(Data,{Pid,_OldData}) -> 
    {noreply, {Pid,Data,0}}.  % set timeout to 0 

handle_info(timeout, {Pid,Data}) ->
    gen_event:sync_notify(Pid,Data),
    {noreply, {Pid,Data}}.

このアプローチは正しいですか?(特に監督に関して?)

4

2 に答える 2

1

監督についてコメントすることはできませんが、期限切れのアイテムを含むキューとしてこれを実装します。

以下で使用できるものを実装しました。

私はそれをgen_serverにしました。あなたがそれを作成するとき、あなたはそれに古いアイテムの最大年齢を与えます。

そのインターフェースは、処理するアイテムを送信したり、デキューされていないアイテムを要求したりできることです。すべてのアイテムを受信した時刻を記録します。処理するアイテムを受け取るたびに、キュー内のすべてのアイテムをチェックし、最大経過時間より古いアイテムをデキューして破棄します。(最大年齢を常に尊重したい場合は、キューに入れられたアイテムを返す前にキューをフィルタリングできます)

データソースはデータ({process_this, Anything})をワークキューにキャストし、(低速の可能性がある)コンシューマプロセスは(gimme)を呼び出してデータを取得します。

-module(work_queue).
-behavior(gen_server).

-export([init/1, handle_cast/2, handle_call/3]).

init(DiscardAfter) ->
  {ok, {DiscardAfter, queue:new()}}.

handle_cast({process_this, Data}, {DiscardAfter, Queue0}) ->
  Instant = now(),
  Queue1 = queue:filter(fun({Stamp, _}) -> not too_old(Stamp, Instant, DiscardAfter) end, Queue0),
  Queue2 = queue:in({Instant, Data}, Queue1),
  {noreply, {DiscardAfter, Queue2}}.

handle_call(gimme, From, State = {DiscardAfter, Queue0}) ->
  case queue:is_empty(Queue0) of
    true ->
      {reply, no_data, State};
    false ->
      {{value, {_Stamp, Data}}, Queue1} = queue:out(Queue0),
      {reply, {data, Data}, {DiscardAfter, Queue1}}
  end.

delta({Mega1, Unit1, Micro1}, {Mega2, Unit2, Micro2}) ->
  ((Mega2 - Mega1) * 1000000 + Unit2 - Unit1) * 1000000 + Micro2 - Micro1.

too_old(Stamp, Instant, DiscardAfter) ->
  delta(Stamp, Instant) > DiscardAfter.

REPLでの小さなデモ:

c(work_queue).
{ok, PidSrv} = gen_server:start(work_queue, 10 * 1000000, []).         
gen_server:cast(PidSrv, {process_this, <<"going_to_go_stale">>}),      
timer:sleep(11 * 1000),                                                
gen_server:cast(PidSrv, {process_this, <<"going to push out previous">>}),
{gen_server:call(PidSrv, gimme), gen_server:call(PidSrv, gimme)}.        
于 2011-08-15T16:33:56.200 に答える
0

標準のOTPでこれを行う方法はありますか?

いいえ。

私がそのように物事を扱うべきではない正当な理由はありますか?

いいえ、早期にタイムアウトすると、システム全体のパフォーマンスが向上する可能性があります。ここで方法について読んでください。

このアプローチは正しいですか?(特に監督に関して?)

わかりません、あなたは監督コードを提供していません。


あなたの最初の質問への少しの追加情報として:

OTPの外部でサードパーティのライブラリを使用できる場合は、プリエンプティブタイムアウトを追加できるものがいくつかあります。これは、あなたが説明していることです。

私がよく知っているのは2つあり、1つ目は割引、2つ目はひよこです(私はひよこの作者です。ここではプロジェクトを宣伝しないようにします)。

Dispcountは、同時に実行できるジョブの数が限られており、キューイングを行わない単一のリソースに非常に適しています。ここでそれについて読むことができます(本当に興味深い情報がたくさん警告されています!)。

アプリ内のさまざまなキューの量を処理するために4000以上のプロセスのプールを生成する必要があったため、割引は機能しませんでした。chickを書いたのは、キューの長さを動的に増減する方法と、4000以上のプロセスのプールを生成することなく、リクエストをキューに入れて他のユーザーを拒否できる方法が必要だったためです。

私があなたなら、最初に割引を試してみます(ほとんどのソリューションはひよこを必要としないため)。次に、もう少し動的なものが必要な場合は、特定の数の要求に応答できるプールがひよこを試してみます。

于 2013-10-14T22:11:44.047 に答える