ReadFromKafka in the Apache Beam Python SDK does not work: java.io.IOException: error=2, No such file or directory

Problem description

I am trying to run a simple Beam program in Python that reads messages from a Kafka topic and prints them to the console, but I am getting this error and don't know what the problem is.

WARNING:root:Waiting for grpc channel to be ready at localhost:41521.

ERROR:root:java.io.IOException: error=2, No such file or directory
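
A note on this error: when a Java process is launched from Python and fails with java.io.IOException: error=2, No such file or directory, the executable it tried to start was not found. ReadFromKafka is a cross-language transform, so the Python SDK spawns a Java expansion service; a missing java binary is therefore a plausible cause. The diagnosis and the check below are assumptions, not something confirmed in the post:

    import shutil

    # ReadFromKafka launches a Java expansion service under the hood, so a
    # missing 'java' executable is one common source of 'error=2'. This check
    # is a guess at the cause, not confirmed by the original post.
    java_path = shutil.which('java')
    if java_path is None:
        print("'java' not found on PATH -- install a JRE/JDK before running ReadFromKafka")
    else:
        print(f"'java' found at {java_path}")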

Here is my code:

    import re
    import typing
    import logging
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.io.kafka import WriteToKafka


    pipeline_options = PipelineOptions(
        runner='FlinkRunner',
        project='my-project-id',
        job_name='unique-job-name',
        temp_location='/tmp',
        streaming=True
    )

    consumer_config = {
        'bootstrap.servers': 'kafka.test.svc.cluster.local:9092'
    }

    def main():
        print(beam.__version__)
        with beam.Pipeline(options=pipeline_options) as p:
            (p | "Read from Kafka" >> ReadFromKafka(consumer_config=consumer_config,
                                                    topics=['test'])
               | "Print to console" >> beam.Map(print)
            )

    main()

Here are my dependencies:

root@ipython:/# pip freeze
apache-beam==2.43.0
apache-flink==1.17.0
apache-flink-libraries==1.17.0
asttokens==2.2.1
avro-python3==1.9.2.1
backcall==0.2.0
certifi==2022.12.7
charset-normalizer==3.1.0
cloudpickle==2.2.0
crcmod==1.7
decorator==5.1.1
dill==0.3.1.1
docopt==0.6.2
executing==1.2.0
fastavro==1.4.7
fasteners==0.18
find-libpython==0.3.0
grpcio==1.53.0
hdfs==2.7.0
httplib2==0.20.4
idna==3.4
ipython==8.12.0
jedi==0.18.2
matplotlib-inline==0.1.6
numpy==1.21.6
objsize==0.5.2
orjson==3.8.9
pandas==1.3.5
parso==0.8.3
pemja==0.3.0
pexpect==4.8.0
pickleshare==0.7.5
prompt-toolkit==3.0.38
proto-plus==1.22.2
protobuf==3.20.3
ptyprocess==0.7.0
pure-eval==0.2.2
py4j==0.10.9.7
pyarrow==8.0.0
pydot==1.4.2
Pygments==2.14.0
pymongo==3.13.0
pyparsing==3.0.9
python-dateutil==2.8.2
pytz==2023.3
regex==2023.3.23
requests==2.28.2
six==1.16.0
stack-data==0.6.2
traitlets==5.9.0
typing-extensions==4.5.0
urllib3==1.26.15
wcwidth==0.2.6
zstandard==0.20.0 
Tags: apache-flink, apache-beam, apache-beam-io, apache-beam-kafkaio
1 Answer
  1. Check the permissions on the temp folder (the pipeline's temp_location, /tmp here).
  2. Use a logger for each step so you can see how far the pipeline gets (a logging sketch follows the modified code below).
  3. Check the Kafka configuration: verify that the Kafka broker address and port are correct and that the topic exists.
  4. Check the network connectivity: look for any network connectivity issues between the client and the cluster or the Kafka broker, and make sure the required ports are open and accessible (a minimal reachability check follows this list).
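
For items 3 and 4, a quick TCP reachability check can rule out basic networking problems before you touch the pipeline. This is a minimal sketch using only the Python standard library; the host and port are copied from the question's consumer_config and are assumptions about your environment:

    import socket

    # Broker address taken from the question's consumer_config -- adjust as needed.
    BROKER_HOST = 'kafka.test.svc.cluster.local'
    BROKER_PORT = 9092

    def broker_reachable(host: str, port: int, timeout: float = 5.0) -> bool:
        """Return True if a TCP connection to the broker can be opened."""
        try:
            with socket.create_connection((host, port), timeout=timeout):
                return True
        except OSError as exc:
            print(f'Cannot reach {host}:{port}: {exc}')
            return False

    if broker_reachable(BROKER_HOST, BROKER_PORT):
        print('Broker port is open and reachable.')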

The modified code is below:

    import re
    import typing
    import logging
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.io.kafka import WriteToKafka
    
    
    pipeline_options = PipelineOptions(
        runner='FlinkRunner',
        project='my-project-id',
        job_name='unique-job-name',
        temp_location='/tmp',
        streaming=True
    )
    
    consumer_config = {
        'bootstrap.servers': 'kafka.test.svc.cluster.local:9092'
    }
    
    # Emit INFO-level logs so the progress of each step is visible.
    logging.basicConfig(level=logging.INFO)

    def main():
        try:
            logging.info(f'Beam version: {beam.__version__}')
            with beam.Pipeline(options=pipeline_options) as p:
                (p | "Read from Kafka" >> ReadFromKafka(consumer_config=consumer_config,
                                                        topics=['test'])
                   | "Print to console" >> beam.Map(print)
                )
        except Exception as e:
            # Log the failure, then re-raise so the process still exits non-zero.
            logging.error(f"Error: {e}")
            raise

    main()
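
To act on item 2 above ("use a logger for each step"), one lightweight pattern is a pass-through function wrapped in beam.Map. This is a sketch; log_element is a hypothetical helper, not part of the Beam API:

    import logging
    import apache_beam as beam

    def log_element(element, label='element'):
        """Log an element and return it unchanged (hypothetical helper)."""
        logging.info('%s: %r', label, element)
        return element

    # Usage inside the pipeline, between existing steps, e.g.:
    #     | "Log raw records" >> beam.Map(log_element, label='raw kafka record')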