0

Spring XDについては、すでにこの質問をしています。現在、Spring CDF に移行しようとしています。

このリンクを見つけて、そこでコードを再利用し、エンコードを変更しようとしました。

次のPOMを作成しました。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>tcp-ber-source</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <name>TCP Ber Source</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>

        <tcp-app-starters-common.version>1.1.0.RELEASE</tcp-app-starters-common.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud.stream.app</groupId>
            <artifactId>tcp-app-starters-common</artifactId>
            <version>${tcp-app-starters-common.version}</version>
        </dependency>

        <dependency>
            <groupId>com.example</groupId>
            <artifactId>ber-byte-array-serializers</artifactId>
            <version>1.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Camden.SR3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

構成:

@EnableBinding(Source.class)
@EnableConfigurationProperties(TcpBerSourceProperties.class)
public class TcpBerSourceConfiguration {

    private final Source channels;
    private final TcpBerSourceProperties properties;

    @Autowired
    public TcpSourceConfiguration(final TcpBerSourceProperties properties, final Source channels) {
        this.properties = properties;
        this.channels = channels;
    }

    @Bean
    public TcpReceivingChannelAdapter adapter(@Qualifier("tcpBerSourceConnectionFactory") final AbstractConnectionFactory connectionFactory) {
        final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(connectionFactory);
        adapter.setOutputChannel(this.channels.output());
        return adapter;
    }

    @Bean
    public TcpConnectionFactoryFactoryBean tcpBerSourceConnectionFactory(@Qualifier("tcpBerSourceDecoder") final AbstractByteArraySerializer decoder) throws Exception {
        final TcpConnectionFactoryFactoryBean factoryBean = new TcpConnectionFactoryFactoryBean();
        factoryBean.setType("server");
        factoryBean.setPort(this.properties.getPort());
        factoryBean.setUsingNio(this.properties.isNio());
        factoryBean.setUsingDirectBuffers(this.properties.isUseDirectBuffers());
        factoryBean.setLookupHost(this.properties.isReverseLookup());
        factoryBean.setDeserializer(decoder);
        factoryBean.setSoTimeout(this.properties.getSocketTimeout());
        return factoryBean;
    }

    @Bean
    public BerEncoderDecoderFactoryBean tcpBerSourceDecoder() {
        final BerEncoderDecoderFactoryBean factoryBean = new BerEncoderDecoderFactoryBean(this.properties.getDecoder());
        factoryBean.setMaxMessageSize(this.properties.getBufferSize());
        return factoryBean;
    }
}

そして、この FactoryBean:

public class BerEncoderDecoderFactoryBean extends AbstractFactoryBean<AbstractByteArraySerializer> implements ApplicationEventPublisherAware {

    private final BerEncoding encoding;

    private ApplicationEventPublisher applicationEventPublisher;
    private Integer maxMessageSize;

    public BerEncoderDecoderFactoryBean(final BerEncoding encoding) {
        Assert.notNull(encoding, "'encoding' cannot be null");
        this.encoding = encoding;
    }

    @Override
    public void setApplicationEventPublisher(final ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    /**
     * The maximum message size allowed when decoding.
     * @param maxMessageSize the maximum message size.
     */
    public void setMaxMessageSize(final int maxMessageSize) {
        this.maxMessageSize = maxMessageSize;
    }

    @Override
    protected AbstractByteArraySerializer createInstance() throws Exception {
        final AbstractByteArraySerializer codec;
        switch (this.encoding) {
            case SPLIT:
                codec = new ByteArrayBerSplitSerializer();
                break;
            case EXTRACT:
                codec = new ByteArrayBerExtractSerializer();
                break;
            default:
                throw new IllegalArgumentException("Invalid encoding: " + this.encoding);
        }
        codec.setApplicationEventPublisher(this.applicationEventPublisher);
        if (this.maxMessageSize != null) {
            codec.setMaxMessageSize(this.maxMessageSize);
        }
        return codec;
    }

    @Override
    public Class<?> getObjectType() {
        return AbstractByteArraySerializer.class;
    }
}

BerEncoding は単純な列挙型であり、TcpBerSourceProperties は非常に単純です。

これは正しいアプローチですか?

もしそうなら、どうすればこれを実行できますか?Spring Bootスタンドアロンアプリケーションとして実行するために言及されたリンクで見つけたtcpストリームアプリスターターのどこにも@SpringBootApplicationが表示されませんか?

4

1 に答える 1

1

独自の Spring Boot App クラスを作成し、構成クラスをインポートする必要があります。カスタム アプリの作成に関するドキュメントを参照してください。

here で説明されているように、スターターから標準アプリ (rabbit および kafka バインダー用) を生成します。

于 2016-12-19T22:31:02.197 に答える