feat: migrate backend from SQLite to Supabase PostgreSQL

This commit is contained in:
Alan Alonso
2026-05-23 00:55:00 -06:00
parent 327852e468
commit ff90f3eefc
4 changed files with 192 additions and 235 deletions

View File

@@ -1,134 +1,138 @@
"""
Capa de Datos — SQLite con soporte de caché
Capa de Datos — Supabase PostgreSQL con caché
"""
from datetime import datetime
from datetime import datetime, timedelta
from typing import Optional
from app.db.database import get_connection
from app.db.database import get_db
from app.domain.entities.ruta import (
Coordenada, EstadoCamion, ETAResult,
NotificationPreferences, PuntoRuta, Ruta, TruckStatus,
)
class SQLiteRutaRepository:
class SupabaseRutaRepository:
def __init__(self):
self.db = get_db()
# ── Rutas y puntos ────────────────────────────────────────────────
def obtener_ruta(self, route_id: str) -> Optional[Ruta]:
conn = get_connection()
row = conn.execute(
"SELECT * FROM rutas WHERE id = ?", (route_id,)
).fetchone()
if not row:
conn.close()
return None
puntos = [
PuntoRuta(
orden=p["orden"],
nombre=p["nombre"],
coordenada=Coordenada(p["lat"], p["lng"]),
tiempo_estimado_min=p["tiempo_estimado_min"],
try:
ruta_data = self.db.table("rutas").select("*").eq("id", route_id).execute()
if not ruta_data.data:
return None
ruta_row = ruta_data.data[0]
puntos_data = self.db.table("puntos_ruta").select("*").eq("ruta_id", route_id).order("orden").execute()
puntos = [
PuntoRuta(
orden=p["orden"],
nombre=p["nombre"],
coordenada=Coordenada(p["lat"], p["lng"]),
tiempo_estimado_min=p["tiempo_estimado_min"],
)
for p in puntos_data.data
]
return Ruta(
id=ruta_row["id"],
nombre=ruta_row["nombre"],
puntos=puntos,
turno=ruta_row["turno"]
)
for p in conn.execute(
"SELECT * FROM puntos_ruta WHERE ruta_id=? ORDER BY orden",
(route_id,),
).fetchall()
]
conn.close()
return Ruta(id=row["id"], nombre=row["nombre"],
puntos=puntos, turno=row["turno"])
except Exception as e:
print(f"Error obtener_ruta: {e}")
return None
def obtener_ruta_por_address(self, address_id: int) -> Optional[Ruta]:
conn = get_connection()
row = conn.execute(
"SELECT route_id FROM addresses WHERE id = ?", (address_id,)
).fetchone()
conn.close()
if not row:
try:
addr_data = self.db.table("addresses").select("route_id").eq("id", address_id).execute()
if not addr_data.data:
return None
return self.obtener_ruta(addr_data.data[0]["route_id"])
except Exception as e:
print(f"Error obtener_ruta_por_address: {e}")
return None
return self.obtener_ruta(row["route_id"])
# ── truck_status ──────────────────────────────────────────
def obtener_truck_status(self, route_id: str) -> Optional[TruckStatus]:
conn = get_connection()
row = conn.execute(
"SELECT * FROM truck_status WHERE route_id = ?", (route_id,)
).fetchone()
conn.close()
if not row:
try:
ts_data = self.db.table("truck_status").select("*").eq("route_id", route_id).execute()
if not ts_data.data:
return None
row = ts_data.data[0]
return TruckStatus(
route_id=row["route_id"],
current_position_id=row["current_position_id"],
last_update=datetime.fromisoformat(row["last_update"].replace("Z", "+00:00")),
status=EstadoCamion(row["status"]),
)
except Exception as e:
print(f"Error obtener_truck_status: {e}")
return None
return TruckStatus(
route_id=row["route_id"],
current_position_id=row["current_position_id"],
last_update=datetime.fromisoformat(row["last_update"]),
status=EstadoCamion(row["status"]),
)
def guardar_truck_status(self, ts: TruckStatus) -> None:
conn = get_connection()
conn.execute("""
INSERT INTO truck_status (route_id, current_position_id, last_update, status)
VALUES (?, ?, ?, ?)
ON CONFLICT(route_id) DO UPDATE SET
current_position_id = excluded.current_position_id,
last_update = excluded.last_update,
status = excluded.status
""", (
ts.route_id,
ts.current_position_id,
ts.last_update.isoformat(),
ts.status.value,
))
conn.commit()
conn.close()
# Invalidar caché de esta ruta
from app.core.cache import invalidate_route_cache
import asyncio
asyncio.create_task(invalidate_route_cache(ts.route_id))
try:
self.db.table("truck_status").upsert({
"route_id": ts.route_id,
"current_position_id": ts.current_position_id,
"last_update": ts.last_update.isoformat(),
"status": ts.status.value,
}).execute()
# Invalidar caché
from app.core.cache import invalidate_route_cache
import asyncio
asyncio.create_task(invalidate_route_cache(ts.route_id))
except Exception as e:
print(f"Error guardar_truck_status: {e}")
# ── Preferencias de notificación ─────────────────────────────────
def obtener_preferencias(self, user_id: int) -> NotificationPreferences:
conn = get_connection()
row = conn.execute(
"SELECT * FROM notification_preferences WHERE user_id = ?",
(user_id,),
).fetchone()
conn.close()
if not row:
def obtener_preferencias(self, user_id: str) -> NotificationPreferences:
try:
prefs_data = self.db.table("notification_preferences").select("*").eq("user_id", user_id).execute()
if not prefs_data.data:
return NotificationPreferences(user_id=user_id)
row = prefs_data.data[0]
return NotificationPreferences(
user_id=user_id,
notify_proximity=row["notify_proximity"],
notify_breakdown=row["notify_breakdown"],
notify_delay=row["notify_delay"],
notify_route_start=row["notify_route_start"],
)
except Exception as e:
print(f"Error obtener_preferencias: {e}")
return NotificationPreferences(user_id=user_id)
return NotificationPreferences(
user_id=user_id,
notify_proximity=bool(row["notify_proximity"]),
notify_breakdown=bool(row["notify_breakdown"]),
notify_delay=bool(row["notify_delay"]),
notify_route_start=bool(row["notify_route_start"]),
)
def obtener_usuarios_por_ruta(self, route_id: str) -> list[dict]:
conn = get_connection()
rows = conn.execute(
"SELECT id as address_id, user_id FROM addresses WHERE route_id = ?",
(route_id,),
).fetchall()
conn.close()
return [dict(r) for r in rows]
try:
users_data = self.db.table("addresses").select("id, user_id").eq("route_id", route_id).execute()
return [{"address_id": u["id"], "user_id": u["user_id"]} for u in users_data.data]
except Exception as e:
print(f"Error obtener_usuarios_por_ruta: {e}")
return []
# ── Templates de notificación ─────────────────────────────────────
def obtener_template(self, trigger_event: str) -> Optional[dict]:
conn = get_connection()
row = conn.execute(
"SELECT * FROM notification_templates WHERE trigger_event = ?",
(trigger_event,),
).fetchone()
conn.close()
return dict(row) if row else None
try:
template_data = self.db.table("notification_templates").select("*").eq("trigger_event", trigger_event).execute()
if not template_data.data:
return None
return template_data.data[0]
except Exception as e:
print(f"Error obtener_template: {e}")
return None
# ── ETA calculado con soporte de caché ────────────────────────────
# ── ETA calculado ────────────────────────────────────────────────
def calcular_eta(self, address_id: int) -> Optional[ETAResult]:
ruta = self.obtener_ruta_por_address(address_id)
@@ -151,7 +155,6 @@ class SQLiteRutaRepository:
eta_min = max(0, ultimo.tiempo_estimado_min - actual.tiempo_estimado_min)
from datetime import timedelta
ahora = datetime.now()
llegada = ahora + timedelta(minutes=eta_min)
v_ini = (llegada - timedelta(minutes=7)).strftime("%I:%M %p").lstrip("0")
@@ -178,12 +181,18 @@ class SQLiteRutaRepository:
def guardar_notificacion(self, tipo: str, route_id: str,
address_id: int, mensaje: str,
eta_minutos: Optional[int]) -> None:
conn = get_connection()
conn.execute("""
INSERT INTO notificaciones
(tipo, ruta_id, address_id, mensaje, eta_minutos, creada_en)
VALUES (?, ?, ?, ?, ?, ?)
""", (tipo, route_id, address_id, mensaje,
eta_minutos, datetime.utcnow().isoformat()))
conn.commit()
conn.close()
try:
self.db.table("notificaciones").insert({
"tipo": tipo,
"ruta_id": route_id,
"address_id": address_id,
"mensaje": mensaje,
"eta_minutos": eta_minutos,
"creada_en": datetime.utcnow().isoformat(),
}).execute()
except Exception as e:
print(f"Error guardar_notificacion: {e}")
# Alias para compatibilidad
SQLiteRutaRepository = SupabaseRutaRepository