How to read from MQTT with the Apache Beam Python SDK


I'm trying to read an MQTT topic using the Apache Beam Python SDK v2.50.0 with multi-language support (MQTT IO is available in the Java SDK but not in the Python SDK). This is the documentation I'm following.

import argparse
import logging
from typing import NamedTuple, Optional

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.external import JavaExternalTransform


class MqttConfig(NamedTuple):
    # Field names follow the properties of Java's MqttIO.ConnectionConfiguration.
    serverUri: str
    topic: str
    clientId: str
    username: str
    password: str


def run(
    username: str,
    password: str,
    server_uri: str = 'mqtts:HOSTNAME:8883',
    topic: str = 'TOPIC_NAME',
    beam_options: Optional[PipelineOptions] = None,
) -> None:
    mqtt_config = MqttConfig(
        serverUri=server_uri,
        topic=topic,
        clientId='random123',
        username=username,
        password=password,
    )

    with beam.Pipeline(options=beam_options) as pipeline:
        # Mirrors the Java call chain MqttIO.read().withConnectionConfiguration(...);
        # it is expanded by a Java expansion service, which the SDK starts
        # automatically when none is specified.
        read_java_transform = JavaExternalTransform(
            'org.apache.beam.sdk.io.mqtt.MqttIO',
        ).read().withConnectionConfiguration(list(mqtt_config))

        (
            pipeline
            | 'Read' >> read_java_transform
            # MqttIO.read() emits each message payload as bytes.
            | 'ToString' >> beam.Map(lambda x: x.decode('utf-8'))
            | 'Print' >> beam.Map(print)
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument('--username', required=True)
    parser.add_argument('--password', required=True)
    # Without explicit defaults argparse yields None for omitted flags,
    # which would override the defaults declared in run().
    parser.add_argument(
        '--server_uri', default='mqtts:HOSTNAME:8883', help='MQTT server URI')
    parser.add_argument(
        '--topic', default='TOPIC_NAME', help='MQTT topic to subscribe to')
    known_args, beam_args = parser.parse_known_args()

    beam_options = PipelineOptions(
        beam_args,
        # save_main_session=True,
    )

    run(
        **vars(known_args),
        beam_options=beam_options,
    )

When I run the file with

poetry run python3 main.py --runner DirectRunner --environment_type=DOCKER --username=USERNAME --password=API_KEY

I get the following error:

RuntimeError: java.lang.IllegalArgumentException: Could not find class org.apache.beam.sdk.io.mqtt.MqttIO
    at org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.getTransform(JavaClassLookupTransformProvider.java:131)
    at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:402)
    at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:555)
    at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:639)
    at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:355)
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:867)
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.sdk.io.mqtt.MqttIO
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
    at org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.getTransform(JavaClassLookupTransformProvider.java:107)
    ... 12 more

The error occurs after the expansion service appears to have been started automatically. Here is the relevant stdout:

INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
/Users/daniel/Library/Caches/pypoetry/virtualenvs/good-data-app-rAL-Zgkg-py3.11/lib/python3.11/site-packages/apache_beam/transforms/external.py:676: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
  self._expansion_service, pipeline.options)
INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-expansion-service-app/2.50.0/beam-sdks-java-expansion-service-app-2.50.0.jar
INFO:root:Starting a JAR-based expansion service from JAR /Users/daniel/.apache_beam/cache/jars/beam-sdks-java-expansion-service-app-2.50.0.jar 
INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/Users/daniel/.apache_beam/cache/jars/beam-sdks-java-expansion-service-app-2.50.0.jar' '61991' '--javaClassLookupAllowlistFile=*']
INFO:apache_beam.utils.subprocess_server:Starting expansion service at localhost:61991
INFO:apache_beam.utils.subprocess_server:Sep 24, 2023 7:10:25 AM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO:apache_beam.utils.subprocess_server:INFO: Registering external transforms: [beam:external:java:generate_sequence:v1]
INFO:apache_beam.utils.subprocess_server:
INFO:apache_beam.utils.subprocess_server:Registered transforms:
INFO:apache_beam.utils.subprocess_server:   beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@413d1baf
INFO:apache_beam.utils.subprocess_server:Sep 24, 2023 7:10:26 AM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Read/ExternalTransform(beam:expansion:payload:java_class_lookup:v1)' with URN 'beam:expansion:payload:java_class_lookup:v1'
INFO:apache_beam.utils.subprocess_server:Dependencies list: {}

I believe the MqttIO class meets the following requirements:

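To be directly usable, the API of the Java transform must meet the following requirements:
1. The Java transform can be constructed using a public constructor or a public static method (a constructor method) available in the same Java class.
2. The Java transform can be configured using one or more builder methods. Each builder method should be public and should return an instance of the Java transform.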

I have double-checked that the fully qualified class name

'org.apache.beam.sdk.io.mqtt.MqttIO'

is correct.

What am I missing here?

apache-beam apache-beam-io
1 Answer

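It looks like the expansion service jar used to start the expansion service (beam-sdks-java-expansion-service-app-2.50.0.jar) does not contain org.apache.beam.sdk.io.mqtt.MqttIO, the Java transform you are trying to use from Python.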
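One way to confirm this is to look inside the cached jar. A jar is a zip archive, and the class `org.apache.beam.sdk.io.mqtt.MqttIO` would be stored as the entry `org/apache/beam/sdk/io/mqtt/MqttIO.class`. The sketch below uses only Python's standard `zipfile` module; the helper name `jar_contains_class` is hypothetical, and the jar location is taken from the log in the question (`~/.apache_beam/cache/jars/`), so adjust it if your cache lives elsewhere.

```python
import os
import zipfile


def jar_contains_class(jar_path: str, class_name: str) -> bool:
    """Return True if the jar (a zip archive) contains the compiled class.

    'org.apache.beam.sdk.io.mqtt.MqttIO' maps to the archive entry
    'org/apache/beam/sdk/io/mqtt/MqttIO.class'.
    """
    entry = class_name.replace('.', '/') + '.class'
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


if __name__ == '__main__':
    # Path assumed from the log above (default Beam jar cache).
    jar = os.path.expanduser(
        '~/.apache_beam/cache/jars/'
        'beam-sdks-java-expansion-service-app-2.50.0.jar')
    if os.path.exists(jar):
        print(jar_contains_class(jar, 'org.apache.beam.sdk.io.mqtt.MqttIO'))
    else:
        print(f'{jar} not found')
```

If this prints `False` for the expansion-service-app jar, the `ClassNotFoundException` is expected: the MQTT IO jar has to be put on the expansion service's classpath.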

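Could you try starting the Java expansion service manually and making sure that the beam-sdks-java-io-mqtt jar and its other required dependencies are on the classpath when you start it? It may be more convenient to build a shaded jar that bundles the MQTT jar and its dependencies. For example, see here for the main class and dependencies included in the IO expansion service jar we build for Kafka. See here for a command you can use to start the expansion service. Once the expansion service is up, you can pass its address via the parameter here.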
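A minimal sketch of starting the service by hand, assuming the jars have already been downloaded to the current directory. It builds the same kind of command the SDK ran in the log above (a port plus `--javaClassLookupAllowlistFile=*`), but uses `java -cp` with the expansion service main class so that extra jars can be added. The helper names, the port `8097`, and the versions of the fusesource MQTT client jars are assumptions; take the exact dependency list from the POM of `beam-sdks-java-io-mqtt`.

```python
import os
import shlex
from typing import List, Sequence

MAVEN_CENTRAL = 'https://repo.maven.apache.org/maven2'
EXPANSION_SERVICE_MAIN = 'org.apache.beam.sdk.expansion.service.ExpansionService'


def maven_central_url(group_id: str, artifact_id: str, version: str) -> str:
    """Return the Maven Central download URL of a jar artifact."""
    group_path = group_id.replace('.', '/')
    return (f'{MAVEN_CENTRAL}/{group_path}/{artifact_id}/{version}/'
            f'{artifact_id}-{version}.jar')


def expansion_service_command(jars: Sequence[str], port: int) -> List[str]:
    """Return a `java -cp` command that starts the Beam expansion service
    with every jar in `jars` on its classpath."""
    return [
        'java',
        '-cp',
        os.pathsep.join(jars),  # ':' on macOS/Linux, ';' on Windows
        EXPANSION_SERVICE_MAIN,
        str(port),
        # Allow any class to be looked up, as the auto-started service does.
        '--javaClassLookupAllowlistFile=*',
    ]


if __name__ == '__main__':
    beam_version = '2.50.0'
    jars = [
        f'beam-sdks-java-expansion-service-app-{beam_version}.jar',
        f'beam-sdks-java-io-mqtt-{beam_version}.jar',
        # Assumed transitive dependencies of MqttIO and their versions;
        # verify them against the beam-sdks-java-io-mqtt POM.
        'mqtt-client-1.15.jar',
        'hawtdispatch-transport-1.22.jar',
        'hawtdispatch-1.22.jar',
        'hawtbuf-1.11.jar',
    ]
    print(maven_central_url('org.apache.beam', 'beam-sdks-java-io-mqtt', beam_version))
    # shlex.join quotes the '*' so the shell does not glob-expand it.
    print(shlex.join(expansion_service_command(jars, 8097)))
```

Once the service is running, pass its address to the transform, for example `JavaExternalTransform('org.apache.beam.sdk.io.mqtt.MqttIO', expansion_service='localhost:8097')`. Depending on your Beam release, `JavaExternalTransform` may also accept a `classpath` list of extra jars so the SDK starts the service with them for you; check the API reference for your version before relying on it.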

Ideally, Beam would provide a Python multi-language wrapper for this to make it convenient for you, but that doesn't exist yet.

© www.soinside.com 2019 - 2024. All rights reserved.