0

次のような単純な SCDF ストリームがあります。

http --port=12346 | mvmn-transform | file --name=tmp.txt --directory=/tmp

mvmn-transform は、次のような単純なカスタム トランスフォーマーです。

@SpringBootApplication
@EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {
    public static void main(String args[]) {
        SpringApplication.run(ScdfTestTransformer.class, args);
    }

    @Autowired
    protected ScdfTestTransformerProperties config;

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Object transform(Message<?> message) {
        Object payload = message.getPayload();
        Map<String, Object> result = new HashMap<>();
        Map<String, String> headersStr = new HashMap<>();

        message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));

        result.put("headers", headersStr);
        result.put("payload", payload);
        result.put("configProp", config.getSomeConfigProp());

        return result;
    }

    // See https://stackoverflow.com/questions/59155689/could-not-decode-json-type-for-key-file-name-in-a-spring-cloud-data-flow-stream
    @Bean("kafkaBinderHeaderMapper")
    public KafkaHeaderMapper kafkaBinderHeaderMapper() {
        BinderHeaderMapper mapper = new BinderHeaderMapper();
        mapper.setEncodeStrings(true);
        return mapper;
    }
}

これはうまくいきます。

しかし、Spring Cloud Function を使用すると、バインディングとトランスフォーマーのアノテーションを指定する必要なく、そのようなアプリを実装できるはずだと読んだので、次のように変更しました。

@SpringBootApplication
// @EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {
    public static void main(String args[]) {
        SpringApplication.run(ScdfTestTransformer.class, args);
    }

    @Autowired
    protected ScdfTestTransformerProperties config;

    // @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    @Bean
    public Function<Message<?>, Map<String, Object>> transform(
    // Message<?> message
    ) {
        return message -> {
            Object payload = message.getPayload();
            Map<String, Object> result = new HashMap<>();
            Map<String, String> headersStr = new HashMap<>();

            message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));

            result.put("headers", headersStr);
            result.put("payload", payload);
            result.put("configProp", "Config prop val: " + config.getSomeConfigProp());

            return result;
        };
    }

    // See https://stackoverflow.com/questions/59155689/could-not-decode-json-type-for-key-file-name-in-a-spring-cloud-data-flow-stream
    @Bean("kafkaBinderHeaderMapper")
    public KafkaHeaderMapper kafkaBinderHeaderMapper() {
        BinderHeaderMapper mapper = new BinderHeaderMapper();
        mapper.setEncodeStrings(true);
        return mapper;
    }
}

そして今、問題があります.SCDFソースとターゲットのトピック名は明らかにSpring-Cloud-Functionによって無視され、代わりにトピックtransform-in-0transform-out-0が作成されます。

SCDF は、次のような名前を持つトピックを作成します<stream-name>.<app-name>TestStream123.httpTestStream123.mvmn-transform

以前は、SCDF ストリームの一部であるため、トランスフォーマーに使用されていました。しかし、Spring-Cloud-Function によって無視され、transform-in-0代わりtransform-out-0に作成されます。

したがって、私のトランスフォーマーは、間違った Kafka トピックで入力を期待しているため、入力を受信しなくなりました。また、間違った Kafka トピックにも出力されるため、ストリームにも出力が生成されない可能性があります。

PS 念のため、GitHub の完全なプロジェクト コード: https://github.com/mvmn/scdftest-transformer/tree/scfunc

ローカルで実行するには、Kafka、Skipper、SCDF、および SCDF コンソールを起動mvn clean installし、app フォルダーで実行app register --name mvmn-transform-1 --type processor --uri maven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT --metadata-uri maven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOTしてから、coonsole で実行します。次に、定義を使用してストリームをデプロイできますhttp --port=12346 | mvmn-transform | file --name=tmp.txt --directory=/tmp

4

1 に答える 1