3

Oozieを使用して、3 つのサブワークフローを並行して実行するワークフローを実行しようとしていますfork。サブワークフローには、ネイティブのマップ縮小ジョブを実行するノードと、いくつかの複雑なPIGジョブを実行する後続の 2 つのノードが含まれています。最後に、3 つのサブワークフローが 1 つのendノードに結合されます。

このワークフローを実行すると、LeaseExpiredException. PIG例外は、ジョブの実行中にランダムに発生します。発生する場所は決まっていませんが、WF を実行するたびに発生します。

forkまた、サブワークフローを削除して順番に実行すると、正常に動作します。ただし、私たちの期待は、それらが並行して実行され、実行時間によっては同じになることです。

この問題と、どこで問題が発生する可能性があるかについてのヒントを教えてください。私たちは開発を開始してhadoopおり、以前はこのような問題に直面していませんでした。

複数のタスクが並行して実行されているため、スレッドの 1 つがパーツ ファイルを閉じ、別のスレッドが同じファイルを閉じようとすると、エラーがスローされるようです。

以下は、hadoop ログからの例外のスタック トレースです。

2013-02-19 10:23:54,815 INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher: 57% complete 
2013-02-19 10:26:55,361 INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher: 59% complete
2013-02-19 10:27:59,666 ERROR org.apache.hadoop.hdfs.DFSClient: Exception closing file <hdfspath>/oozie-oozi/0000105-130218000850190-oozie-oozi-W/aggregateData--pig/output/_temporary/_attempt_201302180007_0380_m_000000_0/part-00000 : org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on <hdfspath>/oozie-oozi/0000105-130218000850190-oozie-oozi-W/aggregateData--pig/output/_temporary/_attempt_201302180007_0380_m_000000_0/part-00000 File does not exist. Holder DFSClient_attempt_201302180007_0380_m_000000_0 does not have any open files.
                at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:1664)
                at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:1655)
                at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:1710)
                at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:1698)
                at org.apache.hadoop.hdfs.server.namenode.NameNode.complete(NameNode.java:793)
                at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
                at java.lang.reflect.Method.invoke(Method.java:597)
                at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:557)
                at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1439)
                at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1435)
                at java.security.AccessController.doPrivileged(Native Method)
                at javax.security.auth.Subject.doAs(Subject.java:396)
                at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1278)
                at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1433)

以下は、メイン ワークフローと 1 つのサブワークフローのサンプルです。

主なワークフロー:

<workflow-app xmlns="uri:oozie:workflow:0.2" name="MainProcess">
<start to="forkProcessMain"/>
<fork name="forkProcessMain">
    <path start="Proc1"/>
    <path start="Proc2"/>
    <path start="Proc3"/>
</fork>
<join name="joinProcessMain" to="end"/>
<action name="Proc1">
    <sub-workflow>
        <app-path>${nameNode}${wfPath}/proc1_workflow.xml</app-path>
        <propagate-configuration/>
    </sub-workflow>
    <ok to="joinProcessMain"/>
    <error to="fail"/>
</action>   
<action name="Proc2">
    <sub-workflow>
        <app-path>${nameNode}${wfPath}/proc2_workflow.xml</app-path>
        <propagate-configuration/>
    </sub-workflow>
    <ok to="joinProcessMain"/>
    <error to="fail"/>
</action>   
<action name="Proc3">
    <sub-workflow>
        <app-path>${nameNode}${wfPath}/proc3_workflow.xml</app-path>
        <propagate-configuration/>
    </sub-workflow>
    <ok to="joinProcessMain"/>
    <error to="fail"/>
</action>   
<kill name="fail">
    <message>WF Failure, 'wf:lastErrorNode()' failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>

サブワークフロー:

