I have a Python script that uses a socket to send data to netcat:
```python
import socket
import time
import csv
import json

clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
clientsocket.connect(('localhost', 44444))
try:
    with open('csv1.csv') as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')
        line_count = 0
        for row in csv_reader:
            if line_count == 0:
                # First row is the CSV header: skip it
                line_count += 1
            else:
                if len(row) == 14:
                    j = {"id": row[0], "lon": row[1], "lat": row[2]}
                    line_count += 1
                    # str(j) is the dict's Python repr (single quotes) with no
                    # trailing newline; under Python 3, sendall() needs bytes
                    clientsocket.sendall(str(j))
                    data = clientsocket.recv(16)
                    time.sleep(250.0 / 1000.0)
                    # Stop after the header plus 14 data rows
                    if line_count == 15:
                        break
finally:
    clientsocket.close()
```
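For comparison, here is a sketch of a sender that frames each record as one line of JSON. It assumes, as the script does, that columns 0 to 2 hold id, lon and lat and that the netcat source listens on localhost:44444; `encode_event` and `send_csv` are hypothetical helper names. It relies on two documented behaviours of Flume's netcat source: each newline-terminated line becomes one event, and with `ack-every-event` left at its default (`true`) the source replies with an `OK` line per event.

```python
import csv
import json
import socket


def encode_event(row):
    """Build one newline-terminated JSON event (as bytes) from a CSV row."""
    # Assumption: columns 0, 1 and 2 hold id, lon and lat, as in the script above.
    record = {"id": row[0], "lon": row[1], "lat": row[2]}
    # json.dumps yields real JSON (double quotes), unlike str(dict);
    # the trailing "\n" marks where the event ends for a line-oriented source.
    return (json.dumps(record) + "\n").encode("utf-8")


def send_csv(path, host="localhost", port=44444, limit=14):
    """Send up to `limit` 14-column rows of `path`, one JSON event per line."""
    with socket.create_connection((host, port)) as sock:
        # Assumes ack-every-event is at its default (true), so the source
        # answers each event with one "OK" line.
        with sock.makefile("rb") as acks, open(path, newline="") as f:
            reader = csv.reader(f)
            next(reader, None)  # skip the header row
            sent = 0
            for row in reader:
                if len(row) != 14:
                    continue  # same filter as the original script
                sock.sendall(encode_event(row))
                acks.readline()  # block until the acknowledgement line arrives
                sent += 1
                if sent >= limit:
                    break


if __name__ == "__main__":
    print(encode_event(["1", "1", "1"] + [""] * 11))
```

With the agent running, `send_csv('csv1.csv')` should then deliver one event per data row; reading the ack line instead of a fixed `recv(16)` keeps the client in step with the source when acks are enabled.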
The output looks like this: {'lat':1,'lon':1,'id':1}
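The output quoted above is Python's `repr` of a dict, not JSON: it uses single quotes, which strict JSON parsers reject. The standard-library snippet below shows the difference using the same sample values. It uses Python's `json` module, so treating its verdict as equivalent to what the Hive sink's `JSON` serializer would do is an assumption (the Flume user guide describes that serializer as expecting strict JSON syntax).

```python
import json

record = {"lat": 1, "lon": 1, "id": 1}

as_repr = str(record)         # what str(j) produces: Python repr, single quotes
as_json = json.dumps(record)  # standard JSON text, double quotes

print(as_repr)  # {'lat': 1, 'lon': 1, 'id': 1}
print(as_json)  # {"lat": 1, "lon": 1, "id": 1}

try:
    json.loads(as_repr)  # a strict JSON parser refuses single-quoted names
except json.JSONDecodeError as exc:
    print("not valid JSON:", exc.msg)
```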
Now I'm trying to stream that output into Hive using a Flume agent. My agent configuration looks like this:
```properties
agent.sources = s1
agent.sinks = k1
agent.channels = c1
agent.sources.s1.type = netcat
agent.sources.s1.bind = 0.0.0.0
agent.sources.s1.port = 44444
agent.sinks.k1.type = hive
agent.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
agent.sinks.k1.hive.database = default
agent.sinks.k1.hive.table = t
agent.sinks.k1.batchSize = 20
agent.sinks.k1.heartBeatInterval = 0
agent.sinks.k1.serializer = JSON
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
agent.sources.s1.channels = c1
agent.sinks.k1.channel = c1
```
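The source configured above has `type = netcat`, which the Flume user guide describes as turning each line of text into an event. The function below is a simplified, hypothetical model of that framing, not Flume's implementation (it ignores, for example, the source's `max-line-length` limit): it splits received bytes on `\n` and keeps any unterminated tail buffered, which is what happens to a record sent without a trailing newline.

```python
def split_lines(buffer: bytes):
    """Return (complete_lines, leftover) for a newline-framed byte stream.

    Simplified model of a line-oriented receiver; not Flume's code.
    """
    *lines, leftover = buffer.split(b"\n")
    return lines, leftover


# No trailing newline: nothing is complete, the bytes just stay buffered.
print(split_lines(b"{'lat':1,'lon':1,'id':1}"))
# Newline-terminated records: each one becomes a complete line (an event).
print(split_lines(b'{"id": "1"}\n{"id": "2"}\n'))
```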
My Hive table is defined as follows:
```sql
create table if not exists t (
  plat string,
  lon string,
  id string
)
clustered by (id) into 5 buckets
stored as orc;
```
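The sink is configured with `serializer = JSON`, which the Flume user guide describes as mapping JSON object names directly to Hive columns of the same name. The sketch below imitates only that name-matching rule in plain Python (`map_to_columns` is a hypothetical helper, not the sink's code) to show how a record with the script's keys lines up against this table's columns.

```python
import json

TABLE_COLUMNS = ["plat", "lon", "id"]  # column order from the CREATE TABLE above


def map_to_columns(event: bytes, columns=TABLE_COLUMNS):
    """Match JSON object names to column names by exact name.

    In this sketch, columns with no matching name get None and JSON names
    with no matching column are reported separately.
    """
    record = json.loads(event)
    row = {col: record.get(col) for col in columns}
    unmapped = sorted(set(record) - set(columns))
    return row, unmapped


row, unmapped = map_to_columns(b'{"id": "1", "lon": "1", "lat": "1"}')
print(row)       # {'plat': None, 'lon': '1', 'id': '1'}
print(unmapped)  # ['lat']
```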
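Unfortunately, when I run the agent nothing happens: Flume just sits there and shows no errors. I've made sure my Python script runs correctly, but I'm not sure what is going on here.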
I'm using the Cloudera distribution (the Docker image).