3 クラスタ マシンで celery + rabbitmq をセットアップしました。ファイルのデータに基づいて正規表現を生成し、その情報を使用してテキストを解析するタスクも作成しました。
from celery import Celery
celery = Celery('tasks', broker='amqp://localhost//')
import re
@celery.task
def add(x, y):
return x + y
def get_regular_expression():
with open("text") as fp:
data = fp.readlines()
str_re = "|".join([x.split()[2] for x in data ])
return str_re
@celery.task
def analyse_json(tw):
str_re = get_regular_expression()
re.match(str_re,tw.text)
次のPythonコードを使用して、このタスクを非常に簡単に呼び出すことができます:-
from tasks import analyse_tweet_json
x = tweet ## load from a file (x is a json)
analyse_tweet_json.delay(x)
ただし、今は Python ではなく Java から同じ呼び出しを行いたいと考えています。同じことを行う最も簡単な方法が何であるかはわかりません。
AMQP ブローカーにメッセージを送信するために、このコードを作成しました。コードは正常に実行されますが、タスクは実行されません。実行するタスクの名前を指定する方法がわかりません。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
class try1 {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "celery", "celery");
String messageBody = "{\"text\":\"i am good\"}" ;
byte[] msgBytes = messageBody.getBytes("ASCII") ;
channel.basicPublish(queueName, queueName,
new AMQP.BasicProperties
("application/json", null, null, null,
null, null, null, null,
null, null, null, "guest",
null, null),messageBody.getBytes("ASCII")) ;
connection.close();
} }
これは rabbitMq のエラーログの出力です:-
connection <0.14627.0>, channel 1 - error:
{amqp_error,not_found,
"no exchange 'amq.gen-gEV47GX9pF_oZ-0bEnOazE' in vhost '/'",
'basic.publish'}
どんな助けでも大歓迎です。
ありがとう、アミット