スパウトに実際のデータを渡す方法を理解するのに苦労しています。たとえば:
次の 2 つのファイルがあります (正常に動作しています)。
#! /usr/bin/env python
import os, random, sys, time
for i in xrange(50):
print("%s\t%s"%(os.getpid(), i))
sys.stdout.flush()
time.sleep(random.randint(0,5))
と
#! /usr/bin/env python
from __future__ import print_function
from select import select
from subprocess import Popen,PIPE
p = Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True, universal_newlines=True)
timeout = 0.1 # seconds
while p:
# remove finished processes from the list
if p.poll() is not None: # process ended
print(p.stdout.read(), end='') # read the rest
p.stdout.close()
processes.remove(p)
# wait until there is something to read
rlist = select([p.stdout], [],[], timeout)[0]
# read a line from each process that has output ready
for f in rlist:
print(f.readline(), end='') #NOTE: it can block
将来の処理のためにこれらのランダムな行をスパウトに渡したいと想像してください。私はこれを試していました:
class TwitterSpout(storm.Spout):
def initialize(self, conf, context):
self.pid = os.getpid()
try:
self.p= Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True, universal_newlines=True)
except OSError, e:
self.log('%s'%e)
sys.exit(1)
そして nextTuple() より:
def nextTuple(self):
timeout = 0.1 # seconds
while self.p:
# remove finished processes from the list
if self.p.poll() is not None: # process ended
self.log ("%s"%self.p.stdout.read()) # read the rest
self.p.stdout.close()
processes.remove(self.p)
# wait until there is something to read
rlist = select([self.p.stdout], [],[], timeout)[0]
# read a line from each process that has output ready
for f in rlist:
self.log ("%s%s"%f.readline()) #NOTE: it can block
msgId = random.randint(0,500)
self.log('MSG IN SPOUT %s\n'%msgId)
storm.emit([f.readline()], id=msgId)
しかし、この構造は機能しません。常にエラーが発生"Pipi seems to be broken..."
するか、このコードのさまざまなバリエーションを試すと、プロセスがブロックされ、Storm が NextTuple をリッチにすることはありません。私の問題を解決するのを手伝ってください。または、誰かが同様のことを行う方法の例、またはアドバイスを教えてください。ありがとうございました