2

Spring の統合ライブラリを使用して、mosquitto に接続してメッセージの読み取り/送信を試みています...しかし、理解できないことがいくつかあります。

1 - アプリを初期化するとき、アプリは mosquitto に接続しますが、mosquitto は数秒で同じ ID を持つ同じアプリから何百もの接続要求を受け取ります。これはログの例です:

New connection from 127.0.0.1 on port 1555.
Client springClient already connected, closing old connection.
Client springClient disconnected.
New client connected from 127.0.0.1 as springClient (c1, k60).
Sending CONNACK to springClient (0, 0)
Received SUBSCRIBE from springClient
    0001/001/INF (QoS 1)
springClient 1 0001/001/INF
Sending SUBACK to springClient
New connection from 127.0.0.1 on port 1555.
Client springClient already connected, closing old connection.
Client springClient disconnected.

2 - この構成を使用して mosquitto からメッセージを取得できません:

春の XML :

<!-- This is for reading messages -->
<bean id="mqttInbound" class="com.mobistech.drc.m2mproject.mqtt.MqttCustomInboundAdapter">
    <beans:constructor-arg name="clientId" value="springClient" />
    <beans:constructor-arg name="clientFactory" ref="clientFactory" />
    <beans:constructor-arg name="topic" value="0001/001/INF" />
    <beans:property name="autoStartup" value="true"></beans:property>
    <beans:property name="outputChannel" ref="fromBrokerChannel"></beans:property>
</bean>

 <int:channel  id="fromBrokerChannel" />

カスタム アダプター:

public class MqttCustomInboundAdapter extends MqttPahoMessageDrivenChannelAdapter {

    public MqttCustomInboundAdapter(String clientId,
            MqttPahoClientFactory clientFactory, String[] topic) {
        super(clientId, clientFactory, topic);
        // TODO Auto-generated constructor stub
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception
    {
        super.messageArrived(topic, message);
        System.out.println("**************** Message from topic : " + topic);
        System.out.println("**************** Message : " + new String(message.getPayload()));
    }

    public void addTopicIfNotExists(String topic)
    {
        for(String topicName:getTopic())
        {
            if(topicName.equals(topic))return;
        }

        addTopic(topic);

        System.out.println("************* Added Topic : " + topic);

        for(String topicName:getTopic())
        {
            System.out.println(topicName);
        }
    }
}

送信されたメッセージが到着したトピックを知る必要があるため、サービスアクティベーターを使用していないため、 Spring Integration DocsMqttPahoMessageDrivenChannelAdapter内で言及されているようにラップしました

何か提案はありますか?

4

2 に答える 2

2

java configでmqttを設定することができました

@Bean
public MqttPahoMessageDrivenChannelAdapter mqttInbound() {

    MqttPahoMessageDrivenChannelAdapter mqtt = new MqttPahoMessageDrivenChannelAdapter( applicationName + "-sub", clientFactory( ), "/#" );
    mqtt.setQos( 2 );
    mqtt.setOutputChannel( outbount( ) );
    mqtt.setAutoStartup( true );
    mqtt.setTaskScheduler( taskScheduler( ) );

    return mqtt;
}

@Bean
public MqttPahoMessageHandler mqqtMessageHandler() {

    return new MqttPahoMessageHandler( applicationName + "-pub", clientFactory( ) );
}

@Bean
public DefaultMqttPahoClientFactory clientFactory() {

    DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory( );
    clientFactory.setUserName( "test" );
    clientFactory.setPassword( "test" );
    clientFactory.setServerURIs( new String[] { "tcp://url:1883" } );
    return clientFactory;
}

@Bean
public PublishSubscribeChannel outbount() {

    PublishSubscribeChannel psc = new PublishSubscribeChannel( );
    psc.subscribe( new MessageHandler( ) {

        @Override
        public void handleMessage( Message<?> message ) throws MessagingException {

            logger.warn( message );

        }
    } );

    return psc;
}

メッセージを送信するには、次を追加します。

@Autowired
MqttPahoMessageHandler mqtt;

@RequestMapping( "/" )
public ModelAndView getHomePage() throws MqttPersistenceException, MqttException {

    Message<String> message = MessageBuilder.withPayload( "spring - test" ).setHeader( MqttHeaders.TOPIC, "/topic" ).build( );

    mqtt.handleMessage( message );

    return new ModelAndView( "home" );
}   
于 2015-08-18T16:46:13.340 に答える