1

同じデータベース テーブルからの同時読み取りでの重複の回避

タスクのリストを含むテーブルがあります

Table RecordsTable  
    RecordID
    RecordName
    ...
    ...
    IsProcessed

複数のワーカー マシンがテーブルから読み取り、タスクが処理されると IsProcessed を true としてマークします。

したがって、次のコードを重複せずに機能させたい場合

C# の疑似コード

//get first 10 records that are not processed based on some other conditions
var recordSet = objectontext.recordstable.Where(...).Where(c => c.IsProcessed == false).Take(10);
//loop through the recordset in a transaction 
foreach(record singleRecord in recordSet)
{
    bool result = ProcessRecord();
    //Mark isProcessed as true 
    if(result)
        singleRecord.IsProcessed = true;
    objectContext.Savechanges();
}

レコードの重複処理を回避したい (ProcessRecords() にはメーラーなどが含まれているため)。上記のコード全体をトランザクションでラップすると、2 つの異なるワーカーからの 2 つの呼び出しが重複しないレコードになるということですか?

workerA が最初に取得したテーブルへの呼び出しを発行すると、

var recordSetWorkerA = objectontext.recordstable.Where(somecondition...).Where(c => c.IsProcessed == false).Take(10);

ワーカー A が既にトランザクションに参加している後にワーカー B が呼び出しを発行した場合、次のステートメントは実行に失敗しますか?

var recordSetWorkerB = objectontext.recordstable.Where(somecondition...).Where(c => c.IsProcessed == false).Take(10);

注目すべきパターンはありますか。

4

2 に答える 2

1

1つのオプションは、isProcessedを{ready、processing、processed}のトライステート列挙型にすることです。ActiveRecordでこれを行う方法はわかりませんが、次のようなSQLステートメントが必要です。

UPDATE RecordsTable
SET ProcessedState = 'processing'
WHERE RecordId = 1
    AND ProcessedState = 'ready';

このステートメントによって正確に1つの行が更新されたことを確認してください。行がゼロの場合、誰かがそのタスクにあなたを打ち負かします。このステートメントが、少なくとも「読み取りコミット」分離レベルで独自のトランザクションで実行されることを確認してください。

于 2012-07-28T13:01:11.300 に答える
1

コードをトランザクションにラップするだけでは不十分です。もちろん、 で例外が発生しSaveChangesますが、手遅れです。

本当に必要なのは、処理が完了しただけでなく、処理済みとしてレコードをマークすることです。2 つの解決策があります。

  1. ワーカーが同じ状態を共有している場合 (複数の同時ワーカー サービスではなく、1 つの AppDomain 内のスレッドであることを意味します)、 を使用ConcurrentDictionaryして、処理中のレコードをマークできます。

    foreach(record singleRecord in recordSet)
    {
        //RecordsInProcess is a globally-available ConcurrentDictionary<recordIdType, record
        if (!RecordsInProcess.TryAdd(singleRecord.RecordId, singleRecord))
           continue; //TryAdd will return false if such an element already exists
    
        bool result = ProcessRecord();
        //Mark isProcessed as true 
        if(result)
            singleRecord.IsProcessed = true;
        objectContext.Savechanges();
        record junk; // we don't need it
        RecordsInProcess.TryRemove(singleRecordId, out junk)
    }
    
  2. ワーカーが分離されているか、より堅牢なものが必要な場合は、レコードをデータベースで処理中としてマークし、その情報をフィルタリングに使用する必要があります。ここでトランザクションを使用する必要があり、非常に慎重に使用する必要があります。非常に簡単にデッドロックが発生するからです。並行性の観点から最も効率的なのは、常にデータベースから未処理のレコードを 1 つだけ取得し、processing何かを行う前にそれをマークしてから処理を続行することです。
于 2012-07-28T13:08:15.783 に答える