I am reading from the Yahoo Finance API to fetch multiple stock prices and sending them to a Kafka producer, which works fine.
The code is below:
import yfinance as yf
from concurrent.futures import ThreadPoolExecutor
import threading
from confluent_kafka import Producer, Consumer

DELAY = 1.0  # Time between calls in seconds.

def get_stats(ticker):
    info = yf.Tickers(ticker).tickers[ticker].info
    print(f"{ticker} {info['currentPrice']}")
    p = Producer({'bootstrap.servers': 'localhost:9092'})
    # produce() expects a str or bytes value, so convert the numeric price first.
    p.produce(ticker, str(info['currentPrice']))
    p.flush()

def call_API(event):
    ticker_list = ['DLF.NS', 'SBIN.NS']
    with ThreadPoolExecutor() as executor:
        executor.map(get_stats, ticker_list)
    event.set()  # Signal call has been made.

print('Running')
event = threading.Event()
while True:
    event.clear()
    timer = threading.Timer(DELAY, call_API, (event,))  # Call after delay.
    timer.start()
    event.wait()  # Blocks until event is set by another thread.
I want to read the values of all the topics in a consumer, but this is not working:
from confluent_kafka import Consumer

c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': "stock"})
c.subscribe(['DLF.NS', 'SBIN.NS'])
while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print('Error: {}'.format(msg.error()))
        continue
    print('Received message on {}: {}'.format(msg.topic(), msg.value().decode('utf-8')))
What changes should I make so that the consumer receives the data from all the topics?
I suggest installing the kafka-python library and following its documentation on how to use KafkaConsumer.
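As a rough illustration of that suggestion, here is a minimal sketch of a kafka-python consumer subscribed to both topics. It assumes the broker address and group id from the question, and it assumes the producer sends each price as UTF-8 text. The helper names (decode_price, format_record, consume_prices) and the auto_offset_reset='earliest' setting are my own choices for the example, not something taken from the question.

```python
TOPICS = ['DLF.NS', 'SBIN.NS']  # topic names taken from the question


def decode_price(raw):
    """Decode a price payload, assuming it was sent as UTF-8 text."""
    return float(raw.decode('utf-8'))


def format_record(topic, raw):
    """Build the same log line the question prints for each message."""
    return f"Received message on {topic}: {decode_price(raw)}"


def consume_prices(topics=TOPICS, servers='localhost:9092'):
    """Poll all given topics forever and print each price (hypothetical helper)."""
    # Imported here so the helpers above work without the library installed.
    from kafka import KafkaConsumer  # pip install kafka-python

    consumer = KafkaConsumer(
        *topics,                       # subscribe to every topic in one consumer
        bootstrap_servers=servers,
        group_id='stock',
        auto_offset_reset='earliest',  # assumption: also read messages sent before startup
    )
    for record in consumer:            # blocks and yields records as they arrive
        print(format_record(record.topic, record.value))
```

Calling consume_prices() starts the loop. If nothing shows up, it may be worth checking that the producer side is really delivering messages, since exceptions raised inside executor.map are not reported unless its results are iterated.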