如何正确使用Flume在HDFS中插入JSON

问题描述 投票:0回答:4

我正在使用

HTTPSource
中的
Flume
来接收 json 格式的
POST
事件,如下所示:

{"username":"xyz","password":"123"}

我的问题是:我是否必须修改事件源(我的意思是向 Flume 发送

JSON
的事件源),以便 JSON 具有以下格式:

[{
  "headers" : {
             "timestamp" : "434324343",
             "host" : "random_host.example.com"
             },
  "body" : "{"username":"xyz","password":"123"}"
}]

这是最好的方法吗?或者我可以在其他地方修改它?

我的

conf
flume agent
文件是:

## Componentes
SomeAgent.sources = SomeHTTP
SomeAgent.channels = MemChannel
SomeAgent.sinks = SomeHDFS

## Fuente e Interceptores
SomeAgent.sources.SomeHTTP.type = http
SomeAgent.sources.SomeHTTP.port = 5140
SomeAgent.sources.SomeHTTP.handler = org.apache.flume.source.http.JSONHandler
SomeAgent.sources.SomeHTTP.channels = MemChannel
SomeAgent.sources.SomeHTTP.interceptors = i1 i2

## Interceptores
SomeAgent.sources.SomeHTTP.interceptors.i1.type = timestamp
SomeAgent.sources.SomeHTTP.interceptors.i2.type = host
SomeAgent.sources.SomeHTTP.interceptors.i2.hostHeader = hostname

## Canal
SomeAgent.channels.MemChannel.type = memory
SomeAgent.channels.MemChannel.capacity = 10000
SomeAgent.channels.MemChannel.transactionCapacity = 1000

## Sumidero
SomeAgent.sinks.SomeHDFS.type = hdfs
SomeAgent.sinks.SomeHDFS.channel = MemChannel
SomeAgent.sinks.SomeHDFS.hdfs.path = /raw/logs/%Y-%m-%d
SomeAgent.sinks.SomeHDFS.hdfs.fileType = DataStream
SomeAgent.sinks.SomeHDFS.hdfs.filePrefix = SomeLogs-
SomeAgent.sinks.SomeHDFS.hdfs.writeFormat = Text
SomeAgent.sinks.SomeHDFS.hdfs.batchSize = 100
SomeAgent.sinks.SomeHDFS.hdfs.rollSize = 0
SomeAgent.sinks.SomeHDFS.hdfs.rollCount = 10000
SomeAgent.sinks.SomeHDFS.hdfs.rollInterval = 600
SomeAgent.sinks.SomeHDFS.hdfs.useLocalTimeStamp = true

运行

cat
hadoop fs

$ hadoop fs -ls -R /raw/logs/somes
drwxr-xr-x   - flume-agent supergroup          0 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16
-rw-r--r--   3 flume-agent supergroup       3814 2015-06-16 12:33 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434471803369
-rw-r--r--   3 flume-agent supergroup       3719 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434472404774


$ hadoop fs -cat /raw/logs/somes/2015-06-16/SomeLogs.1434471803369 | head




$

(你没看错,空行)

如果现在我查看该文件(例如使用

HUE
的二进制视图):

0000000:    0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a   ................
0000010:    0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a   ................
0000020:    0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a   ................
json hadoop flume flume-ng
4个回答
4
投票

如果我理解得很好,您想要序列化数据和标题。在这种情况下,您不必修改数据源,而是使用一些标准 Flume 元素并为 HDFS 创建自定义序列化器。

第一步是实现Flume创建所需的JSON结构,即headers+body。 Flume 可以为你做到这一点,只需在你的 HTTPSource 中使用 JSONHandler,这样:

a1.sources = r1
a1.sources.r1.hnadler = org.apache.flume.source.http.JSONHandler

事实上,没有必要配置 JSON 处理程序,因为它是 HTTPSource 的默认处理程序。

然后,使用 Timestamp InterceptorHost Interceptor 来添加所需的标头。唯一的技巧是 Flume 代理必须与发送方进程在同一台机器上运行,以便拦截的主机与发送方主机相同:

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname

此时,您将获得想要的事件。然而,HDFS 的标准序列化器仅保存主体,而不保存标头。因此创建一个实现

org.apache.flume.serialization.EventSerializer
的自定义序列化器。它的配置为:

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.serializer = my_custom_serializer

HTH


3
投票

@frb 发布的答案是正确的,唯一缺少的一点是 JSON 生成器 must 发送

body
部分(我必须承认/抱怨 docs 在这一点上不清楚),所以, 正确发布
json
的方式是

[body:"{'username':'xyz','password':'123'}"]

请注意,数据的

json
现在是一个字符串。

通过此更改,

json
现在在
hdfs
中可见。


1
投票

使用默认 JSONHandler 的 Flume HTTPSource 期望将 JSON 表示形式的完整 Flume 事件列表

[{ headers: ..., body: ... }]
提交到端点;要创建一个可以接受像
{"username":"xyz", "password":"123"}
这样的裸应用程序级结构的代理端点,您可以使用实现 HTTPSourceHandler 的替代类覆盖处理程序;请参阅 JSONHandler 源代码 - 内容并不多。

public List<Event> getEvents(HttpServletRequest request) throws ...

在自定义 JSONHandler 中,您还可以根据 HTTP 请求向事件添加标头,例如源 IP、用户代理等(拦截器没有相关上下文)。此时您可能想要验证应用程序提供的 JSON(尽管默认处理程序不会)。

尽管正如您所发现的,您可以仅传递

[{body: ...}]
部分,但如果您想 防止 生成器为事件注入标头,这样的自定义处理程序也可能很有用。


0
投票

我在本地修复了包含 20 条记录的 json 文件,我想将其移至 hdfs,您能帮我进行源配置吗?

© www.soinside.com 2019 - 2024. All rights reserved.