これはちょっと遠回りのように感じますが、データベースに接続し、サーバー上のデータベースのリストを取得し、それぞれに接続し、それぞれに対してクエリを実行し (ユーザー数)、それらを出力するためのパイプを作成しました。数えます。残念ながら、これは私の実際の例から単純化できる程度のものです。pipes-4.1.0、pipes-safe-2.0.2、および mysql-simple-0.2.2.4 を使用しています。コードは次のとおりです。
{-# LANGUAGE RankNTypes, OverloadedStrings #-}
import Pipes
import qualified Pipes.Safe as PS
import qualified Pipes.Prelude as P
import Database.MySQL.Simple
import qualified Data.Text as T
import Control.Monad.Catch as MC
import Control.Monad (forever)
import Database.MySQL.Simple.QueryParams
import Database.MySQL.Simple.QueryResults
data DBName = DBName T.Text deriving Show
-- connect to a database and use a table.
mydb :: T.Text -> ConnectInfo
mydb = undefined
-- Quirk of (mysql|postgresql)-simple libraries
unOnly (Only a) = a
queryProducer :: (MonadIO m, QueryParams params, QueryResults r) => Connection -> Query -> params -> Pipes.Producer' r m ()
queryProducer = undefined
myDBNames :: (PS.MonadSafe m, MonadIO m) => Producer DBName m ()
myDBNames = PS.bracket (liftIO $ connect $ mydb "sometable") (liftIO . close) $ \db ->
queryProducer db "show databases" () >-> P.map (DBName . unOnly)
-- I realize this is inefficient, one step at a time.
connectToDB :: (PS.MonadSafe m, MonadIO m) => Pipe DBName Connection m ()
connectToDB = forever $ do
(DBName dbname) <- await
PS.bracket
(liftIO . connect . mydb $ dbname)
(liftIO . close)
yield
userCount :: (PS.MonadCatch m, MonadIO m) => Pipe Connection Int m ()
userCount = forever $ do
db <- await
queryProducer db "select count(*) from user" () >-> P.map unOnly
main :: IO ()
main = PS.runSafeT $ runEffect $ myDBNames >-> P.tee P.print >-> connectToDB >-> userCount >-> P.print
これはうまくいきます。ただし、これらのデータベースの 1 つで、user テーブルの名前が user ではなく users であるとしましょう。したがって、mysql-simple はそのクエリの実行時に例外をスローします。そのエラーをインラインでキャッチし、それらのクエリに対して 0 ユーザーを返すだけで、続行します。私が試したこと:
(queryProducer db "select count(*) from user" () `PS.catchAll` (\e -> (liftIO $ putStrLn "failure") >> yield (Only 0))) >-> P.map unOnly
これはうまくいきません。場合によっては、失敗を出力して 0 を生成し、例外ですぐに終了することがあります。おそらく例外で queryProducer から抜け出したためだと思いました。再度呼び出す必要があるため、この再帰バージョンを試しました。
thequery db >-> P.map unOnly
where
thequery db = queryProducer db "select count(*) from user" () `PS.catchAll` (\e -> (liftIO $ putStrLn "failure") >> yield (Only 0) >> thequery db)
しかし、これも失敗します。ただし、実際にはいくつかのクエリを実行し、失敗を数回出力して、再び例外で終了する前にいくつかの 0 を生成することがあります。なぜこれが起こっているのか、私は本当に混乱しています。
非同期ライブラリによると、パイプが実行されているスレッドに例外を送信する必要があるため、スレッドの問題である可能性はないようです。
私の queryProducer の実装が重要な場合、これは pipes-postgresql クエリ関数の後にモデル化され、Producer に一般化されているため、他のコンビネータに埋め込むことができます。mysql-simple の下の mysql ライブラリには、SQL が意味をなさない場合に ConnectionError をスローする throw があり、この関数全体に浸透します。
{-# LANGUAGE RankNTypes #-}
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Database.MySQL.Simple as My
import Database.MySQL.Simple.QueryParams
import Database.MySQL.Simple.QueryResults
import qualified Pipes
import qualified Pipes.Concurrent as Pipes
--------------------------------------------------------------------------------
-- | Convert a query to a 'Producer' of rows.
--
-- For example,
--
-- > pg <- connectToMysql
-- > query pg "SELECT * FROM widgets WHERE ID = ?" (Only widgetId) >-> print
--
-- Will select all widgets for a given @widgetId@, and then print each row to
-- standard output.
queryProducer
:: (MonadIO m, QueryResults r, QueryParams params)
=> My.Connection -> My.Query -> params -> Pipes.Producer' r m ()
queryProducer c q p = do
(o, i, seal) <- liftIO (Pipes.spawn' Pipes.Single)
worker <- liftIO $ Async.async $ do
My.fold c q p () (const $ void . STM.atomically . Pipes.send o)
STM.atomically seal
liftIO $ Async.link worker
Pipes.fromInput i
また、EitherT を使用して例外をキャッチしようとしました。これは、過去にパイプで行われた方法と思われるためです。しかし、パイプのチュートリアルのドキュメントは 3 から 4 の間に消えてしまい、そのテクニックがまだ推奨されているかどうか疑問に思っています。残念ながら、単数形の await/yields の代わりに queryProducer を使用している方法では、構造化する方法がわからないため、機能させることができませんでした。