I am trying to register a Flink table source using the code snippet below, but it fails with the following exception.
Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
at test.flink.BatchJob.main(BatchJob.java:58)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
Reason: No context matches.
The following properties are requested:
connector.path=file://D:\\Work\\flink\\flink-quickstart\\table-api\\src\\main\\resources\\result
connector.property-version=1
connector.type=filesystem
format.avro-schema={\"namespace\": \"example.avro\",\n \"type\": \"record\",\n \"name\": \"User\",\n \"fields\": [\n {\"name\": \"name\", \"type\": \"string\"},\n {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n ]\n}
format.property-version=1
format.type=avro
schema.0.name=name
schema.0.type=VARCHAR
schema.1.name=favorite_number
schema.1.type=INT
schema.2.name=favorite_color
schema.2.type=VARCHAR
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.formats.avro.AvroRowFormatFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
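```java
package test.flink;

// Imports are assumed for Flink 1.9, inferred from the factories listed in the stack trace.
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.Avro;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

public class BatchJob {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // Avro schema describing the records stored in the input files.
        String schemaString = "{\"namespace\": \"example.avro\",\n" +
                " \"type\": \"record\",\n" +
                " \"name\": \"User\",\n" +
                " \"fields\": [\n" +
                " {\"name\": \"name\", \"type\": \"string\"},\n" +
                " {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n" +
                " {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n" +
                " ]\n" +
                "}";

        // Filesystem connector combined with the Avro format; this registration throws the exception above.
        tEnv
            .connect(
                new FileSystem()
                    .path("file://D:\\Work\\flink\\flink-quickstart\\table-api\\src\\main\\resources\\result")
            )
            .withFormat(new Avro().avroSchema(schemaString))
            .withSchema(
                new Schema()
                    .field("name", Types.STRING)
                    .field("favorite_number", Types.INT)
                    .field("favorite_color", Types.STRING)
            )
            .registerTableSource("FirstTableSource");

        env.execute();
    }
}
```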
The documentation seems to indicate that the filesystem connector only supports CSV.
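To make the "No context matches" reason more concrete, here is a rough, self-contained sketch of how context matching could work. It is not Flink's actual implementation. The class `FactoryContextSketch`, its methods, and the required context I attribute to the CSV source factories (`connector.type=filesystem` plus `format.type=csv`) are my own assumptions for illustration; the requested properties are a subset of those printed in the exception.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of context matching; not Flink code.
final class FactoryContextSketch {

    // A factory is a candidate only if every key in its required context
    // appears in the requested properties with the same value.
    static boolean matchesContext(Map<String, String> required, Map<String, String> requested) {
        for (Map.Entry<String, String> entry : required.entrySet()) {
            if (!entry.getValue().equals(requested.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // Subset of the properties listed in the exception message.
    static Map<String, String> requestedProperties() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("connector.type", "filesystem");
        props.put("connector.property-version", "1");
        props.put("format.type", "avro");
        props.put("format.property-version", "1");
        return props;
    }

    // ASSUMPTION: the context a CSV table source factory would require.
    static Map<String, String> csvSourceContext() {
        Map<String, String> context = new LinkedHashMap<>();
        context.put("connector.type", "filesystem");
        context.put("format.type", "csv");
        return context;
    }

    public static void main(String[] args) {
        boolean matches = matchesContext(csvSourceContext(), requestedProperties());
        System.out.println("CSV source factory matches: " + matches);
    }
}
```

Under these assumptions the program prints `CSV source factory matches: false`, because `format.type` is `avro` rather than `csv`. That would be consistent with the trace: the only table source factories considered are the CSV ones, and `AvroRowFormatFactory` is a format factory, so it presumably cannot satisfy a request for a `TableSourceFactory` on its own.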