将csv文件写入kafka主题

问题描述 投票:0回答:1

我有一个大的csv,我想写到一个kafka主题。

def producer():
    producer = KafkaProducer(bootstrap_servers='mykafka-broker')
    with open('/home/antonis/repos/testfile.csv') as file:
        reader = csv.DictReader(file, delimiter=";")
        for row in reader:
            producer.send(topic='stable_topic', value=row)
            producer.flush()

if __name__ == '__main__':
    producer()

这个代码会产生一个错误。

AssertionError: value must be bytes

这个文件看起来像:

"timestamp","name","age"
2020-03-01 00:00:01,John,36
2020-03-01 00:00:01,Peter,22

谁能帮帮我?

python-2.7 apache-kafka kafka-producer-api pykafka
1个回答
1
投票

你需要正确地序列化你的值。


下面的内容应该可以做到这一点。

import json  

producer = KafkaProducer(
    bootstrap_servers='mykafka-broker',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

1
投票

与其重新发明轮子,不如使用已经存在的非常好的轮子 :) 它是 Kafka连接,它是Apache Kafka的一部分。

有几个连接器可以从CSV中读取,包括 Kafka Connect spooldir (见 例子)和 文件脉冲.

了解更多关于Kafka Connect的信息 此话.

© www.soinside.com 2019 - 2024. All rights reserved.