43

このようなものを実装するのはおそらく 10 回目ですが、思いついたソリューションに 100% 満足したことはありません。

「適切な」メッセージング システムの代わりに mysql テーブルを使用することが魅力的である主な理由は、ほとんどのアプリケーションがすでに他のものに何らかのリレーショナル データベースを使用しているからです (これは、私が行ってきたほとんどの作業で mysql を使用する傾向があります)。メッセージング システムを使用します。また、リレーショナル データベースには非常に強力な ACID プロパティがありますが、メッセージング システムにはありません。

最初のアイデアは、次を使用することです。

テーブルジョブを作成します(
  id auto_increment が null でない主キー、
  メッセージ テキストは null ではありません。
  process_id varbinary(255) null デフォルト null、
  キーjobs_key(プロセスID)
);

そして、エンキューは次のようになります。

ジョブ(メッセージ)値に挿入します(「何とか何とか」);

デキューは次のようになります。

始める;
select * process_id が null のジョブから ID 順 asc limit 1;
更新ジョブ セット process_id = ? id = ?; -- 私が得たものは何でも
専念;
-- (id、メッセージ) をアプリケーションに返し、完了後にクリーンアップします。

テーブルとエンキューは見栄えがしますが、デキューはちょっと気になります。ロールバックする可能性はどれくらいですか?それともブロックされる?O(1)風にするにはどのキーを使用すればよいですか?

または、私がやっていることよりも良い解決策はありますか?

4

8 に答える 8

29

デキューはより簡潔になる可能性があります。トランザクションのロールバックに頼るのではなく、明示的なトランザクションなしで 1 つのアトミック ステートメントで実行できます。

UPDATE jobs SET process_id = ? WHERE process_id IS NULL ORDER BY ID ASC LIMIT 1;

次に、ジョブをプルできます (括弧 [] は、詳細に応じてオプションを意味します):

SELECT * FROM jobs WHERE process_id = ? [ORDER BY ID LIMIT 1];
于 2011-09-05T03:55:05.963 に答える
8

私はいくつかのメッセージキューイングシステムを構築しましたが、あなたが言及しているメッセージのタイプはわかりませんが、デキューの場合 (それは言葉ですか?) 私はあなたがしたのと同じことをしました. あなたの方法はシンプルで、きれいで、堅実に見えます。私の仕事が最高というわけではありませんが、多くのサイトの大規模な監視に非常に効果的であることが証明されています. (エラー ログ、大量の電子メール マーケティング キャンペーン、ソーシャル ネットワーキングの通知)

私の投票: 心配いりません!

于 2009-01-08T05:26:23.870 に答える
7

Brian Aker は少し前にキュー エンジンについて話しました。SELECT table FROM DELETE構文についての話もあります。

スループットを気にしなければ、いつでもSELECT GET_LOCK()をミューテックスとして使用できます。例えば:

SELECT GET_LOCK('READQUEUE');
SELECT * FROM jobs;
DELETE FROM JOBS WHERE ID = ?;
SELECT RELEASE_LOCK('READQUEUE');

そして、本当に派手にしたい場合は、ストアドプロシージャでラップしてください。

于 2009-01-08T04:29:49.410 に答える
2

これは私が使用した解決策で、現在のスレッドの process_id なしで作業するか、テーブルをロックします。

SELECT * from jobs ORDER BY ID ASC LIMIT 0,1;

結果を $row 配列で取得し、次を実行します。

DELETE from jobs WHERE ID=$row['ID'];

次に、影響を受ける行を取得します (mysql_affected_rows)。影響を受ける行がある場合は、$row 配列でジョブを処理します。影響を受ける行が 0 の場合は、他のプロセスが選択したジョブを既に処理していることを意味します。行がなくなるまで、上記の手順を繰り返します。

これを、100,000 行の「ジョブ」テーブルでテストし、上記を実行する 20 の同時プロセスを生成しました。競合状態は発生しませんでした。上記のクエリを変更して、処理フラグを使用して行を更新し、実際に処理した後に行を削除できます。

while(time()-$startTime<$timeout)
{
SELECT * from jobs WHERE processing is NULL ORDER BY ID ASC LIMIT 0,1;
if (count($row)==0) break;
UPDATE jobs set processing=1 WHERE ID=$row['ID'];
if (mysql_affected_rows==0) continue;
//process your job here
DELETE from jobs WHERE ID=$row['ID'];
}

言うまでもなく、この種の作業には適切なメッセージ キュー (ActiveMQ、RabbitMQ など) を使用する必要があります。ただし、ホストはソフトウェアの更新時に定期的に問題を起こすため、この解決策に頼らなければなりませんでした。

于 2016-04-15T17:57:06.003 に答える
1

このスレッドには、マップ可能である必要がある設計情報があります。

引用するには:

これが私が過去にうまく使ったものです:

MsgQueueテーブルスキーマ

MsgIdアイデンティティ
-NOTNULLMsgTypeCode varchar(20)-NOT NULL
SourceCode varchar(20)-メッセージの挿入プロセス--NULL可能
状態char(1)-'キューに入れられた場合はN'ew、' A'(ctive)if処理中、'完了、デフォルト' N'-NOT NULL
CreateTime datetime-デフォルトGETDATE()-NOT NULL
Msg varchar(255)-NULL可能

あなたのメッセージタイプはあなたが期待するものです-挿入するプロセスと読み取るプロセスの間の契約に準拠し、XMLまたは他の表現の選択で構造化されたメッセージ(JSONは場合によっては便利です実例)。

次に、0からnのプロセスが挿入され、0からnのプロセスがメッセージの読み取りと処理を行うことができます。通常、各読み取りプロセスは単一のメッセージタイプを処理します。プロセスタイプの複数のインスタンスを実行して、負荷分散を行うことができます。

リーダーは1つのメッセージをプルし、メッセージの処理中に状態を「A」アクティブに変更します。完了すると、状態が「完了」に変わります。監査証跡を保持するかどうかに応じて、メッセージを削除するかどうかを指定できます。State ='N'のメッセージは、MsgType / Timestampの順序でプルされるため、MsgType + State+CreateTimeにインデックスがあります。

バリエーション:
「E」エラーの状態。
Readerプロセスコードの列。
状態遷移のタイムスタンプ。

これにより、説明しているような多くのことを実行するための、優れた、スケーラブルで、目に見える、シンプルなメカニズムが提供されます。データベースの基本を理解している場合、それはかなり確実で拡張可能です。アトミック状態遷移トランザクションのため、ロックのロールバックなどの問題は発生していません。

于 2009-01-08T05:16:42.843 に答える
1

Quartz.NET の使用をお勧めします

SQL Server、Oracle、MySql、SQLite、および Firebird のプロバイダーがあります。

于 2009-01-08T03:11:31.083 に答える