いくつかのルートのターンポイントを示す2Dポイントを放出するaobservableがあるとします
public static double[][] points = new double[][]{
{0, 0},
{0, 10},
{1, 10},
{1, 0},
{0, 0}
};
public static Observable<double[]> routePoints() {
return Observable
.from(points);
}
これは、ロボットが origin から開始し(0,0)
、次に長方形の左下隅、次に右下隅に移動し、その後元に戻り、最後に原点に戻ることを意味します。
ここで、ロボットが一定の速度、たとえば 1 秒あたり 1 単位で動いているかのように、そのポイントにフィードするオブザーバブルが必要だとします。
そのため、最初のオペレーターはそれを受信(0,0)
し、そのまま再送信します。
(0, 10)
次に、このポイントまでの距離が単位であり、そのポイントに到達するのに数秒10
かかることを受信して確認します。10
したがって、新しいオブザーバブルは発行する必要があります
(0, 1)
(0, 2)
(0, 3)
(0, 4)
(0, 5)
(0, 6)
(0, 7)
(0, 8)
(0, 9)
(0, 10)
最後の受信ポイントに到達して再送信されるまで、毎秒 1 ペア。次に、オペレーターは次のポイントを取り、それで同じことを行う必要があります。
ReactiveX でこれを達成するにはどうすればよいですか?
ここで「バックプレッシャー」を実装して、結果として発生するすべてのものが時間内に再送信されるまで、観測可能なソースを遅くする必要があると思いますか?
アップデート
以下のコードを書きましたが、動作します。2 つflatMap
の s を使用します。1 つは追加のポイントを提供し、各ポイントにタイムスタンプ (秒単位) を付け、もう 1 つはflatMap
タイムスタンプに応じた遅延を導入します。
コードは機能しますが、複雑に見えます。問題は解決しません: ReactiveX ライブラリにこれらの種類の既製のツールが含まれているかどうか:
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class EmitByTime {
public static double speed = 1;
public static double[][] points = new double[][]{
{0, 0},
{0, 10},
{1, 10},
{1, 0},
{0, 0}
};
public static Observable<double[]> routePoints() {
return Observable
.from(points);
}
public static Observable<double[]> routeFeeder() {
return routePoints()
.flatMap(new Func1<double[], Observable<double[]>>() {
double[] previous = null;
@Override
public Observable<double[]> call(double[] doubles) {
if( previous == null) {
previous = new double[] {doubles[0], doubles[1], 0};
return Observable.just(previous);
}
else {
double dx = doubles[0] - previous[0];
double dy = doubles[1] - previous[1];
double dist = Math.sqrt( dx*dx + dy*dy );
dx /= dist;
dy /= dist;
double vx = dx * speed;
double vy = dy * speed;
double period = dist / speed;
double end = previous[2] + period;
double start = Math.ceil(previous[2] + 1);
ArrayList<double[]> sequence = new ArrayList<>();
for(double t = start; t < end; t+=1) {
double tt = (t - previous[2]);
double x = previous[0] + tt * vx;
double y = previous[1] + tt * vy;
sequence.add(new double[] {x, y, t});
}
previous = new double[] {doubles[0], doubles[1], end};
sequence.add(previous);
return Observable.from(sequence);
}
}
})
.flatMap(new Func1<double[], Observable<double[]>>() {
long origin = System.currentTimeMillis();
@Override
public Observable<double[]> call(double[] doubles) {
long now = System.currentTimeMillis();
long due = Math.round(doubles[2]*1000) + origin;
if( due <= now ) {
return Observable.just(doubles);
}
else {
return Observable.just(doubles).delay(due-now, TimeUnit.MILLISECONDS);
}
}
})
;
}
public static void main(String[] args) throws InterruptedException {
routeFeeder().subscribe(new Action1<double[]>() {
@Override
public void call(double[] doubles) {
System.out.println(Arrays.toString(doubles));
}
});
Thread.sleep(20000);
}
}