3

snappy shell から kafka を使用して Spark ストリーミング テーブルを作成すると、問題が発生します。

「例外「無効な入力 'C'、予想される dmlOperation、挿入、withIdentifier、選択または配置 (行 1、列 1):」

参照: http://snappydatainc.github.io/snappydata/streamingWithSQL/#spark-streaming-overview

ここに私のSQLがあります:

CREATE STREAM TABLE if not exists sensor_data_stream 
(sensor_id string, metric string)
using kafka_stream 
options (
    storagelevel 'MEMORY_AND_DISK_SER_2',
    rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter',
    zkQuorum 'localhost:2181',
    groupId 'streamConsumer',
    topics 'test:01');

シェルは、最初の文字「C」のスクリプトが気に入らないようです。次のコマンドを使用してスクリプトを実行しようとしています。

snappy> run '/scripts/my_test_sensor_script.sql';

どんな助けでも大歓迎です!

4

2 に答える 2

0

マイク、次の特性を実装して、独自の rowConverter クラスを作成する必要があります -

trait StreamToRowsConverter extends Serializable {
  def toRows(message: Any): Seq[Row]
}

次に、DDL でその rowConverter の完全修飾クラス名を指定します。rowConverter はスキーマに固有です。「io.snappydata.app.streaming.KafkaStreamToRowsConverter」は単なるプレースホルダー クラス名であり、独自の rowConverter クラスに置き換える必要があります。

于 2016-08-10T06:10:43.983 に答える