2

複数のログ ファイルを並列解析するために 2 プロセス プールを使用しています。

po = Pool(processes=2)
pool_object = po.apply_async(log_parse, (hostgroup_sender_dir, hostname, host_depot_dir,        synced_log, prev_last_pos, get_report_rate), )

(curr_last_pos, remote_report_datetime, report_gen_rate) = pool_object.get()

ただし、最初の実行ではかなり遅く、約 12 個の ~20Mb ファイルで ~16 分です。

ログの新しいバイトを 2 ~ 3 分ごとに解析することを考えると、次の反復では大きな問題はありませんが、最初の実行での方法には改善の余地があることは確かです。ログをいくつかの小さいサイズのスプライスに事前に分割すると (pyparse がログ全体をオンスでメモリに割り当てる必要がないように) 高速化されますか?

まだデュアル コアの開発用 VM で実行していますが、すぐにクアッド コアの物理サーバーに移行する必要があり (追加のクアッド コア CPU を取得しようとします)、最大 50 を管理できる必要がある場合があります。ログ。

ログからのスプライス、

log_splice = """
# XX_MAIN     (23143) Report at 2011-08-30 20:00:00.003    Type:  Periodic     #
# Report number 1790                                        State: Active      #
################################################################################
# Running since                  : 2011-08-12 04:40:06.153                     #
# Total execution time           :  18 day(s) 15:19:53.850                     #
# Last report date               : 2011-08-30 19:45:00.002                     #
# Time since last periodic report:   0 day(s) 00:15:00.000                     #
################################################################################
                            ----------------------------------------------------
                            |       Periodic        |          Global          |
----------------------------|-----------------------|--------------------------|
Simultaneous Accesses       |  Curr  Max Cumulative |      Max    Cumulative   |
--------------------------- |  ---- ---- ---------- |     ---- -------------   |
Accesses                    |     1    5          - |      180             -   |
- in start/stop state       |     1    5      12736 |      180      16314223   |
-------------------------------------------------------------------------------|
Accesses per Second         |    Max   Occurr. Date |      Max Occurrence Date |
--------------------------- | ------ -------------- |   ------ --------------- |
Accesses per second         |  21.00 08-30 19:52:33 |    40.04  08-16 20:19:18 |
-------------------------------------------------------------------------------|
Service Statistics          |  Success    Total  %  |   Success      Total  %  |
--------------------------- | -------- -------- --- | --------- ---------- --- |
Services accepted accesses  |    17926    17927  99 |  21635954   21637230 -98 |
- 98: NF                    |     7546     7546 100 |  10992492   10992492 100 |
- 99: XFC                   |    10380    10380 100 |  10643462   10643462 100 |
 ----------------------------------------------------------------------------- |
Services succ. terminations |    12736    12736 100 |  16311566   16314222  99 |
- 98: NF                    |     7547     7547 100 |  10991401   10992492  99 |
- 99: XFC                   |     5189     5189 100 |   5320165    5321730  99 |
 ----------------------------------------------------------------------------- |
""" 

pyparseを使用して、

unparsed_log_data = input_log.read()

#------------------------------------------------------------------------
# Define Grammars
#------------------------------------------------------------------------
integer = Word( nums )

# XX_MAIN     ( 4801) Report at 2010-01-25 06:55:00
binary_name = "# XX_MAIN"
pid = "(" + Word(nums) + ")"
report_id = Suppress(binary_name) + Suppress(pid)

# Word as a contiguous set of characters found in the string nums
year = Word(nums, max=4)
month = Word(nums, max=2)
day = Word(nums, max=2)
# 2010-01-25 grammar
yearly_day_bnf = Combine(year + "-" + month + "-" + day)
# 06:55:00. grammar
clock24h_bnf = Combine(Word(nums, max=2) + ":" + Word(nums, max=2) + ":" + Word(nums,     max=2) + Suppress("."))
timestamp_bnf = Combine(yearly_day_bnf + White(' ') + clock24h_bnf)("timestamp")

report_bnf = report_id + Suppress("Report at ") + timestamp_bnf

