Kafka-Python中的CSV数据流

问题描述 投票:0回答:1

正在使用Kafka-Python将CSV数据发送到Kafka主题。消费者成功发送和接收数据。现在,我试图连续流式处理csv文件,添加到文件中的任何新条目都应自动发送到Kafka主题。任何建议都将有助于连续流式传输CSV文件

下面是我现有的代码,

   from kafka import KafkaProducer
   import logging
   from json import dumps, loads
   import csv
   logging.basicConfig(level=logging.INFO)


   producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda 
   K:dumps(K).encode('utf-8'))

   with open('C:/Hadoop/Data/Job.csv', 'r') as file:
   reader = csv.reader(file, delimiter = '\t')
       for messages in reader:
       producer.send('Jim_Topic', messages)
       producer.flush()
apache-kafka kafka-producer-api kafka-python
1个回答
0
投票

Kafka Connect(Apache Kafka的一部分)是在Kafka与其他系统(包括平面文件)之间进行提取和导出的好方法。

您可以使用Kafka Connect SpoolDir connector将CSV文件流式传输到Kafka。从Confluent Hub安装它,然后为它提供源文件的配置:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config \
    -d '{
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": "orders_spooldir_00",
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": ".*\\.csv",
        "schema.generation.enabled":"true",
        "csv.first.row.as.header":"true"
        }'

更多示例和详细信息,请参见this blog。>>

© www.soinside.com 2019 - 2024. All rights reserved.