私の目的は、Kafka をソースとして使用し、Flink をストリーム処理エンジンとして使用して、高スループットのクラスターをセットアップすることです。これが私がやったことです。
マスターとワーカーで次の構成の 2 ノード クラスターをセットアップしました。
マスター flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
ワーカー flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512 #256
taskmanager.heap.mb: 1024 #512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
マスター ノードのslavesファイルは次のようになります。
<WORKER_IP_ADDR>
localhost
両方のノードの flink セットアップは、同じ名前のフォルダーにあります。実行して、マスターでクラスターを起動します
bin/start-cluster-streaming.sh
これにより、Worker ノードでタスク マネージャーが起動します。
私の入力ソースは Kafka です。これがスニペットです。
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream =
env.addSource(
new KafkaSource<String>(kafkaUrl,kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);
env.execute("Kafka stream");
ここに私のシンク機能があります
public class MySink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
public void invoke(String arg0) throws Exception {
processMessage(arg0);
System.out.println("Processed Message");
}
}
これが私の pom.xml の Flink 依存関係です。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>0.9.0</version>
</dependency>
次に、マスターでこのコマンドを使用してパッケージ化されたjarを実行します
bin/flink run flink-test-jar-with-dependencies.jar
ただし、メッセージを Kafka トピックに挿入すると、Kafka トピックから着信するすべてのメッセージを (SinkFunction実装の呼び出しメソッドのデバッグ メッセージを介して) マスター ノードだけで説明できます。
ジョブ マネージャーの UI では、次のように 2 つのタスク マネージャーが表示されます。

- ワーカー ノードがタスクを取得しないのはなぜですか?
- いくつかの構成がありませんか?
