feat: backend core — models, auth, CRUD, tests

This commit is contained in:
Nox (OpenClaw)
2026-03-17 16:16:08 +00:00
parent d8c2048a9b
commit 21339d771d
35 changed files with 2161 additions and 1 deletions
View File
+134
View File
@@ -0,0 +1,134 @@
import uuid
from fastapi import HTTPException, status
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.category import Category, CategoryType
from app.models.transaction import Transaction
from app.schemas.category import CategoryCreate, CategoryUpdate
from app.utils import utcnow
# Default categories created at registration
_DEFAULT_CATEGORIES = [
{"name": "Alimentation", "type": CategoryType.expense, "color": "#22c55e", "icon": "utensils"},
{"name": "Transport", "type": CategoryType.expense, "color": "#3b82f6", "icon": "car"},
{"name": "Logement", "type": CategoryType.expense, "color": "#f59e0b", "icon": "home"},
{"name": "Santé", "type": CategoryType.expense, "color": "#ef4444", "icon": "heart-pulse"},
{"name": "Loisirs", "type": CategoryType.expense, "color": "#a855f7", "icon": "gamepad-2"},
{"name": "Divers", "type": CategoryType.expense, "color": "#6b7280", "icon": "package"},
{"name": "Salaire", "type": CategoryType.income, "color": "#10b981", "icon": "briefcase"},
{"name": "Freelance", "type": CategoryType.income, "color": "#06b6d4", "icon": "laptop"},
{"name": "Remboursement", "type": CategoryType.income, "color": "#8b5cf6", "icon": "refresh-cw"},
]
async def create_default_categories(
session: AsyncSession, user_id: uuid.UUID
) -> list[Category]:
"""Create the default categories for a newly registered user."""
categories = []
for data in _DEFAULT_CATEGORIES:
cat = Category(user_id=user_id, is_default=True, **data)
session.add(cat)
categories.append(cat)
await session.flush()
return categories
async def list_categories(
session: AsyncSession,
user_id: uuid.UUID,
*,
type_filter: CategoryType | None = None,
) -> list[Category]:
query = select(Category).where(
Category.user_id == user_id,
Category.deleted_at.is_(None),
)
if type_filter is not None:
query = query.where(Category.type == type_filter)
query = query.order_by(Category.name)
result = await session.execute(query)
return list(result.scalars().all())
async def get_category(
session: AsyncSession,
user_id: uuid.UUID,
category_id: uuid.UUID,
) -> Category:
result = await session.execute(
select(Category).where(
Category.id == category_id,
Category.user_id == user_id,
Category.deleted_at.is_(None),
)
)
cat = result.scalar_one_or_none()
if cat is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Category not found")
return cat
async def create_category(
session: AsyncSession,
user_id: uuid.UUID,
data: CategoryCreate,
) -> Category:
cat = Category(
user_id=user_id,
name=data.name,
type=data.type,
color=data.color,
icon=data.icon,
is_default=False,
)
session.add(cat)
await session.commit()
await session.refresh(cat)
return cat
async def update_category(
session: AsyncSession,
user_id: uuid.UUID,
category_id: uuid.UUID,
data: CategoryUpdate,
) -> Category:
cat = await get_category(session, user_id, category_id)
if data.name is not None:
cat.name = data.name
if data.color is not None:
cat.color = data.color
if data.icon is not None:
cat.icon = data.icon
await session.commit()
await session.refresh(cat)
return cat
async def delete_category(
session: AsyncSession,
user_id: uuid.UUID,
category_id: uuid.UUID,
) -> None:
cat = await get_category(session, user_id, category_id)
# Refuse deletion if active transactions exist
count_result = await session.execute(
select(func.count()).where(
Transaction.category_id == category_id,
Transaction.user_id == user_id,
Transaction.deleted_at.is_(None),
)
)
active_count = count_result.scalar_one()
if active_count > 0:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Cannot delete category: {active_count} active transaction(s) are linked to it",
)
cat.deleted_at = utcnow()
await session.commit()
@@ -0,0 +1,134 @@
> import uuid
> from fastapi import HTTPException, status
> from sqlalchemy import func, select
> from sqlalchemy.ext.asyncio import AsyncSession
> from app.models.category import Category, CategoryType
> from app.models.transaction import Transaction
> from app.schemas.category import CategoryCreate, CategoryUpdate
> from app.utils import utcnow
# Default categories created at registration
> _DEFAULT_CATEGORIES = [
> {"name": "Alimentation", "type": CategoryType.expense, "color": "#22c55e", "icon": "utensils"},
> {"name": "Transport", "type": CategoryType.expense, "color": "#3b82f6", "icon": "car"},
> {"name": "Logement", "type": CategoryType.expense, "color": "#f59e0b", "icon": "home"},
> {"name": "Santé", "type": CategoryType.expense, "color": "#ef4444", "icon": "heart-pulse"},
> {"name": "Loisirs", "type": CategoryType.expense, "color": "#a855f7", "icon": "gamepad-2"},
> {"name": "Divers", "type": CategoryType.expense, "color": "#6b7280", "icon": "package"},
> {"name": "Salaire", "type": CategoryType.income, "color": "#10b981", "icon": "briefcase"},
> {"name": "Freelance", "type": CategoryType.income, "color": "#06b6d4", "icon": "laptop"},
> {"name": "Remboursement", "type": CategoryType.income, "color": "#8b5cf6", "icon": "refresh-cw"},
> ]
> async def create_default_categories(
> session: AsyncSession, user_id: uuid.UUID
> ) -> list[Category]:
> """Create the default categories for a newly registered user."""
> categories = []
> for data in _DEFAULT_CATEGORIES:
> cat = Category(user_id=user_id, is_default=True, **data)
> session.add(cat)
> categories.append(cat)
> await session.flush()
! return categories
> async def list_categories(
> session: AsyncSession,
> user_id: uuid.UUID,
> *,
> type_filter: CategoryType | None = None,
> ) -> list[Category]:
> query = select(Category).where(
> Category.user_id == user_id,
> Category.deleted_at.is_(None),
> )
> if type_filter is not None:
> query = query.where(Category.type == type_filter)
> query = query.order_by(Category.name)
> result = await session.execute(query)
! return list(result.scalars().all())
> async def get_category(
> session: AsyncSession,
> user_id: uuid.UUID,
> category_id: uuid.UUID,
> ) -> Category:
> result = await session.execute(
> select(Category).where(
> Category.id == category_id,
> Category.user_id == user_id,
> Category.deleted_at.is_(None),
> )
> )
! cat = result.scalar_one_or_none()
! if cat is None:
! raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Category not found")
! return cat
> async def create_category(
> session: AsyncSession,
> user_id: uuid.UUID,
> data: CategoryCreate,
> ) -> Category:
> cat = Category(
> user_id=user_id,
> name=data.name,
> type=data.type,
> color=data.color,
> icon=data.icon,
> is_default=False,
> )
> session.add(cat)
> await session.commit()
! await session.refresh(cat)
! return cat
> async def update_category(
> session: AsyncSession,
> user_id: uuid.UUID,
> category_id: uuid.UUID,
> data: CategoryUpdate,
> ) -> Category:
> cat = await get_category(session, user_id, category_id)
! if data.name is not None:
! cat.name = data.name
! if data.color is not None:
! cat.color = data.color
! if data.icon is not None:
! cat.icon = data.icon
! await session.commit()
! await session.refresh(cat)
! return cat
> async def delete_category(
> session: AsyncSession,
> user_id: uuid.UUID,
> category_id: uuid.UUID,
> ) -> None:
> cat = await get_category(session, user_id, category_id)
# Refuse deletion if active transactions exist
! count_result = await session.execute(
! select(func.count()).where(
! Transaction.category_id == category_id,
! Transaction.user_id == user_id,
! Transaction.deleted_at.is_(None),
! )
! )
! active_count = count_result.scalar_one()
! if active_count > 0:
! raise HTTPException(
! status_code=status.HTTP_409_CONFLICT,
! detail=f"Cannot delete category: {active_count} active transaction(s) are linked to it",
! )
! cat.deleted_at = utcnow()
! await session.commit()
+152
View File
@@ -0,0 +1,152 @@
import uuid
from datetime import date
from fastapi import HTTPException, status
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.transaction import Transaction, TransactionType
from app.schemas.transaction import TransactionCreate, TransactionUpdate
from app.utils import utcnow
def _month_bounds(month: str) -> tuple[date, date]:
"""Return (start_inclusive, end_exclusive) date bounds for a YYYY-MM string."""
year, mon = map(int, month.split("-"))
start = date(year, mon, 1)
if mon == 12:
end = date(year + 1, 1, 1)
else:
end = date(year, mon + 1, 1)
return start, end
async def create_transaction(
session: AsyncSession,
user_id: uuid.UUID,
data: TransactionCreate,
) -> Transaction:
tx = Transaction(
user_id=user_id,
category_id=data.category_id,
amount_cents=data.amount_cents,
type=data.type,
description=data.description,
transaction_date=data.transaction_date,
)
session.add(tx)
await session.commit()
# Reload with category relationship
result = await session.execute(
select(Transaction)
.options(selectinload(Transaction.category))
.where(Transaction.id == tx.id)
)
return result.scalar_one()
async def list_transactions(
session: AsyncSession,
user_id: uuid.UUID,
*,
month: str | None = None,
category_id: uuid.UUID | None = None,
type_filter: TransactionType | None = None,
page: int = 1,
per_page: int = 20,
) -> tuple[list[Transaction], int]:
base_query = select(Transaction).where(
Transaction.user_id == user_id,
Transaction.deleted_at.is_(None),
)
if month is not None:
start, end = _month_bounds(month)
base_query = base_query.where(
Transaction.transaction_date >= start,
Transaction.transaction_date < end,
)
if category_id is not None:
base_query = base_query.where(Transaction.category_id == category_id)
if type_filter is not None:
base_query = base_query.where(Transaction.type == type_filter)
# Total count (before pagination)
count_result = await session.execute(
select(func.count()).select_from(base_query.subquery())
)
total = count_result.scalar_one()
# Paginated rows with category eager-loaded
items_query = (
base_query.options(selectinload(Transaction.category))
.order_by(Transaction.transaction_date.desc(), Transaction.created_at.desc())
.offset((page - 1) * per_page)
.limit(per_page)
)
result = await session.execute(items_query)
return list(result.scalars().all()), total
async def get_transaction(
session: AsyncSession,
user_id: uuid.UUID,
transaction_id: uuid.UUID,
) -> Transaction:
result = await session.execute(
select(Transaction)
.options(selectinload(Transaction.category))
.where(
Transaction.id == transaction_id,
Transaction.user_id == user_id,
Transaction.deleted_at.is_(None),
)
)
tx = result.scalar_one_or_none()
if tx is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Transaction not found")
return tx
async def update_transaction(
session: AsyncSession,
user_id: uuid.UUID,
transaction_id: uuid.UUID,
data: TransactionUpdate,
) -> Transaction:
tx = await get_transaction(session, user_id, transaction_id)
if data.amount_cents is not None:
tx.amount_cents = data.amount_cents
if data.type is not None:
tx.type = data.type
if data.category_id is not None:
tx.category_id = data.category_id
if data.description is not None:
tx.description = data.description
if data.transaction_date is not None:
tx.transaction_date = data.transaction_date
tx.updated_at = utcnow()
await session.commit()
# Reload with fresh category
result = await session.execute(
select(Transaction)
.options(selectinload(Transaction.category))
.where(Transaction.id == tx.id)
)
return result.scalar_one()
async def delete_transaction(
session: AsyncSession,
user_id: uuid.UUID,
transaction_id: uuid.UUID,
) -> None:
tx = await get_transaction(session, user_id, transaction_id)
tx.deleted_at = utcnow()
tx.updated_at = utcnow()
await session.commit()
@@ -0,0 +1,152 @@
> import uuid
> from datetime import date
> from fastapi import HTTPException, status
> from sqlalchemy import func, select
> from sqlalchemy.ext.asyncio import AsyncSession
> from sqlalchemy.orm import selectinload
> from app.models.transaction import Transaction, TransactionType
> from app.schemas.transaction import TransactionCreate, TransactionUpdate
> from app.utils import utcnow
> def _month_bounds(month: str) -> tuple[date, date]:
> """Return (start_inclusive, end_exclusive) date bounds for a YYYY-MM string."""
> year, mon = map(int, month.split("-"))
> start = date(year, mon, 1)
> if mon == 12:
! end = date(year + 1, 1, 1)
> else:
> end = date(year, mon + 1, 1)
> return start, end
> async def create_transaction(
> session: AsyncSession,
> user_id: uuid.UUID,
> data: TransactionCreate,
> ) -> Transaction:
> tx = Transaction(
> user_id=user_id,
> category_id=data.category_id,
> amount_cents=data.amount_cents,
> type=data.type,
> description=data.description,
> transaction_date=data.transaction_date,
> )
> session.add(tx)
> await session.commit()
# Reload with category relationship
! result = await session.execute(
! select(Transaction)
! .options(selectinload(Transaction.category))
! .where(Transaction.id == tx.id)
! )
! return result.scalar_one()
> async def list_transactions(
> session: AsyncSession,
> user_id: uuid.UUID,
> *,
> month: str | None = None,
> category_id: uuid.UUID | None = None,
> type_filter: TransactionType | None = None,
> page: int = 1,
> per_page: int = 20,
> ) -> tuple[list[Transaction], int]:
> base_query = select(Transaction).where(
> Transaction.user_id == user_id,
> Transaction.deleted_at.is_(None),
> )
> if month is not None:
> start, end = _month_bounds(month)
> base_query = base_query.where(
> Transaction.transaction_date >= start,
> Transaction.transaction_date < end,
> )
> if category_id is not None:
> base_query = base_query.where(Transaction.category_id == category_id)
> if type_filter is not None:
> base_query = base_query.where(Transaction.type == type_filter)
# Total count (before pagination)
> count_result = await session.execute(
> select(func.count()).select_from(base_query.subquery())
> )
! total = count_result.scalar_one()
# Paginated rows with category eager-loaded
! items_query = (
! base_query.options(selectinload(Transaction.category))
! .order_by(Transaction.transaction_date.desc(), Transaction.created_at.desc())
! .offset((page - 1) * per_page)
! .limit(per_page)
! )
! result = await session.execute(items_query)
! return list(result.scalars().all()), total
> async def get_transaction(
> session: AsyncSession,
> user_id: uuid.UUID,
> transaction_id: uuid.UUID,
> ) -> Transaction:
> result = await session.execute(
> select(Transaction)
> .options(selectinload(Transaction.category))
> .where(
> Transaction.id == transaction_id,
> Transaction.user_id == user_id,
> Transaction.deleted_at.is_(None),
> )
> )
! tx = result.scalar_one_or_none()
! if tx is None:
! raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Transaction not found")
! return tx
> async def update_transaction(
> session: AsyncSession,
> user_id: uuid.UUID,
> transaction_id: uuid.UUID,
> data: TransactionUpdate,
> ) -> Transaction:
> tx = await get_transaction(session, user_id, transaction_id)
! if data.amount_cents is not None:
! tx.amount_cents = data.amount_cents
! if data.type is not None:
! tx.type = data.type
! if data.category_id is not None:
! tx.category_id = data.category_id
! if data.description is not None:
! tx.description = data.description
! if data.transaction_date is not None:
! tx.transaction_date = data.transaction_date
! tx.updated_at = utcnow()
! await session.commit()
# Reload with fresh category
! result = await session.execute(
! select(Transaction)
! .options(selectinload(Transaction.category))
! .where(Transaction.id == tx.id)
! )
! return result.scalar_one()
> async def delete_transaction(
> session: AsyncSession,
> user_id: uuid.UUID,
> transaction_id: uuid.UUID,
> ) -> None:
> tx = await get_transaction(session, user_id, transaction_id)
! tx.deleted_at = utcnow()
! tx.updated_at = utcnow()
! await session.commit()