2

私はパイプ エコシステム、特にパイプ同時実行性を使用してストリーミング関数を作成しています。これは運用ライブラリに基づいており、ネットワーク経由でサーバーまたは stdin/out にコマンドを渡す小さなプログラム スニペットをすばやく作成できます。シェル コマンドを呼び出してから、応答を読み返します。この場合、それはアスタリスクですが、同様のものに一般化できます。

最初はパイプを念頭に置いてこれを書きましたが、うまくいきません。次のコードが機能しない理由は、astPipe が a を返すのPipe _ _ IO aに対して、 i と o の両方が pipes-concurrency から両方とも を返すためConsumer/Producer _ IO ()です。astPipeyieldを用意Maybe ByteStringしてから、出力をConsumerconsumerにすることを考えましMaybe ByteStringたが、それでもProducer返される問題は解決しません()

解決にかなり近づいたような気がしますが、なかなか解決できません。このファイルでスタックを実行するだけで複製できるはずです。

#!/usr/bin/env stack
-- stack --resolver lts-6.20 runghc --package pipes  --package pipes-concurrency  --package operational --package process-streaming

{-# LANGUAGE OverloadedStrings, LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE GADTs #-}
module West.Asterisk where

import System.Process.Streaming as PS
import Control.Monad.Operational as Op

import Pipes as P
import Pipes.Concurrent as PC;

import qualified Data.ByteString.Char8 as B

import Control.Concurrent.Async

import GHC.IO.Exception (ExitCode)

data Version = Version String
data Channels = Channels

data AsteriskInstruction a where
    Login :: AsteriskInstruction (Maybe Version)
    CoreShowChannels :: AsteriskInstruction (Maybe Channels)

type Asterisk a = Program AsteriskInstruction a

runAsterisk :: forall a. Asterisk a -> IO a
runAsterisk m = 
  let

    runAsterisk' :: Producer B.ByteString {- TODO Response -} IO () -> Consumer B.ByteString IO () -> Asterisk a -> IO a
    runAsterisk' i o m' = runEffect $ i >-> astPipe m' >-> o
      where
        astPipe :: Asterisk a -> Pipe B.ByteString B.ByteString IO a
        astPipe k = 
          case Op.view m' of

            Return a -> return a

            Login :>>= k -> do
              yield logincmd
              resp <- await -- :: Response
              let v = undefined resp :: Maybe Version
              astPipe (k v)

            CoreShowChannels :>>= k -> do
              yield coreshowchannelscmd
              resp <- await
              let c = undefined resp :: Maybe Channels
              astPipe (k c)

  in do
    withSpawn unbounded $ \(out1, in1) -> do
        async $ asteriskManager (fromInput in1) (toOutput out1)
        runAsterisk' (fromInput in1) (toOutput out1) m 

asteriskManager :: Producer B.ByteString IO () -> Consumer B.ByteString IO () -> IO ExitCode
asteriskManager prod cons = do
  let ssh = shell "nc someserver 5038"
  execute (piped ssh) (foldOut (withConsumer cons) *> feedProducer prod *> exitCode)


logincmd, coreshowchannelscmd :: B.ByteString
logincmd = "action: login\nusername: username\nsecret: pass\nevents: off\n\n"
coreshowchannelscmd = "action: coreshowchannels\n\n"

エラー:

  Blah.hs:38:45:
    Couldn't match type ‘a’ with ‘()’
      ‘a’ is a rigid type variable bound by
          the type signature for runAsterisk :: Asterisk a -> IO a
          at Blah.hs:33:23
    Expected type: Proxy () B.ByteString () B.ByteString IO ()
      Actual type: Pipe B.ByteString B.ByteString IO a
    Relevant bindings include
      astPipe :: Asterisk a -> Pipe B.ByteString B.ByteString IO a
        (bound at Blah.hs:41:9)
      m' :: Asterisk a (bound at Blah.hs:38:22)
      runAsterisk' :: Producer B.ByteString IO ()
                      -> Consumer B.ByteString IO () -> Asterisk a -> IO a
        (bound at Blah.hs:38:5)
      m :: Asterisk a (bound at Blah.hs:34:13)
      runAsterisk :: Asterisk a -> IO a (bound at Blah.hs:34:1)
    In the second argument of ‘(>->)’, namely ‘astPipe m'’
    In the first argument of ‘(>->)’, namely ‘i >-> astPipe m'’
4

1 に答える 1

2

Producers とConsumers が戻ると()、それ自体で停止できます。戻り値の型が多態的であるProducerおよびは、それ自体で停止することはありません。Consumer

Eitherケースの戻り値の型を統一するには、 を使用して、それぞれを の異なるブランチに配置しfmapます。

runAsterisk' :: Producer B.ByteString IO () 
             -> Consumer B.ByteString IO () 
             -> Asterisk a 
             -> IO (Either () a)
runAsterisk' i o m' = runEffect $ fmap Left i >-> fmap Right (astPipe m') >-> fmap Left o

でのパターン マッチングにより、Eitherどのコンポーネントがパイプラインを停止したかが明らかになります。

また、 を使用して、それ自体で停止しないコンシューマーdrainに変換することもできます。Consumer a IO ()

neverStop :: Consumer a IO () -> Consumer a IO r
neverStop consumer = consumer *> drain

元のコンシューマーが停止した後に受信したすべての入力は破棄されます。

于 2016-10-05T00:01:58.040 に答える