
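I created a cluster on GCP using Dataproc: 1 master and 2 slaves. My system uses RabbitMQ to enqueue items every hour, and a consumer runs a Spark job via pyspark for each item in the queue. Note: I import pyspark into a regular Python environment, which I configured so that Spark can be used from there.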

After running for a while (not actually very long), the system reports that it cannot renew its lease, and the NameNode goes into safe mode:

WARN org.apache.hadoop.hdfs.LeaseRenewer: Failed to renew lease for [DFSClient_NONMAPREDUCE_-1938993080_12] for 304 seconds.  Will retry shortly ...  
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.SafeModeException): Cannot renew lease for DFSClient_NONMAPREDUCE_-1938993080_12. Name node is in safe mode.
Resources are low on NN. Please add or free up more resources then turn off safe mode manually. NOTE:  If you turn off safe mode before adding resources, the NN will immediately return to safe mode. 

The system keeps running for a while and eventually shuts down:

    at org.apache.hadoop.hdfs.DFSClient.renewLease(DFSClient.java:892)
    at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:423)
    at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:448)
    at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:71)
    at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:304)
    at java.lang.Thread.run(Thread.java:745)
16/03/15 15:52:46 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/03/15 15:52:46 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/03/15 15:52:46 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.

I then try to restart the service as-is:

python mq_consumer.py

mq_consumer.py looks like this:

import json
import pika
from lex_engine import run_engine as re, db_tools as dbt, credentials as creds

# This file consumes from the RabbitMQ queue and runs the jobs it gets from
# the queue in spark. It also sets the lock key, to ensure that multiple jobs
# with the same ID don't start at the same time.
ADDR = creds.RABBITMQ_VARS['address']
CONNECTION = pika.BlockingConnection(pika.ConnectionParameters(host=ADDR))
CHANNEL = CONNECTION.channel()
QUEUE = creds.RABBITMQ_VARS['queue']
MSERVER = dbt.get_mongo(creds.MONGO_CREDS)
MONGO_DB = MSERVER[creds.MONGO_DB_VARS['database']]
COLLECTION = MONGO_DB[creds.MONGO_DB_VARS['collection']]


for method_frame, properties, body in CHANNEL.consume(QUEUE):
    # This removes the message from the queue
    CHANNEL.basic_ack(method_frame.delivery_tag)
    json_payload = json.loads(body)
    if (json_payload['id'] is not None and
            json_payload['body'] is not None and
            json_payload['title'] is not None):
        lock_key = 'le-' + creds.ENVIRONMENT + '-' + json_payload['id']
        if COLLECTION.find_one({u'_id': lock_key}) is None:
            COLLECTION.insert_one({u'_id': lock_key, u'in_use': 1})
            re.run_engine(json_payload)
            COLLECTION.delete_one({u'_id': lock_key})
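
For readers who want to try the payload check and lock-key construction without RabbitMQ or MongoDB, here is a minimal standalone sketch. The helper names `is_valid_payload` and `make_lock_key` are hypothetical and not part of my code base. Unlike the loop above, this sketch treats a missing key the same as a `None` value instead of raising `KeyError`, which is an assumption about the desired behavior.

```python
def is_valid_payload(payload):
    """Return True when 'id', 'body' and 'title' are all present and not None.

    Assumption: a missing key is treated like None. The consumer loop above
    would raise KeyError in that case.
    """
    required = ('id', 'body', 'title')
    return all(payload.get(key) is not None for key in required)


def make_lock_key(environment, job_id):
    """Build the lock key in the same 'le-<environment>-<id>' form as above."""
    return 'le-' + environment + '-' + job_id
```

With these helpers, the body of the loop would reduce to one `is_valid_payload(json_payload)` test followed by `make_lock_key(creds.ENVIRONMENT, json_payload['id'])`.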

However, Spark/Hadoop does not start again, and I see the following:

16/03/15 20:29:09 INFO akka.event.slf4j.Slf4jLogger: Slf4jLogger started
16/03/15 20:29:09 INFO Remoting: Starting remoting
16/03/15 20:29:09 INFO Remoting: Remoting started; listening on addresses : [akka.tcp://sparkDriverActorSystem@10.128.0.5:35762]
16/03/15 20:29:10 INFO org.spark-project.jetty.server.Server: jetty-8.y.z-SNAPSHOT
16/03/15 20:29:10 INFO org.spark-project.jetty.server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
16/03/15 20:29:10 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at lexe-1-m/10.128.0.5:8032
16/03/15 20:29:11 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:12 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:13 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:14 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:15 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 4 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:16 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 5 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:17 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 6 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:18 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 7 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:19 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 8 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:20 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 9 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)

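If I reboot the server (Debian), it works again, but after a while the same problem recurs. Does anyone know how to fix this? I have considered giving the NameNode more resources, but as I see it, that should not prevent the system from restarting, right? Does anyone have any insight?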
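
As far as I understand, the "Resources are low on NN" message refers to free disk space on the volumes holding the NameNode's storage directories, compared against `dfs.namenode.resource.du.reserved`. Below is a minimal sketch of how free space on the master could be checked from Python. The 100 MB threshold is my assumption of the default value, the function names are hypothetical, and `shutil.disk_usage` requires Python 3.3 or later. I have not confirmed that disk space is the actual cause here.

```python
import shutil

# Assumption: 100 MB, which I believe is the default value of
# dfs.namenode.resource.du.reserved. Check the cluster's hdfs-site.xml.
DEFAULT_RESERVED_BYTES = 100 * 1024 * 1024


def free_bytes(path):
    """Return the free bytes on the filesystem that contains `path`."""
    return shutil.disk_usage(path).free


def is_low_on_space(path, reserved_bytes=DEFAULT_RESERVED_BYTES):
    """Return True when free space at `path` is below `reserved_bytes`."""
    return free_bytes(path) < reserved_bytes
```

Calling `is_low_on_space('/')` on the master would check the root volume; the path that actually matters is whichever volume holds the NameNode's storage directories on the cluster.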
