""" =============================================================== analytics.py — v2: Datos REALES de la BD + fallback simulado =============================================================== REEMPLAZA el analytics.py anterior completamente. CAMBIO PRINCIPAL: generar_reporte_completo_real(db) lee de RegistroRecoleccion y ReporteUsuario en SQLite. Si hay pocos datos reales (<7 días), mezcla con datos simulados para que el dashboard no quede vacío. =============================================================== """ import numpy as np import random from datetime import datetime, timedelta, timezone from typing import List, Dict, Any, Optional import hashlib # --------------------------------------------------------------- # CONSTANTES # --------------------------------------------------------------- COLONIAS = [ "Zona Centro", "Las Arboledas", "Trojes", "San Juanico", "Los Olivos", "Rancho Seco", "Las Insurgentes", ] RUTAS = { "Zona Centro": "RUTA-01", "Las Arboledas": "RUTA-01", "Trojes": "RUTA-13", "San Juanico": "RUTA-03", "Los Olivos": "RUTA-04", "Rancho Seco": "RUTA-05", "Las Insurgentes": "RUTA-12", } DIAS_SEMANA = ["Lunes", "Martes", "Miércoles", "Jueves", "Viernes", "Sábado", "Domingo"] FACTOR_COLONIA = { "Zona Centro": 1.45, "Las Arboledas": 1.10, "Trojes": 0.85, "San Juanico": 0.90, "Los Olivos": 1.00, "Rancho Seco": 0.75, "Las Insurgentes": 1.20, } FACTOR_DIA = {0: 1.35, 1: 1.05, 2: 1.00, 3: 0.95, 4: 1.15, 5: 1.25, 6: 0.80} # --------------------------------------------------------------- # GENERADOR DE HISTÓRICO SIMULADO (fallback cuando no hay datos) # --------------------------------------------------------------- def _semilla_dia(fecha: datetime) -> int: s = fecha.strftime("%Y-%m-%d") return int(hashlib.md5(s.encode()).hexdigest(), 16) % (2**32) def _generar_historico_simulado(dias: int = 90) -> List[Dict]: registros = [] hoy = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) for d in range(dias, 0, -1): fecha = hoy - timedelta(days=d) rng = random.Random(_semilla_dia(fecha)) dia_semana = fecha.weekday() for colonia in COLONIAS: base = rng.uniform(800, 2500) factor = FACTOR_COLONIA[colonia] * FACTOR_DIA[dia_semana] ruido = rng.uniform(0.85, 1.15) volumen = round(base * factor * ruido, 1) incidencias = rng.randint(0, int(factor * 5)) tiempo_min = round(30 + (volumen / 100) * rng.uniform(0.8, 1.2), 0) registros.append({ "fecha": fecha, "fecha_str": fecha.strftime("%Y-%m-%d"), "dia_semana": dia_semana, "dia_nombre": DIAS_SEMANA[dia_semana], "colonia": colonia, "ruta_id": RUTAS[colonia], "volumen_kg": volumen, "incidencias": incidencias, "tiempo_recoleccion_min": tiempo_min, "fuente": "simulado", }) return registros # --------------------------------------------------------------- # LEER DATOS REALES DE LA BD # --------------------------------------------------------------- def _leer_historico_real(db, dias: int = 90) -> List[Dict]: """Lee RegistroRecoleccion de la BD y los convierte al formato interno.""" desde = (datetime.now(timezone.utc) - timedelta(days=dias)).strftime("%Y-%m-%d") try: from sqlalchemy import text rows = db.execute( text(""" SELECT fecha, colonia, ruta_id, SUM(volumen_kg) as volumen_kg, SUM(incidencias) as incidencias, AVG(tiempo_min) as tiempo_min, fuente FROM registros_recoleccion WHERE fecha >= :desde GROUP BY fecha, colonia, ruta_id, fuente ORDER BY fecha DESC """), {"desde": desde} ).fetchall() except Exception: return [] registros = [] for row in rows: try: fecha = datetime.strptime(row[0], "%Y-%m-%d") except Exception: continue registros.append({ "fecha": fecha, "fecha_str": row[0], "dia_semana": fecha.weekday(), "dia_nombre": DIAS_SEMANA[fecha.weekday()], "colonia": row[1], "ruta_id": row[2], "volumen_kg": float(row[3] or 0), "incidencias": int(row[4] or 0), "tiempo_recoleccion_min": float(row[5] or 0), "fuente": row[6], }) return registros def _leer_reportes_usuario(db, dias: int = 30) -> List[Dict]: """Lee los reportes del usuario para mostrarlos en el análisis.""" desde = (datetime.now(timezone.utc) - timedelta(days=dias)).strftime("%Y-%m-%d") try: from sqlalchemy import text rows = db.execute( text(""" SELECT r.fecha, r.colonia, r.tipo_reporte, COUNT(*) as total, SUM(COALESCE(r.volumen_kg, 0)) as vol_total, u.nombre FROM reportes_usuario r LEFT JOIN usuarios u ON r.usuario_id = u.id WHERE r.fecha >= :desde GROUP BY r.fecha, r.colonia, r.tipo_reporte ORDER BY r.fecha DESC """), {"desde": desde} ).fetchall() return [ { "fecha": row[0], "colonia": row[1], "tipo": row[2], "total": row[3], "volumen": float(row[4] or 0), } for row in rows ] except Exception: return [] # --------------------------------------------------------------- # ANÁLISIS (idéntico al anterior, funciona con cualquier historico) # --------------------------------------------------------------- def _analisis_por_dia_semana(historico: List[Dict]) -> List[Dict]: acumulado = {i: {"total_kg": 0.0, "count": 0, "incidencias": 0} for i in range(7)} for r in historico: d = r["dia_semana"] acumulado[d]["total_kg"] += r["volumen_kg"] acumulado[d]["count"] += 1 acumulado[d]["incidencias"] += r["incidencias"] resultado = [] for dia_idx in range(7): v = acumulado[dia_idx] count = max(v["count"], 1) resultado.append({ "dia": DIAS_SEMANA[dia_idx], "dia_idx": dia_idx, "promedio_kg": round(v["total_kg"] / count, 1), "total_kg": round(v["total_kg"], 1), "promedio_incidencias": round(v["incidencias"] / count, 1), "semanas_registradas": count // max(len(COLONIAS), 1), }) resultado.sort(key=lambda x: x["promedio_kg"], reverse=True) return resultado def _analisis_zonas_criticas(historico: List[Dict]) -> List[Dict]: acumulado: Dict[str, Dict] = {} for r in historico: c = r["colonia"] if c not in acumulado: acumulado[c] = {"total_kg": 0.0, "total_incidencias": 0, "total_tiempo": 0.0, "count": 0} acumulado[c]["total_kg"] += r["volumen_kg"] acumulado[c]["total_incidencias"] += r["incidencias"] acumulado[c]["total_tiempo"] += r["tiempo_recoleccion_min"] acumulado[c]["count"] += 1 if not acumulado: return [] max_vol = max(v["total_kg"] / max(v["count"], 1) for v in acumulado.values()) resultado = [] for colonia, v in acumulado.items(): count = max(v["count"], 1) promedio_kg = v["total_kg"] / count promedio_inc = v["total_incidencias"] / count idx = round((promedio_kg / max_vol) * 100, 1) if max_vol > 0 else 0 if idx >= 80: nivel = "CRÍTICO" elif idx >= 60: nivel = "ALTO" elif idx >= 40: nivel = "MEDIO" else: nivel = "BAJO" resultado.append({ "colonia": colonia, "ruta_id": RUTAS.get(colonia, ""), "promedio_kg_dia": round(promedio_kg, 1), "total_kg_90dias": round(v["total_kg"], 1), "promedio_incidencias_dia": round(promedio_inc, 2), "promedio_tiempo_min": round(v["total_tiempo"] / count, 1), "indice_criticidad": idx, "nivel_criticidad": nivel, }) resultado.sort(key=lambda x: x["indice_criticidad"], reverse=True) return resultado def _predecir_proxima_semana(historico: List[Dict]) -> List[Dict]: hoy = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) predicciones = [] for colonia in COLONIAS: datos = [r for r in historico if r["colonia"] == colonia][-30:] if len(datos) < 5: continue x = np.array(range(len(datos)), dtype=float) y = np.array([r["volumen_kg"] for r in datos], dtype=float) coefs = np.polyfit(x, y, 1) y_pred = np.polyval(coefs, x) ss_res = np.sum((y - y_pred) ** 2) ss_tot = np.sum((y - np.mean(y)) ** 2) r2 = round(1 - (ss_res / ss_tot) if ss_tot > 0 else 0, 3) n = len(datos) dias_pred = [] for i in range(7): fecha_pred = hoy + timedelta(days=i + 1) dia_semana = fecha_pred.weekday() vol_base = np.polyval(coefs, n + i) vol_ajustado = max(0, vol_base * FACTOR_DIA[dia_semana]) dias_pred.append({ "fecha": fecha_pred.strftime("%Y-%m-%d"), "dia": DIAS_SEMANA[dia_semana], "volumen_predicho_kg": round(float(vol_ajustado), 1), "confianza": "Alta" if r2 > 0.7 else ("Media" if r2 > 0.4 else "Baja"), }) pendiente = float(coefs[0]) predicciones.append({ "colonia": colonia, "ruta_id": RUTAS.get(colonia, ""), "tendencia": "CRECIENTE" if pendiente > 10 else ("DECRECIENTE" if pendiente < -10 else "ESTABLE"), "pendiente_diaria_kg": round(pendiente, 2), "r2": r2, "prediccion_7dias": dias_pred, "volumen_predicho_total_kg": round(sum(d["volumen_predicho_kg"] for d in dias_pred), 1), }) predicciones.sort(key=lambda x: x["volumen_predicho_total_kg"], reverse=True) return predicciones def _generar_recomendaciones(zonas, dias, predicciones) -> List[Dict]: recomendaciones = [] criticas = [z for z in zonas if z["nivel_criticidad"] == "CRÍTICO"] for z in criticas: recomendaciones.append({ "tipo": "REFUERZO_RUTA", "prioridad": "ALTA", "emoji": "🚛", "titulo": f"Refuerzo en {z['colonia']}", "descripcion": ( f"{z['colonia']} genera {z['promedio_kg_dia']:.0f} kg/día promedio " f"con {z['promedio_incidencias_dia']:.1f} incidencias. " f"Se recomienda reforzar el camión en días pico." ), "ruta_afectada": z["ruta_id"], }) if dias: dia_pico = dias[0] recomendaciones.append({ "tipo": "HORARIO_PICO", "prioridad": "MEDIA", "emoji": "⏰", "titulo": f"{dia_pico['dia']} es el día con más residuos", "descripcion": ( f"El {dia_pico['dia']} se recolectan {dia_pico['promedio_kg']:.0f} kg/colonia en promedio. " f"Considera iniciar 30 minutos antes ese día." ), "ruta_afectada": None, }) crecientes = [p for p in predicciones if p["tendencia"] == "CRECIENTE"] for p in crecientes[:2]: recomendaciones.append({ "tipo": "TENDENCIA_CRECIENTE", "prioridad": "MEDIA", "emoji": "📈", "titulo": f"Aumento en {p['colonia']}", "descripcion": ( f"Tendencia creciente de +{p['pendiente_diaria_kg']:.1f} kg/día. " f"Volumen predicho esta semana: {p['volumen_predicho_total_kg']:.0f} kg." ), "ruta_afectada": p["ruta_id"], }) if zonas: z = max(zonas, key=lambda x: x["promedio_incidencias_dia"]) recomendaciones.append({ "tipo": "CONCIENTIZACION", "prioridad": "BAJA", "emoji": "📢", "titulo": f"Campaña en {z['colonia']}", "descripcion": ( f"{z['colonia']} tiene {z['promedio_incidencias_dia']:.1f} incidencias/día. " f"Se recomienda campaña de concientización." ), "ruta_afectada": z["ruta_id"], }) return recomendaciones # --------------------------------------------------------------- # FUNCIÓN PRINCIPAL — DATOS REALES + FALLBACK # --------------------------------------------------------------- def generar_reporte_completo_real(db) -> Dict[str, Any]: """ Lee datos reales de la BD. Si hay menos de 7 días de datos reales, complementa con datos simulados para que el dashboard no quede vacío. """ historico_real = _leer_historico_real(db, dias=90) reportes_usuario = _leer_reportes_usuario(db, dias=30) # Calcular días únicos con datos reales dias_con_datos = len({r["fecha_str"] for r in historico_real}) if dias_con_datos < 7: # Pocos datos reales — mezclar con simulados historico_sim = _generar_historico_simulado(dias=90 - dias_con_datos) historico = historico_real + historico_sim fuente_datos = f"mixto ({dias_con_datos} días reales + simulados)" else: historico = historico_real fuente_datos = f"real ({dias_con_datos} días)" dias_semana = _analisis_por_dia_semana(historico) zonas = _analisis_zonas_criticas(historico) predicciones = _predecir_proxima_semana(historico) recomendaciones = _generar_recomendaciones(zonas, dias_semana, predicciones) total_kg = sum(r["volumen_kg"] for r in historico) n_dias = max(len({r["fecha_str"] for r in historico}), 1) # Estadísticas de reportes de usuarios total_reportes_usuario = sum(r["total"] for r in reportes_usuario) incidencias_por_colonia = {} for r in reportes_usuario: if r["tipo"] == "incidencia": incidencias_por_colonia[r["colonia"]] = \ incidencias_por_colonia.get(r["colonia"], 0) + r["total"] return { "generado_en": datetime.now().isoformat(), "fuente_datos": fuente_datos, "periodo_dias": 90, "resumen": { "total_kg_recolectados": round(total_kg, 1), "promedio_kg_dia": round(total_kg / n_dias, 1), "colonias_analizadas": len(COLONIAS), "zonas_criticas": len([z for z in zonas if z["nivel_criticidad"] == "CRÍTICO"]), "dia_pico": dias_semana[0]["dia"] if dias_semana else "N/A", "colonia_mayor_volumen": zonas[0]["colonia"] if zonas else "N/A", "reportes_ciudadanos": total_reportes_usuario, "colonias_con_incidencias": len(incidencias_por_colonia), }, "dias_semana": dias_semana, "zonas_criticas": zonas, "prediccion_proxima_semana": predicciones, "recomendaciones": recomendaciones, "reportes_ciudadanos_recientes": reportes_usuario[:10], } # Mantener compatibilidad con el import anterior def generar_reporte_completo(db=None, dias_historico: int = 90) -> Dict[str, Any]: """Versión sin DB (fallback puro simulado).""" historico = _generar_historico_simulado(dias_historico) dias_semana = _analisis_por_dia_semana(historico) zonas = _analisis_zonas_criticas(historico) predicciones = _predecir_proxima_semana(historico) recomendaciones = _generar_recomendaciones(zonas, dias_semana, predicciones) total_kg = sum(r["volumen_kg"] for r in historico) return { "generado_en": datetime.now().isoformat(), "fuente_datos": "simulado", "periodo_dias": dias_historico, "resumen": { "total_kg_recolectados": round(total_kg, 1), "promedio_kg_dia": round(total_kg / max(dias_historico, 1), 1), "colonias_analizadas": len(COLONIAS), "zonas_criticas": len([z for z in zonas if z["nivel_criticidad"] == "CRÍTICO"]), "dia_pico": dias_semana[0]["dia"] if dias_semana else "N/A", "colonia_mayor_volumen": zonas[0]["colonia"] if zonas else "N/A", "reportes_ciudadanos": 0, "colonias_con_incidencias": 0, }, "dias_semana": dias_semana, "zonas_criticas": zonas, "prediccion_proxima_semana": predicciones, "recomendaciones": recomendaciones, "reportes_ciudadanos_recientes": [], }