私は、テキスト内の単語の共起に基づいて大きなナレッジベースを構築する必要があるプロジェクトに取り組んでいます。私が調べた限りでは、同様のアプローチは appengine では試みられていません。私は appengine の柔軟性とスケーラビリティを利用して、ナレッジベースを提供し、幅広いユーザーに推論できるようにしたいと考えています。
これまでのところ、パイプライン用のデモ アプリに基づいて mapreduce の実装を考え出しました。ソース テキストは、1 つの xml ドキュメントを含む zip ファイルとしてブロブストアに保存されます。各ファイルには可変数の記事 (最大 30000) が含まれます。
最初のステップは、現在の を適応させてBlobstoreZipLineInputReader
、xml ファイルを解析し、そこから関連情報を取得することでした。XMLParser クラスは、lxml iterparse アプローチを使用して、http://www.ibm.com/developerworks/xml/library/x-hiperfparse/から処理する xml 要素を取得し、反復子を返します。
変更されたクラスBlobstoreXMLZipLineInputReader
には、わずかに異なるnext
機能があります。
def next(self):
if not self._filestream:
if not self._zip:
self._zip = zipfile.ZipFile(self._reader(self._blob_key))
self._entries = self._zip.infolist()[self._start_file_index:
self._end_file_index]
self._entries.reverse()
if not self._entries:
raise StopIteration()
entry = self._entries.pop()
parser = XMLParser()
# the result here is an iterator with the individual articles
self._filestream = parser.parseXML(self._zip.open(entry.filename))
try:
article = self._filestream.next()
self._article_index += 1
except StopIteration:
article = None
if not article:
self._filestream.close()
self._filestream = None
self._start_file_index += 1
self._initial_offset = 0
return self.next()
return ((self._blob_key, self._start_file_index, self._article_index),
article)
map 関数は、これらの各記事を受け取り、文ごとに分割してから、単語ごとに分割します。
def map_function(data):
"""Word count map function."""
(entry, article) = data
for s in split_into_sentences(article.body):
for w in split_into_words(s.lower()):
if w not in STOPWORDS:
yield (w, article.id)
レデューサーは単語を集約し、単語が表示される記事の ID を結合します。
def reduce_function(key, values):
"""Word count reduce function."""
yield "%s: %s\n" % (key, list(set(values)))
これは、開発サーバーとライブ セットアップの両方で、約 10000 のテキスト (それらにはそれほど多くの単語はありません) まで美しく機能します。通常、10 秒もかかりません。問題は、それを少し超えると、mapreduce がジョブの処理を継続的にハングアップするように見えることです。シャードごとに処理されるアイテムの数が増えるだけで、すぐに書き込み操作の制限に達します。
Q1. mapreduce パイプラインが「悪い動作」を開始する前に実行できるマップ操作の数に何らかの制限がありますか?
Q2. 私の問題に対するより良いアプローチはありますか?
Q3. これは以前にも尋ねられたことは知っていますが、一時的な mapreduce データストアへの書き込みを回避できますか? 彼らは私を殺している...
PS: これが私の主な mapreduce 呼び出しです:
class XMLArticlePipeline(base_handler.PipelineBase):
def run(self, filekey, blobkey):
output = yield mapreduce_pipeline.MapreducePipeline(
"process_xml",
"backend.build_knowledgebase.map_function",
"backend.build_knowledgebase.reduce_function",
"backend.build_knowledgebase.BlobstoreXMLZipLineInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"blob_keys": [blobkey],
},
reducer_params={
"mime_type": "text/plain",
},
shards=12)
yield StoreOutput(filekey, output)
EDIT .:終わりのないジョブを実行すると、開発サーバーで奇妙なエラーが発生します。
[App Instance] [0] [dev_appserver_multiprocess.py:821] INFO Exception in HandleRequestThread
Traceback (most recent call last):
File "/Applications/GoogleAppEngineLauncher.app/Contents/Resources/GoogleAppEngine-default.bundle/Contents/Resources/google_appengine/google/appengine/tools/dev_appserver_multiprocess.py", line 819, in run
HandleRequestDirectly(request, client_address)
File "/Applications/GoogleAppEngineLauncher.app/Contents/Resources/GoogleAppEngine-default.bundle/Contents/Resources/google_appengine/google/appengine/tools/dev_appserver_multiprocess.py", line 957, in HandleRequestDirectly
HttpServer(), request, client_address)
File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 310, in process_request
self.finish_request(request, client_address)
File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 323, in finish_request
self.RequestHandlerClass(request, client_address, self)
File "/Applications/GoogleAppEngineLauncher.app/Contents/Resources/GoogleAppEngine-default.bundle/Contents/Resources/google_appengine/google/appengine/tools/dev_appserver.py", line 2579, in __init__
BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 641, in __init__
self.finish()
File "/usr/local/Cellar/python/2.7.2/lib/python2.7/SocketServer.py", line 694, in finish
self.wfile.flush()
File "/usr/local/Cellar/python/2.7.2/lib/python2.7/socket.py", line 303, in flush
self._sock.sendall(view[write_offset:write_offset+buffer_size])
error: [Errno 32] Broken pipe