rq モジュールで生成されたキューを使用して、Flask サーバーへのポスト リクエストをテストしようとしています。Rq のキューは、この奇妙なエラーを表示し続けます:
Traceback (most recent call last):
File "/home/dor/Documents/workspace/NErlNet/src_py/apiServerNew/TotallyNew/test2.py", line 14, in <module>
result2 = trans.testQueue(baseReceiverAddress + '/testQueue')
File "/home/dor/Documents/workspace/NErlNet/src_py/apiServerNew/TotallyNew/transmitter.py", line 23, in testQueue
result = globe.queue.enqueue(testPost(address, 0), args=[])
File "/home/dor/anaconda3/lib/python3.8/site-packages/rq/queue.py", line 500, in enqueue
on_failure, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
File "/home/dor/anaconda3/lib/python3.8/site-packages/rq/queue.py", line 466, in parse_args
if not isinstance(f, string_types) and f.__module__ == '__main__':
AttributeError: 'tuple' object has no attribute '__module__'
このエラーについてお役に立てれば幸いです。これらは私のファイルです。
import requests
import globalVars as globe
import queue
from threading import Thread
import redis
from rq import Queue
import time
#DEFAULT_PORT = 8095
def testPost(address, payloadNum):
payload = {'test' : payloadNum}
response = requests.post(address,data = payload)
#Return true, if received: HTTP status code < 400
#Return the HTTP status code for the response
#Return the reponse in JSON format
return(response.ok, response.status_code, response.json())
def testQueue(address):
result = globe.queue.enqueue(testPost(address, 0), args=[])
def wait():
while not globe.ackQueue.empty(): #While the queue is NOT empty
pass
if __name__ == "__main__":
print('transmitter')
受信者サーバー.py:
from flask import Flask
from flask_restful import Api, Resource
import globalVars as globe
import redis
from rq import Queue
import queue
receiver = Flask(__name__)
api = Api(receiver)
def runReceiver():
receiver.run(debug=True, threaded=False)
class test(Resource):
def post(self):
return {'Test' : 'Passed!'} #Returns the response in JSON format
class testQueue(Resource):
def post(self):
return {'Test' : 'Passed!'}
#Listener Server list of resources:
api.add_resource(test, "/test")
api.add_resource(testQueue, "/testQueue")
if __name__ == "__main__":
runReceiver()
globalVars.py
import redis
from rq import Queue
r = redis.Redis()
queue = Queue(connection=r)
test2.py:
from flask import Flask
from flask_restful import Api, Resource
import transmitter as trans
import receiverServer as receiver
import globalVars as globe
defAddress = 'https://httpbin.org/post' #httpbin is a website aimed for testing HTTP requests
baseReceiverAddress = 'http://127.0.0.1:5000'
if __name__ == "__main__":
result1 = trans.testPost(baseReceiverAddress + '/test', 0)
print(result1)
result2 = trans.testQueue(baseReceiverAddress + '/testQueue')
print(result2)
testPost 関数を何度か変更しようとしましたが、エラーが表示され続けます... この問題を解決するのを手伝ってください!