# Service Statistics          |  Success    Total  %  | 
# Services succ. terminations |       40       40 100 |   3494775    3497059  99 |
partial_report_ignore = Suppress(SkipTo("Services succ. terminations", include=True))
succ_term_bnf = Suppress("|") + integer("succTerms") + integer("totalTerms")
terminations_report_bnf = report_bnf + partial_report_ignore + succ_term_bnf

# Apply the BNF to the unparsed data
terms_parsing = terminations_report_bnf.searchString(unparsed_log_data)
4

1 に答える 1

3

単一のログ エントリを解析するパーサーを構築します。これにより、次の 2 つのことが達成されます。

  1. 問題を簡単に並列化できるチャンクに分割します
  2. ログ データの最初のバックログが処理された後、増分ログ処理を処理するようにパーサーを配置します。

並列化されたチャンク サイズは適切にパッケージ化された単一のアイテムであり、各プロセスはアイテムを個別に解析できます (ログ メッセージ間で状態や経過時間の情報を繰り越す必要がない場合)。

編集(この質問は、pyparsing チューニングに関するトピックに変わりました...)

Combine(lots+of+expressions+here)pyparsing Regex 式を使用して構築された低レベルのプリミティブを定義する方が良いことがわかりました。これは通常、次のような実数やタイムスタンプなどの式に適用されます。

# 2010-01-25 grammar
yearly_day_bnf = Combine(year + "-" + month + "-" + day)
yearly_day_bnf = Regex(r"\d{4}-\d{2}-\d{2}")

# 06:55:00. grammar
clock24h_bnf = Combine(Word(nums, max=2) + ":" + 
                       Word(nums, max=2) + ":" + 
                       Word(nums, max=2) + Suppress("."))
clock24h_bnf = Regex(r"\d{2}:\d{2}:\d{2}\.")
clock24h_bnf.setParseAction(lambda tokens:tokens[0][:-1])

timestamp_bnf = Combine(yearly_day_bnf + White(' ') + clock24h_bnf)
timestamp_bnf = Regex(r"\d{4}-\d{2}-\d{2}\s+\d{1,2}:\d{2}:\d{2}")

ただし、無理をする必要はありません。のようなものinteger=Word(nums)は、すでにカバーの下で RE を生成しています。

また、timestamp_bnf から結果名を削除したことに注意してください。私は通常、プリミティブ定義から結果名を除外し、それらをより高いレベルの式にアセンブルするときに追加します。これにより、次のように、同じプリミティブを異なる名前で複数回使用できます。

summary = ("Started:" + timestamp_bnf("startTime") + 
           "Ended:" + timestamp_bnf("endTime"))

これは、解析された構造を整理するのにも役立ちます。

結果の名前を上位の式に移動すると、フィールドによりわかりやすい名前を付けることができます。

report_bnf = report_id + Suppress("Report at ") + timestamp_bnf("reportTime")

文法を見ると、このレポート情報のすべてを解読しているわけではなく、次の行からレポート時間を抽出しているだけです。

# XX_MAIN     (23143) Report at 2011-08-30 20:00:00.003

この行からの 2 つの整数フィールド:

Services succ. terminations |    12736    12736 100 |  16311566   16314222  99 |

代わりにこれを試してください:

report_bnf = report_id + Suppress("Report at") + timestamp_bnf("reportTime")
succ_term_bnf = (Suppress("Services succ. terminations") + Suppress("|") + 
                        integer("succTerms") + integer("totalTerms"))
log_data_sources_bnf = report_bnf | succ_term_bnf

extractLogData = lambda logentry : sum(log_data_sources_bnf.searchString(logentry))

print extractLogData(log_slice).dump()

Pyparsing は常に RE よりも遅くなります。また、あなたの場合の pyparsing パーサーはプロトタイピングの足がかりにすぎない可能性があります。pyparsing パーサーで 500 倍のパフォーマンスを得ることはできないと確信しています。RE ベースのソリューションを使用して Mb 相当のログ ファイルを処理する必要があるかもしれません。

于 2011-08-29T23:58:12.267 に答える