Flume TAILDIR来源于Kafka Sink-静态拦截器问题

问题描述 投票:0回答:1

我正在尝试的方案如下:

1- Flume TAILDIR从日志文件中读取源代码并将静态拦截器附加到消息的开头。拦截器由主机名和主机IP组成,因为我收到的每条日志消息都需要它。

2- Flume Kafka Producer Sink从文件中获取这些消息并将它们放入Kafka主题中。

Flume配置如下:

tier1.sources=source1
tier1.channels=channel1
tier1.sinks =sink1

tier1.sources.source1.interceptors=i1


tier1.sources.source1.interceptors.i1.type=static
tier1.sources.source1.interceptors.i1.key=HostData
tier1.sources.source1.interceptors.i1.value=###HostName###000.00.0.000###


tier1.sources.source1.type=TAILDIR
tier1.sources.source1.positionFile=/usr/software/flumData/flumeStressAndKafkaFailureTestPos.json
tier1.sources.source1.filegroups=f1
tier1.sources.source1.filegroups.f1=/usr/software/flumData/flumeStressAndKafkaFailureTest.txt
tier1.sources.source1.channels=channel1

tier1.channels.channel1.type=file
tier1.channels.channel1.checkpointDir = /usr/software/flumData/checkpoint
tier1.channels.channel1.dataDirs = /usr/software/flumData/data



tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.kafka.bootstrap.servers=<Removed For Confidentiality >
tier1.sinks.sink1.kafka.topic=FlumeTokafkaTest
tier1.sinks.sink1.kafka.flumeBatchSize=20
tier1.sinks.sink1.kafka.producer.acks=0
tier1.sinks.sink1.useFlumeEventFormat=true
tier1.sinks.sink1.kafka.producer.linger.ms=1
tier1.sinks.sink1.kafka.producer.client.id=HOSTNAME
tier1.sinks.sink1.kafka.producer.compression.type = snappy

所以现在我正在测试,我运行了一个Console Kafka Consumer,我开始在源文件中写入,我确实收到了附加标题的消息。

例:

我在源文件中写'test'并按Enter然后保存文件

Flume检测文件更改,然后将新行发送给Kafka生产者。

我的消费者获得以下一行:

###HostName###000.00.0.000###test

现在的问题是有时,拦截器不能按预期工作。这就像Flume发送2条消息,一条包含拦截器,另一条包含消息内容。

例:

我在源文件中写'hi you'并按Enter然后保存文件

Flume检测文件更改,然后将新行发送给Kafka生产者。

我的消费者获得以下2行:

###HostName###000.00.0.000###
hi you

终端滚动到新的消息内容。

这种情况总是发生在我在文本文件中输入'hi you'时,因为我从日志文件中读取,所以当它发生时它是不可预测的。

将非常感谢帮助和支持^^

谢谢

apache-kafka interceptor flume
1个回答
0
投票

所以问题来自Kafka Consumer。它从水槽收到完整的消息

Interceptor + some garbage characters + message

如果其中一个垃圾字符是\ n(Linux系统中的LF),那么它将假设其2条消息,而不是1条消息。

我在Streamsets中使用Kafka Consumer元素,因此更改消息分隔符很简单。我做到了\ r \ n,现在它工作正常。

如果您将完整的消息作为字符串处理并想要在其上应用正则表达式或想要将其写入文件,那么最好用空字符串替换\ r和\ n。

可以在此处找到答案的完整演练:

https://community.cloudera.com/t5/Data-Ingestion-Integration/Flume-TAILDIR-Source-to-Kafka-Sink-Static-Interceptor-Issue/m-p/86388#M3508

© www.soinside.com 2019 - 2024. All rights reserved.