これが私が思いついた解決策です。この関数は Observable を作成し、ソース Observable によって発行された各アイテムをどう処理するかは IBufferEvaluator が指示します。アイテムをバッファに追加する、発行されたアイテムをスキップする、バッファを消去する、サブスクライバにバッファをフラッシュする、などの処理ができます。すぐに使える(out-of-the-box の)rxjs ソリューションがあれば、ぜひ教えてください。ありがとう。
import Rx from 'rxjs/Rx';
/**
 * Actions a buffer evaluator can request for the current emission and the
 * buffered values. Numeric values are part of the contract and are spelled
 * out explicitly.
 */
export enum BufferAction {
    /** Append the current emission to the buffer and continue. */
    APPEND = 0,
    /** Do nothing, ignoring the current emission if applicable. */
    SKIP = 1,
    /** Ignore the current emission, if applicable, and flush the existing buffer contents. */
    FLUSH = 2,
    /** Clear the buffer contents. Ignore the current emission, if applicable. */
    CLEAR = 3,
    /** Mark the Observable as complete. The buffer will be cleared upon completion. */
    COMPLETE = 4,
    /** Append the current emission to the buffer prior to flushing it. */
    APPEND_THEN_FLUSH = 5,
    /** Append the current emission to the buffer and then complete. */
    APPEND_THEN_COMPLETE = 6,
    /** Clear the buffer contents and then append the current emission to it. */
    CLEAR_THEN_APPEND = 7,
    /** Flush the buffer contents and then append the current emission to it. */
    FLUSH_THEN_APPEND = 8,
    /** Flush the buffer contents and then mark the Observable as complete. */
    FLUSH_THEN_COMPLETE = 9,
    /** Append the current emission, flush the buffer, and then complete. */
    APPEND_FLUSH_COMPLETE = 10
}
/** Display name for every known BufferAction, keyed by the enum member itself. */
const bufferActionNames = new Map<BufferAction, string>([
    [BufferAction.APPEND, "APPEND"],
    [BufferAction.SKIP, "SKIP"],
    [BufferAction.FLUSH, "FLUSH"],
    [BufferAction.CLEAR, "CLEAR"],
    [BufferAction.COMPLETE, "COMPLETE"],
    [BufferAction.APPEND_THEN_FLUSH, "APPEND_THEN_FLUSH"],
    [BufferAction.APPEND_THEN_COMPLETE, "APPEND_THEN_COMPLETE"],
    [BufferAction.CLEAR_THEN_APPEND, "CLEAR_THEN_APPEND"],
    [BufferAction.FLUSH_THEN_APPEND, "FLUSH_THEN_APPEND"],
    [BufferAction.FLUSH_THEN_COMPLETE, "FLUSH_THEN_COMPLETE"],
    [BufferAction.APPEND_FLUSH_COMPLETE, "APPEND_FLUSH_COMPLETE"]
]);

/**
 * Converts a BufferAction into a human-readable string for diagnostics.
 *
 * @param action the action to describe
 * @returns the member name for a known action; otherwise a message of the
 *          form "Unrecognized Buffer Action [<value>]"
 */
export function bufferActionToString(action: BufferAction): string {
    const knownName = bufferActionNames.get(action);
    if (knownName !== undefined) {
        return knownName;
    }
    return "Unrecognized Buffer Action [" + action + "]";
}
/**
 * Strategy object that decides what happens to the buffer as the source
 * Observable emits and completes.
 */
export interface IBufferEvaluator<T> {
    /**
     * Decides how to treat a value emitted by the source.
     *
     * @param next   the value that was just emitted
     * @param buffer the values buffered so far (bufferWithEval passes a copy)
     * @returns the action to apply to the buffer and the emitted value
     */
    evalOnNext(next: T, buffer: T[]): BufferAction;

    /**
     * Decides what to do with the remaining buffer when the stream completes.
     * bufferWithEval accepts only FLUSH, FLUSH_THEN_COMPLETE, CLEAR and
     * COMPLETE here; any other action is reported as an error.
     *
     * @param buffer the values buffered so far (bufferWithEval passes a copy)
     * @returns the action to apply before completion
     */
    evalOnComplete(buffer: T[]): BufferAction;
}
/**
 * bufferWithEval.ts
 *
 * An operator that buffers the emissions of the source Observable. As each
 * emission is received, it and a copy of the buffered emissions are handed to
 * an IBufferEvaluator, which returns the BufferAction to take: APPEND the
 * value, SKIP it, FLUSH the buffer to the subscriber before or after
 * appending, CLEAR the buffer before appending, or COMPLETE the stream.
 *
 * Completion always consults evalOnComplete, whether the source completed or
 * evalOnNext requested completion. Only FLUSH, FLUSH_THEN_COMPLETE, CLEAR and
 * COMPLETE are valid there; anything else is reported as an error.
 *
 * If evalOnNext or evalOnComplete throws, or returns an invalid action, the
 * error is forwarded to the subscriber and the stream stops.
 *
 * @param source           the Observable whose emissions are buffered
 * @param evaluatorFactory creates one evaluator per subscription. When
 *                         omitted, every emission is APPENDed and the buffer
 *                         is flushed once on completion (FLUSH_THEN_COMPLETE).
 * @returns an Observable that emits the buffer contents whenever a flush happens
 */
