2

SQL クエリの出力を並行して処理しようとしています。以下は私のコードです。Aggregatorにsysoutがあります。しかし、Aggregator の sysout が印刷されていないことがランダムにわかります。また、アグリゲーターの解放方法も sysout を出力しません。どこかでメッセージを失っていると思います。誰でも光を当てることができますか?

    <int:bridge input-channel="inputChannel" output-channel="dbRequestChannel" />

        <jdbc:outbound-gateway request-channel="dbRequestChannel"
            max-rows-per-poll="0" data-source="dataSource" reply-channel="headerEnricher"
            query="select empname, empno, empdob from employee where empno = 1234" />

        <int:header-enricher input-channel="headerEnricher"
            output-channel="splitterChannel">
            <int:header name="payloadSize" value="3"></int:header>
        </int:header-enricher>

        <int:chain input-channel="splitterChannel" output-channel="splitterOutputChannel">
            <int:splitter />
        </int:chain>


        <int:channel id="splitterOutputChannel">
            <int:dispatcher task-executor="sampleTaskExecutor" />
        </int:channel>

        <bean id="sampleTaskExecutor"
            class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="corePoolSize" value="5" />
            <property name="maxPoolSize" value="10" />
            <property name="queueCapacity" value="25" />
        </bean>

        <int:service-activator input-channel="splitterOutputChannel"
            ref="springIntegrationtest" method="testMethod" output-channel="aggregatePayload">
        </int:service-activator>

        <int:aggregator input-channel="aggregatePayload"
            release-strategy-method="release" output-channel="nullChannel"
            send-partial-result-on-expiry="true" ref="springIntegrationtest"
            method="aggregateData" />

    @RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:spring-integration.xml" })
public class SpringIntegrationTest {


    @Autowired
    private MessageChannel inputChannel;

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");



    @Test
    public void testQueue() {
        Message<String> quoteMessage = MessageBuilder
                .withPayload("testPayload").build();
        inputChannel.send(quoteMessage);
    }


    public Map<String, String> testMethod(Message<?> m) {

        System.out.println(sdf.format(new Date()));
        return (Map<String, String>) m.getPayload();
    }

    public boolean release(ArrayList<Map<String, Object>> payload) {
        boolean release = false;
        int size = payload.size();
        if (size == 3) {
            release = true;
        }
        System.out.println(release);
        return release;
    }

    public Message<String> aggregateData(ArrayList<Map<String, Object>> payload) {

        System.out.println("In aggregateData " + payload);
        Message<String> quoteMessage = MessageBuilder
                .withPayload("testPayload").build();
        return quoteMessage;
    }

}
4

1 に答える 1

1

さて、あなたの問題は、状態とオプションの組み合わせにあると思います。

あなたはこれを持っています:

int size = payload.size();
if (size == 3) {
    release = true;
}

3そのため、アグリゲーターはアイテムが到着した直後にグループを解放しますが、分割後はさらに多くのアイテムが存在する可能性があります.

アグリゲーターはrelease、グループを終了して終了するように通知します。デフォルトでは、 のようなオプションがあります。これは、グループをストアに保持expireGroupsUponCompletion = falseしますが、状態を保持することを意味します。completed

アグリゲーターのタプルから 3 だけ出力することが目標の場合は、 を に切り替えることを検討する必要がありexpireGroupsUponCompletionますtrue。詳細については、アグリゲーターのマニュアルを参照してください。

于 2016-07-19T00:46:48.110 に答える