1

Bytestringソース ( ) からのストリーミングのシンクをシミュレートするためにファイル ハンドルを使用するコードがありますAWS S3。シンクとして使用したい場合は、以下のコードを(接続へのハンドルを使用して)Network.Websocket交換するだけで十分でしょうか?LBS.writeFilesendBinaryData

{-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}

import qualified Aws
import qualified Aws.S3 as S3
import           Data.Conduit (($$+-))
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as SP
import qualified Data.ByteString.Lazy as LBS
import Streaming as S
import Streaming.Prelude as S hiding (show,print)
import Control.Concurrent.Async (async,waitCatch)
import Data.Text as T (Text)

data AwsConfig a = AwsConfig { _aws_cfg :: Aws.Configuration, _aws_s3cfg :: S3.S3Configuration a, _aws_httpmgr :: SP.Manager }

getObject :: AwsConfig Aws.NormalQuery -> T.Text -> T.Text ->  IO Int
getObject cfg bucket key = do
  req <- waitCatch =<< async (runResourceT $ do
    {- Create a request object with S3.getObject and run the request with pureAws. -}
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <- 
      Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
        S3.getObject bucket key
    {- Stream the response to a lazy bytestring -}
    liftIO $ LBS.writeFile "testaws" LBS.empty -- this will be replaced by content-length of the bytes 
    let obj = (($$+- CL.mapM_ S.yield) . hoist lift ) (SP.responseBody rsp)
    S.mapM_ (liftIO . (LBS.appendFile "testaws") . LBS.fromStrict) obj
    return $ lookup "content-length" (S3.omUserMetadata mdata))
  case req of
    Left _ -> return 2 -- perhaps, we could use this to send an error message over websocket
    Right _ -> return 0

私にとって混乱の原因は、ストリームの終了がどのように決定されるかです? ファイルの場合、これはwriteFileAPI によって処理されます。どうsendBinaryDataですか?と同様に終了を処理しますwriteFileか? それとも、クライアント側のデータ パーサーによって決定されますか?

アップデート

この質問は、上記の例のファイル ハンドルで行うように、Websocket ハンドル (ハンドルが提供されていると仮定します) にデータをストリーミングする方法に関するものであり、実際には 内でハンドルを管理する方法に関するものではありませんresourceT。データをシンクするアプローチconduitを取っているようです。mapM_ですから、それは確かに進むべき道のようです。

終了に関する質問は、私が持っているこの考え方によるものです。Websocket ハンドルの反対側でデータをリッスンする関数がある場合、メッセージの終了を決定することは、ストリーミング コンテキストでは重要なようです。以下のような関数が与えられます:

f :: LBS.ByteString -> a

データを websocket ハンドルにストリーミングする場合S.mapM_、何らかのend of streamマーカーを追加fして、反対側でリッスンしているときに遅延バイト文字列の処理を停止できるようにしますか? そうfしないと、メッセージがいつ完了したかわかりません。

4

3 に答える 3

2

ハンドルには追加のトリックが必要であると考えるのは正しいです。しかし、あなたはすでにResourceTモナド変換子を使っているので、これはとても簡単ですallocateallocateリソースモナドでハンドルを作成し、クリーンアップアクションを登録できます(この場合、接続を閉じるだけです)。

ok <- runResourceT $ do
  (releaseKey, handle) <-
    allocate (WebSockets.acceptRequest request) 
             (`WebSockets.sendClose` closeMessage)
  WebSockets.sendBinaryData handle data
  return ok
where
  request = ...
  closeMessage = ...
  data = ...
  ok = ...

を使用することallocateで、ハンドルがrunResourceT戻るまでに閉じることが保証されますok

ただし、これがあなたの望むものかどうかは完全にはわかりません。getObjectWS 接続を受け入れて閉じる方法を知っておくべきではないように思えます。おそらく、WS 接続ハンドルを引数として取り、それに書き込む必要があります。戻り値の型を にアップグレードすると、呼び出し元にWS ハンドルの呼び出しと割り当てなどの責任を負わ せるResourceTことができます。しかし、うまくいけば、上記の例で十分に理解を深めることができます。getObjectrunResourceT

