multiprocessing.Pool を使用して、いくつかの CypherQuery() を並行して実行しようとしています。
neo4j.CypherQuery() 非並列を実行すると、正常に動作します。multiprocessing.Pool で neo4j.CypherQuery() を 1 つだけ実行すると、正常に動作します。2 つ以上の neo4j.CypherQuery() プロセスを開始するとすぐに、以下のエラー メッセージで失敗します。
from mulitprocessing import Pool
from py2neo import neo4j
pool = Pool(processes=4)
db = neo4j.GraphDatabaseService("http://localhost:7474/db/data/")
def cypher_query(db):
try:
# very simple cypher query
query_string = "MATCH (n:Label) RETURN n.name, n"
query = neo4j.CypherQuery(db, query_string)
result = query.execute()
return_dict = {}
for r in result:
return_dict[r[0]] = r[1]
return return_dict
except:
# print stack trace
print('%s' % (traceback.format_exc()))
result1 = pool.apply_async(cypher_query, [db])
result2 = pool.apply_async(cypher_query, [db])
# close pool and wait for all processes to finish
pool.close()
pool.join()
# here I would collect results, something fails before
result1.get()
result2.get()
エラーメッセージ:
Traceback (most recent call last):
File "/path/to/my/script.py", line 237, in my_function
query = neo4j.CypherQuery(db, query_string)
File "build/bdist.linux-x86_64/egg/py2neo/neo4j.py", line 976, in __init__
self._cypher = Resource(graph_db.__metadata__["cypher"])
File "build/bdist.linux-x86_64/egg/py2neo/neo4j.py", line 320, in __metadata__
self.refresh()
File "build/bdist.linux-x86_64/egg/py2neo/neo4j.py", line 342, in refresh
self._metadata = ResourceMetadata(self._get().content)
File "build/bdist.linux-x86_64/egg/py2neo/packages/httpstream/http.py", line 532, in content
elif self.is_text:
File "build/bdist.linux-x86_64/egg/py2neo/packages/httpstream/http.py", line 513, in is_text
return self.content_type.partition("/")[0] == "text"
AttributeError: 'NoneType' object has no attribute 'partition'
エラーメッセージがよくわかりません。さまざまな Cypher クエリと execute() と stream() の両方で試しましたが、常に失敗します。すべてのクエリは非並列で正常に実行されます。明らかに、関数の並列化を壊す何かが欠けていますが、それを解決する方法がわかりません。