2

Python コードから単純なストリーム データを作成し、ログ ファイルに追加して、Kinesis Agent->Kinesis Firehose を使用してストリーミング データを Redshift クラスターにロードしようとしています。

Python コードが正常に動作していることを確認し、ログ ファイルに追加されるストリーミング データを作成します。キネシス エージェントが正常に開始され、検証が行われ、ログ ファイルからデータを取得して Firehose ストリームにプッシュするための適切な構成も行われました。

Python ファイルは、緯度/経度の位置をランダムに記録する単純なコードです。これは、サンプルのファイアホース ストリーミングを行うためだけのものであるため、サンプル データを使用します。

latitude = 19.99
longitude = 73.78
file_n = '/tmp/random_lat_lon.log'

def generate_random_data(lat, lon, num_rows, file_name):
    with open(file_name, 'w+', 1) as output:
#        for _ in xrange(num_rows):
         while True:
            hex1 = '%012x' % random.randrange(16**12)
            flt = float(random.randint(0,100))
            dec_lat = random.random()/100
            dec_lon = random.random()/100
            output.write('%s,%.1f,%.6f,%.6f \n' % (hex1.lower(), flt, lon+dec_lon, lat+dec_lat))
            time.sleep(5)

generate_random_data(latitude, longitude, 5, file_n)

random_lat_lon.log ファイルの出力:

> 83d6c9f7a0be,25.0,73.782042,19.997504
> 18b69c5c5248,25.0,73.788921,19.995153
> 6a0d182996f0,91.0,73.783399,19.998097
> 431ba9e4f38e,0.0,73.781139,19.995481

kinesis-Agent を確認すると、機能していないことがわかり、次のエラー トレースが表示されます。

(FileTailer[kinesis:python-stream:/tmp/random_lat_lon.log*]) com.amazon.kinesis.streaming.agent.tailing.FileTailer [ERROR] FileTailer[kinesis:python-stream:/tmp/random_lat_lon.log*]: Error when processing current input file or when tracking its status.
java.lang.IllegalStateException
        at com.google.common.base.Preconditions.checkState(Preconditions.java:158)
        at com.amazon.kinesis.streaming.agent.tailing.TrackedFileRotationAnalyzer.findCurrentOpenFileAfterTruncate(Unknown Source)
        at com.amazon.kinesis.streaming.agent.tailing.SourceFileTracker.updateCurrentFile(Unknown Source)
        at com.amazon.kinesis.streaming.agent.tailing.SourceFileTracker.refresh(Unknown Source)
        at com.amazon.kinesis.streaming.agent.tailing.FileTailer.updateRecordParser(Unknown Source)
        at com.amazon.kinesis.streaming.agent.tailing.FileTailer.processRecords(Unknown Source)
        at com.amazon.kinesis.streaming.agent.tailing.FileTailer.runOnce(Unknown Source)
        at com.amazon.kinesis.streaming.agent.tailing.FileTailer.run(Unknown Source)
        at com.google.common.util.concurrent.AbstractExecutionThreadService$1$2.run(AbstractExecutionThreadService.java:60)
        at com.google.common.util.concurrent.Callables$3.run(Callables.java:95)
        at java.lang.Thread.run(Thread.java:748)

私の kinesis-Agent.json 構成は次のとおりです。

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "https://kinesis.us-east-1.amazonaws.com",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/tmp/random_lat_lon.log*",
      "kinesisStream": "python-stream"
    }
  ]
}

これは、Kinesis Firehose (Python を使用) を使用した最初のサンプル ラボ体験です。私が理解できなかった何かが欠けています。

誰かが提案を手伝ってくれますか。詳細が必要な場合はお知らせください。

よろしく

4

1 に答える 1