我有 3 个 SQLAlchemy 表:TagGroup、Tag 和 Video。TagGroup/Tag 以及 Tag/Video 之间都是通过关联表定义的双向多对多关系。我想构建一个查询,对属于给定 TagGroup.id 的所有视频的 Video.viewCount 字段求和。但是,如果同一个视频关联到了同一 TagGroup 内的 2 个不同的 Tag,那么它的 Video.viewCount 会被累加两次。希望有人能帮我避免这种情况。
例如
           Video1
          /
      Tag1
     /    \
TagGroup   Video2(会被累加两次)
     \    /
      Tag2
          \
           Video3
注意,为了简单起见,我删除了所有不相关的字段。
class TagGroup(Base):
    """A group of tags, linked to ``Tag`` through a many-to-many relationship."""

    __tablename__ = "tag_groups"

    id: Mapped[int] = mapped_column(
        Integer,
        primary_key=True,
        autoincrement=True,
    )
    # The association table is given by its table name (a string) so it is
    # looked up in Base.metadata when the mappers are configured. In this
    # listing the Table object is only created after this class body runs,
    # so referencing the Python variable here would raise NameError.
    tags = relationship(
        "Tag",
        secondary="tags_and_groups_association_table",
        back_populates="groups",
    )
# Association table backing the many-to-many link between tags and tag groups.
# Both columns are flagged primary_key=True and are also listed in an explicit
# PrimaryKeyConstraint, so each (tag, group) pair can be stored at most once.
tags_and_groups_association_table = Table(
    "tags_and_groups_association_table",
    Base.metadata,
    Column("tags_id", ForeignKey("tags.id"), primary_key=True),
    Column("tag_groups_id", ForeignKey("tag_groups.id"), primary_key=True),
    PrimaryKeyConstraint("tags_id", "tag_groups_id"),
)
class Tag(Base):
    """A tag; many-to-many with both ``Video`` and ``TagGroup``."""

    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(
        Integer,
        primary_key=True,
        autoincrement=True,
    )
    # "video_tags" is passed as a table name (string) and resolved from
    # Base.metadata at mapper-configuration time, because in this listing the
    # video_tags Table object is only created after this class body runs.
    in_videos = relationship(
        "Video",
        secondary="video_tags",
        back_populates="tags",
        lazy="select",
    )
    groups = relationship(
        "TagGroup",
        secondary=tags_and_groups_association_table,
        back_populates="tags",
    )
# Association table backing the many-to-many link between tags and videos.
# The two foreign-key columns together form the composite primary key.
video_tags = Table(
    "video_tags",
    Base.metadata,
    Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True),
    Column("video_id", Integer, ForeignKey("videos.id"), primary_key=True),
)
class Video(Base):
    """A video with a view counter, tagged through the ``video_tags`` table."""

    __tablename__ = "videos"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Non-nullable counter; new rows start at 0 unless a value is supplied.
    viewCount: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0)
    tags = relationship(
        "Tag",
        secondary=video_tags,
        back_populates="in_videos",
        lazy="select",
    )
我尝试过各种组合,也试过按 Video.id 做 group_by,但似乎都不起作用。
# NOTE: this is the attempt that over-counts. A video reachable through two
# tags of the same group produces two joined rows, so SUM adds its viewCount
# twice; COUNT is unaffected because it is applied to distinct(Video.id).
select(
    TagGroup.id,
    func.SUM(Video.viewCount),
    func.COUNT(distinct(Video.id)),  # count works perfectly well with distinct() by unique id.
).select_from(
    TagGroup  # the FROM target must be the mapped entity, not one of its columns
).where(
    TagGroup.id.in_(ids)  # list of TagGroup ids
).join(
    tags_and_groups_association_table,
    TagGroup.id == tags_and_groups_association_table.c.tag_groups_id,
).join(
    Tag,
    tags_and_groups_association_table.c.tags_id == Tag.id,
).join(
    video_tags,  # the tag/video association table defined in this listing
    Tag.id == video_tags.c.tag_id,
).join(
    Video,
    video_tags.c.video_id == Video.id,
).group_by(
    TagGroup.id,
)
下面的方案看起来可行,不过请用更多测试用例验证一下。
如果您确认它运行正常,我稍后会在答案中补充更多说明。
from sqlalchemy import (
Integer, ForeignKey, Column, PrimaryKeyConstraint, Table, BigInteger, create_engine, select, func
)
from sqlalchemy.orm import relationship, Mapped, mapped_column, DeclarativeBase, sessionmaker
class Base(DeclarativeBase):
    """Declarative base class shared by every mapped model in this example."""
