Files
seismo-relay/sfm/database.py
T
serversdown 6b2a44ff02 fix(import): overlay BW report onto Event + upsert DB row on re-import
Two compounding bugs caused forwarded events to land in the DB with
broken-codec peak values (~10 in/s saturation on every channel) and
no project info, even when the watcher correctly paired a BW ASCII
report with the binary.

Bug 1: save_imported_bw built the sidecar JSON with the report's
authoritative peak / project values via event_to_sidecar_dict(
bw_report=...), but never overlaid those onto the in-memory Event
that flows to db.insert_events().  So the DB row got peak_values
from read_blastware_file()._peaks_from_samples() — which runs the
still-undecoded waveform body codec assuming raw int16 LE and
produces ±32K-shaped noise (= ±10 in/s at Normal range) regardless
of the actual signal.  The sidecar JSON had the truth but the DB
columns (which the webapp queries for fast filter/sort) lied.

Bug 2: insert_events' IntegrityError handler only refreshed the
filename/filesize/a5_pickle/sidecar columns when a duplicate
(serial, timestamp) was seen.  Peak values, project info,
sample_rate, record_type stayed locked in at whatever the FIRST
insert wrote.  So even after Bug 1 was fixed, the historical
events in the DB (already inserted with broken-codec peaks) would
never get their values corrected, because a re-forward would just
hit IntegrityError and skip the field refresh.

Fix 1 (minimateplus/event_file_io.py + sfm/waveform_store.py):
  - New apply_report_to_event(event, report) helper folds the BW
    report's device-authoritative fields onto the Event in-place:
    per-channel PPV, peak vector sum, mic PSPL→psi, project /
    client / operator / sensor_location, sample_rate, record_time.
  - save_imported_bw() calls the helper right after parsing the
    report.  The Event that flows to insert_events() now carries
    correct values.

Fix 2 (sfm/database.py):
  - insert_events()'s IntegrityError UPDATE now refreshes every
    device-authoritative column from the new data: tran_ppv,
    vert_ppv, long_ppv, peak_vector_sum, mic_ppv, project, client,
    operator, sensor_location, sample_rate, record_type, plus
    the existing filename/filesize/a5_pickle/sidecar fields.
  - Preserves: id, waveform_key, session_id, created_at (immutable
    / FK fields), and false_trigger (operator review state).

End-to-end simulation verified:
  - Step 1: import without report → DB has ±10 in/s peaks, no project
  - Step 2: re-import WITH report → upsert path fires, DB now has
            device-authoritative 0.005 in/s peaks + sensor_location
  - Step 3: operator sets false_trigger=1, re-import again → flag
            preserved, peaks remain correct

For the user's situation: deleting the watcher state file forces a
re-forward of all events.  Each re-forward now pairs with its
_ASCII.TXT, applies the report onto the Event, and the upsert
refreshes the DB row.  No DB nuke needed.

Full SFM suite: 62 passed, 44 skipped.
2026-05-11 05:51:39 +00:00

679 lines
30 KiB
Python

