監督についてコメントすることはできませんが、期限切れのアイテムを含むキューとしてこれを実装します。
以下で使用できるものを実装しました。
私はそれをgen_serverにしました。あなたがそれを作成するとき、あなたはそれに古いアイテムの最大年齢を与えます。
そのインターフェースは、処理するアイテムを送信したり、デキューされていないアイテムを要求したりできることです。すべてのアイテムを受信した時刻を記録します。処理するアイテムを受け取るたびに、キュー内のすべてのアイテムをチェックし、最大経過時間より古いアイテムをデキューして破棄します。(最大年齢を常に尊重したい場合は、キューに入れられたアイテムを返す前にキューをフィルタリングできます)
データソースはデータ({process_this, Anything}
)をワークキューにキャストし、(低速の可能性がある)コンシューマプロセスは(gimme
)を呼び出してデータを取得します。
-module(work_queue).
-behavior(gen_server).
-export([init/1, handle_cast/2, handle_call/3]).
init(DiscardAfter) ->
{ok, {DiscardAfter, queue:new()}}.
handle_cast({process_this, Data}, {DiscardAfter, Queue0}) ->
Instant = now(),
Queue1 = queue:filter(fun({Stamp, _}) -> not too_old(Stamp, Instant, DiscardAfter) end, Queue0),
Queue2 = queue:in({Instant, Data}, Queue1),
{noreply, {DiscardAfter, Queue2}}.
handle_call(gimme, From, State = {DiscardAfter, Queue0}) ->
case queue:is_empty(Queue0) of
true ->
{reply, no_data, State};
false ->
{{value, {_Stamp, Data}}, Queue1} = queue:out(Queue0),
{reply, {data, Data}, {DiscardAfter, Queue1}}
end.
delta({Mega1, Unit1, Micro1}, {Mega2, Unit2, Micro2}) ->
((Mega2 - Mega1) * 1000000 + Unit2 - Unit1) * 1000000 + Micro2 - Micro1.
too_old(Stamp, Instant, DiscardAfter) ->
delta(Stamp, Instant) > DiscardAfter.
REPLでの小さなデモ:
c(work_queue).
{ok, PidSrv} = gen_server:start(work_queue, 10 * 1000000, []).
gen_server:cast(PidSrv, {process_this, <<"going_to_go_stale">>}),
timer:sleep(11 * 1000),
gen_server:cast(PidSrv, {process_this, <<"going to push out previous">>}),
{gen_server:call(PidSrv, gimme), gen_server:call(PidSrv, gimme)}.