8

私の目的は、Kafka をソースとして使用し、Flink をストリーム処理エンジンとして使用して、高スループットのクラスターをセットアップすることです。これが私がやったことです。

マスターとワーカーで次の構成の 2 ノード クラスターをセットアップしました。

マスター flink-conf.yaml

jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost

jobmanager.rpc.port: 6123

jobmanager.heap.mb: 256

taskmanager.heap.mb: 512

taskmanager.numberOfTaskSlots: 50

parallelism.default: 100

ワーカー flink-conf.yaml

jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost

jobmanager.rpc.port: 6123

jobmanager.heap.mb: 512 #256

taskmanager.heap.mb: 1024 #512

taskmanager.numberOfTaskSlots: 50

parallelism.default: 100

マスター ノードのslavesファイルは次のようになります。

<WORKER_IP_ADDR>
localhost

両方のノードの flink セットアップは、同じ名前のフォルダーにあります。実行して、マスターでクラスターを起動します

bin/start-cluster-streaming.sh

これにより、Worker ノードでタスク マネージャーが起動します。

私の入力ソースは Kafka です。これがスニペットです。

final StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> stream = 
    env.addSource(
    new KafkaSource<String>(kafkaUrl,kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);

env.execute("Kafka stream");

ここに私のシンク機能があります

public class MySink implements SinkFunction<String> {

    private static final long serialVersionUID = 1L;

    public void invoke(String arg0) throws Exception {
        processMessage(arg0);
        System.out.println("Processed Message");
    }
}

これが私の pom.xml の Flink 依存関係です。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.9.0</version>
</dependency>

次に、マスターでこのコマンドを使用してパッケージ化されたjarを実行します

bin/flink run flink-test-jar-with-dependencies.jar

ただし、メッセージを Kafka トピックに挿入すると、Kafka トピックから着信するすべてのメッセージを (SinkFunction実装の呼び出しメソッドのデバッグ メッセージを介して) マスター ノードだけで説明できます。

ジョブ マネージャーの UI では、次のように 2 つのタスク マネージャーが表示されます。 ジョブ マネージャー ダッシュボード - タスク マネージャー

また、ダッシュボードは次のようになります: 質問:ここに画像の説明を入力

  1. ワーカー ノードがタスクを取得しないのはなぜですか?
  2. いくつかの構成がありませんか?
4

1 に答える 1

14

Flink で Kafka ソースから読み取る場合、ソース タスクの最大並列度は、特定の Kafka トピックのパーティション数によって制限されます。Kafka パーティションは、Flink のソース タスクが消費できる最小単位です。ソース タスクよりも多くのパーティションがある場合、一部のタスクは複数のパーティションを消費します。

したがって、100 個のタスクすべてに入力を提供するには、Kafka トピックに少なくとも 100 個のパーティションがあることを確認する必要があります。

トピックのパーティション数を変更できない場合は、setParallelismメソッドを使用して並列度を下げて Kafka から最初に読み取ることもできます。または、rebalance前の操作で使用可能なすべてのタスクでデータをシャッフルする方法を使用できます。

于 2015-09-01T10:14:08.617 に答える