4

メールのリストを取得してpostgresデータベースに保存するインポーターがあります。テーブルレスインポータークラス内のコードスニペットは次のとおりです。

query_temporary_table = "CREATE TEMPORARY TABLE subscriber_imports (email CHARACTER VARYING(255)) ON COMMIT DROP;"
query_copy            = "COPY subscriber_imports(email) FROM STDIN WITH CSV;"
query_delete          = "DELETE FROM subscriber_imports WHERE email IN (SELECT email FROM subscribers WHERE suppressed_at IS NOT NULL OR list_id = #{list.id}) RETURNING email;"
query_insert          = "INSERT INTO subscribers(email, list_id, created_at, updated_at) SELECT email, #{list.id}, NOW(), NOW() FROM subscriber_imports RETURNING id;"

conn = ActiveRecord::Base.connection_pool.checkout
conn.transaction do
  raw = conn.raw_connection

  raw.exec(query_temporary_table)
  raw.exec(query_copy)
  CSV.read(csv.path, headers: true).each do |row|
    raw.put_copy_data row['email']+"\n" unless row.nil?
  end
  raw.put_copy_end
  while res = raw.get_result do; end # very important to do this after a copy

  result_delete = raw.exec(query_delete)
  result_insert = raw.exec(query_insert)

  ActiveRecord::Base.connection_pool.checkin(conn)
  {
    deleted:  result_delete.count,
    inserted: result_insert.count,
    updated:  0
  }
end

私が抱えている問題は、アップロードしようとすると例外が発生することです。

PG::ERROR: another command is already in progress: ROLLBACK

これはすべて1つのアクションで実行され、私が行っている他のクエリはユーザー検証のみであり、インポートの重複を防ぐDBミューテックスがあります。このクエリは、pg gemを0.13.2から0.14.1に更新することを含む最新のプッシュまでは正常に機能しました(他の「無関係な」コードとともに)。

エラーは最初はステージングサーバーで発生しましたが、その後ローカルで再現することができ、アイデアがありませんでした。

質問をより明確にする必要がある場合は、お知らせください。

ありがとう

4

1 に答える 1

12

私自身の答えを見つけました。これは、「COPY」を使用して大量のデータをインポートするときに誰かが同じ問題を見つけた場合に役立つ可能性があります

CSV.read() ブロック内で例外がスローされており、それをキャッチしましたが、プロセスを正しく終了していませんでした。

  begin
    CSV.read(csv.path, headers: true).each do |row|
      raw.put_copy_data row['email']+"\n" unless row.nil?
    end
  ensure
    raw.put_copy_end
    while res = raw.get_result do; end # very important to do this after a copy
  end

このブロックにより、COPY コマンドが確実に完了します。これも最後に追加して、インポートが成功した場合にフローを中断することなく、接続を解放してプールに戻します。

rescue
  ActiveRecord::Base.connection_pool.checkin(conn)
于 2012-09-26T18:37:25.570 に答える