于 2016-06-14T14:54:33.640 に答える
1

以下に、物事をより理解しやすくするためのいくつかの小片を示します。まず、最初の小さなデモでは、あなたの を修正して、とにかくあるgetObjectを​​使用して、遅延バイト文字列による迂回を削除します。Streaming.ByteString.writeFileResourceT

{-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}

import qualified Aws
import qualified Aws.S3 as S3
import           Data.Conduit 
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as HTTP
import qualified Data.ByteString.Streaming as SB
import qualified Data.ByteString.Streaming.Internal as SB
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import           Streaming as S
import           Streaming.Prelude as S hiding (show,print)
import           Control.Concurrent.Async (async,waitCatch)
import           Data.Text as T (Text) 
import qualified Network.WebSockets as WebSockets
import           Control.Monad.Trans.Resource

data AwsConfig a = AwsConfig { _aws_cfg :: Aws.Configuration
                             , _aws_s3cfg :: S3.S3Configuration a
                             , _aws_httpmgr :: HTTP.Manager }

getObject :: AwsConfig Aws.NormalQuery -> FilePath -> T.Text -> T.Text ->  IO Int
getObject cfg file bucket key = do
  req <- waitCatch =<< async (runResourceT $ do
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <- 
      Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
        S3.getObject bucket key
    let bytestream = do 
         -- lookup "content-length" (S3.omUserMetadata mdata))
         SB.chunk B.empty -- this will be replaced by content-length 
         hoist lift (HTTP.responseBody rsp)  $$+- CL.mapM_ SB.chunk 
    SB.writeFile file bytestream ) -- this is in ResourceT 
  case req of
    Left _ -> return 2
    Right _ -> return 0

これから、多かれ少なかれあなたが何をしていたかを抽象化できますSB.writeFile:

getObjectAbstracted
      :: (SB.ByteString (ResourceT IO) () -> ResourceT IO b)
         -> AwsConfig Aws.NormalQuery -> S3.Bucket -> Text -> ResourceT IO b
getObjectAbstracted action cfg bucket key = do
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <- 
      Aws.pureAws (_aws_cfg cfg) 
                  (_aws_s3cfg cfg) 
                  (_aws_httpmgr cfg) 
                  (S3.getObject bucket key)

    action (hoist lift (HTTP.responseBody rsp)  $$+- CL.mapM_ SB.chunk) 

ここで、ストリーミング バイト文字列ライブラリに含まれていない小さなヘルパーが必要です

mapMChunks_ :: Monad m => (B.ByteString -> m ()) -> SB.ByteString m r -> m r
mapMChunks_ act bytestream = do
  (a S.:> r) <- SB.foldlChunksM (\_ bs -> act bs) (return ()) bytestream
  return r

ストリーミングバイト文字列を使用して、@haoformayorが計画したように多かれ少なかれ進めることができます

writeConnection :: MonadIO m => WebSockets.Connection -> SB.ByteString m r -> m r
writeConnection connection  = 
  mapMChunks_ (liftIO . WebSockets.sendBinaryData connection)

-- following `haoformayor`
connectWrite
    :: (MonadResource m, WebSockets.WebSocketsData a) 
    => WebSockets.PendingConnection 
    -> a                  -- closing  message
    -> SB.ByteString m r  -- stream from aws
    -> m r
connectWrite request closeMessage bytestream = do
    (releaseKey, connection) <- allocate (WebSockets.acceptRequest request)
                                         (`WebSockets.sendClose` closeMessage)
    writeConnection connection bytestream

getObjectWS :: WebSockets.WebSocketsData a =>
       WebSockets.PendingConnection
       -> a
       -> AwsConfig Aws.NormalQuery
       -> S3.Bucket
       -> Text
       -> ResourceT IO ()
getObjectWS request closeMessage = getObjectAbstracted (connectWrite request closeMessage)

もちろん、これまでのところ、conduitstreaming/の違いを利用しているものはありませんstreaming-bytestring

于 2016-06-14T16:18:58.883 に答える