2

の着信メッセージへの応答を公開しようとしています messageArrived(...)。しかし、発行がハングし、次の行: logOutgoingMessage(topic, message)が呼び出されることはありません... 最後にデッドロックが発生し、クライアントが切断されます。

これが私のコードです:

@Startup
@Singleton
public class AppliMqttClient implements MqttCallback {

@EJB
private AppliFacade facade;

@PostConstruct
public void start() {
    try {
        // connection options
        connOpts = new MqttConnectOptions();
        connOpts.setKeepAliveInterval(120);         
        connOpts.setCleanSession(true);
        connOpts.setWill(TESTAMENT_TOPIC, "DOWN!!!!!!!!!!!!!!!!!!".getBytes(), 0, false);

        client = new MqttClient(BROKER_URL, MQTT_CLIENT_ID);
        client.setCallback(this);
        connect();

        client.subscribe(SUBSCRIPTION_TOPIC, QoS);
    } catch (MqttException me) {
        log.error("Connection to " + BROKER_URL + " failed");
        logMqttException(me);
    }

}

private void connect() {
    // Tying a cycle of reconnects.
    boolean tryConnecting = true;
    while (tryConnecting) {
        try {
            client.connect(connOpts);
        } catch (Exception e1) {
            log.error("Connection attempt failed with '" + e1.getCause() + "'. Retrying.");             
        }
        if (client.isConnected()) {
            log.info("Connected to Broker " + BROKER_URL);
            tryConnecting = false;
        } else {
            pause();
        }
    }
}

private void publishAMessage(String topic, String pubMsg) {
    MqttMessage message = new MqttMessage(pubMsg.getBytes());
    message.setQos(QoS);
    // Publish the message
    log.info("Publishing to topic \"" + topic + "\" qos " + QoS);
    try {
        // Publish to the broker
        client.publish(topic, message);
        // Wait until the message has been delivered to the broker
        logOutgoingMessage(topic, message);
    } catch (Exception e) {
        log.error("Publishing to topic \"" + topic + "\" qos " + QoS + "failed.", e);
    }
}

private String handleRquest(AbstractRequest request) throws JsonProcessingException {
    ...

    return jsonResp;
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    // generate the response message ID
    messageId = "EB" + System.currentTimeMillis();

    // log the message
    logIncomingMessage(topic, message);

    // handle the message
    AbstractRequest request = getMapper().readValue(message.toString(), AbstractRequest.class);

    // handle the request
    String jsonResp = handleRquest(request);

    // publish message
    publishAMessage(request.getReplyTopic(), jsonResp);
}

/**
 * 
 * Method callback is invoked when a message published by this client is
 * successfully received by the broker.
 * 
 */
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
    // NOT NEEDED
}

}
4

2 に答える 2

4

次のようにコードを変更すると機能します。

MqttDeliveryToken token;
...
MqttTopic mqttTopic = client.getTopic(topic);
try {
  // Publish to the broker
  token = mqttTopic.publish(new MqttMessage(pubMsg.getBytes()));
  logOutgoingMessage(topic, message);
  ...
 }

しかし、最初の実装が機能しない理由がわかりません :x QoS 2 を使用して messageArrived() で発行するのは適切でない可能性があります。

于 2015-08-13T06:14:21.613 に答える
0

このコールバックの実装内で新しいメッセージ (たとえば、このメッセージへの応答) を送信することは可能ですが、処理中のメッセージの確認を送信することが不可能になるため、実装はクライアントを切断してはなりません。デッドロックが発生します。

eclipse.org からの公式リンク

于 2016-02-17T15:05:54.077 に答える