アンドロイドコード
public class androidconn extends Activity {
private rabbitmqclient mConsumer;
private TextView mOutput;
/** Called when the activity is first created. */
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.conn);
//The output TextView we'll use to display messages
mOutput = (TextView) findViewById(R.id.moutput);
//Create the consumer
mConsumer = new rabbitmqclient("10.0.2.2:5672",
"logs",
"fanout");
//Connect to broker
mConsumer.connectToRabbitMQ();
//register for messages
mConsumer.setOnReceiveMessageHandler(new OnReceiveMessageHandler(){
public void onReceiveMessage(byte[] message) {
String text = "";
try {
text = new String(message, "UTF");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
mOutput.append("\n"+text);
}
});
}
@Override
protected void onResume() {
super.onPause();
mConsumer.connectToRabbitMQ();
}
@Override
protected void onPause() {
super.onPause();
mConsumer.dispose();
}
}
rabbitmq コード
public abstract class rabbitmq {
public String mServer;
public String mExchange;
protected com.rabbitmq.client.Channel mModel = null;
protected Connection mConnection;
protected boolean Running ;
protected String MyExchangeType ;
/**
*
* @param server The server address
* @param exchange The named exchange
* @param exchangeType The exchange type name
* @return
*/
public rabbitmq(String server, String exchange, String exchangeType)
{
mServer = server;
mExchange = exchange;
MyExchangeType = exchangeType;
}
public void Dispose() throws SQLException
{
Running = false;
try {
if (mConnection!=null)
mConnection.close();
if (mModel != null)
mModel.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/** * Connect to the broker and create the exchange
* @return success
*/
public boolean connectToRabbitMQ()
{
if(mModel!= null && mModel.isOpen() )//already declared
return true;
try
{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(mServer);
connectionFactory.setPort(5672);
mConnection = (Connection) connectionFactory.newConnection();
mModel = ((com.rabbitmq.client.Connection) mConnection).createChannel();
mModel.exchangeDeclare(mExchange, MyExchangeType, true);
return true;
}
catch (Exception e)
{
e.printStackTrace();
return false;
}
}
}
rabbitmqクライアントコード
public class rabbitmqclient extends rabbitmq{
public rabbitmqclient(
String server,String exchange,String exchangeType) {
super(server,exchange,exchangeType);
}
//The Queue name for this consumer
private String mQueue;
private QueueingConsumer MySubscription;
//last message to post back
private byte[] mLastMessage;
// An interface to be implemented by an object that is interested in messages(listener)
public interface OnReceiveMessageHandler{
public void onReceiveMessage(byte[] message);
};
//A reference to the listener, we can only have one at a time(for now)
private OnReceiveMessageHandler mOnReceiveMessageHandler;
/**
*
* Set the callback for received messages
* @param handler The callback
*/ public void setOnReceiveMessageHandler(OnReceiveMessageHandler handler)
{
mOnReceiveMessageHandler = handler;
};
private Handler mMessageHandler = new Handler();
private Handler mConsumeHandler = new Handler();
// Create runnable for posting back to main thread
final Runnable mReturnMessage = new Runnable() {
public void run() {
mOnReceiveMessageHandler.onReceiveMessage(mLastMessage);
}
};
final Runnable mConsumeRunner = new Runnable() {
public void run() {
Consume();
}
};
/**
* Create Exchange and then start consuming. A binding needs to be added before any messages will be delivered
*/
@Override
public boolean connectToRabbitMQ()
{
if(super.connectToRabbitMQ())
{
try {
mQueue = mModel.queueDeclare().getQueue();
MySubscription = new QueueingConsumer(mModel);
mModel.basicConsume(mQueue, false, MySubscription);
} catch (IOException e) {
e.printStackTrace();
return false;
}
if (MyExchangeType == "fanout")
AddBinding("");//fanout has default binding
Running = true;
mConsumeHandler.post(mConsumeRunner);
return true;
}
return false;
}
/**
* Add a binding between this consumers Queue and the Exchange with routingKey
* @param routingKey the binding key eg GOOG
*/
public void AddBinding(String routingKey)
{
try {
mModel.queueBind(mQueue, mExchange, routingKey);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* Remove binding between this consumers Queue and the Exchange with routingKey
* @param routingKey the binding key eg GOOG
*/
public void RemoveBinding(String routingKey)
{
try {
mModel.queueUnbind(mQueue, mExchange, routingKey);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void Consume()
{
Thread thread = new Thread()
{
@Override
public void run() {
while(Running){
QueueingConsumer.Delivery delivery;
try {
delivery = MySubscription.nextDelivery();
mLastMessage = delivery.getBody();
mMessageHandler.post(mReturnMessage);
try {
mModel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
};
thread.start();
}
public void dispose(){
Running = false;
}
}
logcat は 07-24 22:57:45.412: D/SntpClient(59): request time failed: java.net.SocketException: Address family not supported by protocol 親切にエラーを教えてください