1

JSON メッセージ ( Cloudtrail、連結された多くのオブジェクト) を取得していますが、フィルタリングが完了するまでに、Logstash はメッセージを正しく解析していないようです。ハッシュが単純に文字列にダンプされたかのようです。

とにかく、これが入力とフィルターです。

input {
  s3 {
    bucket => "stanson-ops"
    delete => false
    #snipped unimportant bits
    type => "cloudtrail"
  }
}

filter {
  if [type] == "cloudtrail" {
    json { # http://logstash.net/docs/1.4.2/filters/json
      source => "message"
    }
    ruby {
      code => "event['RecordStr'] = event['Records'].join('~~~')"
    }
    split {
      field => "RecordStr"
      terminator => "~~~"
      remove_field => [ "message", "Records" ]
    }
  }
}

完了するまでに、elasticsearch エントリにはRecordStr次のデータを含むキーが含まれます。messageフィールドもフィールドもありませんRecords

{"eventVersion"=>"1.01", "userIdentity"=>{"type"=>"IAMUser", "principalId"=>"xxx"}}

JSON スタイルではなく、解析済みであることに注意してください。(これは、concat->split が機能するために重要です)。

そのため、RecordStrキーは 1 つの値として正しくないように見えます。さらに、Kibana では、フィルタリング可能なフィールドにRecordStr(サブフィールドなし) が含まれます。もう存在しないいくつかのエントリが含まれています: Records.eventVersion, Records.userIdentity.type.

何故ですか?適切なフィールドを取得するにはどうすればよいですか?

edit 1入力の一部です。

{"Records":[{"eventVersion":"1.01","userIdentity":{"type":"IAMUser",

これは整形されていない JSON です。ファイルの本体 (上記) がmessageフィールドにあるように見え、jsonそれを抽出すると、フィールドにレコードの配列ができてしまいRecordsます。RecordStrそのため、結合して分割します。最終的には、それぞれが 1 つのエントリを持つ個別のドキュメントになります。しかし、テンプレート(?)は新しい構造を理解していないようです。

4

1 に答える 1

0

ご要望に応じて、適切な CloudTrail フィールドのインデックスを作成できる方法を考え出しました。変更された入力とフィルター構成は次のとおりです。

input {
  s3 {
    backup_add_prefix => \"processed-logs/\"
    backup_to_bucket => \"test-bucket\"
    bucket => \"test-bucket\"
    delete => true
    interval => 30
    prefix => \"AWSLogs/<account-id>/CloudTrail/\"
    type => \"cloudtrail\"
  }
}

filter {
  if [type] == \"cloudtrail\" {
    json {
      source => \"message\"
    }
    ruby {
      code => \"event.set('RecordStr', event.get('Records').join('~~~'))\"
    }
    split {
      field => \"RecordStr\"
      terminator => \"~~~\"
      remove_field => [ \"message\", \"Records\" ]
    }
    mutate {
      gsub => [
        \"RecordStr\", \"=>\", \":\"
      ]
    }
    mutate {
      gsub => [
        \"RecordStr\", \"nil\", \"null\"
      ]
    }
    json {
      skip_on_invalid_json => true
      source => \"RecordStr\"
      target => \"cloudtrail\"
    }
    mutate {
      add_tag => [\"cloudtrail\"]
      remove_field=>[\"RecordStr\", \"@version\"]
    }
    date {
      match => [\"[cloudtrail][eventTime]\",\"ISO8601\"]
    }
  }
}

ここでの重要な観察は、分割が完了すると、イベントで有効な json を所有しなくなるため、mutate の置換 ('=>' から ':' および 'nil' から 'null') を実行する必要があることです。さらに、CloudTrail eventTime からタイムスタンプを取得し、不要なフィールドのクリーンアップを行うと便利であることがわかりました。

于 2018-10-09T00:39:58.957 に答える