Flume netcat 无法从 python 套接字接收任何消息

问题描述 投票:0回答:1

这是我的flume conf,请不要介意为什么sink被称为hive-sink但类型是hdfs。 Flume 可以接收来自 telnet 的消息,但不能接收来自 python socket 的消息。

agent.sources = django-source
agent.channels = memory-channel
agent.sinks = hive-sink

agent.sources.django-source.type = netcat
agent.sources.django-source.bind = localhost
agent.sources.django-source.port = 4444
agent.sources.django-source.channels = memory-channel

agent.channels.memory-channel.type = memory

# Sink configuration
agent.sinks.hive-sink.type = hdfs
agent.sinks.hive-sink.channel = memory-channel
agent.sinks.hive-sink.hdfs.path=/django
agent.sinks.hive-sink.hdfs.filePrefix = django
agent.sinks.hive-sink.hdfs.fileSuffix = .log
agent.sinks.hive-sink.hdfs.fileType = DataStream
agent.sinks.hive-sink.hdfs.writeFormat = Text

agent.channels.memory-channel.capacity=10000
agent.channels.memory-channel.transactionCapacity=1000
agent.sinks.hive-sink.batchSize = 5
agent.sinks.hive-sink.hdfs.rollSize=10485760
agent.sinks.hive-sink.hdfs.rollInterval = 0
agent.sinks.hive-sink.hdfs.rollCount = 0
agent.sinks.hive-sink.hdfs.minBlockReplicas=1
agent.sources.django-source.logAllEvents = true
agent.sinks.hive-sink.batchTimeout = 5000

import socket

flume_host = 'localhost'
flume_port = 4444

try:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((flume_host, flume_port))

        s.sendall("Test".encode('utf-8'))
        data = s.recv(1024) # Program stop here, waiting for ok from flume

        print(f"Message sent to Flume")

except Exception as e:
    print(f"Error sending message to Flume: {e}")

那么如何解决这个问题呢。谢谢!

python telnet flume
1个回答
0
投票

您的 Python 脚本似乎旨在向 Flume Netcat 源发送消息。由于您提到脚本在发送消息后卡住,这可能是由于 Flume Netcat 源在收到消息后期望得到特定的确认或响应。

在 Flume 配置中,您使用的是 Netcat 源 (agent.sources.django-source.type = netcat)。 Flume 中的 Netcat 源通常配置有一个拦截器,该拦截器在处理消息后期望来自客户端的特定确认。这是确保消息已成功接收并处理的常见行为。

您可以采取以下几个步骤来排除故障并可能解决问题:

检查 Flume 日志:

查看 Flume 日志,查看是否有任何消息或错误表明它可能正在等待确认的原因。可能有有关预期确认模式的信息。

回顾 Flume 拦截器配置:

如果您在 Flume 配置中配置了任何拦截器(拦截器修改或处理事件),请检查其配置以查看它们是否期望任何确认。拦截器通常与 Netcat 源一起使用。

更新Python脚本:

修改您的 Python 脚本以确保其遵循预期的协议。如果 Netcat 源需要确认,您需要在发送消息后将该确认发送回 Flume。在关闭套接字之前,您可能需要等待确认。

try:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
         s.connect((flume_host, flume_port))

        s.sendall("Test".encode('utf-8'))
        data = s.recv(1024)  # Wait for acknowledgment from Flume

        # Send acknowledgment back to Flume if required
        s.sendall("ACK".encode('utf-8'))

        print(f"Message sent to Flume")

except Exception as e:
    print(f"Error sending message to Flume: {e}")
© www.soinside.com 2019 - 2024. All rights reserved.