0

現在取り組んでいるプロジェクトでは、継続的に実行される 1 つのスレッドから一連のオブジェクトをメイン スレッドに効率的に渡す方法を実装する必要があります。現在のセットアップは次のようなものです。

新しいスレッドを作成するメインスレッドがあります。この新しいスレッドは継続的に動作し、タイマーに基づいてメソッドを呼び出します。このメソッドは、オンライン ソースからメッセージのグループをフェッチし、それらを TreeSet に編成します。

次に、この TreeSet をメイン スレッドに戻す必要があります。これにより、この TreeSet に含まれるメッセージを繰り返しタイマーとは無関係に処理できるようになります。

より良い参照のために、私のコードは次のようになります

// Called by the main thread on start.  
void StartProcesses()
{
    if(this.IsWindowing)
    {
        return;
    }

    this._windowTimer = Executors.newSingleThreadScheduledExecutor();

    Runnable task = new Runnable() {
        public void run() {
            WindowCallback();
        }
    };

    this.CancellationToken = false; 
    _windowTimer.scheduleAtFixedRate(task,
            0, this.SQSWindow, TimeUnit.MILLISECONDS);

    this.IsWindowing = true;
}

/////////////////////////////////////////////////////////////////////////////////

private void WindowCallback()
{
    ArrayList<Message> messages = new ArrayList<Message>();

    //TODO create Monitor
    if((!CancellationToken))
    {
        try
        {
            //TODO fix epochWindowTime
            long epochWindowTime = 0;
            int numberOfMessages = 0;
            Map<String, String> attributes;

            // Setup the SQS client
            AmazonSQS client = new AmazonSQSClient(new 
                    ClasspathPropertiesFileCredentialsProvider());

            client.setEndpoint(this.AWSSQSServiceUrl);

            // get the NumberOfMessages to optimize how to 
            // Receive all of the messages from the queue

            GetQueueAttributesRequest attributesRequest = 
                    new GetQueueAttributesRequest();
            attributesRequest.setQueueUrl(this.QueueUrl);
            attributesRequest.withAttributeNames(
                    "ApproximateNumberOfMessages");
            attributes = client.getQueueAttributes(attributesRequest).
                    getAttributes();

            numberOfMessages = Integer.valueOf(attributes.get(
                    "ApproximateNumberOfMessages")).intValue();

            // determine if we need to Receive messages from the Queue
            if (numberOfMessages > 0)
            {

                if (numberOfMessages < 10)
                {
                    // just do it inline it's less expensive than 
                    //spinning threads
                    ReceiveTask(numberOfMessages);
                }
                else
                {
                    //TODO Create a multithreading version for this
                    ReceiveTask(numberOfMessages);
                }
            }

            if (!CancellationToken)
            {

                //TODO testing
                _setLock.lock();

                Iterator<Message> _setIter = _set.iterator();
                //TODO
                while(_setIter.hasNext())
                {
                    Message temp = _setIter.next();

                    Long value = Long.valueOf(temp.getAttributes().
                            get("Timestamp"));
                    if(value.longValue() < epochWindowTime)
                    {
                        messages.add(temp);
                        _set.remove(temp);
                    }
                }

                _setLock.unlock();

                // TODO deduplicate the messages

                // TODO reorder the messages

                // TODO raise new Event with the results
            }

            if ((!CancellationToken) && (messages.size() > 0))
            {
                if (messages.size() < 10)
                {
                    Pair<Integer, Integer> range = 
                            new Pair<Integer, Integer>(Integer.valueOf(0), 
                                    Integer.valueOf(messages.size()));
                    DeleteTask(messages, range);
                }
                else
                {
                    //TODO Create a way to divide this work among 
                    //several threads
                    Pair<Integer, Integer> range = 
                            new Pair<Integer, Integer>(Integer.valueOf(0), 
                                    Integer.valueOf(messages.size()));
                    DeleteTask(messages, range);
                }
            }
        }catch (AmazonServiceException ase){
            ase.printStackTrace();
        }catch (AmazonClientException ace) {
            ace.printStackTrace();
        }
    }
}

いくつかのコメントからわかるように、これを処理するための現在の推奨方法は、メッセージがある場合にタイマー スレッドでイベントを作成することです。その後、メイン スレッドはこのイベントをリッスンし、適切に処理します。

現在、Java がイベントを処理する方法、またはそれらを作成/リッスンする方法に慣れていません。また、イベントを作成し、イベントに含まれる情報をスレッド間で渡すことができるかどうかもわかりません。

私の方法が可能かどうかについて、誰かが私にアドバイス/洞察を与えることができますか? もしそうなら、現在の検索の試みが実を結んでいないので、それらを実装する方法に関する情報をどこで見つけることができますか.

そうでない場合は、可能であればソケットを管理する必要がないようにしたいことを念頭に置いて、これをどのように行うかについていくつかの提案を得ることができますか?

編集1:

メイン スレッドは、受信したメッセージに基づいてコマンドを発行したり、必要な情報を取得するためのコマンドを発行したりする役割も果たします。このため、メイン スレッドはメッセージの受信を待つことができず、イベント ベースの方法で処理する必要があります。

4

1 に答える 1

1

生産者と消費者のパターン:

1 つのスレッド (プロデューサー) がオブジェクト (メッセージ) をキューに継続的にスタックします。別のスレッド (消費者) がオブジェクトを読み取り、キューから削除します。

あなたの問題がこれに当てはまる場合は、「BlockingQueue」を試してください。 http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html

簡単で効果的です。

キューが空の場合、コンシューマーは「ブロック」されます。つまり、プロデューサーがいくつかのオブジェクトを配置するまで、スレッドは待機します (したがって、CPU 時間を使用しません)。それ以外の場合、cosumer は継続的にオブジェクトを消費します。また、キューがいっぱいの場合、コンシューマーがいくつかのオブジェクトを消費してキューに空きを作るまで、プロデューサーはブロックされます。逆の場合も同様です。

以下に例を示します: (キューはプロデューサーとコンシューマーの両方で同じオブジェクトである必要があります)


(プロデューサースレッド)

Message message = createMessage();
queue.put(message);

(消費者スレッド)

Message message = queue.take();
handleMessage(message);
于 2013-06-17T10:03:19.607 に答える