Cannot register a Flink TableSource using an external connector


I am trying to register a Flink source with the code snippet below, but it fails with the following exception.

Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
    at test.flink.BatchJob.main(BatchJob.java:58)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: No context matches.

The following properties are requested:
connector.path=file://D:\\Work\\flink\\flink-quickstart\\table-api\\src\\main\\resources\\result
connector.property-version=1
connector.type=filesystem
format.avro-schema={\"namespace\": \"example.avro\",\n \"type\": \"record\",\n \"name\": \"User\",\n \"fields\": [\n     {\"name\": \"name\", \"type\": \"string\"},\n     {\"name\": \"favorite_number\",  \"type\": [\"int\", \"null\"]},\n     {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n ]\n}
format.property-version=1
format.type=avro
schema.0.name=name
schema.0.type=VARCHAR
schema.1.name=favorite_number
schema.1.type=INT
schema.2.name=favorite_color
schema.2.type=VARCHAR

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.formats.avro.AvroRowFormatFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)

```
package test.flink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.Avro;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

public class BatchJob {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // Avro schema describing the records in the source files
        String schemaString = "{\"namespace\": \"example.avro\",\n" +
                " \"type\": \"record\",\n" +
                " \"name\": \"User\",\n" +
                " \"fields\": [\n" +
                "     {\"name\": \"name\", \"type\": \"string\"},\n" +
                "     {\"name\": \"favorite_number\",  \"type\": [\"int\", \"null\"]},\n" +
                "     {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n" +
                " ]\n" +
                "}";

        // Register a filesystem connector with the Avro format; this is the call that fails
        tEnv
                .connect(
                        new FileSystem()
                                .path("file://D:\\Work\\flink\\flink-quickstart\\table-api\\src\\main\\resources\\result")
                )
                .withFormat(new Avro().avroSchema(schemaString))
                .withSchema(
                        new Schema()
                                .field("name", Types.STRING)
                                .field("favorite_number", Types.INT)
                                .field("favorite_color", Types.STRING)
                )
                .registerTableSource("FirstTableSource");

        env.execute();
    }
}
```
apache-flink flink-sql
1 Answer

The documentation seems to indicate that the filesystem connector only supports the CSV format.
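If converting the data to CSV is an option, the same table can be registered through the filesystem connector with the legacy `OldCsv` format descriptor, which should be matched by the `CsvBatchTableSourceFactory` listed in the exception. Below is a minimal sketch, not from the original post: the CSV path and the table name `CsvTableSource` are placeholders.

```
// Assumes the data has been converted to CSV; additionally requires
// import org.apache.flink.table.descriptors.OldCsv;
tEnv
        .connect(new FileSystem().path("file:///path/to/result.csv"))  // placeholder path
        .withFormat(
                new OldCsv()
                        .field("name", Types.STRING)
                        .field("favorite_number", Types.INT)
                        .field("favorite_color", Types.STRING)
        )
        .withSchema(
                new Schema()
                        .field("name", Types.STRING)
                        .field("favorite_number", Types.INT)
                        .field("favorite_color", Types.STRING)
        )
        .registerTableSource("CsvTableSource");
```

Reading the Avro files directly would require a table source factory that supports `connector.type=filesystem` together with `format.type=avro`, and no such factory appears in the list of considered factories in the exception.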
