31

MySQLさまざまな InnoDB テーブルで毎秒約 600 行の挿入を処理する必要があるデータベースを設計しています。現在の実装では、バッチ化されていないプリペアド ステートメントを使用しています。ただし、MySQLデータベースへの書き込みがボトルネックになり、時間の経過とともにキューのサイズが増加します。

実装はJavaで書かれており、バージョンはわかりません。MySQLJava コネクタを使用します。JDBC明日への切り替えを検討する必要があります。これらは 2 つの異なるコネクタ パッケージであると想定しています。

この問題に関する次のスレッドを読みました。

そしてmysqlサイトから:

私の質問は次のとおりです。

  • INSERTバッチモードで準備されたステートメントでINSERTを使用する場合と、複数の値を持つ単一のステートメントを使用する場合のパフォーマンスの違いについて、アドバイスや経験がある人はいますか?

  • MySQLJava コネクタと のパフォーマンスの違いは何ですかJDBC。どちらかを使用する必要がありますか?

  • テーブルはアーカイブ用であり、最大 90% の書き込みから最大 10% の読み取り (おそらくそれ以下) が見られます。私はInnoDBを使用しています。これは MyISAM よりも正しい選択ですか?

よろしくお願いいたします。

4

4 に答える 4

40

JDBC は、標準インターフェースを提供するデータベース アクセスの Java SE 標準にすぎないため、特定の JDBC 実装に縛られることはありません。MySQL Java コネクター (Connector/J) は、MySQL データベース専用の JDBC インターフェースの実装です。経験上、私は MySQL を使用して膨大な量のデータを使用するプロジェクトに携わっています。私たちは、生成できるデータに MyISAM を好んで使用しています。これにより、はるかに高いパフォーマンスの損失トランザクションを達成できますが、一般的に言えば、MyISAM の方が高速です。しかし、InnoDB の方が信頼性が高くなります。

INSERT ステートメントのパフォーマンスも約 1 年前に疑問に思っていたところ、コード シェルフに次の古いテスト コードが見つかりました (申し訳ありませんが、少し複雑で、質問の範囲外です)。以下のコードには、テスト データを挿入する 4 つの方法の例が含まれています。

  • 単一 INSERTの;
  • バッチ化され INSERTた s;
  • 手動バルク INSERT(絶対に使用しないでください - 危険です);
  • そして最終的にバルクを準備し INSERTました)。

ランナーとしてTestNGを使用し、次のようなカスタム コードのレガシーを使用します。

  • runWithConnection()メソッド - コールバックが実行された後、接続が閉じられるか、接続プールに戻されることを保証します (ただし、以下のコードは、ステートメントを閉じるという信頼できない戦略を使用しています - コードを減らすために / がなくてもtry) finally;
  • IUnsafeIn<T, E extends Throwable>- 単一のパラメーターを受け入れるが、タイプ E の例外をスローする可能性があるメソッドのカスタム コールバック インターフェイス: void handle(T argument) throws E;.
package test;

import test.IUnsafeIn;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import static java.lang.String.format;
import static java.lang.String.valueOf;
import static java.lang.System.currentTimeMillis;

import core.SqlBaseTest;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

public final class InsertVsBatchInsertTest extends SqlBaseTest {

    private static final int ITERATION_COUNT = 3000;

    private static final String CREATE_TABLE_QUERY = "CREATE TABLE IF NOT EXISTS ttt1 (c1 INTEGER, c2 FLOAT, c3 VARCHAR(5)) ENGINE = InnoDB";
    private static final String DROP_TABLE_QUERY = "DROP TABLE ttt1";
    private static final String CLEAR_TABLE_QUERY = "DELETE FROM ttt1";

    private static void withinTimer(String name, Runnable runnable) {
        final long start = currentTimeMillis();
        runnable.run();
        logStdOutF("%20s: %d ms", name, currentTimeMillis() - start);
    }

    @BeforeSuite
    public void createTable() {
        runWithConnection(new IUnsafeIn<Connection, SQLException>() {
            @Override
            public void handle(Connection connection) throws SQLException {
                final PreparedStatement statement = connection.prepareStatement(CREATE_TABLE_QUERY);
                statement.execute();
                statement.close();
            }
        });
    }

    @AfterSuite
    public void dropTable() {
        runWithConnection(new IUnsafeIn<Connection, SQLException>() {
            @Override
            public void handle(Connection connection) throws SQLException {
                final PreparedStatement statement = connection.prepareStatement(DROP_TABLE_QUERY);
                statement.execute();
                statement.close();
            }
        });
    }

    @BeforeTest
    public void clearTestTable() {
        runWithConnection(new IUnsafeIn<Connection, SQLException>() {
            @Override
            public void handle(Connection connection) throws SQLException {
                final PreparedStatement statement = connection.prepareStatement(CLEAR_TABLE_QUERY);
                statement.execute();
                statement.close();
            }
        });
    }

    @Test
    public void run1SingleInserts() {
        withinTimer("Single inserts", new Runnable() {
            @Override
            public void run() {
                runWithConnection(new IUnsafeIn<Connection, SQLException>() {
                    @Override
                    public void handle(Connection connection) throws SQLException {
                        for ( int i = 0; i < ITERATION_COUNT; i++ ) {
                            final PreparedStatement statement = connection.prepareStatement("INSERT INTO ttt1 (c1, c2, c3) VALUES (?, ?, ?)");
                            statement.setInt(1, i);
                            statement.setFloat(2, i);
                            statement.setString(3, valueOf(i));
                            statement.execute();
                            statement.close();
                        }
                    }
                });
            }
        });
    }

