0

ペイロード、つまりStockTicksData idに基づいて最新のイベントを検討し、時間枠内の重複を無視したい

ペイロードとクエリ

var StockTicksData = new[]
{
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 01), Price = 100, ID = "000361105"    }, 
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 02), Price = 200, ID = "000361105" }, 
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 03), Price = 3000, ID = "000361105" }, 
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 04), Price = 100, ID = "001055102" }, 
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 05), Price = 700, ID = "001084102" }, 
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 06), Price = 500, ID = "001084102" }, 
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 07), Price = 100, ID = "001084102" }, 
};
var stocks = StockTicksData.ToPointStream(Application, t =>
PointEvent.CreateInsert(t.Timestamp, t),
AdvanceTimeSettings.IncreasingStartTime);

var query = (from e in stocks
        group e by e.ID into ipGroup
        from win in ipGroup.TumblingWindow(TimeSpan.FromSeconds(2), 
            HoppingWindowOutputPolicy.ClipToWindowEnd)
        select new 
        {   
             CusipID = cusipGroup.Key,
             Timestamp = win.Max(e => e.Timestamp),
             Price = 0
        });

var cusipIdGroupCepStream = (from px in query
                         join lz in stocks
                         on new { px.CusipID, px.Timestamp }
                         equals new { lz.CusipID, lz.Timestamp }
                         select new 
                           {
                              CusipId = lz.CusipID,
                              Price = lz.Price,
                              TimeofArrival = lz.Timestamp
                           });

上記のクエリは正常に機能しますが、入力アダプタを使用する場合は、出力をフラッシュするためにctiイベントを挿入する必要があります。これがコードです

ダニジェネレーター それは到着の時間を持っています

priceTick.TimeofArrival = DateTime.Now.AddTicks(1);

入力アダプター

.... In a loop
{
currEvent = CreateInsertEvent();
currEvent.StartTime = priceTick.TimeofArrival; **// each event has time arrival from input which is t+1**
currEvent.Payload = new PriceTick { Id = priceTick.Id, Price = priceTick.Price, TimeofArrival = priceTick.TimeofArrival };
  pendingEvent = null;

 Enqueue(ref currEvent);

 // Also send an CTI event
 EnqueueCtiEvent(priceTick.TimeofArrival.AddTicks(1)); **// Added to flush the output**
 }

同じクエリでは、入力アダプタとctiイベントで期待される出力が得られません

どんな助けでも大歓迎です。

4

1 に答える 1

0

上記のクエリは、生成されるCTIイベントがないため、LinqPadで機能します。入出力アダプターを使用して構成した場合、同じクエリが結果の完全なセットを吐き出すことはありません。

構成する必要のある出力をフラッシュするには、入力ファクトリのITypedDeclareAdvanceTimePropertiesを介してCTIイベントを生成します。設定に基づいてCTIイベントの生成を処理します。私は以下の構成を使用しています

public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties(StockTickerInputConfig configInfo、EventShape eventShape){return new AdapterAdvanceTimeSettings(new AdvanceTimeGenerationSettings(configInfo.CtiFrequency、TimeSpan.FromTicks(1))、AdvanceTimePolicy.Adjust); }

データの頻度が毎秒非常に高いポイントイベントの場合、イベントをドロップするか、出力アダプタにフラッシュしません。結果が出力アダプターにフラッシュアウトされるように、LASTCTIイベントを手動でエンキューする必要があります。

入力ファクトリの現在の設定では、最後のデータフィード/イベントがキューに入れられた後の最後のCTIの生成は処理されません。入力アダプターに条件を作成して、これがエンキューされる最後のイベントであるかどうかを確認してから、EnqueueCtiEventを介してctiイベントをエンキューします。

必要に応じてコードを提供できます。これを解決するためのより良い方法がある人がいることを知らせてください。

于 2012-05-17T05:37:10.533 に答える