2

シナリオは次のとおりです。優先度の高いスレッドによって中断される可能性のある優先度の低いスレッドがいくつかあります。優先度の高いスレッドが優先度の低いスレッドに一時停止を要求するたびに、それらはWait状態になります(まだ待機状態になっていない場合)。ただし、優先度の高いスレッドが優先度の低いスレッドが可能Resumeであることを通知する場合、優先度の低いスレッドは、優先度の低いスレッドに一時停止を要求したすべての優先度の高いスレッドが同意するまで再開しないでください。

Pause()これを解決するために、優先度の高いスレッドから優先度の低いスレッドへの呼び出しをカウンター変数で追跡しています。優先度の高いスレッドが優先度の低いスレッドにを要求するたびにPause()、カウンターの値が1ずつインクリメントされます。インクリメント後にカウンターの値が。の場合1は、スレッドがになかったことを意味するWaitので、状態に移行するように依頼しWaitます。それ以外の場合は、値をインクリメントしcounterます。逆に、優先度の高いスレッドが呼び出されると、値がデクリメントされます。デクリメント後に値がである場合は、優先度の低いスレッドが値をデクリメントできることを意味しResume()ます。counter0Resume

これが私の問題の単純化された実装です。ifステートメント内の比較操作Interlocked.XXXが正しくありません。

if(Interlocked.Increment(ref _remain)== 1)

、読み取り/変更および比較操作はアトミックではないため。

ここで何が欠けていますか?スレッド優先を使用したくありません。

using System;
using System.Collections.Generic;
using System.Threading;

namespace TestConcurrency
{

// I borrowed this class from Joe Duffy's blog and modified it
public class LatchCounter
{
 private long _remain;
 private EventWaitHandle m_event;
 private readonly object _lockObject;

public LatchCounter()
{
    _remain = 0;
    m_event = new ManualResetEvent(true);
    _lockObject = new object();
}

public void Check()
{
    if (Interlocked.Read(ref _remain) > 0)
    {
        m_event.WaitOne();
    }
}

public void Increment()
{
    lock(_lockObject)
    {
       if (Interlocked.Increment(ref _remain) == 1)
           m_event.Reset();
    }
}

public void Decrement()
{
    lock(_lockObject)
    {
       // The last thread to signal also sets the event.
       if (Interlocked.Decrement(ref _remain) == 0)
           m_event.Set();
    }
}
}



public class LowPriorityThreads
{
private List<Thread> _threads;
private LatchCounter _latch;
private int _threadCount = 1;

internal LowPriorityThreads(int threadCount)
{
    _threadCount = threadCount;
    _threads = new List<Thread>();
    for (int i = 0; i < _threadCount; i++)
    {
        _threads.Add(new Thread(ThreadProc));
    }

    _latch = new CountdownLatch();
}


public void Start()
{
    foreach (Thread t in _threads)
    {
        t.Start();
    }
}

void ThreadProc()
{
    while (true)
    {
        //Do something
        Thread.Sleep(Rand.Next());
        _latch.Check();
    }
}

internal void Pause()
{
    _latch.Increment();
}

internal void Resume()
{
    _latch.Decrement();
}
}


public class HighPriorityThreads
{
private Thread _thread;
private LowPriorityThreads _lowPriorityThreads;

internal HighPriorityThreads(LowPriorityThreads lowPriorityThreads)
{
    _lowPriorityThreads = lowPriorityThreads;
    _thread = new Thread(RandomlyInterruptLowPriortyThreads);
}

public void Start()
{
    _thread.Start();
}

void RandomlyInterruptLowPriortyThreads()
{
    while (true)
    {
        Thread.Sleep(Rand.Next());

        _lowPriorityThreads.Pause();

        Thread.Sleep(Rand.Next());
        _lowPriorityThreads.Resume();
    }
}
}

