0

次のコードでは、オンライン キューからメッセージのリストを取得する繰り返しタイマーを使用しています。このコードの目的は、繰り返しタイマー スレッドがメッセージをフェッチするだけで、結果をメイン スレッドに渡して処理できることをテストすることでした。

代わりに、イベントリスナーがメインスレッドにあるにもかかわらず、繰り返しタイマースレッドでイベントが発生した後に呼び出されるすべてのアクションは、イベントが処理された後に常に終了します。ある時点でどのスレッドが CPU にあるかによって、これが時折発生する可能性があることは理解していますが、時々、print ステートメントが織り交ぜられているのを確認する必要があります。

また、約 50 のメッセージをキューに追加してこの効果をテストしましたが、それでも同じ結果が得られます。

私のコードは以下です

public class Service implements NewWindowEventArgsListener
{

private final static String accessKey = 
        "secret";

private final static String secretKey = 
        "secret";

private final static String AWSSQSServiceUrl =
        "secret";


private boolean IsWindowing = false;
private ScheduledExecutorService _windowTimer;
private long SQSWindow = 60000;
private NewWindowEventArgs handler = new NewWindowEventArgs();
private static List<Message> messages = new ArrayList<Message>();


public void StartProcesses()
{
    if(this.IsWindowing)
    {
        return;
    }

    handler.addListener(this);

    this._windowTimer = Executors.newSingleThreadScheduledExecutor();

    Runnable task = new Runnable() {
        public void run() {
            WindowCallback();
        }
    };

    this._windowTimer.scheduleAtFixedRate(task,
            0, SQSWindow, TimeUnit.MILLISECONDS);

    IsWindowing = true;
}

private void WindowCallback()
{
    Date now = new Date();
    System.out.println("The service is running: " + now);

    int numberOfMessages = 0;
    ArrayList<String> attributes = new ArrayList<String>();
    AWSCredentials cred = new BasicAWSCredentials(accessKey, secretKey);
    ClientConfiguration config = new ClientConfiguration();
    config.setMaxErrorRetry(10);

    AmazonSQS client = new AmazonSQSClient(cred, config);

    client.setEndpoint(AWSSQSServiceUrl);

    System.out.println("Receiving messages from the Queue.\n");
    ReceiveMessageRequest receiveMessageRequest = 
            new ReceiveMessageRequest(AWSSQSServiceUrl);

    receiveMessageRequest.setMaxNumberOfMessages(10);

    GetQueueAttributesRequest numMessages = 
            new GetQueueAttributesRequest(AWSSQSServiceUrl); 

    attributes.add("ApproximateNumberOfMessages");
    numMessages.setAttributeNames(attributes);

    numberOfMessages = Integer.valueOf(
            (client.getQueueAttributes(numMessages)).getAttributes().
            get("ApproximateNumberOfMessages")).intValue();

    System.out.println("Expected number of Messages: " + numberOfMessages);

    do
    {
        messages.addAll(client.receiveMessage(receiveMessageRequest).
            getMessages());
    }while(messages.size() < numberOfMessages);

    System.out.println("Starting the printing of messages");

    if ( messages.size() > 0)
    {
        System.out.println("A message exists!");
        System.out.println();
        handler.NewWindowEvent(messages);
        System.out.println("//////////////////////////////////");
        System.out.println("\tEmptying message list");
        messages.clear();
        System.out.println("\tMessage list empty");
        System.out.println("//////////////////////////////////");
        System.out.println();
    }
}

@Override
public void JetstreamService_NewWindow(List<Message> messages) {
    System.out.println("Number of messages: " + messages.size() + "\n");

    ObjectMapper mapper = new ObjectMapper();

    try 
    {
        for (Message message : messages)
        {

            //System.out.println(message.getBody() + "\n");
            //byte[] bytes = DatatypeConverter.parseBase64Binary(message.getBody());

            //String messageBody = new String(bytes, "UTF-8");

            //System.out.println(messageBody + "\n");

            AmazonSNSMessage b;

            b = mapper.readValue(message.getBody(), AmazonSNSMessage.class);

            String subject = b.getSubject().trim().toLowerCase();
            System.out.println(subject);

            if (subject.equals("heartbeatevent"))
            {
                HeartbeatEvent heartbeat = new HeartbeatEvent();

                heartbeat.Deserialize(b.getMessage());

                System.out.println(heartbeat.getHeaderEventTime() + "\n");
            }

            else if(subject.equals("logicaldeviceaddedevent"))
            {
                LogicalDeviceAddedEvent logical = 
                        new LogicalDeviceAddedEvent();

                logical.Deserialize(b.getMessage());

                System.out.println(
                        logical.getLogicalDeviceAddedEventRegion() + "\n");
            }

            else if(subject.equals("logicaldeviceremovedevent"))
            {
                LogicalDeviceRemovedEvent logical = 
                        new LogicalDeviceRemovedEvent();

                logical.Deserialize(b.getMessage());

                System.out.println(
                        logical.getHeaderEventId());

                System.out.println(
                        logical.getHeaderEventTime() + "\n");
            }
        }
    } catch (JsonParseException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (JsonMappingException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }   
}

メインスレッドでメッセージが処理されない理由を説明してください。または、すべてのメッセージが処理された後にクリアメッセージの print ステートメントが常に発生する理由を説明してください。

4

1 に答える 1