from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from sqlalchemy import desc
from pathlib import Path
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Any, Optional
import os
import logging
import httpx

from backend.database import get_db
from backend.models import UnitHistory, Emitter, RosterUnit

log = logging.getLogger(__name__)

router = APIRouter(prefix="/api", tags=["activity"])

# Base URL of the SFM service queried by /recent-event-callins.
SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200")


def _humanize_age(seconds: float) -> str:
    """Format an age in seconds as a short "time ago" string.

    Returns "just now" below one minute, "<m>m ago" below one hour,
    "<h>h <m>m ago" below one day, and "<d>d ago" otherwise. Negative
    ages (timestamps in the future) also come out as "just now".
    """
    if seconds < 60:
        return "just now"
    # Work in whole seconds with integer arithmetic. The float form
    # int((hrs % 1) * 60) loses a minute on some inputs, e.g. 3660s
    # rendered as "1h 0m ago" instead of "1h 1m ago".
    whole = int(seconds)
    if whole < 3600:
        return f"{whole // 60}m ago"
    if whole < 86400:
        hours, remainder = divmod(whole, 3600)
        return f"{hours}h {remainder // 60}m ago"
    return f"{whole // 86400}d ago"


# Root directory scanned for photos; each sub-directory name is used as a unit id.
PHOTOS_BASE_DIR = Path("data/photos")


@router.get("/recent-activity")
def get_recent_activity(limit: int = 20, db: Session = Depends(get_db)):
    """
    Get recent activity feed combining unit history changes and photo uploads.
    Returns a unified timeline of events sorted by timestamp (newest first).

    Args:
        limit: Maximum number of activities returned (default: 20)

    Returns:
        Dict with "activities" (list of "history" / "photo" entries) and
        "total" (number of entries returned).
    """
    activities = []

    # Get recent history entries. Fetch more than needed so that history
    # and photos can be interleaved before the final cut to `limit`.
    history_entries = db.query(UnitHistory)\
        .order_by(desc(UnitHistory.changed_at))\
        .limit(limit * 2)\
        .all()

    for entry in history_entries:
        activities.append({
            "type": "history",
            "timestamp": entry.changed_at.isoformat(),
            "timestamp_unix": entry.changed_at.timestamp(),
            "unit_id": entry.unit_id,
            "change_type": entry.change_type,
            "field_name": entry.field_name,
            "old_value": entry.old_value,
            "new_value": entry.new_value,
            "source": entry.source,
            "notes": entry.notes,
        })

    # Get recent photos: every image file under PHOTOS_BASE_DIR/<unit_id>/
    if PHOTOS_BASE_DIR.exists():
        image_extensions = {".jpg", ".jpeg", ".png", ".gif", ".webp"}

        for unit_dir in PHOTOS_BASE_DIR.iterdir():
            if not unit_dir.is_dir():
                continue

            unit_id = unit_dir.name
            for file_path in unit_dir.iterdir():
                if file_path.suffix.lower() not in image_extensions:
                    continue
                try:
                    if not file_path.is_file():
                        continue
                    modified_time = file_path.stat().st_mtime
                except OSError:
                    # File vanished or became unreadable between listing and
                    # stat(); skip it rather than fail the whole feed.
                    continue

                activities.append({
                    "type": "photo",
                    # NOTE(review): naive local-time ISO string; confirm this
                    # matches the timezone convention of UnitHistory.changed_at.
                    "timestamp": datetime.fromtimestamp(modified_time).isoformat(),
                    "timestamp_unix": modified_time,
                    "unit_id": unit_id,
                    "filename": file_path.name,
                    "photo_url": f"/api/unit/{unit_id}/photo/{file_path.name}",
                })

    # Sort all activities by timestamp (newest first)
    activities.sort(key=lambda x: x["timestamp_unix"], reverse=True)

    # Limit to requested number
    activities = activities[:limit]

    return {
        "activities": activities,
        "total": len(activities),
    }