 class Program
 {
  static void Main(string[] args)
  {
    LowPriorityThreads lowPriorityThreads = new LowPriorityThreads(3);
    HighPriorityThreads highPriorityThreadOne = new HighPriorityThreads(lowPriorityThreads);
    HighPriorityThreads highPriorityThreadTwo = new HighPriorityThreads(lowPriorityThreads);

    lowPriorityThreads.Start();
    highPriorityThreadOne.Start();
    highPriorityThreadTwo.Start();
}
}


class Rand
{
internal static int Next()
{
    // Guid idea has been borrowed from somewhere on StackOverFlow coz I like it
    return new System.Random(Guid.NewGuid().GetHashCode()).Next() % 30000;
}
}
4

1 に答える 1

0

私はあなたの要件については知らないので、ここでは説明しません。実装に関しては、スレッド間の相互作用を処理し、「実行可能な」オブジェクトのファクトリとしても機能する「ディスパッチャ」クラスを導入します。

もちろん、実装は非常にラフであり、批判の余地があります。

class Program
{
    static void Main(string[] args)
    {
        ThreadDispatcher td=new ThreadDispatcher();
        Runner r1 = td.CreateHpThread(d=>OnHpThreadRun(d,1));
        Runner r2 = td.CreateHpThread(d => OnHpThreadRun(d, 2));

        Runner l1 = td.CreateLpThread(d => Console.WriteLine("Running low priority thread 1"));
        Runner l2 = td.CreateLpThread(d => Console.WriteLine("Running low priority thread 2"));
        Runner l3 = td.CreateLpThread(d => Console.WriteLine("Running low priority thread 3"));


        l1.Start();
        l2.Start();
        l3.Start();

        r1.Start();
        r2.Start();

        Console.ReadLine();

        l1.Stop();
        l2.Stop();
        l3.Stop();

        r1.Stop();
        r2.Stop();
    }

    private static void OnHpThreadRun(ThreadDispatcher d,int number)
    {
        Random r=new Random();
        Thread.Sleep(r.Next(100,2000));
        d.CheckedIn();
        Console.WriteLine(string.Format("*** Starting High Priority Thread {0} ***",number));
        Thread.Sleep(r.Next(100, 2000));
        Console.WriteLine(string.Format("+++ Finishing High Priority Thread {0} +++", number));
        Thread.Sleep(300);
        d.CheckedOut();           
    }
}

public abstract class Runner
{
    private Thread _thread;
    protected readonly Action<ThreadDispatcher> _action;
    private readonly ThreadDispatcher _dispathcer;
    private long _running;
    readonly ManualResetEvent _stopEvent=new ManualResetEvent(false);
    protected Runner(Action<ThreadDispatcher> action,ThreadDispatcher dispathcer)
    {
        _action = action;
        _dispathcer = dispathcer;
    }

    public void Start()
    {
        _thread = new Thread(OnThreadStart);
        _running = 1;
        _thread.Start();
    }

    public void Stop()
    {
        _stopEvent.Reset();
        Interlocked.Exchange(ref _running, 0);
        _stopEvent.WaitOne(2000);
        _thread = null;
        Console.WriteLine("The thread has been stopped.");

    }
    protected virtual void OnThreadStart()
    {
        while (Interlocked.Read(ref _running)!=0)
        {
            OnStartWork();
            _action.Invoke(_dispathcer);
            OnFinishWork();
        }
        OnFinishWork();
        _stopEvent.Set();
    }

    protected abstract void OnStartWork();
    protected abstract void OnFinishWork();
}

public class ThreadDispatcher
{
    private readonly ManualResetEvent _signal=new ManualResetEvent(true);
    private int _hpCheckedInThreads;
    private readonly object _lockObject = new object();

    public void CheckedIn()
    {
        lock(_lockObject)
        {
            _hpCheckedInThreads++;
            _signal.Reset();
        }
    }
    public void CheckedOut()
    {
        lock(_lockObject)
        {
            if(_hpCheckedInThreads>0)
                _hpCheckedInThreads--;
            if (_hpCheckedInThreads == 0)
                _signal.Set();
        }
    }

    private class HighPriorityThread:Runner 
    {
        public HighPriorityThread(Action<ThreadDispatcher> action, ThreadDispatcher dispatcher) : base(action,dispatcher)
        {
        }

        protected override void OnStartWork()
        {
        }

        protected override void OnFinishWork()
        {
        }
    }
    private class LowPriorityRunner:Runner
    {
        private readonly ThreadDispatcher _dispatcher;
        public LowPriorityRunner(Action<ThreadDispatcher> action, ThreadDispatcher dispatcher)
            : base(action, dispatcher)
        {
            _dispatcher = dispatcher;
        }

        protected override void OnStartWork()
        {
            Console.WriteLine("LP Thread is waiting for a signal.");
            _dispatcher._signal.WaitOne();
            Console.WriteLine("LP Thread got the signal.");
        }

        protected override void OnFinishWork()
        {

        }
    }

    public Runner CreateLpThread(Action<ThreadDispatcher> action)
    {
        return new LowPriorityRunner(action, this);
    }

    public Runner CreateHpThread(Action<ThreadDispatcher> action)
    {
        return new HighPriorityThread(action, this);
    }
}

}

于 2013-02-26T15:57:24.880 に答える