346

ここで非常によくある質問は、アップサートの実行方法です。これは、MySQL が呼び出すものでINSERT ... ON DUPLICATE UPDATEあり、標準ではMERGE操作の一部としてサポートされています。

PostgreSQL がそれを直接サポートしていない場合 (pg 9.5 より前)、どうすればよいでしょうか? 次の点を考慮してください。

CREATE TABLE testtable (
    id integer PRIMARY KEY,
    somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

(2, 'Joe')ここで、タプル,を「アップサート」したいとします(3, 'Alan')。したがって、新しいテーブルの内容は次のようになります。

(1, 'fred'),
(2, 'Joe'),    -- Changed value of existing tuple
(3, 'Alan')    -- Added new tuple

について議論するとき、それは人々が話していることupsertです。重要なことは、明示的なロックを使用するか、結果として生じる競合状態から防御することにより、同じテーブルで複数のトランザクションが動作している場合、どのアプローチも安全でなければならないということです。

このトピックは、 Insert、on duplicate update in PostgreSQL? で広く議論されています。、しかしそれはMySQL構文の代替に関するものであり、時間の経過とともに関連のない詳細がかなり増えています. 私は決定的な答えに取り組んでいます。

これらの手法は、「存在しない場合は挿入し、そうでない場合は何もしない」、つまり「重複キーを無視して挿入する」場合にも役立ちます。

4

6 に答える 6

478

9.5 以降:

PostgreSQL 9.5 以降のサポートINSERT ... ON CONFLICT (key) DO UPDATE(およびON CONFLICT (key) DO NOTHING)、つまり upsert。

との比較ON DUPLICATE KEY UPDATE

簡単な説明

使用方法については、マニュアルを参照してください。具体的には、構文図のconflict_action句と説明文を参照してください。

以下に示す 9.4 以前のソリューションとは異なり、この機能は複数の競合する行で機能し、排他ロックや再試行ループを必要としません。

機能を追加するコミットはここにあり、その開発に関する議論はここにあります


9.5 を使用していて、下位互換性を維持する必要がない場合は、ここで読むのをやめてください


9.4 以前:

PostgreSQL にはビルトインUPSERT(またはMERGE) 機能がなく、同時使用に直面して効率的に行うことは非常に困難です。

この記事では、この問題について有益な詳細を説明しています。

一般に、次の 2 つのオプションから選択する必要があります。

  • 再試行ループ内の個々の挿入/更新操作。また
  • テーブルをロックしてバッチマージを行う

個々の行の再試行ループ

再試行ループで個々の行のアップサートを使用することは、挿入を同時に実行しようとする多数の接続が必要な場合に適したオプションです。

PostgreSQL のドキュメントには、データベース内のループでこれを実行できる便利な手順が含まれています。ほとんどの単純なソリューションとは異なり、更新の損失や競合の挿入を防ぎます。ただし、READ COMMITTEDモードでのみ機能し、トランザクションで行う唯一のことである場合にのみ安全です。トリガーまたはセカンダリ一意キーによって一意違反が発生した場合、関数は正しく機能しません。

この戦略は非常に非効率的です。実用的な場合は常に、作業をキューに入れ、代わりに以下で説明するように一括 upsert を実行する必要があります。

この問題に対して試みられた解決策の多くは、ロールバックを考慮していないため、更新が不完全になります。2 つのトランザクションが競合します。そのうちの 1 つが成功しましたINSERT。もう1つは重複キーエラーを取得し、UPDATE代わりに実行します。のロールバックまたはコミットをUPDATE待機しているブロック。INSERTロールバックすると、UPDATE条件の再チェックはゼロ行と一致するため、UPDATEコミットしても実際には予期したアップサートを実行していません。結果の行数を確認し、必要に応じて再試行する必要があります。

試行されたソリューションの中には、SELECT レースを考慮していないものもあります。明白で単純なことを試してみると:

-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.

BEGIN;

UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;

-- Remember, this is WRONG. Do NOT COPY IT.

INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);

COMMIT;

次に、2 つを同時に実行すると、いくつかの障害モードがあります。1 つは、更新の再チェックに関する既に議論されている問題です。もう 1 つは、両方UPDATEが同時に、ゼロ行に一致して継続する場合です。次に、両方ともEXISTSテストを行います。これはINSERT. どちらもゼロ行を取得するため、どちらもINSERT. 1 つは重複キー エラーで失敗します。

