Connecting a standalone Flink cluster on an Azure VM to Azure Event Hubs for Kafka


I created an Azure VM running Ubuntu and installed Flink 1.16.1 on it, along with all of its prerequisites. The VM sits inside a VNet.

On the other side I have an Azure Event Hubs namespace with the endpoint for Apache Kafka, which is publicly accessible.

On the VM's VNet I created inbound and outbound security rules for port 9093 so the VM can communicate with my Event Hub.
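To check those security rules independently of Flink, a quick TLS handshake against port 9093 can be run from the VM. This is a diagnostic sketch of my own (the class and method names are made up, and "namespace" is a placeholder, as in the question): a successful handshake means DNS, the NSG rules, and TLS all work, so any remaining problem is in the job itself.

```java
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;

public class Port9093Check {

    // Attempts a TLS handshake with the given endpoint. Success means DNS
    // resolution, the NSG rules, and TLS on port 9093 are all working;
    // failure points at networking rather than at the Flink job.
    static String check(String host, int port) {
        try (SSLSocket socket =
                 (SSLSocket) SSLSocketFactory.getDefault().createSocket(host, port)) {
            socket.startHandshake();
            return "OK " + socket.getSession().getProtocol();
        } catch (Exception e) {
            return "FAILED: " + e;
        }
    }

    public static void main(String[] args) {
        // "namespace" is a placeholder; substitute your Event Hubs namespace.
        System.out.println(check("namespace.servicebus.windows.net", 9093));
    }
}
```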

I started a cluster on my Ubuntu VM by running the following command in a terminal:

$FLINK_HOME/bin/start-cluster.sh

I built a fat JAR with mvn clean package and deployed it to the Flink cluster with the following command:

$FLINK_HOME/bin/flink run -d -c FlinkKafkaConsumer event-hub-kafka-consumer.jar

The Java code of the FlinkKafkaConsumer I am trying to deploy is the following:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class FlinkKafkaConsumer {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("namespace.servicebus.windows.net:9093")
            .setProperty("sasl.mechanism", "PLAIN")
            .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=*********;SharedAccessKey=****************\";")
            .setProperty("security.protocol", "SASL_SSL")
            .setTopics("topicA")
            .setGroupId("flink-consumer")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "KafkaSource");

        stream.print();
        env.execute("Testing flink consumer");
    }
}
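For reference, the Event Hubs-specific settings used in the source above can be factored into a small helper (the class and method names here are my own, not from any library). This also makes it easy to feed the exact same properties into a plain Kafka consumer or AdminClient to test connectivity outside Flink:

```java
import java.util.Properties;

public class EventHubsKafkaProps {

    // Builds the client properties that Event Hubs for Kafka expects:
    // SASL_SSL with the PLAIN mechanism, username "$ConnectionString", and the
    // full namespace connection string as the password. Mirrors the
    // setProperty(...) calls in the KafkaSource builder above.
    static Properties build(String namespaceFqdn, String connectionString) {
        Properties props = new Properties();
        props.put("bootstrap.servers", namespaceFqdn + ":9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"$ConnectionString\" "
                        + "password=\"" + connectionString + "\";");
        return props;
    }
}
```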

In the Flink cluster logs I get the following output:

2023-04-07 11:44:15,618 INFO  org.apache.kafka.clients.admin.AdminClientConfig             [] - AdminClientConfig values: 
    bootstrap.servers = [namespace.servicebus.windows.net:9093]
    client.dns.lookup = use_all_dns_ips
    client.id = flink-consumer-enumerator-admin-client
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = PLAIN
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = SASL_SSL
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

2023-04-07 11:44:15,626 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Resolved ResourceManager address, beginning registration
2023-04-07 11:44:15,628 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager [email protected]://flink@localhost:6123/user/rpc/jobmanager_2 for job ae9979f7cb6ba362eeaae632a7ce3f70.
2023-04-07 11:44:15,642 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registered job manager [email protected]://flink@localhost:6123/user/rpc/jobmanager_2 for job ae9979f7cb6ba362eeaae632a7ce3f70.
2023-04-07 11:44:15,660 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
2023-04-07 11:44:15,661 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job ae9979f7cb6ba362eeaae632a7ce3f70: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2023-04-07 11:44:15,865 INFO  org.apache.kafka.common.security.authenticator.AbstractLogin [] - Successfully logged in.
2023-04-07 11:44:15,868 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: KafkaSource -> Sink: Print to Std. Out (1/1) (6dfc8bb74d00e299a91e8f1785165713_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from SCHEDULED to DEPLOYING.
2023-04-07 11:44:15,868 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying Source: KafkaSource -> Sink: Print to Std. Out (1/1) (attempt #0) with attempt id 6dfc8bb74d00e299a91e8f1785165713_cbc357ccb763df2852fee8c4fc7d55f2_0_0 and vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to localhost:36669-d1d6dc @ localhost (dataPort=41195) with allocation id fda739588d2baa9ef7eb5e4611c84edb
2023-04-07 11:44:16,162 WARN  org.apache.kafka.clients.admin.AdminClientConfig             [] - The configuration 'key.deserializer' was supplied but isn't a known config.
2023-04-07 11:44:16,163 WARN  org.apache.kafka.clients.admin.AdminClientConfig             [] - The configuration 'value.deserializer' was supplied but isn't a known config.
2023-04-07 11:44:16,163 WARN  org.apache.kafka.clients.admin.AdminClientConfig             [] - The configuration 'group.id' was supplied but isn't a known config.
2023-04-07 11:44:16,163 WARN  org.apache.kafka.clients.admin.AdminClientConfig             [] - The configuration 'client.id.prefix' was supplied but isn't a known config.
2023-04-07 11:44:16,163 WARN  org.apache.kafka.clients.admin.AdminClientConfig             [] - The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.
2023-04-07 11:44:16,163 WARN  org.apache.kafka.clients.admin.AdminClientConfig             [] - The configuration 'enable.auto.commit' was supplied but isn't a known config.
2023-04-07 11:44:16,164 WARN  org.apache.kafka.clients.admin.AdminClientConfig             [] - The configuration 'auto.offset.reset' was supplied but isn't a known config.
2023-04-07 11:44:16,171 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka version: 3.2.3
2023-04-07 11:44:16,171 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka commitId: 50029d3ed8ba576f
2023-04-07 11:44:16,171 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka startTimeMs: 1680867856164
2023-04-07 11:44:16,178 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Starting the KafkaSourceEnumerator for consumer group flink-consumer without periodic partition discovery.
2023-04-07 11:44:16,353 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: KafkaSource -> Sink: Print to Std. Out (1/1) (6dfc8bb74d00e299a91e8f1785165713_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from DEPLOYING to INITIALIZING.
2023-04-07 11:44:16,651 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: KafkaSource registering reader for parallel task 0 (#0) @ localhost
2023-04-07 11:44:16,660 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: KafkaSource -> Sink: Print to Std. Out (1/1) (6dfc8bb74d00e299a91e8f1785165713_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to RUNNING.
2023-04-07 11:44:17,301 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Discovered new partitions: [topicA-0]
2023-04-07 11:44:17,304 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: topicA-0, StartingOffset: -1, StoppingOffset: -9223372036854775808]]}
2023-04-07 11:49:17,236 INFO  org.apache.kafka.clients.NetworkClient                       [] - [AdminClient clientId=flink-consumer-enumerator-admin-client] Node -1 disconnected.
2023-04-07 11:49:47,266 INFO  org.apache.kafka.clients.NetworkClient                       [] - [AdminClient clientId=flink-consumer-enumerator-admin-client] Disconnecting from node 0 due to request timeout.
2023-04-07 11:49:47,267 INFO  org.apache.kafka.clients.NetworkClient                       [] - [AdminClient clientId=flink-consumer-enumerator-admin-client] Cancelled in-flight METADATA request with correlation id 4 due to node 0 being disconnected (elapsed time since creation: 30029ms, elapsed time since send: 30029ms, request timeout: 30000ms)
2023-04-07 11:49:47,267 INFO  org.apache.kafka.clients.admin.internals.AdminMetadataManager [] - [AdminClient clientId=flink-consumer-enumerator-admin-client] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1680868187237, tries=1, nextAllowedTryMs=1680868187367) timed out at 1680868187267 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled fetchMetadata request with correlation id 4 due to node 0 being disconnected

It looks like the Flink job is created and the Event Hub receives the requests, but Flink receives 0 messages.

Is there any possible explanation for this?
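One thing I am double-checking on my side (an assumption, not something the logs prove): with OffsetsInitializer.latest() the job only sees events produced after it starts, so a topic containing only pre-existing events would print nothing. As a debugging variant, starting from the earliest offset makes any retained events visible:

```java
// Debugging variant: read each partition from the beginning instead of
// only consuming events produced after the job starts.
.setStartingOffsets(OffsetsInitializer.earliest())
```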

java azure apache-kafka apache-flink azure-eventhub