2 Commits

Author SHA1 Message Date
claude 03d224ccc3 v0.11.0 — SQLite persistence layer (SeismoDb)
sfm/database.py (new)
- SeismoDb class: three tables keyed by unit serial number
  - ach_sessions: one row per ACH call-home
  - events: one row per triggered event, deduped by (serial, waveform_key)
  - monitor_log: one row per monitoring interval, deduped by (serial, waveform_key)
- WAL mode, per-request connections, silent dedup via UNIQUE constraint
- Query helpers: query_events(), query_monitor_log(), get_sessions(), query_units()
- false_trigger flag on events for future review UI / report filtering

bridges/ach_server.py
- Import SeismoDb; create shared instance at startup pointed at
  bridges/captures/seismo_relay.db
- After each call-home: insert_events() + insert_monitor_log() + insert_ach_session()
- DB failures logged as warnings, never abort the session

sfm/server.py
- Import SeismoDb; lazy singleton via _get_db()
- New DB read endpoints: GET /db/units, /db/events, /db/monitor_log, /db/sessions
- PATCH /db/events/{id}/false_trigger for manual review flagging

CLAUDE.md / CHANGELOG.md
- Document DB schema, SFM DB endpoints, architecture decision (unit-keyed only)
- Version bump to v0.11.0

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 00:45:38 -04:00
claude ef2c38e7db v0.10.0 — monitor log entry support (SUB 0x0A partial records)
Add full decode pipeline for 0x2C partial records from the device's event
list, representing continuous monitoring intervals where no threshold was
crossed.  These records appear interleaved with full triggered events in the
browse walk and were previously ignored.

minimateplus/models.py
- Add MonitorLogEntry dataclass: key, start_time, stop_time, serial,
  geo_threshold_ips, raw_header, duration_seconds property

minimateplus/protocol.py
- read_waveform_header() now returns (data_rsp.data, length) — full payload
  including the record-type byte at position 0 — instead of the sliced header.
  Callers that need the old slice use raw_data[11:11+length] as before.

minimateplus/client.py
- Add _decode_0a_partial_header(): auto-detects 9-byte (sub_code=0x10) vs
  10-byte (sub_code=0x03) timestamp format, handles 1-byte inter-timestamp
  gap, extracts serial via BE anchor and geo threshold via Geo: anchor.
- Add get_monitor_log_entries(skip_keys=None): browse walk (1E → 0A → 1F),
  decodes partial records, skips full records and already-seen keys.

minimateplus/__init__.py
- Export MonitorLogEntry

bridges/ach_server.py
- After get_events(), call get_monitor_log_entries(skip_keys=seen_keys) and
  save new entries to monitor_log.json in the session directory.
- Add _monitor_log_entry_to_dict() helper.
- Include monitor log keys in downloaded_keys for state persistence.

CLAUDE.md / CHANGELOG.md
- Document 0x2C partial record layout (timestamp format, ASCII metadata
  region, 1-byte gap edge case) confirmed from 4-11-26 MITM capture.
- Version bump to v0.10.0; update What's next.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 02:59:40 -04:00
9 changed files with 1132 additions and 19 deletions
+89
View File
@@ -4,6 +4,95 @@ All notable changes to seismo-relay are documented here.
---
## v0.11.0 — 2026-04-13
### Added
- **`sfm/database.py` — SeismoDb** — SQLite persistence layer for all ACH data.
Three tables, all unit-keyed by serial number:
- `ach_sessions` — one row per inbound call-home: serial, timestamp, peer IP,
events_downloaded, monitor_entries, duration_seconds
- `events` — one row per triggered waveform event: serial, waveform_key (dedup),
timestamp, Tran/Vert/Long/VectorSum/Mic PPV, project/client/operator/sensor_location
strings, sample_rate, record_type, false_trigger flag
- `monitor_log` — one row per monitoring interval: serial, waveform_key (dedup),
start_time, stop_time, duration_seconds, geo_threshold_ips
- WAL mode, per-request connections — safe for the single-writer / occasional-reader
ACH server pattern
- Deduplication by `(serial, waveform_key)` UNIQUE constraint — re-runs and repeat
call-homes never produce duplicate rows
- **`ach_server.py` — DB integration** — after each successful call-home, writes new
events and monitor log entries to `seismo_relay.db` then records the session in
`ach_sessions`. DB write failures are logged as warnings and do not abort the session.
- **`sfm/server.py` — DB read endpoints**:
- `GET /db/units` — distinct serials with last_seen, total_events, total_monitor_entries
- `GET /db/events` — query events with serial / date range / false_trigger filters
- `GET /db/monitor_log` — query monitoring intervals
- `GET /db/sessions` — query ACH call-home sessions
- `PATCH /db/events/{id}/false_trigger` — flag/unflag false triggers (for review UI)
### Architecture
- seismo-relay DB is unit-keyed only — no project concepts. Project aggregation is
terra-view's responsibility via `UnitAssignment` / `DeploymentRecord` + date range
queries against the SFM DB endpoints.
- DB file lives at `bridges/captures/seismo_relay.db` by default.
---
## v0.10.0 — 2026-04-11
### Added
- **`MiniMateClient.get_monitor_log_entries(skip_keys=None)`** — browse-mode walk
(`1E → 0A → 1F`) that collects partial records (`0x2C` record type) from the device's
event list without triggering a full waveform download (no 0C or 5A). Returns
`list[MonitorLogEntry]`. Each entry represents one continuous monitoring interval where
no threshold was exceeded.
- **`_decode_0a_partial_header(raw_data, index, key4)`** in `client.py` — decodes a SUB
0x0A response payload whose record type is `0x2C`. Extracts:
- `start_time` / `stop_time` — two consecutive timestamps; auto-detects 9-byte
(sub_code=0x10, single-shot) vs 10-byte (sub_code=0x03, continuous) format from
`raw_data[11]`. Handles a 1-byte gap between the two timestamps that occurs when
ts1 and ts2 share the same minute:second.
- `serial` — device serial string found via `b"BE"` anchor scan.
- `geo_threshold_ips` — trigger level found via `b"Geo: "` anchor scan.
- **`MonitorLogEntry` dataclass** in `models.py` — new model for partial records:
`index`, `key`, `start_time`, `stop_time`, `serial`, `geo_threshold_ips`,
`raw_header`, and a `duration_seconds` property.
- **`read_waveform_header()` return value extended** — now returns `(data_rsp.data, length)`
(full payload) instead of `(data_rsp.data[11:11+length], length)`. Callers get the
complete payload including the record-type byte at position 0. Full records use
`raw_data[11:11+length]` as before; partial records are detected by `raw_data[0] == 0x2C`.
- **ACH server: monitor log collection** — after `get_events()`, calls
`get_monitor_log_entries(skip_keys=seen_keys)` and saves new entries to
`monitor_log.json` in the session directory. Monitor log keys are included in
`downloaded_keys` for state persistence (no re-processing on next call-home).
- **`_monitor_log_entry_to_dict()`** in `ach_server.py` — serialises a `MonitorLogEntry`
to a JSON-compatible dict with ISO-format timestamps.
### Protocol / Documentation
- **SUB 0x0A partial record (0x2C) format confirmed** (✅ 4-11-26 MITM capture, 12 frames):
- Record type `0x2C` at `raw_data[0]`; length < 64 bytes.
- Two timestamps at `raw_data[11:]` — start and stop of the monitoring interval.
- ASCII metadata region after timestamps: `BE<serial>\x00Geo: <float> in/s`.
- Edge case: 1-byte separator between timestamps when ts1 and ts2 share minute:second.
- 10-byte timestamp format (sub_code=0x03) signalled by `raw_data[11] == 0x10`.
- **Key reuse detection for monitor log entries** — monitor log keys are tracked alongside
event keys in `ach_state.json` so the ACH server does not re-process them after a
call-home cycle.
---
## v0.9.0 — 2026-04-11
### Added
+146 -4
View File
@@ -2,7 +2,7 @@
Ground-up Python replacement for **Blastware**, Instantel's Windows-only software for
managing MiniMate Plus seismographs. Connects over direct RS-232 or cellular modem
(Sierra Wireless RV50 / RV55). Current version: **v0.9.0**.
(Sierra Wireless RV50 / RV55). Current version: **v0.10.0**.
---
@@ -25,9 +25,9 @@ CHANGELOG.md ← version history
---
## Current implementation state (v0.9.0)
## Current implementation state (v0.10.0)
Full read pipeline + write pipeline + erase pipeline working end-to-end over TCP/cellular:
Full read pipeline + write pipeline + erase pipeline + monitor log working end-to-end over TCP/cellular:
| Step | SUB | Status |
|---|---|---|
@@ -42,7 +42,8 @@ Full read pipeline + write pipeline + erase pipeline working end-to-end over TCP
| **Bulk waveform stream (event-time metadata)** | **5A** | ✅ new v0.6.0 |
| Event advance / next key | 1F | ✅ |
| **Write commands (push config to device)** | **6883** | ✅ new v0.8.0 |
| **Erase all events** | **0xA3 → 0x1C → 0x06 → 0xA2** | ✅ **new v0.9.0** |
| **Erase all events** | **0xA3 → 0x1C → 0x06 → 0xA2** | ✅ new v0.9.0 |
| **Monitor log entries (partial 0x2C records)** | **0A browse** | ✅ **new v0.10.0** |
`get_events()` sequence per event: `1E → 0A → 0C → 5A → 1F`
@@ -415,6 +416,8 @@ for 0x10 records).
## SFM REST API (sfm/server.py)
### Live device endpoints (connect to device per-request)
```
GET /device/info?port=COM5 ← serial
GET /device/info?host=1.2.3.4&tcp_port=9034 ← cellular
@@ -427,6 +430,19 @@ POST /device/monitor/stop?host=1.2.3.4&tcp_port=9034 ← stop recording
Server retries once on `ProtocolError` for TCP connections (handles cold-boot timing).
### DB read endpoints (query seismo_relay.db written by ach_server.py)
```
GET /db/units ← all known serials + summary stats
GET /db/events?serial=BE11529&from_dt=&to_dt=&limit= ← triggered events, newest first
GET /db/monitor_log?serial=BE11529&from_dt=&to_dt= ← monitoring intervals, newest first
GET /db/sessions?serial=BE11529&limit=50 ← ACH call-home sessions, newest first
PATCH /db/events/{id}/false_trigger?value=true ← flag/unflag false triggers
```
DB file: `bridges/captures/seismo_relay.db` (default; override with `--db-path` at startup).
All DB endpoints are read-only except `PATCH /db/events/{id}/false_trigger`.
---
## Key wire captures (reference material)
@@ -795,8 +811,134 @@ the erase). Used for post-erase detection.
---
## Monitor log entries — SUB 0x0A partial records (confirmed 2026-04-11)
Confirmed from 4-11-26 MITM capture: 12 partial records (record type `0x2C`) and 7 full
event records (record type `0x46`) across 19 total 0x0A responses.
### Record type detection
`read_waveform_header()` returns `(raw_data, length)` where `raw_data = data_rsp.data`
(the full payload including prefix bytes). The record type is at `raw_data[0]`:
| Value | Type | How to process |
|---|---|---|
| `0x46` | Full triggered event | Normal download: 0C → 5A → 1F |
| `0x2C` | Monitor log entry (partial) | No 0C/5A; decode inline from 0A payload |
Length heuristic: `length < 0x40` (64) reliably identifies partial records across all
observed captures. Both checks (`raw_data[0] == 0x2C` and `length < 0x40`) are used.
### SUB 0x0A partial record (0x2C) payload layout
All offsets are from `raw_data` (the full `data_rsp.data` array including the 11-byte
prefix before the actual header bytes start).
```
raw_data[0] = 0x2C ← record type (partial / monitor log)
raw_data[1:11] = prefix bytes (vary; contain key4 copy, flags, length)
raw_data[11:] = timestamp and ASCII metadata payload
```
**Timestamp auto-detection** (confirmed from 4-11-26 capture):
```
raw_data[11] == 0x10 → 10-byte sub_code=0x03 format (continuous mode)
raw_data[11] != 0x10 → 9-byte sub_code=0x10 format (single-shot mode)
```
**9-byte timestamp format (sub_code=0x10):**
| Byte | Field |
|---|---|
| 0 | day |
| 1 | `0x10` (sub_code marker) |
| 2 | month |
| 34 | year (uint16 BE) |
| 5 | unknown (0x00) |
| 6 | hour |
| 7 | minute |
| 8 | second |
**10-byte timestamp format (sub_code=0x03):**
| Byte | Field |
|---|---|
| 0 | `0x10` (marker) |
| 1 | day |
| 2 | `0x10` (marker) |
| 3 | month |
| 45 | year (uint16 BE) |
| 6 | unknown (0x00) |
| 7 | hour |
| 8 | minute |
| 9 | second |
**Two timestamps:** Each partial record contains two timestamps — `start_time` and
`stop_time` — stored consecutively:
- `ts1` (start) at `raw_data[ts_offset : ts_offset + ts_size]` where `ts_offset = 11`
- `ts2` (stop) at `raw_data[ts1_end : ts1_end + ts_size]`
**Edge case — 1-byte gap between timestamps:** Occurs when ts1 and ts2 share the same
minute:second. If `try_ts(raw_data[ts1_end:])` fails, try `try_ts(raw_data[ts1_end+1:])`.
Confirmed in frames 121, 161, 165 of the 4-11-26 MITM capture. Frame 121 still shows 0s
duration (both decode to 16:02:00) — the extra byte appears in all same-second cases.
**ASCII metadata after timestamps:**
```
<separator bytes> BE<serial>\x00Geo: <float> in/s ...
```
- Serial: scan for `b"BE"`, read until `b"\x00"` (e.g. `"BE11529"`)
- Geo threshold: scan for `b"Geo: "`, read float until next space (e.g. `0.254` in/s)
A separator of variable length (45 bytes of `\x00` + flags) sits between the two
timestamps and the ASCII region. The `b"BE"` anchor scan is robust to separator length
variation.
### `_decode_0a_partial_header(raw_data, index, key4)` — client.py
Returns a `MonitorLogEntry` or `None`. Called by `get_monitor_log_entries()` for each
event key whose 0x0A response has `raw_data[0] == 0x2C` or `length < 0x40`.
### `MiniMateClient.get_monitor_log_entries(skip_keys=None)` — client.py
Browse-mode walk: `1E → 0A → check type → decode if partial → 1F`. No 0x0C or 5A reads
performed. Full (0x46) records are skipped without decoding. Returns `list[MonitorLogEntry]`.
`skip_keys` (optional `set[str]`): keys in this set are still advanced through the walk
(to avoid disrupting the iteration sequence), but no `MonitorLogEntry` is created for them.
### `MonitorLogEntry` model — models.py
```python
@dataclass
class MonitorLogEntry:
index: int # 0-based position
key: str # 8-hex event key
start_time: Optional[datetime.datetime] = None
stop_time: Optional[datetime.datetime] = None
serial: Optional[str] = None
geo_threshold_ips: Optional[float] = None
raw_header: Optional[bytes] = field(default=None, repr=False)
@property
def duration_seconds(self) -> Optional[float]: ...
```
### ACH server integration (v0.10.0)
After `get_events()`, the ACH server calls `get_monitor_log_entries(skip_keys=seen_keys)`.
New entries are saved to `monitor_log.json` in the session directory. Monitor log keys are
included in `current_keys` for state persistence so they are not re-processed on the next
call-home.
---
## What's next
- **Database** — SQLite store for events + monitor log entries; dedup by key; queryable
- **Histograms** — decode histogram-mode A5 data (noise floor tracking)
- Compliance config encoder — build raw write payloads from a `ComplianceConfig` object
- Locate "Sensor Check" byte in compliance config (need capture with Disabled vs Before-monitoring)
- Modem manager — push RV50/RV55 configs via Sierra Wireless API
+82 -2
View File
@@ -67,7 +67,8 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
from minimateplus.transport import SocketTransport
from minimateplus.client import MiniMateClient
from minimateplus.models import DeviceInfo, Event
from minimateplus.models import DeviceInfo, Event, MonitorLogEntry
from sfm.database import SeismoDb
log = logging.getLogger("ach_server")
@@ -136,6 +137,7 @@ class AchSession:
events_only: bool,
max_events: Optional[int],
state_path: Path,
db: "SeismoDb",
clear_after_download: bool = False,
) -> None:
self.sock = sock
@@ -145,6 +147,7 @@ class AchSession:
self.events_only = events_only
self.max_events = max_events
self.state_path = state_path
self.db = db
self.clear_after_download = clear_after_download
def run(self) -> None:
@@ -372,6 +375,66 @@ class AchSession:
else:
log.info(" [OK] No new events since last call-home -- nothing to save")
# ── Monitor log entries (partial records / continuous monitoring) ──
# Browse walk (0A + 1F only) to collect monitor log entries for
# recording intervals where no threshold was crossed. This is a
# second 1E-based pass over the device's record list, separate from
# the get_events() download loop above.
log.info(" Collecting monitor log entries (browse walk)...")
new_monitor_entries: list[MonitorLogEntry] = []
try:
new_monitor_entries = client.get_monitor_log_entries(
skip_keys=seen_keys if seen_keys else None,
)
if new_monitor_entries:
_save_json(
session_dir / "monitor_log.json",
[_monitor_log_entry_to_dict(e) for e in new_monitor_entries],
)
log.info(
" [OK] %d new monitor log entry(s) saved",
len(new_monitor_entries),
)
for ml in new_monitor_entries:
log.info(
" MONLOG [%s] %s%s (%s)",
ml.key,
ml.start_time.isoformat() if ml.start_time else "?",
ml.stop_time.isoformat() if ml.stop_time else "?",
f"{ml.duration_seconds:.0f}s" if ml.duration_seconds is not None else "?s",
)
else:
log.info(" [OK] No new monitor log entries")
except Exception as exc:
log.warning(
" [WARN] Monitor log collection failed: %s -- continuing",
exc,
)
# ── Persist to SQLite DB ─────────────────────────────────────
_session_start = datetime.datetime.now()
try:
_ev_ins, _ev_skip = self.db.insert_events(
new_events, serial=serial or self.peer, session_id=None
)
_ml_ins, _ml_skip = self.db.insert_monitor_log(
new_monitor_entries, session_id=None
)
_session_id = self.db.insert_ach_session(
serial=serial or self.peer,
peer=self.peer,
events_downloaded=_ev_ins,
monitor_entries=_ml_ins,
duration_seconds=(datetime.datetime.now() - _session_start).total_seconds(),
session_time=_session_start,
)
log.info(
" [DB] session=%s events +%d (skip %d) monitor +%d (skip %d)",
_session_id[:8], _ev_ins, _ev_skip, _ml_ins, _ml_skip,
)
except Exception as exc:
log.warning(" [WARN] DB write failed: %s -- continuing", exc)
# ── Optional: erase device memory after successful download ────
erased_successfully = False
if self.clear_after_download and new_events:
@@ -387,11 +450,15 @@ class AchSession:
)
# ── Update persistent state ───────────────────────────────────
current_keys = [
# Include both triggered-event keys and monitor-log keys in the
# downloaded set so they are not re-processed on the next call-home.
current_event_keys = [
e._waveform_key.hex()
for e in all_events
if e._waveform_key is not None
]
current_monitor_keys = [e.key for e in new_monitor_entries]
current_keys = current_event_keys + current_monitor_keys
if erased_successfully:
# Device memory is clear. Reset downloaded_keys and the
@@ -492,12 +559,24 @@ def _event_to_dict(e: Event) -> dict:
}
def _monitor_log_entry_to_dict(e: MonitorLogEntry) -> dict:
return {
"key": e.key,
"start_time": e.start_time.isoformat() if e.start_time else None,
"stop_time": e.stop_time.isoformat() if e.stop_time else None,
"duration_seconds": e.duration_seconds,
"serial": e.serial,
"geo_threshold_ips": e.geo_threshold_ips,
}
# ── Main server loop ───────────────────────────────────────────────────────────
def serve(args: argparse.Namespace) -> None:
output_dir = Path(args.output)
output_dir.mkdir(parents=True, exist_ok=True)
state_path = output_dir / "ach_state.json"
db = SeismoDb(output_dir / "seismo_relay.db")
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@@ -550,6 +629,7 @@ def serve(args: argparse.Namespace) -> None:
events_only=args.events_only,
max_events=max_ev,
state_path=state_path,
db=db,
clear_after_download=args.clear_after_download,
)
t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}")
+2 -2
View File
@@ -20,8 +20,8 @@ Typical usage (TCP / modem):
"""
from .client import MiniMateClient
from .models import DeviceInfo, Event
from .models import DeviceInfo, Event, MonitorLogEntry
from .transport import SerialTransport, TcpTransport
__version__ = "0.1.0"
__all__ = ["MiniMateClient", "DeviceInfo", "Event", "SerialTransport", "TcpTransport"]
__all__ = ["MiniMateClient", "DeviceInfo", "Event", "MonitorLogEntry", "SerialTransport", "TcpTransport"]
+209
View File
@@ -28,6 +28,7 @@ Example (TCP / modem):
from __future__ import annotations
import datetime
import logging
import struct
from typing import Optional
@@ -37,6 +38,7 @@ from .models import (
ComplianceConfig,
DeviceInfo,
Event,
MonitorLogEntry,
MonitorStatus,
PeakValues,
ProjectInfo,
@@ -300,6 +302,96 @@ class MiniMateClient:
log.info("list_event_keys: %d key(s): %s", len(keys), keys)
return keys
def get_monitor_log_entries(
self,
skip_keys: Optional[set] = None,
) -> list[MonitorLogEntry]:
"""
Collect all monitor log entries (partial records, type 0x2C) from the
device using the browse-mode 1E 0A 1F walk.
This is the fast path for monitor log data. No 0C or 5A commands are
issued all available monitor log information is in the 0x0A response
header alone.
Full triggered events (0x0A response type 0x46) are silently skipped.
Only partial records (type 0x2C) are returned as MonitorLogEntry objects.
Confirmed from 4-11-26 MITM capture: Blastware's ACH mode performs a
full browse walk (Phase 3: 0x0A + 1F × all records) AFTER the triggered-
event download phase. The partial records encountered in this walk are
the monitor log entries.
Args:
skip_keys: optional set of 8-hex key strings to skip (already seen).
Keys in this set still advance the walk (0A + 1F) but are
not decoded or returned.
Returns:
List of MonitorLogEntry objects in device storage order.
Raises:
ProtocolError: on unrecoverable communication failure.
"""
proto = self._require_proto()
try:
key4, data8 = proto.read_event_first()
except ProtocolError as exc:
log.warning("get_monitor_log_entries: 1E failed: %s -- returning []", exc)
return []
if data8[4:8] == b"\x00\x00\x00\x00":
log.info("get_monitor_log_entries: device is empty")
return []
entries: list[MonitorLogEntry] = []
idx = 0
while data8[4:8] != b"\x00\x00\x00\x00":
cur_key = key4
key_hex = cur_key.hex()
try:
raw_data, rec_len = proto.read_waveform_header(cur_key)
except ProtocolError as exc:
log.warning(
"get_monitor_log_entries: 0A failed for key=%s: %s -- stopping",
key_hex, exc,
)
break
# Only decode partial records (0x2C); full records (0x46) are silently skipped.
if rec_len < 0x40 and raw_data and (not skip_keys or key_hex not in skip_keys):
entry = _decode_0a_partial_header(raw_data, idx, cur_key)
if entry is not None:
entries.append(entry)
log.debug(
"get_monitor_log_entries: [%d] key=%s %s%s",
idx, key_hex, entry.start_time, entry.stop_time,
)
else:
log.debug(
"get_monitor_log_entries: [%d] key=%s type=0x%02X %s",
idx, key_hex, rec_len,
"skip (already seen)" if skip_keys and key_hex in skip_keys else "skip (full record)",
)
try:
key4, data8 = proto.advance_event(browse=True)
except ProtocolError as exc:
log.warning(
"get_monitor_log_entries: 1F failed after %d record(s): %s -- stopping",
idx, exc,
)
break
idx += 1
log.info(
"get_monitor_log_entries: walked %d record(s), found %d monitor log entry(s)",
idx, len(entries),
)
return entries
def delete_all_events(self) -> None:
"""
Erase all stored events from the device memory.
@@ -1856,6 +1948,123 @@ def _find_first_string(data: bytes, start: int, end: int, min_len: int) -> Optio
def _decode_0a_partial_header(raw_data: bytes, index: int, key4: bytes) -> Optional[MonitorLogEntry]:
"""
Decode a SUB 0x0A response for a partial (monitor log) record into a
MonitorLogEntry.
Called when read_waveform_header() returns rec_len < 0x40 (i.e. 0x2C = 44).
raw_data is the complete data_rsp.data from the protocol layer.
Layout of raw_data:
[0] = 0x2C (partial record type)
[1:5] = 0x00 × 4
[5:9] = event key (big-endian)
[9:11] = 0x00 × 2
[11:] = timestamp_start + timestamp_stop + sep + serial + geo_string
Timestamp format detection (auto):
raw_data[11] == 0x10 10-byte sub_code=0x03 continuous format
raw_data[12] == 0x10 9-byte sub_code=0x10 single-shot format
Both timestamps use the same format (detected from the first byte).
A 1-byte gap can appear between ts1 and ts2 for certain timestamps
(observed empirically when both timestamps share the same minute:second).
The parser handles this by trying ts2 immediately after ts1, then with
a 1-byte skip if that fails.
Returns:
MonitorLogEntry if decoding succeeds, None on error.
"""
if len(raw_data) < 20 or raw_data[0] != 0x2C:
return None
key_hex = key4.hex()
def try_ts9(b: bytes):
"""9-byte sub_code=0x10 format. Returns datetime or None."""
if len(b) < 9 or b[1] != 0x10:
return None
day = b[0]; month = b[2]; year = (b[3] << 8) | b[4]
hr = b[6]; mn = b[7]; sec = b[8]
if not (1 <= day <= 31 and 1 <= month <= 12 and 2000 <= year <= 2050
and hr <= 23 and mn <= 59 and sec <= 59):
return None
try:
return datetime.datetime(year, month, day, hr, mn, sec)
except ValueError:
return None
def try_ts10(b: bytes):
"""10-byte sub_code=0x03 format. Returns datetime or None."""
if len(b) < 10 or b[0] != 0x10 or b[2] != 0x10:
return None
day = b[1]; month = b[3]; year = (b[4] << 8) | b[5]
hr = b[7]; mn = b[8]; sec = b[9]
if not (1 <= day <= 31 and 1 <= month <= 12 and 2000 <= year <= 2050
and hr <= 23 and mn <= 59 and sec <= 59):
return None
try:
return datetime.datetime(year, month, day, hr, mn, sec)
except ValueError:
return None
ts_offset = 11
if len(raw_data) <= ts_offset:
return MonitorLogEntry(index=index, key=key_hex, raw_header=raw_data)
# Detect timestamp format.
if raw_data[ts_offset] == 0x10:
ts_size = 10
try_ts = try_ts10
else:
ts_size = 9
try_ts = try_ts9
# Parse ts1.
ts1 = try_ts(raw_data[ts_offset:ts_offset + ts_size])
ts1_end = ts_offset + ts_size
# Parse ts2 immediately after ts1, then with 1-byte skip if needed.
ts2 = try_ts(raw_data[ts1_end:ts1_end + ts_size])
if ts2 is None:
ts2 = try_ts(raw_data[ts1_end + 1:ts1_end + 1 + ts_size])
# Extract serial and geo threshold from "BE11529\0" and "Geo: X.XXX in/s\0".
serial: Optional[str] = None
geo_ips: Optional[float] = None
serial_pos = raw_data.find(b"BE")
if serial_pos >= 0:
# Read null-terminated serial starting at serial_pos.
null_pos = raw_data.find(b"\x00", serial_pos)
if null_pos > serial_pos:
serial = raw_data[serial_pos:null_pos].decode("ascii", errors="replace")
# Geo string follows the null byte.
geo_start = (null_pos + 1) if null_pos > serial_pos else serial_pos + 7
geo_bytes = raw_data[geo_start:]
# "Geo: X.XXX in/s\0" — extract float after "Geo: ".
geo_str_pos = geo_bytes.find(b"Geo: ")
if geo_str_pos >= 0:
geo_val_bytes = geo_bytes[geo_str_pos + 5:] # after "Geo: "
geo_val_end = geo_val_bytes.find(b" ") # before " in/s"
if geo_val_end > 0:
try:
geo_ips = float(geo_val_bytes[:geo_val_end].decode("ascii"))
except ValueError:
pass
return MonitorLogEntry(
index=index,
key=key_hex,
start_time=ts1,
stop_time=ts2,
serial=serial,
geo_threshold_ips=geo_ips,
raw_header=raw_data,
)
def _decode_monitor_status(data: bytes) -> MonitorStatus:
"""
Decode SUB 0x1C response payload into a MonitorStatus object.
+60
View File
@@ -14,6 +14,7 @@ Notes on certainty:
from __future__ import annotations
import datetime
import struct
from dataclasses import dataclass, field
from typing import Optional
@@ -419,6 +420,65 @@ class Event:
return f"Event#{self.index} {ts}{ppv}"
# ── MonitorLogEntry ───────────────────────────────────────────────────────────
@dataclass
class MonitorLogEntry:
"""
A monitor log entry decoded from a SUB 0x0A (WAVEFORM_HEADER) response
whose first byte is 0x2C (partial record, recording mode = continuous
monitoring without a triggered event).
These are the "partial bins" that Blastware stores between triggered events.
Each entry represents one monitoring interval the span of time during
which the unit was actively monitoring but no threshold crossing occurred.
Confirmed from 4-11-26 MITM capture analysis (2026-04-11):
Header layout (full response data[0:]):
data[0] = 0x2C (partial record type / data length in probe response)
data[1:5] = 0x00 × 4
data[5:9] = event key (4 bytes, big-endian hex)
data[9:11] = 0x00 × 2
data[11:] = timestamp_start (9 or 10 bytes depending on recording mode)
+ timestamp_stop (same format)
+ separator (45 bytes, variable)
+ serial null-terminated (e.g. "BE11529\\0")
+ "Geo: X.XXX in/s\\0" (trigger threshold string)
Timestamp format detection:
data[11] == 0x10 10-byte sub_code=0x03 (continuous) format
data[12] == 0x10 9-byte sub_code=0x10 (single-shot) format
In contrast to Event (triggered records, type 0x46), MonitorLogEntry
records do NOT have a waveform record (SUB 0x0C) or bulk waveform stream
(SUB 5A). All available metadata is in the 0x0A header alone.
"""
index: int # 0-based position in device record list
key: str # 8-hex event key (e.g. "01114290") ✅
start_time: Optional[datetime.datetime] = None # monitoring session start ✅
stop_time: Optional[datetime.datetime] = None # monitoring session stop ✅
serial: Optional[str] = None # device serial (e.g. "BE11529") ✅
geo_threshold_ips: Optional[float] = None # trigger level from "Geo: X.XXX in/s" ✅
# Raw bytes for debugging / future decoding
raw_header: Optional[bytes] = field(default=None, repr=False)
@property
def duration_seconds(self) -> Optional[float]:
"""Duration of monitoring interval in seconds, or None if times unavailable."""
if self.start_time and self.stop_time:
return (self.stop_time - self.start_time).total_seconds()
return None
def __str__(self) -> str:
start = self.start_time.isoformat() if self.start_time else "?"
stop = self.stop_time.isoformat() if self.stop_time else "?"
dur = f" ({self.duration_seconds:.0f}s)" if self.duration_seconds is not None else ""
return f"MonitorLog#{self.index} key={self.key} {start}{stop}{dur}"
# ── MonitorStatus ─────────────────────────────────────────────────────────────
@dataclass
+19 -11
View File
@@ -394,23 +394,32 @@ class MiniMateProtocol:
Send the SUB 0A (WAVEFORM_HEADER) two-step read for *key4*.
The data length for 0A is VARIABLE and must be read from the probe
response at data[4]. Two known values:
0x30 full histogram bin (has a waveform record to follow)
0x26 partial histogram bin (no waveform record)
response at data[4]. Two confirmed values:
0x46 (70) full triggered event (has 0C waveform record to follow)
0x2C (44) partial / monitor-log entry (no 0C record; 0A header only)
Args:
key4: 4-byte waveform record address from 1E or 1F.
Returns:
(header_bytes, record_length) where:
header_bytes raw data section starting at data[11]
record_length DATA_LENGTH read from probe (0x30 or 0x26)
(raw_data, record_length) where:
raw_data complete data_rsp.data bytes (full response payload)
record_length DATA_LENGTH read from probe (0x46 for full, 0x2C for partial)
The raw_data layout:
raw_data[0] = record type (0x46 = full triggered event, 0x2C = partial/monitor)
raw_data[1:5] = 0x00 × 4
raw_data[5:9] = event key (4 bytes)
raw_data[9:11] = 0x00 × 2
raw_data[11:] = timestamps + separator + serial + channel strings
(see MonitorLogEntry in models.py for full layout)
Raises:
ProtocolError: on timeout, bad checksum, or wrong response SUB.
Confirmed from 3-31-26 capture: 0A probe response data[4] carries
Confirmed from 4-11-26 MITM capture: 0A probe response data[4] carries
the variable length; data-request uses that length as the offset byte.
record_length == data[0] in virtually all cases (confirmed empirically).
"""
rsp_sub = _expected_rsp_sub(SUB_WAVEFORM_HEADER)
params = waveform_key_params(key4)
@@ -420,7 +429,7 @@ class MiniMateProtocol:
probe_rsp = self._recv_one(expected_sub=rsp_sub)
# Variable length — read from probe response data[4]
length = probe_rsp.data[4] if len(probe_rsp.data) > 4 else 0x30
length = probe_rsp.data[4] if len(probe_rsp.data) > 4 else 0x46
log.debug("read_waveform_header: 0A data request offset=0x%02X", length)
if length == 0:
@@ -429,12 +438,11 @@ class MiniMateProtocol:
self._send(build_bw_frame(SUB_WAVEFORM_HEADER, length, params))
data_rsp = self._recv_one(expected_sub=rsp_sub)
header_bytes = data_rsp.data[11:11 + length]
log.debug(
"read_waveform_header: key=%s length=0x%02X is_full=%s",
key4.hex(), length, length == 0x30,
key4.hex(), length, length >= 0x40,
)
return header_bytes, length
return data_rsp.data, length
def read_waveform_data_raw(self) -> bytes:
"""
+406
View File
@@ -0,0 +1,406 @@
"""
sfm/database.py SQLite persistence layer for seismo-relay.
Three tables, all keyed by unit serial number:
ach_sessions one row per inbound ACH call-home
events one row per triggered waveform event (deduped by serial+key)
monitor_log one row per monitoring interval (deduped by serial+key)
The DB file lives at:
<output_dir>/seismo_relay.db (default: bridges/captures/seismo_relay.db)
Usage
-----
from sfm.database import SeismoDb
db = SeismoDb("bridges/captures/seismo_relay.db")
# Write a call-home session
session_id = db.insert_ach_session(serial="BE11529", peer="1.2.3.4:51920",
events_downloaded=3, monitor_entries=2,
duration_seconds=47.3)
# Write events (silently skips duplicates)
db.insert_events(events, serial="BE11529", session_id=session_id)
# Write monitor log entries
db.insert_monitor_log(entries, session_id=session_id)
# Query
rows = db.query_events(serial="BE11529", from_dt=datetime(...), to_dt=datetime(...))
"""
from __future__ import annotations
import datetime
import logging
import sqlite3
import uuid
from pathlib import Path
from typing import Optional
from minimateplus.models import Event, MonitorLogEntry
log = logging.getLogger("sfm.database")
# ── Schema ─────────────────────────────────────────────────────────────────────
_SCHEMA = """
PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS ach_sessions (
id TEXT PRIMARY KEY, -- UUID
serial TEXT NOT NULL,
session_time TEXT NOT NULL, -- ISO-8601 UTC
peer TEXT, -- "ip:port"
events_downloaded INTEGER NOT NULL DEFAULT 0,
monitor_entries INTEGER NOT NULL DEFAULT 0,
duration_seconds REAL,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
);
CREATE INDEX IF NOT EXISTS idx_ach_sessions_serial ON ach_sessions(serial);
CREATE INDEX IF NOT EXISTS idx_ach_sessions_time ON ach_sessions(session_time);
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY, -- UUID
serial TEXT NOT NULL,
waveform_key TEXT NOT NULL, -- 8-hex device key (dedup field)
session_id TEXT, -- FK ach_sessions.id
timestamp TEXT, -- ISO-8601 local time from device
tran_ppv REAL, -- in/s
vert_ppv REAL, -- in/s
long_ppv REAL, -- in/s
peak_vector_sum REAL, -- in/s
mic_ppv REAL, -- psi or dB depending on setup
project TEXT,
client TEXT,
operator TEXT,
sensor_location TEXT,
sample_rate INTEGER,
record_type TEXT, -- "single_shot" | "continuous"
false_trigger INTEGER NOT NULL DEFAULT 0, -- 0=no, 1=yes (manual flag)
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
UNIQUE(serial, waveform_key)
);
CREATE INDEX IF NOT EXISTS idx_events_serial ON events(serial);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id);
CREATE TABLE IF NOT EXISTS monitor_log (
id TEXT PRIMARY KEY, -- UUID
serial TEXT NOT NULL,
waveform_key TEXT NOT NULL, -- 8-hex device key (dedup field)
session_id TEXT, -- FK ach_sessions.id
start_time TEXT, -- ISO-8601
stop_time TEXT, -- ISO-8601
duration_seconds REAL,
geo_threshold_ips REAL, -- in/s
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
UNIQUE(serial, waveform_key)
);
CREATE INDEX IF NOT EXISTS idx_monitor_log_serial ON monitor_log(serial);
CREATE INDEX IF NOT EXISTS idx_monitor_log_start ON monitor_log(start_time);
CREATE INDEX IF NOT EXISTS idx_monitor_log_session ON monitor_log(session_id);
"""
# ── SeismoDb class ─────────────────────────────────────────────────────────────
class SeismoDb:
"""
Thin SQLite wrapper for seismo-relay persistence.
Thread-safe: each call opens, uses, and closes a connection with
check_same_thread=False and WAL mode enabled. For the ACH server's
single-writer / occasional-reader pattern this is more than sufficient.
"""
def __init__(self, db_path: str | Path) -> None:
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._init_schema()
log.info("SeismoDb initialised at %s", self.db_path)
# ── Internal helpers ───────────────────────────────────────────────────────
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode = WAL")
conn.execute("PRAGMA foreign_keys = ON")
return conn
def _init_schema(self) -> None:
with self._connect() as conn:
conn.executescript(_SCHEMA)
@staticmethod
def _iso(dt: Optional[datetime.datetime]) -> Optional[str]:
return dt.isoformat() if dt is not None else None
@staticmethod
def _new_id() -> str:
return str(uuid.uuid4())
# ── ACH sessions ──────────────────────────────────────────────────────────
def insert_ach_session(
self,
*,
serial: str,
peer: Optional[str] = None,
events_downloaded: int = 0,
monitor_entries: int = 0,
duration_seconds: Optional[float] = None,
session_time: Optional[datetime.datetime] = None,
) -> str:
"""Insert a new ACH session row. Returns the new session UUID."""
sid = self._new_id()
ts = self._iso(session_time or datetime.datetime.utcnow())
with self._connect() as conn:
conn.execute(
"""
INSERT INTO ach_sessions
(id, serial, session_time, peer,
events_downloaded, monitor_entries, duration_seconds)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(sid, serial, ts, peer,
events_downloaded, monitor_entries, duration_seconds),
)
log.debug("ach_session inserted: %s serial=%s events=%d monitor=%d",
sid, serial, events_downloaded, monitor_entries)
return sid
def get_sessions(
self,
serial: Optional[str] = None,
limit: int = 50,
) -> list[dict]:
"""Return recent ACH sessions, newest first."""
with self._connect() as conn:
if serial:
rows = conn.execute(
"SELECT * FROM ach_sessions WHERE serial=? "
"ORDER BY session_time DESC LIMIT ?",
(serial, limit),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM ach_sessions ORDER BY session_time DESC LIMIT ?",
(limit,),
).fetchall()
return [dict(r) for r in rows]
# ── Events ────────────────────────────────────────────────────────────────
def insert_events(
self,
events: list[Event],
*,
serial: str,
session_id: Optional[str] = None,
) -> tuple[int, int]:
"""
Insert triggered events. Silently skips duplicates (serial+waveform_key).
Returns (inserted, skipped).
"""
inserted = skipped = 0
with self._connect() as conn:
for ev in events:
key = ev._waveform_key.hex() if ev._waveform_key else None
if key is None:
skipped += 1
continue
ts = None
if ev.timestamp:
try:
ts = datetime.datetime(
ev.timestamp.year, ev.timestamp.month, ev.timestamp.day,
ev.timestamp.hour, ev.timestamp.minute, ev.timestamp.second,
).isoformat()
except Exception:
ts = str(ev.timestamp)
pv = ev.peak_values
pi = ev.project_info
try:
conn.execute(
"""
INSERT INTO events
(id, serial, waveform_key, session_id, timestamp,
tran_ppv, vert_ppv, long_ppv, peak_vector_sum, mic_ppv,
project, client, operator, sensor_location,
sample_rate, record_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
self._new_id(), serial, key, session_id, ts,
pv.tran if pv else None,
pv.vert if pv else None,
pv.long if pv else None,
pv.peak_vector_sum if pv else None,
pv.micl if pv else None,
pi.project if pi else None,
pi.client if pi else None,
pi.operator if pi else None,
pi.sensor_location if pi else None,
ev.sample_rate,
ev.record_type,
),
)
inserted += 1
except sqlite3.IntegrityError:
skipped += 1
log.debug("insert_events serial=%s inserted=%d skipped=%d",
serial, inserted, skipped)
return inserted, skipped
def query_events(
self,
serial: Optional[str] = None,
from_dt: Optional[datetime.datetime] = None,
to_dt: Optional[datetime.datetime] = None,
false_trigger: Optional[bool] = None,
limit: int = 500,
offset: int = 0,
) -> list[dict]:
"""Query events with optional filters. Returns newest first."""
clauses: list[str] = []
params: list = []
if serial:
clauses.append("serial = ?")
params.append(serial)
if from_dt:
clauses.append("timestamp >= ?")
params.append(from_dt.isoformat())
if to_dt:
clauses.append("timestamp <= ?")
params.append(to_dt.isoformat())
if false_trigger is not None:
clauses.append("false_trigger = ?")
params.append(1 if false_trigger else 0)
where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
params += [limit, offset]
with self._connect() as conn:
rows = conn.execute(
f"SELECT * FROM events {where} "
f"ORDER BY timestamp DESC LIMIT ? OFFSET ?",
params,
).fetchall()
return [dict(r) for r in rows]
def set_false_trigger(self, event_id: str, value: bool) -> bool:
"""Set or clear the false_trigger flag on an event. Returns True if found."""
with self._connect() as conn:
cur = conn.execute(
"UPDATE events SET false_trigger=? WHERE id=?",
(1 if value else 0, event_id),
)
return cur.rowcount > 0
# ── Monitor log ───────────────────────────────────────────────────────────
def insert_monitor_log(
self,
entries: list[MonitorLogEntry],
*,
session_id: Optional[str] = None,
) -> tuple[int, int]:
"""
Insert monitor log entries. Silently skips duplicates (serial+waveform_key).
Returns (inserted, skipped).
"""
inserted = skipped = 0
with self._connect() as conn:
for e in entries:
try:
conn.execute(
"""
INSERT INTO monitor_log
(id, serial, waveform_key, session_id,
start_time, stop_time, duration_seconds,
geo_threshold_ips)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
self._new_id(),
e.serial or "",
e.key,
session_id,
self._iso(e.start_time),
self._iso(e.stop_time),
e.duration_seconds,
e.geo_threshold_ips,
),
)
inserted += 1
except sqlite3.IntegrityError:
skipped += 1
log.debug("insert_monitor_log inserted=%d skipped=%d", inserted, skipped)
return inserted, skipped
def query_monitor_log(
self,
serial: Optional[str] = None,
from_dt: Optional[datetime.datetime] = None,
to_dt: Optional[datetime.datetime] = None,
limit: int = 500,
offset: int = 0,
) -> list[dict]:
"""Query monitor log entries with optional filters. Returns newest first."""
clauses: list[str] = []
params: list = []
if serial:
clauses.append("serial = ?")
params.append(serial)
if from_dt:
clauses.append("start_time >= ?")
params.append(from_dt.isoformat())
if to_dt:
clauses.append("start_time <= ?")
params.append(to_dt.isoformat())
where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
params += [limit, offset]
with self._connect() as conn:
rows = conn.execute(
f"SELECT * FROM monitor_log {where} "
f"ORDER BY start_time DESC LIMIT ? OFFSET ?",
params,
).fetchall()
return [dict(r) for r in rows]
# ── Fleet overview ────────────────────────────────────────────────────────
def query_units(self) -> list[dict]:
"""
Return one row per known serial with summary stats:
last_seen, total_events, total_monitor_entries.
"""
with self._connect() as conn:
rows = conn.execute(
"""
SELECT
s.serial,
MAX(s.session_time) AS last_seen,
SUM(s.events_downloaded) AS total_events,
SUM(s.monitor_entries) AS total_monitor_entries,
COUNT(*) AS total_sessions
FROM ach_sessions s
GROUP BY s.serial
ORDER BY last_seen DESC
"""
).fetchall()
return [dict(r) for r in rows]
+119
View File
@@ -34,6 +34,7 @@ or:
from __future__ import annotations
import datetime
import logging
import sys
from pathlib import Path
@@ -58,6 +59,7 @@ from minimateplus import MiniMateClient
from minimateplus.protocol import ProtocolError
from minimateplus.models import ComplianceConfig, DeviceInfo, Event, PeakValues, ProjectInfo, Timestamp
from minimateplus.transport import TcpTransport, DEFAULT_TCP_PORT
from sfm.database import SeismoDb
logging.basicConfig(
level=logging.INFO,
@@ -88,6 +90,21 @@ app.add_middleware(
)
# ── DB ────────────────────────────────────────────────────────────────────────
# Shared SeismoDb instance. Path can be overridden by --db-path at startup,
# or defaults to bridges/captures/seismo_relay.db relative to the repo root.
_DEFAULT_DB_PATH = Path(__file__).parent.parent / "bridges" / "captures" / "seismo_relay.db"
_db: Optional[SeismoDb] = None
def _get_db() -> SeismoDb:
global _db
if _db is None:
_db = SeismoDb(_DEFAULT_DB_PATH)
return _db
# ── Serialisers ────────────────────────────────────────────────────────────────
# Plain dict helpers — avoids a Pydantic dependency in the library layer.
@@ -698,6 +715,108 @@ def device_monitor_stop(
return {"status": "stopped"}
# ── DB read endpoints ─────────────────────────────────────────────────────────
#
# These endpoints expose the seismo-relay SQLite DB written by ach_server.py.
# All queries are read-only. Terra-view calls these to build project event
# views, unit history panels, and (eventually) vibration summary reports.
@app.get("/db/units")
def db_units() -> list[dict]:
"""
Return one row per known serial with summary stats:
last_seen, total_events, total_monitor_entries, total_sessions.
"""
return _get_db().query_units()
@app.get("/db/events")
def db_events(
serial: Optional[str] = Query(None, description="Filter by unit serial (e.g. BE11529)"),
from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"),
to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"),
false_trigger: Optional[bool] = Query(None, description="Filter by false_trigger flag"),
limit: int = Query(500, description="Max rows to return (default 500)"),
offset: int = Query(0, description="Pagination offset"),
) -> dict:
"""
Query triggered events from the DB.
Returns events newest-first. All filter params are optional.
Example:
GET /db/events?serial=BE11529&from_dt=2026-04-01&limit=100
"""
from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None
to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None
rows = _get_db().query_events(
serial=serial,
from_dt=from_parsed,
to_dt=to_parsed,
false_trigger=false_trigger,
limit=limit,
offset=offset,
)
return {"count": len(rows), "events": rows}
@app.patch("/db/events/{event_id}/false_trigger")
def db_set_false_trigger(
event_id: str,
value: bool = Query(..., description="True to flag as false trigger, False to clear"),
) -> dict:
"""
Set or clear the false_trigger flag on a single event.
Used by the terra-view event review UI.
Returns 404 if the event_id is not found.
"""
found = _get_db().set_false_trigger(event_id, value)
if not found:
raise HTTPException(status_code=404, detail=f"Event {event_id} not found")
return {"status": "ok", "event_id": event_id, "false_trigger": value}
@app.get("/db/monitor_log")
def db_monitor_log(
serial: Optional[str] = Query(None, description="Filter by unit serial"),
from_dt: Optional[str] = Query(None, description="ISO-8601 start datetime (inclusive)"),
to_dt: Optional[str] = Query(None, description="ISO-8601 end datetime (inclusive)"),
limit: int = Query(500, description="Max rows to return"),
offset: int = Query(0, description="Pagination offset"),
) -> dict:
"""
Query monitor log entries (continuous monitoring intervals) from the DB.
Returns entries newest-first.
"""
from_parsed = datetime.datetime.fromisoformat(from_dt) if from_dt else None
to_parsed = datetime.datetime.fromisoformat(to_dt) if to_dt else None
rows = _get_db().query_monitor_log(
serial=serial,
from_dt=from_parsed,
to_dt=to_parsed,
limit=limit,
offset=offset,
)
return {"count": len(rows), "entries": rows}
@app.get("/db/sessions")
def db_sessions(
serial: Optional[str] = Query(None, description="Filter by unit serial"),
limit: int = Query(50, description="Max rows to return"),
) -> dict:
"""
Query ACH call-home sessions from the DB, newest first.
"""
rows = _get_db().get_sessions(serial=serial, limit=limit)
return {"count": len(rows), "sessions": rows}
# ── Entry point ────────────────────────────────────────────────────────────────
if __name__ == "__main__":