Flink - kafka 连接器 OAUTHBEARER 类加载器问题

问题描述 投票:0回答:3

我尝试使用sasl机制(OAUTHBEARER)配置kafka身份验证(使用flink 1.9.2,kafka-client 2.2.0)。

当使用带有 SASL 身份验证的 Flink 时,我遇到了以下异常。 Kafka 与应用程序一起在一个胖罐子里着色。

远程调试后,我发现我的回调处理程序有一个

ChildFirstClassloader
和 org.apache.kafka.common.security.auth.AuthenticateCallbackHandler 属于另一个
ChildFirstClassloader
,因此
instance of
以下测试失败 (
OAuthBearerSaslClientFactory
) :

if (!(Objects.requireNonNull(callbackHandler) instanceof AuthenticateCallbackHandler))
    throw new IllegalArgumentException(String.format(
         "Callback handler must be castable to %s: %s",
         AuthenticateCallbackHandler.class.getName(), callbackHandler.getClass().getName()));

我不知道为什么这两个类有两个不同的类加载器。

有什么想法吗?有什么解决办法吗?

感谢您的帮助。

Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
    at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
    at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:180)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:176)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:168)
    at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:254)
    at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:202)
    at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:140)
    at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:210)
    at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:334)
    at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:325)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:257)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:920)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:474)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:292)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1803)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1771)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
apache-kafka apache-flink flink-streaming jaas sasl
3个回答
0
投票

我不确定你是否已经解决了这个问题,但我在这个完全相同的场景中挣扎了很长一段时间。最终对我有用的是将 kafka-clients jar 复制到 Flink 的 lib/ 目录中。


0
投票

抱歉忘记发布解决方案,但是是的,我以同样的方式解决了它,通过复制 flink lib 中的 kafka-client 。


0
投票

我遇到了这个确切的问题,并为此苦苦挣扎了好几个星期。就我而言,仅将

kafka-client
jar 放入我的
/opt/flink/lib
目录是不够的。这是因为导致此问题的问题不仅仅是
kafka-client
库问题。

类加载:

首先,如果您不熟悉 Java 中的类加载,我恳请您研究一下这个概念。如果没有这些知识,您将无法理解任何答案。

问题原因概述

这个问题的根本原因是Flink如何加载项目中的类(这也会根据部署模式(会话或应用程序)而有所不同)。如果您通过应用程序模式进行部署,则将 jar 文件上传到

userlib
文件夹中。

  1. jobmanager
    启动时,它将通过其类加载器读取您的代码,创建作业图,进行优化等。
  2. 然后它通过网络发送作业图每个部分所需的代码,以便您的
    taskmanagers
    可以运行它。
  3. taskmanagers
    选择此代码,使用
    UserCodeClassLoader
    加载它,然后执行您的应用程序。

如果一切顺利,那么您的应用程序将成功运行。然而,这个问题出现在失败的案例中。那么发生了什么事? Flink 的

UserCodeClassLoader
是一个子级加载器。这是什么意思?在java应用程序中加载类的正常过程是父级优先。因此,如果子类加载器需要一个类,那么它会询问父类是否已加载该类,如果已加载,它将使用该类。然而,Flink 的做法略有不同。
UserCodeClassLoader
是一个子级优先的类加载器,这意味着如果类加载器缺少依赖项,它会首先询问它的子级。 Flink 做出这样的选择是因为这意味着 Flink 集群的依赖项(也是用 java 编写的)不会与您导入的任何依赖项发生冲突。

因此,这个错误(我的例子中的一个错误实例)告诉我们该类是由 2 个不同的类加载器加载的。如果由 2 个不同的类加载器加载,则

instanceof
检查将会失败(即使它是同一个类)。

解决方案

  1. 仅使用您的依赖项创建一个 uber jar - 您可以使用 Maven Shade 插件,并将
    createDependencyReducedPom
    选项设置为
    false
    。如果您使用 avro 生成插件从模式创建 java 对象,您可能还需要将其添加到其中。
  2. 删除您原始项目中可能有的任何 uber jar 配置
  3. 将您的 uber jar 添加到您的
    opt/flink/lib
    文件夹中,以便它由 Flink 的父类加载器加载,并且可用且仅加载一次。
  4. 如果您需要任何库(即 avro 模式生成对象),请将新的 uber jar 添加为您的 Maven 项目中的导入。

此解决方案无需担心哪些子类加载器加载哪些类。

© www.soinside.com 2019 - 2024. All rights reserved.