マッピングしている RxPy ストリームのハンドラーとして使用できる関数を構築しようとしています。私が持っている関数は、その変数が定義されているスコープ外の変数にアクセスする必要があります。これは、私にとって、ある種のクロージャーを使用する必要があることを意味します。そこで、functools.partial に到達して、1 つの変数を閉じ、ストリームにオブザーバーとして渡すことができる部分関数を返します。
ただし、これを行うと、次の結果になります。
Traceback (most recent call last):
File "retry/example.py", line 46, in <module>
response_stream = message_stream.flat_map(functools.partial(message_handler, context=context))
File "/home/justin/virtualenv/retry/local/lib/python2.7/site-packages/rx/linq/observable/selectmany.py", line 67, in select_many
selector = adapt_call(selector)
File "/home/justin/virtualenv/retry/local/lib/python2.7/site-packages/rx/internal/utils.py", line 37, in adapt_call_1
argnames, varargs, kwargs = getargspec(func)[:3]
File "/usr/lib/python2.7/inspect.py", line 816, in getargspec
raise TypeError('{!r} is not a Python function'.format(func))
TypeError: <method-wrapper '__call__' of functools.partial object at 0x2ce6cb0> is not a Python function
問題を再現するサンプル コードを次に示します。
from __future__ import absolute_import
from rx import Observable, Observer
from pykafka import KafkaClient
from pykafka.common import OffsetType
import logging
import requests
import functools
logger = logging.basicConfig()
def puts(thing):
print thing
def message_stream(consumer):
def thing(observer):
for message in consumer:
observer.on_next(message)
return Observable.create(thing)
def message_handler(message, context=None):
def req():
return requests.get('http://httpbin.org/get')
return Observable.start(req)
def handle_response(message, response, context=None):
consumer = context['consumer']
producer = context['producer']
t = 'even' if message % 2 == 0 else 'odd'
return str(message) + ': ' + str(response) + ' - ' + t + ' | ' + str(consumer) + ' | ' + producer
consumer = ['pretend', 'these', 'are', 'kafka', 'messages']
producer = 'some producer'
context = {
'consumer': consumer,
'producer': producer
}
message_stream = message_stream(consumer)
response_stream = message_stream.flat_map(functools.partial(message_handler, context=context))
message_response_stream = message_stream.zip(response_stream, functools.partial(handle_response, context=context))
message_stream.subscribe(puts)
False
問題は、呼び出し時に部分関数が返されることのようinspect.isfunction
です。
部分関数をこのチェックにパスさせるにはどうすればよいですか? 部分関数を「実際の」関数型に簡単に変換する方法はありますか?