Azure Function App - frequent hangs and deadlocks

Problem description · votes: 0 · answers: 1

I am building an Azure Function App with multiple timer triggers using the Python SDK. Each timer trigger pulls all users or groups from Microsoft Entra ID with the help of the msgraph API and writes them to a Postgres DB that is also hosted on Azure. As briefly described in the title, my function app regularly shows hang and deadlock errors in the "Resource health" tab (roughly every 2-3 days). We use an EP1 App Service plan.

Here is a shortened code sample of the failing trigger:

  1. PostgreSQLConnector, where I connect to the Postgres database and insert:
class PostgreSQLConnector:
    def __init__(self):
        self.host = os.getenv("PG_HOST")
        self.dbname = os.getenv("PG_DB")
        self.user = os.getenv("PG_USER")
        self.password = os.getenv("PG_PASS")
        self.sslmode = "require"

    def __enter__(self):
        self.connect()
        self.cursor = self.connection.cursor()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.cursor.close()
        self.close_connection()

    def insert_many_into_table(self, table_name, columns, entries):
        sql_query = f"""
        INSERT INTO {table_name} ({', '.join([''.join(name) for name, _ in columns])})
        VALUES ({', '.join(['%s'] * len(columns))})
        """
        try:
            batch_size = 10000
            batches = [entries[i:i + batch_size] for i in range(0, len(entries), batch_size)]
            commit_interval = 10
            execute_many_counts = 0
            batch_counter = 0
            for i, batch in enumerate(batches):
                self.cursor.executemany(sql_query, batch)
                batch_counter += len(batch)
                execute_many_counts += 1
                if execute_many_counts % commit_interval == 0:
                    self.connection.commit()
            self.connection.commit()
        except Exception as e:
            self.connection.rollback()
            raise Exception(f"Error committing changes: {e}")
  2. A msgraph API class instance where I authenticate and pull all users from Entra:
class GraphHelper:
    client_credential: ClientSecretCredential
    app_client: GraphServiceClient

    def __init__(self, tenant_id, client_id, client_secret):
        client_id = client_id
        tenant_id = tenant_id
        client_secret = client_secret
        self.client_credential = ClientSecretCredential(
            tenant_id, client_id, client_secret
        )
        self.app_client = GraphServiceClient(self.client_credential)

    async def get_all_entra_users(self):
        request_configuration = UsersRequestBuilder.UsersRequestBuilderGetRequestConfiguration(
            query_parameters=UsersRequestBuilder.UsersRequestBuilderGetQueryParameters(
                select=self.default_user_select_properties,
                top=999
            )
        )
        all_users = []
        top_users = await self.app_client.users.get(request_configuration=request_configuration)
        for user in top_users.value:
            all_users.append(user)
        next_link = top_users.odata_next_link
        try:
            while next_link:
                top_users = await self.app_client.users.with_url(next_link).get(request_configuration=request_configuration)
                for user in top_users.value:
                    all_users.append(user)
                next_link = top_users.odata_next_link
        except Exception as e:
            logging.info(e)
        return all_users
  3. The function_app.py trigger definition, which pulls the users from msgraph and writes them to the Postgres DB:
tenant_id = os.getenv("AZURE_TENANT_ID")
client_id = os.getenv("AZURE_CLIENT_ID")
client_secret = os.getenv("AZURE_CLIENT_SECRET")

graph_helper_instance = GraphHelper(tenant_id, client_id, client_secret)

@app.function_name(name="allEntraUsersTimerTrigger")
@app.timer_trigger(schedule="0 0 1 * * *", arg_name="allEntraUsersTimerTrigger", run_on_startup=False,
              use_monitor=False)
async def all_entra_users_timer_trigger(allEntraUsersTimerTrigger: func.TimerRequest) -> None:
    try:
        users = await graph_helper_instance.get_all_entra_users()
        user_entries = [tuple([getattr(user, key) for key, _ in SchemaDefinitions.columns_users]) for user in users]
        with PostgreSQLConnector() as postgres_helper_instance:
            postgres_helper_instance.drop_table(SchemaDefinitions.table_name_users)
            postgres_helper_instance.create_table(SchemaDefinitions.table_name_users, SchemaDefinitions.columns_users)
            postgres_helper_instance.insert_many_into_table(SchemaDefinitions.table_name_users, SchemaDefinitions.columns_users, user_entries)
    except Exception as e:
        # Error handling (shortened here)
        logging.error(e)

I need to pull around 50k users and write them to the database, so this matches what I assume is a heavy I/O function. Running it locally works without problems. Triggering it manually via a POST request against the deployed function app also works fine. The only issue is that it frequently hangs when it fires on its schedule. Is there something fundamentally wrong with the way I have set up this code that could cause hangs and deadlocks when it is triggered automatically? Could it be that the Postgres writes are not asynchronous? Does anyone have any ideas?
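
For context on that last point: psycopg2 is a synchronous driver, so the insert runs directly on the async worker's event loop and blocks it for the entire write. Below is a minimal sketch, reusing the PostgreSQLConnector and SchemaDefinitions names from the snippets above and assuming Python 3.9+ (for asyncio.to_thread), of how the blocking write could be moved onto a worker thread:

import asyncio

def write_users_blocking(user_entries):
    # Synchronous psycopg2 work, exactly as in the connector above.
    with PostgreSQLConnector() as postgres_helper_instance:
        postgres_helper_instance.insert_many_into_table(
            SchemaDefinitions.table_name_users,
            SchemaDefinitions.columns_users,
            user_entries,
        )

async def write_users(user_entries):
    # Offload the blocking call so the worker's event loop stays responsive
    # while the 50k-row insert runs.
    await asyncio.to_thread(write_users_blocking, user_entries)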

Let me know if you need more information.

postgresql azure azure-functions microsoft-graph-api microsoft-entra-id
1 Answer

0 votes

I agree with @Rui Jarimba and @Je Je: if possible, your function app could use a separate storage account to avoid deadlocks, and you can consider using the asyncpg package to avoid I/O blocking and deadlocks. Refer to the code below:

import asyncpg
import os

class PostgreSQLConnector:
    def __init__(self):
        self.host = "valleypostgresql76.postgres.database.azure.com"
        self.dbname = "postgres"
        self.user = "xxxxxx"
        self.password = "xxxxxx"
        self.sslmode = "require"
        self.pool = None
    
    async def initialize_pool(self):
        self.pool = await asyncpg.create_pool(
            user=self.user,
            password=self.password,
            database=self.dbname,
            host=self.host,
            ssl=self.sslmode
        )

    async def __aenter__(self):
        if self.pool is None:
            await self.initialize_pool()
        self.connection = await self.pool.acquire()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.pool.release(self.connection)
    
    async def create_table(self, table_name, columns):
        column_defs = ', '.join([f"{name} {type}" for name, type in columns])
        create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            {column_defs}
        )
        """
        await self.connection.execute(create_table_query)

    async def insert_many_into_table(self, table_name, columns, entries):
        sql_query = f"""
        INSERT INTO {table_name} ({', '.join([''.join(name) for name, _ in columns])})
        VALUES ({', '.join(['$' + str(i + 1) for i in range(len(columns))])})
        """
        try:
            batch_size = 10000
            batches = [entries[i:i + batch_size] for i in range(0, len(entries), batch_size)]
            for batch in batches:
                # asyncpg has no cursor/commit API; each batch gets its own transaction,
                # which commits on success and rolls back automatically on error.
                async with self.connection.transaction():
                    await self.connection.executemany(sql_query, batch)
        except Exception as e:
            raise Exception(f"Error committing changes: {e}")

# Example usage
async def main():
    table_name = "users"
    columns = [
        ("id", "SERIAL PRIMARY KEY"),
        ("name", "VARCHAR(100)"),
        ("email", "VARCHAR(100)"),
        ("created_at", "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
    ]

    async with PostgreSQLConnector() as postgres:
        await postgres.create_table(table_name, columns)

# Run the example
import asyncio
asyncio.run(main())
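
For completeness, a minimal sketch of how the timer trigger could await this async connector end to end, assuming the app, func, graph_helper_instance, and SchemaDefinitions objects from the question plus the asyncpg-based connector above:

@app.function_name(name="allEntraUsersTimerTrigger")
@app.timer_trigger(schedule="0 0 1 * * *", arg_name="allEntraUsersTimerTrigger",
                   run_on_startup=False, use_monitor=False)
async def all_entra_users_timer_trigger(allEntraUsersTimerTrigger: func.TimerRequest) -> None:
    users = await graph_helper_instance.get_all_entra_users()
    user_entries = [tuple(getattr(user, key) for key, _ in SchemaDefinitions.columns_users)
                    for user in users]
    async with PostgreSQLConnector() as postgres:
        # create_table uses CREATE TABLE IF NOT EXISTS, so it is safe on every run;
        # the question's drop_table step would need an equivalent async method.
        await postgres.create_table(SchemaDefinitions.table_name_users,
                                    SchemaDefinitions.columns_users)
        await postgres.insert_many_into_table(SchemaDefinitions.table_name_users,
                                              SchemaDefinitions.columns_users, user_entries)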


Additionally, you can fetch the 50K users in batches to improve concurrency:

import logging
from azure.identity import ClientSecretCredential
from msgraph import GraphServiceClient
from msgraph.generated.users.users_request_builder import UsersRequestBuilder

class GraphHelper:
    def __init__(self, tenant_id, client_id, client_secret):
        self.client_credential = ClientSecretCredential(tenant_id, client_id, client_secret)
        self.app_client = GraphServiceClient(self.client_credential)

    async def get_all_entra_users(self):
        request_configuration = UsersRequestBuilder.UsersRequestBuilderGetRequestConfiguration(
            query_parameters=UsersRequestBuilder.UsersRequestBuilderGetQueryParameters(
                select=self.default_user_select_properties,
                top=999
            )
        )
        all_users = []
        top_users = await self.app_client.users.get(request_configuration=request_configuration)
        for user in top_users.value:
            all_users.append(user)
        next_link = top_users.odata_next_link
        try:
            while next_link:
                top_users = await self.app_client.users.with_url(next_link).get(request_configuration=request_configuration)
                for user in top_users.value:
                    all_users.append(user)
                next_link = top_users.odata_next_link
        except Exception as e:
            logging.error(f"Error fetching users: {e}")
        return all_users

Additionally, you can add maxConcurrentRequests in host.json to improve Azure Functions performance:

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  },
  "extensions": {
    "http": {
      "maxConcurrentRequests": 100,
      "maxOutstandingRequests": 200
    }
  }
}

The Diagnose and solve problems blade also gives you deeper insight into your function's behavior.


You can also refer to my SO answer 1 for monitoring the function, and check my SO answer 2 for avoiding deadlock issues.