<workflow-app xmlns="uri:oozie:workflow:0.2" name="Sub Process">
<start to="Step1"/>
<action name="Step1">
    <java>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
           <delete path="${step1JoinOutputPath}"/>
        </prepare>
        <configuration>
            <property>
                <name>mapred.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <main-class>com.absd.mr.step1</main-class>
        <arg>${wf:name()}</arg>
        <arg>${wf:id()}</arg>
        <arg>${tbMasterDataOutputPath}</arg>
        <arg>${step1JoinOutputPath}</arg>
        <arg>${tbQueryKeyPath}</arg>
        <capture-output/>
    </java>
    <ok to="generateValidQueryKeys"/>
    <error to="fail"/>
</action>
<action name="generateValidQueryKeys">
    <pig>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
           <delete path="${tbValidQuerysOutputPath}"/>
        </prepare>
        <configuration>
            <property>
                <name>pig.tmpfilecompression</name>
                <value>true</value>
            </property>
            <property>
                <name>pig.tmpfilecompression.codec</name>
                <value>lzo</value>
            </property>
            <property>
                <name>pig.output.map.compression</name>
                <value>true</value>
            </property>
            <property>
                <name>pig.output.map.compression.codec</name>
                <value>lzo</value>
            </property>
            <property>
                <name>pig.output.compression</name>
                <value>true</value>
            </property>
            <property>
                <name>pig.output.compression.codec</name>
                <value>lzo</value>
            </property>
            <property>
                <name>mapred.compress.map.output</name>
                <value>true</value>
            </property>
        </configuration>
        <script>${pigDir}/tb_calc_valid_accounts.pig</script>
        <param>csvFilesDir=${csvFilesDir}</param>
        <param>step1JoinOutputPath=${step1JoinOutputPath}</param>
        <param>tbValidQuerysOutputPath=${tbValidQuerysOutputPath}</param>
        <param>piMinFAs=${piMinFAs}</param>
        <param>piMinAccounts=${piMinAccounts}</param>
        <param>parallel=80</param>
    </pig>
    <ok to="aggregateAumData"/>
    <error to="fail"/>
</action>
<action name="aggregateAumData">
    <pig>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
           <delete path="${tbCacheDataPath}"/>
        </prepare>
        <configuration>
            <property>
                <name>pig.tmpfilecompression</name>
                <value>true</value>
            </property>
            <property>
                <name>pig.tmpfilecompression.codec</name>
                <value>lzo</value>
            </property>
            <property>
                <name>pig.output.map.compression</name>
                <value>true</value>
            </property>
            <property>
                <name>pig.output.map.compression.codec</name>
                <value>lzo</value>
            </property>
            <property>
                <name>pig.output.compression</name>
                <value>true</value>
            </property>
            <property>
                <name>pig.output.compression.codec</name>
                <value>lzo</value>
            </property>
            <property>
                <name>mapred.compress.map.output</name>
                <value>true</value>
            </property>
        </configuration>
        <script>${pigDir}/aggregationLogic.pig</script>
        <param>csvFilesDir=${csvFilesDir}</param>
        <param>tbValidQuerysOutputPath=${tbValidQuerysOutputPath}</param>
        <param>tbCacheDataPath=${tbCacheDataPath}</param>
        <param>currDate=${date}</param>
        <param>udfJarPath=${nameNode}${wfPath}/lib</param>
        <param>parallel=150</param>
      </pig>
    <ok to="loadDataToDB"/>
    <error to="fail"/>
</action>   
<kill name="fail">
    <message>WF Failure, 'wf:lastErrorNode()' failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>

4

1 に答える 1

1

3 つの pig アクションを並行して実行していて、そのうちの 1 つが失敗したときに、同じエラーが発生しました。このメッセージ エラーは、1 つのアクションが失敗し、ワークフローが停止し、他のアクションが続行しようとしているため、ワークフローが予期せず停止した結果です。何が起こったかを知るには、ステータスがERRORの失敗したアクションを確認する必要があります。ステータスが KILLED のアクションは確認しません。

于 2013-07-11T19:44:47.650 に答える