如何使用 Python 从 Azure Functions 中的辅助线程重定向日志

问题描述 投票:0回答:5

我正在使用 Azure 函数运行启动多个线程的 Python 脚本(出于性能原因)。一切都按预期运行,但 Azure Functions 日志中仅显示来自 main() 线程的信息日志。 我在 main() 中启动的“辅助”线程中使用的所有日志都不会出现在 Azure Functions 日志中。

有没有办法确保辅助线程的日志显示在 Azure Functions 日志上?

我使用的模块是“logging”和“threading”。

我使用的是Python 3.6;我已经尝试降低辅助线程中的日志记录级别,但不幸的是这没有帮助。 各种辅助线程函数位于不同的模块中。

我的函数具有类似于以下伪代码的结构:

def main()->None:
  logging.basicConfig(level=logging.INFO)
  logging.info("Starting the process...")
  thread1 = threading.Thread(target=foo,args=("one arg",))
  thread2 = threading.Thread(target=foo,args=("another arg",))
  thread3 = threading.Thread(target=foo,args=("yet another arg",))
  thread1.start()
  thread2.start()
  thread3.start()
  logging.info("All threads started successfully!")
  return

# in another module

def foo(st:str)->None:
  logging.basicConfig(level=logging.INFO)
  logging.info(f"Starting thread for arg {st}")

当前的Azure日志输出是:

INFO: Starting the process...
INFO: "All threads started successfully!"

我希望它是这样的:

INFO: Starting the process...
INFO: Starting thread for arg one arg
INFO: Starting thread for arg another arg
INFO: Starting thread for arg yet another arg
INFO: All threads started successfully!

(当然,辅助线程的顺序可以是任何顺序)

python multithreading azure logging azure-functions
5个回答
4
投票

Azure 函数 Python 工作框架将

AsyncLoggingHandler
设置为根记录器的处理程序。从该处理程序到其目的地,日志似乎沿路径被
invocation_id
过滤。

如果框架自行启动线程,则设置

invocation_id
,就像
main
同步函数一样。另一方面,如果我们自己从
main
函数启动线程,则必须在启动的线程中设置
invocation_id
以便日志到达其目的地。

这个

azure_functions_worker.dispatcher.get_current_invocation_id
函数检查当前线程是否有正在运行的事件循环。如果没有找到正在运行的循环,它只会检查
azure_functions_worker.dispatcher._invocation_id_local
(线程本地存储)是否有名为
v
的属性,以获取
invocation_id
的值。

因为我们启动的线程没有正在运行的事件循环,所以我们必须从

invocation_id
获取
context
并将其设置在我们启动的每个线程中的
azure_functions_worker.dispatcher._invocation_id_local.v
上。
invocation_id
由框架在
context
函数的
main
参数中提供。

在 Ubuntu 18.04、azure-functions-core-tools-4 和 Python 3.8 上进行了测试。

import sys
import azure.functions as func
import logging
import threading

# import thread local storage
from azure_functions_worker.dispatcher import (
    _invocation_id_local as tls,
)


def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
    logging.info("Starting the process...")
    thread1 = threading.Thread(
        target=foo,
        args=(
            context,
            "one arg",
        ),
    )
    thread2 = threading.Thread(
        target=foo,
        args=(
            context,
            "another arg",
        ),
    )
    thread3 = threading.Thread(
        target=foo,
        args=(
            context,
            "yet another arg",
        ),
    )
    thread1.start()
    thread2.start()
    thread3.start()
    logging.info("All threads started successfully!")

    name = req.params.get("name")
    if not name:
        try:
            req_body = req.get_json()
        except ValueError:
            pass
        else:
            name = req_body.get("name")

    if name:
        return func.HttpResponse(
            f"Hello, {name}. This HTTP triggered function executed successfully."
        )
    else:
        return func.HttpResponse(
            "This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response.",
            status_code=200,
        )


