我是 AMQP 的初学者,我在将消息从发布者发送到消费者时遇到了问题。我正在使用 rabbitmq。发布者似乎正在发布请求,但是,请求并没有进入消费者。我可以在 rabbitmq ui 中看到,消息卡在队列中,如下所示。这两个发布者和消费者文件在不同的文件夹中。
amqp_setup.py
import pika
hostname = "localhost" # default hostname
port = 5672 # default port
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=hostname, port=port,
heartbeat=3600, blocked_connection_timeout=3600, # these parameters to prolong the expiration time (in seconds) of the connection
))
channel = connection.channel()
exchangename="order_topic"
exchangetype="topic"
channel.exchange_declare(exchange=exchangename, exchange_type=exchangetype, durable=True)
queue_name = 'Notification'
channel.queue_declare(queue=queue_name, durable=True)
channel.queue_bind(exchange=exchangename, queue=queue_name, routing_key='*.notif')
def check_setup():
global connection, channel, hostname, port, exchangename, exchangetype
if not is_connection_open(connection):
connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=port, heartbeat=3600, blocked_connection_timeout=3600))
if channel.is_closed:
channel = connection.channel()
channel.exchange_declare(exchange=exchangename, exchange_type=exchangetype, durable=True)
def is_connection_open(connection):
try:
connection.process_data_events()
return True
except pika.exceptions.AMQPError as e:
print("AMQP Error:", e)
print("...creating a new connection.")
return False
queue.py(发布者)
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
from datetime import datetime
import amqp_setup
import json
import requests
from invokes import invoke_http
import pika
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+mysqlconnector://root:root@localhost:8889/queue_database'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
CORS(app)
class Queue(db.Model):
user_id = db.Column(db.Integer, primary_key=True)
status = db.Column(db.Enum('waiting', 'serving'), nullable=False)
concert_id = db.Column(db.Integer, nullable=False, primary_key=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
@app.route('/waiting-queue/<int:user_id>/<int:concert_id>')
def waiting_queue(user_id, concert_id):
try:
user = Queue.query.filter_by(user_id=user_id, concert_id=concert_id).first()
if user is None:
return jsonify({'status': 'error', 'message': 'User not found'}), 404
if user.status == 'serving':
return jsonify({'queue_position': 0, 'status': 'serving'}), 200
waiting_count = Queue.query.filter(Queue.status == 'waiting', Queue.concert_id == user.concert_id, Queue.created_at < user.created_at).count()
if waiting_count == 3:
print("waiting...")
sendNotif(user_id)
return jsonify({'queue_position': waiting_count + 1, 'status': 'waiting'}), 200
except Exception as e:
return jsonify({'status': 'error', 'message': "Error occurred when getting user queue position: " + str(e)}), 500
def sendNotif(user_id):
# 2. calls notification
print('\n-----Invoking notification microservice-----')
print('\n\n-----Publishing the notif message with routing_key=queue.notif-----')
# notif_result = invoke_http('http://127.0.0.1:5100/sendQueueNotification/' + user_id, method='POST', json=user_id)
message = json.dumps(user_id)
amqp_setup.channel.basic_publish(exchange=amqp_setup.exchangename, routing_key="queue.notif",
body=message, properties=pika.BasicProperties(delivery_mode = 2))
print("\nRequest published to RabbitMQ Exchange.\n")
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5009, debug=True)
notification.py(消费者)
from flask import Flask, jsonify
from flask_cors import CORS
from twilio.rest import Client
import os
import requests
import json
import amqp_setup
monitorBindingKey='*.notif'
app = Flask(__name__)
CORS(app)
user_url = "http://127.0.0.1:5000/user/phoneNum/"
# Twilio account credentials
TWILIO_ACCOUNT_SID = "ACb73a42a689c04ad6bf175a645cfa9282"
TWILIO_AUTH_TOKEN = "72769e6ae2bb619d91fd600733634fbb"
TWILIO_PHONE_NUMBER = "+15178269570"
# send user reminder when they are 3 places away from the seat selection page
def send_notif_queue(user_id):
try:
result = invoke_http(user_url + user_id, method='GET')
code = result['code']
phone_num = result['phone_num']
client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
if code in (200, 300):
message = client.messages.create(
to="+65" + str(phone_num),
from_=TWILIO_PHONE_NUMBER,
body="You are currently 3 places away from the Seat Selection Page! "
"\nDo take note that you will have 10 mins to select your seats after entering!"
)
return jsonify({"code": 200, "message": "Notification is sent"})
else:
return jsonify({"code": 404, "message": "Notification is not found"})
except Exception as e:
return jsonify({"code": 500, "message": "Failed to send notification: " + str(e)})
def recieveQueue():
amqp_setup.check_setup()
print("entered notif")
queue_name = 'Notification'
# set up a consumer and start to wait for coming messages
amqp_setup.channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
amqp_setup.channel.start_consuming() # an implicit loop waiting to receive messages;
#it doesn't exit by default. Use Ctrl+C in the command window to terminate it.
def callback(channel, method, properties, body): # required signature for the callback; no return
print("\nReceived an order log by " + __file__)
channel.basic_ack(delivery_tag=method.delivery_tag)
send_notif_queue(json.loads(body))
print() # print a new line feed
if __name__ == '__main__':
print("This is flask " + os.path.basename(__file__) + " for sending a notification...")
app.run(debug=True, port=5100)