"""Dashboard aggregation: balance, monthly totals, trend and budget alerts."""

import uuid
from datetime import date

from sqlalchemy import case, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.models.budget import Budget
from app.models.category import Category
from app.models.transaction import Transaction, TransactionType
from app.schemas.dashboard import (
    BudgetAlert,
    CategoryExpense,
    DashboardResponse,
    MonthlyTrend,
)

# Number of months (selected month included) shown in the trend series.
_TREND_MONTHS = 6
# A budget is reported once spending reaches this share of its limit.
_ALERT_THRESHOLD_PERCENT = 80
# Placeholder name used when a category row cannot be resolved.
_UNKNOWN_CATEGORY_NAME = "?"


def _month_bounds(month: str) -> tuple[date, date]:
    """Return the half-open date range ``[start, end)`` covering *month*.

    Args:
        month: Month formatted as ``"YYYY-MM"``.

    Returns:
        The first day of the month and the first day of the following month.

    Raises:
        ValueError: If *month* does not split into exactly two integers or
            the month number is not a valid calendar month.
    """
    year, mon = map(int, month.split("-"))
    start = date(year, mon, 1)
    if mon == 12:
        end = date(year + 1, 1, 1)
    else:
        end = date(year, mon + 1, 1)
    return start, end


def _prev_month(month: str) -> str:
    """Return the month preceding *month*, both formatted as ``"YYYY-MM"``."""
    year, mon = map(int, month.split("-"))
    if mon == 1:
        return f"{year - 1}-12"
    return f"{year}-{mon - 1:02d}"


def _trailing_months(month: str, count: int) -> list[str]:
    """Return *count* consecutive months ending with *month*, oldest first."""
    months = [month]
    for _ in range(count - 1):
        months.append(_prev_month(months[-1]))
    months.reverse()
    return months


def _amount_if(tx_type: TransactionType):
    """Build a CASE expression: the amount for rows of *tx_type*, otherwise 0."""
    # ``case`` must be the SQLAlchemy construct; ``func.case`` would render a
    # plain SQL function call named "case" instead of a CASE expression.
    return case(
        (Transaction.type == tx_type, Transaction.amount_cents),
        else_=0,
    )


async def _income_expense_totals(
    session: AsyncSession,
    user_id: uuid.UUID,
    start: date,
    end: date,
) -> tuple[int, int]:
    """Sum the user's non-deleted income and expenses within ``[start, end)``.

    Returns:
        ``(income_cents, expense_cents)``; each is 0 when no rows match.
    """
    result = await session.execute(
        select(
            func.coalesce(
                func.sum(_amount_if(TransactionType.income)), 0
            ).label("income"),
            func.coalesce(
                func.sum(_amount_if(TransactionType.expense)), 0
            ).label("expense"),
        ).where(
            Transaction.user_id == user_id,
            Transaction.deleted_at.is_(None),
            Transaction.transaction_date >= start,
            Transaction.transaction_date < end,
        )
    )
    row = result.one()
    return row.income, row.expense


