Files
terra-view/backend/routers/activity.py
serversdown 18fd0472a5 feat(dashboard): reorder top row, move schedule below map, source call-ins from SFM
- Top row left→right: Recent Alerts | Recent Call-Ins (2 cols) | Fleet Summary
- Today's Schedule becomes a horizontal collapsible card below Fleet Map.
  Collapsed by default; auto-expands when pending actions are detected in
  the rendered partial; manual toggle sticks via localStorage.
- New /api/recent-event-callins proxies SFM /db/events and bulk-joins each
  serial against RosterUnit for in-roster annotation. Phases the
  heartbeat-derived /api/recent-callins out of the UI while keeping it as
  a backup endpoint for now.
- Call-ins card renders a dense 2-column grid (last 10 events) showing
  PVS, sensor_location, false-trigger badge, event timestamp, and
  links to the unit page when rostered.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-13 22:58:25 +00:00

248 lines
8.7 KiB
Python

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from sqlalchemy import desc
from pathlib import Path
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Any
import os
import logging
import httpx
from backend.database import get_db
from backend.models import UnitHistory, Emitter, RosterUnit
log = logging.getLogger(__name__)
router = APIRouter(prefix="/api", tags=["activity"])
SFM_BASE_URL = os.getenv("SFM_BASE_URL", "http://localhost:8200")
def _humanize_age(seconds: float) -> str:
if seconds < 60:
return "just now"
if seconds < 3600:
return f"{int(seconds / 60)}m ago"
if seconds < 86400:
hrs = seconds / 3600
return f"{int(hrs)}h {int((hrs % 1) * 60)}m ago"
return f"{int(seconds / 86400)}d ago"
PHOTOS_BASE_DIR = Path("data/photos")
@router.get("/recent-activity")
def get_recent_activity(limit: int = 20, db: Session = Depends(get_db)):
"""
Get recent activity feed combining unit history changes and photo uploads.
Returns a unified timeline of events sorted by timestamp (newest first).
"""
activities = []
# Get recent history entries
history_entries = db.query(UnitHistory)\
.order_by(desc(UnitHistory.changed_at))\
.limit(limit * 2)\
.all() # Get more than needed to mix with photos
for entry in history_entries:
activity = {
"type": "history",
"timestamp": entry.changed_at.isoformat(),
"timestamp_unix": entry.changed_at.timestamp(),
"unit_id": entry.unit_id,
"change_type": entry.change_type,
"field_name": entry.field_name,
"old_value": entry.old_value,
"new_value": entry.new_value,
"source": entry.source,
"notes": entry.notes
}
activities.append(activity)
# Get recent photos
if PHOTOS_BASE_DIR.exists():
image_extensions = {".jpg", ".jpeg", ".png", ".gif", ".webp"}
photo_activities = []
for unit_dir in PHOTOS_BASE_DIR.iterdir():
if not unit_dir.is_dir():
continue
unit_id = unit_dir.name
for file_path in unit_dir.iterdir():
if file_path.is_file() and file_path.suffix.lower() in image_extensions:
modified_time = file_path.stat().st_mtime
photo_activities.append({
"type": "photo",
"timestamp": datetime.fromtimestamp(modified_time).isoformat(),
"timestamp_unix": modified_time,
"unit_id": unit_id,
"filename": file_path.name,
"photo_url": f"/api/unit/{unit_id}/photo/{file_path.name}"
})
activities.extend(photo_activities)
# Sort all activities by timestamp (newest first)
activities.sort(key=lambda x: x["timestamp_unix"], reverse=True)
# Limit to requested number
activities = activities[:limit]
return {
"activities": activities,
"total": len(activities)
}
@router.get("/recent-callins")
def get_recent_callins(hours: int = 6, limit: int = None, db: Session = Depends(get_db)):
"""
Get recent unit call-ins (units that have reported recently).
Returns units sorted by most recent last_seen timestamp.
Args:
hours: Look back this many hours (default: 6)
limit: Maximum number of results (default: None = all)
"""
# Calculate the time threshold
time_threshold = datetime.now(timezone.utc) - timedelta(hours=hours)
# Query emitters with recent activity, joined with roster info
recent_emitters = db.query(Emitter)\
.filter(Emitter.last_seen >= time_threshold)\
.order_by(desc(Emitter.last_seen))\
.all()
# Get roster info for all units
roster_dict = {r.id: r for r in db.query(RosterUnit).all()}
call_ins = []
for emitter in recent_emitters:
roster_unit = roster_dict.get(emitter.id)
# Calculate time since last seen
last_seen_utc = emitter.last_seen.replace(tzinfo=timezone.utc) if emitter.last_seen.tzinfo is None else emitter.last_seen
time_diff = datetime.now(timezone.utc) - last_seen_utc
# Format time ago
if time_diff.total_seconds() < 60:
time_ago = "just now"
elif time_diff.total_seconds() < 3600:
minutes = int(time_diff.total_seconds() / 60)
time_ago = f"{minutes}m ago"
else:
hours_ago = time_diff.total_seconds() / 3600
if hours_ago < 24:
time_ago = f"{int(hours_ago)}h {int((hours_ago % 1) * 60)}m ago"
else:
days = int(hours_ago / 24)
time_ago = f"{days}d ago"
call_in = {
"unit_id": emitter.id,
"last_seen": emitter.last_seen.isoformat(),
"time_ago": time_ago,
"status": emitter.status,
"device_type": roster_unit.device_type if roster_unit else "seismograph",
"deployed": roster_unit.deployed if roster_unit else False,
"note": roster_unit.note if roster_unit and roster_unit.note else "",
"location": roster_unit.address if roster_unit and roster_unit.address else (roster_unit.location if roster_unit else "")
}
call_ins.append(call_in)
# Apply limit if specified
if limit:
call_ins = call_ins[:limit]
return {
"call_ins": call_ins,
"total": len(call_ins),
"hours": hours,
"time_threshold": time_threshold.isoformat()
}
@router.get("/recent-event-callins")
async def get_recent_event_callins(limit: int = 10, db: Session = Depends(get_db)):
"""
Recent unit call-ins derived from SFM event forwards.
Architecture context: the live ACH replacement is on hold, so call-homes
arrive as Blastware ACH event files forwarded by series3-watcher and
landed in the SFM events store. One event ≈ one call-in. This is the
forward-looking source of "recent call-ins" that will eventually replace
the heartbeat-based /recent-callins endpoint entirely.
Each row represents one event; multiple consecutive events from the same
serial are intentionally NOT collapsed — each one is a distinct call-home.
"""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
f"{SFM_BASE_URL}/db/events",
params={"limit": limit},
)
resp.raise_for_status()
payload = resp.json()
except httpx.HTTPError as e:
log.warning("SFM /db/events failed for recent-event-callins: %s", e)
return {"call_ins": [], "total": 0, "error": str(e)}
events = payload.get("events", []) or []
# Bulk-resolve serials → roster (single query, no N+1)
serials = list({ev.get("serial") for ev in events if ev.get("serial")})
roster_map: Dict[str, RosterUnit] = {}
if serials:
roster_map = {
r.id: r
for r in db.query(RosterUnit).filter(RosterUnit.id.in_(serials)).all()
}
now = datetime.now(timezone.utc)
call_ins: List[Dict[str, Any]] = []
for ev in events:
serial = ev.get("serial")
if not serial:
continue
roster = roster_map.get(serial)
# created_at = when SFM received the forward. Falls back to the event
# timestamp if the SFM payload didn't carry created_at (older rows).
created_at_str = ev.get("created_at") or ev.get("timestamp")
time_ago = ""
if created_at_str:
try:
ts = datetime.fromisoformat(created_at_str.replace("Z", "+00:00"))
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
time_ago = _humanize_age((now - ts).total_seconds())
except ValueError:
pass
call_ins.append({
"unit_id": serial,
"serial": serial,
"event_id": ev.get("id"),
"event_timestamp": ev.get("timestamp"),
"created_at": ev.get("created_at"),
"time_ago": time_ago,
"peak_vector_sum": ev.get("peak_vector_sum"),
"false_trigger": bool(ev.get("false_trigger")),
"sensor_location": ev.get("sensor_location") or "",
"project": ev.get("project") or "",
"device_type": roster.device_type if roster else "seismograph",
"in_roster": roster is not None,
"note": (roster.note if roster else "") or "",
})
return {
"call_ins": call_ins,
"total": len(call_ins),
"source": "sfm-events",
}