2

メッセージの種類ごとに約 6,000 個のファイルに解析したい小さなメッセージの大きなファイル (4 ~ 5 GB 圧縮) があります。メッセージは小さいです。タイプに応じて、5 ~ 50 バイトの範囲です。

各メッセージは、固定サイズのタイプ フィールド (6 バイトのキー) で始まります。タイプ「000001」のメッセージを読み取った場合、そのペイロードを 000001.dat などに追加して書き込みます。入力ファイルにはメッセージが混在しています。各出力ファイルに特定のタイプのメッセージのみが含まれる N 個の同種の出力ファイルが必要です。

これらのメッセージを非常に多くの個々のファイルに書き込む効率的で高速な方法は何ですか? できるだけ多くのメモリと処理能力を使用して、できるだけ速く処理したいと考えています。圧縮ファイルまたは非圧縮ファイルをディスクに書き込むことができます。

メッセージタイプのキーと出力ストリームの値を持つハッシュマップを使用することを考えていますが、もっと良い方法があると確信しています。

ありがとう!

4

6 に答える 6

4

通常、Unix ライクなシステムでは、一度に開くファイル ハンドルの数に制限があります。たとえば、私の Linux では、現在 1024 になっていますが、理由があれば変更できます。ただし、開いているファイルはシステムにとって負担になるため、これらの制限には十分な理由があります。

入力に同じキーが複数回出現するかどうかについての私の質問にはまだ回答していません。つまり、データのいくつかの個別のバッチを各ファイルに連結する必要がある場合があります。そうでない場合は、すべての作業を行う必要があり、このような単純な一連のイベントの周りに大規模な管理を設定する意味がないため、Pace の回答が手軽に実行できる最善の方法です。

ただし、入力に同じキーの複数のメッセージがある場合は、多数のファイルを開いたままにしておくと効率的です。ただし、一度に 6000 個すべてを開いたままにしないことをお勧めします。代わりに、先着順で開かれる500のようなものを選びます。つまり、最初の 500 個 (またはそれくらい) の個別のメッセージ キー用にファイルを開き、入力ファイル全体を調べて、これらの 500 個に追加するものを探し、入力で EOF を押すとそれらをすべて閉じます。また、すでに処理済みのキーを保持する必要がありHashSetます。これは、入力ファイルを再度読み込んで、最初のラウンドでキャッチできなかった 500 個のキーの次のバッチを処理するためです。

根拠:ファイルのオープンとクローズは (通常) コストのかかる操作です。できれば、何千ものファイルを一度以上開いたり閉じたりしたくありません。そのため、できるだけ多くのハンドルを開いたままにしておくと、すべてのハンドルが入力の 1 回のパスで満たされます。一方、単一の入力ファイルを介して順次ストリーミングすることは非常に効率的であり、入力ファイルを 12 回パスする必要がある場合でも、6000 個の他のファイルを開いたり閉じたりするのに必要な時間と比較すると、その時間はほとんど無視できます。ファイル。

擬似コード:

