How do I access the complete dataset in a Flume-Kafka pipeline?


I am reading the text file SMSSpamCollection with a Flume source and publishing it to a Kafka topic, which is the Flume sink.

     # Agent Name:
     a1.sources = r1
     a1.sinks = sample
     a1.channels = sample-channel


     # Source configuration:
     a1.sources.r1.type = exec
     a1.sources.r1.command = tail -f /Users/val/Documents/code/spark/m11_to_Upload/SMSSpamCollection
     a1.sources.r1.logStdErr = true

     # Sink type
     #a1.sinks.sample.type = logger

     # Buffers events in memory to channel
     a1.channels.sample-channel.type = memory
     a1.channels.sample-channel.capacity = 1000
     a1.channels.sample-channel.transactionCapacity = 100

     # Bind the source and sink to the channel
     a1.sources.r1.channels.selector.type = replicating
     a1.sources.r1.channels = sample-channel

     # Kafka sink settings: sink type, topic, broker list, and the channel the sink reads from
     a1.sinks.sample.type = org.apache.flume.sink.kafka.KafkaSink
     a1.sinks.sample.topic = sample_topic
     a1.sinks.sample.brokerList = 127.0.0.1:9092
     a1.sinks.sample.requiredAcks = 1
     a1.sinks.sample.batchSize = 20
     a1.sinks.sample.channel = sample-channel

I start the agent with this command:

    flume-ng agent --conf conf --conf-file /usr/local/Cellar/flume/1.9.0/libexec/conf/flume-sample.conf  -Dflume.root.logger=DEBUG,console --name a1 -Xmx512m -Xms256m 

When I read the data from the Kafka topic with

    kafka-console-consumer --topic sample_topic --from-beginning --bootstrap-server localhost:9092

I only see the last 10 records of the original file:

    ham Ok lor... Sony ericsson salesman... I ask shuhui then she say quite gd 2 use so i considering...
    ham Ard 6 like dat lor.
    ham Why don't you wait 'til at least wednesday to see if you get your .
    ham Huh y lei...
    spam    REMINDER FROM O2: To get 2.50 pounds free call credit and details of great offers pls reply 2 this text with your valid name, house no and postcode
    spam    This is the 2nd time we have tried 2 contact u. U have won the £750 Pound prize. 2 claim is easy, call 087187272008 NOW1! Only 10p per minute. BT-national-rate.
    ham Will ü b going to esplanade fr home?
    ham Pity, * was in mood for that. So...any other suggestions?
    ham The guy did some bitching but I acted like i'd be interested in buying something else next week and he gave it to us for free
    ham Rofl. Its true to its name

What is the correct way to see all the records?

apache-kafka flume
1 Answer

You are using tail, which by default outputs only the last 10 lines of a file.

Use this instead:

    a1.sources.r1.command = tail -c +0 -f /Users/val/Documents/code/spark/m11_to_Upload/SMSSpamCollection

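-c +0 tells tail to start at the first byte of the file, so the whole file is emitted before -f begins following new lines.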
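To see the difference on a small file, here is a minimal sketch. It assumes a POSIX shell with seq and mktemp available; the demo file is hypothetical (it is not the SMSSpamCollection file), and -f is left out so the commands exit instead of following the file.

```shell
#!/bin/sh
# Hypothetical 25-line demo file, standing in for the real input.
demo_file=$(mktemp)
seq 1 25 > "$demo_file"

# Plain tail: only the last 10 lines are printed.
default_count=$(tail "$demo_file" | wc -l | tr -d ' ')

# tail -c +0: output starts at the first byte, so every line is printed.
full_count=$(tail -c +0 "$demo_file" | wc -l | tr -d ' ')

echo "plain tail: $default_count lines"
echo "tail -c +0: $full_count lines"

rm -f "$demo_file"
```

With the Flume exec source the same flag is combined with -f, so the agent first receives the existing content and then any lines appended later.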

By the way, an alternative is to use Kafka Connect with a plugin such as Spooldir or File Pulse.
