Files
hackathon-fenix-dd4242d3e4a…/backend/main.py

104 lines
4.2 KiB
Python

from fastapi import FastAPI, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from apscheduler.schedulers.background import BackgroundScheduler
from database import engine, get_db
import models, schemas, auth, simulator
models.Base.metadata.create_all(bind=engine)
app = FastAPI(title="HackOnLinces 2026 - Recolección de Residuos")
app.add_middleware(CORSMiddleware, allow_origins=["*"],
allow_methods=["*"], allow_headers=["*"])
scheduler = BackgroundScheduler()
scheduler.add_job(simulator.avanzar_rutas, "interval", minutes=2)
scheduler.start()
@app.on_event("startup")
def startup():
db = next(get_db())
simulator.init_rutas(db)
@app.get("/")
def root():
return {"message": "API HackOnLinces 2026 funcionando"}
@app.post("/auth/register", response_model=schemas.Token)
def register(user: schemas.UsuarioCreate, db: Session = Depends(get_db)):
if db.query(models.Usuario).filter_by(email=user.email).first():
raise HTTPException(status_code=400, detail="Email ya registrado")
nuevo = models.Usuario(email=user.email,
hashed_password=auth.hash_password(user.password))
db.add(nuevo)
db.commit()
db.refresh(nuevo)
token = auth.create_token({"sub": nuevo.email})
return {"access_token": token, "token_type": "bearer"}
@app.post("/auth/login", response_model=schemas.Token)
def login(user: schemas.UsuarioLogin, db: Session = Depends(get_db)):
db_user = db.query(models.Usuario).filter_by(email=user.email).first()
if not db_user or not auth.verify_password(user.password, db_user.hashed_password):
raise HTTPException(status_code=401, detail="Credenciales incorrectas")
token = auth.create_token({"sub": db_user.email})
return {"access_token": token, "token_type": "bearer"}
@app.post("/domicilios", response_model=schemas.DomicilioResponse)
def crear_domicilio(data: schemas.DomicilioCreate,
current_user=Depends(auth.get_current_user),
db: Session = Depends(get_db)):
colonia_key = data.colonia.lower()
colonia_info = simulator.COLONIAS.get(colonia_key)
if not colonia_info:
raise HTTPException(status_code=404, detail="Colonia no encontrada en el sistema")
dom = models.Domicilio(direccion=data.direccion, colonia=data.colonia,
lat=data.lat, lng=data.lng,
route_id=colonia_info["routeId"],
usuario_id=current_user.id)
db.add(dom)
db.commit()
db.refresh(dom)
return dom
@app.get("/eta/{domicilio_id}", response_model=schemas.ETAResponse)
def get_eta(domicilio_id: int,
current_user=Depends(auth.get_current_user),
db: Session = Depends(get_db)):
dom = db.query(models.Domicilio).filter_by(id=domicilio_id).first()
if not dom:
raise HTTPException(status_code=404, detail="Domicilio no encontrado")
if dom.usuario_id != current_user.id:
raise HTTPException(status_code=403, detail="No tienes acceso a este domicilio")
eta = simulator.get_eta(dom.route_id, db)
if not eta:
raise HTTPException(status_code=404, detail="Ruta no encontrada")
return {**eta, "route_id": dom.route_id, "colonia": dom.colonia}
@app.post("/reportes")
def crear_reporte(
domicilio_id: int,
tipo: str,
descripcion: str,
current_user=Depends(auth.get_current_user),
db: Session = Depends(get_db)
):
dom = db.query(models.Domicilio).filter_by(id=domicilio_id).first()
if not dom or dom.usuario_id != current_user.id:
raise HTTPException(status_code=403, detail="No tienes acceso a este domicilio")
return {
"mensaje": "Reporte recibido correctamente",
"tipo": tipo,
"domicilio_id": domicilio_id,
"descripcion": descripcion,
"estado": "PENDIENTE"
}
@app.get("/domicilios")
def listar_domicilios(
current_user=Depends(auth.get_current_user),
db: Session = Depends(get_db)
):
domicilios = db.query(models.Domicilio).filter_by(usuario_id=current_user.id).all()
return [{"id": d.id, "direccion": d.direccion, "colonia": d.colonia, "route_id": d.route_id} for d in domicilios]