0

現在、私は次の問題で立ち往生しています: KafkaConsumer を使用して Kafka トピックからメッセージを読んでいます。メッセージは文字列で、次の形式になっています { "a" : "b", "a1" : "b1", "c2" : "c3" } 。 FlowFile のペイロード内に保存されます。

その文字列をjsonまたは理想的にはcsvに変換したいのですが、その方法がわかりません。

私はNiFiを初めて使用し、可能な限り調査しましたが、見つけた答えは、jsonからavroまたは同様のものへの変換に関するものでしたが、jsonまたはavroへの文字列は決してありませんでした。また、Kafka メッセージは属性ではなく FlowFile のペイロードにあることもわかりました。例には常に属性が含まれているため、それを取得する方法がわかりません。

つまり、文字列である FlowFile のペイロードを、組み込みプロセッサを使用して json/cvs に変換できますか。

4

2 に答える 2

0

私はこれをやってしまった:

  1. ConsumeKafka は私に文字列を与えます:

{ "a" : "b", "a1" : "b1" }

  1. EvaluateJsonPath はプロパティを追加して属性を作成します

a -> $.a //results in attribute named a with value b

a1 -> $.a1 //results in attribute named a1 with value b1

  1. ReplaceText は EvaluateJsonPath から属性を取得して、フォーマットされた 1 つの csv を形成します。

Replacement value -> ${'a'},${'a1'}

これは単一の行になりますが、改行はありません:

b,b1

\n'\n'"\n" を追加する新しい行を追加するには、機能しませんでした。 機能したのは、置換値フィールドに入力中にShift+Enterを押すことで、空の新しい行が作成されました。

于 2017-08-23T06:37:30.907 に答える