0

filesystem状態のバックエンドとzookeeper回復モードを構成したい:

state.backend: filesystem
state.backend.fs.checkpointdir: ???

recovery.mode: zookeeper
recovery.zookeeper.storageDir: ???

ご覧のとおりcheckpointdirstorageDirパラメーターを指定する必要がありますが、Apache Flink でサポートされているファイル システム (HDFS や Amazon S3 など) はありません。しかし、Riak CS クラスターをインストールしました ( S3 と互換性があるようです)。

では、Riak CS を Apache Flink と一緒に使用できますか? 可能であれば: Riak CS で動作するように Apache Flink を構成するにはどうすればよいですか?

4

1 に答える 1

3

回答: Apache Flink と Riak CS に参加するにはどうすればよいですか?

Riak CS は S3 (バージョン 2) 互換のインターフェースを備えています。そのため、Hadoop から S3 ファイル システム アダプターを使用して Riak CS と連携することができます。

理由はわかりませんが、Apache Flink には、fat jar ( lib/flink-dist_2.11-1.0.1.jar) 内に Hadoop ファイルシステム アダプターの一部しかありません。つまり、FTP ファイル システム ( org.apache.hadoop.fs.ftp.FTPFileSystem) はありますが、S3 ファイル システム (つまり ) はありませんorg.apache.hadoop.fs.s3a.S3AFileSystem。したがって、この問題を解決するには 2 つの方法があります。

  • Hadoop インストールからこれらのアダプターを使用します。私はこれを試しませんでしたが、 HADOOP_CLASSPATH または HADOOP_HOME evn 変数を設定するだけでよいようです。
  • monky patch Apache Flink と<flink home>/libディレクトリへの必要な JAR のダウンロード

私の環境では Hadoop をプロビジョニングしたくないので、2 番目の方法を選択します。Hadoop dist またはインターネットから JAR をコピーできます。

curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar

ご覧のとおり、私は古いバージョンを使用しています。そのようなバージョンは Hadoop 2.7.2 で使用されており、このバージョンの Hadoop と互換性のある Flink を使用しているためです。

参考までに: これらの JAR の最新バージョンを独自のフローで使用している場合、このようなハッキングが問題を引き起こす可能性があります。さまざまなバージョンに関連する問題を回避するには、次のようなフローを使用してファット jar をビルドするときにパッケージを再配置できます (私は Gradle を使用しています)。

// Relocate org.apache.http packages because Apache Flink include old version of this library (we place them for using S3 compatible FS)
shadowJar {
    dependencies {
        include(dependency('.*:.*:.*'))
    }

    relocate 'org.apache.http', 'relocated.org.apache.http'
    relocate 'org.apache.commons', 'relocated.org.apache.commons'
}

次に、Hadoop 互換ファイル システムが設定をロードするためにこの構成を使用するためcore-site.xml、パスを指定する必要があります。flink-conf.yaml

...
fs.hdfs.hadoopconf: /flink/conf
...

ご覧のとおり、<fink home>/confディレクトリに配置するだけです。次の設定があります。

<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
    <property>
        <name>fs.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> // because S3A better then other: https://wiki.apache.org/hadoop/AmazonS3
    </property>
    <property>
        <name>fs.s3a.endpoint</name>
        <value>my-riak-cs.stage.local</value>  // this is my Riak CS host
    </property>
    <property>
        <name>fs.s3a.connection.ssl.enabled</name> // my Riak CS in staging doesn't support SSL
        <value>false</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>????</value> // this is my access key for Riak CS
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>????</value> // this is my secret key for Riak CS
    </property>
</configuration>

次に、ここflink-conf.yamlで Riak CS バケットをレコメンダーとして設定する必要があります。

...
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
...
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
...

Riak CS でバケットを作成します。私は使用しています( OS X dev envにs3cmdインストールされています):brew

s3cmd mb s3://example-staging-flink

参考:使用s3cmdする前に、使用して構成し、ファイルs3cmd --configureのいくつかの設定を修正する必要があります。~/.s3cmd

signature_v2 = True // because Riak CS using S3 V2 interface
use_https = False // if your don't use SSL
access_key = ???
secret_key = ???
host_base = my-riak-cs.stage.local // your Riak CS host
host_bucket = %(bucket).my-riak-cs.stage.local // format of bucket used by Riak CS

これで、Riak CS のスタンドアロン HA Apache Flink クラスターの状態を保存/復元するために設定する必要があるのはこれだけです。

于 2016-04-15T11:37:10.833 に答える