AMQP 消息未被消费

问题描述 投票:0回答:0

我是 AMQP 的初学者,我在将消息从发布者发送到消费者时遇到了问题。我正在使用 rabbitmq。发布者似乎正在发布请求,但是,请求并没有进入消费者。我可以在 rabbitmq ui 中看到,消息卡在队列中,如下所示。这两个发布者和消费者文件在不同的文件夹中。

amqp_setup.py

import pika

hostname = "localhost" # default hostname
port = 5672 # default port
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host=hostname, port=port,
        heartbeat=3600, blocked_connection_timeout=3600, # these parameters to prolong the expiration time (in seconds) of the connection
))

channel = connection.channel()

exchangename="order_topic"
exchangetype="topic"
channel.exchange_declare(exchange=exchangename, exchange_type=exchangetype, durable=True)


queue_name = 'Notification'
channel.queue_declare(queue=queue_name, durable=True)

channel.queue_bind(exchange=exchangename, queue=queue_name, routing_key='*.notif')

def check_setup():

    global connection, channel, hostname, port, exchangename, exchangetype

    if not is_connection_open(connection):
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=port, heartbeat=3600, blocked_connection_timeout=3600))
    if channel.is_closed:
        channel = connection.channel()
        channel.exchange_declare(exchange=exchangename, exchange_type=exchangetype, durable=True)


def is_connection_open(connection):

    try:
        connection.process_data_events()
        return True
    except pika.exceptions.AMQPError as e:
        print("AMQP Error:", e)
        print("...creating a new connection.")
        return False

queue.py(发布者)

from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
from datetime import datetime
import amqp_setup
import json
import requests
from invokes import invoke_http
import pika


app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+mysqlconnector://root:root@localhost:8889/queue_database'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)

CORS(app)

class Queue(db.Model):
    user_id = db.Column(db.Integer, primary_key=True)
    status = db.Column(db.Enum('waiting', 'serving'), nullable=False)
    concert_id = db.Column(db.Integer, nullable=False, primary_key=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
@app.route('/waiting-queue/<int:user_id>/<int:concert_id>')
def waiting_queue(user_id, concert_id):
    try:
        user = Queue.query.filter_by(user_id=user_id, concert_id=concert_id).first()
        if user is None:
            return jsonify({'status': 'error', 'message': 'User not found'}), 404

        if user.status == 'serving':
            return jsonify({'queue_position': 0, 'status': 'serving'}), 200

        waiting_count = Queue.query.filter(Queue.status == 'waiting', Queue.concert_id == user.concert_id, Queue.created_at < user.created_at).count()
        
        if waiting_count == 3:
            print("waiting...")
            sendNotif(user_id)

        return jsonify({'queue_position': waiting_count + 1, 'status': 'waiting'}), 200

    except Exception as e:
        return jsonify({'status': 'error', 'message': "Error occurred when getting user queue position: " + str(e)}), 500
def sendNotif(user_id):
    # 2. calls notification 
    print('\n-----Invoking notification microservice-----')
    print('\n\n-----Publishing the notif message with routing_key=queue.notif-----')

    # notif_result = invoke_http('http://127.0.0.1:5100/sendQueueNotification/' + user_id, method='POST', json=user_id)
    
    message = json.dumps(user_id)

    amqp_setup.channel.basic_publish(exchange=amqp_setup.exchangename, routing_key="queue.notif", 
    body=message, properties=pika.BasicProperties(delivery_mode = 2)) 
    
    print("\nRequest published to RabbitMQ Exchange.\n")
    

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5009, debug=True)

notification.py(消费者)

from flask import Flask, jsonify
from flask_cors import CORS
from twilio.rest import Client
import os
import requests
import json
import amqp_setup


monitorBindingKey='*.notif'

app = Flask(__name__)
CORS(app)

user_url = "http://127.0.0.1:5000/user/phoneNum/"

# Twilio account credentials
TWILIO_ACCOUNT_SID = "ACb73a42a689c04ad6bf175a645cfa9282"
TWILIO_AUTH_TOKEN = "72769e6ae2bb619d91fd600733634fbb"
TWILIO_PHONE_NUMBER = "+15178269570"

# send user reminder when they are 3 places away from the seat selection page
def send_notif_queue(user_id):
    try:
        result = invoke_http(user_url + user_id, method='GET')
        code = result['code']
        phone_num = result['phone_num']

        client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)

        if code in (200, 300):
            message = client.messages.create(
                to="+65" + str(phone_num),
                from_=TWILIO_PHONE_NUMBER,
                body="You are currently 3 places away from the Seat Selection Page! "
                     "\nDo take note that you will have 10 mins to select your seats after entering!"
            )
            return jsonify({"code": 200, "message": "Notification is sent"})
        else:
            return jsonify({"code": 404, "message": "Notification is not found"})

    except Exception as e:
        return jsonify({"code": 500, "message": "Failed to send notification: " + str(e)})


def recieveQueue():
    amqp_setup.check_setup()
    print("entered notif")
    queue_name = 'Notification'

    # set up a consumer and start to wait for coming messages
    amqp_setup.channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    amqp_setup.channel.start_consuming() # an implicit loop waiting to receive messages;
    #it doesn't exit by default. Use Ctrl+C in the command window to terminate it.

def callback(channel, method, properties, body): # required signature for the callback; no return
    print("\nReceived an order log by " + __file__)

    channel.basic_ack(delivery_tag=method.delivery_tag)
    send_notif_queue(json.loads(body))
    print() # print a new line feed

if __name__ == '__main__':
    print("This is flask " + os.path.basename(__file__) + " for sending a notification...")
    app.run(debug=True, port=5100)
python flask rabbitmq amqp
© www.soinside.com 2019 - 2024. All rights reserved.