6

3 クラスタ マシンで celery + rabbitmq をセットアップしました。ファイルのデータに基づいて正規表現を生成し、その情報を使用してテキストを解析するタスクも作成しました。

from celery import Celery

celery = Celery('tasks', broker='amqp://localhost//')
import re

@celery.task
def add(x, y):
     return x + y


def get_regular_expression():
    with open("text") as fp:
        data = fp.readlines()
    str_re = "|".join([x.split()[2] for x in data ])
    return str_re    



@celery.task
def analyse_json(tw):
    str_re = get_regular_expression()
    re.match(str_re,tw.text) 

次のPythonコードを使用して、このタスクを非常に簡単に呼び出すことができます:-

from tasks import analyse_tweet_json
x = tweet ## load from a file (x is a json)
analyse_tweet_json.delay(x) 

ただし、今は Python ではなく Java から同じ呼び出しを行いたいと考えています。同じことを行う最も簡単な方法が何であるかはわかりません。

AMQP ブローカーにメッセージを送信するために、このコードを作成しました。コードは正常に実行されますが、タスクは実行されません。実行するタスクの名前を指定する方法がわかりません。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

class try1 {
public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "celery", "celery");
    String messageBody = "{\"text\":\"i am good\"}" ;
    byte[] msgBytes = messageBody.getBytes("ASCII") ;
    channel.basicPublish(queueName, queueName,
            new AMQP.BasicProperties
            ("application/json", null, null, null,
                    null, null, null, null,
                    null, null, null, "guest",
                    null, null),messageBody.getBytes("ASCII")) ;
    connection.close();    

} }

これは rabbitMq のエラーログの出力です:-

connection <0.14627.0>, channel 1 - error:
{amqp_error,not_found,
"no exchange 'amq.gen-gEV47GX9pF_oZ-0bEnOazE' in vhost '/'",
'basic.publish'}

どんな助けでも大歓迎です。

ありがとう、アミット

4

2 に答える 2

7

いくつかの問題がありました。

1) String queueName = channel.queueDeclare().getQueue() コマンドが間違ったキュー名を返していました。キュー名を「セロリ」に変更したところ、うまくいきました。2) json の形式は、次のタイプである必要があります。 "kwargs": {}、"再試行": 0、"eta": "2009-11-17T12:30:56.527191"}

http://docs.celeryproject.org/en/latest/internals/protocol.htmlに見られるように

これらの2つの変更の後、正常に機能しました。

-アミット

于 2014-01-03T20:54:08.460 に答える
0

celery は交換を暗黙的に宣言します。Java を使用すると、自分で交換を宣言する必要があります。

Java からの Django/Celery との相互運用を参照してください。

于 2013-12-31T02:55:01.717 に答える