現在、zeromq 3.2.2 (Linux 上) に取り組んでいます。C++ アプリと PHP アプリの間の IPC に使用しています。両方を次の「場所」に接続させました。
「ipc:///tmp/unibroker.5558.pipe」
==> (REP から DEALER へ)
これは、C++ クライアントを C++ サーバーに接続する場合は機能しますが、PHP から同じことをしようとすると、何も受信できません! 私の最善の推測は Linux のアクセス権に関する問題だろうということだったので、chmod 777 /tmp/unibroker.5558.pipe を実行しました...しかし、それは役に立ちませんでした :-(
[編集] 追記: tcp を使用すると動作します。ただし、パフォーマンス テストには IPC を使用したいと考えています。
何か案は?
これは私のコードです:
/**
 * Sends one cache update on the back-port socket as a four-frame message:
 * an empty frame, the topic, the literal "INT", and finally the value.
 *
 * @param string $topic Topic frame to send.
 * @param string $value Value frame to send (last frame of the message).
 */
function setCache( $topic, $value )
{
    // Every frame except the last is flagged with SNDMORE.
    $leadingFrames = array( "", $topic, "INT" );
    foreach ( $leadingFrames as $frame )
    {
        $this->pBackPort->send( $frame, ZMQ::MODE_SNDMORE );
    }

    // Final frame: sent without SNDMORE, which completes the message.
    $this->pBackPort->send( $value );
}
/**
 * Creates the SUB (front) and DEALER (back) sockets and connects each one to
 * its own IPC endpoint, /tmp/unibroker.<port>.pipe.
 *
 * The previous version connected the front socket to BOTH endpoints and never
 * connected the back socket, so messages sent through pBackPort had no peer.
 *
 * @param int|string $backPort  Port number embedded in the back (DEALER) endpoint name.
 * @param int|string $frontPort Port number embedded in the front (SUB) endpoint name.
 */
function changePort( $backPort, $frontPort )
{
    $frontEndpoint = 'ipc:///tmp/unibroker.'.$frontPort.'.pipe';
    $backEndpoint  = 'ipc:///tmp/unibroker.'.$backPort.'.pipe';

    $this->pFrontPort = new ZMQSocket($this->pContext, ZMQ::SOCKET_SUB );
    // TCP alternative: $this->pFrontPort->connect('tcp://127.0.0.1:'.$frontPort);
    echo $frontEndpoint;
    $this->pFrontPort->connect($frontEndpoint);

    $this->pBackPort = new ZMQSocket($this->pContext, ZMQ::SOCKET_DEALER);
    // TCP alternative: $this->pBackPort->connect('tcp://127.0.0.1:'.$backPort);
    // Connect the DEALER socket itself, not the SUB socket a second time.
    $this->pBackPort->connect($backEndpoint);
}
そして、これはバインディング側です:
// Constructs the broker: records the libzmq version string, creates the
// context, and binds a REP socket (pSync) on the back port and an XPUB socket
// (pBackend) on the front port. XPUB_VERBOSE is then enabled on pBackend.
//
// backPort  - port number used in the pSync endpoint.
// frontPort - port number used in the pBackend endpoint.
//
// With __LINUX__ defined the endpoints are ipc:///tmp/unibroker.<port>.pipe,
// otherwise tcp://*:<port>.
// NOTE(review): __LINUX__ does not appear to be a macro the compiler defines by
// itself (GCC predefines __linux__); confirm the build passes -D__LINUX__,
// otherwise the TCP branch is what gets compiled on Linux too.
UniBroker::UniBroker( unsigned short backPort, unsigned short frontPort )
: pBackPort( backPort ), pFrontPort( frontPort )/*, Interconnected( pInterconnected )*/
{
    const size_t versionSize = 20;
    Version = new char[versionSize];
    int major, minor, patch;
    zmq_version (&major, &minor, &patch);
    // Bounded formatting so an unexpectedly long version can never overrun the buffer.
    snprintf (Version, versionSize, "%d.%d.%d", major, minor, patch);

    Neighbour = NULL;
    evt = NULL;

    pContext = zctx_new ();
    pSync = zsocket_new (pContext, ZMQ_REP);
    pBackend = zsocket_new (pContext, ZMQ_XPUB);

    char connection[50];

#ifdef __LINUX__
    snprintf (connection, sizeof( connection ), "ipc:///tmp/unibroker.%d.pipe", pBackPort );
#else
    snprintf (connection, sizeof( connection ), "tcp://*:%d", pBackPort );
#endif
    // A failed bind used to be silently ignored; report it so a wrong or
    // inaccessible endpoint is visible.
    if( zsocket_bind (pSync, connection) == -1 )
    {
        fprintf (stderr, "UniBroker: failed to bind %s\n", connection);
    }

#ifdef __LINUX__
    snprintf (connection, sizeof( connection ), "ipc:///tmp/unibroker.%d.pipe", pFrontPort );
#else
    snprintf (connection, sizeof( connection ), "tcp://*:%d", pFrontPort );
#endif
    if( zsocket_bind (pBackend, connection) == -1 )
    {
        fprintf (stderr, "UniBroker: failed to bind %s\n", connection);
    }

    int verbose = 1;
    zmq_setsockopt( pBackend, ZMQ_XPUB_VERBOSE, &verbose, sizeof( int ) );
}
// Stores `value` under `topic` in pCache, then sends a four-frame message on
// pSync: an empty frame, the topic, "EXT" or "INT" depending on `external`,
// and the value.
//
// topic    - cache key and second frame of the message.
// value    - cached value and last frame of the message.
// external - selects the "EXT" (true) or "INT" (false) type frame.
void UniBroker::setCache( string topic, string value, bool external )
{
    pCache[topic] = value;

    zstr_sendm(pSync, "");
    // topic and value are arbitrary data, so they are passed through "%s"
    // instead of being used as the format string themselves (same form the
    // subscription replay in Run() uses).
    zstr_sendm(pSync, "%s", topic.c_str() );
    zstr_sendm(pSync, external ? "EXT" : "INT" );
    zstr_send(pSync, "%s", value.c_str() );
}
void UniBroker::Run( int timeout )
{
zmq_pollitem_t items [] = {
{ pBackend, 0, ZMQ_POLLIN, 0 },
{ pSync, 0, ZMQ_POLLIN, 0 }
};
if (zmq_poll (items, 2, timeout) == -1)
{
return;
}
// Any new topic data we cache and then forward
if (items [1].revents & ZMQ_POLLIN)
{
char *topic = zstr_recv (pSync);
char *type = zstr_recv (pSync);
char *current = zstr_recv (pSync);
const char *previous = pCache[ topic ].c_str();
if( current && strcmp( current , previous ) != 0 )
{
pCache[ topic ] = current;
zstr_sendm (pBackend, topic);
zstr_sendm (pBackend, type);
zstr_send (pBackend, current);
if( Neighbour != NULL )
{
Neighbour->setCache(topic, current, true);
}
UniEvent *event = pEvents[topic];
if( event )
{
(*event).external = ( strcmp( "EXT", type ) == 0 );
(*event)( topic, current );
}
event = pEvents["*"];
if( event )
{
(*event).external = ( strcmp( "EXT", type ) == 0 );
(*event)( topic, current );
}
}
zstr_send(pSync, "");
}
if (items [0].revents & ZMQ_POLLIN)
{
zframe_t *frame = zframe_recv (pBackend);
if( frame )
{
// Event is one byte 0=unsub or 1=sub, followed by topic
byte *event = zframe_data (frame);
if (event [0] == 1) {
char *topic = new char[zframe_size(frame)+1];
memcpy (topic, event + 1, zframe_size (frame) - 1);
topic[zframe_size(frame)]=NULL;
printf("\t\tSubscribed for topic %s\r\n", topic);
if( strlen( topic ) == 0 )
{
map<string,string>::iterator it = pCache.begin();
while( it != pCache.end() )
{
const char *previous = (*it).second.c_str();
if (previous && strlen( previous ) > 0 ) {
zstr_sendm (pBackend, "%s", (*it).first.c_str());
zstr_sendm (pBackend, "%s", "INT");
zstr_send (pBackend, "%s", previous);
}
it++;
}
}
else
{
const char *previous = pCache[topic].c_str();
if (previous && strlen( previous ) > 0 ) {
zstr_sendm (pBackend, "%s", topic);
zstr_sendm (pBackend, "%s", "INT");
zstr_send (pBackend, "%s", previous);
}
}
delete (topic);
}
zframe_destroy (&frame);
}
}
}