我正在使用 Azure 函数运行启动多个线程的 Python 脚本(出于性能原因)。一切都按预期运行,但 Azure Functions 日志中仅显示来自 main() 线程的信息日志。 我在 main() 中启动的“辅助”线程中使用的所有日志都不会出现在 Azure Functions 日志中。
有没有办法确保辅助线程的日志显示在 Azure Functions 日志上?
我使用的模块是“logging”和“threading”。
我使用的是Python 3.6;我已经尝试降低辅助线程中的日志记录级别,但不幸的是这没有帮助。 各种辅助线程函数位于不同的模块中。
我的函数具有类似于以下伪代码的结构:
def main()->None:
logging.basicConfig(level=logging.INFO)
logging.info("Starting the process...")
thread1 = threading.Thread(target=foo,args=("one arg",))
thread2 = threading.Thread(target=foo,args=("another arg",))
thread3 = threading.Thread(target=foo,args=("yet another arg",))
thread1.start()
thread2.start()
thread3.start()
logging.info("All threads started successfully!")
return
# in another module
def foo(st:str)->None:
logging.basicConfig(level=logging.INFO)
logging.info(f"Starting thread for arg {st}")
当前的Azure日志输出是:
INFO: Starting the process...
INFO: "All threads started successfully!"
我希望它是这样的:
INFO: Starting the process...
INFO: Starting thread for arg one arg
INFO: Starting thread for arg another arg
INFO: Starting thread for arg yet another arg
INFO: All threads started successfully!
(当然,辅助线程的顺序可以是任何顺序)
Azure 函数 Python 工作框架将
AsyncLoggingHandler
设置为根记录器的处理程序。从该处理程序到其目的地,日志似乎沿路径被 invocation_id
过滤。
如果框架自行启动线程,则设置
invocation_id
,就像 main
同步函数一样。另一方面,如果我们自己从 main
函数启动线程,则必须在启动的线程中设置 invocation_id
以便日志到达其目的地。
这个
azure_functions_worker.dispatcher.get_current_invocation_id
函数检查当前线程是否有正在运行的事件循环。如果没有找到正在运行的循环,它只会检查 azure_functions_worker.dispatcher._invocation_id_local
(线程本地存储)是否有名为 v
的属性,以获取 invocation_id
的值。
因为我们启动的线程没有正在运行的事件循环,所以我们必须从
invocation_id
获取 context
并将其设置在我们启动的每个线程中的 azure_functions_worker.dispatcher._invocation_id_local.v
上。
invocation_id
由框架在 context
函数的 main
参数中提供。
在 Ubuntu 18.04、azure-functions-core-tools-4 和 Python 3.8 上进行了测试。
import sys
import azure.functions as func
import logging
import threading
# import thread local storage
from azure_functions_worker.dispatcher import (
_invocation_id_local as tls,
)
def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
logging.info("Starting the process...")
thread1 = threading.Thread(
target=foo,
args=(
context,
"one arg",
),
)
thread2 = threading.Thread(
target=foo,
args=(
context,
"another arg",
),
)
thread3 = threading.Thread(
target=foo,
args=(
context,
"yet another arg",
),
)
thread1.start()
thread2.start()
thread3.start()
logging.info("All threads started successfully!")
name = req.params.get("name")
if not name:
try:
req_body = req.get_json()
except ValueError:
pass
else:
name = req_body.get("name")
if name:
return func.HttpResponse(
f"Hello, {name}. This HTTP triggered function executed successfully."
)
else:
return func.HttpResponse(
"This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response.",
status_code=200,
)
# in another module
def foo(context, st: str) -> None:
# invocation_id_local = sys.modules[
# "azure_functions_worker.dispatcher"
# ]._invocation_id_local
# invocation_id_local.v = context.invocation_id
tls.v = context.invocation_id
logging.info(f"Starting thread for arg {st}")
这必须是您的 Azure 设置中的内容:在非 Azure 设置中,它会按预期工作。您应该为您的线程添加
join()
调用。并且 basicConfig()
只能从主入口点调用一次。
您的线程是否受 I/O 限制?由于 GIL,拥有多个计算绑定线程并不会给您的代码带来任何性能优势。围绕
concurrent.futures.ProcessPoolExecutor
或 multiprocessing
构建代码可能会更好。
这是一个 Repl,它显示了代码的稍微修改版本,可以按预期工作。
我可能是错的,但我怀疑 azure 在守护线程中运行你的 main 函数。
引用https://docs.python.org/3/library/threading.html:当没有存活的非守护线程时,整个Python程序将退出。
当Thread构造函数中没有设置daemon时,它会重用父线程的值。 您可以在启动您的子线程之前通过打印
thread1.daemon
来检查这是否是您的问题。
无论如何,我可以在我的电脑上重现这个问题(没有任何Azure,只是简单的python3):
def main():
logging.basicConfig(level=logging.INFO)
logging.info("Starting the process...")
thread1 = threading.Thread(target=foo,args=("one arg",),daemon=True)
thread2 = threading.Thread(target=foo,args=("another arg",),daemon=True)
thread3 = threading.Thread(target=foo,args=("yet another arg",),daemon=True)
thread1.start()
thread2.start()
thread3.start()
logging.info("All threads started successfully!")
return
def foo(st):
for i in range(2000): # Giving a bit a of time for race condition to happen
print ('tamere', file = open('/dev/null','w'))
logging.basicConfig(level=logging.INFO)
logging.info(f"Starting thread for arg {st}")
main()
如果我将守护进程强制设置为 False/未定义,则它可以工作。因此,我猜你的问题是 azure 在守护程序线程中启动你的主函数,并且由于你没有将守护程序标志覆盖为 False,整个过程会立即退出。
PD:我对 Azure 一无所知,有可能你确实试图以错误的方式做某事,并且有另一个界面可以完全按照你想要的方式做,但以 Azure 期望你做的方式。因此,这个答案可能只是对所发生情况的解释,而不是真正的指导。
Azure Functions 是一个异步环境。
如果您定义了
async def
,它将与 asyncio
一起运行。
否则它将以
concurrent.futures.ThreadPoolExecutor
运行。
最好定义你的函数
async
。
线程工作。您不需要手动启动线程。线程池执行您的阻塞代码。你必须让它为你工作。
我无法安装 Nizam 的答案中使用的 azure-functions-worker,因为 Linux 上存在一些无法安装 grpcio 依赖项的问题。 Azure 还警告不要在函数中使用 azure-functions-worker,这样你就不会意外修改某些关键内容。相反,你应该这样做:
import azure.functions as func
import threading
import logging
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
t1 = threading.Thread(target=thread_function, args=(context, 'Thread1 used.'))
t1.start()
return func.HttpResponse("Ok", status_code=200)
def thread_function(context: func.Context, message: str):
context.thread_local_storage.invocation_id = context.invocation_id
for _ in range(10):
logging.info(message)
我在这里找到了这个。