import os, json, time, threading, schedule from datetime import datetime import firebase_admin from firebase_admin import credentials, messaging from fastapi import FastAPI, Header, HTTPException from dotenv import load_dotenv load_dotenv() # ── Firebase init ────────────────────────────────────────── cred = credentials.Certificate(os.getenv("FIREBASE_CREDENTIALS")) firebase_admin.initialize_app(cred) app = FastAPI(title="Notification Service") # ── Carga los JSON del municipio ─────────────────────────── with open("../JSON/rutas.json") as f: RUTAS = {r["routeId"]: r for r in json.load(f)} with open("../JSON/colonias.json") as f: COLONIAS = json.load(f) with open("../JSON/notificaciones.json") as f: NOTIF_TEMPLATES = json.load(f) # ── Estado en memoria (Redis en producción) ───────────────── # { routeId: { positionId, notified: set() } } route_state: dict = {} # Simulación: tokens FCM de usuarios por colonia # En producción esto viene de la BD de P1 fake_users = { "Zona Centro": ["TOKEN_TEST_1"], "Las Arboledas": ["TOKEN_TEST_1"], "Trojes": ["TOKEN_TEST_2"], "San Juanico": [], "Los Olivos": [], "Rancho Seco": [], "Las Insurgentes":[], } # ── Helpers ───────────────────────────────────────────────── def get_colonias_for_route(route_id: str) -> list: return [c["colonia"] for c in COLONIAS if c["routeId"] == route_id] def get_tokens_for_route(route_id: str) -> list: tokens = [] for colonia in get_colonias_for_route(route_id): tokens += fake_users.get(colonia, []) return tokens def get_template(trigger_event: str) -> dict: for t in NOTIF_TEMPLATES: if t["triggerEvent"] == trigger_event: return t["pushPayload"] return None def send_push(tokens: list, title: str, body: str, data: dict = {}): """Envía push a una lista de FCM tokens.""" if not tokens: print(f" [FCM] Sin tokens para notificar") return for token in tokens: try: message = messaging.Message( notification=messaging.Notification(title=title, body=body), data={k: str(v) for k, v in data.items()}, android=messaging.AndroidConfig( priority="high", notification=messaging.AndroidNotification( channel_id="truck_alerts", sound="default" ) ), token=token, ) messaging.send(message) print(f" [FCM] ✓ Enviado a token ...{token[-6:]}") except Exception as e: print(f" [FCM] ✗ Error con token {token}: {e}") # ── Lógica de ETA ─────────────────────────────────────────── def calculate_eta(route_id: str, current_position_id: int) -> dict: """ Devuelve ventana de llegada basada en timestamps del JSON. positionId 4 = punto de proximidad (~15 min del domicilio) """ ruta = RUTAS.get(route_id) if not ruta: return {"eta": "No disponible"} positions = ruta["positions"] current = next((p for p in positions if p["positionId"] == current_position_id), None) next_pos = next((p for p in positions if p["positionId"] == current_position_id + 1), None) if not current: return {"eta": "No disponible"} # Convierte timestamps a hora local (ajusta timezone si es necesario) fmt = "%Y-%m-%dT%H:%M:%SZ" t_current = datetime.strptime(current["timestamp"], fmt) if next_pos: t_next = datetime.strptime(next_pos["timestamp"], fmt) # Ventana: desde ahora hasta el siguiente punto + 15% buffer delta = (t_next - t_current).seconds buffer = int(delta * 0.15) eta_min = t_current.strftime("%H:%M") from datetime import timedelta eta_max = (t_next + timedelta(seconds=buffer)).strftime("%H:%M") return { "eta_window": f"{eta_min} – {eta_max}", "message": f"El camión llegará a tu zona entre las {eta_min} y {eta_max}", "minutes_approx": delta // 60 } else: return { "eta_window": t_current.strftime("%H:%M"), "message": f"El camión llega aproximadamente a las {t_current.strftime('%H:%M')}", "minutes_approx": 0 } # ── Procesador de triggers ─────────────────────────────────── def process_position_update(route_id: str, position_id: int): """ Llamado por el simulador (P1) o por el cron job. Decide si dispara una notificación según positionId. """ state = route_state.setdefault(route_id, {"positionId": 1, "notified": set()}) state["positionId"] = position_id trigger = None if position_id == 2: trigger = "ROUTE_START" elif position_id == 4: trigger = "TRUCK_PROXIMITY" elif position_id == 8: trigger = "ROUTE_COMPLETED" if trigger and trigger not in state["notified"]: template = get_template(trigger) tokens = get_tokens_for_route(route_id) eta_info = calculate_eta(route_id, position_id) print(f"\n[TRIGGER] {route_id} → {trigger} | {len(tokens)} usuarios") send_push( tokens = tokens, title = template["title"], body = template["body"], data = {"routeId": route_id, "trigger": trigger, **eta_info} ) state["notified"].add(trigger) # ── API Endpoints ──────────────────────────────────────────── @app.post("/internal/position-update") def position_update(payload: dict): """ P1 llama este endpoint cuando el simulador avanza una posición. Body: { "routeId": "RUTA-01", "positionId": 2 } """ route_id = payload.get("routeId") position_id = payload.get("positionId") if not route_id or not position_id: raise HTTPException(400, "routeId y positionId son requeridos") if route_id not in RUTAS: raise HTTPException(404, f"Ruta {route_id} no encontrada") process_position_update(route_id, position_id) return {"status": "ok", "routeId": route_id, "positionId": position_id} @app.get("/eta/{route_id}") def get_eta(route_id: str): """ La app Android consulta esto para mostrar la ventana de llegada. Requiere JWT (P1 valida en el gateway, aquí solo calculamos). """ state = route_state.get(route_id) if not state: return {"message": "Ruta aún no iniciada", "eta_window": None} eta = calculate_eta(route_id, state["positionId"]) return { "routeId": route_id, "positionId": state["positionId"], **eta } @app.post("/fcm-token") def register_fcm_token(payload: dict): """ La app Android registra su token FCM al iniciar sesión. Body: { "colonia": "Zona Centro", "token": "FCM_TOKEN_REAL" } """ colonia = payload.get("colonia") token = payload.get("token") if not colonia or not token: raise HTTPException(400, "colonia y token requeridos") if colonia not in fake_users: fake_users[colonia] = [] if token not in fake_users[colonia]: fake_users[colonia].append(token) print(f"[TOKEN] Registrado token para {colonia}") return {"status": "ok"} @app.get("/health") def health(): return {"status": "ok", "routes_active": list(route_state.keys())} # ── Cron job propio (para demo sin depender de P1) ────────── def _simulate_all_routes(): """Avanza cada ruta respetando su horario del JSON.""" now_utc = datetime.utcnow() print(f"\n[CRON] Tick — {now_utc.strftime('%H:%M:%S')} UTC") for route_id, ruta in RUTAS.items(): state = route_state.setdefault(route_id, {"positionId": 1, "notified": set()}) current_id = state["positionId"] if current_id >= 8: continue # esta ruta ya terminó # Busca la siguiente posición positions = ruta["positions"] next_pos = next((p for p in positions if p["positionId"] == current_id + 1), None) if not next_pos: continue # Solo avanza si ya pasó el timestamp de esa posición fmt = "%Y-%m-%dT%H:%M:%SZ" next_time = datetime.strptime(next_pos["timestamp"], fmt) if now_utc >= next_time: print(f" {route_id}: positionId {current_id} → {current_id + 1}") process_position_update(route_id, current_id + 1) # si no, espera silenciosamente def start_cron(): """Corre el simulador cada 2 minutos en un hilo separado.""" schedule.every(2).minutes.do(_simulate_all_routes) while True: schedule.run_pending() time.sleep(10) @app.on_event("startup") def startup(): # Inicializa todas las rutas en positionId 1 for route_id in RUTAS: route_state[route_id] = {"positionId": 1, "notified": set()} # Arranca el cron en background t = threading.Thread(target=start_cron, daemon=True) t.start() print(f"[STARTUP] Notification Service listo. {len(RUTAS)} rutas cargadas.")