# Source listing: hackathon-acapulquitos-boys…/server/app/api/routes/auth_router.py
#
# File-viewer metadata: 121 lines, 3.6 KiB, Python

from datetime import datetime, timedelta, timezone

import jwt
from fastapi import APIRouter, HTTPException, status
from passlib.context import CryptContext
from pydantic import BaseModel, EmailStr

from app.core.config import settings
from app.db.database import get_db

# Router that the register, login and verify endpoints below are attached to.
router = APIRouter()
# passlib context used by hash_password / verify_password; bcrypt is the only
# configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Request body accepted by the POST /register endpoint.
class UserRegister(BaseModel):
    # Email address; checked for uniqueness against the "users" table.
    email: EmailStr
    # Optional phone number; stored as given.
    phone: str | None = None
    # Plain-text password; only its hash is stored.
    password: str
# Request body accepted by the POST /login endpoint.
class UserLogin(BaseModel):
    # Email address used to look the user up in the "users" table.
    email: EmailStr
    # Plain-text password, checked against the stored password hash.
    password: str
# Response body returned by the register and login endpoints.
class TokenResponse(BaseModel):
    # Signed JWT produced by create_token.
    access_token: str
    # Token scheme; defaults to "bearer".
    token_type: str = "bearer"
    # Id of the user the token was issued for.
    user_id: str
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plain-text password against a stored hash.

    Args:
        plain: The password as supplied by the user.
        hashed: The stored hash to compare against.

    Returns:
        Whatever ``pwd_context.verify`` reports for the pair.
    """
    matches = pwd_context.verify(plain, hashed)
    return matches
def create_token(user_id: str) -> str:
    """Build a signed JWT access token for ``user_id``.

    The token carries the user id in the ``sub`` claim and an ``exp`` claim
    set ``settings.access_token_expire_minutes`` minutes from now. It is
    signed with ``settings.secret_key`` using ``settings.algorithm``.

    Args:
        user_id: Identifier of the user the token is issued for.

    Returns:
        The encoded JWT.
    """
    lifetime = timedelta(minutes=settings.access_token_expire_minutes)
    # Use an aware UTC timestamp: datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive datetime.
    expire = datetime.now(timezone.utc) + lifetime
    payload = {"sub": user_id, "exp": expire}
    return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
@router.post("/register", response_model=TokenResponse)
async def register(user: UserRegister):
    """Create a new user account and return an access token for it.

    The handler rejects the request if the email already exists in the
    ``users`` table, stores the new user with a hashed password, inserts a
    ``notification_preferences`` row with every notification enabled, and
    issues a JWT for the new user id.

    Args:
        user: Registration payload (email, optional phone, plain password).

    Returns:
        TokenResponse with the access token and the new user's id.

    Raises:
        HTTPException: 400 if the email is already registered; 500 (with the
            underlying error text as detail) for any other failure.
    """
    db = get_db()
    try:
        # Check whether the email is already taken.
        existing = db.table("users").select("id").eq("email", user.email).execute()
        if existing.data:
            raise HTTPException(status_code=400, detail="Email already registered")

        # Create the user; only the password hash is stored.
        password_hash = hash_password(user.password)
        user_data = {
            "email": user.email,
            "phone": user.phone,
            "password_hash": password_hash,
        }
        new_user = db.table("users").insert(user_data).execute()
        user_id = new_user.data[0]["id"]

        # Create the default notification preferences (everything enabled).
        db.table("notification_preferences").insert({
            "user_id": user_id,
            "notify_proximity": True,
            "notify_breakdown": True,
            "notify_delay": True,
            "notify_route_start": True,
        }).execute()

        token = create_token(user_id)
        return TokenResponse(access_token=token, user_id=user_id)
    except HTTPException:
        # Deliberate HTTP errors (the 400 above) must reach the client as-is;
        # without this clause the generic handler below turns them into 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/login", response_model=TokenResponse)
async def login(user: UserLogin):
    """Authenticate a user by email and password and return an access token.

    Args:
        user: Login payload (email and plain password).

    Returns:
        TokenResponse with a fresh access token and the user's id.

    Raises:
        HTTPException: 401 ("Invalid credentials") when the email is unknown
            or the password does not match; 500 (with the underlying error
            text as detail) for any other failure.
    """
    db = get_db()
    try:
        # Look the user up by email.
        query = db.table("users").select("id, password_hash").eq("email", user.email)
        rows = query.execute().data

        # Unknown email and wrong password produce the same 401 response.
        if not rows or not verify_password(user.password, rows[0]["password_hash"]):
            raise HTTPException(status_code=401, detail="Invalid credentials")

        uid = rows[0]["id"]
        return TokenResponse(access_token=create_token(uid), user_id=uid)
    except HTTPException:
        # Let the 401 above through untouched.
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.get("/verify")
async def verify_token(token: str):
    """Check whether a JWT is valid.

    The token is decoded with ``settings.secret_key`` and
    ``settings.algorithm``; a decoded token must also carry a non-empty
    ``sub`` claim.

    Args:
        token: The encoded JWT to check.

    Returns:
        ``{"valid": True, "user_id": <sub claim>}`` for a valid token.

    Raises:
        HTTPException: 401 "Token expired" when the token has expired;
            401 "Invalid token" when decoding fails or ``sub`` is missing.
    """
    try:
        claims = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

    subject = claims.get("sub")
    if subject:
        return {"valid": True, "user_id": subject}
    raise HTTPException(status_code=401, detail="Invalid token")