0

私はそのようなコードを持っています:

    public void IssueOrders(List<OrderAction> actions)
    {
        foreach (var action in actions)
        {
            if (action is AddOrder)
            {
                uint userId = apiTransactions.PlaceOrder((action as AddOrder).order);
                Console.WriteLine("order is placing userId = " + userId);
            }
            // TODO: implement other actions
        }
        // how to wait until OnApiTransactionsDataMessageReceived for all userId is received?

        // TODO: need to update actions with received data here
    }

    private void OnApiTransactionsDataMessageReceived(object sender, DataMessageReceivedEventArgs e)
    {
        var dataMsg = e.message;
        var userId = dataMsg.UserId;

apiTransactions.PlaceOrder非同期であるためuserId、結果として受信しますが、コールバックOnApiTransactionsDataMessageReceivedでデータを受信します。

したがって、たとえば、3つの注文を行うと、、、、などの3つのuserIdを1受け取り3ます4。次に、これらすべてのuserIdのデータが受信されるまで待つ必要があります。

userIdこれが重要な場合は常に増加しています。これはほとんど整数のシーケンスですが、並列実行のために一部の数値が省略される場合があります。

UPD注:

  • IssueOrdersは、異なるスレッドから並行して実行できます
  • PlaceOrderが返される前にcallackが呼び出される場合があります

UPD2

userId「コールバック」が受信される前にそれがわかっていることを保証できるように、以下のPlaceOrderコードをリファクタリングする必要がある可能性があります。

    public uint PlaceOrder(Order order)
    {
        Publisher pub = GetPublisher();

        SchemeDesc schemeDesc = pub.Scheme;
        MessageDesc messageDesc = schemeDesc.Messages[0]; //AddMM
        FieldDesc fieldDesc = messageDesc.Fields[3];
        Message sendMessage = pub.NewMessage(MessageKeyType.KeyName, "FutAddOrder");

        DataMessage smsg = (DataMessage)sendMessage;
        uint userId = counter.Next();
        FillDataMessageWithPlaceOrder(smsg, order, userId);

        System.Console.WriteLine("posting message dump: {0}", sendMessage);
        pub.Post(sendMessage, PublishFlag.NeedReply);
        sendMessage.Dispose();

        return userId;
    }

したがって、PlaceOrderを2つのメソッドに分割する必要があります:userId CreateOrdervoid PostOrder。これにより、コールバックが受信されたときに私が知っていることが保証されますuserId

4

3 に答える 3

1

ReactiveFrameworkのForkJoinメソッドを確認します。複数の非同期呼び出しが完了するまでブロックされます。

編集: ForkJoin()はRxの実験的なリリースにのみ含まれていたようです。これがMerge()に基づいて何が欲しいかについての議論です。

于 2012-09-05T08:32:37.850 に答える
0

最も愚かで実用的なアプローチの1つは次のとおりです。

public void IssueOrders(List<OrderAction> actions)
{
    var userIds = new List<uint>();
    lock(theHashMap)
        theHashMap[userIds] = "blargh";

    foreach (var action in actions)
    {
        if (action is AddOrder)
        {
            lock(userIds)
            {
               uint userId = apiTransactions.PlaceOrder((action as AddOrder).order);
               Console.WriteLine("order is placing userId = " + userId);

               userIds.Add(userId);
            }
        }

        // TODO: implement other actions
    }

    // waiting:
    do
    {
       lock(userIds)
          if(userIds.Count == 0)
             break;

       Thread.Sleep(???); // adjust the time depending on how long you wait for a callback on average

    }while(true);

    lock(theHashMap)
        theHashMap.Remove(userIds);

    // now you have the guarantee that all were received
}

private Dictionary<List<uint>, string> theHashMap = new Dictionary<List<uint>,string>();

private void OnApiTransactionsDataMessageReceived(object sender, DataMessageReceivedEventArgs e)
{
    var dataMsg = e.message;
    var userId = dataMsg.UserId;

    // do some other things

    lock(theHashMap)
        foreach(var list in theHashMap.Keys)
           lock(list)
              if(list.Remove(userId))
                 break;
}

しかし、これはかなり大雑把なアプローチです。ジョンがコメントで尋ねたように、待って何をするのかを説明しない限り、これ以上何かを提案するのは難しいです。たとえば、を離れてIssueOrdersどこかで待機し、すべてが到着したときに追加のジョブが実行されることを確認することができますか?それとも、IssueOrdersすべてが受け取られない限り、あなたは去ることができませんか?等..

編集:ADDの近くでは、ロックはPlaceOrderの前にある必要があることに注意してください。そうしないと、コールバックが超高速で到着すると、コールバックはIDが追加される前にIDを削除しようとする場合があります。また、この実装は非常に単純であることに注意してください。コールバックは、毎回すべてのリストを検索してロックする必要があります。いくつかの追加の辞書/マップ/インデックスを使用すると、大幅に最適化される可能性がありますが、読みやすくするためにここでは行いませんでした。

于 2012-09-05T08:40:56.530 に答える
0

APIを変更できる場合は、タスク並列ライブラリの使用を検討してください。これにより、コードがはるかに簡単になります。

そうしないと、 AutoResetEventが役立つ場合があります。

    private Dictionary<int, AutoResetEvent> m_Events = new ...;

    public void IssueOrders(List<OrderAction> actions)
    {
        foreach (var action in actions)
        {
            if (action is AddOrder)
            {
                uint userId = apiTransactions.PlaceOrder((action as AddOrder).order);
                // Attention: Race condition if PlaceOrder finishes                                         
                // before the MRE is created and added to the dictionary!
                m_Events[userId] = new ManualResetEvent(false);
                Console.WriteLine("order is placing userId = " + userId);
            }
            // TODO: implement other actions
        }

        WaitHandle.WaitAll(m_Events.Values);

        // TODO: Dispose the created MREs
    }

    private void OnApiTransactionsDataMessageReceived(object sender, DataMessageReceivedEventArgs e)
    {
        var dataMsg = e.message;
        var userId = dataMsg.UserId;

        m_Events[userId].Set();
    }
于 2012-09-05T08:41:14.673 に答える