7

RxJS を使用して、それぞれ約 1 GB の数百のログ ファイルを処理するスクリプトを作成しようとしています。スクリプトのスケルトンは次のようになります

Rx.Observable.from(arrayOfLogFilePath)
.flatMap(function(logFilePath){
   return Rx.Node.fromReadStream(logFilePath)
   .filter(filterLogLine)
})
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)

コードは機能しますが、すべてのログ ファイルのフィルタリング手順が同時に開始されることに注意してください。ただし、ファイル システムの IO パフォーマンスの観点からは、ファイルを 1 つずつ処理することをお勧めします (または、数百のファイルすべてを同時に開くよりも、少なくとも同時実行を少数のファイルに制限することをお勧めします)。この点で、「機能的なリアクティブな方法」で実装するにはどうすればよいですか?

私はスケジューラを考えていましたが、ここでどのように役立つかわかりませんでした。

4

2 に答える 2

14

.merge(maxConcurrent)同時実行を制限するために使用できます。.merge(maxConcurrent)メタオブザーバブル (オブザーバブルのオブザーバブル) をオブザーバブルにフラット化するため、出力がメタオブザーバブル ("unflat") になるように を に置き換えてから.flatMap、を.map呼び出す必要があります.merge(maxConcurrent)

Rx.Observable.from(arrayOfLogFilePath)
.map(function(logFilePath){
   return Rx.Node.fromReadStream(logFilePath)
   .filter(filterLogLine)
})
.merge(2) // 2 concurrent 
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)

このコードはテストされていません (あなたが持っている開発環境にアクセスできないため)。RxJS には、同時実行パラメータを持つ多くの演算子はありませんが、ほとんどの場合、必要なことは.merge(maxConcurrent).

于 2014-09-30T17:34:06.550 に答える
1

RxJs 5 で同様の問題を解決したばかりなので、その解決策が同様の問題を抱えている他の人に役立つことを願っています。

// Simulate always processing 2 requests in parallel (when one is finished it starts processing one more),
// retry two times, push error on stream if retry fails.

//const Rx = require('rxjs-es6/Rx');

// -- Global variabel just to show that it works. --
let parallelRequests = 0;
// --------------------------------------------------

function simulateRequest(req) {
    console.log("Request " + req);
    // --- To log retries ---
    var retry = 0;
    // ----------------------

    // Can't retry a promise, need to restart before the promise is made.
    return Rx.Observable.of(req).flatMap(req => new Promise((resolve, reject) => {

        var random = Math.floor(Math.random() * 2000);
        // -- To show that it works --
        if (retry) {
            console.log("Retrying request " + req + " ,retry " + retry);
        } else {

            parallelRequests++;
        }
        // ---------------------------
        setTimeout(() => {
            if (random < 900) {
                retry++;
                return reject(req + " !!!FAILED!!!");
            }

            return resolve(req);
        }, random);
    })).retry(2).catch(e => Rx.Observable.of(e));
}

Rx.Observable.range(1, 10)
    .flatMap(e => simulateRequest(e), null, 2)
    // -- To show that it works --
    .do(() => {
        console.log("ParallelRequests " + parallelRequests);
        parallelRequests--;
    })
    // ---------------------------
    .subscribe(e => console.log("Response from request " + e), e => console.log("Should not happen, error: " + e), e => console.log("Finished"));
<script src="https://npmcdn.com/@reactivex/rxjs@5.0.0-beta.6/dist/global/Rx.umd.js"></script>

于 2016-08-22T14:19:48.240 に答える