Warm tip: This article is reproduced from serverfault.com, please click

node.js-通过websocket发送RabbitMQ消息

(node.js - Sending RabbitMQ messages via websockets)

发布于 2014-04-04 12:34:31

寻找一些代码示例来解决此问题:

想要编写一些代码(Python或Javascript)以充当RabbitMQ队列的订阅者,以便在接收到消息时将通过Websocket将消息广播到任何连接的客户端。

我看过Autobahn和node.js(使用“ amqp ”和“ ws ”),但无法按需运行。这是使用node.js的javascript服务器代码:-

var amqp = require('amqp');
var WebSocketServer = require('ws').Server

var connection = amqp.createConnection({host: 'localhost'});
var wss = new WebSocketServer({port:8000});

wss.on('connection',function(ws){

    ws.on('open', function() {
        console.log('connected');
        ws.send(Date.now().toString());
    });

    ws.on('message',function(message){
            console.log('Received: %s',message);
            ws.send(Date.now().toString());
    });
});

connection.on('ready', function(){
    connection.queue('MYQUEUE', {durable:true,autoDelete:false},function(queue){
            console.log(' [*] Waiting for messages. To exit press CTRL+C')
            queue.subscribe(function(msg){
                    console.log(" [x] Received from MYQUEUE %s",msg.data.toString('utf-8'));
                    payload = msg.data.toString('utf-8');
                    // HOW DOES THIS NOW GET SENT VIA WEBSOCKETS ??
            });
    });
});

使用此代码,我可以成功订阅Rabbit中的队列并接收发送到该队列的所有消息。同样,我可以将websocket客户端(例如浏览器)连接到服务器并发送/接收消息。但是...如何在指示的时间点将Rabbit队列消息的有效负载作为websocket消息发送(“现在如何通过WEBSOCKETS发送”)?我认为这与卡在错误的回调中有关,或者它们需要以某种方式嵌套...?

或者,如果可以在Python中简化此操作(通过Autobahn和pika),那就太好了。

谢谢 !

Questioner
bzo
Viewed
11
Gabriele Santomaggio 2014-11-05 15:08:33

实现系统的一种方法是将Python与Tornado结合使用

这是服务器:

    import tornado.ioloop
    import tornado.web
    import tornado.websocket
    import os
    import pika
    from threading import Thread


    clients = []

    def threaded_rmq():
        connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"));
        print 'Connected:localhost'
        channel = connection.channel()
        channel.queue_declare(queue="my_queue")
        print 'Consumer ready, on my_queue'
        channel.basic_consume(consumer_callback, queue="my_queue", no_ack=True) 
        channel.start_consuming()


    def consumer_callback(ch, method, properties, body):
            print " [x] Received %r" % (body,)
            for itm in clients:
                itm.write_message(body)

    class SocketHandler(tornado.websocket.WebSocketHandler):
        def open(self):
            print "WebSocket opened"
            clients.append(self)
        def on_message(self, message):
            self.write_message(u"You said: " + message)

        def on_close(self):
            print "WebSocket closed"
            clients.remove(self)


    class MainHandler(tornado.web.RequestHandler):
        def get(self):
            print "get page"
            self.render("websocket.html")


application = tornado.web.Application([
    (r'/ws', SocketHandler),
    (r"/", MainHandler),
])

if __name__ == "__main__":
    thread = Thread(target = threaded_rmq)
    thread.start()

    application.listen(8889)
    tornado.ioloop.IOLoop.instance().start()

和这里的HTML页面:

<html>
<head>
    <script src="//code.jquery.com/jquery-1.11.0.min.js"></script>
    <script>

    $(document).ready(function() {
      var ws;
       if ('WebSocket' in window) {
           ws = new WebSocket('ws://localhost:8889/ws');
        }
        else if ('MozWebSocket' in window) {
            ws = new MozWebSocket('ws://localhost:8889/ws');
        }
        else {

              alert("<tr><td> your browser doesn't support web socket </td></tr>");

            return;
        }

      ws.onopen = function(evt) { alert("Connection open ...")};

      ws.onmessage = function(evt){
        alert(evt.data);
      };

      function closeConnect(){
          ws.close();
      }


  });
    </script>

</head>

<html>

因此,当你将消息发布到“ my_queue”时,该消息将重定向到所有连接的网页。

我希望它会有用

编辑**

在这里https://github.com/Gsantomaggio/rabbitmqexample 你可以找到完整的示例