4

これが間違っている場合は訂正してください。ただし、クラスター内で実行しているキュー サーバーと多数の Java ワーカーがあります。私のキューには非常に小さい作業単位がありますが、それらの多くがあります。これまでのところ、ワーカーのベンチマークとレビューでは、約 200 mb/秒の速度が得られることが示されています。

だから私は自分の帯域幅を介してより多くのワークユニットを取得する方法を見つけようとしています. 現在、CPU 使用率はそれほど高くありません (40 ~ 50%)。これは、ネットワークがデータを送信するよりも高速にデータを処理できるためです。キューを介してより多くの作業を取得したいので、高価な圧縮/圧縮解除を介して喜んで支払います (各コアの半分が現在アイドル状態であるため)。

私はJava LZOとgzipを試しましたが、もっと良いものがあるかどうか疑問に思っていました(よりCPUが高価であっても)?

更新: データは byte[] です。基本的に、キューはその形式でのみByteArrayOutputStream受け取るため、2 つの int と int[] を byte[] 形式に書き込むために使用しています。int[] の値はすべて 0 から 100 までの整数です (または 1000 ですが、ほとんどの数値はゼロです)。リストは 1000 から 10,000 項目の範囲で非常に大きくなります (繰り返しますが、過半数のゼロ.. int[] 内のゼロ以外の数値が 100 を超えることはありません)。

4

5 に答える 5

6

データの構造を利用するカスタム圧縮メカニズムを使用すると、非常に効率的になりそうです。

まず、 のshort[]代わりに (16 ビット データ型) を使用するint[]と、送信されるデータの量が半分 (!) になります。数値が-2^15(-32768) と2^15-1(32767) の間に簡単に収まるため、これを行うことができます。これは驚くほど簡単に実装できます。

第 2 に、ランレングス エンコーディングに似たスキームを使用できます。正の数はその数を文字どおりに表し、負の数は (絶対値を取った後) 多くのゼロを表します。例えば

[10, 40, 0, 0, 0, 30, 0, 100, 0, 0, 0, 0] <=> [10, 40, -3, 30, -1, 100, -4]

shortを置き換えるだけでは実装が難しくなりますintが、最悪の場合 (1000 個の数字、100 個の非ゼロ、いずれも連続していない) では最大 80% の圧縮が提供されます。

圧縮率を計算するためにいくつかのシミュレーションを行いました。上記の方法と、Louis Wasserman と sbridges によって提案された方法をテストしました。どちらも非常にうまく機能しました。

配列の長さとゼロ以外の数値の数が両方とも境界内で均一であると仮定すると、両方の方法で平均して約 5400int秒 (またはshort秒) 節約でき、元のサイズの約 2.5% の圧縮サイズになります! ランレングス エンコーディング方法は、約 1 追加int(または 0.03% 小さい平均圧縮サイズ) を節約するようです。つまり、基本的に違いはありません。したがって、実装が最も簡単なものを使用する必要があります。以下は、50000 個のランダム サンプルの圧縮率のヒストグラムです (非常によく似ています!)。

ヒストグラム

まとめ: shorts の代わりにints を使用し、圧縮方法の 1 つを使用すると、データを元のサイズの約 1% に圧縮できます。

シミュレーションには、次の R スクリプトを使用しました。

SIZE <- 50000

lengths <- sample(1000:10000, SIZE, replace=T)
nonzeros <- sample(1:100, SIZE, replace=T)

f.rle <- function(len, nonzero) {
  indexes <- sort(c(0,sample(1:len, nonzero, F)))
  steps <- diff(indexes)
  sum(steps > 1) + nonzero # one short per run of zeros, and one per zero
}

f.index <- function(len, nonzero) {
  nonzero * 2
}

# using the [value, -1 * number of zeros,...] method
rle.comprs <- mapply(f.rle, lengths, nonzeros)
print(mean(lengths - rle.comprs)) # average number of shorts saved

rle.ratios <- rle.comprs / lengths * 100
print(mean(rle.ratios)) # average compression ratio

# using the [(index, value),...] method
index.comprs <- mapply(f.index, lengths, nonzeros)
print(mean(lengths - index.comprs)) # average number of shorts saved

index.ratios <- index.comprs / lengths * 100
print(mean(index.ratios)) # average compression ratio


par(mfrow=c(2,1))
hist(rle.ratios, breaks=100, freq=F, xlab="Compression ratio (%)", main="Run length encoding")
hist(index.ratios, breaks=100, freq=F, xlab="Compression ratio (%)", main="Store indices")
于 2012-04-29T04:47:35.470 に答える
2

RLE アルゴリズムの実装を書きました。これはバイト配列で動作するため、既存のコードでインライン フィルターとして使用できます。将来データが変更された場合に備えて、大きな値または負の値を安全に処理する必要があります。

ゼロのシーケンスを {0}{qty} としてエンコードします。{qty} は 1 ~ 255 の範囲です。他のすべてのバイトは、バイト自体として格納されます。送信前にバイト配列を圧縮し、受信時にフルサイズに戻します。

public static byte[] squish(byte[] bloated) {
    int size = bloated.length;
    ByteBuffer bb = ByteBuffer.allocate(2 * size);
    bb.putInt(size);
    int zeros = 0;
    for (int i = 0; i < size; i++) {
        if (bloated[i] == 0) {
            if (++zeros == 255) {
                bb.putShort((short) zeros);
                zeros = 0;
            }
        } else {
            if (zeros > 0) {
                bb.putShort((short) zeros);
                zeros = 0;
            }
            bb.put(bloated[i]);
        }
    }
    if (zeros > 0) {
        bb.putShort((short) zeros);
        zeros = 0;
    }
    size = bb.position();
    byte[] buf = new byte[size];
    bb.rewind();
    bb.get(buf, 0, size).array();
    return buf;
}

public static byte[] bloat(byte[] squished) {
    ByteBuffer bb = ByteBuffer.wrap(squished);
    byte[] bloated = new byte[bb.getInt()];
    int pos = 0;
    while (bb.position() < bb.capacity()) {
        byte value = bb.get();
        if (value == 0) {
            bb.position(bb.position() - 1);
            pos += bb.getShort();
        } else {
            bloated[pos++] = value;
        }
    }
    return bloated;
}
于 2012-04-30T00:00:39.967 に答える
2

データを 2 つのvarintとしてエンコードしてみてください。最初の varint はシーケンス内の数値のインデックスで、2 番目は数値そのものです。0 のエントリについては、何も書き込まないでください。

于 2012-04-29T04:56:17.190 に答える
1

7z や gzip と比較して、BZIP2 には感銘を受けました。私は個人的にこの Java 実装を試したことはありませんが、GZIP 呼び出しをこれに置き換えて結果を確認するのは簡単なようです。

http://www.kohsuke.org/bzip2

于 2012-04-29T04:49:12.947 に答える
1

おそらく、データ ストリームで主要なものをすべて試して、どれが最適かを確認する必要があります。また、一部のアルゴリズムは実行に時間がかかり、キューの待ち時間が長くなることも考慮する必要があります。アプリケーションによっては、これが問題になる場合とそうでない場合があります。

データについて何か知っていれば、より良い圧縮が得られることがあります。(dbaupの答えはこのアプローチをうまくカバーしています)

この圧縮アルゴリズムの比較は役に立つかもしれません。記事から:

一般的な圧縮アルゴリズムの圧縮率

于 2012-04-29T04:50:21.500 に答える