BaseRepository[TModel]
BaseRepoFilter (dataclass-based)
ListQuery chaining (where → order_by → paging or with_cursor + limit)
# SQLAlchemy ORM
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
email: Mapped[str]
# Pydantic schema
class UserSchema(BaseModel):
id: int
name: str
email: str
model_config = ConfigDict(from_attributes=True)
@dataclass
class UserFilter(BaseRepoFilter):
id: int | None = None
name: str | None = None
class UserRepo(BaseRepository[User, UserSchema]): # (default) enables Schema(UserSchema) return by default
filter_class = UserFilter
Options
By setting mapper: type[BaseMapper] | None, you can customize Schema ↔ ORM conversion rules.
mapping_schema, mapper, and default_convert_schema.
BaseRepository session priority
If a SessionProvider is configured, the repo always uses provider.get_session().
Otherwise, the repo uses the session passed to the repo constructor (specific session).
Additionally, if you pass session=... to any repository method call, that session has the highest priority for that call.
# Configure once during app initialization
UserRepo.configure_session_provider(session_provider)
# After that, the repo does not need to hold a session itself
repo = UserRepo()
rows = await repo.get_list(
flt=UserFilter(name="A"),
page=1,
size=10,
)
repo = UserRepo()
async with AsyncSession(engine) as session:
rows = await repo.get_list(
flt=UserFilter(name="A"),
page=1,
size=10,
session=session, # used only for this call
)
async with AsyncSession(engine) as session:
repo = UserRepo(session=session)
rows = await repo.get_list(flt=UserFilter(name="A"), page=1, size=10)
Notes
If a SessionProvider is configured, UserRepo(session=...) is ignored (and a warning is emitted). The provider wins.
The repository never calls commit().
create/create_many/create_from_model/update_from_model only perform up to flush().
update/delete/execute/get/get_list/count do not perform flush() or commit().
Transaction boundaries (commit/rollback) are the caller’s responsibility (service/use-case/middleware).
# Single: schema or dict
created = await repo.create(UserSchema(name="Alice", email="a@test.com"))
created2 = await repo.create({"name": "Bob", "email": "b@test.com"})
# Bulk
bulk = await repo.create_many([
{"name": "C", "email": "c@test.com"},
UserSchema(name="D", email="d@test.com"),
])
# ORM model directly
created3 = await repo.create_from_model(User(name="E", email="e@test.com"))
PK handling rules
create() / create_many() ignore autoincrement PK input (columns-only payload + autoinc PK removal).
create_from_model() adds + flushes the ORM object as-is, so if you set a PK, it may be included in the INSERT.
row = await repo.get(UserFilter(name="Alice")) # default: Schema return (when mapping_schema is set)
row_raw = await repo.get(UserFilter(name="Alice"), convert_schema=False) # ORM return
q = (
repo.list(flt=UserFilter())
.order_by([User.id.asc()]) # optional (may be auto-augmented by your implementation if omitted)
.paging(page=1, size=20) # page>=1, size>=1
)
# Or: where chaining
q = (
repo.list()
.where(UserFilter())
.order_by([User.id.asc()])
.paging(page=1, size=20)
)
rows = await repo.execute(q) # SELECT always returns a list
# First page: None or {}
q1 = (
repo.list()
.order_by([User.id.asc()]) # must be set before cursor
.with_cursor(None)
.limit(20) # size>=1
)
rows1 = await repo.execute(q1)
# Next page
next_cursor = {"id": 123}
q2 = (
repo.list()
.order_by([User.id.asc()])
.with_cursor(next_cursor) # must match order_by column keys/order
.limit(20)
)
rows2 = await repo.execute(q2)
q = repo.list().where(UserFilter(id=1, name="Kyu"))
# Using both list(flt=...) and .where(...) may raise an exception in where(),
# because list() already set the filter.
# 1) string key
q = repo.list().order_by(["id"]).paging(page=1, size=10)
# 2) Enum.value (string) — example Enum
class SortKey(Enum):
ID = "id"
NAME = "name"
q = repo.list().order_by([SortKey.ID, SortKey.NAME]).paging(page=1, size=10)
# 3) model column attribute
q = repo.list().order_by([User.name]).paging(page=1, size=10)
# 4) asc()/desc()
q = repo.list().order_by([User.name.desc(), User.id.asc()]).paging(page=1, size=10)
# 5) ColumnElement (expression)
q = repo.list().order_by([User.name.expression]).paging(page=1, size=10)
# 6) default: omit order_by
q = repo.list().paging(page=1, size=10)
q = (
repo.list()
.order_by([User.id.asc(), User.name.desc()])
.with_cursor({
"id": 120,
"name": "Z",
})
.limit(20)
)
rows = await repo.execute(q)
Not allowed
func(...), text("..."), arbitrary types
None values in cursor
rows = await repo.get_list(
flt=UserFilter(name="A"),
order_by=[User.id.asc()],
page=1,
size=10,
)
rows = await repo.get_list(
flt=UserFilter(id=1, name="A"),
)
rows = await repo.get_list(
order_by=["id", User.name.desc()],
page=2,
size=10,
)
rows = await repo.get_list(page=1, size=10)
rows1 = await repo.get_list(
order_by=[User.id.asc()],
cursor={}, # or None (still keyset mode)
size=10,
)
rows2 = await repo.get_list(
order_by=[User.id.asc(), User.name.desc()],
cursor={"id": 120, "name": "Z"},
size=10,
)
get_list composition rules
If you pass cursor (including None), it runs in keyset paging mode.
order_by is required
size is required (keyset limit)
If you provide both page and size, it runs in offset paging mode.
Otherwise, no paging is applied.
page since it does not page anything.
cnt = await repo.count(UserFilter(name="Alice"))
# Bulk UPDATE (single UPDATE SQL)
updated = await repo.update(UserFilter(name="Bob"), {"email": "bob@new"})
# Dirty Checking on a persistent object
obj = await repo.get(UserFilter(name="Alice"), convert_schema=False)
updated_Schema = await repo.update_from_model(obj, {"email": "alice@new"})
deleted = await repo.delete(UserFilter(name="Alice"))
configure_session_provider(provider: SessionProvider) -> None (class method)
session -> AsyncSession (property)
list(flt: BaseRepoFilter | None = None) -> ListQuery
execute(q_or_stmt, *, session=None, convert_schema=None)
Accepts a ListQuery or a statement.
SELECT results are always returned as a list.
get(flt, *, convert_schema=None, session=None) / get_or_fail(...)
get_list(*, flt=None, order_by=None, cursor=None, page=None, size=None, session=None, convert_schema=None)
Builds a ListQuery internally and calls execute().
count(flt=None, *, session=None) / delete(flt, *, session=None)
add(obj, *, session=None) / add_all(objs, *, session=None)
create(data, *, session=None, convert_schema=None) /
create_many(items, *, session=None, convert_schema=None) /
create_from_model(obj, *, session=None, convert_schema=None)
create/create_many ignore autoincrement PK input.
create_from_model uses the ORM object as-is, so a set PK may be included in the INSERT.
update(flt, update, *, session=None)
update_from_model(base, update, *, session=None, convert_schema=None)
.where(flt: BaseRepoFilter | None) -> ListQuery
.order_by(items: Sequence[Any]) -> ListQuery
Allowed: string keys, Enum.value (string), model column attributes, asc()/desc(), column expressions.
Not allowed: func(...), text("..."), arbitrary types.
.with_cursor(cursor: dict[str, Any] | None = None) -> ListQuery
None/{} means first page.
Requires a prior order_by(...) (non-empty).
Cursor keys must match order_by; values cannot be None.
.limit(size: int) -> ListQuery
size>=1. Cannot be combined with offset paging.
.paging(*, page: int, size: int) -> ListQuery
page>=1, size>=1. Only once. Cannot be combined with cursor mode.
Rules
bool -> .is_(val)
sequence (e.g. list) -> .in_(...) (empty sequences are skipped)
any other value -> ==
__aliases__ maps field name -> column name
__strict__ = True raises if mapping fails
Example
@dataclass
class UserFilter(BaseRepoFilter):
id: int | list[int] | None = None
active: bool | None = None
__aliases__ = {"org_ids": "org_id"}
UserFilter(id=[1, 2, 3])
If mapping_schema is set, the default return is Schema (Pydantic).
To get the ORM object instead, pass convert_schema=False.
With BaseMapper, you can customize Schema ↔ ORM conversions.
Example
class UserMapper(BaseMapper):
def to_schema(self, db: User) -> UserSchema:
return UserSchema(id=db.id, name=db.name, email="Changed")
def to_orm(self, dm: UserSchema) -> User:
return User(**dm.model_dump(exclude=["name"]), name="fixed")
class UserRepo(BaseRepository[User, UserSchema]):
filter_class = UserFilter
mapper = UserMapper
row = await repo.get(UserFilter(id=1))
assert row.email == "Changed"
When you set mapper(BaseMapper), BaseRepository does not validate a strict 1:1 match between
mapping_schema field names and ORM model column names.
Intent
Summary
class UserRepo(BaseRepository[User, UserSchema]):
filter_class = UserFilter
mapper = UserMapper # if mapper exists, column-only strict validation is disabled
Command list
Check the Makefile targets.
perf-list
tests/perf/results
perf-cpu
perf-db
perf-view
Pass RUN_ID={RUN_ID} to visualize a specific run.
Configuration
Adjust ROW_VALUES and ITERATIONS near the top of tests/perf/perf_cpu_only.py.
Adjust SEED_DATA_ROWS in tests/perf/seed.config.py.