Flink with Iceberg Catalog and Hive Metastore: org.apache.hadoop.fs.s3a.S3AFileSystem not found


I'm trying to set up Flink SQL with an Apache Iceberg catalog and the Hive Metastore, without success. Here are the steps I've taken on a clean Flink 1.18.1 installation, and the error that I get.

Setting up the components

Run the Hive Metastore:

docker run --rm --detach --name hms-standalone \
           --publish 9083:9083 \
           ghcr.io/recap-build/hive-metastore-standalone:latest 

Run MinIO with Docker:

docker run --rm --detach --name minio \
            -p 9001:9001 -p 9000:9000 \
            -e "MINIO_ROOT_USER=admin" \
            -e "MINIO_ROOT_PASSWORD=password" \
            minio/minio server /data --console-address ":9001"

Provision a bucket:

docker exec minio \
    mc config host add minio http://localhost:9000 admin password
docker exec minio \
    mc mb minio/warehouse

Add the required MinIO configuration to ./conf/flink-conf.yaml:

cat >> ./conf/flink-conf.yaml <<EOF
fs.s3a.access.key: admin
fs.s3a.secret.key: password
fs.s3a.endpoint: http://localhost:9000
fs.s3a.path.style.access: true
EOF
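(Side note: Flink's s3-fs-hadoop plugin also forwards configuration keys with the shorter s3. prefix to the underlying fs.s3a. Hadoop settings, so an equivalent sketch of the same configuration, with the same MinIO credentials, would be:)

```yaml
# Equivalent settings using Flink's s3.* key style, which the s3-fs-hadoop
# plugin forwards to fs.s3a.* — same MinIO credentials as above.
s3.access-key: admin
s3.secret-key: password
s3.endpoint: http://localhost:9000
s3.path.style.access: true
```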

Adding the JARs to Flink

Flink's S3 plugin:

mkdir ./plugins/s3-fs-hadoop
cp ./opt/flink-s3-fs-hadoop-1.18.1.jar ./plugins/s3-fs-hadoop/

Flink's Hive connector:

mkdir -p ./lib/hive
curl -s https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.18.1/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar -o ./lib/hive/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar

Iceberg's dependencies:

mkdir ./lib/iceberg
curl https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.17/1.4.3/iceberg-flink-runtime-1.17-1.4.3.jar -o ./lib/iceberg/iceberg-flink-runtime-1.17-1.4.3.jar

mkdir -p ./lib/aws
curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.6/hadoop-aws-3.3.6.jar -o ./lib/aws/hadoop-aws-3.3.6.jar

Running it

Set up the Hadoop dependencies:

export HADOOP_CLASSPATH=$(~/hadoop/hadoop-3.3.2/bin/hadoop classpath)

Launch the SQL client:

./bin/sql-client.sh
Flink SQL> CREATE CATALOG c_iceberg_hive WITH (
>    'type' = 'iceberg',
>    'client.assume-role.region' = 'us-east-1',
>    'warehouse' = 's3a://warehouse',
>    's3.endpoint' = 'http://localhost:9000',
>    's3.path-style-access' = 'true',
>    'catalog-type'='hive',
>    'uri'='thrift://localhost:9083');
[INFO] Execute statement succeed.

Flink SQL> USE CATALOG c_iceberg_hive;
[INFO] Execute statement succeed.

Flink SQL> CREATE DATABASE db_rmoff;
[ERROR] Could not execute SQL statement. Reason:
MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)

Flink SQL>

Full stack trace

Caused by: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to execute the operation b685c995-3280-4a9e-b6c0-18ab9369d790.
       at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:414)
       at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:267)
       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
       ... 1 more
Caused by: org.apache.flink.table.api.TableException: Could not execute CREATE DATABASE: (catalogDatabase: [{}], catalogName: [c_iceberg_hive], databaseName: [db_rmoff], ignoreIfExists: [
       at org.apache.flink.table.operations.ddl.CreateDatabaseOperation.execute(CreateDatabaseOperation.java:90)
       at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1092)
       at org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:556)
       at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:444)
       at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:207)
       at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
       at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
       at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
       ... 7 more
Caused by: java.lang.RuntimeException: Failed to create namespace db_rmoff in Hive Metastore
       at org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:294)
       at org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:222)
       at org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:213)
       at org.apache.flink.table.catalog.CatalogManager.createDatabase(CatalogManager.java:1381)
       at org.apache.flink.table.operations.ddl.CreateDatabaseOperation.execute(CreateDatabaseOperation.java:84)
       ... 14 more
Caused by: MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)
       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39343)
       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39311)
       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result.read(ThriftHiveMetastore.java:39245)
       at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_create_database(ThriftHiveMetastore.java:1106)
       at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.create_database(ThriftHiveMetastore.java:1093)
       at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:811)
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
       at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
       at com.sun.proxy.$Proxy35.createDatabase(Unknown Source)
       at org.apache.iceberg.hive.HiveCatalog.lambda$createNamespace$8(HiveCatalog.java:283)
       at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)
       at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
       at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
       at org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:281)
       ... 18 more

Diagnostics

Verify that hadoop-aws is on the classpath:

❯ ps -ef|grep sql-client|grep hadoop-aws
  501 51499 45632   0  7:38pm ttys007    0:06.81 /Users/rmoff/.sdkman/candidates/java/current/bin/java -XX:+IgnoreUnrecognizedVMOptions --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED -Dlog.file=/Users/rmoff/flink/flink-1.18.1/log/flink-rmoff-sql-client-asgard08.log -Dlog4j.configuration=file:/Users/rmoff/flink/flink-1.18.1/conf/log4j-cli.properties -Dlog4j.configurationFile=file:/Users/rmoff/flink/flink-1.18.1/conf/log4j-cli.properties -Dlogback.configurationFile=file:/Users/rmoff/flink/flink-1.18.1/conf/logback.xml -classpath /Users/rmoff/flink/flink-1.18.1/lib/aws/hadoop-aws-3.3.6.jar:/Users/rmoff/flink/flink-1.18.1/lib/flink-cep-1.18.1.jar:/Users/rmoff/flink/flink-1.18.1/l[…]

Confirm that the JAR includes the S3AFileSystem class:

❯ jar tvf lib/aws/hadoop-aws-3.3.6.jar|grep -i filesystem.class
157923 Sun Jun 18 08:56:00 BST 2023 org/apache/hadoop/fs/s3a/S3AFileSystem.class
  3821 Sun Jun 18 08:56:02 BST 2023 org/apache/hadoop/fs/s3native/NativeS3FileSystem.class

If I strip the CREATE CATALOG right back to basics, I get the same error too:

Flink SQL> CREATE CATALOG c_iceberg_hive2 WITH (
>    'type' = 'iceberg',
>    'warehouse' = 's3a://warehouse',
>    'catalog-type'='hive',
>    'uri'='thrift://localhost:9083');
[INFO] Execute statement succeed.

Flink SQL> USE CATALOG c_iceberg_hive2;
[INFO] Execute statement succeed.

Flink SQL> CREATE DATABASE db_rmoff;
[ERROR] Could not execute SQL statement. Reason:
MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)

Java version:

❯ java --version
openjdk 11.0.21 2023-10-17
OpenJDK Runtime Environment Temurin-11.0.21+9 (build 11.0.21+9)
OpenJDK 64-Bit Server VM Temurin-11.0.21+9 (build 11.0.21+9, mixed mode)

Edit 01

Other things that I have tried:

  1. Using Flink 1.17.1 (matching the 1.17 version in the Iceberg jar)
  2. Using Hadoop 3.3.4 components throughout
  3. Moving the jars into ./lib itself rather than subfolders
  4. Removing the Flink s3-fs-hadoop plugin
  5. Adding iceberg-aws-bundle-1.4.3.jar and aws-java-sdk-bundle-1.12.648.jar (separately and together)
  6. Writing in parquet format to S3 (MinIO) with the same setup, which works just fine.

More diagnostics:

If I put the three SQL statements (CREATE CATALOG / USE CATALOG / CREATE DATABASE) in a file and launch the SQL client with verbose class logging:

JVM_ARGS=-verbose:class ./bin/sql-client.sh -f ../iceberg.sql > iceberg.log

I get this output, which suggests that the hadoop-aws JAR is not being picked up, even though it is on the classpath.

If I add Flink's s3-fs-hadoop plugin back in, we can see it get picked up (log), but we still hit the same failure.


Edit 02

If I switch from s3a to s3 I get a different error ¯\_(ツ)_/¯

Flink SQL> CREATE CATALOG c_iceberg_hive WITH (
>     'type' = 'iceberg',
>     'client.assume-role.region' = 'us-east-1',
>     'warehouse' = 's3://warehouse',
>     's3.endpoint' = 'http://localhost:9000',
>     's3.path-style-access' = 'true',
>     'catalog-type'='hive',
>     'uri'='thrift://localhost:9083');
[INFO] Execute statement succeed.

Flink SQL> USE CATALOG c_iceberg_hive;
[INFO] Execute statement succeed.

Flink SQL> CREATE DATABASE db_rmoff;
[ERROR] Could not execute SQL statement. Reason:
MetaException(message:Got exception: org.apache.hadoop.fs.UnsupportedFileSystemException No FileSystem for scheme "s3")

If I add io-impl I get yet another different error, which (to my limited understanding) again seems to suggest that the hadoop-aws JAR is not being picked up:

Flink SQL> CREATE CATALOG c_iceberg_hive2 WITH (
>    'type' = 'iceberg',
>    'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
>    'client.assume-role.region' = 'us-east-1',
>    'warehouse' = 's3://warehouse',
>    's3.endpoint' = 'http://localhost:9000',
>    's3.path-style-access' = 'true',
>    'catalog-type'='hive',
>    'uri'='thrift://localhost:9083');
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: software.amazon.awssdk.services.s3.model.S3Exception
apache-flink flink-sql hive-metastore apache-iceberg

1 Answer

For me, I got this working by making sure that the AWS_ACCESS_KEY and AWS_SECRET_ACCESS_KEY environment variables were defined with my credentials.

Here's a tutorial I wrote showing an end-to-end example of this using Docker Compose: https://www.dremio.com/blog/using-flink-with-apache-iceberg-and-nessie/
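A minimal sketch of that fix, using the MinIO credentials from the question. Note the variable names here assume the standard AWS SDK environment-variable convention (AWS_ACCESS_KEY_ID rather than AWS_ACCESS_KEY; older Java SDK versions also accept the latter):

```shell
# Export the credentials before launching the SQL client; values match the
# MinIO root user created earlier in the question.
export AWS_ACCESS_KEY_ID=admin
export AWS_SECRET_ACCESS_KEY=password
export AWS_REGION=us-east-1   # arbitrary for MinIO, but the SDK expects one
# ./bin/sql-client.sh         # then start the SQL client as before
```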
