from datetime import datetime, timedelta from fastapi import APIRouter, HTTPException, status from pydantic import BaseModel, EmailStr import jwt from passlib.context import CryptContext from app.core.config import settings from app.db.database import get_db router = APIRouter() pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") class UserRegister(BaseModel): email: EmailStr phone: str | None = None password: str class UserLogin(BaseModel): email: EmailStr password: str class TokenResponse(BaseModel): access_token: str token_type: str = "bearer" user_id: str def hash_password(password: str) -> str: return pwd_context.hash(password) def verify_password(plain: str, hashed: str) -> bool: return pwd_context.verify(plain, hashed) def create_token(user_id: str) -> str: expire = datetime.utcnow() + timedelta(minutes=settings.access_token_expire_minutes) payload = {"sub": user_id, "exp": expire} return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm) @router.post("/register", response_model=TokenResponse) async def register(user: UserRegister): db = get_db() try: # Verificar si email existe existing = db.table("users").select("id").eq("email", user.email).execute() if existing.data: raise HTTPException(status_code=400, detail="Email already registered") # Crear usuario password_hash = hash_password(user.password) user_data = { "email": user.email, "phone": user.phone, "password_hash": password_hash, } new_user = db.table("users").insert(user_data).execute() user_id = new_user.data[0]["id"] # Crear preferencias por defecto db.table("notification_preferences").insert({ "user_id": user_id, "notify_proximity": True, "notify_breakdown": True, "notify_delay": True, "notify_route_start": True, }).execute() token = create_token(user_id) return TokenResponse(access_token=token, user_id=user_id) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.post("/login", response_model=TokenResponse) async def login(user: UserLogin): db = get_db() try: # Buscar usuario por email result = db.table("users").select("id, password_hash").eq("email", user.email).execute() if not result.data: raise HTTPException(status_code=401, detail="Invalid credentials") db_user = result.data[0] # Verificar password if not verify_password(user.password, db_user["password_hash"]): raise HTTPException(status_code=401, detail="Invalid credentials") user_id = db_user["id"] token = create_token(user_id) return TokenResponse(access_token=token, user_id=user_id) except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/verify") async def verify_token(token: str): """Verifica si el JWT es vĂ¡lido.""" try: payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm]) user_id = payload.get("sub") if not user_id: raise HTTPException(status_code=401, detail="Invalid token") return {"valid": True, "user_id": user_id} except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Token expired") except jwt.InvalidTokenError: raise HTTPException(status_code=401, detail="Invalid token")