有以下模型定义:
# ORM model for a notebook that is scheduled to run; lives in the
# "metadata" schema. (__table_args__ here is a 1-tuple holding only the
# schema dict, which is equivalent to passing the dict directly.)
class ScheduledNotebook(Base):
__tablename__ = 'scheduled_notebooks'
# NOTE(review): __bind_key__ is a Flask-SQLAlchemy convention — presumably
# this model is bound to a separate "metadata" database; confirm in config.
__bind_key__ = 'metadata'
__table_args__ = (
{'schema': 'metadata'}
)
# Surrogate primary key.
id: Mapped[int] = mapped_column(primary_key=True)
# Path of the notebook file; unique, so each path is scheduled at most once.
notebook_path = Column(String, nullable=True, unique=True)
# One-to-many to runs, newest first. lazy='raise' forbids implicit lazy
# loads (callers must eager-load explicitly); passive_deletes leaves row
# removal to the database's ON DELETE CASCADE.
joined_runs = relationship(
"ScheduledNotebookRun",
backref="scheduled_notebook",
order_by='ScheduledNotebookRun.id.desc()',
lazy='raise',
passive_deletes=True
)
# Companion table holding a run's JSONB payload, placed in a separate
# "run_info_schema" schema; its PK doubles as an FK to the run row
# (one info row per run).
class ScheduledNotebookRunInfo(Base):
__tablename__ = "scheduled_notebook_run_info"
__table_args__ = (
{'schema': 'run_info_schema'}
)
# PK == FK to metadata.scheduled_notebook_run.id.
id: Mapped[int] = mapped_column(
ForeignKey("metadata.scheduled_notebook_run.id", ondelete='CASCADE'),
primary_key=True
)
# Arbitrary run metadata as a JSONB document.
info: Mapped[dict] = mapped_column(type_=JSONB)
# NOTE(review): this makes Run a *subclass* of RunInfo, i.e. joined-table
# inheritance with scheduled_notebook_run_info as the base table — but the
# FK points the other way (run_info.id -> run.id), so the inheritance
# direction is inverted. SQLAlchemy only auto-populates the inheriting
# table's PK from the base-table PK when the FK runs subclass -> base;
# with this arrangement run_info.id is never filled in, which matches the
# NotNullViolation reported below.
class ScheduledNotebookRun(ScheduledNotebookRunInfo):
__tablename__ = "scheduled_notebook_run"
__table_args__ = (
{'schema': 'metadata'}
)
id: Mapped[int] = mapped_column(primary_key=True)
# Owning notebook; use_alter emits the FK via ALTER TABLE to sidestep
# table-creation ordering cycles.
notebook_id: Mapped[int] = mapped_column(ForeignKey(
ScheduledNotebook.id,
ondelete='CASCADE',
use_alter=True
), nullable=False)
queued_at_time: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), nullable=True)
# NOTE(review): assigning __mapper_args__ after the classes are defined is
# too late — the mappers were configured at class-creation time, so this
# assignment has no effect on the mapping. Also, at module scope the bare
# name `id` is the *builtin function*, not a mapped column, so this
# comparison does not build the intended inherit condition.
ScheduledNotebookRunInfo.__mapper_args__ = {
'inherit_condition': id == ScheduledNotebookRun.id,
}
上述模型定义会生成相应的数据库表(DDL 此处从略)。执行以下插入操作时出现问题:
# Reproduction: create a notebook, then a run. The run INSERT triggers a
# companion INSERT into scheduled_notebook_run_info whose id column is
# never populated (see the inverted inheritance noted above).
n = ScheduledNotebook(
notebook_path='a/b.ipynb',
)
db_session.add(n)
db_session.commit()
nr = ScheduledNotebookRun(
notebook_id=n.id,
queued_at_time=datetime.now(),
info={'foo': 'bar'}
)
db_session.add(nr)
db_session.commit()
它抛出以下异常:
sqlalchemy.exc.IntegrityError: (psycopg2.errors.NotNullViolation) null value in column "id" of relation "scheduled_notebook_run_info" violates not-null constraint
DETAIL: Failing row contains (null, {"foo": "bar"}).
问题:`inherit_condition` 不是应该负责通过指向被继承表主键的外键,来自动填充子表的主键吗?下面是几个可行的替代示例。
import sys
from datetime import datetime, timezone
from pprint import pprint
from sqlalchemy import (
create_engine,
ForeignKey,
String,
MetaData,
Integer,
)
from sqlalchemy.dialects.postgresql import JSONB, TIMESTAMP
from sqlalchemy.schema import (
CreateSchema,
Column,
Table,
)
from sqlalchemy.sql import select
from sqlalchemy.orm import (
Session,
relationship,
mapped_column,
Mapped,
selectinload,
join,
DeclarativeBase,
column_property,
)
def utc_now():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)
# One MetaData shared by the core Table objects and the declarative Base,
# so create_all() sees everything.
metadata_obj = MetaData()
class Base(DeclarativeBase):
metadata = metadata_obj
# Connection parameters come from the command line: user, password, dbname.
username, password, db = sys.argv[1:4]
engine = create_engine(f"postgresql+psycopg2://{username}:{password}@/{db}", echo=True)
# NOTE(review): event.listen does not need an open connection — this
# engine.connect() block looks superfluous; confirm before removing.
with engine.connect() as conn:
from sqlalchemy import event
# Create both schemas before any tables are emitted by create_all().
event.listen(Base.metadata, "before_create", CreateSchema("run_info_schema"))
event.listen(Base.metadata, "before_create", CreateSchema("metadata"))
# Define the two halves as core Table objects so they can be joined and
# mapped as a single entity ("mapping a class against multiple tables").
run_table = Table(
"scheduled_notebook_runs",
metadata_obj,
Column("id", Integer, primary_key=True),
Column("notebook_id", Integer, ForeignKey("metadata.scheduled_notebooks.id")),
Column("queued_at_time", TIMESTAMP(timezone=True), nullable=True),
schema="metadata",
)
# Info table whose PK is also an FK to the run table's PK.
run_info_table = Table(
"scheduled_notebook_run_infos",
metadata_obj,
Column(
"id",
Integer,
ForeignKey("metadata.scheduled_notebook_runs.id"),
primary_key=True,
),
Column("info", JSONB),
schema="run_info_schema",
)
# Inner join on run.id == run_info.id (inferred from the FK); used as the
# mapped selectable for ScheduledNotebookRun below.
run_run_info_join = join(run_table, run_info_table)
# Scheduled notebook, stored in the "metadata" schema.
class ScheduledNotebook(Base):
__tablename__ = "scheduled_notebooks"
__table_args__ = {"schema": "metadata"}
id: Mapped[int] = mapped_column(primary_key=True)
# Unique notebook file path.
notebook_path = Column(String, nullable=True, unique=True)
# Runs of this notebook, newest first; lazy="raise" forces callers to
# eager-load, passive_deletes relies on ON DELETE behavior in the DB.
joined_runs = relationship(
"ScheduledNotebookRun",
backref="scheduled_notebook",
order_by="ScheduledNotebookRun.id.desc()",
lazy="raise",
passive_deletes=True,
)
# One class mapped against the JOIN of both tables: a single instance
# writes one row into each table.
class ScheduledNotebookRun(Base):
__table__ = run_run_info_join
# NOTE(review): __table_args__ is presumably ignored when an explicit
# __table__ is supplied — verify against SQLAlchemy docs.
__table_args__ = {"schema": "run_info_schema"}
# Map BOTH id columns onto one attribute; SQLAlchemy keeps them in sync
# on INSERT, which is what populates run_info.id automatically.
id: Mapped[int] = column_property(run_table.c.id, run_info_table.c.id)
queued_at_time = run_table.c.queued_at_time
info = run_info_table.c.info
# Emit schemas and tables, then exercise the insert and read paths.
Base.metadata.create_all(engine)

