2

Elixir の (サイクリック) バリアの最もエレガントな実装は何ですか? 実装するアルゴリズム (頂点カラーリング) には、生成されたプロセスの待機フェーズ (「並列で同期的に実行」し、すべてのプロセスの結果を使用して終了条件をチェックする) を含むループがあり、アルゴリズム 5 の「6 色」です。 「分散コンピューティングの原則から、Ch。1.

ほとんどの参照は .NET、pthread、およびその他のスレッド関連の計算に関するものであるため、バリアが正しいパターンであるかどうかはわかりません。もっと「エリキシル」な方法があるかもしれません。

私はまだコードを持っていません(パターンを探しています)が、同じ問題の「遅い」バージョンを実装するコードは次のとおりです: https://codereview.stackexchange.com/questions/111487/coloring-trees-in-エリクサー

私が得たアイデアは、トップレベルのプロセス (グラフ ノードごとに 1 つのプロセスを生成するプロセス) を使用して、ノード プロセスを同期させるメッセージを送受信することです。ノード プロセスも相互に通信することに注意する必要があります。親は、最上位ループの 1 回の反復中に子にメッセージを送信します。ただし、複雑なのは、トップレベルがノードのメッセージを受信した後、すべてのノードが反復を実行する前にプロセスを続行しないことです (ほとんどの場合、末尾再帰を使用します)。そこでバリア機構を考えました。

4

2 に答える 2

3

これがまさにあなたが探しているものかどうかはわかりませんが、ここに Java の java.util.concurrent.CyclicBarrier クラスと ruby​​ のConcurrent::CyclicBarrier クラスに基づく循環バリアがあります。

defmodule CyclicBarrier do

  require Record
  Record.defrecordp :barrier, CyclicBarrier,
    pid: nil

  def start(parties, action \\ nil)
      when (is_integer(parties) and parties > 0)
      and (action === nil or is_function(action, 0)),
    do: barrier(pid: spawn(CyclicBarrier.Server, :init, [parties, action]))

  def stop(barrier(pid: pid)) do
    call(pid, :stop)
    true
  end

  def alive?(barrier(pid: pid)) do
    Process.alive?(pid)
  end

  def broken?(barrier(pid: pid)) do
    case call(pid, :status) do
      :waiting ->
        false
      _ ->
        true
    end
  end

  def number_waiting(barrier(pid: pid)) do
    case call(pid, :number_waiting) do
      n when is_integer(n) ->
        n
      _ ->
        false
    end
  end

  def parties(barrier(pid: pid)) do
    case call(pid, :parties) do
      n when is_integer(n) ->
        n
      _ ->
        false
    end
  end

  def reset(barrier(pid: pid)) do
    case call(pid, :reset) do
      :reset ->
        true
      :broken ->
        true
      _ ->
        false
    end
  end

  def wait(barrier = barrier()),
    do: wait(nil, barrier)

  def wait(timeout, barrier = barrier(pid: pid)) when timeout === nil or is_integer(timeout) do
    case call(pid, :wait, timeout) do
      :fulfilled ->
        true
      :broken ->
        false
      :timeout ->
        reset(barrier)
        false
      _ ->
        false
    end
  end

  defp call(pid, request, timeout \\ nil) do
    case Process.alive?(pid) do
      false ->
        {:EXIT, pid, :normal}
      true ->
        trap_exit = Process.flag(:trap_exit, true)
        Process.link(pid)
        ref = make_ref()
        send(pid, {ref, self(), request})
        case timeout do
          nil ->
            receive do
              {^ref, reply} ->
                Process.unlink(pid)
                Process.flag(:trap_exit, trap_exit)
                reply
              exited = {:EXIT, ^pid, _} ->
                Process.flag(:trap_exit, trap_exit)
                exited
            end
          _ ->
            receive do
              {^ref, reply} ->
                Process.unlink(pid)
                Process.flag(:trap_exit, trap_exit)
                reply
              exited = {:EXIT, ^pid, _} ->
                Process.flag(:trap_exit, trap_exit)
                exited
            after
              timeout ->
                Process.unlink(pid)
                Process.flag(:trap_exit, trap_exit)
                :timeout
            end
        end

    end
  end

  defmodule Server do

    require Record
    Record.defrecordp :state_data,
      waiting: 0,
      parties: nil,
      action:  nil,
      q:       :queue.new()

    def init(parties, action),
      do: loop(:waiting, state_data(parties: parties, action: action))

    defp loop(:waiting, sd = state_data(waiting: same, parties: same, action: action, q: q)),
      do: loop(done(:fulfilled, action, q), state_data(sd, waiting: 0, q: :queue.new()))
    defp loop(state_name, sd) do
      receive do
        {ref, pid, request} when is_reference(ref) and is_pid(pid) and is_atom(request) ->
          handle(state_name, request, {ref, pid}, sd)
      end
    end

    defp handle(:waiting, :wait, from, sd = state_data(waiting: w, q: q)),
      do: loop(:waiting, state_data(sd, waiting: w + 1, q: :queue.in(from, q)))
    defp handle(:waiting, :reset, from, sd = state_data(waiting: 0, q: q)),
      do: loop(done(:reset, nil, :queue.in(from, q)), sd)
    defp handle(:waiting, :reset, from, sd = state_data(q: q)),
      do: loop(done(:broken, nil, :queue.in(from, q), false), state_data(sd, waiting: 0, q: :queue.new()))
    defp handle(:broken, :reset, from, sd = state_data(q: q)),
      do: loop(done(:reset, nil, :queue.in(from, q)), sd)
    defp handle(:broken, :wait, from, sd) do
      cast(from, :broken)
      loop(:broken, sd)
    end
    defp handle(state_name, :number_waiting, from, sd = state_data(waiting: number_waiting)) do
      cast(from, number_waiting)
      loop(state_name, sd)
    end
    defp handle(state_name, :parties, from, sd = state_data(parties: parties)) do
      cast(from, parties)
      loop(state_name, sd)
    end
    defp handle(state_name, :status, from, sd) do
      cast(from, state_name)
      loop(state_name, sd)
    end
    defp handle(_state_name, :stop, _from, _sd) do
      exit(:normal)
    end

    defp broadcast(q, message),
      do: for from <- :queue.to_list(q),
        do: cast(from, message)

    defp cast({ref, pid}, message),
      do: send(pid, {ref, message})

    defp done(state, action, q, continue \\ true) do
      run(action)
      broadcast(q, state)
      case continue do
        true ->
          :waiting
        false ->
          state
      end
    end

    defp run(nil),
      do: nil
    defp run(action),
      do: action.()

  end

