トリガーで実行する必要があるタスクがあります (実行する必要があるときにその関数にブール値の true を渡すなど)。
これには、Thread、TimerTask、ScheduledThreadPool などの任意のスレッド化メカニズムを使用できます。トリガー間隔が変動するため、使用するスレッドまたはメカニズムは、タスクの後にリソースを解放する必要がある最善の方法を提案してください。
トリガーで実行する必要があるタスクがあります (実行する必要があるときにその関数にブール値の true を渡すなど)。
これには、Thread、TimerTask、ScheduledThreadPool などの任意のスレッド化メカニズムを使用できます。トリガー間隔が変動するため、使用するスレッドまたはメカニズムは、タスクの後にリソースを解放する必要がある最善の方法を提案してください。
監視可能なイベント ストリームを処理するためのリアクティブ プログラミング API を提供するRxJavaを使用できます。Reactive Extensions for .NETに基づいています。イベントは時間ベースの場合があります (例: Observable.interval()
)。それらはコレクション ( Observable.from(Iterable)
) から取得できます。を使用して自分で公開することもできますSubject
。
//
// NOTE: For brevity, I'm using Java 8 syntax (lambdas and method references).
//
public static void main(String[] args) throws Throwable {
System.out.println("Main Thread: " + Thread.currentThread().getId());
final CountDownLatch done = new CountDownLatch(1);
final Observable<Trade> trades = simulateTrades();
trades.where(t -> Math.abs(t.quantity) * t.price >= 250000)
.observeOn(Schedulers.threadPoolForIO())
.subscribe(
Main::logLargeTrade,
e -> { e.printStackTrace(); done.countDown(); },
done::countDown
);
done.await();
System.out.println("Done!");
}
private static Observable<Trade> simulateTrades() {
final Random r = new Random();
return Observable.interval(50L, TimeUnit.MILLISECONDS)
.take(100)
.map(
t -> new Trade(
Instant.now(),
"AAPL",
(r.nextInt(9) + 1) * 100,
500d + r.nextDouble() * 5d
)
);
}
private static void logLargeTrade(Trade t) {
System.out.printf(
"[%d: %s] Large Trade: %d %s @ %f%n",
Thread.currentThread().getId(),
t.timestamp.atOffset(ZoneOffset.UTC).toLocalDateTime(),
t.quantity,
t.symbol,
t.price
);
}
final static class Trade {
final Instant timestamp;
final String symbol;
final double price;
final int quantity;
Trade(Instant time, String symbol, int quantity, double price) {
this.timestamp = time;
this.symbol = symbol;
this.quantity = quantity;
this.price = price;
}
}
ここに、trades
Trade イベントのストリームがあります。トリガー条件は、少なくとも $250,000 の値を持つトランザクションに一致するため、where()
その条件に一致する取引のみを含めるために使用します。トリガー アクションをスレッド プールで実行する必要があるため、スレッド プールを使用observeOn()
するスケジューラを指定するために使用します。 subscribe()
ご想像のとおり、フィルタリングされたイベント ストリームへのサブスクリプションが作成されます。この特定のオーバーロードにより、、、およびコールバックをラムダとして渡すことがonNext
できonError
ますonCompleted
。
このsimulateTrades()
メソッドは、サブスクライブするイベント ストリームを作成します。通常、これらのイベントはメッセージング システムを介して受信されるか、プロセス内の別のコンポーネントによって発行されます。例として、タイマー間隔を使用して 1 秒あたり 10 取引を投稿し、100 取引後に終了します。