8

金融ドメインでは、通常、時系列データのストリームから移動ウィンドウ集計値を計算する必要があります。例として移動平均を使用します。たとえば、次のデータ ストリームがあるとします (T はタイム スタンプ、V は実際の値)。

[T0,V0],[T1,V1],[T2,V2],[T3,V3],[T4,V4],[T5,V5],[T6,V6],[T7,V7],[T8,V8],[T9,V9],[T10,1V0],......

取得したストリームから移動平均 3 を計算するには:

avg([T0,V0],[T1,V1],[T2,V2]),
avg([T1,V1],[T2,V2],[T3,V3]),
avg([T2,V2],[T3,V3],[T4,V4]),
avg([T3,V3],[T4,V4],[T5,V5]),
avg([T4,V4],[T5,V5],[T6,V6]),...

移動平均を計算するには、次のようにして実行できるようです。

  1. 元のストリームから Observable を構築する
  2. 値をグループに集約して、元のストリームから Observable を構築します
  3. 集約演算子を使用して、ステップ 2 で Observable からの最終結果を計算します。

ステップ 1 と 3 は簡単に実装できますが、ステップ 2 については、現在の RxJava には移動ウィンドウ グループを生成するためのビルトイン オペレータがないようです。既存のオペレーターからソリューションを構成する簡単な方法ですが、RxJavaでこれを「エレガントに」行う方法を提案できる人はいますか?

4

2 に答える 2

6

RxJava バージョン: 0.15.1

import java.util.List;                                                          
import rx.Observable;                                                           
import rx.util.functions.Action1;                                               
                                                                                
class Bar {                                                                     
                                                                                
    public static void main(String args[]) {                                    
                                                                                
        Integer arr[] = {1, 2, 3, 4, 5, 6}; // N = 6                            
        Observable<Integer> oi = Observable.from(arr);                          
                                                                                
        // 1.- bundle 3, skip 1                                                 
        oi.buffer(3, 1)                                                         
        /**                                                                     
         * 2.- take only the first X bundles                                    
         * When bundle 3, X = N - 2 => 4                                        
         * When bundle 4, X = N - 3 => 3                                        
         * When bundle a, X = N - (a-1)                                         
         */                                                                     
          .take(4)                                                              
        // 3.- calculate average                                                
          .subscribe(new Action1<List<Integer>>() {                             
            @Override                                                           
            public void call(List<Integer> lst) {                               
                int sum = 0;                                                    
                for(int i = 0; i < lst.size(); i++) {                           
                    sum += lst.get(i);                                          
                }                                                               
                                                                                
                System.out.println("MA(3) " + lst +                             
                                   " => " + sum / lst.size());                  
            }                                                                   
        });                                                                     
                                                                                
    }                                                                           
                                                                                
}  

出力例:

MA(3) [1, 2, 3] => 2

MA(3) [2, 3, 4] => 3

MA(3) [3, 4, 5] => 4

MA(3) [4, 5, 6] => 5

于 2013-12-28T08:53:40.127 に答える