    @Test
    public void run2BatchInsert() {
        withinTimer("Batch insert", new Runnable() {
            @Override
            public void run() {
                runWithConnection(new IUnsafeIn<Connection, SQLException>() {
                    @Override
                    public void handle(Connection connection) throws SQLException {
                        final PreparedStatement statement = connection.prepareStatement("INSERT INTO ttt1 (c1, c2, c3) VALUES (?, ?, ?)");
                        for ( int i = 0; i < ITERATION_COUNT; i++ ) {
                            statement.setInt(1, i);
                            statement.setFloat(2, i);
                            statement.setString(3, valueOf(i));
                            statement.addBatch();
                        }
                        statement.executeBatch();
                        statement.close();
                    }
                });
            }
        });
    }

    @Test
    public void run3DirtyBulkInsert() {
        withinTimer("Dirty bulk insert", new Runnable() {
            @Override
            public void run() {
                runWithConnection(new IUnsafeIn<Connection, SQLException>() {
                    @Override
                    public void handle(Connection connection) throws SQLException {
                        final StringBuilder builder = new StringBuilder("INSERT INTO ttt1 (c1, c2, c3) VALUES ");
                        for ( int i = 0; i < ITERATION_COUNT; i++ ) {
                            if ( i != 0 ) {
                                builder.append(",");
                            }
                            builder.append(format("(%s, %s, '%s')", i, i, i));
                        }
                        final String query = builder.toString();
                        final PreparedStatement statement = connection.prepareStatement(query);
                        statement.execute();
                        statement.close();
                    }
                });
            }
        });
    }

    @Test
    public void run4SafeBulkInsert() {
        withinTimer("Safe bulk insert", new Runnable() {
            @Override
            public void run() {
                runWithConnection(new IUnsafeIn<Connection, SQLException>() {
                    private String getInsertPlaceholders(int placeholderCount) {
                        final StringBuilder builder = new StringBuilder("(");
                        for ( int i = 0; i < placeholderCount; i++ ) {
                            if ( i != 0 ) {
                                builder.append(",");
                            }
                            builder.append("?");
                        }
                        return builder.append(")").toString();
                    }

                    @SuppressWarnings("AssignmentToForLoopParameter")
                    @Override
                    public void handle(Connection connection) throws SQLException {
                        final int columnCount = 3;
                        final StringBuilder builder = new StringBuilder("INSERT INTO ttt1 (c1, c2, c3) VALUES ");
                        final String placeholders = getInsertPlaceholders(columnCount);
                        for ( int i = 0; i < ITERATION_COUNT; i++ ) {
                            if ( i != 0 ) {
                                builder.append(",");
                            }
                            builder.append(placeholders);
                        }
                        final int maxParameterIndex = ITERATION_COUNT * columnCount;
                        final String query = builder.toString();
                        final PreparedStatement statement = connection.prepareStatement(query);
                        int valueIndex = 0;
                        for ( int parameterIndex = 1; parameterIndex <= maxParameterIndex; valueIndex++ ) {
                            statement.setObject(parameterIndex++, valueIndex);
                            statement.setObject(parameterIndex++, valueIndex);
                            statement.setObject(parameterIndex++, valueIndex);
                        }
                        statement.execute();
                        statement.close();
                    }
                });
            }
        });
    }

}

@Test アノテーションが付けられたメソッドを見てください。これらは実際にINSERTステートメントを実行します。また、定数を見てくださいCREATE_TABLE_QUERY。ソース コードでは InnoDB を使用しており、MySQL 5.5 がインストールされているマシン (MySQL Connector/J 5.1.12) で次の結果が生成されます。

InnoDB
Single inserts: 74148 ms
Batch insert: 84370 ms
Dirty bulk insert: 178 ms
Safe bulk insert: 118 ms

InnoDB を MyISAM に変更すると、CREATE_TABLE_QUERYパフォーマンスが大幅に向上します。

MyISAM
Single inserts: 604 ms
Batch insert: 447 ms
Dirty bulk insert: 63 ms
Safe bulk insert: 26 ms

お役に立てれば。

更新:

max_allowed_packet4 番目の方法では、 in mysql.ini(セクション) を適切にカスタマイズして、[mysqld]非常に大きなパケットをサポートするのに十分な大きさにする必要があります。

于 2012-07-09T07:00:27.043 に答える
1

影響を受けるテーブルにトリガーはありますか? そうでない場合、1 秒あたり 600 回の挿入は多くないように見えます。

JDBC からのバッチ挿入機能は、同じトランザクションで同じステートメントを複数回発行しますが、複数値 SQL は単一のステートメントですべての値を絞り込みます。複数値ステートメントの場合、insert SQL を動的に構築する必要があります。これは、コードの増加、メモリの増加、SQL インジェクション保護メカニズムなどの点でオーバーヘッドになる可能性があります。最初に通常のバッチ機能を試してください。問題にならないはずです。

バッチでデータを受信しない場合は、挿入する前にバッチ処理することを検討してください。Producer-Consumer の配置を実装するために、別のスレッドで Queue を使用します。これにより、特定の時間が経過するか、キューのサイズがしきい値を超えるまで挿入が保留されます。

挿入が成功したことをプロデューサーに通知したい場合は、さらに配管が必要です。

スレッドでブロックするだけの方が簡単で実用的な場合があります。

if(System.currentTimeMills()-lastInsertTime>TIME_THRESHOLD || queue.size()>SIZE_THRESHOLD) {
    lastInsertTime=System.currentTimeMills();
    // Insert logic
    } else {
    // Do nothing OR sleep for some time OR retry after some time. 
    }
于 2012-07-09T06:50:43.030 に答える