Sinking netcat JSON responses into Hive with Flume


I have a Python script that sends data over a socket, netcat-style:

import socket
import time
import csv
import json

clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
clientsocket.connect(('localhost', 44444))

try:
    with open('csv1.csv') as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')
        line_count = 0
        for row in csv_reader:
            if line_count == 0:
                # skip the CSV header row
                line_count += 1
            else:
                if len(row) == 14:
                    j = {"id": row[0], "lon": row[1], "lat": row[2]}
                    line_count += 1
                    # str(j) is the Python dict repr; encode to bytes
                    # so sendall() also works under Python 3
                    clientsocket.sendall(str(j).encode())
                    data = clientsocket.recv(16)
                    time.sleep(250.0 / 1000.0)
            if line_count == 15:
                break
finally:
    clientsocket.close()

The output looks like this: {'lat': 1, 'lon': 1, 'id': 1}
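One thing worth checking here: `str(j)` produces the Python dict repr, which uses single quotes and is therefore not valid JSON, while `json.dumps` (the `json` module is already imported in the script) produces the double-quoted form a JSON parser expects. A minimal sketch of the difference:

```python
import json

row = {"id": "1", "lon": "1", "lat": "1"}

# str() gives the Python dict repr, which uses single quotes...
as_repr = str(row)
# ...whereas json.dumps() emits standards-compliant JSON with double quotes.
as_json = json.dumps(row)

print(as_repr)  # {'id': '1', 'lon': '1', 'lat': '1'}
print(as_json)  # {"id": "1", "lon": "1", "lat": "1"}
```

A JSON-aware consumer such as Flume's JSON serializer will only parse the second form.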

Now I'm trying to stream that output into Hive with a Flume agent. My agent config looks like this:

agent.sources = s1
agent.sinks = k1
agent.channels = c1

agent.sources.s1.type = netcat
agent.sources.s1.bind = 0.0.0.0
agent.sources.s1.port = 44444

agent.sinks.k1.type = hive
agent.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
agent.sinks.k1.hive.database = default
agent.sinks.k1.hive.table = t

agent.sinks.k1.batchSize = 20
agent.sinks.k1.heartBeatInterval = 0
agent.sinks.k1.serializer = JSON

agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

agent.sources.s1.channels = c1
agent.sinks.k1.channel = c1
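As a side note, Flume's Hive sink also ships a DELIMITED serializer, which can be easier to debug than the JSON one because the mapping from event fields to table columns is explicit. A hedged sketch of that alternative (the field names are assumptions matching the table above, not part of the original config):

```
agent.sinks.k1.serializer = DELIMITED
agent.sinks.k1.serializer.delimiter = ","
agent.sinks.k1.serializer.serdeSeparator = ','
agent.sinks.k1.serializer.fieldnames = id,lon,lat
```

With DELIMITED, each event is a plain comma-separated line, so the Python sender would emit `"1,2.5,3.5\n"` instead of a JSON object.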

My Hive table is created as follows:

create table if not exists t (
    plat string,
    lon string,
    id string
)
clustered by (id) into 5 buckets
stored as orc;
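Two things stand out in this DDL, offered as likely (but unconfirmed) causes. First, the JSON serializer maps JSON keys to column names, and the script sends `lat` while the table declares `plat`, so that field would never match. Second, Flume's Hive sink writes through the Hive streaming API, which requires the target table to be not just bucketed and stored as ORC but also transactional, with ACID transactions enabled in the metastore. A hedged DDL sketch under those assumptions:

```sql
-- Assumption: Hive streaming needs a bucketed, ORC, ACID table,
-- with column names matching the incoming JSON keys.
create table if not exists t (
    lat string,   -- the question's DDL says "plat", but the JSON key is "lat"
    lon string,
    id string
)
clustered by (id) into 5 buckets
stored as orc
tblproperties ('transactional' = 'true');
```

On the server side this also presumes the transaction manager is configured (e.g. `hive.support.concurrency=true` and `hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager` in hive-site.xml); without it the sink typically fails to open a streaming connection.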

But unfortunately, when I run the agent nothing happens: Flume just sits there and shows no errors. I've made sure my Python script runs correctly, but I'm not sure what's going on here.

I'm using the Cloudera distribution Docker image.

Tags: json, hive, flume
1 Answer
Are you sure Flume itself is running correctly?
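Beyond that, one concrete thing to verify (an assumption about the root cause, not confirmed in the question): Flume's netcat source frames events on newlines, yet the script sends `str(j)` with no trailing `\n`, so the source may never see a complete event — which would match the "nothing happens, no errors" symptom. A minimal sketch of an encoder that produces one newline-terminated, double-quoted JSON event per row (`encode_event` is a hypothetical helper name, not from the original script):

```python
import json

def encode_event(row):
    """Build one Flume netcat event: a JSON object plus newline, as bytes.

    The netcat source splits incoming bytes on '\n', so every event must
    be newline-terminated; json.dumps guarantees double-quoted JSON.
    """
    record = {"id": row[0], "lon": row[1], "lat": row[2]}
    return (json.dumps(record) + "\n").encode("utf-8")

# in the original loop, this would replace clientsocket.sendall(str(j)):
print(encode_event(["1", "2.5", "3.5"]))  # b'{"id": "1", "lon": "2.5", "lat": "3.5"}\n'
```

With each event terminated this way, the netcat source acks every line with "OK", which the script's `clientsocket.recv(16)` would then actually receive.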