Files
hackathon-acapulquitos-boys…/server/app/data/repositories/ruta_repository.py

189 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Capa de Datos — SQLite con soporte de caché
"""
from datetime import datetime
from typing import Optional
from app.db.database import get_connection
from app.domain.entities.ruta import (
Coordenada, EstadoCamion, ETAResult,
NotificationPreferences, PuntoRuta, Ruta, TruckStatus,
)
class SQLiteRutaRepository:
# ── Rutas y puntos ────────────────────────────────────────────────
def obtener_ruta(self, route_id: str) -> Optional[Ruta]:
conn = get_connection()
row = conn.execute(
"SELECT * FROM rutas WHERE id = ?", (route_id,)
).fetchone()
if not row:
conn.close()
return None
puntos = [
PuntoRuta(
orden=p["orden"],
nombre=p["nombre"],
coordenada=Coordenada(p["lat"], p["lng"]),
tiempo_estimado_min=p["tiempo_estimado_min"],
)
for p in conn.execute(
"SELECT * FROM puntos_ruta WHERE ruta_id=? ORDER BY orden",
(route_id,),
).fetchall()
]
conn.close()
return Ruta(id=row["id"], nombre=row["nombre"],
puntos=puntos, turno=row["turno"])
def obtener_ruta_por_address(self, address_id: int) -> Optional[Ruta]:
conn = get_connection()
row = conn.execute(
"SELECT route_id FROM addresses WHERE id = ?", (address_id,)
).fetchone()
conn.close()
if not row:
return None
return self.obtener_ruta(row["route_id"])
# ── truck_status ──────────────────────────────────────────
def obtener_truck_status(self, route_id: str) -> Optional[TruckStatus]:
conn = get_connection()
row = conn.execute(
"SELECT * FROM truck_status WHERE route_id = ?", (route_id,)
).fetchone()
conn.close()
if not row:
return None
return TruckStatus(
route_id=row["route_id"],
current_position_id=row["current_position_id"],
last_update=datetime.fromisoformat(row["last_update"]),
status=EstadoCamion(row["status"]),
)
def guardar_truck_status(self, ts: TruckStatus) -> None:
conn = get_connection()
conn.execute("""
INSERT INTO truck_status (route_id, current_position_id, last_update, status)
VALUES (?, ?, ?, ?)
ON CONFLICT(route_id) DO UPDATE SET
current_position_id = excluded.current_position_id,
last_update = excluded.last_update,
status = excluded.status
""", (
ts.route_id,
ts.current_position_id,
ts.last_update.isoformat(),
ts.status.value,
))
conn.commit()
conn.close()
# Invalidar caché de esta ruta
from app.core.cache import invalidate_route_cache
import asyncio
asyncio.create_task(invalidate_route_cache(ts.route_id))
# ── Preferencias de notificación ─────────────────────────────────
def obtener_preferencias(self, user_id: int) -> NotificationPreferences:
conn = get_connection()
row = conn.execute(
"SELECT * FROM notification_preferences WHERE user_id = ?",
(user_id,),
).fetchone()
conn.close()
if not row:
return NotificationPreferences(user_id=user_id)
return NotificationPreferences(
user_id=user_id,
notify_proximity=bool(row["notify_proximity"]),
notify_breakdown=bool(row["notify_breakdown"]),
notify_delay=bool(row["notify_delay"]),
notify_route_start=bool(row["notify_route_start"]),
)
def obtener_usuarios_por_ruta(self, route_id: str) -> list[dict]:
conn = get_connection()
rows = conn.execute(
"SELECT id as address_id, user_id FROM addresses WHERE route_id = ?",
(route_id,),
).fetchall()
conn.close()
return [dict(r) for r in rows]
# ── Templates de notificación ─────────────────────────────────────
def obtener_template(self, trigger_event: str) -> Optional[dict]:
conn = get_connection()
row = conn.execute(
"SELECT * FROM notification_templates WHERE trigger_event = ?",
(trigger_event,),
).fetchone()
conn.close()
return dict(row) if row else None
# ── ETA calculado con soporte de caché ────────────────────────────
def calcular_eta(self, address_id: int) -> Optional[ETAResult]:
ruta = self.obtener_ruta_por_address(address_id)
if not ruta:
return None
ts = self.obtener_truck_status(ruta.id)
if not ts:
return ETAResult(
address_id=address_id, route_id=ruta.id,
status="SIN_INICIAR", eta_minutos=None,
ventana_inicio=None, ventana_fin=None,
mensaje="El camión aún no ha iniciado su ruta.",
)
pos = ts.current_position_id
puntos = {p.orden: p for p in ruta.puntos}
ultimo = ruta.puntos[-1]
actual = puntos.get(pos, ruta.puntos[0])
eta_min = max(0, ultimo.tiempo_estimado_min - actual.tiempo_estimado_min)
from datetime import timedelta
ahora = datetime.now()
llegada = ahora + timedelta(minutes=eta_min)
v_ini = (llegada - timedelta(minutes=7)).strftime("%I:%M %p").lstrip("0")
v_fin = (llegada + timedelta(minutes=7)).strftime("%I:%M %p").lstrip("0")
if ts.status == EstadoCamion.APROXIMANDOSE:
msg = f"El camión llegará a tu zona entre las {v_ini} y {v_fin}."
elif ts.status in (EstadoCamion.AVERIADA, EstadoCamion.RETRASADA):
msg = "El camión reportó una incidencia. Te notificaremos cuando se reanude."
v_ini = v_fin = None
else:
msg = f"El camión está en camino. Llegada estimada: {v_ini} {v_fin}."
return ETAResult(
address_id=address_id,
route_id=ruta.id,
status=ts.status.value,
eta_minutos=eta_min,
ventana_inicio=v_ini,
ventana_fin=v_fin,
mensaje=msg,
)
def guardar_notificacion(self, tipo: str, route_id: str,
address_id: int, mensaje: str,
eta_minutos: Optional[int]) -> None:
conn = get_connection()
conn.execute("""
INSERT INTO notificaciones
(tipo, ruta_id, address_id, mensaje, eta_minutos, creada_en)
VALUES (?, ?, ?, ?, ?, ?)
""", (tipo, route_id, address_id, mensaje,
eta_minutos, datetime.utcnow().isoformat()))
conn.commit()
conn.close()