KEDA Service Bus: all messages are processed by a single pod of a KEDA ScaledJob


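I am running on Azure Kubernetes Service (AKS) and using a KEDA "ScaledJob" for long-running jobs. An Azure Service Bus queue trigger starts the jobs automatically. When I add messages to the Service Bus queue, KEDA triggers the job and creates nodes/pods according to the configuration. However, even though several pods are scaled up, all of the messages are received and processed by only one pod.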

Below is my YAML file:

apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: {{ .Chart.Name }}
spec:
  jobTargetRef:
    backoffLimit: 4
    parallelism: 1
    completions: 1
    activeDeadlineSeconds: 300
    template:
      spec:
        imagePullSecrets:
          - name: {{ .Values.image.imagePullSecrets }}
        terminationGracePeriodSeconds: 30
        dnsPolicy: ClusterFirst
        volumes:
          - name: azure
            azureFile:
              shareName: sharenameone
              secretName: secret-sharenameone
              readOnly: true
          - name: one-storage
            emptyDir: {}
          - name: "logging-volume-file"
            persistentVolumeClaim:
              claimName: "azure-file-logging"        
        initContainers:
          - name: test-java-init
            image: {{ .Values.global.imageRegistryURI }}/{{ .Values.image.javaInitImage.name}}:{{ .Values.image.javaInitImage.tag }}
            imagePullPolicy: {{ .Values.image.pullPolicy }}
            securityContext:
              readOnlyRootFilesystem: true
            resources:
              requests:
                cpu: 100m
                memory: 300Mi
              limits:
                cpu: 200m
                memory: 400Mi
            volumeMounts:
              - name: azure
                mountPath: /mnt/azure
              - name: one-storage
                mountPath: /certs
        containers:
          - name: {{ .Chart.Name }}
            image: {{ .Values.global.imageRegistryURI }}/tests/{{ .Chart.Name }}:{{ .Values.version }}
            imagePullPolicy: {{ .Values.image.pullPolicy }}
            env:
              {{- include "chart.envVars" . | nindent 14 }}
              - name: JAVA_OPTS
                value: >-
                    {{ .Values.application.javaOpts }}          
              - name: application_name
                value: "test_application"
              - name: queueName
                value: "test-queue-name"
              - name: servicebusconnstrenv
                valueFrom: 
                  secretKeyRef:
                    name: secrets-service-bus
                    key: service_bus_conn_str
            volumeMounts:
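              - name: one-storage  # emptyDir volume declared under "volumes", also mounted by the init container
                mountPath: /certs
              - name: "logging-volume-file"  # PVC-backed volume declared under "volumes"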
                mountPath: "/mnt/logging"
            resources:
              {{- toYaml .Values.resources | nindent 14 }}                   
  pollingInterval: 30
  maxReplicaCount: 5
  successfulJobsHistoryLimit: 5
  failedJobsHistoryLimit: 20
  triggers:
  - type: azure-servicebus
    metadata:
      queueName: "test-queue-name"
      connectionFromEnv: servicebusconnstrenv
      messageCount: "1"
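
For reference, here is a rough sketch of how many jobs KEDA would be expected to start per polling cycle with the values above, assuming the documented "default" scaling strategy applies (no `scalingStrategy` is set in the manifest). The class and method names are illustrative only and are not KEDA's actual source code; clamping the result at zero is an assumption of this sketch.

```java
// Illustrative model of the "default" ScaledJob scaling strategy as described
// in the KEDA documentation. All names here are hypothetical.
final class ScaledJobMath {
    private ScaledJobMath() {}

    /** Ceiling division for a non-negative x and a positive y. */
    static long divideWithCeil(long x, long y) {
        return (x + y - 1) / y;
    }

    /** Upper bound on the number of jobs wanted for the current queue length. */
    static long maxScale(long queueLength, long messageCount, long maxReplicaCount) {
        return Math.min(maxReplicaCount, divideWithCeil(queueLength, messageCount));
    }

    /** Jobs to start in this polling cycle: maxScale minus the jobs already running. */
    static long jobsToCreate(long queueLength, long messageCount,
                             long maxReplicaCount, long runningJobs) {
        // Clamped at zero so the sketch never returns a negative count (assumption).
        return Math.max(0, maxScale(queueLength, messageCount, maxReplicaCount) - runningJobs);
    }

    public static void main(String[] args) {
        // Values from the manifest: messageCount "1", maxReplicaCount 5.
        System.out.println(jobsToCreate(10, 1, 5, 0)); // prints 5
        System.out.println(jobsToCreate(3, 1, 5, 1));  // prints 2
    }
}
```

Note that this calculation only decides how many jobs to start. It does not assign a particular message to a particular pod; each pod competes for whatever messages are still in the queue when it begins receiving.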

Here is my Azure Functions listener:

  @FunctionName("TestServiceBusTrigger")
  public void TestServiceBusTriggerHandler(
      @ServiceBusQueueTrigger(
              name = "msg",
              queueName = "%TEST_QUEUE_NAME%",
              connection = "ServiceBusConnectionString")
          final String inputMessage,
      final ExecutionContext context) {

    final java.util.logging.Logger contextLogger = context.getLogger();
    System.setProperty("javax.net.ssl.trustStore", "/certs/cacerts");

    try {
      // All the processing goes here.
    } catch (Exception e) {
      // Exception handling.
    }
  }

What configuration do I need to add so that each scaled pod processes a single message and then terminates?
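
One possible explanation, offered as an assumption rather than a confirmed diagnosis: a Service Bus queue trigger is a long-running listener, so the first pod whose host starts may keep pulling messages until the queue is empty, leaving nothing for the pods that start later. The sketch below models that difference with an in-memory queue. The class, the method names, and the in-memory queue are all hypothetical stand-ins; a real worker would receive from Service Bus instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical in-memory model of two consumer styles. Not Azure SDK code.
final class OneShotWorkerDemo {
    private OneShotWorkerDemo() {}

    /** Long-running listener: keeps receiving until the queue is empty. */
    static List<String> runListener(Queue<String> queue) {
        List<String> handled = new ArrayList<>();
        String msg;
        while ((msg = queue.poll()) != null) {
            handled.add(msg); // stands in for the real processing
        }
        return handled;
    }

    /** One-shot worker: receives at most one message, handles it, then returns. */
    static List<String> runOneShot(Queue<String> queue) {
        List<String> handled = new ArrayList<>();
        String msg = queue.poll();
        if (msg != null) {
            handled.add(msg); // stands in for the real processing
        }
        return handled; // the process would exit here so the Job can complete
    }

    public static void main(String[] args) {
        Queue<String> a = new ConcurrentLinkedQueue<>(Arrays.asList("m1", "m2", "m3"));
        System.out.println(runListener(a).size()); // prints 3: the first pod drains the queue

        Queue<String> b = new ConcurrentLinkedQueue<>(Arrays.asList("m1", "m2", "m3"));
        System.out.println(runOneShot(b).size());  // prints 1
        System.out.println(b.size());              // prints 2: left for the other pods
    }
}
```

If this is indeed the cause, the change would be in the container's code rather than in the ScaledJob manifest: the process would receive one message, process it, settle it, and exit so that the Job completes and the remaining messages stay available to the other pods.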

azure kubernetes azureservicebus keda keda-scaledjob