SQLAlchemy 异步联接不引入列

问题描述 投票:0回答:1

我正在尝试使用 sqlalchemy 连接两个表,我想要实现的查询是:

SELECT *
FROM service_order
JOIN vendor ON service_order.vendor_id = vendor.vendor_id
WHERE service_order.service_order_id = 898;

下面我当前的 sqlalchemy 语句仅生成一个对象 ,其中仅包含来自 service_order 表的列,而不是将列从供应商表添加到此对象中,而是创建一个供应商对象 作为值之一。

from typing import List, Sequence, Dict, Union

from sqlalchemy import select
from sqlalchemy import func

from sqlalchemy.orm import Session, joinedload, lazyload, selectinload

from api.model import CreateServiceOrderRequest
from db.model import ServiceOrder as DBServiceOrder
from db.model import ServiceOrderItem as DBServiceOrderItem
from db.model import Vendor as DBVendor
from db.engine import DatabaseSession as AsyncSession
from ..exceptions import DBRecordNotFoundException, InvalidPropertyException

from core.domains.service_order.service_order_model import ServiceOrderModel


async def get_service_order_by_id(
    session: AsyncSession, service_order_id: int
) -> ServiceOrderModel:
    async with session:
        statement = (
            select(DBServiceOrder)
            .options(
                joinedload(DBServiceOrder.service_order_item).joinedload(
                    DBServiceOrderItem.service_order_item_receive
                ),
            )
            .join(DBVendor, DBServiceOrder.vendor_id == DBVendor.vendor_id)
            .where(DBServiceOrder.service_order_id == service_order_id)
        )
        result = await session.scalars(statement)
        service_order = result.first()


    return service_order

以下是我的 service_order 和供应商的数据库模型。

from datetime import datetime
from typing import Optional, List

from sqlalchemy import func, ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship

from .base_model import BaseModel


class ServiceOrder(BaseModel):
    __tablename__ = "service_order"

    service_order_id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    vendor_id: Mapped[float] = mapped_column(ForeignKey("vendor.vendor_id"))

    vendor: Mapped["Vendor"] = relationship(
        "Vendor", lazy="selectin", back_populates="service_order"
    )
    service_order_item: Mapped[List["ServiceOrderItem"]] = relationship(
        "ServiceOrderItem", back_populates="service_order"
    )

from datetime import datetime
from typing import Optional, List

from sqlalchemy import func, String
from sqlalchemy.orm import Mapped, mapped_column, relationship

from .base_model import BaseModel


class Vendor(BaseModel):
    __tablename__ = "vendor"

    vendor_id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    company_name: Mapped[Optional[str]] = mapped_column(String(200))
 

    component: Mapped[List["Component"]] = relationship(back_populates="vendor")
    motor: Mapped[List["Motor"]] = relationship(foreign_keys="Motor.vendor_id")
    motor_field_track: Mapped[List['MotorFieldTrack']] = relationship('MotorFieldTrack', back_populates='vendor')
    service_order: Mapped[List['ServiceOrder']] = relationship('ServiceOrder', back_populates='vendor')
    motor_order: Mapped[List['MotorOrder']] = relationship('MotorOrder', back_populates='vendor')

我尝试过 joinloading、lazyloading、joininload 的各种组合,但似乎没有任何效果。它不是创建包含所有列的对象,而是继续创建供应商对象而不是该表中的列。如何返回一个包含两个表中所有列的对象?如果我计划从第二个表中选择哪些列就更好了,我该如何实现这一目标?

python sql sqlalchemy orm
1个回答
0
投票

通常,您只想在获取数千条记录或其他内容时执行此操作,因为您几乎是在针对 ORM 工作。请注意,您必须使用

execute
而不是
scalars
来访问包含其他“列”的元组。另外,如果您不使用 ORM 功能,那么您也不会想使用
joinedload

async def get_service_order_by_id(
    session: AsyncSession, service_order_id: int
) -> ServiceOrderModel:
    async with session:
        statement = (
            select(DBServiceOrder, DBVendor.company_name)
            .join(DBVendor, DBServiceOrder.vendor_id == DBVendor.vendor_id)
            .where(DBServiceOrder.service_order_id == service_order_id)
        )
       
        result = await session.execute(statement)
        service_order, vender_company_name = result.first()

传统的ORM方式是这样的。使用

joinload
可同时填充
.vendor
对象并将其附加到
server_order
对象上,所有操作均通过一个
SQL
查询进行。从常规的
SQL
来看,这听起来很疯狂,但最终在大多数情况下你最终都会手动进行这种积累和分组:

async def get_service_order_by_id(
    session: AsyncSession, service_order_id: int
) -> ServiceOrderModel:
    async with session:
        statement = (
            select(DBServiceOrder)
            .join(DBServiceOrder.vendor)
            .where(DBServiceOrder.service_order_id == service_order_id)
            .options(joinedload(DBServiceOrder.vendor))
        )
        
        result = await session.scalars(statement)  
        service_order = result.first()
        # without joinedload above this would trigger a SQL
        # select in the background to load this
        vendor = server_order.vendor
© www.soinside.com 2019 - 2024. All rights reserved.