ポスト プロセッサのデータ ソースがあまりにも予測不可能な場合 (あなたのケースのようなプッシュ機能のないリモート Web サービスなど)、最善の方法は、ActivePivotに結果全体を伝えるこのポスト プロセッサのストリームとハンドラを作成することです。このポスト プロセッサの数は、 N秒ごとに変更されました。
次のような固定期間で (空の) イベントを送信する TickingStream を作成できます。
package com.quartetfs.pivot.sandbox.postprocessor.impl;
import java.util.Timer;
import java.util.TimerTask;
import com.quartetfs.biz.pivot.IActivePivotSession;
import com.quartetfs.biz.pivot.query.aggregates.IAggregatesContinuousQueryEngine;
import com.quartetfs.biz.pivot.query.aggregates.impl.AStream;
import com.quartetfs.fwk.QuartetExtendedPluginValue;
import com.quartetfs.fwk.types.impl.ExtendedPluginInjector;
/**
* Stream sending an event at a regular rate.
*
*/
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.query.aggregates.IStream", key = TickingStream.PLUGIN_KEY)
public class TickingStream extends AStream<Void> {
private static final long serialVersionUID = 1L;
public static final String PLUGIN_KEY = "TICKING";
/** The default ticking period, in ms. **/
protected static final long DEFAULT_PERIOD = 1000;
/** The ticking period, in ms. **/
protected long period = DEFAULT_PERIOD;
/** The task responsible for sending the ticking events. */
protected final TimerTask sendEventTask;
/** The timer that schedules the {@link #sendEventTask}. */
protected Timer tickingTimer;
/**
* Create a ticking stream.
*
* @param engine
* @param session
*/
public TickingStream(IAggregatesContinuousQueryEngine engine,
IActivePivotSession session) {
super(engine, session);
// Create the task that will send the events.
sendEventTask = new TimerTask() {
@Override
public void run() {
sendEvent(null);
}
};
// Schedule this task with the default period:
rescheduleTask();
}
/**
* Schedule the {@link #sendEventTask} with the {@link #period} period.
* Removes also all previous scheduling of this task.
*/
protected void rescheduleTask() {
if (tickingTimer != null) {
tickingTimer.cancel();
}
tickingTimer = new Timer();
tickingTimer.schedule(sendEventTask, 0, period);
}
/**
* Change the ticking period of this stream. This will reschedule the task
* according to this new period. This setter will be called via
* {@link ExtendedPluginInjector extended plugin injection}
*
* @param period the period to set. Must be strictly positive.
*
* @throws IllegalArgumentException if period is smaller or equal to 0.
*/
public void setPeriod(long period) {
if (period <= 0) {
throw new IllegalArgumentException("Non-positive period.");
}
this.period = period;
rescheduleTask();
}
/** {@inheritDoc} */
@Override
public Class<Void> getEventType() {
return Void.class;
}
/** {@inheritDoc} */
@Override
public String getType() {
return PLUGIN_KEY;
}
}
そして、各ティックでこのハンドラーを持つポスト プロセッサに関連するすべての継続的なクエリの一部を更新するように要求するハンドラー:
package com.quartetfs.pivot.sandbox.postprocessor.impl;
import java.util.Collections;
import com.quartetfs.biz.pivot.IActivePivot;
import com.quartetfs.biz.pivot.ILocation;
import com.quartetfs.biz.pivot.query.aggregates.IImpact;
import com.quartetfs.biz.pivot.query.aggregates.impl.AAggregatesContinuousHandler;
import com.quartetfs.biz.pivot.query.aggregates.impl.Impact;
import com.quartetfs.fwk.QuartetExtendedPluginValue;
/**
* The handler associated with a {@link TickingStream}.
*
* This handler asks for a full refresh of the locations queried on
* post-processors with this handler each time it receives a tick.
* <p>
* This is the handler to use for post processors that have unpredictable data
* sources which prevent the creation of a stream and a handler that can decide
* which subset of the currently queried locations should be updated in a
* continuous query.
*
*/
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.query.aggregates.IAggregatesContinuousHandler", key = TickingStream.PLUGIN_KEY)
public class TickingHandler extends AAggregatesContinuousHandler<Void> {
private static final long serialVersionUID = 1L;
/**
* @param pivot
*/
public TickingHandler(IActivePivot pivot) {
super(pivot);
}
/**
* {@inheritDoc}
* <p>
* The impact on a queried location is the whole location since there is no
* way for us to know which part of the location should be updated or not.
*/
@Override
public IImpact computeImpact(ILocation location, Void event) {
return new Impact(location, Collections.singleton(location), Collections.singleton(location));
}
/** {@inheritDoc} */
@Override
public String getStreamKey() {
// This handler is made to be used with the TickingStream.
return TickingStream.PLUGIN_KEY;
}
/** {@inheritDoc} */
@Override
public String getType() {
return TickingStream.PLUGIN_KEY;
}
}
そして、このハンドラーを次のように使用するようにポスト プロセッサーを構成します。
<measure name="..." folder="..." aggregationFunctions="...">
<postProcessor pluginKey="yourPPpluginKey">
<properties>
<entry key="continuousQueryHandlerKeys" value="TICKING" />
</properties>
</postProcessor>
</measure>
ある意味では、ポスト プロセッサが毎秒 (デフォルトの期間) 呼び出されるため、Google Finance サービスをポーリングすることになります。ただし、これはユーザーがポスト プロセッサを使用して継続的にクエリを実行している場合にのみ発生し、ポスト プロセッサはユーザーがクエリを実行した場所でのみ呼び出されるため、Google Finance の非常に小さなサブセットのみが必要になることを願っています。情報。さらに、ポスト プロセッサへの呼び出しは複数のユーザー間で共有されるため、Google Finance に対して最小限のクエリを発行できます。