with Session(engine) as db_session:
    # First notebook plus its first run.
    notebook = ScheduledNotebook(
        notebook_path="a/b.ipynb",
    )
    first_run = ScheduledNotebookRun(
        scheduled_notebook=notebook, queued_at_time=utc_now(), info={"foo": "bar"}
    )
    db_session.add_all([notebook, first_run])
    db_session.commit()

    # Schedule the same notebook a second time.
    second_run = ScheduledNotebookRun(
        scheduled_notebook=notebook, queued_at_time=utc_now(), info={"ping": "pong"}
    )
    db_session.add_all([second_run])
    db_session.commit()

with Session(engine) as db_session:
    # Eager-load the notebooks in a follow-up SELECT ... IN query.
    stmt = select(ScheduledNotebookRun).options(
        selectinload(ScheduledNotebookRun.scheduled_notebook)
    )
    for run in db_session.scalars(stmt):
        pprint(
            {
                "notebook id": run.scheduled_notebook.id,
                "run id": run.id,
                "queued_at_time": run.queued_at_time,
                "info": run.info,
            }
        )
import sys
from datetime import datetime, timezone
from pprint import pprint
from sqlalchemy import (
create_engine,
ForeignKey,
String,
)
from sqlalchemy.dialects.postgresql import JSONB, TIMESTAMP
from sqlalchemy.schema import (
CreateSchema,
Column,
)
from sqlalchemy.sql import select
from sqlalchemy.orm import (
declarative_base,
Session,
relationship,
mapped_column,
Mapped,
joinedload,
selectinload,
)
def utc_now():
    """Current UTC timestamp (timezone-aware)."""
    now = datetime.now(timezone.utc)
    return now
