Spark Streaming を使用してストリーミング データに Drools Engine (Drools Fusion) を適用しようとしています。
しかし、タイプ JavaDStream の変数をデシリアライズして、それに対してルールを起動することはできません。
JavaDstream と Interger の比較エラーが表示されます。これを解決する方法を提案してください。以下はコードです
メインクラス
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Spark Streaming");
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 8089);
class splitFunc implements Function<String, Long> {
public Long call(String x) { return Long.parseLong(x); }
}
JavaDStream<Long> in_amount = lines.map(new splitFunc());
insertEvent(entryPointStoreOne,new Sale(),in_amount);
Sale.java - 事実
import org.apache.spark.streaming.api.java.JavaDStream;
public class Sale {
private JavaDStream<Long> amount;
public Sale (){ }
public void setAmount(JavaDStream<Long> amount)
{
this.amount=amount;
}
public JavaDStream<Long> getAmount()
{
return amount;
}
}
Drools ルール ファイル
declare Sale
@role(event)
end
rule "Store One - Has Passed its Sales Record"
when
$amount : Sale( amount > 1000 ) - FAILING HERE
from entry-point StoreOne
then
System.out.println("Store One has passed its Sales Records");
end