5

clojure で実行されている Apache flink から小さな例を取得しようとしていますが、clojure の型のヒントと flink の奇妙な癖のために、現在行き詰まっています。

これが私のコードです:

(ns pipeline.core
 (:import
 (org.apache.flink.api.java ExecutionEnvironment)
 (org.apache.flink.api.common.functions FlatMapFunction)
 (org.apache.flink.api.java.tuple Tuple2)
 (org.apache.flink.util Collector)
 (java.lang String)))

(def flink-env (ExecutionEnvironment/createLocalEnvironment))

(def dataset (.fromElements flink-env (to-array ["please test me"])))

(defn tokenizer [] (reify FlatMapFunction
                 ( flatMap [this value collector] 
                   (println value))))

(.flatMap dataset (tokenizer))

タイプヒントを提供しないと、flink API からエラーが発生します。

Caused by: java.lang.IllegalArgumentException: The types of the interface org.apache.flink.api.common.functions.FlatMapFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic types is limited at this point.
at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:662)

タイプヒントを提供する場合:

(defn tokenizer [] (reify FlatMapFunction
                 ( ^void flatMap [this ^String value ^Collector collector] 
                   (println value))))

clojure コンパイラーからエラーが発生します。

Caused by: java.lang.IllegalArgumentException: Can't find matching method: flatMap, leave off hints for auto match.
at clojure.lang.Compiler$NewInstanceMethod.parse(Compiler.java:8065) 

ジェネリック クラスを使用して clojure に型ヒントを追加する方法はありますか? 次のようになります。

(defn tokenizer [] (reify FlatMapFunction
                 ( ^void flatMap [this ^String value ^Collector<Tuple2<String, Integer>> collector] 
                   (println value))))

しかし、それはうまくいきません。何か案は?

ライン構成は次のようになります。

(defproject pipeline "0.1.0-SNAPSHOT"
 :description "FIXME: write description"
 :url "http://example.com/FIXME"
 :license {:name "Eclipse Public License"
        :url "http://www.eclipse.org/legal/epl-v10.html"}
 :dependencies [[org.clojure/clojure "1.7.0"]               
             [org.apache.flink/flink-java "0.9.0"]              
             ]
  :aot :all)
4

2 に答える 2

3

Clojure はリフレクションを処理できないため、Flink メソッドを介して手動で戻り値の型を指定する必要がありますreturns

(.returns (.flatMap dataset (tokenizer)) String)

さらに、Flink は匿名クラスを処理できないため、新しいオブジェクトを使用する場合は、新しいオブジェクトdeftypeを定義してインスタンス化するために使用する必要があります。tokenizer

(deftype tokenizer [] FlatMapFunction
                      (flatMap [this value collector] 
                        (println value)))

(.flatMap dataset (tokenizer.))

これは、jar にパックして実行できる完全な「Word-Count-Example」です。

型ヒントとキャストに注意してください。tokenizer出力(int 1)が必要な場合は、それ以外の場合Longは の 2 番目のタイプになりTuple2ます。さらに、String を使用して出力タイプを宣言しますtokenizer(リフレクション タイプも指定する必要があるため、クラス タイプでは不十分です)。最後に(int-array [0])、オーバーロードを解決するためのヒントを入力する必要がありますgroupBy(ヒントがないと、Clojure コンパイラにとってメソッドがあいまいになります)。

(ns org.apache.flink.flink-clojure.WordCount
 (:import
 (org.apache.flink.api.common.functions FlatMapFunction)
 (org.apache.flink.api.java DataSet)
 (org.apache.flink.api.java ExecutionEnvironment)
 (org.apache.flink.api.java.tuple Tuple2)
 (org.apache.flink.util Collector)
 (java.lang String))
 (:require [clojure.string :as str])
 (:gen-class))

(def flink-env (ExecutionEnvironment/createLocalEnvironment))

(def text (.fromElements flink-env (to-array ["please test me and me too"])))

(deftype tokenizer [] FlatMapFunction
                      (flatMap [this value collector]
                        (doseq [v (str/split value #"\s")]
                          (.collect collector (Tuple2. v (int 1))))))

(def tokens (.returns (.flatMap text (tokenizer.)) "Tuple2<String,Integer>"))

(def counts (.sum (.groupBy tokens (int-array [0])) 1))

(defn -main []
  (.print counts)
)
于 2015-08-20T16:34:16.697 に答える