When I try to run a Flink batch job with the Table API, the table environment is not created; instead, this line throws an exception:
TableEnvironment tenv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
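I get the following exception:
Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
The job then fails. Here is the full code: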
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(wslIpAddress, flinkPort);
TableEnvironment tenv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tenv.executeSql("CREATE TABLE Flinkdata (\n" +
        " Inde STRING,\n" +
        " User_Id STRING,\n" +
        " First_Name STRING,\n" +
        " Last_Name STRING,\n" +
        " Sex STRING,\n" +
        " Email STRING,\n" +
        " Phone STRING,\n" +
        " Date_of_birth STRING, \n" +
        " Job_Title STRING,\n" +
        " PRIMARY KEY (Inde) NOT ENFORCED\n" +
        ") WITH (\n" +
        " 'connector.type' = 'jdbc',\n" +
        " 'connector.url' = 'jdbc:mysql://localhost/ruby',\n" +
        " 'connector.table' = 'flink_people_data',\n" +
        " 'connector.username' = 'root',\n" +
        " 'connector.password' = 'passwordd1234'\n" +
        ")");
tenv.executeSql("SELECT * FROM Flinkdata");
Table transactions = tenv.from("Flinkdata");
env.execute("ReadWriteToMariaDB");
The error seems to occur at this line of the code:
TableEnvironment tenv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
How can I fix this?
This is caused by a missing dependency. You can add the following dependency in Maven:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-client</artifactId>
    <version>${flink.version}</version>
</dependency>
Here is the complete dependencies section of the pom file:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.44</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-client</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
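If the exception persists after changing the pom, it can help to check what is actually on the runtime classpath. The following is a minimal diagnostic sketch, not part of Flink itself. It assumes that Flink discovers its table factories through the standard Java SPI service file for `org.apache.flink.table.factories.Factory`; the class name `FactoryServiceCheck` and the method `registeredFactories` are hypothetical names chosen for this example. It uses only the JDK, so it also runs (and prints nothing useful) when no Flink jar is present.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

// Hypothetical helper: lists the factory classes registered via Java SPI service files.
final class FactoryServiceCheck {

    // Assumption: Flink table factories are registered under this service file name.
    static final String SERVICE_FILE =
            "META-INF/services/org.apache.flink.table.factories.Factory";

    /** Returns every class name listed in all copies of the given service file on the classpath. */
    static List<String> registeredFactories(String serviceFile) throws IOException {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            cl = ClassLoader.getSystemClassLoader();
        }
        List<String> names = new ArrayList<>();
        Enumeration<URL> urls = cl.getResources(serviceFile);
        while (urls.hasMoreElements()) {
            URL url = urls.nextElement();
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    line = line.trim();
                    // Skip blank lines and comment lines in the service file.
                    if (!line.isEmpty() && !line.startsWith("#")) {
                        names.add(line);
                    }
                }
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        List<String> factories = registeredFactories(SERVICE_FILE);
        if (factories.isEmpty()) {
            System.out.println("No table factories are registered on the classpath.");
        }
        for (String name : factories) {
            System.out.println(name);
        }
    }
}
```

Run it with the same classpath as the failing job. If no printed entry has a name containing `ExecutorFactory`, the module that provides the executor factory is probably still missing from the classpath.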
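Here is a demo:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkSQL {
    public static void main(String[] args) throws Exception {
        // Note: this remote environment is not used by the TableEnvironment created below.
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("192.168.110.244", 6123);
        // This is the call that fails when no ExecutorFactory is on the classpath.
        TableEnvironment tenv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Register a table backed by MySQL through the JDBC connector.
        tenv.executeSql("CREATE TABLE test_t1 (\n" +
                " id int,\n" +
                " name STRING,\n" +
                " age int,\n" +
                " PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:mysql://192.168.110.210:3306/test_db?useSSL=false',\n" +
                " 'table-name' = 'test_t1',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'root'\n" +
                ")");
        // executeSql submits the query itself; print() collects and prints the rows.
        tenv.executeSql("SELECT * FROM test_t1").print();
        //Table transactions = tenv.from("test_t1");
        //env.execute("ReadWriteToMariaDB");
    }
}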
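Note that the demo also uses different connector option keys than the question: `'connector'`, `'url'` and `'table-name'` instead of `'connector.type'`, `'connector.url'` and `'connector.table'`. The sketch below shows that renaming applied to the question's options. It is an illustration only: `JdbcOptionKeys`, `toNewKeys` and `withClause` are hypothetical names, only the five keys that appear in the question are mapped, and values are assumed to contain no single quotes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: rewrites the legacy JDBC option keys from the question
// into the keys used in the demo, and renders them as a WITH clause.
final class JdbcOptionKeys {

    /** Renames the five legacy keys used in the question; any other key is left unchanged. */
    static Map<String, String> toNewKeys(Map<String, String> legacy) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : legacy.entrySet()) {
            String key;
            switch (e.getKey()) {
                case "connector.type":     key = "connector";  break;
                case "connector.url":      key = "url";        break;
                case "connector.table":    key = "table-name"; break;
                case "connector.username": key = "username";   break;
                case "connector.password": key = "password";   break;
                default:                   key = e.getKey();   break;
            }
            result.put(key, e.getValue());
        }
        return result;
    }

    /** Renders the options as a SQL WITH clause (no escaping of quotes in values). */
    static String withClause(Map<String, String> options) {
        StringBuilder sb = new StringBuilder("WITH (\n");
        int i = 0;
        for (Map.Entry<String, String> e : options.entrySet()) {
            sb.append("  '").append(e.getKey()).append("' = '").append(e.getValue()).append("'");
            if (++i < options.size()) {
                sb.append(',');
            }
            sb.append('\n');
        }
        return sb.append(")").toString();
    }

    public static void main(String[] args) {
        // Options copied from the question's CREATE TABLE statement.
        Map<String, String> legacy = new LinkedHashMap<>();
        legacy.put("connector.type", "jdbc");
        legacy.put("connector.url", "jdbc:mysql://localhost/ruby");
        legacy.put("connector.table", "flink_people_data");
        legacy.put("connector.username", "root");
        legacy.put("connector.password", "passwordd1234");
        System.out.println(withClause(toNewKeys(legacy)));
    }
}
```

The printed clause can replace the `WITH (...)` part of the question's `CREATE TABLE Flinkdata` statement so that it follows the same option style as the demo.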