fix: add project contents

This commit is contained in:
Diego Torres
2026-05-23 10:19:24 -06:00
parent c49179bc9c
commit cf4321a690
145 changed files with 12545 additions and 1 deletions

View File

@@ -0,0 +1,398 @@
"""
===============================================================
analytics.py — Motor de Reportes y Análisis Predictivo
===============================================================
Módulo independiente que se importa en main.py.
FUNCIONALIDADES:
1. Simulación de registros históricos de recolección
(en producción vendría de una tabla real en DB)
2. Detección de días con mayor volumen de residuos
3. Identificación de zonas críticas por colonia
4. Análisis predictivo simple con regresión lineal
usando numpy (sin sklearn, más ligero)
5. Recomendaciones automáticas de logística
INSTALAR:
pip install numpy
USO EN main.py:
from analytics import generar_reporte_completo, predecir_proxima_semana
===============================================================
"""
import numpy as np
import random
from datetime import datetime, timedelta
from typing import List, Dict, Any
import hashlib
# ---------------------------------------------------------------
# SEMILLA DETERMINISTA
# Los datos simulados son siempre los mismos para el mismo día,
# lo que da consistencia en demos sin necesidad de una DB real.
# ---------------------------------------------------------------
def _semilla_dia(fecha: datetime) -> int:
s = fecha.strftime("%Y-%m-%d")
return int(hashlib.md5(s.encode()).hexdigest(), 16) % (2**32)
# ---------------------------------------------------------------
# DATOS DE REFERENCIA
# ---------------------------------------------------------------
COLONIAS = [
"Zona Centro", "Las Arboledas", "Trojes",
"San Juanico", "Los Olivos", "Rancho Seco", "Las Insurgentes",
]
RUTAS = {
"Zona Centro": "RUTA-01",
"Las Arboledas": "RUTA-01",
"Trojes": "RUTA-13",
"San Juanico": "RUTA-03",
"Los Olivos": "RUTA-04",
"Rancho Seco": "RUTA-05",
"Las Insurgentes": "RUTA-12",
}
DIAS_SEMANA = ["Lunes", "Martes", "Miércoles", "Jueves", "Viernes", "Sábado", "Domingo"]
# Factores base por colonia (densidad poblacional simulada)
FACTOR_COLONIA = {
"Zona Centro": 1.45,
"Las Arboledas": 1.10,
"Trojes": 0.85,
"San Juanico": 0.90,
"Los Olivos": 1.00,
"Rancho Seco": 0.75,
"Las Insurgentes": 1.20,
}
# Factores por día de la semana (lunes tras el finde = más basura)
FACTOR_DIA = {
0: 1.35, # Lunes
1: 1.05, # Martes
2: 1.00, # Miércoles
3: 0.95, # Jueves
4: 1.15, # Viernes
5: 1.25, # Sábado
6: 0.80, # Domingo
}
# ---------------------------------------------------------------
# GENERADOR DE HISTÓRICO SIMULADO
# Genera 90 días de datos de recolección por colonia
# ---------------------------------------------------------------
def _generar_historico(dias: int = 90) -> List[Dict]:
registros = []
hoy = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
for d in range(dias, 0, -1):
fecha = hoy - timedelta(days=d)
rng = random.Random(_semilla_dia(fecha))
dia_semana = fecha.weekday()
for colonia in COLONIAS:
# Volumen base en kg: entre 800 y 2500 kg por día
base = rng.uniform(800, 2500)
factor = FACTOR_COLONIA[colonia] * FACTOR_DIA[dia_semana]
# Ruido aleatorio ±15%
ruido = rng.uniform(0.85, 1.15)
volumen = round(base * factor * ruido, 1)
# Incidencias: reportes de basura tirada fuera de horario
incidencias = rng.randint(0, int(factor * 5))
# Tiempo de recolección en minutos
tiempo_min = round(30 + (volumen / 100) * rng.uniform(0.8, 1.2), 0)
registros.append({
"fecha": fecha,
"fecha_str": fecha.strftime("%Y-%m-%d"),
"dia_semana": dia_semana,
"dia_nombre": DIAS_SEMANA[dia_semana],
"colonia": colonia,
"ruta_id": RUTAS[colonia],
"volumen_kg": volumen,
"incidencias": incidencias,
"tiempo_recoleccion_min": tiempo_min,
})
return registros
# ---------------------------------------------------------------
# ANÁLISIS: Días con más residuos
# ---------------------------------------------------------------
def analisis_por_dia_semana(historico: List[Dict]) -> List[Dict]:
"""Agrupa el volumen promedio y total por día de la semana."""
acumulado = {i: {"total_kg": 0.0, "count": 0, "incidencias": 0} for i in range(7)}
for r in historico:
d = r["dia_semana"]
acumulado[d]["total_kg"] += r["volumen_kg"]
acumulado[d]["count"] += 1
acumulado[d]["incidencias"] += r["incidencias"]
resultado = []
for dia_idx in range(7):
v = acumulado[dia_idx]
count = v["count"] if v["count"] > 0 else 1
resultado.append({
"dia": DIAS_SEMANA[dia_idx],
"dia_idx": dia_idx,
"promedio_kg": round(v["total_kg"] / count, 1),
"total_kg": round(v["total_kg"], 1),
"promedio_incidencias": round(v["incidencias"] / count, 1),
"semanas_registradas": count // len(COLONIAS),
})
# Ordenar por promedio descendente para ranking
resultado.sort(key=lambda x: x["promedio_kg"], reverse=True)
return resultado
# ---------------------------------------------------------------
# ANÁLISIS: Zonas críticas por colonia
# ---------------------------------------------------------------
def analisis_zonas_criticas(historico: List[Dict]) -> List[Dict]:
"""Identifica colonias con mayor volumen e incidencias."""
acumulado: Dict[str, Dict] = {}
for r in historico:
c = r["colonia"]
if c not in acumulado:
acumulado[c] = {
"total_kg": 0.0,
"total_incidencias": 0,
"total_tiempo": 0.0,
"count": 0,
}
acumulado[c]["total_kg"] += r["volumen_kg"]
acumulado[c]["total_incidencias"] += r["incidencias"]
acumulado[c]["total_tiempo"] += r["tiempo_recoleccion_min"]
acumulado[c]["count"] += 1
resultado = []
max_vol = max(v["total_kg"] / v["count"] for v in acumulado.values())
for colonia, v in acumulado.items():
count = v["count"] if v["count"] > 0 else 1
promedio_kg = v["total_kg"] / count
promedio_incidencias = v["total_incidencias"] / count
indice_criticidad = round((promedio_kg / max_vol) * 100, 1)
# Nivel de criticidad
if indice_criticidad >= 80:
nivel = "CRÍTICO"
elif indice_criticidad >= 60:
nivel = "ALTO"
elif indice_criticidad >= 40:
nivel = "MEDIO"
else:
nivel = "BAJO"
resultado.append({
"colonia": colonia,
"ruta_id": RUTAS[colonia],
"promedio_kg_dia": round(promedio_kg, 1),
"total_kg_90dias": round(v["total_kg"], 1),
"promedio_incidencias_dia": round(promedio_incidencias, 2),
"promedio_tiempo_min": round(v["total_tiempo"] / count, 1),
"indice_criticidad": indice_criticidad,
"nivel_criticidad": nivel,
})
resultado.sort(key=lambda x: x["indice_criticidad"], reverse=True)
return resultado
# ---------------------------------------------------------------
# ANÁLISIS PREDICTIVO: Regresión lineal simple con numpy
# Predice el volumen de los próximos 7 días por colonia
# ---------------------------------------------------------------
def predecir_proxima_semana(historico: List[Dict]) -> List[Dict]:
"""
Regresión lineal por colonia sobre los últimos 30 días.
Retorna predicción para los próximos 7 días.
MÉTODO:
- X = número de día (0..N)
- Y = volumen_kg
- Ajustamos y = mx + b con numpy.polyfit(deg=1)
- Proyectamos para los días N+1 .. N+7
- Aplicamos el factor de día de la semana para ajustar estacionalidad
"""
hoy = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
predicciones = []
for colonia in COLONIAS:
# Filtrar últimos 30 días de esta colonia
datos_colonia = [
r for r in historico
if r["colonia"] == colonia
][-30:] # últimos 30 registros
if len(datos_colonia) < 7:
continue
x = np.array(range(len(datos_colonia)), dtype=float)
y = np.array([r["volumen_kg"] for r in datos_colonia], dtype=float)
# Regresión lineal
coefs = np.polyfit(x, y, 1) # [pendiente, intercepto]
pendiente = coefs[0]
intercepto = coefs[1]
# R² para medir calidad del ajuste
y_pred = np.polyval(coefs, x)
ss_res = np.sum((y - y_pred) ** 2)
ss_tot = np.sum((y - np.mean(y)) ** 2)
r2 = round(1 - (ss_res / ss_tot) if ss_tot > 0 else 0, 3)
# Proyección próximos 7 días
n = len(datos_colonia)
dias_pred = []
for i in range(7):
fecha_pred = hoy + timedelta(days=i + 1)
dia_semana = fecha_pred.weekday()
vol_base = np.polyval(coefs, n + i)
# Ajustar por factor estacional del día
vol_ajustado = max(0, vol_base * FACTOR_DIA[dia_semana])
dias_pred.append({
"fecha": fecha_pred.strftime("%Y-%m-%d"),
"dia": DIAS_SEMANA[dia_semana],
"volumen_predicho_kg": round(float(vol_ajustado), 1),
"confianza": "Alta" if r2 > 0.7 else ("Media" if r2 > 0.4 else "Baja"),
})
predicciones.append({
"colonia": colonia,
"ruta_id": RUTAS[colonia],
"tendencia": "CRECIENTE" if pendiente > 10 else ("DECRECIENTE" if pendiente < -10 else "ESTABLE"),
"pendiente_diaria_kg": round(float(pendiente), 2),
"r2": r2,
"prediccion_7dias": dias_pred,
"volumen_predicho_total_kg": round(sum(d["volumen_predicho_kg"] for d in dias_pred), 1),
})
predicciones.sort(key=lambda x: x["volumen_predicho_total_kg"], reverse=True)
return predicciones
# ---------------------------------------------------------------
# RECOMENDACIONES AUTOMÁTICAS DE LOGÍSTICA
# ---------------------------------------------------------------
def generar_recomendaciones(
zonas: List[Dict],
dias: List[Dict],
predicciones: List[Dict],
) -> List[Dict]:
"""Genera recomendaciones concretas basadas en el análisis."""
recomendaciones = []
# 1. Zonas críticas → refuerzo de camiones
criticas = [z for z in zonas if z["nivel_criticidad"] == "CRÍTICO"]
for z in criticas:
recomendaciones.append({
"tipo": "REFUERZO_RUTA",
"prioridad": "ALTA",
"emoji": "🚛",
"titulo": f"Refuerzo en {z['colonia']}",
"descripcion": (
f"{z['colonia']} genera en promedio {z['promedio_kg_dia']:.0f} kg/día "
f"con {z['promedio_incidencias_dia']:.1f} incidencias. "
f"Se recomienda asignar un segundo camión en días pico."
),
"ruta_afectada": z["ruta_id"],
})
# 2. Días pico → ajuste de horarios
dia_pico = dias[0] if dias else None
if dia_pico:
recomendaciones.append({
"tipo": "HORARIO_PICO",
"prioridad": "MEDIA",
"emoji": "",
"titulo": f"{dia_pico['dia']} es el día con más residuos",
"descripcion": (
f"El {dia_pico['dia']} se recolectan en promedio "
f"{dia_pico['promedio_kg']:.0f} kg por colonia. "
f"Considera iniciar rutas 30 minutos antes ese día."
),
"ruta_afectada": None,
})
# 3. Colonias con tendencia creciente → alerta temprana
crecientes = [p for p in predicciones if p["tendencia"] == "CRECIENTE"]
for p in crecientes[:2]: # máximo 2 alertas
recomendaciones.append({
"tipo": "TENDENCIA_CRECIENTE",
"prioridad": "MEDIA",
"emoji": "📈",
"titulo": f"Aumento en {p['colonia']}",
"descripcion": (
f"{p['colonia']} muestra una tendencia creciente de "
f"+{p['pendiente_diaria_kg']:.1f} kg/día. "
f"Volumen predicho esta semana: {p['volumen_predicho_total_kg']:.0f} kg."
),
"ruta_afectada": p["ruta_id"],
})
# 4. Colonias con muchas incidencias → campaña de concientización
alta_incidencia = sorted(zonas, key=lambda x: x["promedio_incidencias_dia"], reverse=True)
if alta_incidencia:
z = alta_incidencia[0]
recomendaciones.append({
"tipo": "CONCIENTIZACION",
"prioridad": "BAJA",
"emoji": "📢",
"titulo": f"Campaña en {z['colonia']}",
"descripcion": (
f"{z['colonia']} registra {z['promedio_incidencias_dia']:.1f} incidencias/día "
f"de basura fuera de horario. Se recomienda campaña de concientización ciudadana."
),
"ruta_afectada": z["ruta_id"],
})
return recomendaciones
# ---------------------------------------------------------------
# FUNCIÓN PRINCIPAL: Genera el reporte completo
# ---------------------------------------------------------------
def generar_reporte_completo(dias_historico: int = 90) -> Dict[str, Any]:
"""
Punto de entrada principal. Genera todos los análisis y los empaqueta.
Cachea el resultado por 10 minutos para no recalcular en cada request.
"""
historico = _generar_historico(dias_historico)
dias_semana = analisis_por_dia_semana(historico)
zonas = analisis_zonas_criticas(historico)
predicciones = predecir_proxima_semana(historico)
recomendaciones = generar_recomendaciones(zonas, dias_semana, predicciones)
# Resumen ejecutivo
total_kg_periodo = sum(r["volumen_kg"] for r in historico)
promedio_diario = total_kg_periodo / dias_historico if dias_historico > 0 else 0
return {
"generado_en": datetime.now().isoformat(),
"periodo_dias": dias_historico,
"resumen": {
"total_kg_recolectados": round(total_kg_periodo, 1),
"promedio_kg_dia": round(promedio_diario, 1),
"colonias_analizadas": len(COLONIAS),
"zonas_criticas": len([z for z in zonas if z["nivel_criticidad"] == "CRÍTICO"]),
"dia_pico": dias_semana[0]["dia"] if dias_semana else "N/A",
"colonia_mayor_volumen": zonas[0]["colonia"] if zonas else "N/A",
},
"dias_semana": dias_semana,
"zonas_criticas": zonas,
"prediccion_proxima_semana": predicciones,
"recomendaciones": recomendaciones,
}

