正在使用Kafka-Python
将CSV数据发送到Kafka主题。消费者成功发送和接收数据。现在,我试图连续流式处理csv文件,添加到文件中的任何新条目都应自动发送到Kafka主题。任何建议都将有助于连续流式传输CSV文件
下面是我现有的代码,
from kafka import KafkaProducer
import logging
from json import dumps, loads
import csv
logging.basicConfig(level=logging.INFO)
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda
K:dumps(K).encode('utf-8'))
with open('C:/Hadoop/Data/Job.csv', 'r') as file:
reader = csv.reader(file, delimiter = '\t')
for messages in reader:
producer.send('Jim_Topic', messages)
producer.flush()
Kafka Connect(Apache Kafka的一部分)是在Kafka与其他系统(包括平面文件)之间进行提取和导出的好方法。
您可以使用Kafka Connect SpoolDir connector将CSV文件流式传输到Kafka。从Confluent Hub安装它,然后为它提供源文件的配置:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config \
-d '{
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"topic": "orders_spooldir_00",
"input.path": "/data/unprocessed",
"finished.path": "/data/processed",
"error.path": "/data/error",
"input.file.pattern": ".*\\.csv",
"schema.generation.enabled":"true",
"csv.first.row.as.header":"true"
}'
更多示例和详细信息,请参见this blog。>>