我正在使用 Python SDK 编写一个具有多个计时器触发器的 Azure 函数应用程序。每个计时器触发器都会在 msgraph api 的帮助下从 Microsoft Entra ID 中提取所有用户或组,并将它们写入同样托管在 Azure 上的 Postgres DB。正如我在标题中简要描述的那样,我的函数应用程序经常在“资源和运行状况”选项卡中显示挂起和死锁错误(每 2-3 天一次)。我们使用 EP1 应用服务计划。
这是失败的触发器的简短代码示例:
class PostgreSQLConnector:
    """Context-managed helper that bulk-inserts rows through a DB-API cursor.

    Connection settings come from the ``PG_HOST``, ``PG_DB``, ``PG_USER`` and
    ``PG_PASS`` environment variables. ``connect()`` and ``close_connection()``
    are called below but are not part of this excerpt; ``connect()`` is
    expected to set ``self.connection``.

    All calls made here are synchronous, so using this class from inside a
    coroutine blocks the calling thread for the duration of the database work.
    """

    def __init__(self):
        self.host = os.getenv("PG_HOST")
        self.dbname = os.getenv("PG_DB")
        self.user = os.getenv("PG_USER")
        self.password = os.getenv("PG_PASS")
        self.sslmode = "require"

    def __enter__(self):
        """Open the connection and a cursor, returning this helper."""
        self.connect()
        try:
            self.cursor = self.connection.cursor()
        except Exception:
            # __exit__ is not invoked when __enter__ fails, so release the
            # connection here instead of leaking it.
            self.close_connection()
            raise
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the cursor and always close the connection afterwards."""
        try:
            self.cursor.close()
        finally:
            # Runs even if closing the cursor raised.
            self.close_connection()

    def insert_many_into_table(self, table_name, columns, entries,
                               batch_size=10000, commit_interval=10):
        """Insert ``entries`` into ``table_name`` in batches.

        Args:
            table_name: Target table. Interpolated into the SQL text as-is,
                so it must come from trusted code, never from user input.
            columns: Sequence of ``(name, type)`` pairs; only the names are
                used here (also interpolated as-is).
            entries: Sequence of row tuples, one value per column.
            batch_size: Number of rows per ``executemany`` call.
            commit_interval: Commit after this many batches.

        Raises:
            ValueError: If ``batch_size`` or ``commit_interval`` is below 1.
            Exception: Wrapping any error raised while inserting; the original
                error is available as ``__cause__``. Rows committed by an
                earlier interval commit are not undone by the rollback.
        """
        if batch_size < 1 or commit_interval < 1:
            raise ValueError("batch_size and commit_interval must be >= 1")

        column_list = ', '.join(''.join(name) for name, _ in columns)
        placeholders = ', '.join(['%s'] * len(columns))
        sql_query = f"""
        INSERT INTO {table_name} ({column_list})
        VALUES ({placeholders})
        """
        try:
            batch_starts = range(0, len(entries), batch_size)
            for batch_number, start in enumerate(batch_starts, start=1):
                self.cursor.executemany(sql_query, entries[start:start + batch_size])
                if batch_number % commit_interval == 0:
                    self.connection.commit()
            # Final commit covers the batches after the last interval commit.
            self.connection.commit()
        except Exception as e:
            self.connection.rollback()
            raise Exception(f"Error committing changes: {e}") from e
class GraphHelper:
    """Wrapper around a Microsoft Graph client using client-secret auth."""

    client_credential: ClientSecretCredential
    app_client: GraphServiceClient

    def __init__(self, tenant_id, client_id, client_secret):
        """Build the credential and the Graph client from app-registration values."""
        self.client_credential = ClientSecretCredential(
            tenant_id, client_id, client_secret
        )
        self.app_client = GraphServiceClient(self.client_credential)

    async def get_all_entra_users(self):
        """Return all users by following ``odata_next_link`` page by page.

        A failure on the first request propagates to the caller. A failure on
        any later page is logged and the users collected so far are returned,
        so the result may be incomplete.
        NOTE(review): a caller that replaces a table with this result cannot
        tell a partial list from a complete one -- consider re-raising.
        """
        request_configuration = UsersRequestBuilder.UsersRequestBuilderGetRequestConfiguration(
            query_parameters=UsersRequestBuilder.UsersRequestBuilderGetQueryParameters(
                # The attribute is not defined anywhere in this excerpt; fall
                # back to None instead of raising AttributeError.
                # NOTE(review): presumably None means "no $select" -- confirm.
                select=getattr(self, "default_user_select_properties", None),
                top=999
            )
        )
        all_users = []
        page = await self.app_client.users.get(request_configuration=request_configuration)
        if page is None:
            return all_users
        all_users.extend(page.value or [])
        next_link = page.odata_next_link
        try:
            while next_link:
                page = await self.app_client.users.with_url(next_link).get(
                    request_configuration=request_configuration
                )
                if page is None:
                    break
                all_users.extend(page.value or [])
                next_link = page.odata_next_link
        except Exception as e:
            # Logged with traceback at ERROR level so a truncated result is
            # visible in monitoring rather than hidden at INFO.
            logging.exception(e)
        return all_users
# Entra ID app-registration settings are read from the environment once, at
# import time, and used to build the module-wide Graph helper.
tenant_id, client_id, client_secret = (
    os.getenv(variable_name)
    for variable_name in ("AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET")
)
graph_helper_instance = GraphHelper(
    tenant_id,
    client_id,
    client_secret,
)
# Timer-triggered function: fetches all Entra ID users through
# graph_helper_instance, then drops, recreates and refills the users table.
# NOTE(review): the schedule looks like a six-field NCRONTAB expression,
# presumably 01:00:00 every day -- confirm the time zone in use.
@app.function_name(name="allEntraUsersTimerTrigger")
@app.timer_trigger(schedule="0 0 1 * * *", arg_name="allEntraUsersTimerTrigger", run_on_startup=False,
use_monitor=False)
async def all_entra_users_timer_trigger(allEntraUsersTimerTrigger: func.TimerRequest) -> None:
try:
users = await graph_helper_instance.get_all_entra_users()
# One tuple per user, with values in the column order of SchemaDefinitions.columns_users.
user_entries = [tuple([getattr(user, key) for key, _ in SchemaDefinitions.columns_users]) for user in users]
# NOTE(review): PostgreSQLConnector is a synchronous context manager and its
# insert uses blocking cursor calls, yet this runs inside `async def`. The
# coroutine cannot yield while the database work is in progress -- likely
# relevant to the reported hangs; confirm and consider an async driver or
# running the block in an executor.
with PostgreSQLConnector() as postgres_helper_instance:
# NOTE(review): the table is dropped before the insert, and
# get_all_entra_users returns a partial list when a later page fails, so a
# failed run can leave the table empty or incomplete.
postgres_helper_instance.drop_table(SchemaDefinitions.table_name_users)
postgres_helper_instance.create_table(SchemaDefinitions.table_name_users, SchemaDefinitions.columns_users)
postgres_helper_instance.insert_many_into_table(SchemaDefinitions.table_name_users, SchemaDefinitions.columns_users, user_entries)
except Exception as e:
# Error handling
我需要提取大约 5 万个用户并将其写入数据库,因此我认为这属于 I/O 密集型的函数。在本地运行没有问题;在已部署的函数应用上通过发送 POST 请求手动触发也能正常工作。唯一的问题是由计时器自动触发时经常挂起。我的代码结构是否存在根本性的错误,可能导致自动触发时出现挂起和死锁?是不是因为 Postgres 写入不是异步的?有人有什么想法吗?
如果您需要更多信息,请告诉我
我同意 @Rui Jarimba 和 @Je Je 的观点:您可以考虑为函数应用程序使用单独的存储帐户来避免死锁,并使用 asyncpg 包来避免 I/O 阻塞和死锁。请参考以下代码:
import asyncpg
import os
class PostgreSQLConnector:
    """Async context manager that writes rows to PostgreSQL through asyncpg.

    Connection settings are read from the ``PG_HOST``, ``PG_DB``, ``PG_USER``
    and ``PG_PASS`` environment variables; the previous literal values remain
    as fallbacks so existing behavior is unchanged when they are unset.
    """

    def __init__(self):
        self.host = os.getenv("PG_HOST", "valleypostgresql76.postgres.database.azure.com")
        self.dbname = os.getenv("PG_DB", "postgres")
        self.user = os.getenv("PG_USER", "xxxxxx")
        self.password = os.getenv("PG_PASS", "xxxxxx")
        self.sslmode = "require"
        self.pool = None
        self.connection = None

    async def initialize_pool(self):
        """Create the asyncpg connection pool from the stored settings."""
        self.pool = await asyncpg.create_pool(
            user=self.user,
            password=self.password,
            database=self.dbname,
            host=self.host,
            ssl=self.sslmode
        )

    async def __aenter__(self):
        """Create the pool if needed and acquire one connection from it."""
        if self.pool is None:
            await self.initialize_pool()
        self.connection = await self.pool.acquire()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Release the connection and close the pool.

        The pool is created by ``__aenter__`` and owned by this instance, so
        it is closed here; otherwise every ``async with`` would leave a pool
        open. Entering the same instance again creates a fresh pool.
        """
        try:
            await self.pool.release(self.connection)
            await self.pool.close()
        finally:
            self.connection = None
            self.pool = None

    async def create_table(self, table_name, columns):
        """Create ``table_name`` if it does not exist.

        ``columns`` is a sequence of ``(name, type)`` pairs. Both the table
        name and the column definitions are interpolated into the SQL text
        as-is, so they must come from trusted code.
        """
        column_defs = ', '.join(f"{name} {column_type}" for name, column_type in columns)
        create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
        {column_defs}
        )
        """
        await self.connection.execute(create_table_query)

    async def insert_many_into_table(self, table_name, columns, entries,
                                     batch_size=10000, commit_interval=10):
        """Insert ``entries`` in batches, one transaction per ``commit_interval`` batches.

        The connection is never put into a transaction elsewhere in this
        class, so issuing literal ``COMMIT``/``ROLLBACK`` statements had
        nothing to commit or undo. Explicit transaction blocks give the
        intended behavior: each group of ``commit_interval`` batches is
        committed together, and a failure rolls back the group in progress.

        Args:
            table_name: Target table (interpolated as-is; must be trusted).
            columns: Sequence of ``(name, type)`` pairs; only names are used.
            entries: Sequence of row tuples, one value per column.
            batch_size: Number of rows per ``executemany`` call.
            commit_interval: Number of batches per transaction.

        Raises:
            ValueError: If ``batch_size`` or ``commit_interval`` is below 1.
            Exception: Wrapping any error raised while inserting; the original
                error is available as ``__cause__``. Groups committed before
                the failure stay committed.
        """
        if batch_size < 1 or commit_interval < 1:
            raise ValueError("batch_size and commit_interval must be >= 1")

        column_list = ', '.join(''.join(name) for name, _ in columns)
        placeholders = ', '.join(f"${position}" for position in range(1, len(columns) + 1))
        sql_query = f"""
        INSERT INTO {table_name} ({column_list})
        VALUES ({placeholders})
        """
        rows_per_transaction = batch_size * commit_interval
        try:
            for group_start in range(0, len(entries), rows_per_transaction):
                group = entries[group_start:group_start + rows_per_transaction]
                async with self.connection.transaction():
                    for start in range(0, len(group), batch_size):
                        await self.connection.executemany(
                            sql_query, group[start:start + batch_size]
                        )
        except Exception as e:
            raise Exception(f"Error committing changes: {e}") from e
# Example usage
async def main():
    """Create the example ``users`` table through PostgreSQLConnector."""
    table_name = "users"
    columns = [
        ("id", "SERIAL PRIMARY KEY"),
        ("name", "VARCHAR(100)"),
        ("email", "VARCHAR(100)"),
        ("created_at", "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"),
    ]
    async with PostgreSQLConnector() as postgres:
        await postgres.create_table(table_name, columns)


# Run the example only when executed as a script. Without the guard, merely
# importing this module would open a database connection, and asyncio.run()
# raises RuntimeError if an event loop is already running in the thread.
import asyncio

if __name__ == "__main__":
    asyncio.run(main())
此外,您可以批量获取 50K 用户以提高其并发性:-
import logging
from azure.identity import ClientSecretCredential
from msgraph.core import GraphServiceClient
from msgraph.generated.users.users_request_builder import UsersRequestBuilder
class GraphHelper:
    """Wrapper around a Microsoft Graph client using client-secret auth."""

    def __init__(self, tenant_id, client_id, client_secret):
        """Build the credential and the Graph client from app-registration values."""
        self.client_credential = ClientSecretCredential(tenant_id, client_id, client_secret)
        self.app_client = GraphServiceClient(self.client_credential)

    async def get_all_entra_users(self):
        """Return all users by following ``odata_next_link`` page by page.

        Pages are requested one after another (``top=999`` per page), not
        concurrently. A failure on the first request propagates to the
        caller. A failure on any later page is logged and the users collected
        so far are returned, so the result may be incomplete.
        """
        request_configuration = UsersRequestBuilder.UsersRequestBuilderGetRequestConfiguration(
            query_parameters=UsersRequestBuilder.UsersRequestBuilderGetQueryParameters(
                # __init__ never sets this attribute; fall back to None
                # instead of raising AttributeError on every call.
                # NOTE(review): presumably None means "no $select" -- confirm.
                select=getattr(self, "default_user_select_properties", None),
                top=999
            )
        )
        all_users = []
        page = await self.app_client.users.get(request_configuration=request_configuration)
        if page is None:
            return all_users
        all_users.extend(page.value or [])
        next_link = page.odata_next_link
        try:
            while next_link:
                page = await self.app_client.users.with_url(next_link).get(
                    request_configuration=request_configuration
                )
                if page is None:
                    break
                all_users.extend(page.value or [])
                next_link = page.odata_next_link
        except Exception as e:
            logging.error(f"Error fetching users: {e}")
        return all_users
此外,您还可以添加
maxConcurrentRequests
来提高 Azure Functions 的性能(请注意:该设置位于 host.json 的 extensions.http 节点下,只对 HTTP 触发的请求生效,对计时器触发器没有影响)。
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"extensions": {
"http": {
"maxConcurrentRequests": 100,
"maxOutstandingRequests": 200
}
}
}
此外,Azure 门户中的“诊断并解决问题”功能也能让您更深入地了解函数的运行行为: