0

私は Gobblin を初めて使用し、Kafka から HDFS にデータを取り込もうとしています。Kafka-HDFS Ingestionのサンプルを正常にフローできました。しかし今、時間ベースのライター パーティション オプションをジョブに追加する必要があります。私はTimeBasedWriterPartitioner Google フォーラムを調べて、Zongjun の提案に従って以下の解決策を考え出しました。

  1. 時間ベースのライター パーティション クラス用に別の Java プロジェクトを作成します。
import gobblin.writer.partitioner.TimeBasedWriterPartitioner;

public class LogJsonWriterPartitioner  extends TimeBasedWriterPartitioner<byte[]> {
    public LogJsonWriterPartitioner(gobblin.configuration.State state, int numBranches, int branchId) {
        super(state, numBranches, branchId);
    }

    @Override
    public long getRecordTimestamp(byte[] payload) {
        return System.currentTimeMillis();
    }
}

POM.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.pm.data.gobblin.kafka</groupId>
    <artifactId>LogJsonWriterPartitioner </artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.linkedin.gobblin</groupId>
            <artifactId>gobblin-api</artifactId>
            <version>0.6.2</version>
        </dependency>
        <dependency>
            <groupId>com.linkedin.gobblin</groupId>
            <artifactId>gobblin-core</artifactId>
            <version>0.6.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.3</version>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>6.9.10</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>
  1. gobblin-dist/lib上記のプロジェクトから Jar を作成し、ディレクトリにコピーします。
  2. ディレクトリ内のgobblin-mapreduce.sh を更新しgobblin-dist/bin、新しい jar 名を LIBJARS の下に追加します。
  3. 次のようにジョブ ファイルを作成します。
job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false
fs.uri=file:///

kafka.brokers=localhost:9092

source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=org.apache.gobblin.extract.kafka

writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
writer.partitioner.class=com.pm.data.gobblin.kafka.LogJsonWriterPartitioner
writer.partition.granularity=day
writer.partition.pattern=YYYY-MM-dd
writer.partition.timezone=UTC
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt

data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
data.publisher.replace.final.dir=false
data.publisher.final.dir=/home/myuser/Desktop/Gobblin

mr.job.max.mappers=1

metrics.reporting.file.enabled=true
metrics.log.dir=${gobblin.cluster.work.dir}/metrics
metrics.reporting.file.suffix=txt

bootstrap.with.offset=earliest
  1. 次に、bin ディレクトリにある gobblin-standalone.sh ファイルを使用して、スタンドアロンとして gobblin を起動します。

logs/gobblin-current.log で以下のエラーが発生しました

 org.apache.gobblin.runtime.fork.Fork  250 - Fork 0 of task task_GobblinKafkaQuickStart_1590391135660_0 failed to process data records. Set throwable in holder org.apache.gobblin.runtime.ForkThrowableHolder@433cf3c0
java.io.IOException: java.lang.ClassNotFoundException: com.pm.data.logging.gobblin.LogJsonWriterPartitioner
    at org.apache.gobblin.writer.PartitionedDataWriter.<init>(PartitionedDataWriter.java:135)
    at org.apache.gobblin.runtime.fork.Fork.buildWriter(Fork.java:534)
    at org.apache.gobblin.runtime.fork.Fork.buildWriterIfNotPresent(Fork.java:542)
    at org.apache.gobblin.runtime.fork.Fork.processRecord(Fork.java:502)
    at org.apache.gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:103)
    at org.apache.gobblin.runtime.fork.AsynchronousFork.processRecords(AsynchronousFork.java:86)
    at org.apache.gobblin.runtime.fork.Fork.run(Fork.java:243)
    at org.apache.gobblin.util.executors.MDCPropagatingRunnable.run(MDCPropagatingRunnable.java:39)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.pm.data.logging.gobblin.LogJsonWriterPartitioner
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.gobblin.writer.PartitionedDataWriter.<init>(PartitionedDataWriter.java:128)
    ... 12 more

ジョブファイルを として変更するとwriter.partitioner.class=LogJsonWriterPartitioner、エラーが として変更されjava.lang.NoClassDefFoundError: gobblin/writer/partitioner/TimeBasedWriterPartitionerます。

この問題を克服するのに役立つ人はいますか?

4

1 に答える 1