How do I access the Kafka Connect created by Strimzi?


I am trying to access the Kafka Connect created by Strimzi from Redpanda Console.

I installed my Kafka Connect with:
kubectl apply --filename=hm-kafka-iot-kafka-connect.yaml

hm-kafka-iot-kafka-connect.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: hm-kafka-iot-kafka-connect
  namespace: hm-kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: ghcr.io/hongbo-miao/hm-kafka-iot-kafka-connect:latest
  replicas: 5
  bootstrapServers: hm-kafka-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: hm-kafka-cluster-ca-cert
        certificate: ca.crt
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: hm-iot-db-credentials-volume
        secret:
          secretName: hm-iot-db-credentials

I installed Redpanda Console with:

helm upgrade \
  redpanda-console \
  console \
  --install \
  --repo=https://charts.redpanda.com \
  --namespace=hm-redpanda-console \
  --create-namespace \
  --values=my-values.yaml

my-values.yaml

console:
  config:
    kafka:
      brokers:
        - hm-kafka-kafka-bootstrap.hm-kafka.svc:9092
      schemaRegistry:
        enabled: true
        urls:
          - http://confluent-schema-registry.hm-confluent-schema-registry.svc:8081
    connect:
      enabled: true
      clusters:
        - name: hm-kafka-iot-kafka-connect
          url: http://hm-kafka-iot-kafka-connect-connect-api.hm-kafka:8083

However, the Redpanda Console pod logs show errors when it tries to connect to the Kafka Connect created by Strimzi:

{"level":"info","ts":"2023-05-06T05:12:24.156Z","msg":"started Redpanda Console","version":"v2.2.3","built_at":"1679491937"}
{"level":"info","ts":"2023-05-06T05:12:24.156Z","msg":"connecting to Kafka seed brokers, trying to fetch cluster metadata"}
{"level":"info","ts":"2023-05-06T05:12:24.165Z","msg":"successfully connected to kafka cluster","advertised_broker_count":1,"topic_count":12,"controller_id":0,"kafka_version":"v3.4"}
{"level":"info","ts":"2023-05-06T05:12:24.165Z","msg":"creating schema registry client and testing connectivity"}
{"level":"info","ts":"2023-05-06T05:12:24.175Z","msg":"successfully tested schema registry connectivity"}
{"level":"info","ts":"2023-05-06T05:12:24.176Z","msg":"creating Kafka connect HTTP clients and testing connectivity to all clusters"}
{"level":"warn","ts":"2023-05-06T05:12:24.182Z","msg":"connect cluster is not reachable","cluster_name":"hm-kafka-iot-kafka-connect","cluster_address":"http://hm-kafka-iot-kafka-connect-connect-api.hm-kafka:8083","error":"Get \"http://hm-kafka-iot-kafka-connect-connect-api.hm-kafka:8083/\": dial tcp 10.43.193.144:8083: connect: connection refused"}
{"level":"info","ts":"2023-05-06T05:12:24.182Z","msg":"tested Kafka connect cluster connectivity","successful_clusters":0,"failed_clusters":1}
{"level":"info","ts":"2023-05-06T05:12:24.182Z","msg":"successfully create Kafka connect service"}
{"level":"info","ts":"2023-05-06T05:12:24.552Z","msg":"Server listening on address","address":"[::]:8080","port":8080}
{"level":"warn","ts":"2023-05-06T05:13:05.781Z","msg":"failed to list connectors from Kafka connect cluster","cluster_name":"hm-kafka-iot-kafka-connect","cluster_address":"http://hm-kafka-iot-kafka-connect-connect-api.hm-kafka:8083","error":"Get \"http://hm-kafka-iot-kafka-connect-connect-api.hm-kafka:8083/connectors?expand=info&expand=status\": dial tcp 10.43.193.144:8083: connect: connection refused"}
{"level":"error","ts":"2023-05-06T05:13:05.807Z","msg":"Sending REST error","cluster_name":"hm-kafka-iot-kafka-connect","route":"/api/kafka-connect/clusters/hm-kafka-iot-kafka-connect","method":"GET","status_code":503,"remote_address":"127.0.0.1:43902","public_error":"Failed to get cluster info: Get \"http://hm-kafka-iot-kafka-connect-connect-api.hm-kafka:8083/\": dial tcp 10.43.193.144:8083: connect: connection refused","error":"Get \"http://hm-kafka-iot-kafka-connect-connect-api.hm-kafka:8083/\": dial tcp 10.43.193.144:8083: connect: connection refused"}

The Redpanda Console UI shows the failure as well (the screenshot was not preserved).

However, if I port-forward Kafka Connect locally, I can reach it:

# Open first terminal
kubectl port-forward service/hm-kafka-iot-kafka-connect-connect-api --namespace=hm-kafka 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083
Handling connection for 8083

# Open second terminal
➜ curl --location 'http://localhost:8083/connectors?expand=info&expand=status'
{"hm-motor-jdbc-sink-kafka-connector":{"info":{"name":"hm-motor-jdbc-sink-kafka-connector","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","table.name.format":"motor","transforms.convertTimestamp.target.type":"Timestamp","connection.password":"${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}","topics":"hm.motor","tasks.max":"8","batch.size":"100000","transforms":"convertTimestamp","transforms.convertTimestamp.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value","value.converter.schema.registry.url":"http://confluent-schema-registry.hm-confluent-schema-registry.svc:8081","transforms.convertTimestamp.field":"timestamp","connection.user":"${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}","name":"hm-motor-jdbc-sink-kafka-connector","connection.url":"jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db","value.converter":"io.confluent.connect.avro.AvroConverter","insert.mode":"insert","pk.mode":"record_value","pk.fields":"timestamp"},"tasks":[{"connector":"hm-motor-jdbc-sink-kafka-connector","task":0},{"connector":"hm-motor-jdbc-sink-kafka-connector","task":1},{"connector":"hm-motor-jdbc-sink-kafka-connector","task":2},{"connector":"hm-motor-jdbc-sink-kafka-connector","task":3},{"connector":"hm-motor-jdbc-sink-kafka-connector","task":4},{"connector":"hm-motor-jdbc-sink-kafka-connector","task":5},{"connector":"hm-motor-jdbc-sink-kafka-connector","task":6},{"connector":"hm-motor-jdbc-sink-kafka-connector","task":7}],"type":"sink"},"status":{"name":"hm-motor-jdbc-sink-kafka-connector","connector":{"state":"RUNNING","worker_id":"10.42.0.21:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.42.0.21:8083"},{"id":1,"state":"RUNNING","worker_id":"10.42.0.18:8083"},{"id":2,"state":"RUNNING","worker_id":"10.42.0.20:8083"},{"id":3,"state":"RUNNING","worker_id":"10.42.0.19:8083"},{"id":4,
"state":"RUNNING","worker_id":"10.42.0.22:8083"},{"id":5,"state":"RUNNING","worker_id":"10.42.0.21:8083"},{"id":6,"state":"RUNNING","worker_id":"10.42.0.18:8083"},{"id":7,"state":"RUNNING","worker_id":"10.42.0.20:8083"}],"type":"sink"}}}

Any ideas? Thanks!

apache-kafka apache-kafka-connect strimzi redpanda-console
1 Answer

The solution is to add a new NetworkPolicy.

Background

My Redpanda Console pod YAML looks like:

kubectl get pod redpanda-console-7fb65b7f5-87cxk -n hm-redpanda-console -o yaml
apiVersion: v1
kind: Pod
metadata:
  name: redpanda-console-77595c8f75-ltspx
  namespace: hm-redpanda-console
  labels:
    app.kubernetes.io/name: console
    # ...

The existing NetworkPolicy that Strimzi created for my Kafka Connect looks like:

kubectl get networkpolicy hm-kafka-iot-kafka-connect-connect -n hm-kafka -o yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: hm-kafka-iot-kafka-connect-connect
  namespace: hm-kafka
  labels:
    # ...
spec:
  podSelector:
    matchLabels:
      strimzi.io/cluster: hm-kafka-iot-kafka-connect
      strimzi.io/kind: KafkaConnect
      strimzi.io/name: hm-kafka-iot-kafka-connect-connect
  policyTypes:
    - Ingress
  ingress:
    - ports:
        - protocol: TCP
          port: 8083
      from:
        - podSelector:
            matchLabels:
              strimzi.io/cluster: hm-kafka-iot-kafka-connect
              strimzi.io/kind: KafkaConnect
              strimzi.io/name: hm-kafka-iot-kafka-connect-connect
        - podSelector:
            matchLabels:
              strimzi.io/kind: cluster-operator
# ...

Solution

Based on the existing Kafka Connect NetworkPolicy, I needed to create a new NetworkPolicy.

Note: editing the existing Kafka Connect NetworkPolicy does not work, because Strimzi reverts the change.

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: hm-kafka-iot-kafka-connect-network-policy
  namespace: hm-kafka
spec:
  podSelector:
    matchLabels:
      strimzi.io/cluster: hm-kafka-iot-kafka-connect
      strimzi.io/kind: KafkaConnect
      strimzi.io/name: hm-kafka-iot-kafka-connect-connect
  policyTypes:
    - Ingress
  ingress:
    - ports:
        - protocol: TCP
          port: 8083
      from:
        - namespaceSelector:
            matchLabels:
              name: hm-redpanda-console
          podSelector:
            matchLabels:
              app.kubernetes.io/name: console

Here we allow pods with the label app.kubernetes.io/name: console in the namespace hm-redpanda-console to access Kafka Connect on port 8083.
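One detail worth checking: a namespaceSelector matches labels on the Namespace object, not the namespace's name, so the policy above only takes effect if hm-redpanda-console actually carries the label name: hm-redpanda-console. Recent Kubernetes versions also set the label kubernetes.io/metadata.name on every namespace automatically, which avoids labeling by hand. Below is a minimal sketch under those assumptions. The helper function names are hypothetical, and the policy reuses the name from above, so applying it would replace that policy rather than add a second one.

```shell
# Hypothetical helper: print the labels on a namespace so you can see
# which keys a namespaceSelector could match.
show_namespace_labels() {
  kubectl get namespace "$1" --show-labels
}

# Hypothetical helper: apply a variant of the policy above that selects the
# namespace by the automatically assigned kubernetes.io/metadata.name label
# (assumes a Kubernetes version that sets this label).
apply_console_ingress_policy() {
  kubectl apply --filename=- <<'EOF'
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: hm-kafka-iot-kafka-connect-network-policy
  namespace: hm-kafka
spec:
  podSelector:
    matchLabels:
      strimzi.io/cluster: hm-kafka-iot-kafka-connect
      strimzi.io/kind: KafkaConnect
      strimzi.io/name: hm-kafka-iot-kafka-connect-connect
  policyTypes:
    - Ingress
  ingress:
    - ports:
        - protocol: TCP
          port: 8083
      from:
        # namespaceSelector and podSelector are in the SAME list entry,
        # so a source pod must match both. Written as two separate
        # entries, matching either one would be enough.
        - namespaceSelector:
            matchLabels:
              kubernetes.io/metadata.name: hm-redpanda-console
          podSelector:
            matchLabels:
              app.kubernetes.io/name: console
EOF
}
```

Running show_namespace_labels hm-redpanda-console first shows which of the two labels is present. If only kubernetes.io/metadata.name is there, the variant in apply_console_ingress_policy should match without any manual labeling.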

If you want to allow pods with the label app.kubernetes.io/name: console in any namespace, you can use:

# ...
        - namespaceSelector: {}
          podSelector:
            matchLabels:
              app.kubernetes.io/name: console

Now my Redpanda Console can connect to my Kafka Connect successfully.
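A port-forward is not a reliable test here. Its traffic is typically delivered from inside the target pod's own network namespace, so it generally does not go through NetworkPolicy ingress rules, which would explain why curl against localhost worked while the console was refused. To exercise the policy itself, the request has to come from a pod in the cluster. Below is a minimal sketch, assuming the public curlimages/curl image can be pulled. The function name and the pod name curl-test are hypothetical.

```shell
# Hypothetical helper: start a throwaway pod and call the Kafka Connect REST
# API over the cluster network, so the request is subject to NetworkPolicy.
#   $1: namespace to run the pod in
#   $2: label to put on the pod, as key=value
test_connect_from_namespace() {
  kubectl run curl-test \
    --namespace="$1" \
    --labels="$2" \
    --image=curlimages/curl \
    --restart=Never \
    --rm \
    --stdin \
    --command -- \
    curl --silent --show-error --max-time 5 \
    "http://hm-kafka-iot-kafka-connect-connect-api.hm-kafka:8083/connectors"
}
```

With the new policy in place, test_connect_from_namespace hm-redpanda-console app.kubernetes.io/name=console should return the connector list. The same call with a label the policy does not allow, for example app=curl-test, should still fail to connect.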
