Flask-socketIO + Kafka作为后台进程

问题描述 投票:0回答:1

我想做什么

我有一个用Flask编写的HTTP API服务,这是用于构建不同服务实例的模板。因此,此模板需要通用化,以处理包含和不包含Kafka使用情况的用例。

我的目标是在API模板的后台中运行可选 Kafka使用者。我希望任何需要它的服务都能够异步地从Kafka主题读取数据,同时也像通常那样独立地响应HTTP请求。这两个过程(消耗Kafka,HTTP请求处理)无关,只是它们将在同一服务的幕后发生。

我写的内容

这是我的设置:

# ./create_app.py

from flask_socketio import SocketIO

socketio = None

def create_app(kafka_consumer_too=False):
   """
   Return a Flask app object, with or without a Kafka-ready SocketIO object as well
   """
   app = Flask('my_service')
   app.register_blueprint(special_http_handling_blueprint)

   if kafka_consumer_too:
      global socketio
      socketio = SocketIO(app=app, message_queue='kafka://localhost:9092', channel='some_topic')
      from .blueprints import kafka_consumption_blueprint
      app.register_blueprint(kafka_consumption_blueprint)

      return app, socketio

  return app

我的run.py是:

# ./run.py
from . import create_app
app, socketio = create_app(kafka_consumer_too=True)
if __name__=="__main__":
  socketio.run(app, debug=True)

这是我写的卡夫卡消费蓝图,这是我认为应该处理流事件的地方:

# ./blueprints/kafka_consumption_blueprint.py

from ..create_app import socketio

kafka_consumption_blueprint = Blueprint('kafka_consumption', __name__)

@socketio.on('message')
def handle_message(message):
    print('received message: ' + message)

目前正在做什么

通过以上所述,当我卷曲localhost:5000时,我的HTTP请求得到了很好的处理。问题是,当我写some_topic Kafka主题(在端口9092上)时,什么都没有显示。我有一个在另一个外壳中运行的CLI Kafka使用者,并且可以看到我正在发送有关该主题are的消息。因此,是Flask应用没有反应:handle_message()没有使用任何消息

我在这里想念什么?预先感谢。

flask-socketio
1个回答
0
投票

我认为您误解了message_queue参数的含义。

当您有多个服务器实例时,将使用此参数。这些实例通过配置的消息队列相互通信。此队列是100%内部的队列,您作为库的用户无法使用消息队列执行任何操作。

如果您想构建某种发布/订阅机制,则必须在应用程序中实现该监听器。

© www.soinside.com 2019 - 2024. All rights reserved.