end

CyclicBarrierElixir の IEx シェルで使用する例を次に示します。

iex> barrier = CyclicBarrier.start(5, fn -> IO.puts("done") end)
{CyclicBarrier, #PID<0.281.0>}
iex> for i <- 1..5, do: spawn(fn -> IO.puts("process #{i}: #{barrier.wait}") end)
done
process 5: true
process 1: true
process 3: true
process 2: true
process 4: true
[#PID<0.283.0>, #PID<0.284.0>, #PID<0.285.0>, #PID<0.286.0>, #PID<0.287.0>]

プロセス実行の正確な順序は非決定論的です。

上の関数の他の例をCyclicBarrier以下に示します。

iex> barrier = CyclicBarrier.start(2)
{CyclicBarrier, #PID<0.280.0>}
iex> barrier.alive?
true
iex> barrier.broken?
false
iex> barrier.number_waiting
0
iex> barrier.parties
2
iex> # let's spawn another process which will wait on the barrier
iex> spawn(fn -> IO.puts("barrier returned: #{barrier.wait}") end)
#PID<0.288.0>
iex> barrier.number_waiting
1
iex> # if we reset the barrier while another process is waiting
iex> # on the barrier, it will break
iex> barrier.reset
barrier returned: false
true
iex> barrier.broken?
true
iex> # however, the barrier can be reset again to its initial state
iex> barrier.reset
true
iex> barrier.broken?
false
iex> # if a timeout is exceeded while waiting for a barrier, it
iex> # will also break the barrier
iex> barrier.wait(100)
false
iex> barrier.broken?
true
iex> # let's reset the barrier, spawn another process to wait,
iex> # and wait with a timeout in the current process
iex> barrier.reset
true
iex> spawn(fn -> IO.puts("barrier returned: #{barrier.wait}") end)
#PID<0.289.0>
iex> barrier.wait(100)
barrier returned: true
true
iex> # if stop is called on the barrier, the barrier process will
iex> # exit and all future calls to the barrier will return false
iex> barrier.stop
true
iex> barrier.alive?
false
iex> barrier.reset
false
iex> barrier.wait
false
于 2015-12-02T19:38:35.783 に答える