Could not find any factory for identifier 'kinesis' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath


I am trying to run a Python job with Apache Flink 1.15.2 based on the repository https://github.com/aws-samples/pyflink-getting-started.git

The repository lists 4 steps, and I am stuck at the first one, i.e. 1) Local development using PyFlink.

I have completed all the prerequisites, such as creating a conda virtual env and installing the required Flink version, i.e. apache-flink 1.15.2.

I followed the steps at https://github.com/aws-samples/pyflink-getting-started/tree/main/getting-started

I have also downloaded the .jar file (the Kinesis connector) and saved it in the lib subdirectory of the current directory.

  1. There is an input stream generator script, stock.py, which runs fine:
import datetime
import json
import random
import boto3

STREAM_NAME = "ExampleInputStream"
STREAM_REGION = "eu-west-2"


def get_data():
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)
    }


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")


if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis', region_name=STREAM_REGION))

  2. The problem occurs when running the output-stream code, getting-started.py:
# -*- coding: utf-8 -*-
"""
getting-started.py
~~~~~~~~~~~~~~~~~~~
This module:
    1. Creates a table environment
    2. Creates a source table from a Kinesis Data Stream
    3. Creates a sink table writing to a Kinesis Data Stream
    4. Inserts the source table data into the sink table
"""

from pyflink.table import EnvironmentSettings, TableEnvironment
import os
import json

# 1. Creates a Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

statement_set = table_env.create_statement_set()
# print(os.environ.get("IS_LOCAL"))
APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # on kda

is_local = (
    True if os.environ.get("IS_LOCAL") else False
)  # set this env var in your local environment

if is_local:
    # only for local, overwrite variable to properties and pass in your jars delimited by a semicolon (;)
    APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # local

    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis-1.15.2.jar",
        # "file:///" + "C:/lib/flink-sql-connector-kinesis-1.15.2.jar",
    )



def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))


def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]

def create_source_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )


def create_sink_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.batch.max-size' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name, stream_name, region
    )


def create_print_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              )
              WITH (
                'connector' = 'print'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )

def main():
    # Application Property Keys
    input_property_group_key = "consumer.config.0"
    producer_property_group_key = "producer.config.0"

    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    input_starting_position_key = "flink.stream.initpos"

    output_stream_key = "output.stream.name"
    output_region_key = "aws.region"

    # tables
    input_table_name = "input_table"
    output_table_name = "output_table"

    # get application properties
    props = get_application_properties()

    input_property_map = property_map(props, input_property_group_key)
    output_property_map = property_map(props, producer_property_group_key)

    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]

    output_stream = output_property_map[output_stream_key]
    output_region = output_property_map[output_region_key]

    # 2. Creates a source table from a Kinesis Data Stream
    table_env.execute_sql(
        create_source_table(input_table_name, input_stream, input_region, stream_initpos)
    )

    # 3. Creates a sink table writing to a Kinesis Data Stream
    table_env.execute_sql(
        create_print_table(output_table_name, output_stream, output_region, stream_initpos)
    )

    # 4. Inserts the source table data into the sink table
    table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                                         .format(output_table_name, input_table_name))

    if is_local:
        table_result.wait()
    else:
        # get job status through TableResult
        print(table_result.get_job_client().get_job_status())


if __name__ == "__main__":
    main()

I get the error: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kinesis' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Full stack trace:

  File ".\getting-started.py", line 181, in <module>
    main()
  File ".\getting-started.py", line 170, in main
    table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
  File "C:\Users\a856434\AppData\Local\anaconda3\envs\my-aws-flink-env\lib\site-packages\pyflink\table\table_environment.py", line 828, in execute_sql  
    return TableResult(self._j_tenv.executeSql(stmt))
  File "C:\Users\a856434\AppData\Local\anaconda3\envs\my-aws-flink-env\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\a856434\AppData\Local\anaconda3\envs\my-aws-flink-env\lib\site-packages\pyflink\util\exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "C:\Users\a856434\AppData\Local\anaconda3\envs\my-aws-flink-env\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o1.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.input_table'.

Table options are:

'aws.region'='eu-west-2'
'connector'='kinesis'
'format'='json'
'json.timestamp-format.standard'='ISO-8601'
'scan.stream.initpos'='LATEST'
'stream'='ExampleInputStream '
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:159)
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:184)
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
        at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:197)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:189)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1240)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1188)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:345)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNodeOrFail(SqlToOperationConverter.java:353)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:763)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:322)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:695)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='kinesis'
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:728)
        at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:702)
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:155)
        ... 34 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kinesis' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:538)
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:724)
        ... 36 more

Here is the application_properties.json file:

[
  {
  "PropertyGroupId": "kinesis.analytics.flink.run.options",
  "PropertyMap": {
    "python": "GettingStarted/getting-started.py",
    "jarfile": "GettingStarted/lib/flink-sql-connector-kinesis-1.15.2.jar"
   }
  },
  {
    "PropertyGroupId": "consumer.config.0",
    "PropertyMap": {
      "input.stream.name": "ExampleInputStream ",
      "flink.stream.initpos": "LATEST",
      "aws.region": "eu-west-2"
    }
  },
  {
    "PropertyGroupId": "producer.config.0",
    "PropertyMap": {
      "output.stream.name": "ExampleOutputStream",
      "shard.count": "1",
      "aws.region": "eu-west-2"
    }
  }
]
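For reference, a minimal standalone check (mirroring the property_map helper in getting-started.py, and assuming the file above sits in the working directory) shows how the consumer group resolves from this file; these are exactly the table options echoed later in the stack trace:

import json

# Resolve the consumer.config.0 property group the same way getting-started.py does
with open("application_properties.json") as f:
    props = json.load(f)

consumer = next(p["PropertyMap"] for p in props if p["PropertyGroupId"] == "consumer.config.0")
print(consumer)  # values come straight from the JSON above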

I expect the output stream to show some results, as in the screenshot of the output stream.

I have re-checked the path of the jar file present in the specified location. I even tested with a small sample Python program to verify a) that the directory exists and b) that the .jar file is accessible in that directory (screenshot: dir_struct_files).

But I still hit the connector issue. I re-checked the PyFlink documentation for how .jar dependencies need to be specified, and it is exactly how it is specified in the program:

https://pyflink.readthedocs.io/en/main/faq.html#connector-issues https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/dependency_management/#jar-dependency

I am not using a fat jar, since it is just a single jar file and it should work as per the steps in the repo.

Any help would be much appreciated. As for why I am using version 1.15.2: eventually we want to write Beam applications, and Beam does not currently support anything beyond 1.15.2.

Tags: jar, apache-flink, local, amazon-kinesis
1 Answer

The issue is resolved. There was no problem in the code. It was just that PyCharm, for some reason, was not picking up the 'is_local' variable, so the script could not find the correct location of the .jar file. So I set IS_LOCAL=true in the Windows cmd, ran the code from there (after activating conda), and got the desired output.
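For anyone hitting the same symptom, a minimal pre-flight sketch along these lines (a hypothetical helper, assuming the directory layout from the question) confirms both conditions before launching the job; if IS_LOCAL is unset, getting-started.py never sets pipeline.jars, the 'kinesis' factory is missing from the classpath, and the ValidationException above follows:

# pre_flight_check.py - hypothetical helper, run from the same directory as getting-started.py
# (in Windows cmd, first run: set IS_LOCAL=true)
import os

jar_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                        "lib", "flink-sql-connector-kinesis-1.15.2.jar")

print("IS_LOCAL set:", bool(os.environ.get("IS_LOCAL")))    # must be truthy for the local branch
print("Kinesis connector jar found:", os.path.isfile(jar_path))  # must be True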
