MQTT 経由で受信したイベントのストリームを処理したい。私が使用しているライブラリは、コールバックを使用して結果を提供します。私が行っている処理は、最新のイベントだけでなく、以前の状態に依存します。また、将来的には、他のソースからイベントが収集される可能性があります。
最初に、良いアイデアだと思われるリストに構成することにしました。IOが遅延評価を妨げ、無限ストリームの待機が長くなる可能性があるという小さな問題がありましたが、IOをインターリーブすることで解決しました。
stream :: IO [Event]
foldl
、foldM
map
、などのような素晴らしいことを行うことができますmapM
... 残念ながら、このアプローチでは、ロック機能がもうないため、2 つのストリームを結合することはできません。
私は多くのライブラリを掘り下げていて、たとえば TQueue を使用した STM を見つけました。残念ながら、それは私が正確に望んでいるものではありません。
カスタムタイプを作成してFoldable
、折りたためるようにすることにしました。IOが原因で失敗しました。
import Control.Concurrent.STM
newtype Stream a = Stream (STM a)
runStream
:: ((a -> IO ()) -> IO i)
-> IO (Stream a)
runStream block = do
queue <- newTQueueIO
block (atomically . writeTQueue queue)
return $ Stream (readTQueue queue)
foldStream :: (a -> b -> IO b) -> b -> Stream a -> IO b
foldStream f s (Stream read) = do
n <- atomically read
m <- f n s
foldStream f m (Stream read)
mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f (Stream read) = Stream $ f <$> read
zipStream :: [Stream a] -> Stream a
zipStream = undefined
次のように使用できますmain = foldStream (\x _ -> print x) () =<< events
通常のリストと同様に、このストリームで動作する基本クラスのいくつかを実装することは可能ですか?