我正在尝试在 python 中运行一个简单的 beam 程序,该程序从 Kafka Topic 读取消息并将其打印到控制台,但我收到此错误并且不知道是什么问题。
WARNING:root:Waiting for grpc channel to be ready at localhost:41521.
ERROR:root:java.io.IOException: error=2, No such file or directory
这是我的代码
import re
import typing
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.kafka import WriteToKafka

# Options for a streaming Flink pipeline.
# NOTE(review): ReadFromKafka is a cross-language transform — Beam spins up a
# Java expansion service for it, so a `java` executable must be on PATH.
# "java.io.IOException: error=2, No such file or directory" typically means
# the java binary could not be found — confirm a JRE/JDK is installed.
pipeline_options = PipelineOptions(
    runner='FlinkRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='/tmp',
    streaming=True,
)

# Consumer settings passed through to the underlying Java KafkaIO consumer.
consumer_config = {
    'bootstrap.servers': 'kafka.test.svc.cluster.local:9092'
}


def main():
    """Read messages from the 'test' Kafka topic and print each to stdout."""
    print(beam.__version__)
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | "Read from Kafka" >> ReadFromKafka(consumer_config=consumer_config,
                                              topics=['test'])
         | "Print to console" >> beam.Map(print)
         )


# Guard the entry point so importing this module does not launch the pipeline.
if __name__ == "__main__":
    main()
这是我的依赖项
root@ipython:/# pip freeze
apache-beam==2.43.0
apache-flink==1.17.0
apache-flink-libraries==1.17.0
asttokens==2.2.1
avro-python3==1.9.2.1
backcall==0.2.0
certifi==2022.12.7
charset-normalizer==3.1.0
cloudpickle==2.2.0
crcmod==1.7
decorator==5.1.1
dill==0.3.1.1
docopt==0.6.2
executing==1.2.0
fastavro==1.4.7
fasteners==0.18
find-libpython==0.3.0
grpcio==1.53.0
hdfs==2.7.0
httplib2==0.20.4
idna==3.4
ipython==8.12.0
jedi==0.18.2
matplotlib-inline==0.1.6
numpy==1.21.6
objsize==0.5.2
orjson==3.8.9
pandas==1.3.5
parso==0.8.3
pemja==0.3.0
pexpect==4.8.0
pickleshare==0.7.5
prompt-toolkit==3.0.38
proto-plus==1.22.2
protobuf==3.20.3
ptyprocess==0.7.0
pure-eval==0.2.2
py4j==0.10.9.7
pyarrow==8.0.0
pydot==1.4.2
Pygments==2.14.0
pymongo==3.13.0
pyparsing==3.0.9
python-dateutil==2.8.2
pytz==2023.3
regex==2023.3.23
requests==2.28.2
six==1.16.0
stack-data==0.6.2
traitlets==5.9.0
typing-extensions==4.5.0
urllib3==1.26.15
wcwidth==0.2.6
zstandard==0.20.0
我检查了 temp 文件夹中的权限,没有发现问题。下面是我修改后的代码:
import re
import typing
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.kafka import WriteToKafka

# Options for a streaming Flink pipeline.
# NOTE(review): the cross-language Kafka transform starts a Java expansion
# service, so a `java` executable must be on PATH — "error=2, No such file
# or directory" usually means it is missing.
pipeline_options = PipelineOptions(
    runner='FlinkRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='/tmp',
    streaming=True,
)

# Consumer settings passed through to the underlying Java KafkaIO consumer.
consumer_config = {
    'bootstrap.servers': 'kafka.test.svc.cluster.local:9092'
}

logging.basicConfig(level=logging.INFO)


def main():
    """Read messages from the 'test' Kafka topic and print each to stdout."""
    try:
        # Lazy %-formatting keeps the log call cheap when the level filters it.
        logging.info('Beam version: %s', beam.__version__)
        with beam.Pipeline(options=pipeline_options) as p:
            (p
             | "Read from Kafka" >> ReadFromKafka(consumer_config=consumer_config,
                                                  topics=['test'])
             | "Print to console" >> beam.Map(print)
             )
    except Exception:
        # logging.exception records the full traceback; a bare `raise`
        # re-raises the active exception without resetting its traceback
        # (unlike `raise e`, which rewrites the raise site).
        logging.exception("Pipeline failed")
        raise


# Guard the entry point so importing this module does not launch the pipeline.
if __name__ == "__main__":
    main()