与 OpenShift 服务器内部/外部的 EmbeddedKafkaBroker 交互

问题描述 投票:0回答:0

我暂时使用托管在 OpenShift 服务器容器中的 EmbeddedKafkaBroker。

这是服务器的样子:

EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, false, 2);
embeddedKafkaBroker.kafkaPorts(8092);
embeddedKafkaBroker.addTopics("testTopic");
embeddedKafkaBroker.brokerProperty("listeners", "INTERNAL://0.0.0.0:8092,EXTERNAL://0.0.0.0:9092");
embeddedKafkaBroker.brokerProperty("advertised.listeners", "INTERNAL://localhost:8092,EXTERNAL://kafka-mock-daemon2017.apps.testing.soul:80");
embeddedKafkaBroker.brokerProperty("listener.security.protocol.map", "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT");
embeddedKafkaBroker.brokerProperty("inter.broker.listener.name", "INTERNAL");
embeddedKafkaBroker.afterPropertiesSet();
log.info("EmbeddedKafkaBroker started!");

内部端口 9092 映射到外部端口 80.

版本:

org.springframework.kafka:spring-kafka:2.7.10
org.springframework.kafka:spring-kafka-test:2.7.10

如果我尝试从同一 OpenShift 命名空间中托管的另一个应用程序向该服务器 (kafka-mock-daemon2017.apps.testing.soul:80) 生成消息,则会出现错误:

2023-03-30 21:11:46[kafka-producer-network-thread |生产者-1]错误 o.a.kafka.common.utils.KafkaThread - 线程中未捕获的异常 'kafka-producer-network-thread |生产者 1': java.lang.OutOfMemoryError:Java 堆空间位于 java.base/java.nio.HeapByteBuffer.(HeapByteBuffer.java:61) 在 java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348) 在 org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) 在 org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113) 在 org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452) 在 org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402) 在 org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674) 在 org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576) 在 org.apache.kafka.common.network.Selector.poll(Selector.java:481) 在 org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:551) 在 org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) 在 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) 在 java.base/java.lang.Thread.run(Thread.java:829)

如果我尝试从我本地计算机上托管的另一个应用程序使用它 (kafka-mock-daemon2017.apps.testing.soul:80) 的消息,那么我会收到错误消息:

Java heap space java.lang.OutOfMemoryError: Java heap space at java.base/java.nio.HeapByteBuffer.(HeapByteBuffer.java:61) 在 java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:349) 在 org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) 在 org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113) 在 org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447) 在 org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397) 在 org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678) 在 org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580) 在 org.apache.kafka.common.network.Selector.poll(Selector.java:485) 在 org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) 在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245) 在 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480) 在 org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261) 在 org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1232) 在 org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1165)

我做错了什么?从 OpenShift 连接时和从我的计算机连接时,我必须使用哪个主机:端口 - 内部还是外部?

java apache-kafka openshift spring-kafka
© www.soinside.com 2019 - 2024. All rights reserved.