I am trying to test a POST request to a Flask server using a queue created with the rq module. The rq queue keeps raising this strange error:

Traceback (most recent call last):
  File "/home/dor/Documents/workspace/NErlNet/src_py/apiServerNew/TotallyNew/test2.py", line 14, in <module>
    result2 = trans.testQueue(baseReceiverAddress + '/testQueue')
  File "/home/dor/Documents/workspace/NErlNet/src_py/apiServerNew/TotallyNew/transmitter.py", line 23, in testQueue
    result = globe.queue.enqueue(testPost(address, 0), args=[])
  File "/home/dor/anaconda3/lib/python3.8/site-packages/rq/queue.py", line 500, in enqueue
    on_failure, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
  File "/home/dor/anaconda3/lib/python3.8/site-packages/rq/queue.py", line 466, in parse_args
    if not isinstance(f, string_types) and f.__module__ == '__main__':
AttributeError: 'tuple' object has no attribute '__module__'

I would appreciate any help with this error. These are my files:

transmitter.py:

import requests
import globalVars as globe
import queue
from threading import Thread
import redis
from rq import Queue
import time

#DEFAULT_PORT = 8095

def testPost(address, payloadNum):
    payload = {'test' : payloadNum}
    response = requests.post(address,data = payload)
    #Return true, if received: HTTP status code < 400
    #Return the HTTP status code for the response
    #Return the response in JSON format
    return(response.ok, response.status_code, response.json())

def testQueue(address):
    result = globe.queue.enqueue(testPost(address, 0), args=[])

def wait():
    while not globe.ackQueue.empty(): #While the queue is NOT empty
        pass

if __name__ == "__main__":
    print('transmitter')

receiverServer.py:

from flask import Flask
from flask_restful import Api, Resource
import globalVars as globe
import redis
from rq import Queue
import queue

receiver = Flask(__name__)
api = Api(receiver)

def runReceiver():
    receiver.run(debug=True, threaded=False)

class test(Resource):
    def post(self):
        return {'Test' : 'Passed!'} #Returns the response in JSON format

class testQueue(Resource):
    def post(self):
        return {'Test' : 'Passed!'} 

#Listener Server list of resources: 
api.add_resource(test, "/test")
api.add_resource(testQueue, "/testQueue")

if __name__ == "__main__":
    runReceiver()

globalVars.py:

import redis
from rq import Queue

r = redis.Redis()
queue = Queue(connection=r)

test2.py:

from flask import Flask
from flask_restful import Api, Resource
import transmitter as trans
import receiverServer as receiver
import globalVars as globe

defAddress = 'https://httpbin.org/post' #httpbin is a website aimed for testing HTTP requests
baseReceiverAddress = 'http://127.0.0.1:5000'

if __name__ == "__main__":
    result1 = trans.testPost(baseReceiverAddress + '/test', 0)
    print(result1)
    
    result2 = trans.testQueue(baseReceiverAddress + '/testQueue')
    print(result2)

I have tried changing the testPost function several times, but the error keeps appearing. Please help me solve this problem!

1 Answer