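I am running a 3-node ZooKeeper cluster and a 3-node Kafka cluster on Kubernetes. Kafka appears to be running. However, if I produce some messages to a topic and then inspect that topic, there are no messages at all.
Below is what my broker logs. It reports an invalid receive or something similar. Interestingly, creating topics seems to work, but producing does not. I can also see the topics and schemas I created earlier in Topics-UI (a GUI tool for the broker). The logs for Schema Registry, Connect, and REST look fine, so the broker itself seems to be running well.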
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at kafka.network.Processor.poll(SocketServer.scala:863)
at kafka.network.Processor.run(SocketServer.scala:762)
at java.lang.Thread.run(Thread.java:748)
Here is my broker configuration:
port {
container_port = 9092
}
env {
name = "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR"
value = "3"
}
env {
name = "KAFKA_DEFAULT_REPLICATION_FACTOR"
value = "3"
}
env {
name = "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"
value = "PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT"
}
env {
name = "KAFKA_ZOOKEEPER_CONNECT"
value = "lucent-zookeeper-0.zookeeper-service.default:2181,lucent-zookeeper-1.zookeeper-service.default:2181,lucent-zookeeper-2.zookeeper-service.default:2181"
}
env {
name = "POD_IP"
value_from {
field_ref {
field_path = "status.podIP"
}
}
}
env {
name = "HOST_IP"
value_from {
field_ref {
field_path = "status.hostIP"
}
}
}
env {
name = "POD_NAME"
value_from {
field_ref {
field_path = "metadata.name"
}
}
}
env {
name = "POD_NAMESPACE"
value_from {
field_ref {
field_path = "metadata.namespace"
}
}
}
command = [
"sh",
"-exec",
"export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://$${POD_NAME}.kafka-service.$${POD_NAMESPACE}:9092 && export KAFKA_BROKER_ID=$${HOSTNAME##*-} && exec /etc/confluent/docker/run"
]
Service:
resource "kubernetes_service" "kafka-service" {
metadata {
name = "kafka-service"
labels = {
app = "broker"
}
}
spec {
selector = {
app = "broker"
}
port {
port = 9092
}
cluster_ip = "None"
}
}
The command I used to produce:
kafka-console-producer --broker-list kafka-service:9092 --topic test
Increase the value of socket.request.max.bytes in server.properties, restart the cluster, and then try again.
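The broker in the question starts through /etc/confluent/docker/run, and that image is normally configured with KAFKA_-prefixed environment variables rather than by editing server.properties directly. Under that convention, socket.request.max.bytes would be set as KAFKA_SOCKET_REQUEST_MAX_BYTES. The following is only a sketch of an extra env block for the broker container; the value 1200000000 is an assumption chosen to exceed the size in the log, not a recommendation.

```hcl
# Hypothetical addition to the broker container spec shown in the question.
# Assumes the Confluent image's convention of mapping KAFKA_* environment
# variables to broker properties (here: socket.request.max.bytes).
env {
  name = "KAFKA_SOCKET_REQUEST_MAX_BYTES"
  # Illustrative value only: larger than the 1195725856 bytes reported in the
  # log, and below the 32-bit signed integer maximum of 2147483647.
  value = "1200000000"
}
```

Before raising the limit, note that 1195725856 is 0x47455420, which is the ASCII text "GET " read as a 4-byte length prefix. That pattern usually suggests an HTTP client (for example a health check, or a tool pointed at the wrong port) connecting to 9092. If so, the exception may be a symptom of that client and not the cause of the missing messages.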