193 lines
6.4 KiB
Python
193 lines
6.4 KiB
Python
import json
|
|
import math
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
import pytz
|
|
|
|
CELAYA_TZ = pytz.timezone("America/Mexico_City")
|
|
_routes_cache: Optional[list] = None
|
|
|
|
|
|
def _load_routes() -> list:
|
|
global _routes_cache
|
|
if _routes_cache is None:
|
|
path = os.path.join(os.path.dirname(__file__), "../data/routes.json")
|
|
with open(path, encoding="utf-8") as f:
|
|
_routes_cache = json.load(f)
|
|
return _routes_cache
|
|
|
|
|
|
def _haversine(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
|
|
"""Distance in meters between two lat/lng points."""
|
|
R = 6_371_000
|
|
phi1, phi2 = math.radians(lat1), math.radians(lat2)
|
|
dphi = math.radians(lat2 - lat1)
|
|
dlam = math.radians(lng2 - lng1)
|
|
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
|
|
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
|
|
|
|
|
|
def assign_route(lat: float, lng: float) -> Optional[str]:
|
|
"""Return the route_id whose waypoints are closest to the given address."""
|
|
routes = _load_routes()
|
|
best_id = None
|
|
best_dist = float("inf")
|
|
for route in routes:
|
|
for pos in route["positions"]:
|
|
d = _haversine(lat, lng, pos["lat"], pos["lng"])
|
|
if d < best_dist:
|
|
best_dist = d
|
|
best_id = route["routeId"]
|
|
return best_id
|
|
|
|
|
|
def _parse_time_as_local(ts_str: str) -> datetime:
|
|
"""
|
|
The mock timestamps carry Z but represent local Celaya (CST/CDT) times.
|
|
We strip the Z and treat them as naive local time, then localize to Celaya TZ.
|
|
"""
|
|
naive = datetime.fromisoformat(ts_str.replace("Z", ""))
|
|
return CELAYA_TZ.localize(naive)
|
|
|
|
|
|
def _today_at(template: datetime) -> datetime:
|
|
"""Move a template datetime to today keeping hour/minute/second."""
|
|
now = datetime.now(CELAYA_TZ)
|
|
return now.replace(
|
|
hour=template.hour,
|
|
minute=template.minute,
|
|
second=template.second,
|
|
microsecond=0,
|
|
)
|
|
|
|
|
|
def get_eta(route_id: str, user_lat: float, user_lng: float) -> dict:
|
|
routes = _load_routes()
|
|
route = next((r for r in routes if r["routeId"] == route_id), None)
|
|
if not route:
|
|
return {
|
|
"status": "NO_SERVICIO",
|
|
"message": "No se encontró una ruta asignada a este domicilio.",
|
|
"passes_today": False,
|
|
"progress": 0,
|
|
}
|
|
|
|
positions = route["positions"]
|
|
now = datetime.now(CELAYA_TZ)
|
|
|
|
# Schedule for today
|
|
first_t = _today_at(_parse_time_as_local(positions[0]["timestamp"]))
|
|
last_t = _today_at(_parse_time_as_local(positions[-1]["timestamp"]))
|
|
|
|
# Find the waypoint whose scheduled arrival is closest to user_lat/lng
|
|
closest_idx = min(
|
|
range(len(positions)),
|
|
key=lambda i: _haversine(user_lat, user_lng, positions[i]["lat"], positions[i]["lng"]),
|
|
)
|
|
user_scheduled_t = _today_at(_parse_time_as_local(positions[closest_idx]["timestamp"]))
|
|
|
|
# Determine progress along the route (0-100)
|
|
total_seconds = (last_t - first_t).total_seconds()
|
|
elapsed_seconds = (now - first_t).total_seconds()
|
|
progress = max(0.0, min(100.0, (elapsed_seconds / total_seconds) * 100)) if total_seconds > 0 else 0.0
|
|
|
|
# Route hasn't started yet today
|
|
if now < first_t:
|
|
eta_min = int((user_scheduled_t - now).total_seconds() / 60)
|
|
w_start = user_scheduled_t - timedelta(minutes=8)
|
|
w_end = user_scheduled_t + timedelta(minutes=8)
|
|
return {
|
|
"status": "PROGRAMADO",
|
|
"message": f"El camión llegará a tu zona hoy",
|
|
"eta_minutes": eta_min,
|
|
"window_start": w_start.strftime("%H:%M"),
|
|
"window_end": w_end.strftime("%H:%M"),
|
|
"progress": 0.0,
|
|
"route_name": route["name"],
|
|
"passes_today": True,
|
|
}
|
|
|
|
# Route finished for the day
|
|
if now > last_t:
|
|
return {
|
|
"status": "PASO",
|
|
"message": "El camión ya pasó por tu zona hoy. ¡Hasta mañana!",
|
|
"progress": 100.0,
|
|
"route_name": route["name"],
|
|
"passes_today": True,
|
|
}
|
|
|
|
# Route in progress — simulate current waypoint index via interpolation
|
|
current_idx = 0
|
|
for i in range(len(positions) - 1):
|
|
t_a = _today_at(_parse_time_as_local(positions[i]["timestamp"]))
|
|
t_b = _today_at(_parse_time_as_local(positions[i + 1]["timestamp"]))
|
|
if t_a <= now <= t_b:
|
|
current_idx = i
|
|
break
|
|
|
|
if current_idx > closest_idx:
|
|
return {
|
|
"status": "PASO",
|
|
"message": "El camión ya pasó por tu zona hace un momento.",
|
|
"progress": progress,
|
|
"route_name": route["name"],
|
|
"passes_today": True,
|
|
}
|
|
|
|
delta = user_scheduled_t - now
|
|
eta_min = max(1, int(delta.total_seconds() / 60))
|
|
|
|
if eta_min <= 10:
|
|
return {
|
|
"status": "LLEGANDO",
|
|
"message": "¡El camión está llegando a tu zona! Prepara tu basura.",
|
|
"eta_minutes": eta_min,
|
|
"progress": progress,
|
|
"route_name": route["name"],
|
|
"passes_today": True,
|
|
}
|
|
|
|
w_start = (user_scheduled_t - timedelta(minutes=8)).strftime("%H:%M")
|
|
w_end = (user_scheduled_t + timedelta(minutes=8)).strftime("%H:%M")
|
|
return {
|
|
"status": "EN_CAMINO",
|
|
"message": f"El camión llegará a tu zona entre las {w_start} y {w_end}",
|
|
"eta_minutes": eta_min,
|
|
"window_start": w_start,
|
|
"window_end": w_end,
|
|
"progress": progress,
|
|
"route_name": route["name"],
|
|
"passes_today": True,
|
|
}
|
|
|
|
|
|
def get_route_schedule(route_id: str) -> Optional[dict]:
|
|
routes = _load_routes()
|
|
route = next((r for r in routes if r["routeId"] == route_id), None)
|
|
if not route:
|
|
return None
|
|
positions = route["positions"]
|
|
first_t = _parse_time_as_local(positions[0]["timestamp"])
|
|
last_t = _parse_time_as_local(positions[-1]["timestamp"])
|
|
return {
|
|
"route_id": route["routeId"],
|
|
"route_name": route["name"],
|
|
"days_of_week": ["Lunes", "Miércoles", "Viernes"],
|
|
"approximate_time": f"{first_t.strftime('%H:%M')} - {last_t.strftime('%H:%M')}",
|
|
"truck_id": route["truckId"],
|
|
}
|
|
|
|
|
|
def get_all_routes_summary() -> list:
|
|
return [
|
|
{
|
|
"route_id": r["routeId"],
|
|
"name": r["name"],
|
|
"truck_id": r["truckId"],
|
|
"status": r["status"],
|
|
}
|
|
for r in _load_routes()
|
|
]
|