processedSet = [ ]
keysWaiting = true
MAXFILE = 500
handlesMap = [ ]
while (keysWaiting) {
  keysWaiting = false
  open/rewind input file
  while (not EOF(input file)) {
    read message
    if (handlesMap.containsKey(messageKey)) {
       write data to handlesMap.get(messageKey)
    } else if (processedSet.contains(messageKey) {
       continue // already processed
    } else if (handlesMap.size < MAXFILE) {
       handlesMap.put(messageKey, new FileOutputStream(messageKey + ".dat")
       processedSet.add(messageKey)
       write data to handlesMap.get(messageKey)
    else
       keysWaiting = true
    endif
  }
  for all handlesMap.values() {
     close file handle
  }
  handlesMap.clear
}
于 2009-12-30T20:49:16.533 に答える
4

ハッシュ マップは必要ないかもしれません。あなたはただ...

  1. メッセージを読む
  2. 新しいファイルを追加モードで開く
  3. メッセージを新しいファイルに書き込む
  4. 新しいファイルを閉じる

ただし、多くの開閉を行うため、これが高速になるかどうかはわかりません。

于 2009-12-30T20:28:52.040 に答える
2

ある種のインテリジェントなプーリングをお勧めします。パフォーマンスを向上させるために最大/最も頻繁に使用されるファイルを開いたままにし、残りを閉じてリソースを節約します。

メインファイルの大部分がレコードタイプ1〜5で構成されている場合は、必要な限りそれらのファイルを開いたままにします。他のものは、リソースのシステムを枯渇させないように、必要に応じて開閉できます。

于 2009-12-30T20:35:04.273 に答える
1

私はあなたの質問についていくつかの仮定をするつもりです:

  • 各メッセージは、固定サイズのフィールドとして、メッセージ タイプで始まります
  • メッセージの混合を含む異種入力ファイルがあります。各出力ファイルには特定のタイプのメッセージのみが含まれる N 個の同種の出力ファイルが必要です。

頭に浮かぶアプローチはファンクター ベースです。特定のメッセージを処理するオブジェクトへのメッセージ型のマッピングを作成します。main() は、固定メッセージ ヘッダーを読み取り、マップから適切なファンクターを見つけて呼び出すディスパッチ ループです。

一度に 6,000 個のファイル (メッセージの種類ごとに 1 つ) を開いておくことはおそらく不可能です。ほとんどのオペレーティング システムでは、同時に開くことができるファイル数は約 1,024 に制限されています (ただし、Linux では、これを制御するカーネル パラメータを変更できます)。したがって、これは、ファイルを繰り返し開いたり閉じたりすることを意味します。

おそらく最良の方法は、すべてのファンクタに固定カウント バッファを設定して、たとえば 10 メッセージ後に開き、書き込み、閉じるようにすることです。メッセージが最大 50 バイトの場合、500 バイト (10 x 50) x 6,000 がいつでもメモリに残ります。

おそらく、固定サイズのバイト配列を保持するファンクターを作成し、その配列に一度に N バイトを読み取る汎用ファンクター クラスを作成します。

public class MessageProcessor
{
    int _msgSize;                   // the number of bytes to read per message
    byte[] _buf = new byte[1024];   // bigger than I said, but it's only 6 Mb total
    int _curSize;                   // when this approaches _buf.length, write
于 2009-12-30T20:49:54.677 に答える
0

通常、システムで開いているファイルには制限があり、いずれにせよ、多かれ少なかれランダムな順序で何千もの小さなファイルにアクセスすると、システムが非常に遅くなります。

大きなファイルを個々のメッセージのファイル (または、メモリがある場合はある種のメモリ内テーブル) に分割し、それをメッセージ タイプ別に並べ替えることを検討してください。それが完了したら、メッセージを適切なファイルに書き出します。

于 2009-12-30T20:53:50.103 に答える
0

多くのファイルに対して多くの小さな書き込みを行っているため、特に最も単純な設計では、新しい書き込みごとに新しいファイルのオープン/クローズがほぼ確実に行われることを考えると、書き込みの数を最小限に抑えたいと考えています。

代わりに、各キーをバッファーにマップしてみませんか? 最後に、各バッファをディスクに書き込みます。または、保持するメモリが多すぎることが懸念される場合は、1K、5K、または任意の行ごとに書き込むようにバッファーを構成できます。例えば

public class HashLogger {

          private HashMap<String,MessageBuffer> logs;

          public void write(String messageKey, String message)
          {
              if (!logs.contains(messageKey)) { logs.put(messageKey, new MessageBuffer(messageKey); }
              logs.get(messageKey).write(message);
          }

         public void flush()
         {
             for (MessageBuffer buffer: logs.values())
             {
                buffer.flush();
             }
            // ...flush all the buffers when you're done...
         }

    private class MessageBuffer {
             private MessageBuffer(String name){ ... }
             void flush(){ .. something here to write to a file specified by name ... }
             void write(String message){
             //... something here to add to internal buffer, or StringBuilder, or whatever... 
             //... you could also have something here that flushes if the internal builder gets larger than N lines ...
     }
}

バッファリングされたロギングを使用するように構成できる個別の Log4j ロガーを作成することもできます。slf4j のような最新のロギング フレームワークもこれをサポートしていない場合は驚くでしょう。

于 2009-12-30T20:38:04.093 に答える