ORM (SQLalchemy) 的案例

问题描述 投票:0回答:4

我正在将 SQLAlchemy 与 ORM paragdim 结合使用。我无法找到执行 CASE WHEN 指令的方法。我在网上找不到这方面的信息。

可以吗?

python orm sqlalchemy
4个回答
88
投票

请参阅文档页面上的 sqlalchemy.sql.expression.case 函数和更多示例。但它看起来像这样(来自链接的文档的逐字记录):

case([(orderline.c.qty > 100, item.c.specialprice),
      (orderline.c.qty > 10, item.c.bulkprice)
    ], else_=item.c.regularprice)
case(value=emp.c.type, whens={
        'engineer': emp.c.salary * 1.1,
        'manager':  emp.c.salary * 3,
    })

edit-1(回答评论)当然可以,请参阅下面的示例:

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True, autoincrement=True)
    first_name = Column(String)
    last_name = Column(String)

xpr = case([(User.first_name != None, User.first_name + " " + User.last_name),],
        else_ = User.last_name).label("full_name")

qry = session.query(User.id, xpr)
for _usr in qry:
    print _usr.fullname

另请参阅 使用混合,了解混合属性中使用的

case
示例。


8
投票

我让它与聚合函数一起使用,在本例中

func.sum

我的示例代码

from sqlalchemy import func, case

my_case_stmt = case(
    [
        (MyTable.hit_type.in_(['easy', 'medium']), 1),
        (MyTable.hit_type == 'hard', 3)
    ]
)

score = db.session.query(
    func.sum(my_case_stmt)
).filter(
    MyTable.success == 1
)

return score.scalar()

我的用例

我的表格看起来像这样:

|   hit_type     |  success |  
-----------------------------  
|   easy         |   1      |  
|   medium       |   1      |  
|   easy         |   0      |  
|   hard         |   1      |  
|   easy         |   0      |
|   easy         |   1      |  
|   medium       |   1      |  
|   hard         |   1      |

score
计算如下:
score = num_easy_hits + num_medium_hits + (3 * num_hard_hits)

4 次成功的简单/中度打击和 2 次成功的困难打击会给你

(4 + (2*3)) = 10


0
投票

这是文档中的链接: http://docs.sqlalchemy.org/en/latest/core/sqlelement.html?highlight=case#sqlalchemy.sql.expression.Case

但是看到这些例子让我很困惑,而且没有可运行的代码。 我尝试了很多次,也遇到了很多种问题。

最后,我找到了两种在 sqlalchemy 中实现“Case when”的方法。

第一种方式:

顺便说一句,我的情况是我需要根据用户是否已登录来屏蔽电话字段。

    @staticmethod
    def requirement_list_common_query(user=None):
      `enter code here`  phone_mask = case(
            [
                (db.true() if user else db.false(), Requirement.temp_phone),
            ],
            else_=func.concat(func.left(Requirement.temp_phone, 3), '****', func.right(Requirement.temp_phone, 4))
        ).label('temp_phone')
        query = db.session.query(Requirement.company_id,
                                 Company.uuid.label('company_uuid'),
                                 Company.name.label('company_name'),
                                 Requirement.uuid,
                                 Requirement.title,
                                 Requirement.content,
                                 Requirement.level,
                                 Requirement.created_at,
                                 Requirement.published_at,
                                 Requirement.end_at,
                                 Requirement.status,
                                 # Requirement.temp_phone,
                                 phone_mask,
                                 User.name.label('user_name'),
                                 User.uuid.label('user_uuid')
                                 )
        query = query.join(Company, Company.id == Requirement.company_id) \
            .join(User, User.id == Requirement.user_id)
        return query

需求是我的模型之一。 如果用户已登录,则方法“requirement_list_common_query”中的用户参数是登录用户。

第二种方式: 这里的情况是我想根据员工的收入对他们进行分类。

型号有:

class Dept(Base):
    __tablename__ = 'dept'
    deptno = Column(Integer, primary_key=True)
    dname = Column(String(14))
    loc = Column(String(13))

    def __repr__(self):
        return str({
            'deptno': self.deptno,
            'dname': self.dname,
            'loc': self.loc
        })


class Emp(Base):
    __tablename__ = 'emp'
    empno = Column(Integer, primary_key=True)
    ename = Column(String(10))
    job = Column(String(9))
    mgr = Column(Integer)
    hiredate = Column(Date)
    sal = Column(DECIMAL(7, 2))
    comm = Column(DECIMAL(7, 2))
    deptno = Column(Integer, ForeignKey('dept.deptno'))

    def __repr__(self):
        return str({
            'empno': self.empno,
            'ename': self.ename,
            'job': self.job,
            'deptno': self.deptno,
            'comm': self.comm
        })

这是代码:

from sqlalchemy import text
income_level = case(
    [
        (text('(emp.sal + ifnull(emp.comm,0))<1500'), 'LOW_INCOME'),
        (text('1500<=(emp.sal + ifnull(emp.comm,0))<3500'), 'MIDDLE_INCOME'),
        (text('(emp.sal + ifnull(emp.comm,0))>=3500'), 'HIGH_INCOME'),
    ], else_='UNKNOWN'
).label('income_level')
emps = sess.query(Emp.ename, label('income', Emp.sal + func.ifnull(Emp.comm, 0)),
                  income_level).all()
for item in emps:
    print(item.ename, item.income, item.income_level)

为什么我用“文本”?因为这样的代码在SQLAlchemy 1.2.8中无法实现。我已经尝试了很长时间,但找不到这样的方法,正如@van所说:

case([(orderline.c.qty > 100, item.c.specialprice),
      (orderline.c.qty > 10, item.c.bulkprice)
    ], else_=item.c.regularprice)
case(value=emp.c.type, whens={
        'engineer': emp.c.salary * 1.1,
        'manager':  emp.c.salary * 3,
    })

希望对你有帮助!


0
投票

更新:在最新的sqlalchemy中,case语句中不需要

[]

case(
    [
      (orderline.c.qty > 100, item.c.specialprice),
      (orderline.c.qty > 10, item.c.bulkprice),
    ], 
   else_=item.c.regularprice,
)

应改为

case(
    (orderline.c.qty > 100, item.c.specialprice),
    (orderline.c.qty > 10, item.c.bulkprice),
    else_=item.c.regularprice,
)

否则,你可能会遇到这样的错误:

sqlalchemy.exc.ArgumentError: The "whens" argument to case(), when referring to a sequence of items, is now passed as a series of positional elements, rather than as a list. 

© www.soinside.com 2019 - 2024. All rights reserved.