1

したがって、rxjsを使用して実装する方法を理解しようとしているシナリオは次のとおりです。

  1. ファイル/データベース/etc からメタデータのセットを読み込みます。メタデータの各要素には、ID と追加情報 (実際のデータの場所など) があります。現在、アプリケーションの開始時にこのメタデータをすべて非同期でロードしています。このデータが読み込まれると、Observable 呼び出しが完了します。最終的にはリフレッシュ機能を追加するかもしれません

  2. アプリケーションの後で、メタデータで利用可能なデータに基づいて、特定のデータ セットをロードする必要があります。私は現在、 fetchData(ids:string[]):Observableのような関数でこれを行おうとしています。これは、rxjs パラダイムの下でどのように進めるかについて私が不明な点です。fetchDatum(id:string):Observableのような関数を使用して単一のアイテムをリクエストする方法についても同様に確信が持てません

もちろん、フィルターを使用して、リスト内の名前のいずれかに一致する IMetadata Observable から発行された IMetdata アイテムのみを操作できますが、要求されたすべてのアイテムが IMetadata Observable 発行で見つかることも確認する必要がありますエラーする必要があります。

したがって、誰かが id = "Bob" の IMetadata を要求した場合、ソースの Observable から発行された IMetadata がない場合は、エラーが発生する必要があります。または、{ "Shirley", "Rex", "Samantha" } を要求し、"Rex" のデータがない場合、エラーになります。

ここで Rx.Subject を使用することを検討しましたが、私が読んだことから、rxjs パラダイムでは一般的に望ましくありません。rxjsパラダイムの下で、このシナリオでどのアプローチが機能するかについてアドバイスしてください。ありがとう!

4

1 に答える 1

1

これが私が思いついた解決策です。この関数は、IBufferEvaluator に依存する Observable を作成して、ソース Observable によって発行された各アイテムをどう処理するかを指示します。アイテムをバッファに追加する、発行されたアイテムをスキップする、バッファを消去する、サブスクライバにバッファをフラッシュするなどのことができます。ボックス rxjs ソリューション。ありがとう。

import Rx from 'rxjs/Rx';

export enum BufferAction {    
    APPEND, /** Append the current emission to the buffer and continue  **/
    SKIP, /** Do nothing, ignoring the current emission if applicable  **/
    FLUSH, /** This will ignore the current emission, if applicable, and flush the existing buffer contents */
    CLEAR, /** Clear the buffer contents. Ignore the current emission, if applicable */
    COMPLETE, /** Mark the Observable as Complete. The buffer will be cleared upon completion. **/
    APPEND_THEN_FLUSH,   /** Append the current emission to the buffer prior to flushing it  **/
    APPEND_THEN_COMPLETE, /** Append the current emission to the buffer and then complete **/
    CLEAR_THEN_APPEND, /** Clear the buffer contents and then append the current emission to it */
    FLUSH_THEN_APPEND, /** Flush the buffer contents and then append the current emission to it */
    FLUSH_THEN_COMPLETE, /** Flush the buffer contents and then mark the Observable as complete */
    APPEND_FLUSH_COMPLETE /** Append the current emission, flush the buffer, and then complete  */
}

export function bufferActionToString(action: BufferAction):string
{
    switch(action)
    {
        case BufferAction.APPEND: return "APPEND";
        case BufferAction.SKIP: return "SKIP";
        case BufferAction.FLUSH: return "FLUSH";
        case BufferAction.CLEAR: return "CLEAR";
        case BufferAction.COMPLETE: return "COMPLETE";
        case BufferAction.APPEND_THEN_FLUSH: return "APPEND_THEN_FLUSH";
        case BufferAction.APPEND_THEN_COMPLETE: return "APPEND_THEN_COMPLETE";
        case BufferAction.CLEAR_THEN_APPEND: return "CLEAR_THEN_APPEND";
        case BufferAction.FLUSH_THEN_APPEND: return "FLUSH_THEN_APPEND";
        case BufferAction.FLUSH_THEN_COMPLETE: return "FLUSH_THEN_COMPLETE";
        case BufferAction.APPEND_FLUSH_COMPLETE: return "APPEND_FLUSH_COMPLETE";
        default: return "Unrecognized Buffer Action [" + action + "]";
    }
}

export interface IBufferEvaluator<T>
{
    evalOnNext(next:T, buffer: T[]):BufferAction;
    evalOnComplete(buffer: T[]):BufferAction;
}

