5

MQTT 経由で受信したイベントのストリームを処理したい。私が使用しているライブラリは、コールバックを使用して結果を提供します。私が行っている処理は、最新のイベントだけでなく、以前の状態に依存します。また、将来的には、他のソースからイベントが収集される可能性があります。

最初に、良いアイデアだと思われるリストに構成することにしました。IOが遅延評価を妨げ、無限ストリームの待機が長くなる可能性があるという小さな問題がありましたが、IOをインターリーブすることで解決しました。

stream :: IO [Event]foldlfoldM map、などのような素晴らしいことを行うことができますmapM... 残念ながら、このアプローチでは、ロック機能がもうないため、2 つのストリームを結合することはできません。

私は多くのライブラリを掘り下げていて、たとえば TQueue を使用した STM を見つけました。残念ながら、それは私が正確に望んでいるものではありません。

カスタムタイプを作成してFoldable、折りたためるようにすることにしました。IOが原因で失敗しました。

import Control.Concurrent.STM

newtype Stream a = Stream (STM a)

runStream
  :: ((a -> IO ()) -> IO i)
  -> IO (Stream a)
runStream block = do
  queue <- newTQueueIO
  block (atomically . writeTQueue queue)
  return $ Stream (readTQueue queue)

foldStream :: (a -> b -> IO b) -> b -> Stream a -> IO b
foldStream f s (Stream read) = do
  n <- atomically read
  m <- f n s
  foldStream f m (Stream read)

mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f (Stream read) = Stream $ f <$> read

zipStream :: [Stream a] -> Stream a
zipStream = undefined

次のように使用できますmain = foldStream (\x _ -> print x) () =<< events

通常のリストと同様に、このストリームで動作する基本クラスのいくつかを実装することは可能ですか?

4

1 に答える 1