4

スパウトに実際のデータを渡す方法を理解するのに苦労しています。たとえば:

次の 2 つのファイルがあります (正常に動作しています)。

#! /usr/bin/env python

import os, random, sys, time

for i in xrange(50):
    print("%s\t%s"%(os.getpid(), i))
    sys.stdout.flush()
    time.sleep(random.randint(0,5))

#! /usr/bin/env python

from __future__ import print_function
from select import select
from subprocess import Popen,PIPE

p = Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True,  universal_newlines=True) 

timeout = 0.1 # seconds
while p:
    # remove finished processes from the list 
    if p.poll() is not None: # process ended
        print(p.stdout.read(), end='') # read the rest
        p.stdout.close()
        processes.remove(p)

    # wait until there is something to read
    rlist = select([p.stdout], [],[], timeout)[0]

    # read a line from each process that has output ready
    for f in rlist:
        print(f.readline(), end='') #NOTE: it can block

将来の処理のためにこれらのランダムな行をスパウトに渡したいと想像してください。私はこれを試していました:

class TwitterSpout(storm.Spout):

    def initialize(self, conf, context):
        self.pid = os.getpid()
        try:
            self.p= Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True,  universal_newlines=True)
        except OSError, e:
            self.log('%s'%e)
            sys.exit(1)

そして nextTuple() より:

def nextTuple(self):
    timeout = 0.1 # seconds
    while self.p:
        # remove finished processes from the list 
        if self.p.poll() is not None: # process ended
        self.log ("%s"%self.p.stdout.read()) # read the rest
        self.p.stdout.close()
        processes.remove(self.p)

        # wait until there is something to read
        rlist = select([self.p.stdout], [],[], timeout)[0]

        # read a line from each process that has output ready
        for f in rlist:
        self.log ("%s%s"%f.readline()) #NOTE: it can block
        msgId = random.randint(0,500)
        self.log('MSG IN SPOUT %s\n'%msgId)
        storm.emit([f.readline()], id=msgId)

しかし、この構造は機能しません。常にエラーが発生"Pipi seems to be broken..."するか、このコードのさまざまなバリエーションを試すと、プロセスがブロックされ、Storm が NextTuple をリッチにすることはありません。私の問題を解決するのを手伝ってください。または、誰かが同様のことを行う方法の例、またはアドバイスを教えてください。ありがとうございました

4

1 に答える 1