export function bufferWithEval<T>(
    source: Rx.Observable<T>,
    evaluatorFactory?: () => IBufferEvaluator<T>
): Rx.Observable<T[]> {
    // Resolve the factory once instead of reassigning the optional parameter,
    // so the closure below never sees a possibly-undefined value.
    const createEvaluator: () => IBufferEvaluator<T> = evaluatorFactory || (() => ({
        evalOnNext: () => BufferAction.APPEND,
        evalOnComplete: () => BufferAction.FLUSH_THEN_COMPLETE
    }));

    return new Rx.Observable<T[]>((subscriber: Rx.Subscriber<T[]>) => {
        let buffer: T[] = [];
        const evaluator = createEvaluator();
        // Stays undefined while source.subscribe() is still running; a
        // synchronous source delivers all its notifications in that window.
        let upstream: Rx.Subscription | undefined;
        // Set once this stream has completed or errored, so that later
        // notifications from a source we could not yet unsubscribe from are ignored.
        let stopped = false;

        function clear(): void {
            // Replace rather than truncate: a flushed array now belongs to the subscriber.
            buffer = [];
        }

        function append(value: T): void {
            buffer.push(value);
        }

        function flush(): void {
            try {
                subscriber.next(buffer);
            } finally {
                // Reset the buffer even if subscriber.next() throws. The
                // exception is not swallowed; it propagates to the caller.
                clear();
            }
        }

        function releaseUpstream(): void {
            if (upstream) {
                upstream.unsubscribe();
            }
        }

        function fail(err: any): void {
            stopped = true;
            try {
                subscriber.error(err);
            } finally {
                releaseUpstream();
            }
        }

        function finish(): void {
            if (stopped) {
                return;
            }
            try {
                const action = evaluator.evalOnComplete(buffer.slice());
                switch (action) {
                    case BufferAction.FLUSH_THEN_COMPLETE:
                    case BufferAction.FLUSH:
                        flush();
                        break;
                    case BufferAction.CLEAR:
                    case BufferAction.COMPLETE:
                        break;
                    default:
                        throw new Error("complete(): Invalid BufferAction '" + bufferActionToString(action) + "'");
                }
                clear();
                stopped = true;
                subscriber.complete();
                releaseUpstream();
            } catch (e) {
                fail(e);
            }
        }

        function handleNext(value: T): void {
            if (stopped) {
                return;
            }
            try {
                // The evaluator gets a copy so it cannot mutate the live buffer.
                const action = evaluator.evalOnNext(value, buffer.slice());
                switch (action) {
                    case BufferAction.APPEND: append(value); break;
                    case BufferAction.SKIP: break;
                    case BufferAction.FLUSH: flush(); break;
                    case BufferAction.CLEAR: clear(); break;
                    case BufferAction.COMPLETE: finish(); break;
                    case BufferAction.APPEND_THEN_FLUSH: append(value); flush(); break;
                    case BufferAction.APPEND_THEN_COMPLETE: append(value); finish(); break;
                    case BufferAction.APPEND_FLUSH_COMPLETE: append(value); flush(); finish(); break;
                    case BufferAction.CLEAR_THEN_APPEND: clear(); append(value); break;
                    case BufferAction.FLUSH_THEN_APPEND: flush(); append(value); break;
                    case BufferAction.FLUSH_THEN_COMPLETE: flush(); finish(); break;
                    default: throw new Error("next(): Invalid BufferAction '" + bufferActionToString(action) + "'");
                }
            } catch (e) {
                fail(e);
            }
        }

        function handleError(err: any): void {
            if (!stopped) {
                fail(err);
            }
        }

        upstream = source.subscribe(handleNext, handleError, finish);
        if (stopped) {
            // The stream ended while subscribe() was still running (synchronous
            // source), before there was a subscription to release.
            upstream.unsubscribe();
        }
        // Returned as the teardown so that unsubscribing downstream also
        // unsubscribes from the source.
        return upstream;
    });
}