6

目標は、次の型の署名を持つコンジットを持つことです

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a

ByteString -> aコンジットは、TCP / IP(network-conduitパッケージを使用)を介して受信したプロトコルバッファ(関数を使用)を繰り返し解析する必要があります。

ワイヤーメッセージのフォーマットは

{length (32 bits big endian)}{protobuf 1}{length}{protobuf 2}...

(中括弧はプロトコルの当事者ではなく、エンティティを区切るためにここでのみ使用されます)。

最初のアイデアは、1つのProtoBufを解析できるをsequenceSink繰り返し適用するために使用することでした。Sink

[...]
import qualified Data.Binary         as B
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Util   as CU

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
protobufConduit protobufDecode =
    CU.sequenceSink () $ \() ->
        do lenBytes <- CB.take 4                                -- read protobuf length
           let len :: Word32
               len = B.decode lengthBytes                       -- decode ProtoBuf length
               intLen = fromIntegral len
           protobufBytes <- CB.take intLen                      -- read the ProtoBuf bytes
           return $ CU.Emit () [ protobufDecode protobufBytes ] -- emit decoded ProtoBuf

ソースからすでに読み取られているが、それを介して消費されていない「残りの」バイトCB.takeが破棄されるため、機能しません(最初のプロトコルバッファでのみ機能します)。

そして、私は「残りをソースに戻す」方法を見つけられませんでした。

コンセプトが完全に間違っていましたか?

PS:ここでプロトコルバッファを使用していても、問題はプロトコルバッファとは関係ありません。問題をデバッグするために、私はいつも使用{length}{UTF8 encoded string}{length}{UTF8 encoded string}...し、上記のようなコンジット(utf8StringConduit :: MonadResource m => Conduit ByteString m Text)を使用します。

アップデート:

()状態(上記のサンプルでは状態なし)を残りのバイトに置き換えようとしCB.take、呼び出しを、最初に(状態から)既に読み取られたバイトを消費し、必要な場合にのみ呼び出す関数の呼び出しに置き換えましたawait(状態が十分な大きさではありません)。残念ながら、ソースにバイトが残ってsequenceSinkいない場合はコードを実行しませんが、状態には残りのバイトが含まれているため、これも機能しません:-(。

コードに興味がある場合(最適化されていないか、あまり良くないが、テストするには十分なはずです):

utf8StringConduit :: forall m. MonadResource m => Conduit ByteString m Text
utf8StringConduit =
    CU.sequenceSink [] $ \st ->
        do (lengthBytes, st') <- takeWithState BS.empty st 4
           let len :: Word32
               len = B.decode $ BSL.fromChunks [lengthBytes]
               intLength = fromIntegral len
           (textBytes, st'') <- takeWithState BS.empty st' intLength
           return $ CU.Emit st'' [ TE.decodeUtf8 $ textBytes ]

takeWithState :: Monad m
              => ByteString
              -> [ByteString]
              -> Int
              -> Pipe l ByteString o u m (ByteString, [ByteString])
takeWithState acc state 0 = return (acc, state)
takeWithState acc state neededLen =
    let stateLenSum = foldl' (+) 0 $ map BS.length state
     in if stateLenSum >= neededLen
           then do let (firstChunk:state') = state
                       (neededChunk, pushBack) = BS.splitAt neededLen firstChunk
                       acc' = acc `BS.append` neededChunk
                       neededLen' = neededLen - BS.length neededChunk
                       state'' = if BS.null pushBack
                                    then state'
                                    else pushBack:state'
                   takeWithState acc' state'' neededLen'
           else do aM <- await
                   case aM of
                     Just a -> takeWithState acc (state ++ [a]) neededLen
                     Nothing -> error "to be fixed later"
4

1 に答える 1

4

プロトコルバッファの解析とシリアル化にはmessageWithLengthPutMand messageWithLengthGetM(以下を参照)を使用しますが、長さにはvarintエンコーディングを使用していると思いますが、これは必要なものではありません。messageWithLengthGet / Putを次のようなものに置き換えることで、以下の実装を適応させようと思うでしょう。

myMessageWithLengthGetM = 
   do size <- getWord32be 
      getMessageWithSize size

getMessageWithSizeしかし、プロトコルバッファパッケージから利用可能な関数を使用して実装する方法がわかりません。一方getByteString、バイト文字列を「再解析」することもできます。

コンジットについて:コンジットなしで実装してみましたData.Conduit.Utilか?何かのようなもの

protobufConduit protobufDecode = loop
   where
      loop = 
         do len <- liftM convertLen (CB.take 4)
            bs <- CB.take len
            yield (protobufDecode bs)
            loop

使用するコードは次のとおりです。

pbufSerialize :: (ReflectDescriptor w, Wire w) => Conduit w IO ByteString
pbufSerialize = awaitForever f
    where f pb = M.mapM_ yield $ BSL.toChunks $ runPut (messageWithLengthPutM pb)

pbufParse :: (ReflectDescriptor w, Wire w, Show w) => Conduit ByteString IO w
pbufParse = new
    where
      new = read (runGet messageWithLengthGetM . BSL.fromChunks . (:[]))
      read parse =
          do mbs <- await
             case mbs of
               Just bs -> checkResult (parse bs)
               Nothing -> return ()
      checkResult result =
          case result of
            Failed _ errmsg -> fail errmsg
            Partial cont -> read (cont . Just . BSL.fromChunks . (:[]))
            Finished rest _ msg ->
                do yield msg
                   checkResult (runGet messageWithLengthGetM rest)
于 2012-09-24T18:48:54.473 に答える