# Association table backing the many-to-many link between tags and tag groups.
# Both columns are flagged primary_key=True and are also listed in an explicit
# PrimaryKeyConstraint, so each (tag, group) pair can be stored at most once.
tags_and_groups_association_table = Table(
    "tags_and_groups_association_table",
    Base.metadata,
    Column("tags_id", ForeignKey("tags.id"), primary_key=True),
    Column("tag_groups_id", ForeignKey("tag_groups.id"), primary_key=True),
    PrimaryKeyConstraint("tags_id", "tag_groups_id"),
)
class TagGroup(Base):
    """A group of tags, linked to ``Tag`` through a many-to-many relationship."""

    __tablename__ = "tag_groups"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Tags in this group; the reverse side of the relationship is Tag.groups.
    tags = relationship(
        "Tag",
        back_populates="groups",
        secondary=tags_and_groups_association_table,
    )
# Association table backing the many-to-many link between tags and videos.
# The two foreign-key columns together form the composite primary key.
video_tags = Table(
    "video_tags",
    Base.metadata,
    Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True),
    Column("video_id", Integer, ForeignKey("videos.id"), primary_key=True),
)
class Tag(Base):
    """A tag; many-to-many with both ``Video`` and ``TagGroup``."""

    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Videos carrying this tag; the reverse side is Video.tags.
    in_videos = relationship(
        "Video",
        secondary=video_tags,
        back_populates="tags",
        lazy="select",
    )
    # Groups this tag belongs to; the reverse side is TagGroup.tags.
    groups = relationship(
        "TagGroup",
        back_populates="tags",
        secondary=tags_and_groups_association_table,
    )
class Video(Base):
    """A video with a view counter, tagged through the ``video_tags`` table."""

    __tablename__ = "videos"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Non-nullable counter; new rows start at 0 unless a value is supplied.
    viewCount: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0)
    tags = relationship(
        "Tag",
        secondary=video_tags,
        back_populates="in_videos",
        lazy="select",
    )
# In-memory SQLite database keeps the example self-contained.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session_maker = sessionmaker(bind=engine)

with session_maker() as session:
    # Fixture: two groups holding two tags each.
    t_gr_1 = TagGroup()
    t_gr_2 = TagGroup()
    t_1 = Tag()
    t_2 = Tag()
    t_3 = Tag()
    t_4 = Tag()
    t_gr_1.tags.append(t_1)
    t_gr_1.tags.append(t_2)
    t_gr_2.tags.append(t_3)
    t_gr_2.tags.append(t_4)

    # v_1 carries two tags of group 1 and v_3 carries two tags of group 2
    # (the double-counting case); v_2 is shared between both groups.
    v_1 = Video(viewCount=111, tags=[t_1, t_2])
    v_2 = Video(viewCount=222, tags=[t_1, t_3])
    v_3 = Video(viewCount=444, tags=[t_3, t_4])
    v_4 = Video(viewCount=444, tags=[t_3])

    # Only the groups are added explicitly; the tags and videos are presumably
    # persisted through the default save-update cascade along the
    # relationships (group -> tags -> videos).
    session.add(t_gr_1)
    session.add(t_gr_2)
    session.commit()

    ids = [1, 2]  # TagGroup ids to aggregate

    # Inner query: exactly one row per (group, video) pair. Grouping by both
    # ids collapses the duplicate rows that appear when a video is reachable
    # through several tags of the same group.
    subq = (
        select(
            Video.id.label("video_id"),
            Video.viewCount.label("viewCount"),
            TagGroup.id.label("taggroup_id"),
        )
        .select_from(TagGroup)
        .join(tags_and_groups_association_table)
        .join(Tag)
        .join(video_tags)
        .join(Video)
        .where(TagGroup.id.in_(ids))  # restrict to the requested groups
        .group_by(TagGroup.id, Video.id)
        .subquery()  # explicit subquery rather than implicit Select coercion
    )

    # Outer query: sum the de-duplicated view counts per group.
    st = (
        select(subq.c.taggroup_id, func.SUM(subq.c.viewCount))
        .select_from(subq)
        .group_by(subq.c.taggroup_id)
        .order_by(subq.c.taggroup_id)  # deterministic output order
    )

    for row in session.execute(st).all():
        print(row)
输出:
(1, 333)
(2, 1110)