我正在 Kubernetes 集群内以 HA 模式(多个 broker)部署 Kafka。
部署所使用的文件如下:
Dockerfile
# Kafka (KRaft mode) image: Temurin 17 JDK plus the Apache Kafka release tarball.
FROM eclipse-temurin:17.0.9_9-jdk-jammy

# KAFKA_VERSION / SCALA_VERSION select the release tarball to download.
# KAFKA_HOME is a stable, version-independent symlink to the unpacked release.
ENV KAFKA_VERSION=3.6.1 \
    SCALA_VERSION=2.13 \
    KAFKA_HOME=/opt/kafka
# Separate instruction: a variable set by an ENV instruction is only visible
# to the instructions that follow it, not within the same ENV.
ENV PATH="${PATH}:${KAFKA_HOME}/bin"

LABEL name="kafka" version="${KAFKA_VERSION}"

# Download, unpack and link Kafka in one layer; the tarball is removed in the
# same layer so it never ends up in the image.
RUN wget -nv -O "/tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" \
        "https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" \
    && tar -xzf "/tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" -C /opt \
    && rm -f "/tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" \
    && ln -s "/opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION}" "${KAFKA_HOME}"

# Ports of the listeners configured by entrypoint.sh:
# 9092 (SASL clients), 9093 (controller quorum), 29092 (inter-broker).
EXPOSE 9092 9093 29092

COPY ./entrypoint.sh /entrypoint.sh
RUN ["chmod", "+x", "/entrypoint.sh"]

# NOTE(review): the image runs as root because entrypoint.sh rewrites files
# under /opt/kafka/config at start-up; adding a USER would require making that
# directory (and the mounted data volume) writable by the new user.
ENTRYPOINT ["/entrypoint.sh"]
entrypoint.sh
#!/bin/bash
# Entrypoint for one Kafka node (KRaft mode) running as a StatefulSet pod.
#
# Required environment:
#   REPLICAS   number of nodes; every node is listed as a quorum voter
#   SERVICE    name of the headless Service used for per-pod DNS names
#   NAMESPACE  Kubernetes namespace of the Service
#   SHARE_DIR  directory shared by all nodes; holds the cluster id file and
#              one log directory per node
# Optional environment (defaults match the previously hard-coded values):
#   KAFKA_ADMIN_USER      SASL/PLAIN user name (default: admin)
#   KAFKA_ADMIN_PASSWORD  SASL/PLAIN password  (default: admin-secret)
set -euo pipefail

: "${REPLICAS:?REPLICAS must be set}"
: "${SERVICE:?SERVICE must be set}"
: "${NAMESPACE:?NAMESPACE must be set}"
: "${SHARE_DIR:?SHARE_DIR must be set}"

KAFKA_CONFIG=/opt/kafka/config/kraft/server.properties
CLUSTER_ID_FILE="$SHARE_DIR/cluster_id"

