I am trying to read an MQTT topic using the Apache Beam Python SDK v2.50.0 with multi-language support (MQTT IO is available in the Java SDK but not in the Python SDK). This is the documentation I am following.
import argparse
import logging
from typing import Callable, NamedTuple, Optional

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.external import JavaExternalTransform


def run(
    username: str,
    password: str,
    server_uri: str = 'mqtts:HOSTNAME:8883',
    topic: str = 'TOPIC_NAME',
    beam_options: Optional[PipelineOptions] = None,
) -> None:
    with beam.Pipeline(options=beam_options) as pipeline:
        MqttConfig = NamedTuple('MqttConfig', [('serverUri', str), ('topic', str), ('clientId', str), ('username', str), ('password', str)])
        mqtt_config = MqttConfig(serverUri=server_uri, topic=topic, clientId='random123', username=username, password=password)
        read_java_transform = JavaExternalTransform(
            'org.apache.beam.sdk.io.mqtt.MqttIO',
        ).read().withConnectionConfiguration(list(mqtt_config))
        data = (
            pipeline
            | 'Read' >> read_java_transform
            | 'ToString' >> beam.Map(lambda x: x.decode('utf-8'))
            | 'Print' >> beam.Map(print)
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--username",
    )
    parser.add_argument(
        "--password",
    )
    parser.add_argument(
        '--server_uri',
        help='MQTT server URI',
    )
    parser.add_argument(
        '--topic',
        help='MQTT topic to subscribe to',
    )
    known_args, beam_args = parser.parse_known_args()
    beam_options = PipelineOptions(
        # save_main_session=True,
        beam_args
    )
    run(
        **vars(known_args),
        beam_options=beam_options,
    )
When I run the file with
poetry run python3 main.py --runner DirectRunner --environment_type=DOCKER --username=USERNAME --password=API_KEY
I get the following error:
RuntimeError: java.lang.IllegalArgumentException: Could not find class org.apache.beam.sdk.io.mqtt.MqttIO
at org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.getTransform(JavaClassLookupTransformProvider.java:131)
at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:402)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:555)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:639)
at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:355)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:867)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.sdk.io.mqtt.MqttIO
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.getTransform(JavaClassLookupTransformProvider.java:107)
... 12 more
The error appears after the expansion service seems to have been started automatically. Here is the relevant standard output:
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
/Users/daniel/Library/Caches/pypoetry/virtualenvs/good-data-app-rAL-Zgkg-py3.11/lib/python3.11/site-packages/apache_beam/transforms/external.py:676: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
self._expansion_service, pipeline.options)
INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-expansion-service-app/2.50.0/beam-sdks-java-expansion-service-app-2.50.0.jar
INFO:root:Starting a JAR-based expansion service from JAR /Users/daniel/.apache_beam/cache/jars/beam-sdks-java-expansion-service-app-2.50.0.jar
INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/Users/daniel/.apache_beam/cache/jars/beam-sdks-java-expansion-service-app-2.50.0.jar' '61991' '--javaClassLookupAllowlistFile=*']
INFO:apache_beam.utils.subprocess_server:Starting expansion service at localhost:61991
INFO:apache_beam.utils.subprocess_server:Sep 24, 2023 7:10:25 AM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO:apache_beam.utils.subprocess_server:INFO: Registering external transforms: [beam:external:java:generate_sequence:v1]
INFO:apache_beam.utils.subprocess_server:
INFO:apache_beam.utils.subprocess_server:Registered transforms:
INFO:apache_beam.utils.subprocess_server: beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@413d1baf
INFO:apache_beam.utils.subprocess_server:Sep 24, 2023 7:10:26 AM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Read/ExternalTransform(beam:expansion:payload:java_class_lookup:v1)' with URN 'beam:expansion:payload:java_class_lookup:v1'
INFO:apache_beam.utils.subprocess_server:Dependencies list: {}
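I believe the MqttIO class meets the following requirements from the documentation:
To be directly usable, the API of the Java transform has to meet the following requirements:
1. The Java transform can be constructed using an available public constructor or a public static method (a constructor method) in the same Java class.
2. The Java transform can be configured using one or more builder methods. Each builder method should be public and should return an instance of the Java transform.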
I have double-checked that the fully qualified class name 'org.apache.beam.sdk.io.mqtt.MqttIO' is correct.
What am I missing here?
It seems that the jar used to start the expansion service (beam-sdks-java-expansion-service-app-2.50.0.jar) does not contain org.apache.beam.sdk.io.mqtt.MqttIO, the Java transform you are trying to use from Python.
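One way to check this yourself: a jar is a zip archive, and a class named a.b.C is stored in it as the entry a/b/C.class. Below is a minimal sketch using only the Python standard library. The helper name jar_contains_class is made up for illustration and is not part of Beam.

```python
import zipfile


def jar_contains_class(jar_path: str, class_name: str) -> bool:
    """Return True if the jar (a zip archive) has an entry for class_name.

    Hypothetical helper for illustration; not part of Apache Beam.
    """
    # org.apache.beam.sdk.io.mqtt.MqttIO -> org/apache/beam/sdk/io/mqtt/MqttIO.class
    entry = class_name.replace('.', '/') + '.class'
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

Calling it with the cached jar path from your log (/Users/daniel/.apache_beam/cache/jars/beam-sdks-java-expansion-service-app-2.50.0.jar) and 'org.apache.beam.sdk.io.mqtt.MqttIO' should tell you whether the class is actually packaged there.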
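Could you try starting the Java expansion service manually, making sure that the beam-sdks-java-io-mqtt jar and the other required dependencies are on the classpath when the service starts? It may be more convenient to build a shaded jar that contains the MQTT jar and its dependencies. For an example, see here for the main class and dependencies included in the IO expansion service jar build that we use for Kafka. See here for the command that can be used to start the expansion service. Once the expansion service is up, you can specify its address through the parameter here.
Ideally, Beam would provide a Python multi-language wrapper for your convenience, but that does not exist yet.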
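To make the manual route more concrete, here is a rough sketch under a few assumptions, not a tested recipe. It assumes that org.apache.beam.sdk.expansion.service.ExpansionService (the class that appears in your stack trace) can be launched as the main class with the port as its first argument, and that the --javaClassLookupAllowlistFile=* flag from your log is what you want. The function names and the jar file names in the usage note are placeholders. On the Python side it passes the address through the expansion_service argument of JavaExternalTransform.

```python
import os
from typing import List


def expansion_service_command(
        service_jar: str, extra_jars: List[str], port: int) -> List[str]:
    """Build a java command that starts the expansion service with extra
    jars (the MQTT IO jar and its dependencies) on the classpath."""
    # os.pathsep is ':' on Linux/macOS and ';' on Windows, which matches
    # the separator Java expects in a classpath.
    classpath = os.pathsep.join([service_jar, *extra_jars])
    return [
        'java', '-cp', classpath,
        # Assumption: this class (seen in the stack trace) is the entry point.
        'org.apache.beam.sdk.expansion.service.ExpansionService',
        str(port),
        # Same flag the auto-started service received in the log.
        '--javaClassLookupAllowlistFile=*',
    ]


def mqtt_io_via(address: str):
    """Return a JavaExternalTransform for MqttIO that talks to an
    expansion service already running at address, e.g. 'localhost:61991'."""
    # Imported lazily so the command helper works without Beam installed.
    from apache_beam.transforms.external import JavaExternalTransform
    return JavaExternalTransform(
        'org.apache.beam.sdk.io.mqtt.MqttIO', expansion_service=address)
```

For example, expansion_service_command('beam-sdks-java-expansion-service-app-2.50.0.jar', ['beam-sdks-java-io-mqtt-2.50.0.jar'], 61991) gives a list you can hand to subprocess.Popen or paste into a shell. The second jar name is a placeholder, and you would also need to list the MQTT client library and any other transitive dependencies. After the service is running, mqtt_io_via('localhost:61991').read() takes the place of the JavaExternalTransform(...).read() call in your pipeline.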