Chrome アプリから RabbitMQ に接続するために使用できる STOMP ライブラリを構築しようとしています。Chrome 以外からの最初の実験はうまくいきました。ただし、Socket コードを chrome アプリ内から動作するように変換することはできません。以下のクライアント API コードを変更して、Chrome アプリ内から動作するようにすることはできますか?
1809 次
1 に答える
1
私の最初の実験は成功しました。クライアント側の dart ライブラリに完全な Socket API があり、Chrome アプリからそのようなコードを直接使用できるようになることを願っています。chrome の js ソケット機能を使用して Socket API を提供する努力はありますが、サーバー側のコードで使用できる dart.io パッケージの API ほどクリーンではありません。
これがコードです。これを使用するには、RabbitMQ をデプロイし、STOMP プラグインを有効にします。
import 'dart:io';
import 'dart:async';
void main() {
List<String> versions = ["1.2","1.1","1.0"];
String host = "127.0.0.1"; // localhost
int port = 61613; // rabbitmq default port
Socket.connect(host, 61613).then((connection) {
String hostpath="/";
String login = "guest"; // rabbitmq default login
String passcode = "guest"; // rabbitmq default passcode
stomp(connection, versions, hostpath, login, passcode); // stomp connect
connection
.transform(new StompTransformer())
.listen((frame) {
if(frame.headers.containsKey("ack")) {
ack(connection, frame.headers["ack"]);
}
dumpStompFrame(frame);
stdout.write("enter a message> ");
},
onDone: () { print("done"); },
onError: (e) { print (e); } );
subscribe(connection, "/queue/a", 1, "client", true);
stdin
.transform(new StringDecoder())
.transform(new LineTransformer())
.listen((line) {
send(connection, "/queue/a", line);
});
});
}
/*
* STOMP
* accept-version:1.0,1.1,2.0
* host:/
*
* ^@
*/
void stomp(Socket connection,
List<String> versions,
String hostpath,
String login, String passcode) {
connection.writeln("STOMP");
if(versions.length > 0) {
connection.writeln("accept-version:${versions.join(',')}");
}
connection.writeln("host:$hostpath");
connection.writeln("login:$login");
connection.writeln("passcode:$passcode");
connection.writeln();
connection.add([0x00]);
}
/*
* SUBSCRIBE
* id:0
* destination:/queue/foo
* ack:client
*
* ^@
*/
void subscribe(Socket connection,
String destination,
int id, String ack, bool persistent) {
connection.writeln("SUBSCRIBE");
connection.writeln("id:$id");
connection.writeln("destination:$destination");
connection.writeln("ack:$ack");
connection.writeln("persistent:$persistent");
connection.writeln();
connection.add([0x00]);
}
/*
* UNSUBSCRIBE
* id:0
*
* ^@
*/
void unsubscribe(Socket connection, int id) {
connection.writeln("UNSUBSCRIBE");
connection.writeln("id:$id");
connection.writeln();
connection.add([0x00]);
}
/*
* ACK
* id:12345
* transaction:tx1
*
* ^@
*/
void ack(Socket connection, String id, [String transaction]) {
connection.writeln("ACK");
connection.writeln("id:$id");
if(?transaction) {
connection.writeln("transaction:$transaction");
}
connection.writeln();
connection.add([0x00]);
}
/*
* NACK
* id:12345
* transaction:tx1
*
* ^@
*/
void nack(Socket connection, String id, [String transaction]) {
connection.writeln("NACK");
connection.writeln("id:$id");
if(?transaction) {
connection.writeln("transaction:$transaction");
}
connection.writeln();
connection.add([0x00]);
}
/*
* BEGIN
* transaction:tx1
*
* ^@
*/
void begin(Socket connection, String transaction) {
connection.writeln("BEGIN");
connection.writeln("transaction:$transaction");
connection.writeln();
connection.add([0x00]);
}
/*
* COMMIT
* transaction:tx1
*
* ^@
*/
void commit(Socket connection, String transaction) {
connection.writeln("COMMIT");
connection.writeln("transaction:$transaction");
connection.writeln();
connection.add([0x00]);
}
/*
* ABORT
* transaction:tx1
*
* ^@
*/
void abort(Socket connection, String transaction) {
connection.writeln("ABORT");
connection.writeln("transaction:$transaction");
connection.writeln();
connection.add([0x00]);
}
/*
* SEND
* destination:/queue/a
* content-type:text/plain
*
* hello queue a
* ^@
*/
void send(Socket connection, String queue, String message) {
connection.writeln("SEND");
connection.writeln("destination:$queue");
connection.writeln("content-type:text/plain");
connection.writeln();
connection.write(message);
connection.add([0x00]);
}
class StompServerFrame {
String frame;
Map<String, String> headers = new Map<String, String>();
List<int> body = new List<int>();
String toString() {
StringBuffer sb = new StringBuffer();
sb.writeln(frame);
for(String key in headers.keys) {
sb.writeln("$key=${headers[key]}");
}
sb.writeln(new String.fromCharCodes(body));
return sb.toString();
}
}
void dumpStompFrame(StompServerFrame frame) {
print("BEGIN STOMP FRAME DUMP");
print(frame.toString());
print("END STOMP FRAME DUMP");
}
class StompTransformer extends StreamEventTransformer<List<int>, StompServerFrame> {
List<String> serverFrames = ['CONNECTED', 'MESSAGE', 'RECEIPT', 'ERROR'];
String state = 'COMMAND'; // 'COMMAND', 'HEADERS', 'BODY'
List<int> token = new List<int>();
StompServerFrame stompServerFrame = new StompServerFrame();
StompTransformer() {}
int lastValue = -1;
void handleData(List<int> intList, EventSink<StompServerFrame> sink) {
for(int b in intList) {
switch(state) {
case 'COMMAND':
if(b == 0x0a) { // done with command
stompServerFrame.frame = new String.fromCharCodes(token);
state = 'HEADERS';
token.clear();
} else {
token.add(b);
}
lastValue = b;
break;
case 'HEADERS':
if(b == 0x0a && lastValue == 0x0a) { // done with all headers
state = 'BODY';
token.clear();
lastValue = -1;
} else if(b == 0x0a && lastValue != 0x0a) { // done with a header
String tokenString = new String.fromCharCodes(token);
List<String> tokenStringParts = tokenString.split(":");
if(tokenStringParts.length == 2) {
stompServerFrame.headers.putIfAbsent(tokenStringParts.elementAt(0),
() => tokenStringParts.elementAt(1));
} else {
// possible header format error
print("was here with $tokenString");
}
token.clear();
lastValue = b;
} else {
token.add(b);
lastValue = b;
}
break;
case 'BODY':
if(b == 0x00) { // done with body
sink.add(stompServerFrame);
stompServerFrame = new StompServerFrame();
state = 'COMMAND';
token.clear();
lastValue = -1;
} else {
stompServerFrame.body.add(b);
}
break;
default:
break;
}
}
}
}
于 2013-07-15T06:15:53.873 に答える