我目前正在尝试使用 Kafka 和 FastAPI 并尝试构建一个模板,使我能够以微服务模式快速编写程序。
构建设计模式存储库,实现非常简单的微服务基础设施。这些示例应该只演示如何在不同服务之间发送消息,并让用户轻松集成他们的自定义代码,而无需花费大量时间进行设置。
我搜索了很多,但找不到简单的例子。大多数示例都是高度定制的,并没有真正概括。
如果您有任何其他建议,请告诉我。我对微服务架构还很陌生,很乐意探索进一步的设计。
我当前的模板涉及构建 Zookeeper、Kafka、消费者和生产者服务。但是,我遇到了一个问题,即我的消费者无法使用我的生产者生成的消息。生产者似乎工作正常并成功发布消息,我已使用
docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my-topic --from-beginning
命令确认。
我的消费者似乎什么都不做。
提前感谢您对此问题的所有建议。
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
networks:
- kafka-net
kafka:
image: wurstmeister/kafka
ports:
- '9092:9092'
- '29092:29092'
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CREATE_TOPICS=my-topic:1:1
- KAFKA_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
depends_on:
- zookeeper
networks:
- kafka-net
producer:
build: ./producer
ports:
- '8000:8000'
environment:
- KAFKA_SERVERS=kafka:9092
depends_on:
- kafka
networks:
- kafka-net
consumer:
build: ./consumer
environment:
- KAFKA_SERVERS=kafka:9092
depends_on:
- kafka
networks:
- kafka-net
networks:
kafka-net:
FROM python:3.8-slim-buster
COPY . /app
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
kafka-python
import json
from kafka import KafkaProducer
from fastapi import FastAPI
app = FastAPI()
producer = KafkaProducer(bootstrap_servers=['kafka:9092'],api_version=(0,11,5),
value_serializer=lambda x: json.dumps(x).encode('utf-8'))
@app.post("/produce")
async def produce(data: dict):
producer.send('my-topic', value=data)
producer.flush()
return {"status": "success"}
FROM python:3.8-slim-buster
COPY . /app
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
CMD [ "python", "main.py" ]
fastapi
uvicorn
kafka-python
import asyncio
from kafka import KafkaConsumer
consumer = KafkaConsumer(
bootstrap_servers=['kafka:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
api_version=(0, 11, 5),
)
async def consume_messages():
consumer.subscribe(['my-topic'])
try:
while True:
msg = consumer.poll(timeout_ms=1000, max_records=1)
if msg:
for topic_partition, messages in msg.items():
for message in messages:
print(f"Received message: {message.value}")
except Exception as e:
print(f"Exception occurred while consuming messages: {e}")
async def startup():
asyncio.create_task(consume_messages())
if __name__ == "__main__":
try:
loop = asyncio.get_event_loop()
loop.create_task(startup())
print("Starting consumer...")
loop.run_forever()
except Exception as e:
print(f"Exception occurred: {e}")
docker-compose up
curl -X POST http://localhost:8000/produce -H "Content-Type: application/json" -d '{"key": "nice nice nice"}'
我多次尝试重写消费者。更改端口和 docker 组合配置。不幸的是,我无法确定我的问题。