""" =============================================================== SISTEMA DE NOTIFICACIÓN PRIVADA DE RECOLECCIÓN DE RESIDUOS Backend MVP - Hackathon 24h =============================================================== Stack: FastAPI + SQLite (SQLAlchemy) + Firebase Admin SDK CÓMO CORRER: pip install fastapi uvicorn sqlalchemy firebase-admin uvicorn main:app --reload --port 8000 ATAJO DE HACKATHON: Usamos SQLite para cero configuración. En producción cambiaría a PostgreSQL solo cambiando DATABASE_URL. =============================================================== """ from fastapi import FastAPI, HTTPException, Depends from fastapi.middleware.cors import CORSMiddleware from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, inspect, text from sqlalchemy.orm import declarative_base, sessionmaker, Session, relationship from pydantic import BaseModel from typing import Optional, List, Dict from datetime import datetime, timezone, timedelta import logging # --------------------------------------------------------------- # CONFIGURACIÓN DE LOGGING # Útil para ver en consola qué está pasando sin un debugger real # --------------------------------------------------------------- logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # --------------------------------------------------------------- # CONFIGURACIÓN DE BASE DE DATOS (SQLite - Hackathon mode) # # DATABASE_URL apunta a un archivo local "hackathon.db". # check_same_thread=False es NECESARIO para FastAPI porque # maneja requests en múltiples threads con el mismo engine. # --------------------------------------------------------------- DATABASE_URL = "sqlite:///./hackathon.db" engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False}) SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False) Base = declarative_base() # =============================================================== # MODELOS DE BASE DE DATOS (SQLAlchemy ORM) # =============================================================== class Usuario(Base): """ Tabla de usuarios del sistema. fcm_token: Token de Firebase Cloud Messaging. Cada dispositivo móvil genera uno único. Es lo que necesitamos para enviar notificaciones push. Se guarda aquí al hacer login en la app. ATAJO: No hay autenticación real (JWT, OAuth). Para el hackathon el usuario_id es suficiente. En producción: agrega auth. """ __tablename__ = "usuarios" id = Column(Integer, primary_key=True, index=True) nombre = Column(String, nullable=False) email = Column(String, nullable=False, unique=True, index=True) # Token FCM que Flutter registrará al iniciar la app fcm_token = Column(String, nullable=True) # Relación 1-a-muchos con Domicilio (un usuario puede tener varias direcciones) direcciones = relationship("Domicilio", back_populates="usuario", cascade="all, delete-orphan") class Domicilio(Base): """ Tabla de domicilios asociados a cada usuario. colonia: Nombre de la colonia, es la clave para buscar en nuestros datos mockeados y obtener el route_id correspondiente. PRIVACIDAD POR DISEÑO: El route_id se guarda internamente pero NUNCA se expone en los endpoints públicos. El usuario solo ve su ETA, no la ruta completa del camión. """ __tablename__ = "domicilios" id = Column(Integer, primary_key=True, index=True) usuario_id = Column(Integer, ForeignKey("usuarios.id")) colonia = Column(String, nullable=False) direccion = Column(String, nullable=False) # route_id es dato interno - no lo devolvemos al cliente route_id = Column(String, nullable=False) usuario = relationship("Usuario", back_populates="direcciones") # Asegura que la DB local tenga las columnas necesarias cuando se actualiza el esquema. inspector = inspect(engine) if inspector.has_table("usuarios"): columnas_usuario = [col["name"] for col in inspector.get_columns("usuarios")] if "email" not in columnas_usuario: with engine.connect() as conn: conn.execute(text("ALTER TABLE usuarios ADD COLUMN email TEXT")) if inspector.has_table("domicilios"): columnas_domicilio = [col["name"] for col in inspector.get_columns("domicilios")] if "direccion" not in columnas_domicilio: with engine.connect() as conn: conn.execute(text("ALTER TABLE domicilios ADD COLUMN direccion TEXT NOT NULL DEFAULT ''")) # Crea las tablas nuevas en hackathon.db si no existen. Base.metadata.create_all(bind=engine) # =============================================================== # DATOS MOCKEADOS EN MEMORIA # # ATAJO DE HACKATHON: Evitamos una tabla extra de "Rutas" y # "Horarios" usando simples diccionarios. Esto nos ahorra 2h de # desarrollo. En producción, estos datos vendrían de la DB. # =============================================================== # Rutas disponibles y sus posiciones GPS para monitoreo de avance. ROUTE_DATA: List[Dict[str, object]] = [ { "route_id": "RUTA-01", "name": "Zona Centro - Las Arboledas", "truck_id": 101, "status": "EN_RUTA", "positions": [ {"positionId": 1, "lat": 20.5111, "lng": -100.9037, "speed": 0, "timestamp": "2026-05-22T06:00:00Z"}, {"positionId": 2, "lat": 20.5185, "lng": -100.8450, "speed": 45, "timestamp": "2026-05-22T06:12:00Z"}, {"positionId": 3, "lat": 20.5215, "lng": -100.8142, "speed": 22, "timestamp": "2026-05-22T06:25:00Z"}, {"positionId": 4, "lat": 20.5212, "lng": -100.8175, "speed": 15, "timestamp": "2026-05-22T06:38:00Z"}, {"positionId": 5, "lat": 20.5210, "lng": -100.8210, "speed": 0, "timestamp": "2026-05-22T06:50:00Z"}, {"positionId": 6, "lat": 20.5235, "lng": -100.8212, "speed": 18, "timestamp": "2026-05-22T07:05:00Z"}, {"positionId": 7, "lat": 20.5260, "lng": -100.8215, "speed": 20, "timestamp": "2026-05-22T07:18:00Z"}, {"positionId": 8, "lat": 20.5111, "lng": -100.9037, "speed": 40, "timestamp": "2026-05-22T07:40:00Z"}, ], }, { "route_id": "RUTA-02", "name": "Sector Norte - Av. Tecnológico", "truck_id": 102, "status": "EN_RUTA", "positions": [ {"positionId": 1, "lat": 20.5111, "lng": -100.9037, "speed": 0, "timestamp": "2026-05-22T06:05:00Z"}, {"positionId": 2, "lat": 20.5280, "lng": -100.8135, "speed": 38, "timestamp": "2026-05-22T06:18:00Z"}, {"positionId": 3, "lat": 20.5410, "lng": -100.8130, "speed": 25, "timestamp": "2026-05-22T06:30:00Z"}, {"positionId": 4, "lat": 20.5445, "lng": -100.8132, "speed": 12, "timestamp": "2026-05-22T06:45:00Z"}, {"positionId": 5, "lat": 20.5480, "lng": -100.8135, "speed": 0, "timestamp": "2026-05-22T06:58:00Z"}, {"positionId": 6, "lat": 20.5515, "lng": -100.8138, "speed": 15, "timestamp": "2026-05-22T07:10:00Z"}, {"positionId": 7, "lat": 20.5540, "lng": -100.8110, "speed": 22, "timestamp": "2026-05-22T07:25:00Z"}, {"positionId": 8, "lat": 20.5111, "lng": -100.9037, "speed": 45, "timestamp": "2026-05-22T07:50:00Z"}, ], }, ] ROUTAS_POR_ID: Dict[str, Dict[str, object]] = {route["route_id"]: route for route in ROUTE_DATA} HORARIOS_POR_RUTA: Dict[str, Dict[str, object]] = { "RUTA-01": {"eta_texto": "Llega en aproximadamente 15 minutos", "eta_minutos": 15}, "RUTA-02": {"eta_texto": "Llega en aproximadamente 30 minutos", "eta_minutos": 30}, "RUTA-03": {"eta_texto": "Llega en aproximadamente 40 minutos", "eta_minutos": 40}, "RUTA-04": {"eta_texto": "Llega en aproximadamente 60 minutos", "eta_minutos": 60}, "RUTA-05": {"eta_texto": "Llega en aproximadamente 75 minutos", "eta_minutos": 75}, "RUTA-12": {"eta_texto": "Llega en aproximadamente 35 minutos", "eta_minutos": 35}, "RUTA-13": {"eta_texto": "Llega en aproximadamente 40 minutos", "eta_minutos": 40}, } HORARIOS_POR_COLONIA: List[Dict[str, str]] = [ {"colonia": "Zona Centro", "routeId": "RUTA-01", "horarioEstimado": "Matutino (06:30 - 07:15)"}, {"colonia": "Las Arboledas", "routeId": "RUTA-01", "horarioEstimado": "Matutino (07:00 - 07:30)"}, {"colonia": "Trojes", "routeId": "RUTA-13", "horarioEstimado": "Matutino (06:40 - 07:10)"}, {"colonia": "San Juanico", "routeId": "RUTA-03", "horarioEstimado": "Matutino (06:45 - 07:15)"}, {"colonia": "Los Olivos", "routeId": "RUTA-04", "horarioEstimado": "Matutino (07:00 - 07:40)"}, {"colonia": "Rancho Seco", "routeId": "RUTA-05", "horarioEstimado": "Vespertino (14:15 - 15:00)"}, {"colonia": "Las Insurgentes", "routeId": "RUTA-12", "horarioEstimado": "Matutino (06:35 - 07:10)"}, ] COLONIAS_A_RUTAS: Dict[str, str] = { item["colonia"]: item["routeId"] for item in HORARIOS_POR_COLONIA } TRIGGER_NOTIFICATIONS = { "ROUTE_START": { "position_id": 2, "title": "¡Ruta Iniciada!", "body": "El camión recolector ha salido del Relleno Sanitario rumbo a tu sector. Asegúrate de tener listos tus residuos." }, "TRUCK_PROXIMITY": { "position_id": 4, "title": "Camión Cercano", "body": "El camión está a menos de 15 minutos de tu domicilio. Es momento de sacar tus bolsas a la acera." }, "ROUTE_COMPLETED": { "position_id": 8, "title": "Servicio Finalizado", "body": "El camión de tu sector ha concluido su jornada de recolección diaria." }, "GPS_OUTAGE": { "title": "Alerta GPS", "body": "El GPS del camión dejó de reportar su ubicación. Estamos investigando la ruta." }, } # Estado de avance de cada ruta en memoria. ROUTE_STATE: Dict[str, Dict[str, object]] = {} for route in ROUTE_DATA: posiciones = route.get("positions", []) ROUTE_STATE[route["route_id"]] = { "last_position_id": posiciones[0]["positionId"] if posiciones else 0, "last_timestamp": datetime.now(timezone.utc), "gps_ok": True, "gps_alert_sent": False, "triggers_sent": {trigger_key: False for trigger_key in TRIGGER_NOTIFICATIONS if trigger_key != "GPS_OUTAGE"}, } # Tipos de evento válidos para el simulador TIPOS_EVENTO_VALIDOS = ["en_camino", "llegando", "completado", "retrasado"] # =============================================================== # CONFIGURACIÓN FIREBASE ADMIN SDK # # CÓMO ACTIVARLO: # 1. Ve a Firebase Console -> Configuración del Proyecto -> # Cuentas de Servicio -> Generar nueva clave privada # 2. Guarda el JSON como "firebase-credentials.json" junto a main.py # 3. Descomenta el bloque de inicialización de abajo # =============================================================== import firebase_admin from firebase_admin import credentials, messaging try: cred = credentials.Certificate("firebase-credentials.json") firebase_admin.initialize_app(cred) logger.info("✅ Firebase Admin SDK inicializado correctamente") except Exception as e: logger.error(f"❌ Error inicializando Firebase: {e}") FIREBASE_ACTIVO = True # Cambia a True al desbloquear Firebase # =============================================================== # INICIALIZACIÓN DE FASTAPI # =============================================================== app = FastAPI( title="Sistema de Notificación de Residuos - MVP Hackathon", description="API privada para notificaciones de recolección de basura", version="0.1.0-hackathon" ) # CORS abierto para desarrollo. En producción: restringe origins. app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) # --------------------------------------------------------------- # DEPENDENCIA: Sesión de Base de Datos # # FastAPI usa Dependency Injection. Esta función provee una sesión # de DB a cada endpoint y garantiza que se cierre al terminar, # sin importar si hubo error. Es el patrón estándar de FastAPI+SQLAlchemy. # --------------------------------------------------------------- def get_db(): db = SessionLocal() try: yield db finally: db.close() # =============================================================== # SCHEMAS PYDANTIC (Validación de request/response) # Pydantic valida automáticamente los tipos. FastAPI los usa para # generar la documentación en /docs sin esfuerzo adicional. # =============================================================== class ETAResponse(BaseModel): """Respuesta del endpoint de ETA. Sin route_id por privacidad.""" usuario_id: int colonia: str ruta_nombre: str ruta_status: str gps_ok: bool eta_texto: str eta_minutos: int mensaje_preventivo: str # Ej: "No saques tu basura aún" class UsuarioRegisterRequest(BaseModel): nombre: str email: str colonia: str direccion: str class DireccionRequest(BaseModel): colonia: str direccion: str class UsuarioLoginRequest(BaseModel): email: str class DomicilioResponse(BaseModel): colonia: str direccion: str class UsuarioResponse(BaseModel): usuario_id: int nombre: str email: str direcciones: List[DomicilioResponse] class RoutePositionUpdateRequest(BaseModel): position_id: int lat: float lng: float timestamp: str class RouteStatusResponse(BaseModel): route_id: str name: str status: str last_position_id: int last_timestamp: str gps_ok: bool class RegisterResponse(BaseModel): usuario_id: int mensaje: str class LoginResponse(BaseModel): usuario_id: int mensaje: str class SimularEventoRequest(BaseModel): """ Payload para simular un evento de ruta. route_id: ID interno de la ruta (RUTA-01, RUTA-02...) tipo_evento: Tipo de evento a simular """ route_id: str tipo_evento: str # "en_camino" | "llegando" | "completado" | "retrasado" class SimularEventoResponse(BaseModel): """Respuesta del simulador de eventos.""" usuarios_notificados: int route_id: str tipo_evento: str detalle: list[str] # Log de qué pasó con cada usuario # =============================================================== # UTILIDADES INTERNAS # =============================================================== def generar_mensaje_preventivo(eta_minutos: int) -> str: """ Genera un mensaje contextual basado en el tiempo de llegada. La MENSAJERÍA PREVENTIVA es el core del sistema: evitamos que el usuario saque la basura demasiado pronto o demasiado tarde, mejorando la experiencia y la higiene. """ if eta_minutos <= 5: return "🚛 ¡El camión está muy cerca! Saca tu basura AHORA." elif eta_minutos <= 15: return "⏰ Prepárate, el camión llega pronto. No saques tu basura aún." elif eta_minutos <= 30: return "🕐 Tienes tiempo. No saques tu basura todavía." else: return "😌 Aún falta bastante. Mantén tu basura adentro por ahora." def enviar_notificacion_firebase(fcm_token: str, titulo: str, cuerpo: str) -> bool: """ Envía una notificación push via Firebase Cloud Messaging. ATAJO: La función existe y está lista, pero si FIREBASE_ACTIVO=False solo simula el envío en los logs. Esto nos permite desarrollar el flujo completo sin credenciales reales. Retorna True si el envío fue exitoso (o simulado), False si falló. """ if not FIREBASE_ACTIVO: # Modo simulación: log del intento sin llamar a Firebase logger.info(f"[SIMULADO] Push -> Token: {fcm_token[:20]}... | {titulo}: {cuerpo}") return True # --- CÓDIGO REAL DE FIREBASE (desbloquear cuando FIREBASE_ACTIVO=True) --- try: message = messaging.Message( notification=messaging.Notification(title=titulo, body=cuerpo), token=fcm_token, ) response = messaging.send(message) logger.info(f"✅ Notificación enviada: {response}") return True except Exception as e: logger.error(f"❌ Error enviando push a {fcm_token[:20]}...: {e}") return False def _obtener_ruta_por_colonia(colonia: str) -> Optional[Dict[str, object]]: route_id = COLONIAS_A_RUTAS.get(colonia) if not route_id: return None return ROUTAS_POR_ID.get(route_id) def _calcular_eta_por_ruta(route_id: str) -> Dict[str, object]: """Calcula un ETA con base en la posición actual de la ruta.""" estado = ROUTE_STATE.get(route_id) ruta = ROUTAS_POR_ID.get(route_id, {}) horario = HORARIOS_POR_RUTA.get(route_id) if not horario: return { "eta_texto": "No hay horario disponible.", "eta_minutos": 60, } if estado and ruta.get("positions"): posiciones = ruta["positions"] ultimo_id = estado.get("last_position_id", 1) if ultimo_id >= len(posiciones): return { "eta_texto": "El servicio ya pasó por tu zona.", "eta_minutos": 0, } pasos_restantes = max(0, len(posiciones) - ultimo_id) eta = pasos_restantes * 10 return { "eta_texto": f"Llega en aproximadamente {eta} minutos", "eta_minutos": eta, } return { "eta_texto": horario["eta_texto"], "eta_minutos": horario["eta_minutos"], } def _verificar_gps_outage(route_id: str, db: Session) -> None: """Verifica si una ruta dejó de reportar GPS y notifica a los usuarios una sola vez.""" estado = ROUTE_STATE.get(route_id) if not estado: return ultimo_timestamp = estado.get("last_timestamp", datetime.now(timezone.utc)) gps_ok = (datetime.now(timezone.utc) - ultimo_timestamp) < timedelta(minutes=10) if not gps_ok and not estado.get("gps_alert_sent", False): logger.warning(f"Alerta GPS outage para {route_id}. Enviando notificaciones.") _notificar_ruta(db, route_id, "GPS_OUTAGE") estado["gps_alert_sent"] = True def _obtener_estado_ruta(route_id: str, db: Optional[Session] = None) -> Dict[str, object]: ruta = ROUTAS_POR_ID.get(route_id) estado = ROUTE_STATE.get(route_id, {}) if not ruta: raise ValueError("Ruta no encontrada") ultimo_timestamp = estado.get("last_timestamp", datetime.now(timezone.utc)) gps_ok = (datetime.now(timezone.utc) - ultimo_timestamp) < timedelta(minutes=10) if db is not None and not gps_ok: _verificar_gps_outage(route_id, db) return { "route_id": route_id, "name": ruta.get("name", "Ruta desconocida"), "status": ruta.get("status", "DESCONOCIDA"), "last_position_id": estado.get("last_position_id", 0), "last_timestamp": ultimo_timestamp.isoformat(), "gps_ok": gps_ok, } def _procesar_trigger_posicion(route_id: str, position_id: int, db: Session) -> list[str]: """Envía notificaciones basadas en el position_id y evita duplicados.""" estado = ROUTE_STATE.get(route_id) if not estado: return [] mensajes = [] sent_map = estado.setdefault("triggers_sent", {}) for trigger_key, trigger in TRIGGER_NOTIFICATIONS.items(): if trigger_key == "GPS_OUTAGE": continue if trigger.get("position_id") == position_id and not sent_map.get(trigger_key, False): mensajes.extend(_notificar_ruta(db, route_id, trigger_key)) sent_map[trigger_key] = True return mensajes def _notificar_ruta(db: Session, route_id: str, trigger_key: str) -> list[str]: trigger = TRIGGER_NOTIFICATIONS.get(trigger_key) if not trigger: return [f"Trigger desconocido: {trigger_key}"] domicilios = db.query(Domicilio).filter(Domicilio.route_id == route_id).all() mensajes = [] for domicilio in domicilios: usuario = domicilio.usuario if not usuario or not usuario.fcm_token: mensajes.append(f"Usuario no tiene token o no existe.") continue enviado = enviar_notificacion_firebase(usuario.fcm_token, trigger["title"], trigger["body"]) if enviado: mensajes.append(f"Notificación enviada a {usuario.nombre} (ID {usuario.id}).") else: mensajes.append(f"Fallo al enviar a {usuario.nombre} (ID {usuario.id}).") return mensajes # =============================================================== # ENDPOINT: SEED DE DATOS DE PRUEBA # # Crea usuarios de prueba para poder testear sin un frontend. # Llama: POST /api/seed # ATAJO: En producción, eliminar este endpoint. # =============================================================== @app.post("/api/seed", tags=["Utilidades"]) def seed_datos(db: Session = Depends(get_db)): """ Crea usuarios de prueba en la DB para demos rápidas. Idempotente: si los usuarios ya existen, no hace nada. """ # Verifica si ya hay datos para no duplicar if db.query(Usuario).count() > 0: return {"mensaje": "Ya hay datos en la DB. No se hizo nada."} usuarios_seed = [ {"nombre": "Ana García", "email": "ana@example.com", "colonia": "Zona Centro", "direccion": "Calle Principal 123", "fcm_token": "token-ana-fake-001"}, {"nombre": "Carlos López", "email": "carlos@example.com", "colonia": "Col. Hidalgo", "direccion": "Av. Hidalgo 45", "fcm_token": "token-carlos-fake-002"}, {"nombre": "María Torres", "email": "maria@example.com", "colonia": "Col. Independencia", "direccion": "Calle Luna 12", "fcm_token": "token-maria-fake-003"}, {"nombre": "Pedro Ruiz", "email": "pedro@example.com", "colonia": "Col. San Juan", "direccion": "Calle Sol 78", "fcm_token": "token-pedro-fake-004"}, ] for u in usuarios_seed: colonia = u["colonia"] route_id = COLONIAS_A_RUTAS.get(colonia, "RUTA-01") usuario = Usuario(nombre=u["nombre"], email=u["email"], fcm_token=u["fcm_token"]) db.add(usuario) db.flush() # flush para obtener el id antes del commit domicilio = Domicilio( usuario_id=usuario.id, colonia=colonia, direccion=u["direccion"], route_id=route_id, ) db.add(domicilio) db.commit() logger.info("✅ Seed completado: 4 usuarios creados") return {"mensaje": "Seed exitoso. Usuarios IDs: 1, 2, 3, 4"} @app.post("/api/usuarios/register", response_model=RegisterResponse, tags=["Usuarios"]) def registrar_usuario(payload: UsuarioRegisterRequest, db: Session = Depends(get_db)): existing = db.query(Usuario).filter(Usuario.email == payload.email).first() if existing: raise HTTPException(status_code=400, detail="El correo ya está registrado.") route_id = COLONIAS_A_RUTAS.get(payload.colonia) if not route_id: raise HTTPException(status_code=400, detail="Colonia no válida.") usuario = Usuario(nombre=payload.nombre, email=payload.email.lower().strip(), fcm_token=None) db.add(usuario) db.flush() direccion = Domicilio( usuario_id=usuario.id, colonia=payload.colonia, direccion=payload.direccion, route_id=route_id, ) db.add(direccion) db.commit() return RegisterResponse(usuario_id=usuario.id, mensaje="Usuario registrado correctamente.") @app.post("/api/usuarios/login", response_model=LoginResponse, tags=["Usuarios"]) def login_usuario(payload: UsuarioLoginRequest, db: Session = Depends(get_db)): usuario = db.query(Usuario).filter(Usuario.email == payload.email.lower().strip()).first() if not usuario: raise HTTPException(status_code=404, detail="Usuario no encontrado. Regístrate primero.") return LoginResponse(usuario_id=usuario.id, mensaje="Login exitoso.") @app.get("/api/usuarios/{usuario_id}", response_model=UsuarioResponse, tags=["Usuarios"]) def obtener_usuario(usuario_id: int, db: Session = Depends(get_db)): usuario = db.query(Usuario).filter(Usuario.id == usuario_id).first() if not usuario: raise HTTPException(status_code=404, detail="Usuario no encontrado.") direcciones = [ DomicilioResponse(colonia=d.colonia, direccion=d.direccion) for d in usuario.direcciones ] return UsuarioResponse( usuario_id=usuario.id, nombre=usuario.nombre, email=usuario.email, direcciones=direcciones, ) @app.post("/api/usuarios/{usuario_id}/direcciones", tags=["Usuarios"]) def agregar_direccion(usuario_id: int, payload: DireccionRequest, db: Session = Depends(get_db)): usuario = db.query(Usuario).filter(Usuario.id == usuario_id).first() if not usuario: raise HTTPException(status_code=404, detail="Usuario no encontrado.") route_id = COLONIAS_A_RUTAS.get(payload.colonia) if not route_id: raise HTTPException(status_code=400, detail="Colonia no válida.") direccion = Domicilio( usuario_id=usuario.id, colonia=payload.colonia, direccion=payload.direccion, route_id=route_id, ) db.add(direccion) db.commit() return {"mensaje": "Dirección agregada correctamente."} @app.get("/api/rutas", tags=["Rutas"]) def listar_rutas(db: Session = Depends(get_db)): return {"rutas": [_obtener_estado_ruta(route["route_id"], db) for route in ROUTE_DATA]} @app.post("/api/rutas/{route_id}/avanzar", response_model=RouteStatusResponse, tags=["Rutas"]) def avanzar_ruta(route_id: str, db: Session = Depends(get_db)): if route_id not in ROUTAS_POR_ID: raise HTTPException(status_code=404, detail="Ruta no encontrada.") ruta = ROUTAS_POR_ID[route_id] estado = ROUTE_STATE.get(route_id) posiciones = ruta.get("positions", []) if not estado or not posiciones: raise HTTPException(status_code=500, detail="Estado interno de la ruta no disponible.") actual = estado.get("last_position_id", 0) if actual < len(posiciones): siguiente = actual + 1 estado["last_position_id"] = siguiente estado["last_timestamp"] = datetime.now(timezone.utc) estado["gps_ok"] = True estado["gps_alert_sent"] = False mensaje_log = _procesar_trigger_posicion(route_id, siguiente, db) if siguiente >= len(posiciones): ruta["status"] = "COMPLETADO" else: ruta["status"] = "EN_RUTA" else: mensaje_log = [] if mensaje_log: logger.info(f"Notificaciones disparadas en {route_id}: {mensaje_log}") return RouteStatusResponse(**_obtener_estado_ruta(route_id, db)) @app.get("/api/rutas/{route_id}/estado", response_model=RouteStatusResponse, tags=["Rutas"]) def estado_ruta(route_id: str, db: Session = Depends(get_db)): try: estado = _obtener_estado_ruta(route_id, db) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) return RouteStatusResponse(**estado) @app.post("/api/rutas/{route_id}/posicion", tags=["Rutas"]) def actualizar_posicion_ruta(route_id: str, payload: RoutePositionUpdateRequest, db: Session = Depends(get_db)): if route_id not in ROUTAS_POR_ID: raise HTTPException(status_code=404, detail="Ruta no encontrada.") estado = ROUTE_STATE.get(route_id) if not estado: raise HTTPException(status_code=500, detail="Estado de ruta no inicializado.") try: timestamp = datetime.fromisoformat(payload.timestamp.replace("Z", "+00:00")) except ValueError: raise HTTPException(status_code=400, detail="Timestamp inválido. Usa formato ISO 8601 UTC.") mensaje_log = [] if payload.position_id > estado["last_position_id"]: mensaje_log.extend(_procesar_trigger_posicion(route_id, payload.position_id, db)) estado["last_position_id"] = payload.position_id estado["last_timestamp"] = timestamp estado["gps_ok"] = True estado["gps_alert_sent"] = False if payload.position_id == 8: ROUTAS_POR_ID[route_id]["status"] = "COMPLETADO" return { "route_id": route_id, "position_id": payload.position_id, "timestamp": payload.timestamp, "gps_ok": True, "mensajes": mensaje_log, } # =============================================================== # ENDPOINT 1: GET /api/eta/{usuario_id} # # Consulta el ETA de la ruta asignada al domicilio del usuario. # # PRIVACIDAD POR DISEÑO: # - El usuario_id es la única info que el cliente manda # - El route_id se resuelve internamente, NUNCA se devuelve # - El cliente ve el ETA y el mensaje, no la infraestructura # =============================================================== @app.get("/api/eta/{usuario_id}", response_model=ETAResponse, tags=["Core"]) def obtener_eta(usuario_id: int, db: Session = Depends(get_db)): """ Devuelve el tiempo estimado de llegada del camión para el usuario. Flujo: 1. Busca el usuario en la DB 2. Obtiene su domicilio (colonia + route_id interno) 3. Consulta el horario de esa ruta en los datos mockeados 4. Retorna ETA + mensaje preventivo SIN exponer el route_id """ # Paso 1: Verificar que el usuario existe usuario = db.query(Usuario).filter(Usuario.id == usuario_id).first() if not usuario: raise HTTPException( status_code=404, detail=f"Usuario {usuario_id} no encontrado. ¿Corriste /api/seed?" ) # Paso 2: Verificar que tiene al menos una dirección registrada if not usuario.direcciones: raise HTTPException( status_code=404, detail=f"El usuario {usuario_id} no tiene direcciones registradas." ) direccion = db.query(Domicilio).filter(Domicilio.usuario_id == usuario_id).order_by(Domicilio.id.desc()).first() colonia = direccion.colonia route_id = direccion.route_id # Uso INTERNO, no se devuelve ruta = ROUTAS_POR_ID.get(route_id, {}) estado_ruta = _obtener_estado_ruta(route_id, db) # Paso 3: Calcular ETA usando el estado actual de la ruta calculo = _calcular_eta_por_ruta(route_id) return ETAResponse( usuario_id=usuario_id, colonia=colonia, ruta_nombre=ruta.get("name", "Ruta desconocida"), ruta_status=estado_ruta["status"], eta_texto=calculo["eta_texto"], eta_minutos=calculo["eta_minutos"], mensaje_preventivo=generar_mensaje_preventivo(calculo["eta_minutos"]), ) # =============================================================== # ENDPOINT 2: POST /api/simular-evento # # Simula que un camión generó un evento (ej. "llegando") y # dispara notificaciones push a todos los usuarios de esa ruta. # # En un sistema real, este endpoint sería llamado por el GPS # del camión o un sistema de despacho, no por el usuario. # =============================================================== @app.post("/api/simular-evento", response_model=SimularEventoResponse, tags=["Core"]) def simular_evento(payload: SimularEventoRequest, db: Session = Depends(get_db)): """ Recibe un evento de ruta y notifica a todos sus usuarios. Flujo: 1. Valida el tipo de evento y que la ruta exista 2. Busca todos los Domicilios asignados a esa ruta 3. Para cada domicilio -> obtiene el usuario -> envía push 4. Retorna un log de lo que pasó con cada usuario """ # Validación del tipo de evento if payload.tipo_evento not in TIPOS_EVENTO_VALIDOS: raise HTTPException( status_code=400, detail=f"tipo_evento inválido. Opciones: {TIPOS_EVENTO_VALIDOS}" ) # Validar que la ruta existe en nuestros datos if payload.route_id not in HORARIOS_POR_RUTA: raise HTTPException( status_code=404, detail=f"Ruta {payload.route_id} no encontrada. Rutas válidas: {list(HORARIOS_POR_RUTA.keys())}" ) # Buscar todos los domicilios asignados a esta ruta domicilios = db.query(Domicilio).filter(Domicilio.route_id == payload.route_id).all() if not domicilios: return SimularEventoResponse( usuarios_notificados=0, route_id=payload.route_id, tipo_evento=payload.tipo_evento, detalle=["No hay usuarios registrados en esta ruta."] ) # Construir el mensaje según el tipo de evento mensajes_por_evento = { "en_camino": ("🚛 Camión en camino", "El camión de recolección está en ruta hacia tu zona."), "llegando": ("⚠️ ¡El camión está cerca!", "Saca tu basura ahora, el camión llega en minutos."), "completado": ("✅ Recolección completada", "El camión ya pasó por tu zona. Nos vemos mañana."), "retrasado": ("🕐 Retraso en ruta", "El camión se ha retrasado. Te avisaremos cuando esté cerca."), } titulo, cuerpo = mensajes_por_evento[payload.tipo_evento] # Enviar notificación a cada usuario de la ruta detalle_log = [] usuarios_notificados = 0 for domicilio in domicilios: usuario = domicilio.usuario if not usuario: detalle_log.append(f"Domicilio ID {domicilio.id}: Sin usuario asociado (dato corrupto).") continue if not usuario.fcm_token: # Sin token no hay push. En producción: guardar en cola para reintentar detalle_log.append(f"Usuario '{usuario.nombre}' (ID {usuario.id}): Sin FCM token. Push omitido.") continue # Intentar enviar la notificación (real o simulada) exito = enviar_notificacion_firebase(usuario.fcm_token, titulo, cuerpo) if exito: usuarios_notificados += 1 detalle_log.append(f"✅ Push enviado a '{usuario.nombre}' (ID {usuario.id}) en {domicilio.colonia}.") else: detalle_log.append(f"❌ Fallo push para '{usuario.nombre}' (ID {usuario.id}).") logger.info(f"Evento '{payload.tipo_evento}' en {payload.route_id}: {usuarios_notificados}/{len(domicilios)} notificados.") return SimularEventoResponse( usuarios_notificados=usuarios_notificados, route_id=payload.route_id, tipo_evento=payload.tipo_evento, detalle=detalle_log ) # =============================================================== # ENDPOINT: GET /api/colonias # # Devuelve la lista de colonias disponibles para el Dropdown # del login en Flutter. Simple y directo. # =============================================================== @app.get("/api/colonias", tags=["Utilidades"]) def listar_colonias(): """ Lista todas las colonias disponibles. Flutter las usa para poblar el Dropdown del login screen. """ return {"colonias": [item["colonia"] for item in HORARIOS_POR_COLONIA]} # =============================================================== # ENDPOINT: PUT /api/usuarios/{usuario_id}/fcm-token # # Flutter llama este endpoint al iniciar la app para registrar # o actualizar el FCM token del dispositivo. # =============================================================== class ActualizarTokenRequest(BaseModel): fcm_token: str @app.put("/api/usuarios/{usuario_id}/fcm-token", tags=["Utilidades"]) def actualizar_fcm_token( usuario_id: int, payload: ActualizarTokenRequest, db: Session = Depends(get_db) ): """ Actualiza el FCM token de un usuario. Flutter llama esto cuando: - El usuario inicia sesión por primera vez - Firebase renueva el token del dispositivo (pasa periódicamente) """ usuario = db.query(Usuario).filter(Usuario.id == usuario_id).first() if not usuario: raise HTTPException(status_code=404, detail="Usuario no encontrado.") usuario.fcm_token = payload.fcm_token db.commit() logger.info(f"FCM token actualizado para usuario {usuario_id}") return {"mensaje": f"Token actualizado para usuario {usuario_id}"} # --------------------------------------------------------------- # PUNTO DE ENTRADA PARA DESARROLLO DIRECTO # Corre con: python main.py (o preferiblemente: uvicorn main:app --reload) # --------------------------------------------------------------- if __name__ == "__main__": import uvicorn uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)