地理座標点を含む json オブジェクトを扱っています。ポリゴン マッチングのポイントを評価するために、ローカルにある postgis サーバーに対してこれらのポイントを実行したいと思います。
既存のプロセッサでこれを行うことを望んでいます-「EvaluateJsonPath」プロセッサを使用して緯度/経度座標を属性に正常に抽出し、「ExecuteSQL」を使用してローカル postgis データストアにクエリを正常に発行しています。これにより、avro 応答が残り、"ConvertAvroToJSON" プロセッサを使用して JSON に変換できます。
クエリの結果を元の JSON オブジェクトとマージする方法に概念的な問題があります。現状では、同じフラグメント ID を持つ 2 つのフロー ファイルがあり、理論的には「mergecontent」でマージできますが、次のようになります。
{"my":"original json", "coordinates":[47.38, 179.22]}{"polygon_match":"a123"}
SQLクエリの結果を元のjson構造にマージするための提案された戦略はありますか?私の結果は代わりに次のようになります:
{"my":"original json", "coordinates":[47.38, 179.22], "polygon_match":"a123"}
nifi 6.0、postgres 9.5.2、および postgis 2.2.1 を実行しています。
https://community.hortonworks.com/questions/22090/issue-merging-content-in-nifi.htmlで replaceText プロセッサを使用することへの言及を見ましたが、これは属性からコンテンツを本体にマージしているようですコンテンツ。元のコンテンツと SQL 応答のコンテンツ、またはコンテンツなしで SQL 応答から抽出された属性のいずれかをマージするポイントがありません。
編集:
次の Groovy スクリプトは、必要なことを実行しているように見えます。私はグルーヴィーなコーダーではないので、改善は大歓迎です。
import org.apache.commons.io.IOUtils
import java.nio.charset.*
import groovy.json.JsonSlurper
def flowFile = session.get();
if (flowFile == null) {
return;
}
def slurper = new JsonSlurper()
flowFile = session.write(flowFile,
{ inputStream, outputStream ->
def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
def obj = slurper.parseText(text)
def originaljsontext = flowFile.getAttribute('original.json')
def originaljson = slurper.parseText(originaljsontext)
originaljson.put("point_polygon_info", obj)
outputStream.write(groovy.json.JsonOutput.toJson(originaljson).getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)