@router.get("/recent-callins")
def get_recent_callins(hours: int = 6, limit: Optional[int] = None, db: Session = Depends(get_db)):
    """
    Get recent unit call-ins (units that have reported recently).
    Returns units sorted by most recent last_seen timestamp.

    Args:
        hours: Look back this many hours (default: 6)
        limit: Maximum number of results (default: None = all)

    Returns:
        Dict with "call_ins", "total", the requested "hours" and the
        ISO-formatted "time_threshold" that was applied.
    """
    # Take "now" once so the threshold and every row's age share one reference.
    now = datetime.now(timezone.utc)
    time_threshold = now - timedelta(hours=hours)

    # Query emitters with recent activity, newest first
    recent_emitters = db.query(Emitter)\
        .filter(Emitter.last_seen >= time_threshold)\
        .order_by(desc(Emitter.last_seen))\
        .all()

    # Apply limit if specified (a falsy limit, including 0, means "all").
    # Each emitter yields exactly one row, so truncating here gives the same
    # result as truncating the finished list.
    if limit:
        recent_emitters = recent_emitters[:limit]

    # Get roster info for all units
    roster_dict = {r.id: r for r in db.query(RosterUnit).all()}

    call_ins = []
    for emitter in recent_emitters:
        roster_unit = roster_dict.get(emitter.id)

        # Naive last_seen values are treated as UTC.
        last_seen_utc = emitter.last_seen
        if last_seen_utc.tzinfo is None:
            last_seen_utc = last_seen_utc.replace(tzinfo=timezone.utc)

        call_ins.append({
            "unit_id": emitter.id,
            "last_seen": emitter.last_seen.isoformat(),
            "time_ago": _humanize_age((now - last_seen_utc).total_seconds()),
            "status": emitter.status,
            "device_type": roster_unit.device_type if roster_unit else "seismograph",
            "deployed": roster_unit.deployed if roster_unit else False,
            "note": roster_unit.note if roster_unit and roster_unit.note else "",
            "location": roster_unit.address if roster_unit and roster_unit.address else (roster_unit.location if roster_unit else ""),
        })

    return {
        "call_ins": call_ins,
        "total": len(call_ins),
        "hours": hours,
        "time_threshold": time_threshold.isoformat(),
    }


@router.get("/recent-event-callins")
async def get_recent_event_callins(limit: int = 10, db: Session = Depends(get_db)):
    """
    Recent unit call-ins derived from SFM event forwards.

    Architecture context: the live ACH replacement is on hold, so call-homes
    arrive as Blastware ACH event files forwarded by series3-watcher and
    landed in the SFM events store. One event ≈ one call-in. This is the
    forward-looking source of "recent call-ins" that will eventually replace
    the heartbeat-based /recent-callins endpoint entirely.

    Each row represents one event; multiple consecutive events from the same
    serial are intentionally NOT collapsed — each one is a distinct call-home.

    Args:
        limit: Passed to SFM as the `limit` query parameter (default: 10)

    Returns:
        Dict with "call_ins", "total" and "source". If the SFM request fails
        or returns a body that is not valid JSON, returns an empty result
        with an "error" string instead of raising.
    """
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.get(
                f"{SFM_BASE_URL}/db/events",
                params={"limit": limit},
            )
            resp.raise_for_status()
            payload = resp.json()
    except (httpx.HTTPError, ValueError) as e:
        # ValueError covers a 2xx response whose body is not valid JSON,
        # which httpx.HTTPError alone does not catch.
        log.warning("SFM /db/events failed for recent-event-callins: %s", e)
        return {"call_ins": [], "total": 0, "error": str(e)}

    # Be tolerant of an unexpected payload shape from the external service:
    # anything that is not a dict with a list of dict events is ignored.
    raw_events = payload.get("events") if isinstance(payload, dict) else None
    if not isinstance(raw_events, list):
        raw_events = []
    events = [ev for ev in raw_events if isinstance(ev, dict)]

    # Bulk-resolve serials → roster (single query, no N+1)
    serials = list({ev.get("serial") for ev in events if ev.get("serial")})
    roster_map: Dict[str, RosterUnit] = {}
    if serials:
        roster_map = {
            r.id: r
            for r in db.query(RosterUnit).filter(RosterUnit.id.in_(serials)).all()
        }

    now = datetime.now(timezone.utc)
    call_ins: List[Dict[str, Any]] = []
    for ev in events:
        serial = ev.get("serial")
        if not serial:
            continue
        roster = roster_map.get(serial)

        # created_at = when SFM received the forward. Falls back to the event
        # timestamp if the SFM payload didn't carry created_at (older rows).
        created_at_str = ev.get("created_at") or ev.get("timestamp")
        time_ago = "—"
        if isinstance(created_at_str, str):
            try:
                ts = datetime.fromisoformat(created_at_str.replace("Z", "+00:00"))
            except ValueError:
                # Unparseable timestamp: keep the "—" placeholder.
                pass
            else:
                # Naive timestamps are treated as UTC.
                if ts.tzinfo is None:
                    ts = ts.replace(tzinfo=timezone.utc)
                time_ago = _humanize_age((now - ts).total_seconds())

        call_ins.append({
            "unit_id": serial,
            "serial": serial,
            "event_id": ev.get("id"),
            "event_timestamp": ev.get("timestamp"),
            "created_at": ev.get("created_at"),
            "time_ago": time_ago,
            "peak_vector_sum": ev.get("peak_vector_sum"),
            "false_trigger": bool(ev.get("false_trigger")),
            "sensor_location": ev.get("sensor_location") or "",
            "project": ev.get("project") or "",
            "device_type": roster.device_type if roster else "seismograph",
            "in_roster": roster is not None,
            "note": (roster.note if roster else "") or "",
        })

    return {
        "call_ins": call_ins,
        "total": len(call_ins),
        "source": "sfm-events",
    }