7

コーパス内の NGram の頻度をカウントするプログラムを作成します。トークンのストリームを消費し、1 つのオーダーの NGrams を生成する関数が既にあります。

ngram :: Monad m => Int -> Conduit t m [t]
trigrams = ngram 3
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)

現時点では、1 つのストリーム コンシューマーをストリーム ソースに接続できます。

tokens --- trigrams --- countFreq

複数のストリーム コンシューマーを同じストリーム ソースに接続するにはどうすればよいですか? 私はこのようなものが欲しいです:

           .--- unigrams --- countFreq
           |--- bigrams  --- countFreq
tokens ----|--- trigrams --- countFreq
           '--- ...      --- countFreq

プラスは、各コンシューマーを並行して実行することです

編集: Petrのおかげで、この解決策を思いつきました

spawnMultiple orders = do
    chan <- atomically newBroadcastTMChan

    results <- forM orders $ \_ -> newEmptyMVar
    threads <- forM (zip results orders) $
                        forkIO . uncurry (sink chan)

    forkIO . runResourceT $ sourceFile "test.txt"
                         $$ javascriptTokenizer
                         =$ sinkTMChan chan

    forM results readMVar

    where
        sink chan result n = do
            chan' <- atomically $ dupTMChan chan
            freqs <- runResourceT $ sourceTMChan chan'
                                 $$ ngram n
                                 =$ frequencies
            putMVar result freqs
4

1 に答える 1

6

すべてのシンクがすべての値を受け取ることを望んでいると仮定しています。

私はお勧めします:

  1. newBroadcastTMChan新しいチャネルControl.Concurrent.STM.TMChan(stm-chans)を作成するために使用します。
  2. sinkTBMChanこのチャネルを使用して、メイン プロデューサーのfrom Data.Conduit.TMChan(stm-conduit) を使用してシンクを構築します。
  3. クライアントごとdupTMChanに、読み取り用の独自のコピーを作成するために使用します。を使用してこのコピーを読み取る新しいスレッドを開始しますsourceTBMChan
  4. スレッドから結果を収集します。
  5. クライアントがデータを生成されるのと同じ速さで読み取れるようにしてください。そうしないと、ヒープ オーバーフローが発生する可能性があります。

(私はそれを試していないので、それがどのように機能するか教えてください。)


更新:結果を収集する方法の 1 つは、MVarfor each consumer スレッドを作成することです。それらのそれぞれは、それがputMVar終了した後にその結果になります。そして、メインスレッドはtakeMVarこれらすべてのMVars を処理するため、すべてのスレッドが終了するのを待ちます。たとえば、varsが のリストである場合MVar、メイン スレッドはmapM takeMVar varsすべての結果を収集するために発行します。

于 2013-07-29T19:08:16.107 に答える