目標は、次の型の署名を持つコンジットを持つことです
protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
ByteString -> a
コンジットは、TCP / IP(network-conduit
パッケージを使用)を介して受信したプロトコルバッファ(関数を使用)を繰り返し解析する必要があります。
ワイヤーメッセージのフォーマットは
{length (32 bits big endian)}{protobuf 1}{length}{protobuf 2}...
(中括弧はプロトコルの当事者ではなく、エンティティを区切るためにここでのみ使用されます)。
最初のアイデアは、1つのProtoBufを解析できるをsequenceSink
繰り返し適用するために使用することでした。Sink
[...]
import qualified Data.Binary as B
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Util as CU
protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
protobufConduit protobufDecode =
CU.sequenceSink () $ \() ->
do lenBytes <- CB.take 4 -- read protobuf length
let len :: Word32
len = B.decode lengthBytes -- decode ProtoBuf length
intLen = fromIntegral len
protobufBytes <- CB.take intLen -- read the ProtoBuf bytes
return $ CU.Emit () [ protobufDecode protobufBytes ] -- emit decoded ProtoBuf
ソースからすでに読み取られているが、それを介して消費されていない「残りの」バイトCB.take
が破棄されるため、機能しません(最初のプロトコルバッファでのみ機能します)。
そして、私は「残りをソースに戻す」方法を見つけられませんでした。
コンセプトが完全に間違っていましたか?
PS:ここでプロトコルバッファを使用していても、問題はプロトコルバッファとは関係ありません。問題をデバッグするために、私はいつも使用{length}{UTF8 encoded string}{length}{UTF8 encoded string}...
し、上記のようなコンジット(utf8StringConduit :: MonadResource m => Conduit ByteString m Text
)を使用します。
アップデート:
()
状態(上記のサンプルでは状態なし)を残りのバイトに置き換えようとしCB.take
、呼び出しを、最初に(状態から)既に読み取られたバイトを消費し、必要な場合にのみ呼び出す関数の呼び出しに置き換えましたawait
(状態が十分な大きさではありません)。残念ながら、ソースにバイトが残ってsequenceSink
いない場合はコードを実行しませんが、状態には残りのバイトが含まれているため、これも機能しません:-(。
コードに興味がある場合(最適化されていないか、あまり良くないが、テストするには十分なはずです):
utf8StringConduit :: forall m. MonadResource m => Conduit ByteString m Text
utf8StringConduit =
CU.sequenceSink [] $ \st ->
do (lengthBytes, st') <- takeWithState BS.empty st 4
let len :: Word32
len = B.decode $ BSL.fromChunks [lengthBytes]
intLength = fromIntegral len
(textBytes, st'') <- takeWithState BS.empty st' intLength
return $ CU.Emit st'' [ TE.decodeUtf8 $ textBytes ]
takeWithState :: Monad m
=> ByteString
-> [ByteString]
-> Int
-> Pipe l ByteString o u m (ByteString, [ByteString])
takeWithState acc state 0 = return (acc, state)
takeWithState acc state neededLen =
let stateLenSum = foldl' (+) 0 $ map BS.length state
in if stateLenSum >= neededLen
then do let (firstChunk:state') = state
(neededChunk, pushBack) = BS.splitAt neededLen firstChunk
acc' = acc `BS.append` neededChunk
neededLen' = neededLen - BS.length neededChunk
state'' = if BS.null pushBack
then state'
else pushBack:state'
takeWithState acc' state'' neededLen'
else do aM <- await
case aM of
Just a -> takeWithState acc (state ++ [a]) neededLen
Nothing -> error "to be fixed later"