1

私の仕事は、ローカル ドライブから csv ファイルを読み取ることです。一定の間隔で定期的に更新される在庫データが含まれています。次に、読み取ったデータに対していくつかの操作を実行し、定期的に新しい csv ファイルに書き戻します。

私のコードはWikipediaAnalysisコードから適応されています。問題は、実行が 1 回しか行われないことです。最初の実行後、プログラムは終了し、実行中の状態にはなりません。WikipediaAnalysis のように 5 秒間隔 (私が設定) で実行するわけではありません。以下は私のコードです:

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = getCsvDataStream(see);
DataStream<Stock> edits = dataStream.flatMap(new Tokenizer());
@SuppressWarnings("serial")
KeyedStream<Stock, String> keyedEdits = edits.keyBy(new KeySelector<Stock, String>() {
            @Override
            public String getKey(Stock event) {
                java.util.Date time = new java.util.Date((long) event.getTimeStamp() * 1000);
                return time.toString();
            }
        });

@SuppressWarnings("serial")
DataStream<Tuple2<Double, Double>> result = keyedEdits.timeWindow(Time.seconds(5))
                .fold(new Tuple2<>(0.0d, 0.0d), new FoldFunction<Stock, Tuple2<Double, Double>>() {
                    @Override
                    public Tuple2<Double, Double> fold(Tuple2<Double, Double> acc, Stock event) {
                        acc.f0 = event.getClose();
                        acc.f1 = event.getOpen();                           
                        return acc;
                    }
                });

提案していただけませんか!!

4

0 に答える 0