これが、再試行ループが必要な理由です。巧妙な SQL を使用すれば、重複キー エラーや更新の消失を防ぐことができると思うかもしれませんが、実際にはできません。行数を確認するか、重複キー エラーを処理して (選択したアプローチに応じて)、再試行する必要があります。

これについて独自のソリューションを展開しないでください。メッセージ キューイングと同様に、おそらく間違っています。

ロック付き一括アップサート

古い既存のデータ セットにマージする新しいデータ セットがある場合、一括アップサートを実行したい場合があります。これは、個々の行のアップサートよりもはるかに効率的であり、実用的な場合はいつでも優先する必要があります。

この場合、通常は次のプロセスに従います。

  • CREATETEMPORARYテーブル_

  • COPYまたは、新しいデータを一時テーブルに一括挿入します

  • LOCKターゲット表IN EXCLUSIVE MODE。これにより、他のトランザクションSELECTはテーブルに変更を加えることができなくなります。

  • UPDATE ... FROM一時テーブルの値を使用して既存のレコードを実行します。

  • INSERTターゲット テーブルにまだ存在しない行を実行します。

  • COMMIT、ロックを解除します。

たとえば、質問に示されている例では、複数値を使用しINSERTて一時テーブルにデータを入力します。

BEGIN;

CREATE TEMPORARY TABLE newvals(id integer, somedata text);

INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');

LOCK TABLE testtable IN EXCLUSIVE MODE;

UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;

INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;

COMMIT;

関連資料

どうMERGEですか?

SQL 標準MERGEでは、実際には同時実行性セマンティクスの定義が不十分であり、最初にテーブルをロックせずにアップサートするのには適していません。

これは、データ マージには非常に便利な OLAP ステートメントですが、実際には同時実行セーフな upsert には有用なソリューションではありません。他のDBMS を使用MERGEして upsert に使用する人へのアドバイスはたくさんありますが、実際には間違っています。

その他のデータベース:

于 2013-06-24T02:57:02.627 に答える
33

9.5 より前のバージョンの PostgreSQL での単一挿入の問題に対する別の解決策に貢献しようとしています。アイデアは、最初に挿入を実行しようとするだけで、レコードが既に存在する場合は更新することです。

do $$
begin 
  insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
  update testtable set somedata = 'Joe' where id = 2;
end $$;

このソリューションは、テーブルの行が削除されていない場合にのみ適用できることに注意してください。

このソリューションの効率についてはわかりませんが、十分に合理的であるように思えます。

于 2015-06-14T13:14:43.847 に答える
0

この質問はクローズされたので、SQLAlchemy を使用して行う方法についてここに投稿します。再帰を介して、一括挿入または一括更新を再試行し、競合状態と検証エラーに対処します。

まずは輸入物

import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

いくつかのヘルパー関数

def chunk(content, chunksize=None):
    """Groups data into chunks each with (at most) `chunksize` items.
    https://stackoverflow.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)


def gen_resources(records):
    """Yields a dictionary if the record's id already exists, a row object 
    otherwise.
    """
    ids = {item[0] for item in session.query(Posts.id)}

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # It's a row but the id already exists, so we need to convert it 
            # to a dict that updates the existing record. Since it is duplicate,
            # also yield True
            yield record.to_dict(), True
        elif is_row:
            # It's a row and the id doesn't exist, so no conversion needed. 
            # Since it's not a duplicate, also yield False
            yield record, False
        elif record['id'] in ids:
            # It's a dict and the id already exists, so no conversion needed. 
            # Since it is duplicate, also yield True
            yield record, True
        else:
            # It's a dict and the id doesn't exist, so we need to convert it. 
            # Since it's not a duplicate, also yield False
            yield Posts(**record), False

そして最後に upsert 関数

def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # A record was added or deleted after we checked, so retry
                # 
                # modify accordingly by adding additional exceptions, e.g.,
                # except (IntegrityError, ValidationError, ValueError)
                db.session.rollback()
                upsert(items)
            except Exception as e:
                # Some other error occurred so reduce chunksize to isolate the 
                # offending row(s)
                db.session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Error adding record {}'.format(items[0]))

使い方はこちら

>>> data = [
...     {'id': 1, 'text': 'updated post1'}, 
...     {'id': 5, 'text': 'updated post5'}, 
...     {'id': 1000, 'text': 'new post1000'}]
... 
>>> upsert(data)

これが持つ利点bulk_save_objectsは、挿入時に関係、エラーチェックなどを処理できることです (一括操作とは異なります)。

于 2017-04-26T11:30:37.220 に答える