5

次のような型シグネチャを使用して、ソースの非決定論的なインターリーブ操作を見たいと思っていました

interleave :: WhateverIOMonadClassItWouldWant m => [(k, Source m a)] -> Source m (k, a)

ユースケースは、ネットワーク上の多くのノードへのオープン接続を維持する p2p アプリケーションがあり、ほとんどの場合、それらのいずれかからのメッセージを待っているだけです。メッセージが到着すると、どこから来たかは気にしませんが、できるだけ早くメッセージを処理する必要があります。理論的には、この種のアプリケーション (少なくともソケットのようなソースに使用される場合) は、GHC の IO マネージャーを完全にバイパスしてselect/ epoll/etc を実行できます。直接呼び出しますが、機能する限り、実装方法は特に気にしません。

コンジットでこのようなことは可能ですか?一般的ではありませんが、おそらくより実現可能なアプローチは[(k, Socket)] -> Source m (k, ByteString)、すべてのソケットでの受信を処理する関数を作成することです。

コンジットでの操作に気付きましたが、少なくともこの操作では、抽象化リークのように感じられるResumableSource特定の を認識したいようです。Sink

4

2 に答える 2

5

stm-conduit パッケージは、探しているものと同じではありませんが似たような処理を実行するmergeSourcesを提供します。それはおそらく始めるのに良い場所です。

于 2012-07-14T18:45:49.010 に答える
3

はい、可能です。

スレッドを分岐してポーリングすることにより、ブロックせずに多数の s をポーリングできSourceます。各スレッドで、出力を同時実行チャネルに送信するSourceとペアにする場所をポーリングします。Sink

concur :: (WhateverIOMonadClassItWouldWant m) => TChan a -> Sink a m r

...そして、Sourceそのチャネルから読み取るを定義します。

synchronize :: (WhateverIOMonadClassItWouldWant m) => TChan a -> Source a m r

これは、スレッドをフォークしてソケット自体をポーリングすることと同じであることに注意してください。ただし、より一般的であるため、定義した s をconduit使用してソケット以外のものをポーリングしたいの他のユーザーにとっては便利です。Source

これらの機能を 1 つの関数に組み合わせると、呼び出しの全体的な API は次のようになります。

poll :: (WhateverIOMonadClassItWouldWant m) => [Source a m r] -> m (Source a m r)

...しかし、必要に応じてこれらkの s を投入することもできます。

于 2012-07-14T14:00:29.847 に答える