3

だから私が取り組もうとしている問題は次のとおりです。

  • 特定の頻度でメッセージを送信するデータ ソースが必要です
  • 各メッセージを個別に処理する必要がある N 個のニューラル ネットワークがあります。
  • すべてのニューラル ネットワークからの出力が集約され、各メッセージの N 個の出力がすべて収集された場合にのみ、メッセージが完全に処理されたと宣言されます。
  • 最後に、メッセージが完全に処理されるまでにかかった時間を測定する必要があります (メッセージが送信されてから、そのメッセージからの N 個のニューラル ネットワーク出力がすべて収集されるまでの時間)。

スパーク ストリーミングを使用して、このようなタスクにどのようにアプローチするのか興味があります。

私の現在の実装では、3 種類のコンポーネントを使用しています。1 つはカスタム レシーバー、もう 1 つは Function を実装する 2 つのクラスで、1 つはニューラル ネット用、もう 1 つはエンド アグリゲーター用です。

大まかに言えば、私のアプリケーションは次のように構築されています。

JavaReceiverInputDStream<...> rndLists = jssc.receiverStream(new JavaRandomReceiver(...));

Function<JavaRDD<...>, Void> aggregator = new JavaSyncBarrier(numberOfNets);

for(int i = 0; i < numberOfNets; i++){
    rndLists.map(new NeuralNetMapper(neuralNetConfig)).foreachRDD(aggregator);
}

ただし、これに関する主な問題は、4 ノード クラスターにサブミットされた場合よりもローカル モードの方が高速に実行されることです。

私の実装はそもそも間違っていますか、それともここで何か他のことが起こっていますか?

ここにも完全な投稿がありますhttp://apache-spark-user-list.1001560.n3.nabble.com/Developing-a-spark-streaming-application-td12893.html 3つのそれぞれの実装に関する詳細が記載されています前述のコンポーネント。

4

1 に答える 1

5

オブジェクトのインスタンス化とシリアル化が何度も繰り返されるようです。後者は、クラスターでのパフォーマンスに影響を与える可能性があります。

ニューラル ネットワークのインスタンス化は 1 回だけ試してください。それらがシリアライズ可能であることを確認する必要があります。複数の s + のflatMap代わりに使用する必要があります。これらの行に沿ったもの:mapunion

// Initialize neural net first
List<NeuralNetMapper> neuralNetMappers = new ArrayList<>(numberOfNets);
for(int i = 0; i < numberOfNets; i++){
    neuralNetMappers.add(new NeuralNetMapper(neuralNetConfig));
}

// Then create a DStream applying all of them
JavaDStream<Result> neuralNetResults = rndLists.flatMap(new FlatMapFunction<Item, Result>() {
    @Override
    public Iterable<Result> call(Item item) {
        List<Result> results = new ArrayList<>(numberOfNets);
        for (int i = 0; i < numberOfNets; i++) {
            results.add(neuralNetMappers.get(i).doYourNeuralNetStuff(item));
        }
        return results;
    }
});

// The aggregation stuff
neuralNetResults.foreachRDD(aggregator);

この方法でネットワークを初期化する余裕がある場合は、かなりの時間を節約できます。また、unionリンクされた投稿に含めたものは不要に見え、パフォーマンスに悪影響を及ぼしてflatMapいます。

最後に、クラスター内のパフォーマンスをさらに調整するために、Kryo シリアライザーを使用できます

于 2014-09-10T13:23:01.000 に答える