cc57a8e618
Previous query_units() only joined on ach_sessions, which is created
exclusively by the live ACH server. The BW-importer path
(/db/import/blastware_file → WaveformStore.save_imported_bw →
SeismoDb.insert_events) populates `events` but never creates an
ach_sessions row. Consequence: every serial whose events flowed in
through the series3-watcher forwarder was invisible to
/db/units (and therefore to the SFM webapp's fleet overview / units
list), even though the events were correctly populated in the
events table with proper serial attribution.
Rewrite query_units() to aggregate from BOTH tables and union the
serials:
- total_events / last_event_at come from `events` (every ingest path)
- last_session_at / total_monitor_entries / total_sessions come from
  `ach_sessions` (ACH-only); the two totals are 0, and last_session_at
  is absent, when no sessions exist for the serial
- last_seen = max(last_event_at, last_session_at)
Verified on the user's actual prod DB after the
repair_unknown_serials run: /db/units now returns 24 serials instead
of 2. All 3,257 watcher-forwarded events become visible in the
fleet overview without any further DB surgery.
643 lines
27 KiB
Python
643 lines
27 KiB
Python
"""
|
|
sfm/database.py — SQLite persistence layer for seismo-relay.
|
|
|
|
Three tables, all keyed by unit serial number:
|
|
|
|
ach_sessions — one row per inbound ACH call-home
|
|
events — one row per triggered waveform event (deduped by serial+timestamp)
|
|
monitor_log — one row per monitoring interval (deduped by serial+start_time)
|
|
|
|
The DB file lives at:
|
|
<output_dir>/seismo_relay.db (default: bridges/captures/seismo_relay.db)
|
|
|
|
Usage
|
|
-----
|
|
from sfm.database import SeismoDb
|
|
|
|
db = SeismoDb("bridges/captures/seismo_relay.db")
|
|
|
|
# Write a call-home session
|
|
session_id = db.insert_ach_session(serial="BE11529", peer="1.2.3.4:51920",
|
|
events_downloaded=3, monitor_entries=2,
|
|
duration_seconds=47.3)
|
|
|
|
# Write events (silently skips duplicates)
|
|
db.insert_events(events, serial="BE11529", session_id=session_id)
|
|
|
|
# Write monitor log entries
|
|
db.insert_monitor_log(entries, session_id=session_id)
|
|
|
|
# Query
|
|
rows = db.query_events(serial="BE11529", from_dt=datetime(...), to_dt=datetime(...))
|
|
"""
|
|
|
|
from __future__ import annotations

import contextlib
import datetime
import logging
import sqlite3
import uuid
from collections.abc import Iterator
from pathlib import Path
from typing import Optional

from minimateplus.models import Event, MonitorLogEntry
|
|
|
|
log = logging.getLogger("sfm.database")

# ── Schema ─────────────────────────────────────────────────────────────────────
#
# NOTE: SeismoDb._migrate() decides whether a table is up to date by searching
# the DDL stored in sqlite_master for the literal text
# "UNIQUE(serial, timestamp)" / "UNIQUE(serial, start_time)".  Keep those
# clauses spelled exactly as below or every start-up will trigger a rebuild.

_SCHEMA = """
PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;

CREATE TABLE IF NOT EXISTS ach_sessions (
    id                TEXT PRIMARY KEY,   -- UUID
    serial            TEXT NOT NULL,
    session_time      TEXT NOT NULL,      -- ISO-8601 UTC
    peer              TEXT,               -- "ip:port"
    events_downloaded INTEGER NOT NULL DEFAULT 0,
    monitor_entries   INTEGER NOT NULL DEFAULT 0,
    duration_seconds  REAL,
    created_at        TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
);
CREATE INDEX IF NOT EXISTS idx_ach_sessions_serial ON ach_sessions(serial);
CREATE INDEX IF NOT EXISTS idx_ach_sessions_time ON ach_sessions(session_time);

CREATE TABLE IF NOT EXISTS events (
    id                 TEXT PRIMARY KEY,  -- UUID
    serial             TEXT NOT NULL,
    waveform_key       TEXT NOT NULL,     -- 8-hex device key (dedup field)
    session_id         TEXT,              -- FK → ach_sessions.id
    timestamp          TEXT,              -- ISO-8601 local time from device
    tran_ppv           REAL,              -- in/s
    vert_ppv           REAL,              -- in/s
    long_ppv           REAL,              -- in/s
    peak_vector_sum    REAL,              -- in/s
    mic_ppv            REAL,              -- psi or dB depending on setup
    project            TEXT,
    client             TEXT,
    operator           TEXT,
    sensor_location    TEXT,
    sample_rate        INTEGER,
    record_type        TEXT,              -- "single_shot" | "continuous"
    false_trigger      INTEGER NOT NULL DEFAULT 0,  -- 0=no, 1=yes (manual flag)
    blastware_filename TEXT,              -- event file within waveform store; extension is per-event (AB0T encodes timestamp)
    blastware_filesize INTEGER,           -- bytes; NULL if no event file saved
    a5_pickle_filename TEXT,              -- "<filename>.a5.pkl" sidecar
    sidecar_filename   TEXT,              -- "<filename>.sfm.json" review/metadata sidecar
    created_at         TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
    UNIQUE(serial, timestamp)
);
CREATE INDEX IF NOT EXISTS idx_events_serial ON events(serial);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id);

CREATE TABLE IF NOT EXISTS monitor_log (
    id                TEXT PRIMARY KEY,   -- UUID
    serial            TEXT NOT NULL,
    waveform_key      TEXT NOT NULL,      -- 8-hex device key (dedup field)
    session_id        TEXT,               -- FK → ach_sessions.id
    start_time        TEXT,               -- ISO-8601
    stop_time         TEXT,               -- ISO-8601
    duration_seconds  REAL,
    geo_threshold_ips REAL,               -- in/s
    created_at        TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
    UNIQUE(serial, start_time)
);
CREATE INDEX IF NOT EXISTS idx_monitor_log_serial ON monitor_log(serial);
CREATE INDEX IF NOT EXISTS idx_monitor_log_start ON monitor_log(start_time);
CREATE INDEX IF NOT EXISTS idx_monitor_log_session ON monitor_log(session_id);
"""


# ── SeismoDb class ─────────────────────────────────────────────────────────────

