4

地理座標点を含む json オブジェクトを扱っています。ポリゴン マッチングのポイントを評価するために、ローカルにある postgis サーバーに対してこれらのポイントを実行したいと思います。

既存のプロセッサでこれを行うことを望んでいます-「EvaluateJsonPath」プロセッサを使用して緯度/経度座標を属性に正常に抽出し、「ExecuteSQL」を使用してローカル postgis データストアにクエリを正常に発行しています。これにより、avro 応答が残り、"ConvertAvroToJSON" プロセッサを使用して JSON に変換できます。

クエリの結果を元の JSON オブジェクトとマージする方法に概念的な問題があります。現状では、同じフラグメント ID を持つ 2 つのフロー ファイルがあり、理論的には「mergecontent」でマージできますが、次のようになります。

{"my":"original json", "coordinates":[47.38, 179.22]}{"polygon_match":"a123"}

SQLクエリの結果を元のjson構造にマージするための提案された戦略はありますか?私の結果は代わりに次のようになります:

{"my":"original json", "coordinates":[47.38, 179.22], "polygon_match":"a123"}

nifi 6.0、postgres 9.5.2、および postgis 2.2.1 を実行しています。

https://community.hortonworks.com/questions/22090/issue-merging-content-in-nifi.htmlで replaceText プロセッサを使用することへの言及を見ましたが、これは属性からコンテンツを本体にマージしているようですコンテンツ。元のコンテンツと SQL 応答のコンテンツ、またはコンテンツなしで SQL 応答から抽出された属性のいずれかをマージするポイントがありません。

編集:

次の Groovy スクリプトは、必要なことを実行しているように見えます。私はグルーヴィーなコーダーではないので、改善は大歓迎です。

import org.apache.commons.io.IOUtils
import java.nio.charset.*
import groovy.json.JsonSlurper

def flowFile = session.get();
if (flowFile == null) {
    return;
}
def slurper = new JsonSlurper()

flowFile = session.write(flowFile,
    { inputStream, outputStream ->
        def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def obj = slurper.parseText(text)
        def originaljsontext = flowFile.getAttribute('original.json')
        def originaljson = slurper.parseText(originaljsontext)
        originaljson.put("point_polygon_info", obj)
        outputStream.write(groovy.json.JsonOutput.toJson(originaljson).getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
4

1 に答える 1