base-repository

English | 한국어

How To Use (User Guide)

Table of Contents


0) Scope


1) Model & Schema Definition

# SQLAlchemy ORM
class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]
    email: Mapped[str]

# Pydantic schema
class UserSchema(BaseModel):
    id: int
    name: str
    email: str
    model_config = ConfigDict(from_attributes=True)

2) Repository Implementation

@dataclass
class UserFilter(BaseRepoFilter):
    id: int | None = None
    name: str | None = None

class UserRepo(BaseRepository[User, UserSchema]): # (default) enables Schema(UserSchema) return by default
    filter_class = UserFilter

Options


3) Quick Start (Step-by-step)

3.1 Session Binding (Important)

BaseRepository session priority

  1. If a SessionProvider is configured, the repo always uses provider.get_session().
  2. If no provider is configured, the repo uses the session passed to the repo constructor (specific session).
  3. If neither exists, it raises a runtime error.

Additionally, if you pass session=... to any repository method call, that session has the highest priority for that call.

# Configure once during app initialization
UserRepo.configure_session_provider(session_provider)

# After that, the repo does not need to hold a session itself
repo = UserRepo()

rows = await repo.get_list(
    flt=UserFilter(name="A"),
    page=1,
    size=10,
)

3.1.2 (Option) Inject Session Per Call

repo = UserRepo()

async with AsyncSession(engine) as session:
    rows = await repo.get_list(
        flt=UserFilter(name="A"),
        page=1,
        size=10,
        session=session,  # used only for this call
    )

3.1.3 (Option) Bind a Session to the Repo (No Provider)

async with AsyncSession(engine) as session:
    repo = UserRepo(session=session)
    rows = await repo.get_list(flt=UserFilter(name="A"), page=1, size=10)

Notes


3.2 Transaction / Commit Responsibility (Important)


3.3 Create (Single / Bulk)

# Single: schema or dict
created = await repo.create(UserSchema(name="Alice", email="a@test.com"))
created2 = await repo.create({"name": "Bob", "email": "b@test.com"})

# Bulk
bulk = await repo.create_many([
    {"name": "C", "email": "c@test.com"},
    UserSchema(name="D", email="d@test.com"),
])

# ORM model directly
created3 = await repo.create_from_model(User(name="E", email="e@test.com"))

PK handling rules


3.4 Read (Single)

row = await repo.get(UserFilter(name="Alice"))  # default: Schema return (when mapping_schema is set)
row_raw = await repo.get(UserFilter(name="Alice"), convert_schema=False)  # ORM return

3.5 Read (List - ListQuery Chaining)

OFFSET paging

q = (
    repo.list(flt=UserFilter())
        .order_by([User.id.asc()])   # optional (may be auto-augmented by your implementation if omitted)
        .paging(page=1, size=20)     # page>=1, size>=1
)

# Or: where chaining
q = (
    repo.list()
        .where(UserFilter())
        .order_by([User.id.asc()])
        .paging(page=1, size=20)
)

rows = await repo.execute(q)         # SELECT always returns a list

CURSOR (Keyset) paging

# First page: None or {}
q1 = (
    repo.list()
        .order_by([User.id.asc()])   # must be set before cursor
        .with_cursor(None)
        .limit(20)                   # size>=1
)
rows1 = await repo.execute(q1)

# Next page
next_cursor = {"id": 123}
q2 = (
    repo.list()
        .order_by([User.id.asc()])
        .with_cursor(next_cursor)    # must match order_by column keys/order
        .limit(20)
)
rows2 = await repo.execute(q2)

where usage pattern

q = repo.list().where(UserFilter(id=1, name="Kyu"))

# Using both list(flt=...) and .where(...) may raise an exception in where(),
# because list() already set the filter.

order_by input examples (supported shapes)

# 1) string key
q = repo.list().order_by(["id"]).paging(page=1, size=10)

# 2) Enum.value (string) — example Enum
class SortKey(Enum):
    ID = "id"
    NAME = "name"
q = repo.list().order_by([SortKey.ID, SortKey.NAME]).paging(page=1, size=10)

# 3) model column attribute
q = repo.list().order_by([User.name]).paging(page=1, size=10)

# 4) asc()/desc()
q = repo.list().order_by([User.name.desc(), User.id.asc()]).paging(page=1, size=10)

# 5) ColumnElement (expression)
q = repo.list().order_by([User.name.expression]).paging(page=1, size=10)

# 6) default: omit order_by
q = repo.list().paging(page=1, size=10)

Cursor dict example (multi-column)

q = (
  repo.list()
     .order_by([User.id.asc(), User.name.desc()])
     .with_cursor({
         "id": 120,
         "name": "Z",
     })
     .limit(20)
)
rows = await repo.execute(q)

Not allowed


3.6 Read (List - get_list)

OFFSET paging

rows = await repo.get_list(
    flt=UserFilter(name="A"),
    order_by=[User.id.asc()],
    page=1,
    size=10,
)

rows = await repo.get_list(
    flt=UserFilter(id=1, name="A"),
)

rows = await repo.get_list(
    order_by=["id", User.name.desc()],
    page=2,
    size=10,
)

rows = await repo.get_list(page=1, size=10)

CURSOR (Keyset) paging

rows1 = await repo.get_list(
    order_by=[User.id.asc()],
    cursor={},   # or None (still keyset mode)
    size=10,
)

rows2 = await repo.get_list(
    order_by=[User.id.asc(), User.name.desc()],
    cursor={"id": 120, "name": "Z"},
    size=10,
)

get_list composition rules

  1. If you pass cursor (including None), it runs in keyset paging mode.
  1. If you provide both page and size, it runs in offset paging mode.

  2. Otherwise, no paging is applied.


3.7 Count

cnt = await repo.count(UserFilter(name="Alice"))

3.8 Update

# Bulk UPDATE (single UPDATE SQL)
updated = await repo.update(UserFilter(name="Bob"), {"email": "bob@new"})

# Dirty Checking on a persistent object
obj = await repo.get(UserFilter(name="Alice"), convert_schema=False)
updated_Schema = await repo.update_from_model(obj, {"email": "alice@new"})

3.9 Delete

deleted = await repo.delete(UserFilter(name="Alice"))

4) Public API Reference

4.1 Repository Instance Methods


4.2 ListQuery Chaining Methods


5) BaseRepoFilter

Rules

Example

@dataclass
class UserFilter(BaseRepoFilter):
    id: int | list[int] | None = None
    active: bool | None = None
    __aliases__ = {"org_ids": "org_id"}

6) Mapping (Schema Conversion) Options

Example

class UserMapper(BaseMapper):
    def to_schema(self, db: User) -> UserSchema:
        return UserSchema(id=db.id, name=db.name, email="Changed")

    def to_orm(self, dm: UserSchema) -> User:
        return User(**dm.model_dump(exclude=["name"]), name="fixed")

class UserRepo(BaseRepository[User, UserSchema]):
    filter_class = UserFilter
    mapper = UserMapper

row = await repo.get(UserFilter(id=1))
assert row.email == "Changed"

6.1 Schema Validation Behavior When Using a Mapper

When you set mapper(BaseMapper), BaseRepository does not validate a strict 1:1 match between mapping_schema field names and ORM model column names.

Intent

Summary

class UserRepo(BaseRepository[User, UserSchema]):
    filter_class = UserFilter
    mapper = UserMapper  # if mapper exists, column-only strict validation is disabled

7) Performance Tests

Command list

Configuration

  1. CPU
  1. DB