无法在Kafka消费者中显示消息

问题描述 投票:0回答:1

我正在阅读雅虎财经 API 以获取多个股票价格并将其发送到工作正常的 kafkaProducer。

代码如下

 import yfinance as yf
from concurrent.futures import ThreadPoolExecutor
import threading
from confluent_kafka import Producer, Consumer

DELAY = 1.0  # Time between calls in seconds.

def get_stats(ticker):
   info = yf.Tickers(ticker).tickers[ticker].info
   print(f"{ticker} {info['currentPrice']}")
   p = Producer({'bootstrap.servers': 'localhost:9092'})
   p.produce(ticker, info['currentPrice'])
   p.flush()  


def call_API(event):
   ticker_list = ['DLF.NS','SBIN.NS']

   with ThreadPoolExecutor() as executor:
      executor.map(get_stats, ticker_list)

   event.set()  # Signal call has been made.

print('Running')
event = threading.Event()
while True:
   event.clear()
   timer = threading.Timer(DELAY, call_API, (event,))  # Call after delay.
   timer.start()
   event.wait()  # Blocks until event is set by another thread.

我想读取消费者中所有不起作用的主题的值

   from confluent_kafka import Consumer
   c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': "stock"})
   c.subscribe(['DLF.NS','SBIN.NS'])

   while True:
       msg = c.poll(1.0)

   if msg is None:
       continue
   if msg.error():
      print('Error: {}'.format(msg.error()))
      continue

   print('Received message on {}: {}'.format(msg.topic(), msg.value().decode('utf-8')))

我应该做哪些改变才能满足消费者中的所有主题数据

python apache-kafka
1个回答
0
投票

我建议从 kafka-python 安装 Kafka lib,并根据 其文档了解如何使用 KafkaConcumer

© www.soinside.com 2019 - 2024. All rights reserved.