如何在Python中的Azure函数输出绑定的上下文中设置队列存储消息TTL?

问题描述 投票:0回答:1

我有一个带有队列输出绑定的python azure函数。我已成功使用此绑定来使函数中的消息排队。是否可以在基础队列或消息本身上设置消息TTL?我不需要在每个消息的基础上进行设置,但是如果这是唯一的选择,则会这样做。

host.json

{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[1.*, 2.0.0)"
  }
}

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "anonymous",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get",
        "post"
      ]
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    },
    {
      "type": "queue",
      "direction": "out",
      "name": "msg",
      "queueName": "predictions",
      "connection": "AzureWebJobsStorage"
    }
  ]
}

功能代码

import json
import logging
import azure.functions as func
from graphene import Schema
from .helpers import responses
from .schema.Query import Query

def main(req: func.HttpRequest, msg: func.Out[func.QueueMessage]) -> func.HttpResponse:
    logging.info('Executing GraphQL function.')

    try:
        query = req.get_body().decode()
    except ValueError:
        pass

    if query:
        schema = Schema(Query)
        results = schema.execute(query)
        response = responses.graphql(results)

        # Write response to azure queue storage
        message = responses.storage(query, response)
        if message:
            msg.set(message)

        return response
    else:
        return responses.bad_request(
            'Please pass a GraphQL query in the request body.')
python azure-functions azure-queues
1个回答
0
投票

目前仅支持c#语言,您可以将其绑定到CloudQueue类型。如果是其他方法,则必须使用SDK方法来实现。如果您坚持使用此功能,则可以在github issue处注释您的要求。

下面是我的测试代码,用于使用azure-storage-queue 2.1.0在HTTP触发功能中设置TTL。

import logging
import azure.functions as func
from azure.storage.queue import QueueService
import os

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')
    queue_service = QueueService(connection_string=os.environ['AzureWebJobsStorage'])

    message = req.params.get('message')
    if not message:
        try:
            req_body = req.get_json()
        except ValueError:
            pass
        else:
            message = req_body.get('message')

    if message:
        queue_service.put_message('myqueue',message,None,300,None)
        return func.HttpResponse(f" {message}!")
    else:
        return func.HttpResponse(
             "Please pass message in the request body",
             status_code=400
        )
© www.soinside.com 2019 - 2024. All rights reserved.