0

Spring Cloud Data Flow で DLQ を構成しようとしています。これがストリームの定義とそれを展開する方法です

  stream create --definition ":someTestTopic > custom-transform
     --spring.cloud.stream.bindings.input.consumer.headerMode=raw | log --spring.cloud.stream.bindings.input.consumer.headerMode=raw" --name ticktran


    stream deploy ticktran --properties
  "apps.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,apps.custom-transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.bindings.output.destination=test-tran,app.log.spring.cloud.stream.bindings.input.destination=test-tran,app.custom-transform.spring.cloud.stream.kafka.bindings.test-tran.consumer.enableDlq=true"

custom-transform - プロセッサ コードで、私は言及しました

if(out.contains("ERROR")) {
            throw new RuntimeException("Error ");
        }

つまり、メッセージに ERROR が含まれている場合、RunTimeException が発生し、これらのメッセージを DLQ でキャプチャしたいということです。しかし、コードを実行しているとき、test-tran という名前の Kafka DL キューを取得していないようです。

DLQ を有効にするにはさらにプロパティを設定する必要がありますか、それとも DLQ を適切に使用するためにコードを変更する必要がありますか?

カスタム変換コード

TransformationServiceApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.hateoas.config.EnableEntityLinks;

@SpringBootApplication
@EnableEntityLinks
public class TransformationServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(TransformationServiceApplication.class, args);
    }
}

TransformationMessageEndPoint.java

@EnableBinding(Processor.class)
@MessageEndpoint
public class TransformationMessageEndpoint {

    private static final String NS = "http://openrisk.com/ingestion/";

    AtomicInteger index = new AtomicInteger(1);
    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Object process(Message<?> message) {
        String out =  new String((byte[])message.getPayload());

        System.out.println("*****" + out);

        if(out.contains("ERROR")) {
            throw new RuntimeException("Error ");
        }

        return message;

    }
}

pom.xml

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.3.6.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>1.0.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <version>1.0.0.BUILD-SNAPSHOT</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud.stream.module</groupId>
            <artifactId>spring-cloud-stream-modules-test-support</artifactId>
            <version>1.0.0.BUILD-SNAPSHOT</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.jena</groupId>
            <artifactId>jena-core</artifactId>
            <version>3.1.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

モジュールの追加

app register --name custom-transform --type processor --uri maven://com.openrisk.openmargin:TransformationService:0.0.1-SNAPSHOT

ストリームの追加

stream create --definition ":someTesstTopic > custom-transform | log " --name ticktran

ストリームのデプロイ

stream deploy ticktran --properties "app.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq"
4

2 に答える 2

0

ストリーム定義に問題はほとんどありませんでした。

  • 展開プロパティは で始まりapp.<app-name>.ますがapps.<app-name>.、いくつかの場所にいます。
  • 宛先は SCDF で自動的に作成されるため、デフォルトをオーバーライドすることはお勧めしません。spring-cloud-streamただし、スタンドアロンアプリケーションを実行している場合は、これを行うことができます。
  • カスタム宛先を使用する代わりに、デフォルト チャネルと直接対話することで DLQ を有効にすることができます - 以下の例を参照してください。

stream create --definition ":someTesssstTopic > transform | log " --name ticktran

stream deploy ticktran --properties "app.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.transform .spring.cloud.stream.kafka.bindings.input.consumer.enableDlq"

  • プロパティでの参照に関しては、宛先test-tranが許容される形式ではありません。app.transform.spring.cloud.stream.kafka.bindings.<channelName>.consumer.enableDlq
  • 最後に、error.<destination>.<group>トピックはエラーが発生した場合にのみ作成されます。

#885を介して、いくつかの DSL サンプルをリファレンス ガイドに追加します。

編集: ストリーム定義を更新して、正しい展開プロパティのプレフィックスを反映させました。

于 2016-09-15T22:41:56.617 に答える
0

以下のコマンドを使用してデータ フローのバージョンを1.1 M1 リリースに変更し、プロパティを作成してデプロイしました。

stream create --definition ":someTesstTopic > transform | log " --name ticktran


stream deploy ticktran --properties "app.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq"

ありがとうサビー・アナンダン

于 2016-09-18T02:06:16.730 に答える