Files
hackathon-innovaflow5.0-cdf…/backend/app/services/notifications.py

46 lines
1.7 KiB
Python

import os
import firebase_admin
from firebase_admin import credentials, messaging
_firebase_initialized = False
def init_firebase(cred_path: str):
"""Inicializa la conexión con Firebase usando el Service Account (JSON)."""
global _firebase_initialized
if os.path.exists(cred_path):
try:
cred = credentials.Certificate(cred_path)
firebase_admin.initialize_app(cred)
_firebase_initialized = True
print(f"Firebase Admin SDK inicializado correctamente desde: {cred_path}")
except ValueError:
# Si el entorno se recarga, Firebase podría quejarse de que ya se inicializó
_firebase_initialized = True
except Exception as e:
print(f"Error al inicializar Firebase: {e}")
else:
print(f"ADVERTENCIA: Credenciales no encontradas en '{cred_path}'.")
print("Las notificaciones se ejecutarán en modo SIMULADO (solo consola).")
def send_to_topic(topic: str, payload: dict):
"""Envía una notificación push a todos los dispositivos suscritos a un topic (ej. RUTA-01)."""
title = payload.get("title", "")
body = payload.get("body", "")
if not _firebase_initialized:
print(f"[MOCK PUSH] -> Topic: {topic} | Título: '{title}' | Mensaje: '{body}'")
return
try:
message = messaging.Message(
notification=messaging.Notification(
title=title,
body=body,
),
topic=topic,
)
response = messaging.send(message)
print(f"Push enviado al topic '{topic}' exitosamente. MessageID: {response}")
except Exception as e:
print(f"Error al enviar push al topic '{topic}': {e}")