"Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath" exception is thrown


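When I try to run a Flink batch job that uses a table environment, the table environment is not created. Instead, this line throws an exception: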

TableEnvironment tenv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

I get the following exception, and the job fails:

Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.

ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(wslIpAddress, flinkPort);

TableEnvironment tenv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

tenv.executeSql("CREATE TABLE Flinkdata (\n" +
        "  Inde STRING,\n" +
        "  User_Id STRING,\n" +
        "  First_Name STRING,\n" +
        "  Last_Name STRING,\n" +
        "  Sex STRING,\n" +
        "  Email STRING,\n" +
        "  Phone STRING,\n" +
        "  Date_of_birth STRING, \n" +
        "  Job_Title STRING,\n" +
        "  PRIMARY KEY (Inde) NOT ENFORCED\n" +
        ") WITH (\n" +
        "   'connector.type' = 'jdbc',\n" +
        "   'connector.url' = 'jdbc:mysql://localhost/ruby',\n" +
        "   'connector.table' = 'flink_people_data',\n" +
        "   'connector.username' = 'root',\n" +
        "   'connector.password' = 'passwordd1234'\n" +
        ")");

tenv.executeSql("SELECT * FROM Flinkdata");
Table transactions = tenv.from("Flinkdata");

env.execute("ReadWriteToMariaDB");

The error seems to occur at this line of the code:

TableEnvironment tenv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); 

How can I fix this?

apache-flink batch-processing
1 Answer

This is caused by a missing dependency on the classpath. You can add the following dependency in Maven:

<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-client</artifactId>
        <version>${flink.version}</version>
</dependency>
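
Flink discovers its table factories through Java's ServiceLoader mechanism, that is, through META-INF/services/org.apache.flink.table.factories.Factory entries in the jars on the classpath. To confirm whether an ExecutorFactory implementation is actually visible before and after changing the pom, a rough diagnostic like the one below may help. It is only a sketch: the class name FactoryClasspathCheck and its helper methods are made up for illustration, and filtering on the substring "ExecutorFactory" is an assumption about how the implementation classes are named.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

// Hypothetical helper, not part of Flink: lists factory entries visible on the classpath.
public class FactoryClasspathCheck {

    // Service file under which Flink table factories are registered.
    public static final String FACTORY_SERVICE = "org.apache.flink.table.factories.Factory";

    /** Strips a trailing '#' comment and whitespace from one line of a service file. */
    public static String parseServiceLine(String line) {
        int hash = line.indexOf('#');
        String content = hash >= 0 ? line.substring(0, hash) : line;
        return content.trim();
    }

    /** Returns every class name listed in META-INF/services/<serviceName> on the classpath. */
    public static List<String> registeredImplementations(String serviceName) {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            cl = ClassLoader.getSystemClassLoader();
        }
        List<String> names = new ArrayList<>();
        try {
            Enumeration<URL> files = cl.getResources("META-INF/services/" + serviceName);
            while (files.hasMoreElements()) {
                URL file = files.nextElement();
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(file.openStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        String name = parseServiceLine(line);
                        if (!name.isEmpty()) {
                            names.add(name);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        boolean found = false;
        for (String name : registeredImplementations(FACTORY_SERVICE)) {
            // Assumption: executor factory implementations carry "ExecutorFactory" in their name.
            if (name.contains("ExecutorFactory")) {
                System.out.println("Found executor factory entry: " + name);
                found = true;
            }
        }
        if (!found) {
            System.out.println("No ExecutorFactory entry is registered on this classpath.");
        }
    }
}
```

Run it with the same classpath as the failing job (for example from the same Maven module), otherwise the result says nothing about the job itself.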

Here is the complete dependencies section of the pom file:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.44</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-client</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

Here is a demo:

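import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkSQL {

    public static void main(String[] args) throws Exception {
        // Kept from the question; the TableEnvironment below does not use it.
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("192.168.110.244", 6123);

        TableEnvironment tenv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Register a JDBC-backed table pointing at the MySQL table test_t1.
        tenv.executeSql("CREATE TABLE test_t1 (\n" +
                "  id int,\n" +
                "  name STRING,\n" +
                "  age int,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "   'connector' = 'jdbc',\n" +
                "   'url' = 'jdbc:mysql://192.168.110.210:3306/test_db?useSSL=false',\n" +
                "   'table-name' = 'test_t1',\n" +
                "   'username' = 'root',\n" +
                "   'password' = 'root'\n" +
                ")");

        // executeSql submits the query itself; print() writes the result rows to stdout.
        tenv.executeSql("SELECT * FROM test_t1").print();
        //Table transactions = tenv.from("test_t1");
        //env.execute("ReadWriteToMariaDB");
    }
}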
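
Note that the demo's table definition uses the option keys 'connector', 'url', 'table-name', 'username' and 'password', whereas the question's used 'connector.type', 'connector.url' and so on. If the long string concatenation is hard to keep consistent, one option is to assemble the WITH clause from a map. The following is only a sketch of that idea: JdbcDdlSketch and withClause are hypothetical names, not Flink API, and the option values are the ones from the demo above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of Flink: renders connector options as a WITH (...) clause.
public class JdbcDdlSketch {

    /** Renders the options, in insertion order, as the WITH clause of a CREATE TABLE statement. */
    public static String withClause(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(",\n", "WITH (\n", "\n)");
        for (Map.Entry<String, String> option : options.entrySet()) {
            // A single quote inside a SQL string literal is escaped by doubling it.
            String value = option.getValue().replace("'", "''");
            joiner.add("  '" + option.getKey() + "' = '" + value + "'");
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Option keys and values copied from the demo's table definition.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "jdbc");
        options.put("url", "jdbc:mysql://192.168.110.210:3306/test_db?useSSL=false");
        options.put("table-name", "test_t1");
        options.put("username", "root");
        options.put("password", "root");

        String ddl = "CREATE TABLE test_t1 (\n"
                + "  id int,\n"
                + "  name STRING,\n"
                + "  age int,\n"
                + "  PRIMARY KEY (id) NOT ENFORCED\n"
                + ") " + withClause(options);

        // The resulting string could be passed to tenv.executeSql(ddl) in the demo.
        System.out.println(ddl);
    }
}
```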