6

私が取り組んでいるアプリケーションは、データを複数のテーブルにロードする Java ベースの ETL プロセスです。DBMS は Infobright (データ ウェアハウス向けの MYSQL ベースの DBMS) です。

データのロードはアトミックに行う必要があります。LOAD DATA INFILEただし、パフォーマンス上の理由から、(コマンドを使用して) 同時に複数のテーブルにデータをロードしたいと考えています。これは、複数の接続を開く必要があることを意味します。

ロードをアトミ​​ックかつ並列に実行できるソリューションはありますか? (答えは、ロードするテーブルのエンジンに依存する可能性があると思います。それらのほとんどは、トランザクションを許可する Brighthouse ですが、XA とセーブポイントは許可しません)。

さらに明確にするために、次のような状況は避けたいと思います。

  • 5 つのテーブルにデータをロードします
  • 最初の 4 つのテーブルのロードをコミットします
  • 5 番目のテーブルのコミットが失敗する

この状況では、最初の 4 つのロードは既にコミットされているため、ロールバックできません。

4

2 に答える 2

5

イントロ

私が約束したように、私は完全な例をハックしました。MySQLを使用して、次のような3つのテーブルを作成しました。

CREATE TABLE `test{1,2,3}` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `data` varchar(255) NOT NULL UNIQUE,
  PRIMARY KEY (`id`)
);

test2最初は単一の行が含まれます。

INSERT INTO `test2` (`data`) VALUES ('a');

私は完全なコードをhttp://pastebin.comに投稿しました。)

次の例はいくつかのことを行います。

  1. threads並行して実行されるジョブの数を3決定するセット。
  2. 接続数を作成しますthreads
  3. すべてのテーブルのサンプルデータを吐き出します(デフォルトでは、データはaすべてのテーブルのものです)。
  4. threads実行するジョブの数を作成し、それらにデータをロードします。
  5. スレッドthreadsの数でジョブを実行し、それらの完了を待ちます(成功したかどうか)。
  6. 例外が発生しなかった場合、すべての接続をコミットします。それ以外の場合は、それぞれをロールバックします。
  7. 接続を閉じます(ただし、これらは再利用できます)。

(でJava 7の自動リソース管理機能を使用したことに注意してくださいSQLTask.call()。)

論理

public static void main(String[] args) throws SQLException, InterruptedException {
  int threads = 3;
  List<Connection> connections = getConnections(threads);
  Map<String, String> tableData = getTableData(threads);
  List<SQLTask> tasks = getTasks(threads, connections);
  setData(tableData, tasks);
  try {
    runTasks(tasks);
    commitConnections(connections);
  } catch (ExecutionException ex) {
    rollbackConnections(connections);
  } finally {
    closeConnections(connections);
  }
}

データ

private static Map<String, String> getTableData(int threads) {
  Map<String, String> tableData = new HashMap<>();
  for (int i = 1; i <= threads; i++)
    tableData.put("test" + i, "a");
  return tableData;
}

タスク

private static final class SQLTask implements Callable<Void> {

  private final Connection connection;

  private String data;
  private String table;

  public SQLTask(Connection connection) {
    this.connection = connection;
  }

  public void setTable(String table) {
    this.table = table;
  }

  public void setData(String data) {
    this.data = data;
  }

  @Override
  public Void call() throws SQLException {
    try (Statement statement = connection.createStatement()) {
      statement.executeUpdate(String.format(
        "INSERT INTO `%s` (data) VALUES  ('%s');", table, data));
    }
    return null;
  }
}

private static List<SQLTask> getTasks(int threads, List<Connection> connections) {
  List<SQLTask> tasks = new ArrayList<>();
  for (int i = 0; i < threads; i++)
    tasks.add(new SQLTask(connections.get(i)));
  return tasks;
}

private static void setData(Map<String, String> tableData, List<SQLTask> tasks) {
  Iterator<Entry<String, String>> i = tableData.entrySet().iterator();
  Iterator<SQLTask> j = tasks.iterator();
  while (i.hasNext()) {
    Entry<String, String> entry = i.next();
    SQLTask task = j.next();
    task.setTable(entry.getKey());
    task.setData(entry.getValue());
  }
}

走る

private static void runTasks(List<SQLTask> tasks) 
    throws ExecutionException, InterruptedException {
  ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
  List<Future<Void>> futures = executorService.invokeAll(tasks);
  executorService.shutdown();
  for (Future<Void> future : futures)
    future.get();
}

結果

によって返されるデフォルトのデータを考えるとgetTableData(...)

test1 -> `a`
test2 -> `a`
test3 -> `a`

また、2番目のジョブがtest2すでに含まれているa(そしてdata列が一意である)という事実は失敗し、例外をスローするため、すべての接続がロールバックされます。

sの代わりにasを返すbと、接続は安全にコミットされます。

これは、と同様に行うことができますLOAD DATA


私の答えに対するOPの回答の後、私は彼女/彼がやりたいことを単純で明確な方法で行うことは不可能であることに気づきました。

基本的に問題は、コミットが成功した後、操作がアトミックであるため、コミットされたデータをロールバックできないことです。与えられたケースで複数のコミットが必要な場合、(すべてのトランザクションの)すべてのデータを追跡しない限り、すべてをロールバックすることはできません何かが発生した場合、正常にコミットされたすべてのデータが削除されます。

コミットとロールバックの問題に関連する良い答えがあります。

于 2011-12-07T17:08:47.403 に答える
0

実際、ICE ではなく新しいバージョンの IEE には、DLP (Distributed Load Processing) と呼ばれる追加機能があります。サイトに PDF ファイルがあり、ここからリンクされています。

http://www.infobright.com/Products/Features/

于 2012-02-16T20:39:05.273 に答える