メモリと時間の両方の観点から、大きなバイナリデータファイルを効率的に処理する方法が必要です。
私は、多次元の多くのデータポイントで構成される大きなバイナリデータファイルを処理するマルチスレッドJavaアプリケーションに取り組んでいます。データポイントのディメンションは同じで、各ポイントは約100kbです。データポイントの数はおよそ10,000から100,000です。データファイルは、テスト段階では数ギガバイトですが、将来的には数ギガバイトになる予定です。
お客様は、アプリケーションの実行中にメモリの問題に直面しているため、良好なパフォーマンスを提供しながら、処理に必要なメモリを削減するデータポイントのリストに取り組んでいます。
Javaはプロジェクトの要件であり、クライアントのシステムの現在のメモリによって制約されます。クライアントシステムには多くのコアがありますが、それは共有システムであり、メモリが現在の制限要因です。
データポイントのセットは、アプリケーションで繰り返し使用されます。ポイントは順番に処理される場合があります。それ以外の場合は、2つの異なるポイントのすべての組み合わせを含む、ポイントのサブセットが処理されます。サブセット内では、ポイントは任意の順序で処理できますが、サブセット内のポイントは任意に離れている場合があります。データファイルは、テキストデータファイルを解析して値をバイナリファイルに書き込むことによって作成する単純なバイナリファイルです。現在、データは倍精度であるため、バイナリデータファイルに一連のdoubleとしてデータポイントを連続して書き込みます。(各テキストファイルのデータポイントを解析し、すべてをメモリに保持するのではなく、すぐにバイナリファイルに書き込みます。)将来、float、intなどのデータを処理する可能性があります。
SOや他のインターネットサイトを検索しました。これまで、必要に応じてバイナリファイルからポイントを読み取るなど、いくつかのアプローチを試しましたが、すべてのデータポイントのリストをメモリに同時に保持する場合に比べてパフォーマンスが低下しました。これは、より小さな次元のより少ない数のポイントを使用するテストではうまく機能しますが、それらのテストのデータセットは、実際のデータセットよりも数桁小さくなります。私がこれまでに試したアプローチは、すべてのポイントをメモリに保持するよりも数百倍または数千倍遅くなります。
私は直接ByteBuffersとMappedByteBuffersを試しました。最善のアプローチは、以下の関連部分を抽出したクラスです。バイナリデータをMappedByteBufferの配列に読み込みます。次に、以下のget(int index)メソッドを介してデータポイントが要求されると、このメソッドは関連するバッファーをロードし、関連するバイトをバイト配列に読み取り、バイトをdouble配列に変換して、DataPointオブジェクトを作成します。データファイル全体を物理メモリに収める方法がないため、MappedByteBufferの配列を使用しました。スレッドがデータを読み込むための個別のバイト配列を持つように、バイト配列の配列を使用しました。次に、ブロッキングを最小限に抑えるために、MappedByteBufferへの実際のアクセスでのみ同期しました。私がJavaクラスライブラリを理解しているように、バッファはスレッドセーフではありません。
フィードバックは大歓迎です。特に、MappedByteBuffersの同期に興味があります。
final static private int DOUBLE_BYTE_SIZE = Double.SIZE / Byte.SIZE;
public enum DataType {
CHAR,
DOUBLE,
FLOAT,
INT,
LONG,
SHORT;
}
final static private int numberOfBuffers = 8;
private MappedByteBuffer[] buffers = null;
private int bufferSize = -1;
private byte[][] readArray = null;
private DataType dataType;
private int sizeOfVector;
private int byteSizeOfVector;
private File binFile;
private int size = -1;
private int makeList(File binaryFile, DataType argDataType, int numberOfComponents) {
FileInputStream fis = null;
FileChannel fc = null;
try {
dataType = argDataType;
sizeOfVector = numberOfComponents;
fis = new FileInputStream(binaryFile);
fc = fis.getChannel();
long fileSize = fc.size();
switch (dataType) {
case DOUBLE:
byteSizeOfVector = DOUBLE_BYTE_SIZE * sizeOfVector;
break;
default:
break;
}
size = (int) fileSize / byteSizeOfVector;
bufferSize = size / numberOfBuffers;
buffers = new MappedByteBuffer[numberOfBuffers];
long remaining = fileSize;
long position = 0;
int bufferNumber = 0;
while(remaining > 0) {
long length = Math.min(remaining, bufferSize * byteSizeOfVector);
buffers[bufferNumber] = fc.map(MapMode.READ_ONLY, position, length);
position += length;
remaining -= length;
bufferNumber++;
}
readArray = new byte[numberOfBuffers][byteSizeOfVector];
} catch (IOException ex) {
return -1;
} finally {
try {
if(fis != null) {
fis.close();
}
if(fc != null) {
fc.close();
}
} catch (IOException exClose) {
return -1;
}
}
return 0;
}
private static long makeLong(byte[] data) {
if (data == null || data.length != 8) return 0x0;
return (long)(
(long) (0xFF & data[0]) << 56 |
(long) (0xFF & data[1]) << 48 |
(long) (0xFF & data[2]) << 40 |
(long) (0xFF & data[3]) << 32 |
(long) (0xFF & data[4]) << 24 |
(long) (0xFF & data[5]) << 16 |
(long) (0xFF & data[6]) << 8 |
(long) (0xFF & data[7]) << 0
);
}
private static double makeDouble(byte[] data) {
if (data == null || data.length != 8) return 0x0;
return Double.longBitsToDouble(makeLong(data));
}
private static double[] makeDoubleArray(byte[] data) {
if (data == null) return null;
if (data.length % 8 != 0) return null;
double[] doubleArray = new double[data.length / 8];
for (int index = 0; index < dbls.length; index++) {
doubleArray[index] = makeDouble(new byte[] {
data[(index*8)],
data[(index*8)+1],
data[(index*8)+2],
data[(index*8)+3],
data[(index*8)+4],
data[(index*8)+5],
data[(index*8)+6],
data[(index*8)+7],
}
);
}
return doubleArray;
}
@Override
public DataPoint get(int index) {
if(index > size() - 1) {
throw new IndexOutOfBoundsException("Index exceeds length of list.");
} else if(index < 0) {
throw new IndexOutOfBoundsException("Index is less than zero.");
}
int bufferNumber = index / bufferSize;
int bufferPosition = index % bufferSize;
MappedByteBuffer buffer = buffers[bufferNumber];
synchronized (buffer) {
buffer.load();
buffer.position(bufferPosition * sizeOfVector);
buffer.get(readArray[bufferNumber]);
}
switch(dataType) {
case DOUBLE:
return new DoublePoint(makeDoubleArray(readArray[bufferNumber]));
default:
return null;
}
}