7

binaryPostgres のCOPY コマンドで使用される形式を記述するための Java ライブラリ (または単なるコード) を見つけた人はいますか?

非常に単純に見えますが、誰かが正しいタプル データ形式をすでに理解している場合は、そこから始めたほうがよいでしょう。

実際、すべてのデータ型のフォーマットの説明だけでも役に立ちます。

ありがとう。

4

3 に答える 3

9

PostgreSQL の Binary Copy Protocol を実装するPgBulkInsertを試すことができます。

Maven Central Repository からも入手できます。

免責事項: 私はプロジェクトの作成者です。

PostgreSQL バイナリ コピー プロトコル

私は自分のプロジェクトを宣伝するだけではなく、プロトコルについても書きたいと思っています。

まず、クラスを作成しました。このクラスPgBinaryWriterにはDataOutputStream、新しい行を開始するメソッドである Binary Protocol Header を書き込むメソッドがあります (Binary Copy Protocol では、挿入する行ごとに列数を書き込む必要があります)。 ) と、特定の Java 型を記述するためのwriteを受け取るメソッド。IValueHandler<TTargetType>

は、ストリームをフラッシュして閉じる前にストリームにa を書き込む必要があるため、PgBinaryWriteran を実装します。AutoClosable-1

IValueHandler<TTargetType>aDataOutputStreamと a の値を取ります。指定された値を PostgreSQL Binary Protocol Format で書き込む責任があります。

PgBinaryWriter

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql;


import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;
import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers.IValueHandler;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.OutputStream;

public class PgBinaryWriter implements AutoCloseable {

    /** The ByteBuffer to write the output. */
    private transient DataOutputStream buffer;

    public PgBinaryWriter() {
    }

    public void open(final OutputStream out) {
        buffer = new DataOutputStream(new BufferedOutputStream(out));

        writeHeader();
    }

