Kafka + FastAPI + Docker 模板

问题描述 投票:0回答:0

简介

我目前正在尝试使用 Kafka 和 FastAPI 并尝试构建一个模板,使我能够以微服务模式快速编写程序。

目标-愿景

构建设计模式存储库,实现非常简单的微服务基础设施。这些示例应该只演示如何在不同服务之间发送消息,并让用户轻松集成他们的自定义代码,而无需花费大量时间进行设置。

动机

我搜索了很多,但找不到简单的例子。大多数示例都是高度定制的,并没有真正概括。

技术栈

  • 卡夫卡
  • FastApi
  • 码头工人

对其他实现开放

如果您有任何其他建议,请告诉我。我对微服务架构还很陌生,很乐意探索进一步的设计。

当前问题

我当前的模板涉及构建 Zookeeper、Kafka、消费者和生产者服务。但是,我遇到了一个问题,即我的消费者无法使用我的生产者生成的消息。生产者似乎工作正常并成功发布消息,我已使用

docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my-topic --from-beginning
命令确认。

我的消费者似乎什么都不做。

提前感谢您对此问题的所有建议。

我的文件夹结构:

我的 docker-compose 文件:

version: '3'

services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    networks:
      - kafka-net

  kafka:
    image: wurstmeister/kafka
    ports:
      - '9092:9092'
      - '29092:29092'
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CREATE_TOPICS=my-topic:1:1
      - KAFKA_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
      - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
    depends_on:
      - zookeeper
    networks:
      - kafka-net

  producer:
    build: ./producer
    ports:
      - '8000:8000'
    environment:
      - KAFKA_SERVERS=kafka:9092
    depends_on:
      - kafka
    networks:
      - kafka-net

  consumer:
    build: ./consumer
    environment:
      - KAFKA_SERVERS=kafka:9092
    depends_on:
      - kafka
    networks:
      - kafka-net

networks:
  kafka-net:

我的生产者 docker 文件:

FROM python:3.8-slim-buster


COPY . /app
WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

我的制作人请求文件:

kafka-python

我的制作人 main.py:

import json
from kafka import KafkaProducer
from fastapi import FastAPI


app = FastAPI()

producer = KafkaProducer(bootstrap_servers=['kafka:9092'],api_version=(0,11,5),
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'))

@app.post("/produce")
async def produce(data: dict):
    producer.send('my-topic', value=data)
    producer.flush()
    return {"status": "success"}

我的消费者 docker 文件:

FROM python:3.8-slim-buster

COPY . /app
WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt



CMD [ "python", "main.py" ]

我的消费者需求文件:

fastapi
uvicorn
kafka-python

我的消费者 main.py:

import asyncio
from kafka import KafkaConsumer


consumer = KafkaConsumer(
    bootstrap_servers=['kafka:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    api_version=(0, 11, 5),
)

async def consume_messages():
    consumer.subscribe(['my-topic'])

    try:
        while True:
            msg = consumer.poll(timeout_ms=1000, max_records=1)
            if msg:
                for topic_partition, messages in msg.items():
                    for message in messages:
                        print(f"Received message: {message.value}")
    except Exception as e:
        print(f"Exception occurred while consuming messages: {e}")

async def startup():
    asyncio.create_task(consume_messages())


if __name__ == "__main__":
    try:
        loop = asyncio.get_event_loop()
        loop.create_task(startup())
        print("Starting consumer...")
        loop.run_forever()
    except Exception as e:
        print(f"Exception occurred: {e}")

通过以下方式构建系统:

docker-compose up

你可以用这个 curl 激活生产者:

 curl -X POST http://localhost:8000/produce -H "Content-Type: application/json" -d '{"key": "nice nice nice"}'

我多次尝试重写消费者。更改端口和 docker 组合配置。不幸的是,我无法确定我的问题。

docker apache-kafka fastapi consumer kafka-python
© www.soinside.com 2019 - 2024. All rights reserved.