Files
hackathon-v-escape-4ff8b5a6…/backend/app/services/eta_service.py
marianesaldana 80dbd947e5 Initial commit
2026-05-23 08:59:34 -06:00

193 lines
6.4 KiB
Python

import json
import math
import os
from datetime import datetime, timedelta
from typing import Optional
import pytz
CELAYA_TZ = pytz.timezone("America/Mexico_City")
_routes_cache: Optional[list] = None
def _load_routes() -> list:
global _routes_cache
if _routes_cache is None:
path = os.path.join(os.path.dirname(__file__), "../data/routes.json")
with open(path, encoding="utf-8") as f:
_routes_cache = json.load(f)
return _routes_cache
def _haversine(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
"""Distance in meters between two lat/lng points."""
R = 6_371_000
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lng2 - lng1)
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def assign_route(lat: float, lng: float) -> Optional[str]:
"""Return the route_id whose waypoints are closest to the given address."""
routes = _load_routes()
best_id = None
best_dist = float("inf")
for route in routes:
for pos in route["positions"]:
d = _haversine(lat, lng, pos["lat"], pos["lng"])
if d < best_dist:
best_dist = d
best_id = route["routeId"]
return best_id
def _parse_time_as_local(ts_str: str) -> datetime:
"""
The mock timestamps carry Z but represent local Celaya (CST/CDT) times.
We strip the Z and treat them as naive local time, then localize to Celaya TZ.
"""
naive = datetime.fromisoformat(ts_str.replace("Z", ""))
return CELAYA_TZ.localize(naive)
def _today_at(template: datetime) -> datetime:
"""Move a template datetime to today keeping hour/minute/second."""
now = datetime.now(CELAYA_TZ)
return now.replace(
hour=template.hour,
minute=template.minute,
second=template.second,
microsecond=0,
)
def get_eta(route_id: str, user_lat: float, user_lng: float) -> dict:
routes = _load_routes()
route = next((r for r in routes if r["routeId"] == route_id), None)
if not route:
return {
"status": "NO_SERVICIO",
"message": "No se encontró una ruta asignada a este domicilio.",
"passes_today": False,
"progress": 0,
}
positions = route["positions"]
now = datetime.now(CELAYA_TZ)
# Schedule for today
first_t = _today_at(_parse_time_as_local(positions[0]["timestamp"]))
last_t = _today_at(_parse_time_as_local(positions[-1]["timestamp"]))
# Find the waypoint whose scheduled arrival is closest to user_lat/lng
closest_idx = min(
range(len(positions)),
key=lambda i: _haversine(user_lat, user_lng, positions[i]["lat"], positions[i]["lng"]),
)
user_scheduled_t = _today_at(_parse_time_as_local(positions[closest_idx]["timestamp"]))
# Determine progress along the route (0-100)
total_seconds = (last_t - first_t).total_seconds()
elapsed_seconds = (now - first_t).total_seconds()
progress = max(0.0, min(100.0, (elapsed_seconds / total_seconds) * 100)) if total_seconds > 0 else 0.0
# Route hasn't started yet today
if now < first_t:
eta_min = int((user_scheduled_t - now).total_seconds() / 60)
w_start = user_scheduled_t - timedelta(minutes=8)
w_end = user_scheduled_t + timedelta(minutes=8)
return {
"status": "PROGRAMADO",
"message": f"El camión llegará a tu zona hoy",
"eta_minutes": eta_min,
"window_start": w_start.strftime("%H:%M"),
"window_end": w_end.strftime("%H:%M"),
"progress": 0.0,
"route_name": route["name"],
"passes_today": True,
}
# Route finished for the day
if now > last_t:
return {
"status": "PASO",
"message": "El camión ya pasó por tu zona hoy. ¡Hasta mañana!",
"progress": 100.0,
"route_name": route["name"],
"passes_today": True,
}
# Route in progress — simulate current waypoint index via interpolation
current_idx = 0
for i in range(len(positions) - 1):
t_a = _today_at(_parse_time_as_local(positions[i]["timestamp"]))
t_b = _today_at(_parse_time_as_local(positions[i + 1]["timestamp"]))
if t_a <= now <= t_b:
current_idx = i
break
if current_idx > closest_idx:
return {
"status": "PASO",
"message": "El camión ya pasó por tu zona hace un momento.",
"progress": progress,
"route_name": route["name"],
"passes_today": True,
}
delta = user_scheduled_t - now
eta_min = max(1, int(delta.total_seconds() / 60))
if eta_min <= 10:
return {
"status": "LLEGANDO",
"message": "¡El camión está llegando a tu zona! Prepara tu basura.",
"eta_minutes": eta_min,
"progress": progress,
"route_name": route["name"],
"passes_today": True,
}
w_start = (user_scheduled_t - timedelta(minutes=8)).strftime("%H:%M")
w_end = (user_scheduled_t + timedelta(minutes=8)).strftime("%H:%M")
return {
"status": "EN_CAMINO",
"message": f"El camión llegará a tu zona entre las {w_start} y {w_end}",
"eta_minutes": eta_min,
"window_start": w_start,
"window_end": w_end,
"progress": progress,
"route_name": route["name"],
"passes_today": True,
}
def get_route_schedule(route_id: str) -> Optional[dict]:
routes = _load_routes()
route = next((r for r in routes if r["routeId"] == route_id), None)
if not route:
return None
positions = route["positions"]
first_t = _parse_time_as_local(positions[0]["timestamp"])
last_t = _parse_time_as_local(positions[-1]["timestamp"])
return {
"route_id": route["routeId"],
"route_name": route["name"],
"days_of_week": ["Lunes", "Miércoles", "Viernes"],
"approximate_time": f"{first_t.strftime('%H:%M')} - {last_t.strftime('%H:%M')}",
"truck_id": route["truckId"],
}
def get_all_routes_summary() -> list:
return [
{
"route_id": r["routeId"],
"name": r["name"],
"truck_id": r["truckId"],
"status": r["status"],
}
for r in _load_routes()
]