I have created an Azure VM running Ubuntu and installed Flink 1.16.1 on it, together with all of its prerequisites. The VM sits inside a VNet.
On the other side, I have an Azure Event Hubs namespace with the Kafka endpoint enabled, which is publicly accessible.
On the VM's VNet I created inbound and outbound security rules for port 9093 so that it can communicate with my Event Hub.
I started a cluster on my Ubuntu VM by running the following command in a terminal:
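For reference, rules like the ones described above can be created with the Azure CLI. This is only a sketch; the resource group and NSG names (`my-rg`, `my-nsg`) are hypothetical placeholders:

```shell
# Allow outbound traffic from the VM's subnet to Event Hubs' Kafka port 9093.
az network nsg rule create \
  --resource-group my-rg \
  --nsg-name my-nsg \
  --name AllowKafka9093Out \
  --priority 100 \
  --direction Outbound \
  --access Allow \
  --protocol Tcp \
  --destination-port-ranges 9093

# Matching inbound rule for port 9093.
az network nsg rule create \
  --resource-group my-rg \
  --nsg-name my-nsg \
  --name AllowKafka9093In \
  --priority 100 \
  --direction Inbound \
  --access Allow \
  --protocol Tcp \
  --destination-port-ranges 9093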
$FLINK_HOME/bin/start-cluster.sh
I built a fat JAR with mvn clean package and deployed it to the Flink cluster with the following command (-c takes the entry class, followed by the JAR):
$FLINK_HOME/bin/flink run -d -c FlinkKafkaConsumer event-hub-kafka-consumer.jar
The Java code of the FlinkKafkaConsumer I am trying to deploy is the following:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class FlinkKafkaConsumer {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Event Hubs' Kafka endpoint requires SASL_SSL with the PLAIN mechanism;
        // the namespace connection string is passed as the JAAS password.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("namespace.servicebus.windows.net:9093")
                .setProperty("sasl.mechanism", "PLAIN")
                .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=*********;SharedAccessKey=****************\";")
                .setProperty("security.protocol", "SASL_SSL")
                .setTopics("topicA")
                .setGroupId("flink-consumer")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Read from Kafka, print every record to stdout, and run the job.
        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "KafkaSource");
        stream.print();
        env.execute("Testing flink consumer");
    }
}
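As a readability variant (not something the job above requires), the three Event Hubs connection settings could be collected into a `java.util.Properties` object and handed to the builder via `KafkaSourceBuilder#setProperties`. A minimal, self-contained sketch; the class name `EventHubsKafkaProps` and the placeholder connection string are hypothetical:

```java
import java.util.Properties;

public class EventHubsKafkaProps {

    // Builds the Kafka client properties that Event Hubs' Kafka endpoint expects:
    // SASL_SSL with the PLAIN mechanism, "$ConnectionString" as the username,
    // and the namespace connection string as the password.
    public static Properties forConnectionString(String connectionString) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"$ConnectionString\" "
                        + "password=\"" + connectionString + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder value; in the real job this is the Event Hubs connection string.
        Properties props = forConnectionString("Endpoint=sb://namespace.servicebus.windows.net/;...");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

In the job these properties would replace the three individual `setProperty` calls, e.g. `KafkaSource.<String>builder().setProperties(props)...`.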
In the Flink cluster logs I get the following output:
2023-04-07 11:44:15,618 INFO org.apache.kafka.clients.admin.AdminClientConfig [] - AdminClientConfig values:
bootstrap.servers = [namespace.servicebus.windows.net:9093]
client.dns.lookup = use_all_dns_ips
client.id = flink-consumer-enumerator-admin-client
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = PLAIN
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
2023-04-07 11:44:15,626 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Resolved ResourceManager address, beginning registration
2023-04-07 11:44:15,628 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager [email protected]://flink@localhost:6123/user/rpc/jobmanager_2 for job ae9979f7cb6ba362eeaae632a7ce3f70.
2023-04-07 11:44:15,642 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registered job manager [email protected]://flink@localhost:6123/user/rpc/jobmanager_2 for job ae9979f7cb6ba362eeaae632a7ce3f70.
2023-04-07 11:44:15,660 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
2023-04-07 11:44:15,661 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job ae9979f7cb6ba362eeaae632a7ce3f70: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2023-04-07 11:44:15,865 INFO org.apache.kafka.common.security.authenticator.AbstractLogin [] - Successfully logged in.
2023-04-07 11:44:15,868 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaSource -> Sink: Print to Std. Out (1/1) (6dfc8bb74d00e299a91e8f1785165713_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from SCHEDULED to DEPLOYING.
2023-04-07 11:44:15,868 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: KafkaSource -> Sink: Print to Std. Out (1/1) (attempt #0) with attempt id 6dfc8bb74d00e299a91e8f1785165713_cbc357ccb763df2852fee8c4fc7d55f2_0_0 and vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to localhost:36669-d1d6dc @ localhost (dataPort=41195) with allocation id fda739588d2baa9ef7eb5e4611c84edb
2023-04-07 11:44:16,162 WARN org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'key.deserializer' was supplied but isn't a known config.
2023-04-07 11:44:16,163 WARN org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'value.deserializer' was supplied but isn't a known config.
2023-04-07 11:44:16,163 WARN org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'group.id' was supplied but isn't a known config.
2023-04-07 11:44:16,163 WARN org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'client.id.prefix' was supplied but isn't a known config.
2023-04-07 11:44:16,163 WARN org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.
2023-04-07 11:44:16,163 WARN org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'enable.auto.commit' was supplied but isn't a known config.
2023-04-07 11:44:16,164 WARN org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'auto.offset.reset' was supplied but isn't a known config.
2023-04-07 11:44:16,171 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 3.2.3
2023-04-07 11:44:16,171 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: 50029d3ed8ba576f
2023-04-07 11:44:16,171 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1680867856164
2023-04-07 11:44:16,178 INFO org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Starting the KafkaSourceEnumerator for consumer group flink-consumer without periodic partition discovery.
2023-04-07 11:44:16,353 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaSource -> Sink: Print to Std. Out (1/1) (6dfc8bb74d00e299a91e8f1785165713_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from DEPLOYING to INITIALIZING.
2023-04-07 11:44:16,651 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: KafkaSource registering reader for parallel task 0 (#0) @ localhost
2023-04-07 11:44:16,660 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaSource -> Sink: Print to Std. Out (1/1) (6dfc8bb74d00e299a91e8f1785165713_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to RUNNING.
2023-04-07 11:44:17,301 INFO org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Discovered new partitions: [topicA-0]
2023-04-07 11:44:17,304 INFO org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: topicA-0, StartingOffset: -1, StoppingOffset: -9223372036854775808]]}
2023-04-07 11:49:17,236 INFO org.apache.kafka.clients.NetworkClient [] - [AdminClient clientId=flink-consumer-enumerator-admin-client] Node -1 disconnected.
2023-04-07 11:49:47,266 INFO org.apache.kafka.clients.NetworkClient [] - [AdminClient clientId=flink-consumer-enumerator-admin-client] Disconnecting from node 0 due to request timeout.
2023-04-07 11:49:47,267 INFO org.apache.kafka.clients.NetworkClient [] - [AdminClient clientId=flink-consumer-enumerator-admin-client] Cancelled in-flight METADATA request with correlation id 4 due to node 0 being disconnected (elapsed time since creation: 30029ms, elapsed time since send: 30029ms, request timeout: 30000ms)
2023-04-07 11:49:47,267 INFO org.apache.kafka.clients.admin.internals.AdminMetadataManager [] - [AdminClient clientId=flink-consumer-enumerator-admin-client] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1680868187237, tries=1, nextAllowedTryMs=1680868187367) timed out at 1680868187267 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled fetchMetadata request with correlation id 4 due to node 0 being disconnected
It looks like the Flink job is created and the Event Hub receives the request, but Flink receives 0 messages.
Is there any possible explanation for this?