私は Gobblin を初めて使用し、Kafka から HDFS にデータを取り込もうとしています。Kafka-HDFS Ingestionのサンプルを正常にフローできました。しかし今、時間ベースのライター パーティション オプションをジョブに追加する必要があります。私はTimeBasedWriterPartitioner Google フォーラムを調べて、Zongjun の提案に従って以下の解決策を考え出しました。
- 時間ベースのライター パーティション クラス用に別の Java プロジェクトを作成します。
import gobblin.writer.partitioner.TimeBasedWriterPartitioner;
public class LogJsonWriterPartitioner extends TimeBasedWriterPartitioner<byte[]> {
public LogJsonWriterPartitioner(gobblin.configuration.State state, int numBranches, int branchId) {
super(state, numBranches, branchId);
}
@Override
public long getRecordTimestamp(byte[] payload) {
return System.currentTimeMillis();
}
}
POM.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.pm.data.gobblin.kafka</groupId>
<artifactId>LogJsonWriterPartitioner </artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.linkedin.gobblin</groupId>
<artifactId>gobblin-api</artifactId>
<version>0.6.2</version>
</dependency>
<dependency>
<groupId>com.linkedin.gobblin</groupId>
<artifactId>gobblin-core</artifactId>
<version>0.6.2</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.3</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.9.10</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
gobblin-dist/lib
上記のプロジェクトから Jar を作成し、ディレクトリにコピーします。- ディレクトリ内のgobblin-mapreduce.sh を更新し
gobblin-dist/bin
、新しい jar 名を LIBJARS の下に追加します。 - 次のようにジョブ ファイルを作成します。
job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false
fs.uri=file:///
kafka.brokers=localhost:9092
source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=org.apache.gobblin.extract.kafka
writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
writer.partitioner.class=com.pm.data.gobblin.kafka.LogJsonWriterPartitioner
writer.partition.granularity=day
writer.partition.pattern=YYYY-MM-dd
writer.partition.timezone=UTC
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
data.publisher.replace.final.dir=false
data.publisher.final.dir=/home/myuser/Desktop/Gobblin
mr.job.max.mappers=1
metrics.reporting.file.enabled=true
metrics.log.dir=${gobblin.cluster.work.dir}/metrics
metrics.reporting.file.suffix=txt
bootstrap.with.offset=earliest
- 次に、bin ディレクトリにある gobblin-standalone.sh ファイルを使用して、スタンドアロンとして gobblin を起動します。
logs/gobblin-current.log で以下のエラーが発生しました
org.apache.gobblin.runtime.fork.Fork 250 - Fork 0 of task task_GobblinKafkaQuickStart_1590391135660_0 failed to process data records. Set throwable in holder org.apache.gobblin.runtime.ForkThrowableHolder@433cf3c0
java.io.IOException: java.lang.ClassNotFoundException: com.pm.data.logging.gobblin.LogJsonWriterPartitioner
at org.apache.gobblin.writer.PartitionedDataWriter.<init>(PartitionedDataWriter.java:135)
at org.apache.gobblin.runtime.fork.Fork.buildWriter(Fork.java:534)
at org.apache.gobblin.runtime.fork.Fork.buildWriterIfNotPresent(Fork.java:542)
at org.apache.gobblin.runtime.fork.Fork.processRecord(Fork.java:502)
at org.apache.gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:103)
at org.apache.gobblin.runtime.fork.AsynchronousFork.processRecords(AsynchronousFork.java:86)
at org.apache.gobblin.runtime.fork.Fork.run(Fork.java:243)
at org.apache.gobblin.util.executors.MDCPropagatingRunnable.run(MDCPropagatingRunnable.java:39)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.pm.data.logging.gobblin.LogJsonWriterPartitioner
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.gobblin.writer.PartitionedDataWriter.<init>(PartitionedDataWriter.java:128)
... 12 more
ジョブファイルを として変更するとwriter.partitioner.class=LogJsonWriterPartitioner
、エラーが として変更されjava.lang.NoClassDefFoundError: gobblin/writer/partitioner/TimeBasedWriterPartitioner
ます。
この問題を克服するのに役立つ人はいますか?