0

私は最近 Disco Project を見つけ、Hadoop と比べてとても気に入っていますが、問題があります。私のプロジェクトは次のように設定されています(実際のコードを切り取って貼り付けてください):

myfile.py

from disco.core import Job, result_iterator
import collections, sys
from disco.worker.classic.func import chain_reader
from disco.worker.classic.worker import Params

def helper1():
   #do stuff

def helper2():
   #do stuff
.
.
.
def helperN():
   #do stuff

class A(Job):
   @staticmethod
   def map_reader(fd, params):
      #Read input file
      yield line

   def map(self, line, params):
      #Process lines into dictionary
      #Iterate dictionary
          yield k, v

   def reduce(self, iter, out, params):
      #iterate iter
      #Process k, v into dictionary, aggregating values
      #Process dictionry
      #Iterate dictionary
         out.add(k,v)

Class B(Job):

   map_reader = staticmethod(chain_reader)
   map = staticmethod(nop_map)

   reduce(self, iter, out, params):
      #Process iter
      #iterate results
         out.add(k,v)


if __name__ == '__main__':
   from myfile import A, B
   job1 = A().run(input=[input_filename], params=Params(k=k))
   job2 = B().run(input=[job1.wait()], params=Params(k=k))
   with open(output_filename, 'w') as fp:
        for count, line in result_iterator(job2.wait(show=True)):
            fp.write(str(count) + ',' + line + '\n')

私の問題は、ジョブ フローが A の reduce を完全にスキップし、B の reduce に移行することです。

ここで何が起こっているのですか?

4

1 に答える 1

0

これは簡単ですが微妙な問題でした。

show = True

ジョブ 1 の場合。なんらかの理由で、job2 に show を設定すると、job1 の map() と map-shuffle() のステップが表示されていたので、期待していた最終結果が得られず、job2 関数の 1 つへの入力が間違っているように見えるためです。 、私は、job1 のステップが適切に実行されていないという結論に飛びつきました (これは、job2 を追加する前に、job1 の出力の正確性を検証したことがさらに裏付けられました)。

于 2016-01-02T20:26:25.160 に答える