ペイロード、つまりStockTicksData idに基づいて最新のイベントを検討し、時間枠内の重複を無視したい
ペイロードとクエリ
var StockTicksData = new[]
{
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 01), Price = 100, ID = "000361105" },
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 02), Price = 200, ID = "000361105" },
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 03), Price = 3000, ID = "000361105" },
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 04), Price = 100, ID = "001055102" },
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 05), Price = 700, ID = "001084102" },
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 06), Price = 500, ID = "001084102" },
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 07), Price = 100, ID = "001084102" },
};
var stocks = StockTicksData.ToPointStream(Application, t =>
PointEvent.CreateInsert(t.Timestamp, t),
AdvanceTimeSettings.IncreasingStartTime);
var query = (from e in stocks
group e by e.ID into ipGroup
from win in ipGroup.TumblingWindow(TimeSpan.FromSeconds(2),
HoppingWindowOutputPolicy.ClipToWindowEnd)
select new
{
CusipID = cusipGroup.Key,
Timestamp = win.Max(e => e.Timestamp),
Price = 0
});
var cusipIdGroupCepStream = (from px in query
join lz in stocks
on new { px.CusipID, px.Timestamp }
equals new { lz.CusipID, lz.Timestamp }
select new
{
CusipId = lz.CusipID,
Price = lz.Price,
TimeofArrival = lz.Timestamp
});
上記のクエリは正常に機能しますが、入力アダプタを使用する場合は、出力をフラッシュするためにctiイベントを挿入する必要があります。これがコードです
ダニジェネレーター それは到着の時間を持っています
priceTick.TimeofArrival = DateTime.Now.AddTicks(1);
入力アダプター
.... In a loop
{
currEvent = CreateInsertEvent();
currEvent.StartTime = priceTick.TimeofArrival; **// each event has time arrival from input which is t+1**
currEvent.Payload = new PriceTick { Id = priceTick.Id, Price = priceTick.Price, TimeofArrival = priceTick.TimeofArrival };
pendingEvent = null;
Enqueue(ref currEvent);
// Also send an CTI event
EnqueueCtiEvent(priceTick.TimeofArrival.AddTicks(1)); **// Added to flush the output**
}
同じクエリでは、入力アダプタとctiイベントで期待される出力が得られません
どんな助けでも大歓迎です。