1

オブジェクトのいくつかのインスタンスが同時に「パルス」を生成するアプリケーションを構築する必要があります。(基本的に、これはカウンターをインクリメントしていることを意味します。) また、各オブジェクトの合計カウンターを追跡する必要もあります。また、カウンターで読み取りを実行するたびに、カウンターをゼロにリセットする必要があります。

職場の男性と話していたのですが、彼は Retlang とメッセージベースの同時実行性について言及しましたが、これは非常に興味深いものでした。しかし、明らかに私はその概念に非常に慣れていません。小さなプロトタイプを作成したところ、予想どおりの結果が得られました。これは素晴らしいことです。しかし、Retlang と並行プログラミング全般。

まず、私はこれらのクラスを持っています:

public class Plc {

        private readonly IChannel<Pulse> _channel;
        private readonly IFiber _fiber;
        private readonly int _pulseInterval;
        private readonly int _plcId;

        public Plc(IChannel<Pulse> channel, int plcId, int pulseInterval) {
            _channel = channel;
            _pulseInterval = pulseInterval;
            _fiber = new PoolFiber();
            _plcId = plcId;
        }

        public void Start() {
            _fiber.Start();
            // Not sure if it's safe to pass in a delegate which will run in an infinite loop...
            // AND use a shared channel object...
            _fiber.Enqueue(() => {
                SendPulse();
            });
        }

        private void SendPulse() {
            while (true) {
                // Not sure if it's safe to use the same channel object in different
                // IFibers...
                _channel.Publish(new Pulse() { PlcId = _plcId });
                Thread.Sleep(_pulseInterval);
            }
        }
    }

    public class Pulse {
        public int PlcId { get; set; }
    }

ここでの考え方は、複数の Plc をインスタンス化し、それぞれに同じ IChannel を渡し、それらに SendPulse 関数を同時に実行させることで、それぞれが同じチャネルにパブリッシュできるようにするというものです。しかし、私のコメントからわかるように、私がやっていることは実際に合法であるかどうかについては少し懐疑的です. 私は主に、同じ IChannel オブジェクトを使用して異なる IFiber のコンテキストでパブリッシュすることについて心配していますが、エンキューに渡されたデリゲートから決して返されないことについても心配しています。これをどのように処理すべきかについて、誰かが何らかの洞察を提供できることを願っています。

また、「サブスクライバー」クラスは次のとおりです。

public class PulseReceiver {

        private int[] _pulseTotals;
        private readonly IFiber _fiber;
        private readonly IChannel<Pulse> _channel;
        private object _pulseTotalsLock;

        public PulseReceiver(IChannel<Pulse> channel, int numberOfPlcs) {
            _pulseTotals = new int[numberOfPlcs];
            _channel = channel;
            _fiber = new PoolFiber();
            _pulseTotalsLock = new object();
        }

        public void Start() {
            _fiber.Start();
            _channel.Subscribe(_fiber, this.UpdatePulseTotals);
        }

        private void UpdatePulseTotals(Pulse pulse) {
            // This occurs in the execution context of the IFiber.
            // If we were just dealing with the the published Pulses from the channel, I think
            // we wouldn't need the lock, since I THINK the published messages would be taken
            // from a queue (i.e. each Plc is publishing concurrently, but Retlang enqueues
            // the messages).
            lock(_pulseTotalsLock) {

                _pulseTotals[pulse.PlcId - 1]++;
            }
        }

        public int GetTotalForPlc(int plcId) {
            // However, this access takes place in the application thread, not in the IFiber,
            // and I think there could potentially be a race condition here.  I.e. the array
            // is being updated from the IFiber, but I think I'm reading from it and resetting values
            // concurrently in a different thread.
            lock(_pulseTotalsLock) {

                if (plcId <= _pulseTotals.Length) {

                    int currentTotal = _pulseTotals[plcId - 1];
                    _pulseTotals[plcId - 1] = 0;
                    return currentTotal;
                }
            }

            return -1;
        }
    }

ここでは、Plc インスタンスに与えられたのと同じ IChannel を再利用していますが、別の IFiber がそれにサブスクライブしています。理想的には、各 Plc からメッセージを受信し、クラス内の単一のプライベート フィールドを更新できますが、スレッド セーフな方法で行うことができます。

私が理解していること (およびコメントで言及したこと) から、Plcs から各メッセージをシリアルに受信するため、Subscribe 関数に渡したデリゲートの _pulseTotals 配列を更新するだけで安全だと思います。

ただし、合計を読み取ってリセットする必要があるビットを処理する最善の方法がわかりません。コードとコメントからわかるように、_pulseTotals 配列へのすべてのアクセスをロックでラップすることになりました。しかし、これが必要かどうかはわかりません.a)実際にこれを行う必要があるかどうか、およびその理由、またはb)同様のものを実装する正しい方法を知りたいです.

そして最後に、私の主な機能は次のとおりです。

static void Main(string[] args) {

        Channel<Pulse> pulseChannel = new Channel<Pulse>();

        PulseReceiver pulseReceiver = new PulseReceiver(pulseChannel, 3);
        pulseReceiver.Start();

        List<Plc> plcs = new List<Plc>() {
            new Plc(pulseChannel, 1, 500),
            new Plc(pulseChannel, 2, 250),
            new Plc(pulseChannel, 3, 1000)
        };

        plcs.ForEach(plc => plc.Start());

        while (true) {
            Thread.Sleep(10000);
            Console.WriteLine(string.Format("Plc 1: {0}\nPlc 2: {1}\nPlc 3: {2}\n", pulseReceiver.GetTotalForPlc(1), pulseReceiver.GetTotalForPlc(2), pulseReceiver.GetTotalForPlc(3)));
        }
    }

1 つの IChannel をインスタンス化し、それをすべてに渡します。ここで、Receiver は内部的に IFiber をサブスクライブし、Plc は IFiber を使用して、継続的にチャネルにパブリッシュする戻りのないメソッドを「エンキュー」します。

ここでも、コンソール出力は、私が期待していたのとまったく同じように見えます。つまり、10 秒待った後、Plc 1 に対して 20 個の「パルス」が表示されます。また、読み取り後のカウンターのリセットも機能しているようです。つまり、Plc 1 には、10 秒ごとに 20 個の「パルス」があります。しかし、それは私が何か重要なことを見落としていないことを保証するものではありません.

Retlang と並行プログラミングの手法についてもう少し学べることをとても楽しみにしています。そのため、誰かが私のコードを調べて、私の特定の問題に対する提案や、私の要件に基づいた別の設計を提供してくれることを願っています!

4

0 に答える 0