从卡夫卡到HDFS的avro活动

问题描述 投票:2回答:2

我有kafka集群接收来自生产者的avro事件。

我想使用flume来使用这些事件并将它们作为avro文件放在HDFS中

这可能与水槽有关吗?

有没有人有配置文件示例演示如何做到这一点?

Yosi

apache-kafka flume avro flume-ng
2个回答
1
投票

这确实是可能的。

如果你想从Kafka消费,那么你需要设置一个Kafka源和一个将使用Avro的HDFS接收器。

以下是Kafka源配置选项的链接:http://flume.apache.org/FlumeUserGuide.html#kafka-source

设置源配置非常简单。您当然需要对此进行测试,以验证您选择的设置是否适用于您的系统。

要使用Avro设置HDFS,您需要设置HDFS接收器并且运气好,本网站介绍了如何执行此操作:http://thisdataguy.com/2014/07/28/avro-end-to-end-in-hdfs-part-2-flume-setup/

最后,您需要配置一个频道。我有使用Flume的内存通道和默认设置的经验(我相信......现在无法检查)并且它运行良好。

我建议您花时间使用Flume文档:http://flume.apache.org/FlumeUserGuide.html,因为所有这些信息都包含在那里。在设置Flume代理程序来处理数据之前,了解您正在使用的系统非常重要。


0
投票

考虑这种情况。对于来自kafka的avro事件(只有二进制数据,没有架构),下面是为我工作的代理。

将使用以下代理在接收方添加架构。

#source
MY_AGENT.sources.my-source.type = org.apache.flume.source.kafka.KafkaSource
MY_AGENT.sources.my-source.channels = my-channel
MY_AGENT.sources.my-source.batchSize = 10000
MY_AGENT.sources.my-source.useFlumeEventFormat = false
MY_AGENT.sources.my-source.batchDurationMillis = 5000
MY_AGENT.sources.my-source.kafka.bootstrap.servers =${BOOTSTRAP_SERVERS}
MY_AGENT.sources.my-source.kafka.topics = my-topic
MY_AGENT.sources.my-source.kafka.consumer.group.id = my-topic_grp
MY_AGENT.sources.my-source.kafka.consumer.client.id = my-topic_clnt
MY_AGENT.sources.my-source.kafka.compressed.topics = my-topic
MY_AGENT.sources.my-source.kafka.auto.commit.enable = false
MY_AGENT.sources.my-source.kafka.consumer.session.timeout.ms=100000
MY_AGENT.sources.my-source.kafka.consumer.request.timeout.ms=120000
MY_AGENT.sources.my-source.kafka.consumer.max.partition.fetch.bytes=704857
MY_AGENT.sources.my-source.kafka.consumer.auto.offset.reset=latest

#channel
MY_AGENT.channels.my-channel.type = memory
MY_AGENT.channels.my-channel.capacity = 100000000
MY_AGENT.channels.my-channel.transactionCapacity = 100000
MY_AGENT.channels.my-channel.parseAsFlumeEvent = false

#Sink
MY_AGENT.sinks.my-sink.channel = my-channel
MY_AGENT.sinks.my-sink.type = hdfs
MY_AGENT.sinks.my-sink.hdfs.writeFormat= Text
MY_AGENT.sinks.my-sink.hdfs.fileType = DataStream
MY_AGENT.sinks.my-sink.hdfs.kerberosPrincipal =${user}
MY_AGENT.sinks.my-sink.hdfs.kerberosKeytab =${keytab}
MY_AGENT.sinks.my-sink.hdfs.useLocalTimeStamp = true
MY_AGENT.sinks.my-sink.hdfs.path = hdfs://nameservice1/my_hdfs/my_table1/timestamp=%Y%m%d
MY_AGENT.sinks.my-sink.hdfs.rollCount=0
MY_AGENT.sinks.my-sink.hdfs.rollSize=0
MY_AGENT.sinks.my-sink.hdfs.batchSize=100000
MY_AGENT.sinks.my-sink.hdfs.maxOpenFiles=2000
MY_AGENT.sinks.my-sink.hdfs.callTimeout=50000
MY_AGENT.sinks.my-sink.hdfs.fileSuffix=.avro

MY_AGENT.sinks.my-sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
MY_AGENT.sinks.my-sink.serializer.schemaURL = hdfs://nameservice1/my_hdfs/avro_schemas/${AVSC_FILE}

我想强调的几件事情。

MY_AGENT.sinks.my-sink.hdfs.writeFormat= Text ..仅帮助转储来自Flume事件的数据(忽略水槽事件标题....)

MY_AGENT.sinks.my-sink.serializer.schemaURL = hdfs://nameservice1/my_hdfs/avro_schemas/${AVSC_FILE} ..需要传递适当的模式(将被添加到avro文件中的二进制数据)。 hdfs中的最终输出文件将具有架构+数据。

在HDFS中存储数据后,使用适当的avro架构创建了hive表,并且我能够按预期访问数据。

© www.soinside.com 2019 - 2024. All rights reserved.