Files
hackathon-innovaflow5.0-cdf…/backend/app/services/simulation.py
shinra32 ba5e5ea12c Co-authored-by: MENDOZA BALLARDO GAEL RICARDO <gael-meb123@users.noreply.github.com>
Co-authored-by: Azareth-Tr <Azareth-Tr@users.noreply.github.com>
Co-authored-by: eddgranados12 <eddgranados12@users.noreply.github.com>

configuración inicial para Supabase y endpoints
2026-05-22 17:06:17 -06:00

95 lines
2.6 KiB
Python

import json
import os
from typing import Dict, List, Optional
from app.services import notifications
# Two directory levels above this file, i.e. the `backend/app` package directory.
ROOT = os.path.dirname(os.path.dirname(__file__)) # backend/app
# Directory the JSON fixtures are read from by `_load_json`.
DATA_DIR = os.path.join(ROOT, "data")
# Contents of `rutas.json`; filled in by `load_data()`.
ROUTES: List[Dict] = []
# Contents of `notificaciones.json`; filled in by `load_data()`.
NOTIFS: List[Dict] = []
# Contents of `colonias-rutas.json`; filled in by `load_data()`.
COLONIAS: List[Dict] = []
# routeId -> current position; set to 1 by `start_simulation_state()` and
# advanced by `tick()` up to 8.
ESTADO: Dict[str, int] = {}
# routeId -> status string taken from the route entry (default "PENDIENTE").
STATUS: Dict[str, str] = {}
# Every event produced by `tick()`, in the order it was fired.
LAST_EVENTS: List[Dict] = []
def _load_json(filename: str):
    """Parse and return the JSON document named `filename` inside DATA_DIR.

    The file is opened as UTF-8 text. Errors raised while opening or decoding
    the file are not handled here and propagate to the caller.
    """
    full_path = os.path.join(DATA_DIR, filename)
    with open(full_path, "r", encoding="utf-8") as handle:
        parsed = json.load(handle)
    return parsed
def load_data():
    """Reload the module-level datasets from their JSON files.

    Rebinds ROUTES, NOTIFS and COLONIAS, in that order, to the parsed contents
    of `rutas.json`, `notificaciones.json` and `colonias-rutas.json`.
    """
    global ROUTES, NOTIFS, COLONIAS

    # Loaded one after another: a failure leaves the earlier globals updated.
    ROUTES = _load_json("rutas.json")
    NOTIFS = _load_json("notificaciones.json")
    COLONIAS = _load_json("colonias-rutas.json")
def start_simulation_state():
    """Initialise the position and status of every route listed in ROUTES.

    Rebinds ESTADO so each route id maps to position 1, and STATUS so each
    route id maps to the route's `status` field ("PENDIENTE" when absent).
    Entries without a `routeId` are skipped: they would otherwise be stored
    under the key None and later trigger pushes to a bogus `topic_None`.
    """
    global ESTADO, STATUS

    positions: Dict[str, int] = {}
    statuses: Dict[str, str] = {}
    for route in ROUTES:
        rid = route.get("routeId")
        if rid is None:
            # A route that cannot be identified cannot be tracked or notified.
            continue
        positions[rid] = 1
        statuses[rid] = route.get("status", "PENDIENTE")

    ESTADO = positions
    STATUS = statuses
def get_colonias():
    """Return the COLONIAS dataset as currently loaded (the same object, not a copy)."""
    colonias = COLONIAS
    return colonias
def get_route_position(routeId: str) -> Optional[int]:
    """Return the position stored in ESTADO for `routeId`, or None if it is not tracked."""
    position = ESTADO.get(routeId)
    return position
def get_route_status(routeId: str) -> Optional[str]:
    """Return the status stored in STATUS for `routeId`, or None if it is not tracked."""
    status = STATUS.get(routeId)
    return status
def _find_notif(event_name: str) -> Optional[Dict]:
    """Return the first entry of NOTIFS whose `triggerEvent` equals `event_name`.

    Returns None when no entry matches.
    """
    matches = (
        template
        for template in NOTIFS
        if template.get("triggerEvent") == event_name
    )
    # next() with a default stops at the first match, like the explicit loop.
    return next(matches, None)
def tick() -> List[Dict]:
    """Advance every tracked route by one position and return the events fired.

    Each route in ESTADO whose position is below 8 moves forward one step.
    Arriving at position 2 fires ``ROUTE_START``, position 4 fires
    ``TRUCK_PROXIMITY`` and position 8 fires ``ROUTE_COMPLETED``. For every
    fired event the matching template is looked up with ``_find_notif``, the
    event is appended to LAST_EVENTS, and a push is attempted on the topic
    ``topic_<routeId>``. A failed push is reported on stdout and does not
    interrupt the tick.

    Returns:
        The events fired during this call, each shaped as
        ``{"routeId": ..., "event": ..., "payload": ...}``.
    """
    final_position = 8
    event_by_position = {
        2: "ROUTE_START",
        4: "TRUCK_PROXIMITY",
        8: "ROUTE_COMPLETED",
    }

    events: List[Dict] = []
    # Iterate over a snapshot so ESTADO can be updated inside the loop.
    for route_id, pos in list(ESTADO.items()):
        if pos >= final_position:
            continue  # route already finished; nothing to advance

        ahora = pos + 1
        ESTADO[route_id] = ahora

        evt = event_by_position.get(ahora)
        if evt is None:
            continue

        notif = _find_notif(evt)
        payload = notif.get("pushPayload") if notif else None
        if payload is None:
            # No template, or a template without `pushPayload`: fall back to a
            # minimal payload so a None is never recorded/sent and the failure
            # handler below can always read the title and body.
            payload = {"title": evt, "body": ""}

        simulated = {"routeId": route_id, "event": evt, "payload": payload}
        events.append(simulated)
        LAST_EVENTS.append(simulated)

        # Send the push through the notifications service (FCM) or its mock.
        topic = f"topic_{route_id}"
        try:
            notifications.send_to_topic(topic, payload)
        except Exception:
            # Best effort: the simulation keeps running when a push fails.
            print(f"[SIM PUSH FAIL] {route_id} -> {evt}: {payload.get('title')} - {payload.get('body')}")
    return events
def get_last_events() -> List[Dict]:
    """Return a new list with at most the 20 most recent entries of LAST_EVENTS."""
    window = 20
    return LAST_EVENTS[-window:]