回答: Apache Flink と Riak CS に参加するにはどうすればよいですか?
Riak CS は S3 (バージョン 2) 互換のインターフェースを備えています。そのため、Hadoop から S3 ファイル システム アダプターを使用して Riak CS と連携することができます。
理由はわかりませんが、Apache Flink には、fat jar ( lib/flink-dist_2.11-1.0.1.jar
) 内に Hadoop ファイルシステム アダプターの一部しかありません。つまり、FTP ファイル システム ( org.apache.hadoop.fs.ftp.FTPFileSystem
) はありますが、S3 ファイル システム (つまり ) はありませんorg.apache.hadoop.fs.s3a.S3AFileSystem
。したがって、この問題を解決するには 2 つの方法があります。
- Hadoop インストールからこれらのアダプターを使用します。私はこれを試しませんでしたが、 HADOOP_CLASSPATH または HADOOP_HOME evn 変数を設定するだけでよいようです。
- monky patch Apache Flink と
<flink home>/lib
ディレクトリへの必要な JAR のダウンロード
私の環境では Hadoop をプロビジョニングしたくないので、2 番目の方法を選択します。Hadoop dist またはインターネットから JAR をコピーできます。
curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar
ご覧のとおり、私は古いバージョンを使用しています。そのようなバージョンは Hadoop 2.7.2 で使用されており、このバージョンの Hadoop と互換性のある Flink を使用しているためです。
参考までに: これらの JAR の最新バージョンを独自のフローで使用している場合、このようなハッキングが問題を引き起こす可能性があります。さまざまなバージョンに関連する問題を回避するには、次のようなフローを使用してファット jar をビルドするときにパッケージを再配置できます (私は Gradle を使用しています)。
// Relocate org.apache.http packages because Apache Flink include old version of this library (we place them for using S3 compatible FS)
shadowJar {
dependencies {
include(dependency('.*:.*:.*'))
}
relocate 'org.apache.http', 'relocated.org.apache.http'
relocate 'org.apache.commons', 'relocated.org.apache.commons'
}
次に、Hadoop 互換ファイル システムが設定をロードするためにこの構成を使用するためcore-site.xml
、パスを指定する必要があります。flink-conf.yaml
...
fs.hdfs.hadoopconf: /flink/conf
...
ご覧のとおり、<fink home>/conf
ディレクトリに配置するだけです。次の設定があります。
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> // because S3A better then other: https://wiki.apache.org/hadoop/AmazonS3
</property>
<property>
<name>fs.s3a.endpoint</name>
<value>my-riak-cs.stage.local</value> // this is my Riak CS host
</property>
<property>
<name>fs.s3a.connection.ssl.enabled</name> // my Riak CS in staging doesn't support SSL
<value>false</value>
</property>
<property>
<name>fs.s3a.access.key</name>
<value>????</value> // this is my access key for Riak CS
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>????</value> // this is my secret key for Riak CS
</property>
</configuration>
次に、ここflink-conf.yaml
で Riak CS バケットをレコメンダーとして設定する必要があります。
...
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
...
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
...
Riak CS でバケットを作成します。私は使用しています( OS X dev envにs3cmd
インストールされています):brew
s3cmd mb s3://example-staging-flink
参考:使用s3cmd
する前に、使用して構成し、ファイルs3cmd --configure
のいくつかの設定を修正する必要があります。~/.s3cmd
signature_v2 = True // because Riak CS using S3 V2 interface
use_https = False // if your don't use SSL
access_key = ???
secret_key = ???
host_base = my-riak-cs.stage.local // your Riak CS host
host_bucket = %(bucket).my-riak-cs.stage.local // format of bucket used by Riak CS
これで、Riak CS のスタンドアロン HA Apache Flink クラスターの状態を保存/復元するために設定する必要があるのはこれだけです。