BaseRepository[TModel]BaseRepoFilter (dataclass 기반)ListQuery 체이닝 (where → order_by → paging 또는 with_cursor + limit)# SQLAlchemy ORM
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
email: Mapped[str]
# Pydantic 스키마
class UserSchema(BaseModel):
id: int
name: str
email: str
model_config = ConfigDict(from_attributes=True)
@dataclass
class UserFilter(BaseRepoFilter):
id: int | None = None
name: str | None = None
class UserRepo(BaseRepository[User, UserSchema]): # (기본) schema(UserSchema) 반환 활성화
filter_class = UserFilter
옵션
mapper: type[BaseMapper] | None 제공 시, 스키마↔ORM 변환 규칙을 커스터마이징mapping_schema, mapper, default_convert_schema를 덮어쓸 수 있음BaseRepository의 세션 우선순위
SessionProvider가 설정되어 있으면, repo는 항상 provider.get_session()을 사용합니다.session(specific session)을 사용합니다.추가로, 각 메서드 호출에서 session=...을 전달하면 그 호출에서는 항상 그 세션이 최우선입니다.
# 앱 초기화 시점에 1회 설정
UserRepo.configure_session_provider(session_provider)
# 이후 repo는 세션을 직접 들고 있을 필요가 없음
repo = UserRepo()
rows = await repo.get_list(
flt=UserFilter(name="A"),
page=1,
size=10,
)
repo = UserRepo()
async with AsyncSession(engine) as session:
rows = await repo.get_list(
flt=UserFilter(name="A"),
page=1,
size=10,
session=session, # 이 호출에서만 사용됨
)
async with AsyncSession(engine) as session:
repo = UserRepo(session=session)
rows = await repo.get_list(flt=UserFilter(name="A"), page=1, size=10)
주의
UserRepo(session=...)로 세션을 넣어도 Provider가 우선이며, 해당 세션은 무시됩니다(경고 발생).commit()을 호출하지 않습니다.create/create_many/create_from_model/update_from_model은 flush()까지만 수행합니다.update/delete/execute/get/get_list/count는 flush/commit을 수행하지 않습니다.commit/rollback)는 호출자(서비스/유스케이스/미들웨어)가 책임집니다.# 단건: 스키마 또는 dict
created = await repo.create(UserSchema(name="Alice", email="a@test.com"))
created2 = await repo.create({"name": "Bob", "email": "b@test.com"})
# 다건
bulk = await repo.create_many([
{"name": "C", "email": "c@test.com"},
UserSchema(name="D", email="d@test.com"),
])
# ORM 모델 직접
created3 = await repo.create_from_model(User(name="E", email="e@test.com"))
PK 처리 규칙
create() / create_many()는 autoincrement PK 입력을 무시합니다(컬럼-only payload로 정리 + autoinc pk 제거).create_from_model()은 ORM 객체를 그대로 add+flush 하므로, PK를 설정해 넘기면 INSERT에 포함될 수 있습니다.# 단건
row = await repo.get(UserFilter(name="Alice")) # 기본: 도메인 반환 (mapping_schema 설정 시)
row_raw = await repo.get(UserFilter(name="Alice"), convert_schema=False) # ORM 반환
q = (
repo.list(flt=UserFilter())
.order_by([User.id.asc()]) # 선택(미지정 시 구현체에서 보강될 수 있음)
.paging(page=1, size=20) # page>=1, size>=1
)
# 또는 where 체이닝
q = (
repo.list()
.where(UserFilter())
.order_by([User.id.asc()])
.paging(page=1, size=20)
)
rows = await repo.execute(q) # SELECT는 항상 list 반환
# 첫 페이지: None 또는 {}
q1 = (
repo.list()
.order_by([User.id.asc()]) # 커서 이전에 지정
.with_cursor(None)
.limit(20) # size>=1
)
rows1 = await repo.execute(q1)
# 다음 페이지
next_cursor = {"id": 123}
q2 = (
repo.list()
.order_by([User.id.asc()])
.with_cursor(next_cursor) # order_by 컬럼 키/순서와 동일해야 함
.limit(20)
)
rows2 = await repo.execute(q2)
# 여러 필드 조합
q = repo.list().where(UserFilter(id=1, name="Kyu"))
# list(flt=...)와 .where(...)를 같이 쓰면 where()에서 예외가 날 수 있습니다.
# (list()에서 이미 filter가 설정된 상태이기 때문)
# 1) 문자열 키
q = repo.list().order_by(["id"]).paging(page=1, size=10)
# 2) Enum.value (문자열) — 예시 Enum
class SortKey(Enum):
ID = "id"
NAME = "name"
q = repo.list().order_by([SortKey.ID, SortKey.NAME]).paging(page=1, size=10)
# 3) 모델 컬럼 속성
q = repo.list().order_by([User.name]).paging(page=1, size=10)
# 4) asc()/desc()
q = repo.list().order_by([User.name.desc(), User.id.asc()]).paging(page=1, size=10)
# 5) ColumnElement (expression)
q = repo.list().order_by([User.name.expression]).paging(page=1, size=10)
# 6) 기본값: order_by 생략
q = repo.list().paging(page=1, size=10)
q = (
repo.list()
.order_by([User.id.asc(), User.name.desc()])
.with_cursor({
"id": 120,
"name": "Z",
})
.limit(20)
)
rows = await repo.execute(q)
불허
func(...), text("..."), 임의 타입Nonerows = await repo.get_list(
flt=UserFilter(name="A"),
order_by=[User.id.asc()],
page=1,
size=10,
)
rows = await repo.get_list(
flt=UserFilter(id=1, name="A"),
)
rows = await repo.get_list(
order_by=["id", User.name.desc()],
page=2,
size=10,
)
rows = await repo.get_list(page=1, size=10)
rows1 = await repo.get_list(
order_by=[User.id.asc()],
cursor={}, # 또는 None (이 경우에도 keyset 모드)
size=10,
)
rows2 = await repo.get_list(
order_by=[User.id.asc(), User.name.desc()],
cursor={"id": 120, "name": "Z"},
size=10,
)
get_list 조합 규칙
cursor를 전달하면(None 포함) keyset 페이징으로 동작합니다.order_by 필수size 필수 (keyset limit)page와 size를 같이 지정하면 offset 페이징으로 동작합니다.
그 외는 페이징이 적용되지 않습니다.
page만 주는 형태는 페이징이 되지 않으니 지양하세요.cnt = await repo.count(UserFilter(name="Alice"))
# 일괄 UPDATE (단일 UPDATE SQL)
updated = await repo.update(UserFilter(name="Bob"), {"email": "bob@new"})
# 영속 객체 Dirty Checking
obj = await repo.get(UserFilter(name="Alice"), convert_schema=False)
updated_schema = await repo.update_from_model(obj, {"email": "alice@new"})
deleted = await repo.delete(UserFilter(name="Alice"))
configure_session_provider(provider: SessionProvider) -> None (class method)
session -> AsyncSession (property)
list(flt: BaseRepoFilter | None = None) -> ListQuery
execute(q_or_stmt, *, session=None, convert_schema=None)
ListQuery 또는 statement를 실행합니다.list를 반환합니다.get(flt, *, convert_schema=None, session=None) / get_or_fail(...)
get_list(*, flt=None, order_by=None, cursor=None, page=None, size=None, session=None, convert_schema=None)
ListQuery를 조립해 execute()까지 수행합니다.count(flt=None, *, session=None) / delete(flt, *, session=None)
add(obj, *, session=None) / add_all(objs, *, session=None)
create(data, *, session=None, convert_schema=None) / create_many(items, *, session=None, convert_schema=None) / create_from_model(obj, *, session=None, convert_schema=None)
create/create_many는 autoincrement PK 입력을 무시합니다.create_from_model은 ORM을 그대로 추가하므로 PK가 세팅되어 있으면 INSERT에 포함될 수 있습니다.update(flt, update, *, session=None)
update_from_model(base, update, *, session=None, convert_schema=None)
flush()합니다. 반환: 도메인 또는 ORM..where(flt: BaseRepoFilter | None) -> ListQuery
None이면 무시..order_by(items: Sequence[Any]) -> ListQuery
Enum.value(문자열), 모델 컬럼 속성, asc()/desc(), 해당 컬럼의 expression.func(...), text("..."), 임의 타입..with_cursor(cursor: dict[str, Any] | None = None) -> ListQuery
None/{}는 첫 페이지.order_by(...) 선행(빈 리스트 불가).order_by 컬럼과 동일, 값은 None 금지..limit(size: int) -> ListQuery
size>=1. OFFSET 모드와 병용 불가..paging(*, page: int, size: int) -> ListQuery
page>=1, size>=1, 1회만 호출. CURSOR와 병용 불가.규칙
bool -> .is_(val).in_(...) (빈 시퀀스는 생략)==__aliases__로 필드명->컬럼명 매핑 가능__strict__ = True면 매핑 실패 시 예외예시
@dataclass
class UserFilter(BaseRepoFilter):
id: int | list[int] | None = None
active: bool | None = None
__aliases__ = {"org_ids": "org_id"}
UserFilter(id=[1, 2, 3]) 형태로 사용 가능합니다.mapping_schema 지정 시 기본 반환은 Schema(Pydantic)convert_schema=False로 ORM 반환 가능BaseMapper를 지정하면 도메인↔ORM 변환을 커스터마이즈예시
class UserMapper(BaseMapper):
def to_schema(self, db: User) -> UserSchema:
return UserSchema(id=db.id, name=db.name, email="Changed")
def to_orm(self, dm: UserSchema) -> User:
return User(**dm.model_dump(exclude=["name"]), name="fixed")
class UserRepo(BaseRepository[User, UserSchema]):
filter_class = UserFilter
mapper = UserMapper
row = await repo.get(UserFilter(id=1))
assert row.email == "Changed"
mapper(BaseMapper)를 지정한 경우, BaseRepository는 mapping_schema 필드 이름과
ORM 모델 컬럼 이름의 1:1 일치 여부를 검증하지 않습니다.
의도
동작 요약
class UserRepo(BaseRepository[User, UserSchema]):
filter_class = UserFilter
mapper = UserMapper # mapper가 존재하면 column-only strict 검증 비활성화
명령어 모음
makefile의 명령어를 참고하세요.
perf-list
tests/perf/results에 jsonl로 저장perf-cpu
perf-db
perf-view
RUN_ID={RUN_ID}를 붙이면 특정 RUN_ID 결과 표시세팅 방법
tests/perf/perf_cpu_only.py 상단에서 ROW_VALUES, ITERATIONS 변경 가능tests/perf/seed.config.py에서 SEED_DATA_ROWS 변경 가능