0

私たちの ActivePivot ソリューションでは、株式の価格 (およびボラティリティ パラメーター) に応じてストック オプションの価格を計算するポスト プロセッサを作成しました。評価されると、ポスト プロセッサは (今のところ) Google Finance サービスに接続して、その場で株価を取得します。したがって、ユーザーが ActivePivot でクエリを実行するたびに、集計は最新の価格でリアルタイムで計算されます。

ただし、ActivePivot の継続的なクエリを活用し、変更された集計を (ActivePivot Live の更新ボタンを定期的に押すのではなく) リアルタイムでユーザーにプッシュしたいと考えています。これは通常、価格変更イベントを ActivePivot に伝達し、ActivePivot にサブスクライブされたクエリへの影響を計算させる継続的なハンドラーを作成することによって実装されることがわかっています。しかし、Google Finance はプッシュ API を提供しておらず、何百もの株式を定期的にポーリングしてサービスを攻撃すると、禁止されます。

この問題を回避するために ActivePivot で推奨されるメカニズムは何ですか?

4

1 に答える 1

0

ポスト プロセッサのデータ ソースがあまりにも予測不可能な場合 (あなたのケースのようなプッシュ機能のないリモート Web サービスなど)、最善の方法は、ActivePivotに結果全体を伝えるこのポスト プロセッサのストリームハンドラを作成することです。このポスト プロセッサの数は、 N秒ごとに変更されました。

次のような固定期間で (空の) イベントを送信する TickingStream を作成できます。

package com.quartetfs.pivot.sandbox.postprocessor.impl;

import java.util.Timer;
import java.util.TimerTask;

import com.quartetfs.biz.pivot.IActivePivotSession;
import com.quartetfs.biz.pivot.query.aggregates.IAggregatesContinuousQueryEngine;
import com.quartetfs.biz.pivot.query.aggregates.impl.AStream;
import com.quartetfs.fwk.QuartetExtendedPluginValue;
import com.quartetfs.fwk.types.impl.ExtendedPluginInjector;

/**
 * Stream sending an event at a regular rate.
 *
 */
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.query.aggregates.IStream", key = TickingStream.PLUGIN_KEY)
public class TickingStream extends AStream<Void> {

    private static final long serialVersionUID = 1L;

    public static final String PLUGIN_KEY = "TICKING";

    /** The default ticking period, in ms. **/
    protected static final long DEFAULT_PERIOD = 1000;

    /** The ticking period, in ms. **/
    protected long period = DEFAULT_PERIOD;

    /** The task responsible for sending the ticking events. */
    protected final TimerTask sendEventTask;

    /** The timer that schedules the {@link #sendEventTask}. */
    protected Timer tickingTimer;

    /**
     * Create a ticking stream.
     *
     * @param engine
     * @param session
     */
    public TickingStream(IAggregatesContinuousQueryEngine engine,
            IActivePivotSession session) {
        super(engine, session);

        // Create the task that will send the events.
        sendEventTask = new TimerTask() {

            @Override
            public void run() {
                sendEvent(null);
            }
        };
        // Schedule this task with the default period:
        rescheduleTask();
    }

    /**
     * Schedule the {@link #sendEventTask} with the {@link #period} period.
     * Removes also all previous scheduling of this task.
     */
    protected void rescheduleTask() {
        if (tickingTimer != null) {
            tickingTimer.cancel();
        }
        tickingTimer = new Timer();
        tickingTimer.schedule(sendEventTask, 0, period);
    }

    /**
     * Change the ticking period of this stream. This will reschedule the task
     * according to this new period. This setter will be called via
     * {@link ExtendedPluginInjector extended plugin injection}
     *
     * @param period the period to set. Must be strictly positive.
     *
     * @throws IllegalArgumentException if period is smaller or equal to 0.
     */
    public void setPeriod(long period) {
        if (period <= 0) {
            throw new IllegalArgumentException("Non-positive period.");
        }
        this.period = period;
        rescheduleTask();
    }

    /** {@inheritDoc} */
    @Override
    public Class<Void> getEventType() {
        return Void.class;
    }

    /** {@inheritDoc} */
    @Override
    public String getType() {
        return PLUGIN_KEY;
    }
}

そして、各ティックでこのハンドラーを持つポスト プロセッサに関連するすべての継続的なクエリの一部を更新するように要求するハンドラー:

package com.quartetfs.pivot.sandbox.postprocessor.impl;

import java.util.Collections;

import com.quartetfs.biz.pivot.IActivePivot;
import com.quartetfs.biz.pivot.ILocation;
import com.quartetfs.biz.pivot.query.aggregates.IImpact;
import com.quartetfs.biz.pivot.query.aggregates.impl.AAggregatesContinuousHandler;
import com.quartetfs.biz.pivot.query.aggregates.impl.Impact;
import com.quartetfs.fwk.QuartetExtendedPluginValue;

/**
 * The handler associated with a {@link TickingStream}.
 *
 * This handler asks for a full refresh of the locations queried on
 * post-processors with this handler each time it receives a tick.
 * <p>
 * This is the handler to use for post processors that have unpredictable data
 * sources which prevent the creation of a stream and a handler that can decide
 * which subset of the currently queried locations should be updated in a
 * continuous query.
 *
 */
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.query.aggregates.IAggregatesContinuousHandler", key = TickingStream.PLUGIN_KEY)
public class TickingHandler extends AAggregatesContinuousHandler<Void> {

    private static final long serialVersionUID = 1L;

    /**
     * @param pivot
     */
    public TickingHandler(IActivePivot pivot) {
        super(pivot);
    }

    /**
     * {@inheritDoc}
     * <p>
     * The impact on a queried location is the whole location since there is no
     * way for us to know which part of the location should be updated or not.
     */
    @Override
    public IImpact computeImpact(ILocation location, Void event) {
        return new Impact(location, Collections.singleton(location), Collections.singleton(location));
    }

    /** {@inheritDoc} */
    @Override
    public String getStreamKey() {
        // This handler is made to be used with the TickingStream.
        return TickingStream.PLUGIN_KEY;
    }

    /** {@inheritDoc} */
    @Override
    public String getType() {
        return TickingStream.PLUGIN_KEY;
    }
}

そして、このハンドラーを次のように使用するようにポスト プロセッサーを構成します。

<measure name="..." folder="..." aggregationFunctions="...">
            <postProcessor pluginKey="yourPPpluginKey">
                <properties>
                    <entry key="continuousQueryHandlerKeys" value="TICKING" />
                </properties>
            </postProcessor>
        </measure>

ある意味では、ポスト プロセッサが毎秒 (デフォルトの期間) 呼び出されるため、Google Finance サービスをポーリングすることになります。ただし、これはユーザーがポスト プロセッサを使用して継続的にクエリを実行している場合にのみ発生し、ポスト プロセッサはユーザーがクエリを実行した場所でのみ呼び出されるため、Google Finance の非常に小さなサブセットのみが必要になることを願っています。情報。さらに、ポスト プロセッサへの呼び出しは複数のユーザー間で共有されるため、Google Finance に対して最小限のクエリを発行できます。

于 2013-02-07T10:41:10.240 に答える