# Classic declarative base (pre-2.0 style API).
Base = declarative_base()
# Connection parameters from the command line: user, password, dbname.
username, password, db = sys.argv[1:4]
engine = create_engine(f"postgresql+psycopg2://{username}:{password}@/{db}", echo=True)
# NOTE(review): event.listen does not need an open connection — this
# engine.connect() block looks superfluous; confirm before removing.
with engine.connect() as conn:
from sqlalchemy import event
# Create both schemas ahead of table creation.
event.listen(Base.metadata, "before_create", CreateSchema("run_info_schema"))
event.listen(Base.metadata, "before_create", CreateSchema("metadata"))
# Scheduled notebook, stored in the "metadata" schema.
class ScheduledNotebook(Base):
__tablename__ = "scheduled_notebooks"
__table_args__ = {"schema": "metadata"}
id: Mapped[int] = mapped_column(primary_key=True)
# Unique notebook file path.
notebook_path = Column(String, nullable=True, unique=True)
# Runs of this notebook, newest first; lazy="raise" forces explicit
# eager loading by callers.
joined_runs = relationship(
"ScheduledNotebookRun",
backref="scheduled_notebook",
order_by="ScheduledNotebookRun.id.desc()",
lazy="raise",
passive_deletes=True,
)
# Run row; the JSONB payload is kept in a separate table reached through a
# one-to-one relationship instead of inheritance.
class ScheduledNotebookRun(Base):
__tablename__ = "scheduled_notebook_run"
__table_args__ = {"schema": "metadata"}
id: Mapped[int] = mapped_column(primary_key=True)
# Owning notebook; use_alter emits the FK via ALTER TABLE to avoid
# table-creation ordering cycles.
notebook_id: Mapped[int] = mapped_column(
ForeignKey(ScheduledNotebook.id, ondelete="CASCADE", use_alter=True),
nullable=False,
)
queued_at_time: Mapped[datetime] = mapped_column(
TIMESTAMP(timezone=True), nullable=True
)
# One-to-one to the info row (uselist=False makes it scalar).
info: Mapped["ScheduledNotebookRunInfo"] = relationship(
"ScheduledNotebookRunInfo", back_populates="run", uselist=False
)
# Info row in the separate schema; its PK is also the FK to the run, so a
# run has at most one info row.
class ScheduledNotebookRunInfo(Base):
__tablename__ = "scheduled_notebook_run_info"
__table_args__ = {"schema": "run_info_schema"}
id: Mapped[int] = mapped_column(
ForeignKey("metadata.scheduled_notebook_run.id", ondelete="CASCADE"),
primary_key=True,
)
# Back-reference side of the one-to-one; assigning run=... populates the
# FK (and therefore the PK) automatically on flush.
run: Mapped[ScheduledNotebookRun] = relationship(
"ScheduledNotebookRun", back_populates="info", uselist=False
)
info: Mapped[dict] = mapped_column(type_=JSONB)
# Emit schemas and tables, then exercise the insert and read paths.
Base.metadata.create_all(engine)

with Session(engine) as db_session:
    # Insert one notebook together with its first run and info payload.
    notebook = ScheduledNotebook(
        notebook_path="a/b.ipynb",
    )
    run_row = ScheduledNotebookRun(
        scheduled_notebook=notebook,
        queued_at_time=utc_now(),
    )
    info_row = ScheduledNotebookRunInfo(run=run_row, info={"foo": "bar"})
    db_session.add_all([notebook, run_row, info_row])
    db_session.commit()

    # Schedule the same notebook a second time.
    run_row = ScheduledNotebookRun(
        scheduled_notebook=notebook,
        queued_at_time=utc_now(),
    )
    info_row = ScheduledNotebookRunInfo(run=run_row, info={"ping": "pong"})
    db_session.add_all([run_row, info_row])
    db_session.commit()

with Session(engine) as db_session:
    stmt = select(ScheduledNotebookRun).options(
        # LEFT OUTER JOIN pulls the info row in the same query.
        joinedload(ScheduledNotebookRun.info),
        # Notebooks are fetched by id in a follow-up SELECT ... IN query.
        selectinload(ScheduledNotebookRun.scheduled_notebook),
    )
    for run in db_session.scalars(stmt):
        pprint(
            {
                "notebook id": run.scheduled_notebook.id,
                "run id": run.id,
                "info id": run.info.id,
                "queued_at_time": run.queued_at_time,
                # run.info.info is clunky, but it works.
                "info": run.info.info,
            }
        )