View File

@@ -0,0 +1,414 @@
"""
===============================================================
analytics.py — v2: Datos REALES de la BD + fallback simulado
===============================================================
REEMPLAZA el analytics.py anterior completamente.
CAMBIO PRINCIPAL:
generar_reporte_completo_real(db) lee de RegistroRecoleccion
y ReporteUsuario en SQLite. Si hay pocos datos reales (<7 días),
mezcla con datos simulados para que el dashboard no quede vacío.
===============================================================
"""
import numpy as np
import random
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Any, Optional
import hashlib
# ---------------------------------------------------------------
# CONSTANTES
# ---------------------------------------------------------------
COLONIAS = [
"Zona Centro", "Las Arboledas", "Trojes",
"San Juanico", "Los Olivos", "Rancho Seco", "Las Insurgentes",
]
RUTAS = {
"Zona Centro": "RUTA-01",
"Las Arboledas": "RUTA-01",
"Trojes": "RUTA-13",
"San Juanico": "RUTA-03",
"Los Olivos": "RUTA-04",
"Rancho Seco": "RUTA-05",
"Las Insurgentes": "RUTA-12",
}
DIAS_SEMANA = ["Lunes", "Martes", "Miércoles", "Jueves", "Viernes", "Sábado", "Domingo"]
FACTOR_COLONIA = {
"Zona Centro": 1.45,
"Las Arboledas": 1.10,
"Trojes": 0.85,
"San Juanico": 0.90,
"Los Olivos": 1.00,
"Rancho Seco": 0.75,
"Las Insurgentes": 1.20,
}
FACTOR_DIA = {0: 1.35, 1: 1.05, 2: 1.00, 3: 0.95, 4: 1.15, 5: 1.25, 6: 0.80}
# ---------------------------------------------------------------
# GENERADOR DE HISTÓRICO SIMULADO (fallback cuando no hay datos)
# ---------------------------------------------------------------
def _semilla_dia(fecha: datetime) -> int:
s = fecha.strftime("%Y-%m-%d")
return int(hashlib.md5(s.encode()).hexdigest(), 16) % (2**32)
def _generar_historico_simulado(dias: int = 90) -> List[Dict]:
registros = []
hoy = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
for d in range(dias, 0, -1):
fecha = hoy - timedelta(days=d)
rng = random.Random(_semilla_dia(fecha))
dia_semana = fecha.weekday()
for colonia in COLONIAS:
base = rng.uniform(800, 2500)
factor = FACTOR_COLONIA[colonia] * FACTOR_DIA[dia_semana]
ruido = rng.uniform(0.85, 1.15)
volumen = round(base * factor * ruido, 1)
incidencias = rng.randint(0, int(factor * 5))
tiempo_min = round(30 + (volumen / 100) * rng.uniform(0.8, 1.2), 0)
registros.append({
"fecha": fecha,
"fecha_str": fecha.strftime("%Y-%m-%d"),
"dia_semana": dia_semana,
"dia_nombre": DIAS_SEMANA[dia_semana],
"colonia": colonia,
"ruta_id": RUTAS[colonia],
"volumen_kg": volumen,
"incidencias": incidencias,
"tiempo_recoleccion_min": tiempo_min,
"fuente": "simulado",
})
return registros
# ---------------------------------------------------------------
# LEER DATOS REALES DE LA BD
# ---------------------------------------------------------------
def _leer_historico_real(db, dias: int = 90) -> List[Dict]:
"""Lee RegistroRecoleccion de la BD y los convierte al formato interno."""
desde = (datetime.now(timezone.utc) - timedelta(days=dias)).strftime("%Y-%m-%d")
try:
from sqlalchemy import text
rows = db.execute(
text("""
SELECT fecha, colonia, ruta_id,
SUM(volumen_kg) as volumen_kg,
SUM(incidencias) as incidencias,
AVG(tiempo_min) as tiempo_min,
fuente
FROM registros_recoleccion
WHERE fecha >= :desde
GROUP BY fecha, colonia, ruta_id, fuente
ORDER BY fecha DESC
"""),
{"desde": desde}
).fetchall()
except Exception:
return []
registros = []
for row in rows:
try:
fecha = datetime.strptime(row[0], "%Y-%m-%d")
except Exception:
continue
registros.append({
"fecha": fecha,
"fecha_str": row[0],
"dia_semana": fecha.weekday(),
"dia_nombre": DIAS_SEMANA[fecha.weekday()],
"colonia": row[1],
"ruta_id": row[2],
"volumen_kg": float(row[3] or 0),
"incidencias": int(row[4] or 0),
"tiempo_recoleccion_min": float(row[5] or 0),
"fuente": row[6],
})
return registros
def _leer_reportes_usuario(db, dias: int = 30) -> List[Dict]:
"""Lee los reportes del usuario para mostrarlos en el análisis."""
desde = (datetime.now(timezone.utc) - timedelta(days=dias)).strftime("%Y-%m-%d")
try:
from sqlalchemy import text
rows = db.execute(
text("""
SELECT r.fecha, r.colonia, r.tipo_reporte,
COUNT(*) as total, SUM(COALESCE(r.volumen_kg, 0)) as vol_total,
u.nombre
FROM reportes_usuario r
LEFT JOIN usuarios u ON r.usuario_id = u.id
WHERE r.fecha >= :desde
GROUP BY r.fecha, r.colonia, r.tipo_reporte
ORDER BY r.fecha DESC
"""),
{"desde": desde}
).fetchall()
return [
{
"fecha": row[0], "colonia": row[1],
"tipo": row[2], "total": row[3],
"volumen": float(row[4] or 0),
}
for row in rows
]
except Exception:
return []
# ---------------------------------------------------------------
# ANÁLISIS (idéntico al anterior, funciona con cualquier historico)
# ---------------------------------------------------------------
def _analisis_por_dia_semana(historico: List[Dict]) -> List[Dict]:
acumulado = {i: {"total_kg": 0.0, "count": 0, "incidencias": 0} for i in range(7)}
for r in historico:
d = r["dia_semana"]
acumulado[d]["total_kg"] += r["volumen_kg"]
acumulado[d]["count"] += 1
acumulado[d]["incidencias"] += r["incidencias"]
resultado = []
for dia_idx in range(7):
v = acumulado[dia_idx]
count = max(v["count"], 1)
resultado.append({
"dia": DIAS_SEMANA[dia_idx],
"dia_idx": dia_idx,
"promedio_kg": round(v["total_kg"] / count, 1),
"total_kg": round(v["total_kg"], 1),
"promedio_incidencias": round(v["incidencias"] / count, 1),
"semanas_registradas": count // max(len(COLONIAS), 1),
})
resultado.sort(key=lambda x: x["promedio_kg"], reverse=True)
return resultado
def _analisis_zonas_criticas(historico: List[Dict]) -> List[Dict]:
acumulado: Dict[str, Dict] = {}
for r in historico:
c = r["colonia"]
if c not in acumulado:
acumulado[c] = {"total_kg": 0.0, "total_incidencias": 0, "total_tiempo": 0.0, "count": 0}
acumulado[c]["total_kg"] += r["volumen_kg"]
acumulado[c]["total_incidencias"] += r["incidencias"]
acumulado[c]["total_tiempo"] += r["tiempo_recoleccion_min"]
acumulado[c]["count"] += 1
if not acumulado:
return []
max_vol = max(v["total_kg"] / max(v["count"], 1) for v in acumulado.values())
resultado = []
for colonia, v in acumulado.items():
count = max(v["count"], 1)
promedio_kg = v["total_kg"] / count
promedio_inc = v["total_incidencias"] / count
idx = round((promedio_kg / max_vol) * 100, 1) if max_vol > 0 else 0
if idx >= 80: nivel = "CRÍTICO"
elif idx >= 60: nivel = "ALTO"
elif idx >= 40: nivel = "MEDIO"
else: nivel = "BAJO"
resultado.append({
"colonia": colonia,
"ruta_id": RUTAS.get(colonia, ""),
"promedio_kg_dia": round(promedio_kg, 1),
"total_kg_90dias": round(v["total_kg"], 1),
"promedio_incidencias_dia": round(promedio_inc, 2),
"promedio_tiempo_min": round(v["total_tiempo"] / count, 1),
"indice_criticidad": idx,
"nivel_criticidad": nivel,
})
resultado.sort(key=lambda x: x["indice_criticidad"], reverse=True)
return resultado
def _predecir_proxima_semana(historico: List[Dict]) -> List[Dict]:
hoy = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
predicciones = []
for colonia in COLONIAS:
datos = [r for r in historico if r["colonia"] == colonia][-30:]
if len(datos) < 5:
continue
x = np.array(range(len(datos)), dtype=float)
y = np.array([r["volumen_kg"] for r in datos], dtype=float)
coefs = np.polyfit(x, y, 1)
y_pred = np.polyval(coefs, x)
ss_res = np.sum((y - y_pred) ** 2)
ss_tot = np.sum((y - np.mean(y)) ** 2)
r2 = round(1 - (ss_res / ss_tot) if ss_tot > 0 else 0, 3)
n = len(datos)
dias_pred = []
for i in range(7):
fecha_pred = hoy + timedelta(days=i + 1)
dia_semana = fecha_pred.weekday()
vol_base = np.polyval(coefs, n + i)
vol_ajustado = max(0, vol_base * FACTOR_DIA[dia_semana])
dias_pred.append({
"fecha": fecha_pred.strftime("%Y-%m-%d"),
"dia": DIAS_SEMANA[dia_semana],
"volumen_predicho_kg": round(float(vol_ajustado), 1),
"confianza": "Alta" if r2 > 0.7 else ("Media" if r2 > 0.4 else "Baja"),
})
pendiente = float(coefs[0])
predicciones.append({
"colonia": colonia,
"ruta_id": RUTAS.get(colonia, ""),
"tendencia": "CRECIENTE" if pendiente > 10 else ("DECRECIENTE" if pendiente < -10 else "ESTABLE"),
"pendiente_diaria_kg": round(pendiente, 2),
"r2": r2,
"prediccion_7dias": dias_pred,
"volumen_predicho_total_kg": round(sum(d["volumen_predicho_kg"] for d in dias_pred), 1),
})
predicciones.sort(key=lambda x: x["volumen_predicho_total_kg"], reverse=True)
return predicciones
def _generar_recomendaciones(zonas, dias, predicciones) -> List[Dict]:
recomendaciones = []
criticas = [z for z in zonas if z["nivel_criticidad"] == "CRÍTICO"]
for z in criticas:
recomendaciones.append({
"tipo": "REFUERZO_RUTA", "prioridad": "ALTA", "emoji": "🚛",
"titulo": f"Refuerzo en {z['colonia']}",
"descripcion": (
f"{z['colonia']} genera {z['promedio_kg_dia']:.0f} kg/día promedio "
f"con {z['promedio_incidencias_dia']:.1f} incidencias. "
f"Se recomienda reforzar el camión en días pico."
),
"ruta_afectada": z["ruta_id"],
})
if dias:
dia_pico = dias[0]
recomendaciones.append({
"tipo": "HORARIO_PICO", "prioridad": "MEDIA", "emoji": "",
"titulo": f"{dia_pico['dia']} es el día con más residuos",
"descripcion": (
f"El {dia_pico['dia']} se recolectan {dia_pico['promedio_kg']:.0f} kg/colonia en promedio. "
f"Considera iniciar 30 minutos antes ese día."
),
"ruta_afectada": None,
})
crecientes = [p for p in predicciones if p["tendencia"] == "CRECIENTE"]
for p in crecientes[:2]:
recomendaciones.append({
"tipo": "TENDENCIA_CRECIENTE", "prioridad": "MEDIA", "emoji": "📈",
"titulo": f"Aumento en {p['colonia']}",
"descripcion": (
f"Tendencia creciente de +{p['pendiente_diaria_kg']:.1f} kg/día. "
f"Volumen predicho esta semana: {p['volumen_predicho_total_kg']:.0f} kg."
),
"ruta_afectada": p["ruta_id"],
})
if zonas:
z = max(zonas, key=lambda x: x["promedio_incidencias_dia"])
recomendaciones.append({
"tipo": "CONCIENTIZACION", "prioridad": "BAJA", "emoji": "📢",
"titulo": f"Campaña en {z['colonia']}",
"descripcion": (
f"{z['colonia']} tiene {z['promedio_incidencias_dia']:.1f} incidencias/día. "
f"Se recomienda campaña de concientización."
),
"ruta_afectada": z["ruta_id"],
})
return recomendaciones
# ---------------------------------------------------------------
# FUNCIÓN PRINCIPAL — DATOS REALES + FALLBACK
# ---------------------------------------------------------------
def generar_reporte_completo_real(db) -> Dict[str, Any]:
"""
Lee datos reales de la BD. Si hay menos de 7 días de datos reales,
complementa con datos simulados para que el dashboard no quede vacío.
"""
historico_real = _leer_historico_real(db, dias=90)
reportes_usuario = _leer_reportes_usuario(db, dias=30)
# Calcular días únicos con datos reales
dias_con_datos = len({r["fecha_str"] for r in historico_real})
if dias_con_datos < 7:
# Pocos datos reales — mezclar con simulados
historico_sim = _generar_historico_simulado(dias=90 - dias_con_datos)
historico = historico_real + historico_sim
fuente_datos = f"mixto ({dias_con_datos} días reales + simulados)"
else:
historico = historico_real
fuente_datos = f"real ({dias_con_datos} días)"
dias_semana = _analisis_por_dia_semana(historico)
zonas = _analisis_zonas_criticas(historico)
predicciones = _predecir_proxima_semana(historico)
recomendaciones = _generar_recomendaciones(zonas, dias_semana, predicciones)
total_kg = sum(r["volumen_kg"] for r in historico)
n_dias = max(len({r["fecha_str"] for r in historico}), 1)
# Estadísticas de reportes de usuarios
total_reportes_usuario = sum(r["total"] for r in reportes_usuario)
incidencias_por_colonia = {}
for r in reportes_usuario:
if r["tipo"] == "incidencia":
incidencias_por_colonia[r["colonia"]] = \
incidencias_por_colonia.get(r["colonia"], 0) + r["total"]
return {
"generado_en": datetime.now().isoformat(),
"fuente_datos": fuente_datos,
"periodo_dias": 90,
"resumen": {
"total_kg_recolectados": round(total_kg, 1),
"promedio_kg_dia": round(total_kg / n_dias, 1),
"colonias_analizadas": len(COLONIAS),
"zonas_criticas": len([z for z in zonas if z["nivel_criticidad"] == "CRÍTICO"]),
"dia_pico": dias_semana[0]["dia"] if dias_semana else "N/A",
"colonia_mayor_volumen": zonas[0]["colonia"] if zonas else "N/A",
"reportes_ciudadanos": total_reportes_usuario,
"colonias_con_incidencias": len(incidencias_por_colonia),
},
"dias_semana": dias_semana,
"zonas_criticas": zonas,
"prediccion_proxima_semana": predicciones,
"recomendaciones": recomendaciones,
"reportes_ciudadanos_recientes": reportes_usuario[:10],
}
# Mantener compatibilidad con el import anterior
def generar_reporte_completo(db=None, dias_historico: int = 90) -> Dict[str, Any]:
"""Versión sin DB (fallback puro simulado)."""
historico = _generar_historico_simulado(dias_historico)
dias_semana = _analisis_por_dia_semana(historico)
zonas = _analisis_zonas_criticas(historico)
predicciones = _predecir_proxima_semana(historico)
recomendaciones = _generar_recomendaciones(zonas, dias_semana, predicciones)
total_kg = sum(r["volumen_kg"] for r in historico)
return {
"generado_en": datetime.now().isoformat(),
"fuente_datos": "simulado",
"periodo_dias": dias_historico,
"resumen": {
"total_kg_recolectados": round(total_kg, 1),
"promedio_kg_dia": round(total_kg / max(dias_historico, 1), 1),
"colonias_analizadas": len(COLONIAS),
"zonas_criticas": len([z for z in zonas if z["nivel_criticidad"] == "CRÍTICO"]),
"dia_pico": dias_semana[0]["dia"] if dias_semana else "N/A",
"colonia_mayor_volumen": zonas[0]["colonia"] if zonas else "N/A",
"reportes_ciudadanos": 0,
"colonias_con_incidencias": 0,
},
"dias_semana": dias_semana,
"zonas_criticas": zonas,
"prediccion_proxima_semana": predicciones,
"recomendaciones": recomendaciones,
"reportes_ciudadanos_recientes": [],
}

Binary file not shown.

File diff suppressed because it is too large Load Diff