class SeismoDb:
    """
    Thin SQLite wrapper for seismo-relay persistence.

    Every public call opens its own connection (``check_same_thread=False``,
    WAL journal mode), runs inside one transaction, and closes the connection
    again before returning.  No connection object is shared between calls.
    """

    def __init__(self, db_path: str | Path) -> None:
        """Create the parent directory if needed and bring the schema up to date."""
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_schema()
        log.info("SeismoDb initialised at %s", self.db_path)

    # ── Internal helpers ───────────────────────────────────────────────────────

    def _connect(self) -> sqlite3.Connection:
        """
        Open and configure a new connection.  The caller owns the returned
        connection and is responsible for closing it (see ``_session``).
        """
        conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
        try:
            conn.row_factory = sqlite3.Row
            conn.execute("PRAGMA journal_mode = WAL")
            conn.execute("PRAGMA foreign_keys = ON")
        except Exception:
            # Don't leak the handle if configuration fails.
            conn.close()
            raise
        return conn

    @contextlib.contextmanager
    def _session(self) -> Iterator[sqlite3.Connection]:
        """
        Yield a connection for one unit of work: commit on success, roll back
        on error, and ALWAYS close afterwards.

        ``with sqlite3.Connection`` on its own only commits / rolls back — it
        does not close the connection — so it is wrapped here.
        """
        conn = self._connect()
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _init_schema(self) -> None:
        """Create missing tables/indexes, then apply in-place migrations."""
        with self._session() as conn:
            conn.executescript(_SCHEMA)
            self._migrate(conn)

    def _migrate(self, conn: sqlite3.Connection) -> None:
        """Apply in-place schema migrations for existing databases."""

        # Migration 1: change events UNIQUE from (serial, waveform_key) [or any
        # waveform_key-based variant] to (serial, timestamp).
        # Rationale: device key counter resets to 01110000 after every erase, so
        # waveform_key is not a stable dedup field across erase cycles.  The event
        # timestamp (from the device clock) is the correct natural key.
        #
        # NOTE(review): the copy below is positional (SELECT *), so it relies on
        # the old table having the same column count/order as the rebuilt one —
        # confirm against the oldest schema still deployed.
        row = conn.execute(
            "SELECT sql FROM sqlite_master WHERE type='table' AND name='events'"
        ).fetchone()
        if row and "UNIQUE(serial, timestamp)" not in row[0]:
            log.info("_migrate: rebuilding events table — UNIQUE(serial, timestamp)")
            conn.executescript("""
                ALTER TABLE events RENAME TO events_old;

                CREATE TABLE events (
                    id              TEXT PRIMARY KEY,
                    serial          TEXT NOT NULL,
                    waveform_key    TEXT NOT NULL,
                    session_id      TEXT,
                    timestamp       TEXT,
                    tran_ppv        REAL,
                    vert_ppv        REAL,
                    long_ppv        REAL,
                    peak_vector_sum REAL,
                    mic_ppv         REAL,
                    project         TEXT,
                    client          TEXT,
                    operator        TEXT,
                    sensor_location TEXT,
                    sample_rate     INTEGER,
                    record_type     TEXT,
                    false_trigger   INTEGER NOT NULL DEFAULT 0,
                    created_at      TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
                    UNIQUE(serial, timestamp)
                );

                INSERT OR IGNORE INTO events SELECT * FROM events_old;
                DROP TABLE events_old;

                CREATE INDEX IF NOT EXISTS idx_events_serial ON events(serial);
                CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
                CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id);
            """)
            log.info("_migrate: events table rebuilt OK")

        # Migration 1b: add Blastware-file columns to existing events tables.
        # New columns are NULLable so old rows just read NULL.
        existing_cols = {
            r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()
        }
        for col, ddl in (
            ("blastware_filename", "TEXT"),
            ("blastware_filesize", "INTEGER"),
            ("a5_pickle_filename", "TEXT"),
            ("sidecar_filename", "TEXT"),
        ):
            if col not in existing_cols:
                log.info("_migrate: events ADD COLUMN %s %s", col, ddl)
                # col/ddl come from the constant tuple above, never from input.
                conn.execute(f"ALTER TABLE events ADD COLUMN {col} {ddl}")

        # Migration 2: change monitor_log UNIQUE from (serial, waveform_key) to
        # (serial, start_time) — same reasoning as events.
        row = conn.execute(
            "SELECT sql FROM sqlite_master WHERE type='table' AND name='monitor_log'"
        ).fetchone()
        if row and "UNIQUE(serial, start_time)" not in row[0]:
            log.info("_migrate: rebuilding monitor_log table — UNIQUE(serial, start_time)")
            conn.executescript("""
                ALTER TABLE monitor_log RENAME TO monitor_log_old;

                CREATE TABLE monitor_log (
                    id                TEXT PRIMARY KEY,
                    serial            TEXT NOT NULL,
                    waveform_key      TEXT NOT NULL,
                    session_id        TEXT,
                    start_time        TEXT,
                    stop_time         TEXT,
                    duration_seconds  REAL,
                    geo_threshold_ips REAL,
                    created_at        TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
                    UNIQUE(serial, start_time)
                );

                INSERT OR IGNORE INTO monitor_log SELECT * FROM monitor_log_old;
                DROP TABLE monitor_log_old;

                CREATE INDEX IF NOT EXISTS idx_monitor_log_serial ON monitor_log(serial);
                CREATE INDEX IF NOT EXISTS idx_monitor_log_start ON monitor_log(start_time);
                CREATE INDEX IF NOT EXISTS idx_monitor_log_session ON monitor_log(session_id);
            """)
            log.info("_migrate: monitor_log table rebuilt OK")

    @staticmethod
    def _iso(dt: Optional[datetime.datetime]) -> Optional[str]:
        """Return ``dt.isoformat()``, or None when ``dt`` is None."""
        return dt.isoformat() if dt is not None else None

    @staticmethod
    def _new_id() -> str:
        """Return a fresh random UUID4 string for use as a primary key."""
        return str(uuid.uuid4())

    @staticmethod
    def _event_timestamp(ev: Event) -> Optional[str]:
        """
        Return the event's timestamp as a second-resolution ISO-8601 string,
        ``str(ev.timestamp)`` when it cannot be rebuilt as a datetime, or None
        when the event carries no timestamp.
        """
        if not ev.timestamp:
            return None
        try:
            return datetime.datetime(
                ev.timestamp.year, ev.timestamp.month, ev.timestamp.day,
                ev.timestamp.hour, ev.timestamp.minute, ev.timestamp.second,
            ).isoformat()
        except Exception as exc:
            # Deliberately broad: ingest is best-effort and must not abort a
            # whole batch over one odd timestamp — but leave a trace.
            log.debug("event timestamp %r not ISO-convertible (%s); storing str()",
                      ev.timestamp, exc)
            return str(ev.timestamp)

    def _select_newest_first(
        self,
        table: str,
        time_column: str,
        filters: list[tuple[str, object]],
        limit: int,
        offset: int,
    ) -> list[dict]:
        """
        Run ``SELECT * FROM <table>`` with AND-ed ``filters``, ordered by
        ``time_column`` descending, with LIMIT/OFFSET.

        ``table``, ``time_column`` and the clause text in ``filters`` are fixed
        strings supplied by the public query methods of this class; all
        caller-provided values travel as bound parameters.
        """
        where = ""
        if filters:
            where = "WHERE " + " AND ".join(clause for clause, _ in filters)
        params = [value for _, value in filters] + [limit, offset]
        sql = (
            f"SELECT * FROM {table} {where} "
            f"ORDER BY {time_column} DESC LIMIT ? OFFSET ?"
        )
        with self._session() as conn:
            return [dict(r) for r in conn.execute(sql, params).fetchall()]

    # ── ACH sessions ──────────────────────────────────────────────────────────

    def insert_ach_session(
        self,
        *,
        serial: str,
        peer: Optional[str] = None,
        events_downloaded: int = 0,
        monitor_entries: int = 0,
        duration_seconds: Optional[float] = None,
        session_time: Optional[datetime.datetime] = None,
    ) -> str:
        """
        Insert a new ACH session row.  Returns the new session UUID.

        ``session_time`` defaults to the current UTC time, stored as a naive
        ISO-8601 string (no offset suffix).
        """
        sid = self._new_id()
        if session_time is None:
            # utcnow() is deprecated; this yields the same naive-UTC value so
            # stored strings stay comparable with existing rows.
            session_time = datetime.datetime.now(
                datetime.timezone.utc
            ).replace(tzinfo=None)
        ts = self._iso(session_time)
        with self._session() as conn:
            conn.execute(
                """
                INSERT INTO ach_sessions
                    (id, serial, session_time, peer,
                     events_downloaded, monitor_entries, duration_seconds)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (sid, serial, ts, peer,
                 events_downloaded, monitor_entries, duration_seconds),
            )
        log.debug("ach_session inserted: %s serial=%s events=%d monitor=%d",
                  sid, serial, events_downloaded, monitor_entries)
        return sid

    def get_sessions(
        self,
        serial: Optional[str] = None,
        limit: int = 50,
    ) -> list[dict]:
        """Return recent ACH sessions, newest first, optionally for one serial."""
        sql = "SELECT * FROM ach_sessions"
        params: list = []
        if serial:
            sql += " WHERE serial=?"
            params.append(serial)
        sql += " ORDER BY session_time DESC LIMIT ?"
        params.append(limit)
        with self._session() as conn:
            return [dict(r) for r in conn.execute(sql, params).fetchall()]

    # ── Events ────────────────────────────────────────────────────────────────

    def insert_events(
        self,
        events: list[Event],
        *,
        serial: str,
        session_id: Optional[str] = None,
        waveform_records: Optional[dict[str, dict]] = None,
    ) -> tuple[int, int]:
        """
        Insert triggered events.  Silently skips duplicates (serial+timestamp).
        Returns (inserted, skipped).  Events without a waveform key are
        counted as skipped.

        ``waveform_records`` (optional): dict keyed by event waveform_key (hex)
        whose value is a record from ``WaveformStore.save()``:
            {"filename": str, "filesize": int, "a5_pickle_filename": str}
        plus an optional "sidecar_filename".

        For events whose key is in this dict, the matching columns are
        populated.  If a row with the same (serial, timestamp) already exists
        (dedup hit), the fields present in the record are written onto the
        existing row so a re-download via the live endpoint refreshes the
        file metadata.  Fields absent from the record (or None) keep their
        stored value rather than being reset to NULL.

        NOTE(review): SQLite treats NULL timestamps as distinct in UNIQUE
        constraints, so events without a timestamp are never deduplicated.
        """
        inserted = skipped = 0
        wave_recs = waveform_records or {}
        with self._session() as conn:
            for ev in events:
                key = ev._waveform_key.hex() if ev._waveform_key else None
                if key is None:
                    skipped += 1
                    continue

                ts = self._event_timestamp(ev)
                pv = ev.peak_values
                pi = ev.project_info
                rec = wave_recs.get(key) or {}
                file_fields = (
                    rec.get("filename"),
                    rec.get("filesize"),
                    rec.get("a5_pickle_filename"),
                    rec.get("sidecar_filename"),
                )

                try:
                    conn.execute(
                        """
                        INSERT INTO events
                            (id, serial, waveform_key, session_id, timestamp,
                             tran_ppv, vert_ppv, long_ppv, peak_vector_sum, mic_ppv,
                             project, client, operator, sensor_location,
                             sample_rate, record_type,
                             blastware_filename, blastware_filesize,
                             a5_pickle_filename, sidecar_filename)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                        """,
                        (
                            self._new_id(), serial, key, session_id, ts,
                            pv.tran if pv else None,
                            pv.vert if pv else None,
                            pv.long if pv else None,
                            pv.peak_vector_sum if pv else None,
                            pv.micl if pv else None,
                            pi.project if pi else None,
                            pi.client if pi else None,
                            pi.operator if pi else None,
                            pi.sensor_location if pi else None,
                            ev.sample_rate,
                            ev.record_type,
                        ) + file_fields,
                    )
                except sqlite3.IntegrityError:
                    skipped += 1
                    # Refresh waveform fields on the existing dedup row without
                    # churning the rest of it.  COALESCE keeps the stored value
                    # for any field this record does not supply.
                    if rec and ts:
                        conn.execute(
                            """
                            UPDATE events
                               SET blastware_filename = COALESCE(?, blastware_filename),
                                   blastware_filesize = COALESCE(?, blastware_filesize),
                                   a5_pickle_filename = COALESCE(?, a5_pickle_filename),
                                   sidecar_filename   = COALESCE(?, sidecar_filename)
                             WHERE serial = ? AND timestamp = ?
                            """,
                            file_fields + (serial, ts),
                        )
                else:
                    inserted += 1

        log.debug("insert_events serial=%s inserted=%d skipped=%d",
                  serial, inserted, skipped)
        return inserted, skipped

    def get_event(self, event_id: str) -> Optional[dict]:
        """Return one event row by id, or None."""
        with self._session() as conn:
            row = conn.execute(
                "SELECT * FROM events WHERE id = ?", (event_id,),
            ).fetchone()
            return dict(row) if row else None

    def query_events(
        self,
        serial: Optional[str] = None,
        from_dt: Optional[datetime.datetime] = None,
        to_dt: Optional[datetime.datetime] = None,
        false_trigger: Optional[bool] = None,
        limit: int = 500,
        offset: int = 0,
    ) -> list[dict]:
        """Query events with optional filters.  Returns newest first."""
        filters: list[tuple[str, object]] = []
        if serial:
            filters.append(("serial = ?", serial))
        if from_dt:
            filters.append(("timestamp >= ?", from_dt.isoformat()))
        if to_dt:
            filters.append(("timestamp <= ?", to_dt.isoformat()))
        if false_trigger is not None:
            filters.append(("false_trigger = ?", 1 if false_trigger else 0))
        return self._select_newest_first(
            "events", "timestamp", filters, limit, offset,
        )

    def set_false_trigger(self, event_id: str, value: bool) -> bool:
        """Set or clear the false_trigger flag on an event.  Returns True if found."""
        with self._session() as conn:
            cur = conn.execute(
                "UPDATE events SET false_trigger=? WHERE id=?",
                (1 if value else 0, event_id),
            )
            return cur.rowcount > 0

    def update_event_review(self, event_id: str, review: dict) -> bool:
        """
        Sync derived index columns from a sidecar's `review` block.

        Currently the only derived index is `events.false_trigger` — kept
        in sync so `/db/events?false_trigger=true` queries don't have to
        scan every sidecar JSON on disk.  The sidecar JSON itself remains
        the source of truth for the full review state.

        Returns True when the row exists, False otherwise (also False when
        ``review`` is not a dict).  A review without `false_trigger` leaves
        the column untouched.
        """
        if not isinstance(review, dict):
            return False
        if "false_trigger" not in review:
            # Nothing derived to update; just confirm the row exists.
            with self._session() as conn:
                row = conn.execute(
                    "SELECT 1 FROM events WHERE id=?", (event_id,),
                ).fetchone()
                return row is not None
        return self.set_false_trigger(event_id, bool(review.get("false_trigger")))

    # ── Monitor log ───────────────────────────────────────────────────────────

    def insert_monitor_log(
        self,
        entries: list[MonitorLogEntry],
        *,
        session_id: Optional[str] = None,
    ) -> tuple[int, int]:
        """
        Insert monitor log entries.  Silently skips duplicates (serial+start_time).
        Returns (inserted, skipped).  An entry with no serial is stored with
        the empty string as its serial.
        """
        inserted = skipped = 0
        with self._session() as conn:
            for e in entries:
                try:
                    conn.execute(
                        """
                        INSERT INTO monitor_log
                            (id, serial, waveform_key, session_id,
                             start_time, stop_time, duration_seconds,
                             geo_threshold_ips)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                        """,
                        (
                            self._new_id(),
                            e.serial or "",
                            e.key,
                            session_id,
                            self._iso(e.start_time),
                            self._iso(e.stop_time),
                            e.duration_seconds,
                            e.geo_threshold_ips,
                        ),
                    )
                except sqlite3.IntegrityError:
                    skipped += 1
                else:
                    inserted += 1

        log.debug("insert_monitor_log inserted=%d skipped=%d", inserted, skipped)
        return inserted, skipped

    def query_monitor_log(
        self,
        serial: Optional[str] = None,
        from_dt: Optional[datetime.datetime] = None,
        to_dt: Optional[datetime.datetime] = None,
        limit: int = 500,
        offset: int = 0,
    ) -> list[dict]:
        """Query monitor log entries with optional filters.  Returns newest first."""
        filters: list[tuple[str, object]] = []
        if serial:
            filters.append(("serial = ?", serial))
        if from_dt:
            filters.append(("start_time >= ?", from_dt.isoformat()))
        if to_dt:
            filters.append(("start_time <= ?", to_dt.isoformat()))
        return self._select_newest_first(
            "monitor_log", "start_time", filters, limit, offset,
        )

    # ── Fleet overview ────────────────────────────────────────────────────────

    def query_units(self) -> list[dict]:
        """
        Return one row per known serial with summary stats.

        Aggregates from BOTH source tables and unions their serials:
          - `events`        — per-serial MAX(timestamp) and COUNT(*)
          - `ach_sessions`  — per-serial MAX(session_time),
                              SUM(monitor_entries) and COUNT(*)

        A serial present in only one table still gets a row; the stats from
        the other table default to 0.

        Fields:
          serial                — unit serial number (e.g. "BE11529")
          last_seen             — the greater (string comparison) of
                                  MAX(events.timestamp) and
                                  MAX(ach_sessions.session_time); None when
                                  neither is set
          total_events          — COUNT(*) from `events`
          total_monitor_entries — from `ach_sessions`, 0 when absent
          total_sessions        — COUNT(*) from `ach_sessions`, 0 when absent

        Rows are ordered by last_seen descending; serials without any
        timestamp sort last, and ties are ordered by serial.

        NOTE(review): the schema comments describe events.timestamp as device
        local time and session_time as UTC, so last_seen mixes the two
        clocks — confirm this is acceptable for the fleet overview.
        """
        with self._session() as conn:
            event_stats = {
                row["serial"]: row
                for row in conn.execute(
                    """
                    SELECT serial,
                           MAX(timestamp) AS last_event_at,
                           COUNT(*)       AS total_events
                      FROM events
                     GROUP BY serial
                    """,
                ).fetchall()
            }
            session_stats = {
                row["serial"]: row
                for row in conn.execute(
                    """
                    SELECT serial,
                           MAX(session_time)    AS last_session_at,
                           SUM(monitor_entries) AS total_monitor_entries,
                           COUNT(*)             AS total_sessions
                      FROM ach_sessions
                     GROUP BY serial
                    """,
                ).fetchall()
            }

        units = []
        # Iterate in sorted order so the stable sort below yields a
        # deterministic result for rows with equal last_seen (set iteration
        # order of strings varies between processes).
        for serial in sorted(set(event_stats) | set(session_stats)):
            e = event_stats.get(serial)
            s = session_stats.get(serial)
            last_event_at = e["last_event_at"] if e else None
            last_session_at = s["last_session_at"] if s else None
            # Prefer whichever timestamp is more recent.
            last_seen = max(
                (t for t in (last_event_at, last_session_at) if t),
                default=None,
            )
            units.append({
                "serial": serial,
                "last_seen": last_seen,
                "total_events": e["total_events"] if e else 0,
                "total_monitor_entries": (s["total_monitor_entries"] or 0) if s else 0,
                "total_sessions": s["total_sessions"] if s else 0,
            })

        # Sort by last_seen desc; serials with no timestamp at all sink to the bottom.
        units.sort(key=lambda u: u.get("last_seen") or "", reverse=True)
        return units
|