タブ区切りのテキストファイルから解析するメッセージをRabbitMQに公開するPHPスクリプトが機能しています。私は文字通り、そのファイルから別のファイルに作業コードをコピーして貼り付け、取引所に公開されたメッセージを取得し、それらをjson_decodeしてデータベースに挿入するコンシューマーを確立したいと考えました。
PHP.netサイトからサンプルコードをコピーして貼り付けるすべての試み、さらにはSOの例でさえ、空白の白い画面とエラーメッセージなしで失敗し、php-fpmプロセスを強制終了します。
キューがバインドされない理由と、ここで何が問題になっているのか、何か考えはありますか?
- Nginx-> php-fpm
- PHP 5.3.x
- Macbook Pro(OSX Lion)
- RabbitMQ(librabbitmqおよびpecl amqpがインストールされています)
これが私が試した一例ですが、AMQPドキュメントでPHP.netとSOの例を試しましたが、どれも機能しません。正常に公開できますが、キューをバインドしようとすると失敗し、最終的にphp-fpmがロックされます。
<?php
// Report all PHP errors
error_reporting(E_ALL);
/*****************************************
* MQ settings
****************************************/
$mq = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'exchange' => 'gbus.user',
'routing_key' => 'gbus.test.mike',
);
/*****************************************
* Connect to queue
****************************************/
$conn_args = array('host' => $mq['host'], 'port' => $mq['port'], 'login' => $mq['login'], 'password' => $mq['password']);
$conn = new AMQPConnection($conn_args);
$conn->connect();
$ch = new AMQPChannel($conn);
// Create a new queue
$q = new AMQPQueue($ch);
$q->declare('test-queue');
$q->bind($mq['exchange'],$mq['routing_key']);
?>
<br>
<font color="blue" face="arial" size="4">File Contents</font>
<hr>
<?php
while(true){
$msg=$q->get();
if ($msg['count']>-1){
echo "\n--------\n";
print_r($msg['msg']);
echo "\n--------\n";
}
sleep(1);
}
if (!$conn->disconnect()) {
throw new Exception('Could not disconnect');
}
?>
これは、キューに公開するために使用したものの例です。これを実行するたびに、RabbitMQコントロールパネルに20個の新しいメッセージが表示されます。テストでは20に制限していますが、ファイルには数万の行があります。
動作する公開コード:
<?php
/*****************************************
* MQ settings
****************************************/
$mq = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'exchange' => 'gbus.user',
'routing_key' => 'gbus.test.mike',
);
/*****************************************
* Connect to queue
****************************************/
$conn_args = array('host' => $mq['host'], 'port' => $mq['port'], 'login' => $mq['login'], 'password' => $mq['password']);
$conn = new AMQPConnection($conn_args);
$conn->connect();
$ch = new AMQPChannel($conn);
$ex = new AMQPExchange($ch);
$ex->setName($mq['exchange']);
/*****************************************
* Parse the file
****************************************/
$filename = "/tmp/Users.txt";
$board = "test";
$fd = fopen ($filename, "r");
$contents = fread ($fd,filesize ($filename));
fclose ($fd);
$delimiter = "\r\n";
$rows = explode($delimiter, $contents);
$counter = 0;
?>
<br>
<font color="blue" face="arial" size="4">File Rows (first 20)</font>
<hr>
<?php
foreach ( $rows as $row )
{
$counter++;
echo "<b>Row $counter: </b> $row<br>";
// build list columns
list($login_name, $pwd, $account_type, $access_level, $status, $first_name, $last_name, $agent_code) = explode("\t", $row);
// build assoc array for json
$user = array("domain"=>$board, "username"=>$login_name, "user_id"=>$agent_code, "password"=>$pwd, "first_name"=>$first_name, "last_name"=>$last_name);
// Publish a message to the exchange with a routing key
$ex->publish(json_encode($user), $mq['routing_key'], AMQP_NOPARAM, array("content_type"=>"application/data"));
if($counter == 20) {
break;
}
}
$ch->close();
$conn->close();
?>