現在取り組んでいるプロジェクトでは、継続的に実行される 1 つのスレッドから一連のオブジェクトをメイン スレッドに効率的に渡す方法を実装する必要があります。現在のセットアップは次のようなものです。
新しいスレッドを作成するメインスレッドがあります。この新しいスレッドは継続的に動作し、タイマーに基づいてメソッドを呼び出します。このメソッドは、オンライン ソースからメッセージのグループをフェッチし、それらを TreeSet に編成します。
次に、この TreeSet をメイン スレッドに戻す必要があります。これにより、この TreeSet に含まれるメッセージを繰り返しタイマーとは無関係に処理できるようになります。
より良い参照のために、私のコードは次のようになります
// Called by the main thread on start.
void StartProcesses()
{
if(this.IsWindowing)
{
return;
}
this._windowTimer = Executors.newSingleThreadScheduledExecutor();
Runnable task = new Runnable() {
public void run() {
WindowCallback();
}
};
this.CancellationToken = false;
_windowTimer.scheduleAtFixedRate(task,
0, this.SQSWindow, TimeUnit.MILLISECONDS);
this.IsWindowing = true;
}
/////////////////////////////////////////////////////////////////////////////////
private void WindowCallback()
{
ArrayList<Message> messages = new ArrayList<Message>();
//TODO create Monitor
if((!CancellationToken))
{
try
{
//TODO fix epochWindowTime
long epochWindowTime = 0;
int numberOfMessages = 0;
Map<String, String> attributes;
// Setup the SQS client
AmazonSQS client = new AmazonSQSClient(new
ClasspathPropertiesFileCredentialsProvider());
client.setEndpoint(this.AWSSQSServiceUrl);
// get the NumberOfMessages to optimize how to
// Receive all of the messages from the queue
GetQueueAttributesRequest attributesRequest =
new GetQueueAttributesRequest();
attributesRequest.setQueueUrl(this.QueueUrl);
attributesRequest.withAttributeNames(
"ApproximateNumberOfMessages");
attributes = client.getQueueAttributes(attributesRequest).
getAttributes();
numberOfMessages = Integer.valueOf(attributes.get(
"ApproximateNumberOfMessages")).intValue();
// determine if we need to Receive messages from the Queue
if (numberOfMessages > 0)
{
if (numberOfMessages < 10)
{
// just do it inline it's less expensive than
//spinning threads
ReceiveTask(numberOfMessages);
}
else
{
//TODO Create a multithreading version for this
ReceiveTask(numberOfMessages);
}
}
if (!CancellationToken)
{
//TODO testing
_setLock.lock();
Iterator<Message> _setIter = _set.iterator();
//TODO
while(_setIter.hasNext())
{
Message temp = _setIter.next();
Long value = Long.valueOf(temp.getAttributes().
get("Timestamp"));
if(value.longValue() < epochWindowTime)
{
messages.add(temp);
_set.remove(temp);
}
}
_setLock.unlock();
// TODO deduplicate the messages
// TODO reorder the messages
// TODO raise new Event with the results
}
if ((!CancellationToken) && (messages.size() > 0))
{
if (messages.size() < 10)
{
Pair<Integer, Integer> range =
new Pair<Integer, Integer>(Integer.valueOf(0),
Integer.valueOf(messages.size()));
DeleteTask(messages, range);
}
else
{
//TODO Create a way to divide this work among
//several threads
Pair<Integer, Integer> range =
new Pair<Integer, Integer>(Integer.valueOf(0),
Integer.valueOf(messages.size()));
DeleteTask(messages, range);
}
}
}catch (AmazonServiceException ase){
ase.printStackTrace();
}catch (AmazonClientException ace) {
ace.printStackTrace();
}
}
}
いくつかのコメントからわかるように、これを処理するための現在の推奨方法は、メッセージがある場合にタイマー スレッドでイベントを作成することです。その後、メイン スレッドはこのイベントをリッスンし、適切に処理します。
現在、Java がイベントを処理する方法、またはそれらを作成/リッスンする方法に慣れていません。また、イベントを作成し、イベントに含まれる情報をスレッド間で渡すことができるかどうかもわかりません。
私の方法が可能かどうかについて、誰かが私にアドバイス/洞察を与えることができますか? もしそうなら、現在の検索の試みが実を結んでいないので、それらを実装する方法に関する情報をどこで見つけることができますか.
そうでない場合は、可能であればソケットを管理する必要がないようにしたいことを念頭に置いて、これをどのように行うかについていくつかの提案を得ることができますか?
編集1:
メイン スレッドは、受信したメッセージに基づいてコマンドを発行したり、必要な情報を取得するためのコマンドを発行したりする役割も果たします。このため、メイン スレッドはメッセージの受信を待つことができず、イベント ベースの方法で処理する必要があります。