5

Flink の KafkaSource で簡単なテスト プログラムを実行しようとしています。私は以下を使用しています:

  • フリンク 0.9
  • スカラ 2.10.4
  • カフカ 0.8.2.1

hereおよびhere で説明されているように、ドキュメントに従ってKafkaSourceをテストしました(依存関係を追加し、プラグインにKafkaコネクタflink-connector-kafkaをバンドルします)。

以下は私の簡単なテストプログラムです:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
     .print
  }
}

ただし、コンパイルでは常に KafkaSource が見つからないというエラーが表示されます。

[ERROR] TestKafka.scala:8: error: not found: type KafkaSource
[ERROR]     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))

ここで何が恋しいですか?

4

3 に答える 3

3

私はsbtユーザーなので、以下を使用しましたbuild.sbt

organization := "pl.japila.kafka"
scalaVersion := "2.11.7"

libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"

これにより、プログラムを実行できました:

import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
     .print
  }
}

出力:

[kafka-flink]> run
[info] Running TestKafka
log4j:WARN No appenders could be found for logger (org.apache.flink.streaming.api.graph.StreamGraph).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[success] Total time: 0 s, completed Jul 15, 2015 9:29:31 AM
于 2015-07-15T07:32:01.810 に答える
1

問題は、SBT と Maven プロファイルがうまく連携しないことにあるようです。

Flink POM は、Scala バージョン (2.10、2.11、...) を変数として参照し、一部はビルド プロファイルで定義されます。プロファイルが SBT から適切に評価されないため、パッケージ化が正しく機能しません。

これを修正するための問題と保留中のプル リクエストがあります: https://issues.apache.org/jira/browse/FLINK-2408

于 2015-08-10T14:06:29.093 に答える