/** bufferWithEval.ts
 *  An Operator that buffers the emissions from the source Observable. As each emission is recieved,
 *  it and the buffered emissions are evaluated to determine what BufferAction to APPEND. You can APPEND
 *  the current emission value to the end of the buffered emissions, you can FLUSH the buffered emissions
 *  before or after appending the current emission value, you can SKIP the current emission value and then
 *  (optionally) FLUSH the buffer, and you can CLEAR the buffer before or after appending the current emission.
 *   
 *  The evalOnNext and evalOnComplete are expected to return a BufferAction to indicate
 *  which action to take. If no evalOnNext is supplied, it will default to APPENDing each emission. The evalOnComplete
 *  will default to FLUSH_THEN_COMPLETE. If evalOnNext or evalOnComplete throw an exception, the Observable will emit 
 *  the exception and cease.
 */
export function bufferWithEval<T>
    (   source: Rx.Observable<T>, 
        evaluatorFactory?: () => IBufferEvaluator<T>
    ) : Rx.Observable<T[]> 
{   
    /** if no evaluatorFactory supplied, use the default evaluatorFactory **/
    if(!evaluatorFactory)
    {
        evaluatorFactory = () => {
            return {
                evalOnNext : function(next: T, buffer: T[]) { return BufferAction.APPEND; },
                evalOnComplete : function(buffer: T[]) { return BufferAction.FLUSH; }
            };
        }
    }

    return new Rx.Observable<T[]>((subscriber: Rx.Subscriber<T[]>) => 
    {
        var _buffer = new Array<T>();          
        var _evaluator = evaluatorFactory();
        var _subscription: Rx.Subscription = null;

        function append(next: T)
        {
            _buffer.push(next);
        }

        function flush()
        {
            try
            {
                subscriber.next(_buffer);
            }
            finally
            {
                // Ignore any exceptions that come from subscriber.next()
                clear();
            }          
        }

        function clear()
        {
            _buffer = new Array<T>();
        }

        function next(next: T)
        {
            try
            {
                var action = _evaluator.evalOnNext(next, _buffer.slice(0));                
                switch(action)
                {
                    case BufferAction.APPEND: { append(next); break; }
                    case BufferAction.SKIP: { break; }
                    case BufferAction.FLUSH: { flush(); break; }
                    case BufferAction.CLEAR: { clear(); break; }
                    case BufferAction.COMPLETE: { complete(); break; }
                    case BufferAction.APPEND_THEN_FLUSH: { append(next); flush(); break; }
                    case BufferAction.APPEND_THEN_COMPLETE: { append(next); complete(); break; }
                    case BufferAction.APPEND_FLUSH_COMPLETE: { append(next); flush(); complete(); break; }
                    case BufferAction.CLEAR_THEN_APPEND: { clear(); append(next); break; }
                    case BufferAction.FLUSH_THEN_APPEND: { flush(); append(next); break; }
                    case BufferAction.FLUSH_THEN_COMPLETE: { flush(); complete(); break; }

                    default: throw new Error("next(): Invalid BufferAction '" + bufferActionToString(action) + "'");
                }   
            }
            catch(e)
            {
                error(e);
            }          
        }    

        function complete()
        {
            try
            {            
                var action = _evaluator.evalOnComplete(_buffer.slice(0));
                switch(action)
                {
                    case BufferAction.FLUSH_THEN_COMPLETE:
                    case BufferAction.FLUSH:  { flush(); } 

                    case BufferAction.CLEAR: 
                    case BufferAction.COMPLETE: { break; }                   

                    case BufferAction.APPEND:
                    case BufferAction.APPEND_THEN_FLUSH:
                    case BufferAction.APPEND_THEN_COMPLETE:
                    case BufferAction.APPEND_FLUSH_COMPLETE:
                    case BufferAction.SKIP: 
                    case BufferAction.CLEAR_THEN_APPEND: 
                    case BufferAction.FLUSH_THEN_APPEND: 
                    default: throw new Error("complete(): Invalid BufferAction '" + bufferActionToString(action) + "'");
                }

                clear();
                subscriber.complete();
                _subscription.unsubscribe();        
            }
            catch(e)
            {
                error(e);
            }           
        }        

        function error(err: any)
        {                  
            try
            {
                subscriber.error(err);
            }
            finally
            {   
                _subscription.unsubscribe();
            }
        }          

        _subscription = source.subscribe(next, error, complete);
        return _subscription;
    });
}
于 2016-03-15T22:54:02.173 に答える