私は MQTT を学ぼうとしていて、いじっています。パブリッシング用のクライアントとサブスクライブ用のクライアントを作成しました (以下を参照)。
サブスクライブ クライアントを実行してから (サブスクライブの実行中に) パブリッシュ クライアントを実行すると、すべて正常に動作します。サブスクライブ クライアントは、トピックに発行されたメッセージを正しく受信します。
しかし、最初にパブリッシュ クライアントを実行し (つまり、メッセージをトピックにパブリッシュ)、次にサブスクライブ クライアントを実行すると、メッセージを受信しません。
つまり、最初にサブ クライアントに接続し、サブ クライアントが接続されている間に pub クライアントでメッセージを発行すると、すべて正常に動作します。ただし、最初にメッセージを公開してからサブ クライアントに接続すると、メッセージを受信しません。私の理解では、クライアントと接続してトピックにサブスクライブすると、トピックに存在するメッセージを受信する必要があります。
同様の問題のように見えるものを見つけました: mqtt paho のサブスクライブされたトピックに既に発行されたメッセージを受信できませんが、そのケースは少し異なるようです。別の QoS 設定または cleanSession フラグを変更しようとしましたが、問題は解決しませんでした。
どんな助けでも大歓迎です!
パブリッシュ クライアント:
public class MQTT_Client_Pub implements MqttCallback{
MqttClient client;
public static void main(String[] args) {
new MQTT_Client_Pub().mqttPub();
}
public void mqttPub(){
try {
this.setConnection();
// Connect
client.connect();
// Create new message
MqttMessage message = new MqttMessage();
message.setPayload("A single test message from b112358".getBytes());
message.setQos(0);
// Publish message to a topic
System.out.println("Publishing a message.");
client.publish("pahodemo/test/b112358", message);
// Disconnect
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
} catch (Exception e){
e.printStackTrace();
}
}
public void setConnection(){
// Client
try{
client = new MqttClient("tcp://iot.eclipse.org:1883", "mqtt_test_b112358_pub");
} catch (MqttException e) {
e.printStackTrace();
}
// Connection Options
MqttConnectOptions options = new MqttConnectOptions();
// Set the will
options.setWill("pahodemo/clienterrors", "CRASHED - CONNECTION NOT CLOSED CLEANLY".getBytes(),2,true);
// Set Callback
client.setCallback(this);
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Message delivered to the broker.");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {}
public void connectionLost(Throwable cause) {}
}
購読クライアント:
public class MQTT_Client_Sub implements MqttCallback{
MqttClient client;
public static void main(String[] args) {
new MQTT_Client_Sub().mqttSub();
}
public void mqttSub(){
try {
// Set connection
this.setConnection();
// Connect
client.connect();
// Subscribe
client.subscribe("pahodemo/test/b112358", 0);
// Disconnect
// client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
public void setConnection(){
try {
// Client
client = new MqttClient("tcp://iot.eclipse.org:1883", "mqtt_test_b112358_sub");
} catch (MqttException e) {
e.printStackTrace();
}
// Connection Options
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
// Set the will
options.setWill("pahodemo/clienterrors", "CRASHED - CONNECTION NOT CLOSED CLEANLY".getBytes(),2,true);
client.setCallback(this);
}
public void deliveryComplete(IMqttDeliveryToken token) {}
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Message Arrived: " + message.getPayload() + " on tipic: " + topic.getBytes());
}
public void connectionLost(Throwable cause) {}
}