"""
sfm/database.py — SQLite persistence layer for seismo-relay.
Three tables, all keyed by unit serial number:
ach_sessions — one row per inbound ACH call-home
events — one row per triggered waveform event (deduped by serial+timestamp)
monitor_log — one row per monitoring interval (deduped by serial+start_time)
The DB file lives at:
<output_dir>/seismo_relay.db (default: bridges/captures/seismo_relay.db)
Usage
-----
from sfm.database import SeismoDb
db = SeismoDb("bridges/captures/seismo_relay.db")
# Write a call-home session
session_id = db.insert_ach_session(serial="BE11529", peer="1.2.3.4:51920",
events_downloaded=3, monitor_entries=2,
duration_seconds=47.3)
# Write events (silently skips duplicates)
db.insert_events(events, serial="BE11529", session_id=session_id)
# Write monitor log entries
db.insert_monitor_log(entries, session_id=session_id)
# Query
rows = db.query_events(serial="BE11529", from_dt=datetime(...), to_dt=datetime(...))
"""
from __future__ import annotations
import datetime
import logging
import sqlite3
import uuid
from pathlib import Path
from typing import Optional
from minimateplus.models import Event, MonitorLogEntry
log = logging.getLogger("sfm.database")
# ── Schema ─────────────────────────────────────────────────────────────────────
_SCHEMA = """
PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS ach_sessions (
id TEXT PRIMARY KEY, -- UUID
serial TEXT NOT NULL,
session_time TEXT NOT NULL, -- ISO-8601 UTC
peer TEXT, -- "ip:port"
events_downloaded INTEGER NOT NULL DEFAULT 0,
monitor_entries INTEGER NOT NULL DEFAULT 0,
duration_seconds REAL,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
);
CREATE INDEX IF NOT EXISTS idx_ach_sessions_serial ON ach_sessions(serial);
CREATE INDEX IF NOT EXISTS idx_ach_sessions_time ON ach_sessions(session_time);
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY, -- UUID
serial TEXT NOT NULL,
waveform_key TEXT NOT NULL, -- 8-hex device key (dedup field)
session_id TEXT, -- FK → ach_sessions.id
timestamp TEXT, -- ISO-8601 local time from device
tran_ppv REAL, -- in/s
vert_ppv REAL, -- in/s
long_ppv REAL, -- in/s
peak_vector_sum REAL, -- in/s
mic_ppv REAL, -- psi or dB depending on setup
project TEXT,
client TEXT,
operator TEXT,
sensor_location TEXT,
sample_rate INTEGER,
record_type TEXT, -- "single_shot" | "continuous"
false_trigger INTEGER NOT NULL DEFAULT 0, -- 0=no, 1=yes (manual flag)
blastware_filename TEXT, -- event file within waveform store; extension is per-event (AB0T encodes timestamp)
blastware_filesize INTEGER, -- bytes; NULL if no event file saved
a5_pickle_filename TEXT, -- "<filename>.a5.pkl" sidecar
sidecar_filename TEXT, -- "<filename>.sfm.json" review/metadata sidecar
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
UNIQUE(serial, timestamp)
);
CREATE INDEX IF NOT EXISTS idx_events_serial ON events(serial);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id);
CREATE TABLE IF NOT EXISTS monitor_log (
id TEXT PRIMARY KEY, -- UUID
serial TEXT NOT NULL,
waveform_key TEXT NOT NULL, -- 8-hex device key (dedup field)
session_id TEXT, -- FK → ach_sessions.id
start_time TEXT, -- ISO-8601
stop_time TEXT, -- ISO-8601
duration_seconds REAL,
geo_threshold_ips REAL, -- in/s
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
UNIQUE(serial, start_time)
);
CREATE INDEX IF NOT EXISTS idx_monitor_log_serial ON monitor_log(serial);
CREATE INDEX IF NOT EXISTS idx_monitor_log_start ON monitor_log(start_time);
CREATE INDEX IF NOT EXISTS idx_monitor_log_session ON monitor_log(session_id);
"""
# ── SeismoDb class ─────────────────────────────────────────────────────────────
class SeismoDb:
"""
Thin SQLite wrapper for seismo-relay persistence.
Thread-safe: each call opens, uses, and closes a connection with
check_same_thread=False and WAL mode enabled. For the ACH server's
single-writer / occasional-reader pattern this is more than sufficient.
"""
def __init__(self, db_path: str | Path) -> None:
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._init_schema()
log.info("SeismoDb initialised at %s", self.db_path)
# ── Internal helpers ───────────────────────────────────────────────────────
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode = WAL")
conn.execute("PRAGMA foreign_keys = ON")
return conn
def _init_schema(self) -> None:
with self._connect() as conn:
conn.executescript(_SCHEMA)
self._migrate(conn)
def _migrate(self, conn: sqlite3.Connection) -> None:
"""Apply in-place schema migrations for existing databases."""
# Migration 1: change events UNIQUE from (serial, waveform_key) [or any
# waveform_key-based variant] to (serial, timestamp).
# Rationale: device key counter resets to 01110000 after every erase, so
# waveform_key is not a stable dedup field across erase cycles. The event
# timestamp (from the device clock) is the correct natural key.
row = conn.execute(
"SELECT sql FROM sqlite_master WHERE type='table' AND name='events'"
).fetchone()
if row and "UNIQUE(serial, timestamp)" not in row[0]:
log.info("_migrate: rebuilding events table — UNIQUE(serial, timestamp)")
conn.executescript("""
ALTER TABLE events RENAME TO events_old;
CREATE TABLE events (
id TEXT PRIMARY KEY,
serial TEXT NOT NULL,
waveform_key TEXT NOT NULL,
session_id TEXT,
timestamp TEXT,
tran_ppv REAL,
vert_ppv REAL,
long_ppv REAL,
peak_vector_sum REAL,
mic_ppv REAL,
project TEXT,
client TEXT,
operator TEXT,
sensor_location TEXT,
sample_rate INTEGER,
record_type TEXT,
false_trigger INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
UNIQUE(serial, timestamp)
);
INSERT OR IGNORE INTO events SELECT * FROM events_old;
DROP TABLE events_old;
CREATE INDEX IF NOT EXISTS idx_events_serial ON events(serial);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id);
""")
log.info("_migrate: events table rebuilt OK")
# Migration 1b: add Blastware-file columns to existing events tables.
# New columns are NULLable so old rows just read NULL.
existing_cols = {
r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()
}
for col, ddl in (
("blastware_filename", "TEXT"),
("blastware_filesize", "INTEGER"),
("a5_pickle_filename", "TEXT"),
("sidecar_filename", "TEXT"),
):
if col not in existing_cols:
log.info("_migrate: events ADD COLUMN %s %s", col, ddl)
conn.execute(f"ALTER TABLE events ADD COLUMN {col} {ddl}")
# Migration 2: change monitor_log UNIQUE from (serial, waveform_key) to
# (serial, start_time) — same reasoning as events.
row = conn.execute(
"SELECT sql FROM sqlite_master WHERE type='table' AND name='monitor_log'"
).fetchone()
if row and "UNIQUE(serial, start_time)" not in row[0]:
log.info("_migrate: rebuilding monitor_log table — UNIQUE(serial, start_time)")
conn.executescript("""
ALTER TABLE monitor_log RENAME TO monitor_log_old;
CREATE TABLE monitor_log (
id TEXT PRIMARY KEY,
serial TEXT NOT NULL,
waveform_key TEXT NOT NULL,
session_id TEXT,
start_time TEXT,
stop_time TEXT,
duration_seconds REAL,
geo_threshold_ips REAL,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
UNIQUE(serial, start_time)
);
INSERT OR IGNORE INTO monitor_log SELECT * FROM monitor_log_old;
DROP TABLE monitor_log_old;
CREATE INDEX IF NOT EXISTS idx_monitor_log_serial ON monitor_log(serial);
CREATE INDEX IF NOT EXISTS idx_monitor_log_start ON monitor_log(start_time);
CREATE INDEX IF NOT EXISTS idx_monitor_log_session ON monitor_log(session_id);
""")
log.info("_migrate: monitor_log table rebuilt OK")
@staticmethod
def _iso(dt: Optional[datetime.datetime]) -> Optional[str]:
return dt.isoformat() if dt is not None else None
@staticmethod
def _new_id() -> str:
return str(uuid.uuid4())
# ── ACH sessions ──────────────────────────────────────────────────────────
def insert_ach_session(
self,
*,
serial: str,
peer: Optional[str] = None,
events_downloaded: int = 0,
monitor_entries: int = 0,
duration_seconds: Optional[float] = None,
session_time: Optional[datetime.datetime] = None,
) -> str:
"""Insert a new ACH session row. Returns the new session UUID."""
sid = self._new_id()
ts = self._iso(session_time or datetime.datetime.utcnow())
with self._connect() as conn:
conn.execute(
"""
INSERT INTO ach_sessions
(id, serial, session_time, peer,
events_downloaded, monitor_entries, duration_seconds)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(sid, serial, ts, peer,
events_downloaded, monitor_entries, duration_seconds),
)
log.debug("ach_session inserted: %s serial=%s events=%d monitor=%d",
sid, serial, events_downloaded, monitor_entries)
return sid
def get_sessions(
self,
serial: Optional[str] = None,
limit: int = 50,
) -> list[dict]:
"""Return recent ACH sessions, newest first."""
with self._connect() as conn:
if serial:
rows = conn.execute(
"SELECT * FROM ach_sessions WHERE serial=? "
"ORDER BY session_time DESC LIMIT ?",
(serial, limit),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM ach_sessions ORDER BY session_time DESC LIMIT ?",
(limit,),
).fetchall()
return [dict(r) for r in rows]
# ── Events ────────────────────────────────────────────────────────────────
def insert_events(
self,
events: list[Event],
*,
serial: str,
session_id: Optional[str] = None,
waveform_records: Optional[dict[str, dict]] = None,
) -> tuple[int, int]:
"""
Insert triggered events. Silently skips duplicates (serial+timestamp).
Returns (inserted, skipped).
``waveform_records`` (optional): dict keyed by event waveform_key (hex)
whose value is a record from ``WaveformStore.save()``:
{"filename": str, "filesize": int, "a5_pickle_filename": str}
For events whose key is in this dict, the matching columns are
populated. If a row with the same (serial, timestamp) already exists
(dedup hit), the matching waveform record is upserted onto the
existing row so a re-download via the live endpoint refreshes the
file metadata.
"""
inserted = skipped = 0
wave_recs = waveform_records or {}
with self._connect() as conn:
for ev in events:
key = ev._waveform_key.hex() if ev._waveform_key else None
if key is None:
skipped += 1
continue
ts = None
if ev.timestamp:
try:
ts = datetime.datetime(
ev.timestamp.year, ev.timestamp.month, ev.timestamp.day,
ev.timestamp.hour, ev.timestamp.minute, ev.timestamp.second,
).isoformat()
except Exception:
ts = str(ev.timestamp)
pv = ev.peak_values
pi = ev.project_info
rec = wave_recs.get(key) or {}
try:
conn.execute(
"""
INSERT INTO events
(id, serial, waveform_key, session_id, timestamp,
tran_ppv, vert_ppv, long_ppv, peak_vector_sum, mic_ppv,
project, client, operator, sensor_location,
sample_rate, record_type,
blastware_filename, blastware_filesize,
a5_pickle_filename, sidecar_filename)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
self._new_id(), serial, key, session_id, ts,
pv.tran if pv else None,
pv.vert if pv else None,
pv.long if pv else None,
pv.peak_vector_sum if pv else None,
pv.micl if pv else None,
pi.project if pi else None,
pi.client if pi else None,
pi.operator if pi else None,
pi.sensor_location if pi else None,
ev.sample_rate,
ev.record_type,
rec.get("filename"),
rec.get("filesize"),
rec.get("a5_pickle_filename"),
rec.get("sidecar_filename"),
),
)
inserted += 1
except sqlite3.IntegrityError:
skipped += 1
# UPSERT path: a row for this (serial, timestamp) already
# exists. Refresh every device-authoritative field from
# the new data so that a re-import with better data (e.g.
# a watcher re-forward where the previous attempt missed
# the paired BW ASCII report) replaces stale peaks /
# project info / sample_rate.
#
# Preserved (not in this UPDATE):
# id, waveform_key, session_id, created_at — immutable / FK
# false_trigger — operator review state
#
# Behaviour change vs prior versions: this UPDATE used
# to only refresh filename / filesize / a5_pickle /
# sidecar fields. As a result, the first insert's
# broken-codec peak values were locked in forever even
# if subsequent re-forwards arrived with correct
# report-derived values. Now every re-import lifts the
# DB row up to whatever the latest Event carries.
conn.execute(
"""
UPDATE events
SET tran_ppv = ?,
vert_ppv = ?,
long_ppv = ?,
peak_vector_sum = ?,
mic_ppv = ?,
project = ?,
client = ?,
operator = ?,
sensor_location = ?,
sample_rate = ?,
record_type = ?,
blastware_filename = ?,
blastware_filesize = ?,
a5_pickle_filename = ?,
sidecar_filename = ?
WHERE serial = ? AND timestamp = ?
""",
(
pv.tran if pv else None,
pv.vert if pv else None,
pv.long if pv else None,
pv.peak_vector_sum if pv else None,
pv.micl if pv else None,
pi.project if pi else None,
pi.client if pi else None,
pi.operator if pi else None,
pi.sensor_location if pi else None,
ev.sample_rate,
ev.record_type,
rec.get("filename") if rec else None,
rec.get("filesize") if rec else None,
rec.get("a5_pickle_filename") if rec else None,
rec.get("sidecar_filename") if rec else None,
serial,
ts,
),
)
log.debug("insert_events serial=%s inserted=%d skipped=%d",
serial, inserted, skipped)
return inserted, skipped
def get_event(self, event_id: str) -> Optional[dict]:
"""Return one event row by id, or None."""
with self._connect() as conn:
row = conn.execute(
"SELECT * FROM events WHERE id = ?", (event_id,),
).fetchone()
return dict(row) if row else None
def query_events(
self,
serial: Optional[str] = None,
from_dt: Optional[datetime.datetime] = None,
to_dt: Optional[datetime.datetime] = None,
false_trigger: Optional[bool] = None,
limit: int = 500,
offset: int = 0,
) -> list[dict]:
"""Query events with optional filters. Returns newest first."""
clauses: list[str] = []
params: list = []
if serial:
clauses.append("serial = ?")
params.append(serial)
if from_dt:
clauses.append("timestamp >= ?")
params.append(from_dt.isoformat())
if to_dt:
clauses.append("timestamp <= ?")
params.append(to_dt.isoformat())
if false_trigger is not None:
clauses.append("false_trigger = ?")
params.append(1 if false_trigger else 0)
where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
params += [limit, offset]
with self._connect() as conn:
rows = conn.execute(
f"SELECT * FROM events {where} "
f"ORDER BY timestamp DESC LIMIT ? OFFSET ?",
params,
).fetchall()
return [dict(r) for r in rows]
def set_false_trigger(self, event_id: str, value: bool) -> bool:
"""Set or clear the false_trigger flag on an event. Returns True if found."""
with self._connect() as conn:
cur = conn.execute(
"UPDATE events SET false_trigger=? WHERE id=?",
(1 if value else 0, event_id),
)
return cur.rowcount > 0
def update_event_review(self, event_id: str, review: dict) -> bool:
"""
Sync derived index columns from a sidecar's `review` block.
Currently the only derived index is `events.false_trigger` — kept
in sync so `/db/events?false_trigger=true` queries don't have to
scan every sidecar JSON on disk. The sidecar JSON itself remains
the source of truth for the full review state.
Returns True when the row exists, False otherwise. No-op fields
(review without `false_trigger`) leave the column untouched.
"""
if not isinstance(review, dict):
return False
if "false_trigger" not in review:
# Nothing derived to update; just confirm the row exists.
with self._connect() as conn:
row = conn.execute(
"SELECT 1 FROM events WHERE id=?", (event_id,),
).fetchone()
return row is not None
flag = 1 if review.get("false_trigger") else 0
with self._connect() as conn:
cur = conn.execute(
"UPDATE events SET false_trigger=? WHERE id=?",
(flag, event_id),
)
return cur.rowcount > 0
# ── Monitor log ───────────────────────────────────────────────────────────
def insert_monitor_log(
self,
entries: list[MonitorLogEntry],
*,
session_id: Optional[str] = None,
) -> tuple[int, int]:
"""
Insert monitor log entries. Silently skips duplicates (serial+start_time).
Returns (inserted, skipped).
"""
inserted = skipped = 0
with self._connect() as conn:
for e in entries:
try:
conn.execute(
"""
INSERT INTO monitor_log
(id, serial, waveform_key, session_id,
start_time, stop_time, duration_seconds,
geo_threshold_ips)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
self._new_id(),
e.serial or "",
e.key,
session_id,
self._iso(e.start_time),
self._iso(e.stop_time),
e.duration_seconds,
e.geo_threshold_ips,
),
)
inserted += 1
except sqlite3.IntegrityError:
skipped += 1
log.debug("insert_monitor_log inserted=%d skipped=%d", inserted, skipped)
return inserted, skipped
def query_monitor_log(
self,
serial: Optional[str] = None,
from_dt: Optional[datetime.datetime] = None,
to_dt: Optional[datetime.datetime] = None,
limit: int = 500,
offset: int = 0,
) -> list[dict]:
"""Query monitor log entries with optional filters. Returns newest first."""
clauses: list[str] = []
params: list = []
if serial:
clauses.append("serial = ?")
params.append(serial)
if from_dt:
clauses.append("start_time >= ?")
params.append(from_dt.isoformat())
if to_dt:
clauses.append("start_time <= ?")
params.append(to_dt.isoformat())
where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
params += [limit, offset]
with self._connect() as conn:
rows = conn.execute(
f"SELECT * FROM monitor_log {where} "
f"ORDER BY start_time DESC LIMIT ? OFFSET ?",
params,
).fetchall()
return [dict(r) for r in rows]
# ── Fleet overview ────────────────────────────────────────────────────────
def query_units(self) -> list[dict]:
"""
Return one row per known serial with summary stats.
Aggregates from BOTH source tables:
- `events` — populated by every ingest path
(live ACH, /db/import/blastware_file
from the series3-watcher forwarder, etc.)
- `ach_sessions` — only populated by the live ACH server;
empty for events that came in via the
BW-importer route.
Earlier this method only joined on `ach_sessions`, which made
watcher-forwarded units invisible to the SFM webapp's fleet
overview even though their events were correctly populated in
`events`. Now we union the two and surface every serial that
has activity in either table.
Fields:
serial — unit serial number (e.g. "BE11529")
last_seen — most recent of MAX(events.timestamp)
and MAX(ach_sessions.session_time)
total_events — COUNT(*) from `events` (the
authoritative count regardless of
ingest path)
total_monitor_entries — from `ach_sessions`, 0 when absent
total_sessions — COUNT(*) from `ach_sessions`, 0 when absent
"""
with self._connect() as conn:
event_stats = {
row["serial"]: row
for row in conn.execute(
"""
SELECT serial,
MAX(timestamp) AS last_event_at,
COUNT(*) AS total_events
FROM events
GROUP BY serial
""",
).fetchall()
}
session_stats = {
row["serial"]: row
for row in conn.execute(
"""
SELECT serial,
MAX(session_time) AS last_session_at,
SUM(monitor_entries) AS total_monitor_entries,
COUNT(*) AS total_sessions
FROM ach_sessions
GROUP BY serial
""",
).fetchall()
}
all_serials = set(event_stats) | set(session_stats)
units = []
for serial in all_serials:
e = event_stats.get(serial)
s = session_stats.get(serial)
last_event_at = e["last_event_at"] if e else None
last_session_at = s["last_session_at"] if s else None
# Prefer whichever timestamp is more recent
last_seen = max(
(t for t in (last_event_at, last_session_at) if t),
default=None,
)
units.append({
"serial": serial,
"last_seen": last_seen,
"total_events": e["total_events"] if e else 0,
"total_monitor_entries": s["total_monitor_entries"] if s else 0,
"total_sessions": s["total_sessions"] if s else 0,
})
# Sort by last_seen desc; serials with no timestamp at all sink to the bottom.
units.sort(key=lambda u: u.get("last_seen") or "", reverse=True)
return units