Files
hackathon-biocode-17bf223ba…/notification-service/main.py
2026-05-22 18:05:33 -06:00

276 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os, json, time, threading, schedule
from datetime import datetime
import firebase_admin
from firebase_admin import credentials, messaging
from fastapi import FastAPI, Header, HTTPException
from dotenv import load_dotenv
load_dotenv()
# ── Firebase init ──────────────────────────────────────────
cred = credentials.Certificate(os.getenv("FIREBASE_CREDENTIALS"))
firebase_admin.initialize_app(cred)
app = FastAPI(title="Notification Service")
# ── Carga los JSON del municipio ───────────────────────────
with open("../JSON/rutas.json") as f: RUTAS = {r["routeId"]: r for r in json.load(f)}
with open("../JSON/colonias.json") as f: COLONIAS = json.load(f)
with open("../JSON/notificaciones.json") as f: NOTIF_TEMPLATES = json.load(f)
# ── Estado en memoria (Redis en producción) ─────────────────
# { routeId: { positionId, notified: set() } }
route_state: dict = {}
# Simulación: tokens FCM de usuarios por colonia
# En producción esto viene de la BD de P1
fake_users = {
"Zona Centro": ["TOKEN_TEST_1"],
"Las Arboledas": ["TOKEN_TEST_1"],
"Trojes": ["TOKEN_TEST_2"],
"San Juanico": [],
"Los Olivos": [],
"Rancho Seco": [],
"Las Insurgentes":[],
}
# ── Helpers ─────────────────────────────────────────────────
def get_colonias_for_route(route_id: str) -> list:
return [c["colonia"] for c in COLONIAS if c["routeId"] == route_id]
def get_tokens_for_route(route_id: str) -> list:
tokens = []
for colonia in get_colonias_for_route(route_id):
tokens += fake_users.get(colonia, [])
return tokens
def get_template(trigger_event: str) -> dict:
for t in NOTIF_TEMPLATES:
if t["triggerEvent"] == trigger_event:
return t["pushPayload"]
return None
def send_push(tokens: list, title: str, body: str, data: dict = {}):
"""Envía push a una lista de FCM tokens."""
if not tokens:
print(f" [FCM] Sin tokens para notificar")
return
for token in tokens:
try:
message = messaging.Message(
notification=messaging.Notification(title=title, body=body),
data={k: str(v) for k, v in data.items()},
android=messaging.AndroidConfig(
priority="high",
notification=messaging.AndroidNotification(
channel_id="truck_alerts",
sound="default"
)
),
token=token,
)
messaging.send(message)
print(f" [FCM] ✓ Enviado a token ...{token[-6:]}")
except Exception as e:
print(f" [FCM] ✗ Error con token {token}: {e}")
# ── Lógica de ETA ───────────────────────────────────────────
def calculate_eta(route_id: str, current_position_id: int) -> dict:
"""
Devuelve ventana de llegada basada en timestamps del JSON.
positionId 4 = punto de proximidad (~15 min del domicilio)
"""
ruta = RUTAS.get(route_id)
if not ruta:
return {"eta": "No disponible"}
positions = ruta["positions"]
current = next((p for p in positions if p["positionId"] == current_position_id), None)
next_pos = next((p for p in positions if p["positionId"] == current_position_id + 1), None)
if not current:
return {"eta": "No disponible"}
# Convierte timestamps a hora local (ajusta timezone si es necesario)
fmt = "%Y-%m-%dT%H:%M:%SZ"
t_current = datetime.strptime(current["timestamp"], fmt)
if next_pos:
t_next = datetime.strptime(next_pos["timestamp"], fmt)
# Ventana: desde ahora hasta el siguiente punto + 15% buffer
delta = (t_next - t_current).seconds
buffer = int(delta * 0.15)
eta_min = t_current.strftime("%H:%M")
from datetime import timedelta
eta_max = (t_next + timedelta(seconds=buffer)).strftime("%H:%M")
return {
"eta_window": f"{eta_min} {eta_max}",
"message": f"El camión llegará a tu zona entre las {eta_min} y {eta_max}",
"minutes_approx": delta // 60
}
else:
return {
"eta_window": t_current.strftime("%H:%M"),
"message": f"El camión llega aproximadamente a las {t_current.strftime('%H:%M')}",
"minutes_approx": 0
}
# ── Procesador de triggers ───────────────────────────────────
def process_position_update(route_id: str, position_id: int):
"""
Llamado por el simulador (P1) o por el cron job.
Decide si dispara una notificación según positionId.
"""
state = route_state.setdefault(route_id, {"positionId": 1, "notified": set()})
state["positionId"] = position_id
trigger = None
if position_id == 2: trigger = "ROUTE_START"
elif position_id == 4: trigger = "TRUCK_PROXIMITY"
elif position_id == 8: trigger = "ROUTE_COMPLETED"
if trigger and trigger not in state["notified"]:
template = get_template(trigger)
tokens = get_tokens_for_route(route_id)
eta_info = calculate_eta(route_id, position_id)
print(f"\n[TRIGGER] {route_id}{trigger} | {len(tokens)} usuarios")
send_push(
tokens = tokens,
title = template["title"],
body = template["body"],
data = {"routeId": route_id, "trigger": trigger, **eta_info}
)
state["notified"].add(trigger)
# ── API Endpoints ────────────────────────────────────────────
@app.post("/internal/position-update")
def position_update(payload: dict):
"""
P1 llama este endpoint cuando el simulador avanza una posición.
Body: { "routeId": "RUTA-01", "positionId": 2 }
"""
route_id = payload.get("routeId")
position_id = payload.get("positionId")
if not route_id or not position_id:
raise HTTPException(400, "routeId y positionId son requeridos")
if route_id not in RUTAS:
raise HTTPException(404, f"Ruta {route_id} no encontrada")
process_position_update(route_id, position_id)
return {"status": "ok", "routeId": route_id, "positionId": position_id}
@app.get("/eta/{route_id}")
def get_eta(route_id: str):
"""
La app Android consulta esto para mostrar la ventana de llegada.
Requiere JWT (P1 valida en el gateway, aquí solo calculamos).
"""
state = route_state.get(route_id)
if not state:
return {"message": "Ruta aún no iniciada", "eta_window": None}
eta = calculate_eta(route_id, state["positionId"])
return {
"routeId": route_id,
"positionId": state["positionId"],
**eta
}
@app.post("/fcm-token")
def register_fcm_token(payload: dict):
"""
La app Android registra su token FCM al iniciar sesión.
Body: { "colonia": "Zona Centro", "token": "FCM_TOKEN_REAL" }
"""
colonia = payload.get("colonia")
token = payload.get("token")
if not colonia or not token:
raise HTTPException(400, "colonia y token requeridos")
if colonia not in fake_users:
fake_users[colonia] = []
if token not in fake_users[colonia]:
fake_users[colonia].append(token)
print(f"[TOKEN] Registrado token para {colonia}")
return {"status": "ok"}
@app.get("/health")
def health():
return {"status": "ok", "routes_active": list(route_state.keys())}
# ── Cron job propio (para demo sin depender de P1) ──────────
def _simulate_all_routes():
"""Avanza cada ruta respetando su horario del JSON."""
now_utc = datetime.utcnow()
print(f"\n[CRON] Tick — {now_utc.strftime('%H:%M:%S')} UTC")
for route_id, ruta in RUTAS.items():
state = route_state.setdefault(route_id, {"positionId": 1, "notified": set()})
current_id = state["positionId"]
if current_id >= 8:
continue # esta ruta ya terminó
# Busca la siguiente posición
positions = ruta["positions"]
next_pos = next((p for p in positions if p["positionId"] == current_id + 1), None)
if not next_pos:
continue
# Solo avanza si ya pasó el timestamp de esa posición
fmt = "%Y-%m-%dT%H:%M:%SZ"
next_time = datetime.strptime(next_pos["timestamp"], fmt)
if now_utc >= next_time:
print(f" {route_id}: positionId {current_id}{current_id + 1}")
process_position_update(route_id, current_id + 1)
# si no, espera silenciosamente
def start_cron():
"""Corre el simulador cada 2 minutos en un hilo separado."""
schedule.every(2).minutes.do(_simulate_all_routes)
while True:
schedule.run_pending()
time.sleep(10)
@app.on_event("startup")
def startup():
# Inicializa todas las rutas en positionId 1
for route_id in RUTAS:
route_state[route_id] = {"positionId": 1, "notified": set()}
# Arranca el cron en background
t = threading.Thread(target=start_cron, daemon=True)
t.start()
print(f"[STARTUP] Notification Service listo. {len(RUTAS)} rutas cargadas.")
@app.post("/internal/reset")
def reset_routes():
"""Reinicia todas las rutas a positionId 1. Útil para el demo."""
for route_id in RUTAS:
route_state[route_id] = {"positionId": 1, "notified": set()}
print("[RESET] Todas las rutas reiniciadas a positionId 1")
return {"status": "ok", "message": f"{len(RUTAS)} rutas reiniciadas"}
@app.post("/internal/demo")
def demo_trigger(payload: dict):
"""
Fuerza una ruta a un positionId específico al instante.
Ideal para demos en vivo.
Body: { "routeId": "RUTA-01", "positionId": 4 }
"""
route_id = payload.get("routeId", "RUTA-01")
position_id = payload.get("positionId", 4)
# Limpia notificaciones previas para que dispare de nuevo
state = route_state.setdefault(route_id, {"positionId": 1, "notified": set()})
state["notified"].discard("ROUTE_START")
state["notified"].discard("TRUCK_PROXIMITY")
state["notified"].discard("ROUTE_COMPLETED")
process_position_update(route_id, position_id)
return {"status": "ok", "routeId": route_id, "positionId": position_id}