# The node id is the ordinal suffix of the pod hostname (kafka-3 -> 3).
NODE_ID=${HOSTNAME##*-}

LISTENERS="SASL://:9092,CONTROLLER://:9093,INTERNAL://:29092"
# NOTE(review): clients are sent to kraft-<id>:9092 after bootstrap, so each
# kraft-<id> name presumably has to reach exactly that broker; confirm the
# external DNS / ingress mapping does not point all names at one endpoint.
ADVERTISED_LISTENERS="SASL://kraft-$NODE_ID:9092,INTERNAL://kafka-$NODE_ID.$SERVICE.$NAMESPACE.svc.cluster.local:29092"

# Build "0@host0:9093,1@host1:9093,..." for node ids 0 .. REPLICAS-1.
voters=()
for ((i = 0; i < REPLICAS; i++)); do
  voters+=("$i@kafka-$i.$SERVICE.$NAMESPACE.svc.cluster.local:9093")
done
CONTROLLER_QUORUM_VOTERS=$(IFS=,; echo "${voters[*]}")

mkdir -p "$SHARE_DIR/$NODE_ID"

# Node 0 generates the cluster id once; the file is written under a temporary
# name and renamed so that other nodes never read a partially written id.
if [[ "$NODE_ID" = "0" && ! -s "$CLUSTER_ID_FILE" ]]; then
  kafka-storage.sh random-uuid > "$CLUSTER_ID_FILE.tmp"
  mv "$CLUSTER_ID_FILE.tmp" "$CLUSTER_ID_FILE"
fi

# Other nodes may start before node 0 has written the id: wait for it instead
# of continuing with an empty cluster id.
for _ in $(seq 1 60); do
  if [[ -s "$CLUSTER_ID_FILE" ]]; then
    break
  fi
  sleep 1
done
if [[ ! -s "$CLUSTER_ID_FILE" ]]; then
  echo "cluster id file $CLUSTER_ID_FILE was not created in time" >&2
  exit 1
fi
CLUSTER_ID=$(cat "$CLUSTER_ID_FILE")

# Rewrite the node-specific settings of the stock KRaft configuration in place.
sed -i \
  -e "s+^node.id=.*+node.id=$NODE_ID+" \
  -e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \
  -e "s+^listeners=.*+listeners=$LISTENERS+" \
  -e "s+^advertised.listeners=.*+advertised.listeners=$ADVERTISED_LISTENERS+" \
  -e "s+^log.dirs=.*+log.dirs=$SHARE_DIR/$NODE_ID+" \
  "$KAFKA_CONFIG"

# SASL/PLAIN credentials for the SASL listener.
# NOTE(review): the defaults are well-known example credentials; supply real
# values through the environment (e.g. from a Kubernetes Secret).
ADMIN_USER=${KAFKA_ADMIN_USER:-admin}
ADMIN_PASSWORD=${KAFKA_ADMIN_PASSWORD:-admin-secret}
JAAS="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ADMIN_USER\" password=\"$ADMIN_PASSWORD\" user_$ADMIN_USER=\"$ADMIN_PASSWORD\";"

# Appended after the stock settings; these lines come last in the file and are
# intended to take precedence over any earlier definition of the same keys.
cat >> "$KAFKA_CONFIG" <<EOF

listener.name.sasl.plain.sasl.jaas.config=${JAAS}
sasl.enabled.mechanisms=PLAIN
listener.security.protocol.map=SASL:SASL_PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
inter.broker.listener.name=INTERNAL
EOF

# --ignore-formatted keeps restarts working when the log directory on the
# volume has already been formatted by an earlier start.
kafka-storage.sh format --ignore-formatted -t "$CLUSTER_ID" -c "$KAFKA_CONFIG"

# exec so that Kafka replaces this shell and receives container signals.
exec kafka-server-start.sh "$KAFKA_CONFIG"
kafka.yaml
# Namespace that holds every namespaced Kafka resource of this deployment.
apiVersion: v1
kind: Namespace
metadata:
  name: kafka-kraft
---
# Statically provisioned volume backed by a directory on the node's filesystem.
apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-pv-volume
  labels:
    type: local
spec:
  # Must match the storageClassName of the claim that is meant to bind to it.
  storageClassName: manual
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: '/mnt/data'
---
# Claim for the storage mounted into the Kafka pods.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: kafka-pv-claim
  namespace: kafka-kraft
spec:
  # Same class as the statically provisioned "manual" PersistentVolume.
  storageClassName: manual
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 3Gi
---
# Headless Service (clusterIP: None): gives each StatefulSet pod a DNS name of
# the form <pod>.kafka-svc.kafka-kraft.svc.cluster.local.
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka-app
  namespace: kafka-kraft
spec:
  clusterIP: None
  ports:
    # Client (SASL) listener.
    - name: '9092'
      port: 9092
      protocol: TCP
      targetPort: 9092
    # Controller quorum listener.
    - name: controller
      port: 9093
      protocol: TCP
      targetPort: 9093
    # Inter-broker listener.
    - name: internal
      port: 29092
      protocol: TCP
      targetPort: 29092
  selector:
    app: kafka-app
---
# Kafka nodes; pods are named kafka-0 .. kafka-4 and addressed through the
# headless Service kafka-svc.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
  namespace: kafka-kraft
spec:
  serviceName: kafka-svc
  # Keep in sync with the REPLICAS environment variable below.
  replicas: 5
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      volumes:
        # NOTE(review): a single claim is shared by every replica. With a
        # ReadWriteOnce hostPath volume this presumably only works while all
        # pods are scheduled onto the same node; confirm for multi-node clusters.
        - name: kafka-storage
          persistentVolumeClaim:
            claimName: kafka-pv-claim
      containers:
        - name: kafka-container
          image: myimage/kafka-kraft:1.0
          ports:
            # Client (SASL) listener.
            - containerPort: 9092
            # Controller quorum listener.
            - containerPort: 9093
            # Inter-broker listener.
            - containerPort: 29092
          env:
            - name: REPLICAS
              value: '5'
            - name: SERVICE
              value: kafka-svc
            - name: NAMESPACE
              value: kafka-kraft
            - name: SHARE_DIR
              value: /mnt/kafka
          volumeMounts:
            - name: kafka-storage
              mountPath: /mnt/kafka
部署后,所有容器都已启动并正常运行。然后我使用以下命令连接 broker 并创建主题:
.\kafka-topics.bat --bootstrap-server kraft-0:9092,kraft-1:9092,kraft-2:9092,kraft-3:9092,kraft-4:9092 --command-config producer.properties --topic hello --create --replication-factor 5
producer.properties
# Client settings for the SASL_PLAINTEXT listener (SASL/PLAIN authentication).
# JAAS option values are quoted so that credentials containing characters
# other than plain word characters still parse.
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
# Force a metadata refresh at least once per second.
metadata.max.age.ms=1000
随后会出现一个提示,要求输入消息。输入一段示例文本后,会抛出以下错误。
[Producer clientId=console-producer] Received invalid metadata error in produce request on partition hello2-1 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now
到目前为止我所尝试过的
不幸的是,问题仍然存在,消息无法发送。
我在使用 ingress 并通过单个 Service 暴露 3 个 broker 时也遇到了同样的问题。有什么解决办法吗?