通过发布者RabbitMQ(红宝石兔子)保持发布数据

问题描述 投票:0回答:1

我有一个RabbitMQ,其中的bunny使用Consumer.rb和Publisher.rb。如果我运行

    ruby consumer.rb

然后

    publisher.rb 

我得到正在发布的数据(数据来自另一个类)。当我运行Publisher.rb时,数据在ruby Consumer.rb上打印如下:

Test New Data
1142.5186392493372

我需要这两个脚本不断运行,以便发布者继续通过推送数据(instrument.value),而我会连续看到上述输出。我尝试使用Daemon制作新的服务器文件并运行ruby server.rb start。但这确实会继续运行Publisher.rb,但不会一直运行,因此它会发布数据。另外,如何使RabbitMQ ruby​​应用程序将数据发布到另一个基于Web的Rails应用程序?

    require 'daemons'

    Daemons.run('publisher.rb')

consumer.rb

require 'bunny'
require 'daemons'

class BunnyConsumer < Bunny::Consumer

    # Connect to RabbitMQ running on localhost 
    conn = Bunny.new
    conn.start

    channel = conn.create_channel

    q = channel.queue("queue")

    channel_consumer = BunnyConsumer.new(channel, q, "the_consumer")


    channel_consumer.on_delivery do |delivery_info, metadata, payload| 
        puts payload
    end

    q.subscribe_with(channel_consumer)

    exchange = channel.default_exchange

    exchange.publish('Test New Data', routing_key: "queue")

    sleep 1000

    Daemons.run('bunny_consumer.rb')

end 

publisher.rb

require 'bunny'
require_relative 'instrument_data.rb'

class Publisher 

    instrument = InstrumentData.new

    connection = Bunny.new
    connection.start

    channel = connection.create_channel

    q = channel.queue("testthequeue") 

    exchange = channel.default_exchange

    begin 
        puts ' [*] Waiting for messages.'
        q.subscribe do |delivery_info, properties, payload|
            puts "Received: #{payload}"
        end

        rescue Interrupt => _
            connection.close 

            exit(0)
    end 

    exchange.publish("#{instrument.value}", routing_key: "testthequeue", persistent: true)

    sleep 1000

    connection.close

end

server.rb

require 'daemons'

Daemons.run('publisher.rb')

谢谢

ruby-on-rails ruby rabbitmq daemons bunny
1个回答
0
投票
i = 0 while i < 1 data = { weight: @devise.value, pressure: @devise.value } m = data.to_json; @pub.publish(m, routing_key: @q) // @pub: Publisher class end

在Publisher类中:

def publish(data, q)
    exchange.publish(data, routing_key: "queue_name") //exchange another method

    sleep(5) //Publish every 5 seconds

    connection.close
end
© www.soinside.com 2019 - 2024. All rights reserved.