async def get_dashboard(
    session: AsyncSession,
    user_id: uuid.UUID,
    month: str,
) -> DashboardResponse:
    """Assemble the dashboard payload for one user and one month.

    The response contains the all-time balance, the selected month's income,
    expenses and net, expenses grouped by category, an income/expense trend
    over the last six months (selected month included, oldest first) and the
    budgets of that month whose spending reached at least 80% of their limit.
    Soft-deleted transactions (``deleted_at`` set) are ignored everywhere.

    Args:
        session: Async database session used for all queries.
        user_id: Owner of the transactions and budgets.
        month: Selected month formatted as ``"YYYY-MM"``.

    Returns:
        The populated ``DashboardResponse``.

    Raises:
        ValueError: If *month* cannot be parsed as ``"YYYY-MM"``.
    """
    start, end = _month_bounds(month)

    # --- All-time balance (cumulative since origin) ---
    # Income counts positively; every other transaction type is subtracted.
    signed_amount = case(
        (Transaction.type == TransactionType.income, Transaction.amount_cents),
        else_=-Transaction.amount_cents,
    )
    balance_result = await session.execute(
        select(func.coalesce(func.sum(signed_amount), 0)).where(
            Transaction.user_id == user_id,
            Transaction.deleted_at.is_(None),
        )
    )
    balance_cents: int = balance_result.scalar_one()

    # --- Month income & expenses ---
    total_income_cents, total_expense_cents = await _income_expense_totals(
        session, user_id, start, end
    )

    # --- Expenses by category for the month ---
    cat_result = await session.execute(
        select(
            Transaction.category_id,
            func.sum(Transaction.amount_cents).label("total"),
        )
        .where(
            Transaction.user_id == user_id,
            Transaction.deleted_at.is_(None),
            Transaction.type == TransactionType.expense,
            Transaction.transaction_date >= start,
            Transaction.transaction_date < end,
        )
        .group_by(Transaction.category_id)
    )
    cat_rows = cat_result.all()

    # Resolve category names/colors in one query; skipped when nothing was spent.
    cats_map: dict = {}
    category_ids = [row.category_id for row in cat_rows]
    if category_ids:
        cats_result = await session.execute(
            select(Category).where(Category.id.in_(category_ids))
        )
        cats_map = {c.id: c for c in cats_result.scalars().all()}

    by_category: list[CategoryExpense] = []
    for row in cat_rows:
        cat = cats_map.get(row.category_id)
        by_category.append(
            CategoryExpense(
                category_id=row.category_id,
                category_name=cat.name if cat else _UNKNOWN_CATEGORY_NAME,
                color=cat.color if cat else None,
                amount_cents=row.total,
            )
        )

    # --- Monthly trend: last 6 months ---
    monthly_trend: list[MonthlyTrend] = []
    for m in _trailing_months(month, _TREND_MONTHS):
        if m == month:
            # Already computed above; reusing it saves a query and keeps the
            # trend's last point identical to the headline totals.
            income_cents, expense_cents = total_income_cents, total_expense_cents
        else:
            m_start, m_end = _month_bounds(m)
            income_cents, expense_cents = await _income_expense_totals(
                session, user_id, m_start, m_end
            )
        monthly_trend.append(
            MonthlyTrend(
                month=m,
                income_cents=income_cents,
                expense_cents=expense_cents,
            )
        )

    # --- Budget alerts (>= 80% spent) ---
    budgets_result = await session.execute(
        select(Budget)
        .options(selectinload(Budget.category))
        .where(
            Budget.user_id == user_id,
            Budget.month == month,
        )
    )
    budgets = budgets_result.scalars().all()

    # The per-category expense totals for this month were fetched above with
    # the same filters, so they are reused instead of one query per budget.
    spent_by_category = {row.category_id: row.total for row in cat_rows}

    budget_alerts: list[BudgetAlert] = []
    for budget in budgets:
        spent_cents: int = spent_by_category.get(budget.category_id, 0)
        # A non-positive limit never triggers an alert (and avoids dividing by 0).
        if budget.limit_cents > 0:
            percentage = spent_cents / budget.limit_cents * 100
        else:
            percentage = 0
        if percentage < _ALERT_THRESHOLD_PERCENT:
            continue
        category = budget.category
        budget_alerts.append(
            BudgetAlert(
                budget_id=budget.id,
                # Same fallback as by_category when the category is missing.
                category_name=category.name if category else _UNKNOWN_CATEGORY_NAME,
                limit_cents=budget.limit_cents,
                spent_cents=spent_cents,
                percentage=round(percentage, 1),
            )
        )

    return DashboardResponse(
        month=month,
        balance_cents=balance_cents,
        total_income_cents=total_income_cents,
        total_expense_cents=total_expense_cents,
        net_cents=total_income_cents - total_expense_cents,
        by_category=by_category,
        monthly_trend=monthly_trend,
        budget_alerts=budget_alerts,
    )