5

次のコードで pub/sub から読み込もうとしています

Read<String> pubsub = PubsubIO.<String>read().topic("projects/<projectId>/topics/<topic>").subscription("projects/<projectId>/subscriptions/<subscription>").withCoder(StringUtf8Coder.of()).withAttributes(new SimpleFunction<PubsubMessage,String>() {
    @Override
    public String apply(PubsubMessage input) {
        LOG.info("hola " + input.getAttributeMap());
        return new String(input.getMessage());
    }
});
PCollection<String> pps = p.apply(pubsub)
        .apply(
                Window.<String>into(
                    FixedWindows.of(Duration.standardSeconds(15))));
pps.apply("printdata",ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        LOG.info("hola amigo "+c.element());
        c.output(c.element());
    }
  }));

NodeJS で受け取るものと比較すると、dataフィールドに含まれるメッセージが表示されます。フィールドを取得するにはどうすればよいですかackId(後でメッセージを確認するために使用できます)。私が印刷している属性マップはnull. ackId を把握せずにすべてのメッセージを確認する他の方法はありますか?

4

1 に答える 1