5

エンコードされた大きな(<bloblength><blob>)*バイナリ ファイルのデシリアライザーを作成しているときに、さまざまな Haskell の Produce-Transform-Consume ライブラリに行き詰まりました。これまでのところ、4 つのストリーミング ライブラリを認識しています。

  • Data.Conduit : 広く使用されており、非常に慎重なリソース管理が行われています
  • Pipes : 同様conduit( Haskell Cast #6は と の違いをうまく明らかにしていconduitますpipes)
  • Data.Binary.Get : getWord32be などの便利な関数を提供しますが、ストリーミングの例は扱いにくいです
  • System.IO.Streams : 一番使いやすいようです

Word32これは、ストリーミングをしようとしたときに問題が発生する場所の簡略化された例ですconduit。もう少し現実的な例では、最初Word32に blob の長さを決定する a を読み取り、次にその長さの lazyByteStringを生成します (その後、さらにデシリアライズされます)。しかし、ここでは、バイナリ ファイルからストリーミング形式で Word32 を抽出しようとしています。

module Main where

-- build-depends: bytestring, conduit, conduit-extra, resourcet, binary

import           Control.Monad.Trans.Resource (MonadResource, runResourceT)
import qualified Data.Binary.Get              as G
import qualified Data.ByteString              as BS
import qualified Data.ByteString.Char8        as C
import qualified Data.ByteString.Lazy         as BL
import           Data.Conduit
import qualified Data.Conduit.Binary          as CB
import qualified Data.Conduit.List            as CL
import           Data.Word                    (Word32)
import           System.Environment           (getArgs)

-- gets a Word32 from a ByteString.
getWord32 :: C.ByteString -> Word32
getWord32 bs = do
    G.runGet G.getWord32be $ BL.fromStrict bs

-- should read BytesString and return Word32
transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = do
    mbs <- await
    case mbs of
        Just bs -> do
            case C.null bs of
                False -> do
                    yield $ getWord32 bs
                    leftover $ BS.drop 4 bs
                    transform
                True -> return ()
        Nothing -> return ()

main :: IO ()
main = do
    filename <- fmap (!!0) getArgs  -- should check length getArgs
    result <- runResourceT $ (CB.sourceFile filename) $$ transform =$ CL.consume
    print $ length result   -- is always 8188 for files larger than 32752 bytes

プログラムの出力は、読み取られた Word32 の数だけです。最初のチャンク (約 32KiB) を読み取った後、ストリームが終了することがわかります。何らかの理由mbsでが neverであるため、チャンクが消費されたときにストリームを停止するものをNothing確認する必要があります。null bs明らかに、私のコンジットtransformは故障しています。ソリューションへの 2 つのルートが表示されます。

  1. はのawait2 番目のチャンクに行きたくないByteStreamので、次のチャンクをプルする別の関数はありますか? 私が見た例 (例: Conduit 101 ) では、これはどのように行われたかではありません
  2. これは、セットアップの方法が間違っているだけtransformです。

これはどのように適切に行われますか?これは正しい方法ですか?(パフォーマンスは重要です。)

更新:これを使用してそれを行う悪い方法は次のSystems.IO.Streamsとおりです。

module Main where

import           Data.Word                (Word32)
import           System.Environment       (getArgs)
import           System.IO                (IOMode (ReadMode), openFile)
import qualified System.IO.Streams        as S
import           System.IO.Streams.Binary (binaryInputStream)
import           System.IO.Streams.List   (outputToList)

main :: IO ()
main = do
    filename : _ <- getArgs
    h <- openFile filename ReadMode
    s <- S.handleToInputStream h
    i <- binaryInputStream s :: IO (S.InputStream Word32)
    r <- outputToList $ S.connect i
    print $ last r

「悪い」とは、時間と空間の要求が非常に高く、デコード例外を処理しないことを意味します。

4

3 に答える 3

3

pipes(and pipes-groupand ) を使用するとpipes-bytestring、デモの問題はコンビネータに縮小されます。最初に、着信未分化バイト ストリームを小さな 4 バイト チャンクに分解します。

chunksOfStrict :: (Monad m) => Int -> Producer ByteString m r -> Producer ByteString m r
chunksOfStrict n = folds mappend mempty id . view (Bytes.chunksOf n) 

次に、これらをWord32s にマップし、(ここでは) カウントします。

main :: IO ()
main = do
   filename:_ <- getArgs
   IO.withFile filename IO.ReadMode $ \h -> do
     n <- P.length $ chunksOfStrict 4 (Bytes.fromHandle h) >-> P.map getWord32
     print n

4 バイト未満の場合、または解析に失敗した場合、これは失敗しますが、次のようにマッピングすることもできます

getMaybeWord32 :: ByteString -> Maybe Word32
getMaybeWord32 bs = case  G.runGetOrFail G.getWord32be $ BL.fromStrict bs of
  Left r -> Nothing
  Right (_, off, w32) -> Just w32

次のプログラムは、有効な 4 バイト シーケンスの解析結果を出力します。

main :: IO ()
main = do
   filename:_ <- getArgs
   IO.withFile filename IO.ReadMode $ \h -> do
     runEffect $ chunksOfStrict 4 (Bytes.fromHandle h) 
                 >-> P.map getMaybeWord32
                 >-> P.concat  -- here `concat` eliminates maybes
                 >-> P.print 

もちろん、失敗した解析に対処する方法は他にもあります。

しかし、これはあなたが求めたプログラムにより近いものです。バイト ストリーム ( ) から 4 バイト セグメントを取得し、それが十分な長さでProducer ByteString m rあるかのように読み取ります。Word32次に、受信バイトの多くを受け取り、それらを遅延バイト文字列に蓄積して、それを生成します。バイトがなくなるまでこれを繰り返すだけです。以下ではmain、生成された生成された遅延バイト文字列をそれぞれ出力します。

module Main (main) where 
import Pipes 
import qualified Pipes.Prelude as P
import Pipes.Group (folds) 
import qualified Pipes.ByteString as Bytes ( splitAt, fromHandle, chunksOf )
import Control.Lens ( view ) -- or Lens.Simple (view) -- or Lens.Micro ((.^))
import qualified System.IO as IO ( IOMode(ReadMode), withFile )
import qualified Data.Binary.Get as G ( runGet, getWord32be )
import Data.ByteString ( ByteString )
import qualified Data.ByteString.Lazy.Char8 as BL 
import System.Environment ( getArgs )

splitLazy :: (Monad m, Integral n) =>
   n -> Producer ByteString m r -> m (BL.ByteString, Producer ByteString m r)
splitLazy n bs = do
  (bss, rest) <- P.toListM' $ view (Bytes.splitAt n) bs
  return (BL.fromChunks bss, rest)

measureChunks :: Monad m => Producer ByteString m r -> Producer BL.ByteString m r
measureChunks bs = do
 (lbs, rest) <- lift $ splitLazy 4 bs
 if BL.length lbs /= 4
   then rest >-> P.drain -- in fact it will be empty
   else do
     let w32 = G.runGet G.getWord32be lbs
     (lbs', rest') <- lift $ splitLazy w32 bs
     yield lbs
     measureChunks rest

main :: IO ()
main = do
  filename:_ <- getArgs
  IO.withFile filename IO.ReadMode $ \h -> do
     runEffect $ measureChunks (Bytes.fromHandle h) >-> P.print

runGetこれもnotを使用しているという点で粗雑ですrunGetOrFailが、これは簡単に修正できます。パイプの標準的な手順は、解析が失敗したときにストリーム変換を停止し、解析されていないバイトストリームを返すことです。

Word32s対応するバイト ストリームを遅延バイト文字列として蓄積したくないが、それらを蓄積せずに別のファイルに書き込むと言うように、 が大きな数であると予想していた場合は、プログラムを非常に簡単に変更してそれを行うことができます。これには、conduit の高度な使用が必要になりますが、pipesand を使用した推奨されるアプローチstreamingです。

于 2016-10-08T20:33:15.937 に答える
1

これは、リングに投入したい比較的簡単なソリューションです。(のサブセット) と同一のインターフェースを与えるモナドにsplitAtラップされたの繰り返し使用です。結果はoverで取得されます。StateData.Binary.Get[ByteString]mainwhileJustgetBlob

module Main (main) where

import           Control.Monad.Loops
import           Control.Monad.State
import qualified Data.Binary.Get      as G (getWord32be, runGet)
import qualified Data.ByteString.Lazy as BL
import           Data.Int             (Int64)
import           Data.Word            (Word32)
import           System.Environment   (getArgs)

-- this is going to mimic the Data.Binary.Get.Get Monad
type Get = State BL.ByteString

getWord32be :: Get (Maybe Word32)
getWord32be = state $ \bs -> do
    let (w, rest) = BL.splitAt 4 bs
    case BL.length w of
        4 -> (Just w', rest) where
            w' = G.runGet G.getWord32be w
        _ -> (Nothing, BL.empty)

getLazyByteString :: Int64 -> Get BL.ByteString
getLazyByteString n = state $ \bs -> BL.splitAt n bs

getBlob :: Get (Maybe BL.ByteString)
getBlob = do
    ml <- getWord32be
    case ml of
        Nothing -> return Nothing
        Just l -> do
            blob <- getLazyByteString (fromIntegral l :: Int64)
            return $ Just blob

runGet :: Get a -> BL.ByteString -> a
runGet g bs = fst $ runState g bs

main :: IO ()
main = do
    fname <- head <$> getArgs
    bs <- BL.readFile fname
    let ls = runGet loop bs where
        loop = whileJust getBlob return
    print $ length ls

にはエラー処理はありませんがgetBlob、拡張は簡単です。結果のリストが慎重に使用される限り、時間と空間の複雑さはかなり良好です。(上記で使用するランダム データを作成する python スクリプトはこちらです)。

于 2016-10-10T13:43:04.000 に答える