私は新しいトリックを学ぼうとしている老犬です。私は PowerBuilder という言語に非常に精通しており、その言語では、非同期で処理を行う場合、新しいスレッドでオブジェクトを生成します。繰り返しますが、オブジェクト全体が別のスレッドでインスタンス化され、別の実行コンテキストを持ちます。そのオブジェクトのすべてのメソッドは、その別のスレッドのコンテキストで実行されます。
さて、私は C# を使用していくつかの非同期実行を実装しようとしていますが、.NET のスレッド モデルは私にはまったく異なっているように感じます。オブジェクトを 1 つのスレッドでインスタンス化しているように見えますが、特定のメソッドを別のスレッドで実行するように (呼び出しごとに) 指定できます。
違いは微妙なようですが、私はイライラします。私の昔ながらの考え方は、「ボブという名前のヘルパーがいます。ボブは出かけて何かをします」と言います。新しい考え方は、私が正しく理解しているなら、「私はボブです。必要に応じて、お腹をさすったり、同時に頭を撫でたりすることもできます」です。
私の実際のコーディングの問題: TCP 経由でメッセージを受け取り、それらを解析して使用可能なデータにし、そのデータをデータベースに入れるインターフェイス エンジンを作成しています。メッセージの「解析」には約 1 秒かかります。解析されたデータに応じて、データベース操作にかかる時間は 1 秒未満の場合もあれば、10 秒かかる場合もあります。(すべての時間は、問題を明確にするために構成されています。)
私の昔ながらの考え方では、データベース クラスは別のスレッドに存在し、ConcurrentQueue
. そのキューでスピンし、そこにある可能性のあるものをすべて処理します。一方、パーサーはメッセージをそのキューにプッシュする必要があります。これらのメッセージは、(デリゲート?) 「のデータに基づいて注文を作成するthis object
」または「のデータに基づいて注文を更新する」のようなものですthis object
。「キュー」内の「メッセージ」を厳密なシングルスレッド FIFO 順序で実際に処理したいことに注意してください。
基本的に、データベース接続が常にパーサーに追いつくとは限りません。データベース プロセスが追いつこうとしている間、パーサーが遅くならないようにする方法が必要です。アドバイス?
-- 編集: コードで! 誰もが を使うように言っていますBlockingCollection
。ここでは、最終目標とそれに付随するコードの簡単な説明を示します。
これは Windows サービスになります。起動すると、複数の「環境」が生成され、各「環境」には 1 つの「dbworker」と 1 つの「インターフェース」が含まれます。「インターフェース」には、1 つの「パーサー」と 1 つの「リスナー」があります。
class cEnvironment {
private cDBWorker MyDatabase;
private cInterface MyInterface;
public void OnStart () {
MyDatabase = new cDBWorker ();
MyInterface = new cInterface ();
MyInterface.OrderReceived += this.InterfaceOrderReceivedEventHandler;
MyDatabase.OnStart ();
MyInterface.OnStart ();
}
public void OnStop () {
MyInterface.OnStop ();
MyDatabase.OnStop ();
MyInterface.OrderReceived -= this.InterfaceOrderReceivedEventHandler;
}
void InterfaceOrderReceivedEventHandler (object sender, OrderReceivedEventArgs e) {
MyDatabase.OrderQueue.Add (e.Order);
}
}
class cDBWorker {
public BlockingCollection<cOrder> OrderQueue = new BlockingCollection<cOrder> ();
private Task ProcessingTask;
public void OnStart () {
ProcessingTask = Task.Factory.StartNew (() => Process (), TaskCreationOptions.LongRunning);
}
public void OnStop () {
OrderQueue.CompleteAdding ();
ProcessingTask.Wait ();
}
public void Process () {
foreach (cOrder Order in OrderQueue.GetConsumingEnumerable ()) {
switch (Order.OrderType) {
case 1:
SuperFastMethod (Order);
break;
case 2:
ReallySlowMethod (Order);
break;
}
}
}
public void SuperFastMethod (cOrder Order) {
}
public void ReallySlowMethod (cOrder Order) {
}
}
class cInterface {
protected cListener MyListener;
protected cParser MyParser;
public void OnStart () {
MyListener = new cListener ();
MyParser = new cParser ();
MyListener.DataReceived += this.ListenerDataReceivedHandler;
MyListener.OnStart ();
}
public void OnStop () {
MyListener.OnStop ();
MyListener.DataReceived -= this.ListenerDataReceivedHandler;
}
public event OrderReceivedEventHandler OrderReceived;
protected virtual void OnOrderReceived (OrderReceivedEventArgs e) {
if (OrderReceived != null)
OrderReceived (this, e);
}
void ListenerDataReceivedHandler (object sender, DataReceivedEventArgs e) {
foreach (string Message in MyParser.GetMessages (e.RawData)) {
OnOrderReceived (new OrderReceivedEventArgs (MyParser.ParseMessage (Message)));
}
}
コンパイルします。(SHIP IT!) でも、それは私が正しくやっているということですか?