# in another module


def foo(context, st: str) -> None:
    # invocation_id_local = sys.modules[
    #     "azure_functions_worker.dispatcher"
    # ]._invocation_id_local
    # invocation_id_local.v = context.invocation_id

    tls.v = context.invocation_id

    logging.info(f"Starting thread for arg {st}")

https://github.com/Azure/azure-functions-python-worker/blob/81b84102dc14b7d209ad7e00be68f25c37987c1e/azure_functions_worker/dispatcher.py


0
投票

这必须是您的 Azure 设置中的内容:在非 Azure 设置中,它会按预期工作。您应该为您的线程添加

join()
调用。并且
basicConfig()
只能从主入口点调用一次。

您的线程是否受 I/O 限制?由于 GIL,拥有多个计算绑定线程并不会给您的代码带来任何性能优势。围绕

concurrent.futures.ProcessPoolExecutor
multiprocessing
构建代码可能会更好。

这是一个 Repl,它显示了代码的稍微修改版本,可以按预期工作。


0
投票

我可能是错的,但我怀疑 azure 在守护线程中运行你的 main 函数。

引用https://docs.python.org/3/library/threading.html:当没有存活的非守护线程时,整个Python程序将退出。

当Thread构造函数中没有设置daemon时,它会重用父线程的值。 您可以在启动您的子线程之前通过打印

thread1.daemon
来检查这是否是您的问题。

无论如何,我可以在我的电脑上重现这个问题(没有任何Azure,只是简单的python3):

def main():
  logging.basicConfig(level=logging.INFO)
  logging.info("Starting the process...")
  thread1 = threading.Thread(target=foo,args=("one arg",),daemon=True)
  thread2 = threading.Thread(target=foo,args=("another arg",),daemon=True)
  thread3 = threading.Thread(target=foo,args=("yet another arg",),daemon=True)
  thread1.start()
  thread2.start()
  thread3.start()
  logging.info("All threads started successfully!")
  return

def foo(st):
  for i in range(2000): # Giving a bit a of time for race condition to happen
    print ('tamere', file = open('/dev/null','w'))
  logging.basicConfig(level=logging.INFO)
  logging.info(f"Starting thread for arg {st}")

main()

如果我将守护进程强制设置为 False/未定义,则它可以工作。因此,我猜你的问题是 azure 在守护程序线程中启动你的主函数,并且由于你没有将守护程序标志覆盖为 False,整个过程会立即退出。

PD:我对 Azure 一无所知,有可能你确实试图以错误的方式做某事,并且有另一个界面可以完全按照你想要的方式做,但以 Azure 期望你做的方式。因此,这个答案可能只是对所发生情况的解释,而不是真正的指导。


0
投票

Azure Functions 是一个异步环境。

如果您定义了

async def
,它将与
asyncio
一起运行。

否则它将以

concurrent.futures.ThreadPoolExecutor
运行。

最好定义你的函数

async

线程工作。您不需要手动启动线程。线程池执行您的阻塞代码。你必须让它为你工作。

https://learn.microsoft.com/en-us/azure/azure-functions/functions-app-settings#python_threadpool_thread_count


0
投票

我无法安装 Nizam 的答案中使用的 azure-functions-worker,因为 Linux 上存在一些无法安装 grpcio 依赖项的问题。 Azure 还警告不要在函数中使用 azure-functions-worker,这样你就不会意外修改某些关键内容。相反,你应该这样做:

import azure.functions as func
import threading
import logging

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
    t1 = threading.Thread(target=thread_function, args=(context, 'Thread1 used.'))
    t1.start()
    
    return func.HttpResponse("Ok", status_code=200)
    
def thread_function(context: func.Context, message: str):
    context.thread_local_storage.invocation_id = context.invocation_id
    for _ in range(10):
        logging.info(message)

我在这里找到了这个。

© www.soinside.com 2019 - 2024. All rights reserved.