Files

56 lines
1.9 KiB
Python

"""
Gestor de conexiones WebSocket.
Mantiene un registro de qué clientes están conectados y a qué zona pertenecen.
El simulador llama a broadcast_zona() para empujar eventos sin polling.
"""
import json
from collections import defaultdict
from fastapi import WebSocket
class WebSocketManager:
def __init__(self):
# zona_id -> lista de WebSockets activos
self._conexiones: dict[str, list[WebSocket]] = defaultdict(list)
async def conectar(self, websocket: WebSocket, zona_id: str) -> None:
await websocket.accept()
self._conexiones[zona_id].append(websocket)
def desconectar(self, websocket: WebSocket, zona_id: str) -> None:
conexiones = self._conexiones.get(zona_id, [])
if websocket in conexiones:
conexiones.remove(websocket)
async def broadcast_zona(self, zona_id: str, payload: dict) -> None:
"""Envía un mensaje a todos los clientes de una zona."""
mensaje = json.dumps(payload, ensure_ascii=False)
muertos: list[WebSocket] = []
for ws in self._conexiones.get(zona_id, []):
try:
await ws.send_text(mensaje)
except Exception:
muertos.append(ws)
for ws in muertos:
self.desconectar(ws, zona_id)
async def broadcast_ruta(self, ruta_id: str, payload: dict) -> None:
"""
Envía a TODAS las zonas de una ruta.
El filtro real de privacidad está en el backend (RBAC del endpoint REST).
Aquí simplemente distribuimos por zona registrada.
"""
for zona_id, conexiones in self._conexiones.items():
if conexiones:
await self.broadcast_zona(zona_id, payload)
def zonas_activas(self) -> list[str]:
return [z for z, ws in self._conexiones.items() if ws]
# Singleton global compartido por el simulador y el router de WebSocket
ws_manager = WebSocketManager()