import sys
from datetime import datetime, timezone
from pprint import pprint
from sqlalchemy import (
create_engine,
ForeignKey,
String,
)
from sqlalchemy.dialects.postgresql import JSONB, TIMESTAMP
from sqlalchemy.schema import (
CreateSchema,
Column,
)
from sqlalchemy.sql import select
from sqlalchemy.orm import (
declarative_base,
Session,
relationship,
mapped_column,
Mapped,
)
def utc_now():
    """Return an aware ``datetime`` pinned to UTC."""
    utc = timezone.utc
    return datetime.now(utc)
# Classic declarative base (pre-2.0 style API).
Base = declarative_base()
# Connection parameters from the command line: user, password, dbname.
username, password, db = sys.argv[1:4]
engine = create_engine(f"postgresql+psycopg2://{username}:{password}@/{db}", echo=True)
# NOTE(review): event.listen does not need an open connection — this
# engine.connect() block looks superfluous; confirm before removing.
with engine.connect() as conn:
from sqlalchemy import event
# Create both schemas ahead of table creation.
event.listen(Base.metadata, "before_create", CreateSchema("run_info_schema"))
event.listen(Base.metadata, "before_create", CreateSchema("metadata"))
# Scheduled notebook, stored in the "metadata" schema.
class ScheduledNotebook(Base):
__tablename__ = "scheduled_notebooks"
__table_args__ = {"schema": "metadata"}
id: Mapped[int] = mapped_column(primary_key=True)
# Unique notebook file path.
notebook_path = Column(String, nullable=True, unique=True)
# Runs of this notebook, newest first; lazy="raise" blocks implicit
# lazy loads of the collection.
joined_runs = relationship(
"ScheduledNotebookRun",
backref="scheduled_notebook",
order_by="ScheduledNotebookRun.id.desc()",
lazy="raise",
passive_deletes=True,
)
# Base of a joined-table inheritance hierarchy; "type" is the
# polymorphic discriminator column.
class ScheduledNotebookRun(Base):
__tablename__ = "scheduled_notebook_run"
__table_args__ = {"schema": "metadata"}
__mapper_args__ = {
"polymorphic_on": "type",
}
id: Mapped[int] = mapped_column(primary_key=True)
# Discriminator; set automatically per subclass polymorphic_identity.
type: Mapped[str] = mapped_column(String, nullable=False)
# Owning notebook; use_alter avoids create-order cycles.
notebook_id: Mapped[int] = mapped_column(
ForeignKey(ScheduledNotebook.id, ondelete="CASCADE", use_alter=True),
nullable=False,
)
queued_at_time: Mapped[datetime] = mapped_column(
TIMESTAMP(timezone=True), nullable=True
)
# Subclass table in a different schema. Its PK is an FK to the BASE
# table's PK (subclass -> base), which is the correct joined-table
# inheritance direction: SQLAlchemy fills this id from the base row's
# id automatically on INSERT.
class ScheduledNotebookRunInfo(ScheduledNotebookRun):
__tablename__ = "scheduled_notebook_run_info"
__table_args__ = {"schema": "run_info_schema"}
__mapper_args__ = {
"polymorphic_identity": "info",
}
id: Mapped[int] = mapped_column(
ForeignKey("metadata.scheduled_notebook_run.id", ondelete="CASCADE"),
primary_key=True,
)
info: Mapped[dict] = mapped_column(type_=JSONB)
# Emit schemas and tables, then exercise the insert and read paths.
Base.metadata.create_all(engine)

with Session(engine) as db_session:
    # One notebook and its first run: inserting the subclass writes both
    # the base run row and the run_info row in one flush.
    notebook = ScheduledNotebook(
        notebook_path="a/b.ipynb",
    )
    info_run = ScheduledNotebookRunInfo(
        scheduled_notebook=notebook,
        queued_at_time=utc_now(),
        info={"foo": "bar"},
    )
    db_session.add_all([notebook, info_run])
    db_session.commit()

    # Schedule the same notebook a second time.
    info_run = ScheduledNotebookRunInfo(
        scheduled_notebook=notebook,
        queued_at_time=utc_now(),
        info={"ping": "pong"},
    )
    db_session.add_all([info_run])
    db_session.commit()

with Session(engine) as db_session:
    # Querying the base class yields polymorphic subclass instances.
    stmt = select(ScheduledNotebookRun)
    for run in db_session.scalars(stmt):
        pprint(
            {
                "notebook id": run.scheduled_notebook.id,
                "type": run.type,
                "run id": run.id,
                "queued_at_time": run.queued_at_time,
                "info": run.info,
            }
        )