    private void writeHeader() {
        try {

            // 11 bytes required header
            buffer.writeBytes("PGCOPY\n\377\r\n\0");
            // 32 bit integer indicating no OID
            buffer.writeInt(0);
            // 32 bit header extension area length
            buffer.writeInt(0);

        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    public void startRow(int numColumns) {
        try {
            buffer.writeShort(numColumns);
        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    public <TTargetType> void write(final IValueHandler<TTargetType> handler, final TTargetType value) {
        handler.handle(buffer, value);
    }

    @Override
    public void close() {
        try {
            buffer.writeShort(-1);

            buffer.flush();
            buffer.close();
        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }
}

ValueHandler

anIValueHandlerは単純なインターフェースで、と 値handleを取得するメソッドを持ってDataOutputStreamいます。

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import java.io.DataOutputStream;
import java.lang.reflect.Type;

public interface IValueHandler<TTargetType> extends ValueHandler {

    void handle(DataOutputStream buffer, final TTargetType value);

    Type getTargetType();

}

プロトコルについて知っておくことは重要です-1。値が null の場合は a を記述する必要があります。このために、ケースを処理する抽象基本クラスを作成しました。

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;

import java.io.DataOutputStream;

public abstract class BaseValueHandler<T> implements IValueHandler<T> {

    @Override
    public void handle(DataOutputStream buffer, final T value) {
        try {
            if (value == null) {
                buffer.writeInt(-1);
                return;
            }
            internalHandle(buffer, value);
        } catch (Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    protected abstract void internalHandle(DataOutputStream buffer, final T value) throws Exception;
}

次に、さまざまな Java タイプのハンドラーを実装できます。の例を次に示しlongます。他の実装は GitHub リポジトリ ( handlers ) にあります。

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import java.io.DataOutputStream;
import java.lang.reflect.Type;

public class LongValueHandler extends BaseValueHandler<Long> {

    @Override
    protected void internalHandle(DataOutputStream buffer, final Long value) throws Exception {
        buffer.writeInt(8);
        buffer.writeLong(value);
    }

    @Override
    public Type getTargetType() {
        return Long.class;
    }
}

PgBinaryWriter の使用

いよいよ部品の接続に入ります。さらにいくつかの部分を抽象化したことに注意してください。コード内の実装の詳細をさらに調べる必要がある場合があります。

public abstract class PgBulkInsert<TEntity> {

    // ... 

    public void saveAll(PGConnection connection, Stream<TEntity> entities) throws SQLException {

        CopyManager cpManager = connection.getCopyAPI();
        CopyIn copyIn = cpManager.copyIn(getCopyCommand());

        int columnCount = columns.size();

        try (PgBinaryWriter bw = new PgBinaryWriter()) {

            // Wrap the CopyOutputStream in our own Writer:
            bw.open(new PGCopyOutputStream(copyIn));

            // Insert all entities:                
            entities.forEach(entity -> {

                // Start a New Row:
                bw.startRow(columnCount);
                
                // Insert the Column Data:
                columns.forEach(column -> {
                    try {
                        column.getWrite().invoke(bw, entity);
                    } catch (Exception e) {
                        throw new SaveEntityFailedException(e);
                    }
                });
            });
        }
    }
    
    private String getCopyCommand()
    {
        String commaSeparatedColumns = columns.stream()
                .map(x -> x.columnName)
                .collect(Collectors.joining(", "));

        return String.format("COPY %1$s(%2$s) FROM STDIN BINARY",
                table.GetFullQualifiedTableName(),
                commaSeparatedColumns);
    }
}

PgBulkInsert

PgBulkInsert は、次の PostgreSQL データ型をサポートしています。

基本的な使い方

多数の人を PostgreSQL データベースに一括挿入する必要があると想像してください。それぞれPersonに名、姓、生年月日があります。

データベース テーブル

PostgreSQL データベースのテーブルは次のようになります。

CREATE TABLE sample.unit_test
(
    first_name text,
    last_name text,
    birth_date date
);

ドメイン モデル

アプリケーションのドメイン モデルは次のようになります。

private class Person {

    private String firstName;

    private String lastName;

    private LocalDate birthDate;

    public Person() {}

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public LocalDate getBirthDate() {
        return birthDate;
    }

    public void setBirthDate(LocalDate birthDate) {
        this.birthDate = birthDate;
    }
    
}

バルクインサーター

PgBulkInsert<Person>次に、テーブルとドメイン モデル間のマッピングを定義するを実装する必要があります。

public class PersonBulkInserter extends PgBulkInsert<Person>
{
    public PersonBulkInserter() {
        super("sample", "unit_test");

        MapString("first_name", Person::getFirstName);
        MapString("last_name", Person::getLastName);
        MapDate("birth_date", Person::getBirthDate);
    }
}

バルク インサータの使用

最後に、ユニット テストを記述100000して、データベースに Person を挿入します。単体テスト全体は GitHub: IntegrationTest.javaにあります。

@Test
public void bulkInsertPersonDataTest() throws SQLException {
    // Create a large list of Persons:
    List<Person> persons = getPersonList(100000);
    
    // Create the BulkInserter:
    PersonBulkInserter personBulkInserter = new PersonBulkInserter();
    
    // Now save all entities of a given stream:
    personBulkInserter.saveAll(PostgreSqlUtils.getPGConnection(connection), persons.stream());
    
    // And assert all have been written to the database:
    Assert.assertEquals(100000, getRowCount());
}

private List<Person> getPersonList(int numPersons) {
    List<Person> persons = new ArrayList<>();

    for (int pos = 0; pos < numPersons; pos++) {
        Person p = new Person();

        p.setFirstName("Philipp");
        p.setLastName("Wagner");
        p.setBirthDate(LocalDate.of(1986, 5, 12));

        persons.add(p);
    }

    return persons;
}
于 2016-02-11T14:39:31.703 に答える
2

CopyManagerJDBC ドライバーからを使用することだけを検討しましたか? それ以外の場合は、おそらく から実装を派生させることができますQueryExecutorImpl

于 2013-01-09T20:23:32.487 に答える