diff --git a/.gitignore b/.gitignore index d6e4855..e85760d 100644 --- a/.gitignore +++ b/.gitignore @@ -25,9 +25,4 @@ Thumbs.db # Analyzer outputs *.report -claude_export_*.md - -# Frame database -*.db -*.db-wal -*.db-shm +claude_exp \ No newline at end of file diff --git a/README.md b/README.md index 8c5fd6f..847dc37 100644 --- a/README.md +++ b/README.md @@ -1,268 +1,266 @@ -# seismo-relay `v0.12.0` - -A ground-up replacement for **Blastware** — Instantel's aging Windows-only -software for managing MiniMate Plus seismographs. - -Built in Python. Runs on Windows, Linux, or macOS. Connects to instruments -over direct RS-232 or cellular modem (Sierra Wireless RV50 / RV55). - -> **Status:** Active development. Full read + write + erase + monitoring -> pipeline working end-to-end over TCP/cellular. ACH Auto Call Home server -> handles inbound unit connections, downloads events, and persists everything -> to a SQLite database. SFM REST API exposes device control and DB queries. -> See [CHANGELOG.md](CHANGELOG.md) for full version history. - ---- - -## What's in here - -``` -seismo-relay/ -├── seismo_lab.py ← Main GUI (Bridge + Analyzer + Console tabs) -│ -├── minimateplus/ ← MiniMate Plus client library -│ ├── transport.py ← SerialTransport, TcpTransport, SocketTransport -│ ├── protocol.py ← DLE frame layer, SUB command dispatch -│ ├── client.py ← High-level client (connect, get_events, push_config, …) -│ ├── framing.py ← Frame builders, DLE codec, S3FrameParser -│ └── models.py ← DeviceInfo, Event, ComplianceConfig, MonitorLogEntry, … -│ -├── sfm/ ← SFM REST API server (FastAPI, port 8200) -│ ├── server.py ← All device + DB endpoints -│ ├── database.py ← SeismoDb — SQLite persistence layer -│ └── sfm_webapp.html ← Embedded web UI (served at /) -│ -├── bridges/ -│ ├── ach_server.py ← Inbound ACH call-home server (main production server) -│ ├── ach_mitm.py ← Transparent MITM proxy for capturing BW sessions -│ ├── s3-bridge/ ← RS-232 serial bridge (capture tool) -│ ├── tcp_serial_bridge.py ← Local TCP↔serial bridge (bench testing) -│ ├── gui_bridge.py ← Standalone bridge GUI -│ └── raw_capture.py ← Simple raw capture tool -│ -├── parsers/ -│ ├── s3_analyzer.py ← Session parser, differ, Claude export -│ ├── gui_analyzer.py ← Standalone analyzer GUI -│ └── frame_db.py ← SQLite frame database -│ -└── docs/ - └── instantel_protocol_reference.md ← Reverse-engineered protocol spec -``` - ---- - -## Quick start - -### ACH inbound server (production) - -Listens for inbound unit call-homes, downloads all new events and monitor log -entries, and writes everything to `bridges/captures/seismo_relay.db`. - -```bash -python bridges/ach_server.py --port 12345 --output bridges/captures/ -``` - -Point the unit's ACEmanager **Remote Host** to this machine's IP and **Remote Port** to `12345`. - -Options: -``` ---port N Listen port (default 12345) ---output DIR Capture directory (default bridges/captures/) ---allow-ip IP Allowlist an IP (repeat for multiple; default: accept all) ---max-events N Safety cap for first run (default: unlimited) ---clear-after-download Erase device memory after successful download ---verbose Debug logging -``` - -### SFM REST server - -Exposes device control and DB queries as a REST API. Proxied by terra-view. - -```bash -python sfm/server.py # default: 0.0.0.0:8200 -python -m uvicorn sfm.server:app --host 0.0.0.0 --port 8200 --reload -``` - -Open `http://localhost:8200` for the embedded web UI, or `http://localhost:8200/docs` -for the interactive API docs. - -### Seismo Lab GUI - -```bash -python seismo_lab.py -``` - ---- - -## SFM REST API - -### Live device endpoints - -Each call dials the device, does its work, and closes the connection. TCP -connections are retried once on `ProtocolError` to handle cold-boot timing. - -**Caching** — frequently-polled endpoints are cached in-process to avoid -redundant TCP round-trips: - -| Method | URL | Cache | -|--------|-----|-------| -| `GET` | `/device/info` | Indefinite; invalidated by `POST /device/config` | -| `GET` | `/device/events` | Count-probe fast path (~2s); full download only when new events detected | -| `GET` | `/device/event/{idx}/waveform` | Permanent per event index | -| `GET` | `/device/monitor/status` | 30-second TTL | -| `POST` | `/device/connect` | — | -| `POST` | `/device/config` | Writes compliance config; invalidates cache | -| `POST` | `/device/monitor/start` | Sends SUB 0x96 | -| `POST` | `/device/monitor/stop` | Sends SUB 0x97 | - -All cached endpoints accept `?force=true` to bypass the cache. - -Transport query params (supply one set): -``` -Serial: ?port=COM5&baud=38400 -TCP: ?host=1.2.3.4&tcp_port=12345 -``` - -### DB read endpoints - -Query the SQLite database written by `ach_server.py`. All read-only except -`PATCH /db/events/{id}/false_trigger`. - -| Method | URL | Description | -|--------|-----|-------------| -| `GET` | `/db/units` | All known serials with summary stats | -| `GET` | `/db/events` | Triggered events (filter by serial, date range, false_trigger) | -| `GET` | `/db/monitor_log` | Monitoring intervals | -| `GET` | `/db/sessions` | ACH call-home session history | -| `PATCH` | `/db/events/{id}/false_trigger?value=true` | Flag / unflag false triggers | - ---- - -## minimateplus library - -```python -from minimateplus import MiniMateClient -from minimateplus.transport import TcpTransport - -# Serial -client = MiniMateClient(port="COM5") - -# TCP (cellular modem) -client = MiniMateClient(transport=TcpTransport("1.2.3.4", 12345), timeout=30.0) - -with client: - # Read - info = client.connect() # DeviceInfo — serial, firmware, compliance config - count = client.count_events() # Number of stored events - keys = client.list_event_keys() # Fast browse walk — event keys only, no download - events = client.get_events() # Full download: headers + peaks + metadata - monitor = client.get_monitor_status() # Battery, memory, is_monitoring flag - log = client.get_monitor_log_entries() # Monitoring intervals (partial 0x2C records) - - # Write - client.apply_config( - sample_rate=1024, - trigger_level_geo=0.5, - project="Bridge Inspection 2026", - client_name="City of Portland", - operator="B. Harrison", - ) - - # Control - client.start_monitoring() # SUB 0x96 - client.stop_monitoring() # SUB 0x97 - client.delete_all_events() # Erase all (SUB 0xA3 → 0x1C → 0x06 → 0xA2) -``` - -`get_events()` runs the full per-event sequence: `1E → 0A → 0C → 5A → 1F`. -SUB 5A bulk stream provides `client`, `operator`, and `sensor_location` as they -existed at record time — not backfilled from the current compliance config. - ---- - -## Database - -`ach_server.py` writes to `bridges/captures/seismo_relay.db` (SQLite, WAL mode). -Three tables, all unit-keyed by serial number: - -| Table | Key | Contents | -|-------|-----|----------| -| `ach_sessions` | UUID | Per-call-home audit record: serial, peer IP, events_downloaded, duration | -| `events` | UUID, UNIQUE(serial, waveform_key) | Triggered events: timestamp, PPV per channel, project/client/operator strings, false_trigger flag | -| `monitor_log` | UUID, UNIQUE(serial, waveform_key) | Monitoring intervals: start/stop time, duration, geo threshold | - -Deduplication is by `(serial, waveform_key)` — repeat call-homes or re-runs -never produce duplicate rows. Post-erase key reuse is handled automatically -via the high-water mark in `ach_state.json`. - ---- - -## Connecting over cellular (RV50 / RV55) - -Field units connect via Sierra Wireless RV50 or RV55 cellular modems. - -### Required ACEmanager settings - -| Setting | Value | Why | -|---------|-------|-----| -| Configure Serial Port | `38400,8N1` | Must match MiniMate baud rate | -| Flow Control | `None` | Hardware FC blocks TX if pins unconnected | -| **Quiet Mode** | **Enable** | **Critical** — disabled injects `RING`/`CONNECT` onto serial, corrupting the S3 handshake | -| Data Forwarding Timeout | `1` (= 0.1 s) | Lower latency | -| TCP Connect Response Delay | `0` | Non-zero silently drops the first POLL frame | -| TCP Idle Timeout | `2` (minutes) | Prevents premature disconnect | -| DB9 Serial Echo | `Disable` | Echo corrupts the data stream | - ---- - -## Protocol quick-reference - -| Term | Value | Meaning | -|------|-------|---------| -| DLE | `0x10` | Data Link Escape | -| STX | `0x02` | Start of frame | -| ETX | `0x03` | End of frame | -| ACK | `0x41` | Frame-start marker sent before every BW frame | -| DLE stuffing | `10 10` on wire | Literal `0x10` in payload | - -**Response SUB rule:** `response_SUB = 0xFF - request_SUB` (no exceptions) - -Full protocol documentation: [`docs/instantel_protocol_reference.md`](docs/instantel_protocol_reference.md) - ---- - -## Requirements - -```bash -pip install pyserial fastapi uvicorn -``` - -Python 3.10+. Tkinter is included with the standard Python installer on -Windows (check "tcl/tk and IDLE" during install). - ---- - -## Virtual COM ports (bridge capture) - -``` -Blastware → COM4 (virtual) ↔ s3_bridge.py ↔ COM5 (physical) → MiniMate Plus -``` - -Use **com0com** or **VSPD** to create the virtual COM pair on Windows. - ---- - -## Roadmap - -- [x] Full read pipeline — device info, compliance config, event download with true event-time metadata -- [x] Write commands — push compliance config, trigger thresholds, project strings to device -- [x] Erase all events — confirmed erase sequence from live MITM capture -- [x] Monitor control — start/stop monitoring, read battery/memory/status -- [x] Monitor log entries — decode partial 0x2C records (continuous monitoring intervals) -- [x] ACH inbound server — accept call-home connections, download events, dedup by key -- [x] SQLite persistence — events, monitor log, and session history in `seismo_relay.db` -- [x] SFM REST API — device control + DB query endpoints, live device cache -- [ ] Terra-view integration — seismo-relay router, unit detail page, VISON-style event listing -- [ ] Vibration summary reports — highest legit PPV per project → Word doc (false trigger filtering first) -- [ ] Compliance config encoder — build raw write payloads from a `ComplianceConfig` object -- [ ] Modem manager — push RV50/RV55 configs via Sierra Wireless API +# seismo-relay `v0.12.0` + +A ground-up replacement for **Blastware** — Instantel's aging Windows-only +software for managing MiniMate Plus seismographs. + +Built in Python. Runs on Windows, Linux, or macOS. Connects to instruments +over direct RS-232 or cellular modem (Sierra Wireless RV50 / RV55). + +> **Status:** Active development. Full read + write + erase + monitoring +> pipeline working end-to-end over TCP/cellular. ACH Auto Call Home server +> handles inbound unit connections, downloads events, and persists everything +> to a SQLite database. SFM REST API exposes device control and DB queries. +> See [CHANGELOG.md](CHANGELOG.md) for full version history. + +--- + +## What's in here + +``` +seismo-relay/ +├── seismo_lab.py ← Main GUI (Bridge + Analyzer + Console tabs) +│ +├── minimateplus/ ← MiniMate Plus client library +│ ├── transport.py ← SerialTransport, TcpTransport, SocketTransport +│ ├── protocol.py ← DLE frame layer, SUB command dispatch +│ ├── client.py ← High-level client (connect, get_events, push_config, …) +│ ├── framing.py ← Frame builders, DLE codec, S3FrameParser +│ └── models.py ← DeviceInfo, Event, ComplianceConfig, MonitorLogEntry, … +│ +├── sfm/ ← SFM REST API server (FastAPI, port 8200) +│ ├── server.py ← All device + DB endpoints +│ ├── database.py ← SeismoDb — SQLite persistence layer +│ └── sfm_webapp.html ← Embedded web UI (served at /) +│ +├── bridges/ +│ ├── ach_server.py ← Inbound ACH call-home server (main production server) +│ ├── ach_mitm.py ← Transparent MITM proxy for capturing BW sessions +│ ├── s3-bridge/ ← RS-232 serial bridge (capture tool) +│ ├── tcp_serial_bridge.py ← Local TCP↔serial bridge (bench testing) +│ ├── gui_bridge.py ← Standalone bridge GUI +│ └── raw_capture.py ← Simple raw capture tool +│ +├── parsers/ +│ ├── s3_analyzer.py ← Session parser, differ, Claude export +│ ├── gui_analyzer.py ← Standalone analyzer GUI +│ └── frame_db.py ← SQLite frame database +│ +└── docs/ + └── instantel_protocol_reference.md ← Reverse-engineered protocol spec +``` + +--- + +## Quick start + +### ACH inbound server (production) + +Listens for inbound unit call-homes, downloads all new events and monitor log +entries, and writes everything to `bridges/captures/seismo_relay.db`. + +```bash +python bridges/ach_server.py --port 12345 --output bridges/captures/ +``` + +Point the unit's ACEmanager **Remote Host** to this machine's IP and **Remote Port** to `12345`. + +Options: +``` +--port N Listen port (default 12345) +--output DIR Capture directory (default bridges/captures/) +--allow-ip IP Allowlist an IP (repeat for multiple; default: accept all) +--max-events N Safety cap for first run (default: unlimited) +--clear-after-download Erase device memory after successful download +--verbose Debug logging +``` + +### SFM REST server + +Exposes device control and DB queries as a REST API. Proxied by terra-view. + +```bash +python sfm/server.py # default: 0.0.0.0:8200 +python -m uvicorn sfm.server:app --host 0.0.0.0 --port 8200 --reload +``` + +Open `http://localhost:8200` for the embedded web UI, or `http://localhost:8200/docs` +for the interactive API docs. + +### Seismo Lab GUI + +```bash +python seismo_lab.py +``` + +--- + +## SFM REST API + +### Live device endpoints + +Each call dials the device, does its work, and closes the connection. TCP +connections are retried once on `ProtocolError` to handle cold-boot timing. + +**Caching** — frequently-polled endpoints are cached in-process to avoid +redundant TCP round-trips: + +| Method | URL | Cache | +|--------|-----|-------| +| `GET` | `/device/info` | Indefinite; invalidated by `POST /device/config` | +| `GET` | `/device/events` | Count-probe fast path (~2s); full download only when new events detected | +| `GET` | `/device/event/{idx}/waveform` | Permanent per event index | +| `GET` | `/device/monitor/status` | 30-second TTL | +| `POST` | `/device/connect` | — | +| `POST` | `/device/config` | Writes compliance config; invalidates cache | +| `POST` | `/device/monitor/start` | Sends SUB 0x96 | +| `POST` | `/device/monitor/stop` | Sends SUB 0x97 | + +All cached endpoints accept `?force=true` to bypass the cache. + +Transport query params (supply one set): +``` +Serial: ?port=COM5&baud=38400 +TCP: ?host=1.2.3.4&tcp_port=12345 +``` + +### DB read endpoints + +Query the SQLite database written by `ach_server.py`. All read-only except +`PATCH /db/events/{id}/false_trigger`. + +| Method | URL | Description | +|--------|-----|-------------| +| `GET` | `/db/units` | All known serials with summary stats | +| `GET` | `/db/events` | Triggered events (filter by serial, date range, false_trigger) | +| `GET` | `/db/monitor_log` | Monitoring intervals | +| `GET` | `/db/sessions` | ACH call-home session history | +| `PATCH` | `/db/events/{id}/false_trigger?value=true` | Flag / unflag false triggers | + +--- + +## minimateplus library + +```python +from minimateplus import MiniMateClient +from minimateplus.transport import TcpTransport + +# Serial +client = MiniMateClient(port="COM5") + +# TCP (cellular modem) +client = MiniMateClient(transport=TcpTransport("1.2.3.4", 12345), timeout=30.0) + +with client: + # Read + info = client.connect() # DeviceInfo — serial, firmware, compliance config + count = client.count_events() # Number of stored events + keys = client.list_event_keys() # Fast browse walk — event keys only, no download + events = client.get_events() # Full download: headers + peaks + metadata + monitor = client.get_monitor_status() # Battery, memory, is_monitoring flag + log = client.get_monitor_log_entries() # Monitoring intervals (partial 0x2C records) + + # Write + client.apply_config( + sample_rate=1024, + trigger_level_geo=0.5, + project="Bridge Inspection 2026", + client_name="City of Portland", + operator="B. Harrison", + ) + + # Control + client.start_monitoring() # SUB 0x96 + client.stop_monitoring() # SUB 0x97 + client.delete_all_events() # Erase all (SUB 0xA3 → 0x1C → 0x06 → 0xA2) +``` + +`get_events()` runs the full per-event sequence: `1E → 0A → 0C → 5A → 1F`. +SUB 5A bulk stream provides `client`, `operator`, and `sensor_location` as they +existed at record time — not backfilled from the current compliance config. + +--- + +## Database + +`ach_server.py` writes to `bridges/captures/seismo_relay.db` (SQLite, WAL mode). +Three tables, all unit-keyed by serial number: + +| Table | Key | Contents | +|-------|-----|----------| +| `ach_sessions` | UUID | Per-call-home audit record: serial, peer IP, events_downloaded, duration | +| `events` | UUID, UNIQUE(serial, waveform_key) | Triggered events: timestamp, PPV per channel, project/client/operator strings, false_trigger flag | +| `monitor_log` | UUID, UNIQUE(serial, waveform_key) | Monitoring intervals: start/stop time, duration, geo threshold | + +Deduplication is by `(serial, waveform_key)` — repeat call-homes or re-runs +never produce duplicate rows. Post-erase key reuse is handled automatically +via the high-water mark in `ach_state.json`. + +--- + +## Connecting over cellular (RV50 / RV55) + +Field units connect via Sierra Wireless RV50 or RV55 cellular modems. + +### Required ACEmanager settings + +| Setting | Value | Why | +|---------|-------|-----| +| Configure Serial Port | `38400,8N1` | Must match MiniMate baud rate | +| Flow Control | `None` | Hardware FC blocks TX if pins unconnected | +| **Quiet Mode** | **Enable** | **Critical** — disabled injects `RING`/`CONNECT` onto serial, corrupting the S3 handshake | +| Data Forwarding Timeout | `1` (= 0.1 s) | Lower latency | +| TCP Connect Response Delay | `0` | Non-zero silently drops the first POLL frame | +| TCP Idle Timeout | `2` (minutes) | Prevents premature disconnect | +| DB9 Serial Echo | `Disable` | Echo corrupts the data stream | + +--- + +## Protocol quick-reference + +| Term | Value | Meaning | +|------|-------|---------| +| DLE | `0x10` | Data Link Escape | +| STX | `0x02` | Start of frame | +| ETX | `0x03` | End of frame | +| ACK | `0x41` | Frame-start marker sent before every BW frame | +| DLE stuffing | `10 10` on wire | Literal `0x10` in payload | + +**Response SUB rule:** `response_SUB = 0xFF - request_SUB` (no exceptions) + +Full protocol documentation: [`docs/instantel_protocol_reference.md`](docs/instantel_protocol_reference.md) + +--- + +## Requirements + +```bash +pip install pyserial fastapi uvicorn +``` + +Python 3.10+. Tkinter is included with the standard Python installer on +Windows (check "tcl/tk and IDLE" during install). + +--- + +## Virtual COM ports (bridge capture) + +``` +Blastware → COM4 (virtual) ↔ s3_bridge.py ↔ COM5 (physical) → MiniMate Plus +``` + +Use **com0com** or **VSPD** to create the virtual COM pair on Windows. + +--- + +## Roadmap + +- [x] Full read pipeline — device info, compliance config, event download with true event-time metadata +- [x] Write commands — push compliance config, trigger thresholds, project strings to device +- [x] Erase all events — confirmed erase sequence from live MITM capture +- [x] Monitor control — start/stop monitoring, read battery/memory/status +- [x] Monitor log entries — decode partial 0x2C records (continuous monitoring intervals) +- [x] ACH inbound server — accept call-home connections, download events, dedup by key +- [x] SQLite persistence — events, monitor log, and session history in `seismo_relay.db` +- [x] SFM REST API — device control + DB query endpoints, live device cache +- [ ] Terra-view integration — seismo-relay router, unit detail page, VISON-style event listing +- [ ] Vibra \ No newline at end of file diff --git a/bridges/ach_server.py b/bridges/ach_server.py index cc3be25..871cd3a 100644 --- a/bridges/ach_server.py +++ b/bridges/ach_server.py @@ -1,838 +1,838 @@ -#!/usr/bin/env python3 -""" -ach_server.py — Minimal inbound ACH (Auto Call Home) server for MiniMate Plus. - -This IS your test server. Run it on any machine on the same network, point a -unit's ACEmanager call-home destination at it, and it will speak the full BW -protocol to the device: handshake, pull device info, download all events, save -everything as JSON. - -The key thing this script tells you that no amount of packet sniffing can: - - Does the device speak first (push) or wait for us to send POLL (pull)? - -If startup() completes normally → it's pull protocol, same as Blastware. -If startup() times out → the device sent something first; check raw_rx.bin. - -Usage ------ - python bridges/ach_server.py [--port 12345] [--output bridges/captures/] - -Setup ------ - 1. Run this script on a machine on your local network. - 2. In ACEmanager → Application → ALEOS Application Framework (or equivalent) - find the Call Home / ACH settings. Set: - Remote Host: - Remote Port: 12345 - 3. Trigger the unit (wait for a vibration event, or use the manual call-home - button if your firmware version has one). - 4. The unit connects. This script handshakes, downloads all events, - and saves a timestamped session directory. - -Output per session ------------------- - bridges/captures/ach_inbound_/ - device_info.json — serial number, firmware version, calibration date, etc. - events.json — all events: timestamp, PPV per channel, peaks, metadata - raw_rx_.bin — raw bytes from the device (S3 side) for Analyzer - session_.log — detailed protocol log - -What to look for ----------------- - Push vs pull: Check session_.log. If the first line after "Connected" - shows bytes arriving BEFORE the POLL probe was sent, it's push. If POLL - gets a clean response, it's pull. - - Frequency: Look at raw_rx.bin in the Analyzer. SUB 5A (0xA5 responses) carry - bulk waveform data — if frequency is sent pre-computed there will be float32 - values before the ADC sample blocks. - - ACH-specific framing: Does the unit send anything extra before the DLE+STX - framing starts? raw_rx.bin will show raw bytes including any preamble. -""" - -from __future__ import annotations - -import argparse -import datetime -import json -import logging -import socket -import sys -import threading -from pathlib import Path -from typing import Optional - -sys.path.insert(0, str(Path(__file__).parent.parent)) - -from minimateplus.transport import SocketTransport -from minimateplus.client import MiniMateClient -from minimateplus.models import DeviceInfo, Event, MonitorLogEntry -from sfm.database import SeismoDb - -log = logging.getLogger("ach_server") - -# ── Per-unit state (downloaded-key set) ─────────────────────────────────────── -# Persisted as /ach_state.json -# Format: -# { -# "BE11529": { -# "downloaded_keys": ["01110000", "0111245a"], # hex keys already on disk -# "max_downloaded_key": "0111245a", # highest key ever seen -# "last_seen": "2026-04-11T01:04:36" -# } -# } -# -# Key-based deduplication works well within a single "key generation" (between -# erases). After the device memory is erased the event counter resets to -# 0x01110000, so the first new event has the SAME key as the very first event -# we ever downloaded. We detect this situation with max_downloaded_key: -# -# if max(current_device_keys) < max_downloaded_key -# → device was wiped and keys have restarted → treat all device keys as new -# -# After our own erase (--clear-after-download) we also explicitly clear -# downloaded_keys and max_downloaded_key so the next session starts fresh. - -_state_lock = threading.Lock() - - -def _load_state(state_path: Path) -> dict: - if state_path.exists(): - try: - with open(state_path) as f: - return json.load(f) - except Exception: - pass - return {} - - -def _save_state(state_path: Path, state: dict) -> None: - with _state_lock: - with open(state_path, "w") as f: - json.dump(state, f, indent=2) - - -# ── Per-session handler ──────────────────────────────────────────────────────── - -class AchSession: - """ - Handles one inbound unit connection in its own thread. - Wraps the socket in a SocketTransport → MiniMateClient, then runs the - standard connect → get_device_info → get_events sequence. - - State tracking (ach_state.json in output_dir): - On each successful download we record the SET of event keys downloaded. - On the next call-home we compare: if all device keys are already in the - set, there's nothing new. If any key is new (including after the device - was wiped and re-recorded), we download and save only those events. - """ - - def __init__( - self, - sock: socket.socket, - peer: str, - output_dir: Path, - timeout: float, - events_only: bool, - max_events: Optional[int], - state_path: Path, - db: "SeismoDb", - clear_after_download: bool = False, - restart_monitoring: bool = False, - ) -> None: - self.sock = sock - self.peer = peer - self.output_dir = output_dir - self.timeout = timeout - self.events_only = events_only - self.max_events = max_events - self.state_path = state_path - self.db = db - self.clear_after_download = clear_after_download - self.restart_monitoring = restart_monitoring - - def run(self) -> None: - ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - - # Session dir and file handler are created lazily — only after startup - # succeeds. This prevents internet scanners and dropped connections from - # littering the output directory with empty session folders. - try: - self._run_inner(ts) - except Exception as exc: - log.error("Session failed (%s): %s", self.peer, exc, exc_info=True) - finally: - try: - self.sock.close() - except Exception: - pass - - def _run_inner(self, ts: str) -> None: - transport = SocketTransport(self.sock, peer=self.peer) - - # Collect raw bytes in memory until startup succeeds, then flush to disk. - raw_buf: list[bytes] = [] - _orig_read = transport.read - - def tapped_read(n: int) -> bytes: - data = _orig_read(n) - if data: - raw_buf.append(data) - return data - - transport.read = tapped_read # type: ignore[method-assign] - - serial: Optional[str] = None - - # ── Step 1: startup handshake ───────────────────────────────────────── - # Do this BEFORE creating the session directory so that scanner probes - # and dropped connections leave no trace on disk. - try: - from minimateplus.protocol import MiniMateProtocol - client = MiniMateClient(transport=transport, timeout=self.timeout) - client.open() - proto = MiniMateProtocol(transport, recv_timeout=self.timeout) - proto.startup() - except Exception as exc: - log.warning("Startup failed from %s: %s -- ignoring", self.peer, exc) - return # no session dir created - - # Startup succeeded — this is a real unit. Create session dir now. - session_dir = self.output_dir / f"ach_inbound_{ts}" - session_dir.mkdir(parents=True, exist_ok=True) - log_path = session_dir / f"session_{ts}.log" - raw_path = session_dir / f"raw_rx_{ts}.bin" - - # Flush buffered raw bytes to file and switch to direct file writes. - raw_fh = open(raw_path, "wb") - for chunk in raw_buf: - raw_fh.write(chunk) - raw_buf.clear() - - def tapped_read_file(n: int) -> bytes: - data = _orig_read(n) - if data: - raw_fh.write(data) - raw_fh.flush() - return data - - transport.read = tapped_read_file # type: ignore[method-assign] - - # Wire up file handler now that the session dir exists. - fh = logging.FileHandler(log_path, encoding="utf-8") - fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s")) - root_logger = logging.getLogger() - root_logger.addHandler(fh) - - try: - # ── Step 2: device info ─────────────────────────────────────────── - device_info = None - if not self.events_only: - log.info("Step 2/3: reading device info") - try: - device_info = client.connect() - serial = device_info.serial - _save_json(session_dir / "device_info.json", _device_info_to_dict(device_info)) - log.info( - " [OK] Device: serial=%s firmware=%s model=%s events=%d", - serial, - device_info.firmware_version, - device_info.model, - device_info.event_count or 0, - ) - except Exception as exc: - log.error(" [FAIL] Device info failed: %s", exc) - else: - log.info("Step 2/3: skipping device info (--events-only)") - - # ── Step 3: check for new events by comparing key sets ──────────── - log.info("Step 3/3: checking for new events") - - state = _load_state(self.state_path) - unit_key = serial or self.peer # fall back to IP if no serial - unit_state = state.get(unit_key, {}) - seen_keys: set[str] = set(unit_state.get("downloaded_keys", [])) - # Highest event key ever downloaded from this unit (hex string, 8 chars). - # Used to detect post-erase key reuse — see comment block above. - max_seen_key: str = unit_state.get("max_downloaded_key", "00000000") - - # Walk the event index (browse-mode, no 5A) to get the actual current - # key list. The SUB 08 event_count field is a lifetime "total events - # ever recorded" counter that does NOT decrement on erase — confirmed - # 2026-04-13. list_event_keys() via the 1E/1F chain is the only - # reliable way to know what is actually stored on the device right now. - log.info(" Checking device key list (browse walk, no waveform download)...") - try: - device_keys = client.list_event_keys() - except Exception as exc: - log.warning(" list_event_keys failed: %s -- falling back to full download", exc) - device_keys = None - - # Use the walk result as our authoritative current count. - current_count = len(device_keys) if device_keys is not None else 0 - - log.info(" Unit has %d stored event(s); %d key(s) previously downloaded", - current_count, len(seen_keys)) - - if device_keys is not None and current_count == 0: - log.info(" [OK] No events on device -- nothing to download") - log.info("Session complete (no events) -> %s", session_dir) - return - - if device_keys is not None: - # ── Post-erase detection ────────────────────────────────────── - # After the device memory is erased, new events start from key - # 01110000 again — the same keys we already downloaded. Detect - # this by comparing the device's current highest key against the - # historical maximum. If the device has rolled back below our - # high-water mark, its counter was reset and we must treat all - # its keys as new, regardless of what seen_keys contains. - if device_keys and max_seen_key != "00000000": - max_device_key = max(device_keys) # lexicographic; safe because - # keys share the same 4-char prefix - if max_device_key < max_seen_key: - log.info( - " Post-erase reset detected: " - "device max key %s < historical max %s " - "-- treating all device keys as new", - max_device_key, max_seen_key, - ) - seen_keys = set() # discard stale dedup info for this session - - new_key_set = set(device_keys) - seen_keys - log.info(" Device has %d key(s): %d new, %d already seen", - len(device_keys), len(new_key_set), len(device_keys) - len(new_key_set)) - if not new_key_set: - log.info(" [OK] All events already downloaded -- nothing to do") - # Refresh state timestamp; preserve max_seen_key unchanged. - state[unit_key] = { - "downloaded_keys": sorted(seen_keys | set(device_keys)), - "max_downloaded_key": max_seen_key, - "last_seen": datetime.datetime.now().isoformat(), - "serial": serial, - "peer": self.peer, - } - _save_state(self.state_path, state) - - # ── Erase even when no new events (if requested) ────────── - # Blastware ACH always erases after every session — even when - # nothing new was downloaded. Without the erase the device - # still sees stored events in its memory and immediately - # retries the call-home, causing the looping we observed. - # Only erase when device actually has events stored; skip - # the erase if device_keys is empty (nothing to erase). - if self.clear_after_download and device_keys: - log.info( - " Clearing device memory (--clear-after-download, " - "no new events but device has %d stored)...", - len(device_keys), - ) - try: - client.delete_all_events() - log.info(" [OK] Device memory cleared") - # Reset state so the next session starts fresh. - state[unit_key] = { - "downloaded_keys": [], - "max_downloaded_key": "00000000", - "last_seen": datetime.datetime.now().isoformat(), - "serial": serial, - "peer": self.peer, - } - _save_state(self.state_path, state) - except Exception as exc: - log.error( - " [WARN] Event deletion failed: %s -- events NOT cleared", - exc, - ) - - log.info("Session complete (no new events) -> %s", session_dir) - return - else: - new_key_set = None # unknown; proceed with full download - - # Apply max_events cap - # stop_idx: when we know the count from list_event_keys, use it as - # an upper bound. When list_event_keys failed (device_keys is None), - # pass None — get_events will run until the null sentinel naturally. - stop_idx: Optional[int] = (current_count - 1) if device_keys is not None else None - if self.max_events is not None: - cap = self.max_events - 1 - stop_idx = cap if stop_idx is None else min(stop_idx, cap) - if device_keys is not None and self.max_events < current_count: - log.warning( - " max_events=%d cap: will download events 0-%d only " - "(unit has %d total)", - self.max_events, stop_idx, current_count, - ) - - try: - all_events = client.get_events( - full_waveform=True, - stop_after_index=stop_idx, - skip_waveform_for_keys=seen_keys if seen_keys else None, - compliance_config=device_info.compliance_config if device_info else None, - ) - - # Filter to events whose keys we haven't saved before. - new_events = [ - e for e in all_events - if e._waveform_key is None - or e._waveform_key.hex() not in seen_keys - ] - skipped = len(all_events) - len(new_events) - - log.info(" [OK] Downloaded %d event(s): %d new, %d skipped (already seen)", - len(all_events), len(new_events), skipped) - if skipped: - log.info(" (skipped %d already-downloaded event(s))", skipped) - - if new_events: - _save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events]) - - for ev in new_events: - pv = ev.peak_values - pi = ev.project_info - key_hex = ev._waveform_key.hex() if ev._waveform_key else "????????" - log.info( - " NEW [%s] %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f project=%r", - key_hex, - str(ev.timestamp) if ev.timestamp else "?", - pv.tran if pv else 0, - pv.vert if pv else 0, - pv.long if pv else 0, - pv.peak_vector_sum if pv else 0, - pi.project if pi else "", - ) - else: - log.info(" [OK] No new events since last call-home -- nothing to save") - - # ── Monitor log entries (partial records / continuous monitoring) ── - # Browse walk (0A + 1F only) to collect monitor log entries for - # recording intervals where no threshold was crossed. This is a - # second 1E-based pass over the device's record list, separate from - # the get_events() download loop above. - log.info(" Collecting monitor log entries (browse walk)...") - new_monitor_entries: list[MonitorLogEntry] = [] - try: - new_monitor_entries = client.get_monitor_log_entries( - skip_keys=seen_keys if seen_keys else None, - ) - if new_monitor_entries: - _save_json( - session_dir / "monitor_log.json", - [_monitor_log_entry_to_dict(e) for e in new_monitor_entries], - ) - log.info( - " [OK] %d new monitor log entry(s) saved", - len(new_monitor_entries), - ) - for ml in new_monitor_entries: - log.info( - " MONLOG [%s] %s → %s (%s)", - ml.key, - ml.start_time.isoformat() if ml.start_time else "?", - ml.stop_time.isoformat() if ml.stop_time else "?", - f"{ml.duration_seconds:.0f}s" if ml.duration_seconds is not None else "?s", - ) - else: - log.info(" [OK] No new monitor log entries") - except Exception as exc: - log.warning( - " [WARN] Monitor log collection failed: %s -- continuing", - exc, - ) - - # ── Persist to SQLite DB ───────────────────────────────────── - _session_start = datetime.datetime.now() - try: - # Build waveform blobs for events that have full raw_samples - _waveform_blobs = {} - for _ev in new_events: - if _ev._waveform_key and _ev.raw_samples: - _blob = _build_waveform_blob(_ev) - if _blob: - _waveform_blobs[_ev._waveform_key.hex()] = _blob - if _waveform_blobs: - log.info(" [DB] waveform blobs prepared: %d", len(_waveform_blobs)) - - _ev_ins, _ev_skip = self.db.insert_events( - new_events, serial=serial or self.peer, session_id=None, - waveform_blobs=_waveform_blobs, - ) - _ml_ins, _ml_skip = self.db.insert_monitor_log( - new_monitor_entries, session_id=None - ) - _session_id = self.db.insert_ach_session( - serial=serial or self.peer, - peer=self.peer, - events_downloaded=_ev_ins, - monitor_entries=_ml_ins, - duration_seconds=(datetime.datetime.now() - _session_start).total_seconds(), - session_time=_session_start, - ) - log.info( - " [DB] session=%s events +%d (skip %d) monitor +%d (skip %d)", - _session_id[:8], _ev_ins, _ev_skip, _ml_ins, _ml_skip, - ) - except Exception as exc: - log.warning(" [WARN] DB write failed: %s -- continuing", exc) - - # ── Optional: erase device memory after successful download ──── - erased_successfully = False - if self.clear_after_download and new_events: - log.info(" Clearing device memory (--clear-after-download)...") - try: - client.delete_all_events() - log.info(" [OK] Device memory cleared") - erased_successfully = True - except Exception as exc: - log.error( - " [WARN] Event deletion failed: %s -- events NOT cleared", - exc, - ) - - # ── Update persistent state ─────────────────────────────────── - # Include both triggered-event keys and monitor-log keys in the - # downloaded set so they are not re-processed on the next call-home. - current_event_keys = [ - e._waveform_key.hex() - for e in all_events - if e._waveform_key is not None - ] - current_monitor_keys = [e.key for e in new_monitor_entries] - current_keys = current_event_keys + current_monitor_keys - - if erased_successfully: - # Device memory is clear. Reset downloaded_keys and the - # high-water mark so the next call-home starts fresh and - # doesn't mis-identify the recycled key 01110000 as "seen". - updated_keys = [] - new_max_key = "00000000" - log.info( - " State reset after erase -- next session will download " - "from key 0 (device counter resets after erase)" - ) - else: - # Normal (no erase): union of previously-seen + all keys on - # device now. Includes already-seen survivors so we never - # re-download them if the device somehow keeps old records. - updated_keys = sorted(set(seen_keys) | set(current_keys)) - new_max_key = updated_keys[-1] if updated_keys else max_seen_key - - state[unit_key] = { - "downloaded_keys": updated_keys, - "max_downloaded_key": new_max_key, - "last_seen": datetime.datetime.now().isoformat(), - "serial": serial, - "peer": self.peer, - } - _save_state(self.state_path, state) - - except Exception as exc: - log.error(" [FAIL] Event download failed: %s", exc, exc_info=True) - - # ── Optional: restart monitoring after successful download ───────── - if self.restart_monitoring: - log.info(" Restarting monitoring on device (--restart-monitoring)...") - try: - client.start_monitoring() - log.info(" [OK] Monitoring restarted") - except Exception as exc: - log.warning(" [WARN] Failed to restart monitoring: %s", exc) - - finally: - raw_fh.close() - client.close() # closes transport / socket cleanly - root_logger.removeHandler(fh) - fh.close() - - log.info("Session complete -> %s", session_dir) - log.info("="*60) - - -# ── JSON helpers ─────────────────────────────────────────────────────────────── - -def _save_json(path: Path, obj: object) -> None: - with open(path, "w") as f: - json.dump(obj, f, indent=2, default=str) - log.debug("Saved %s", path) - - -def _device_info_to_dict(d: DeviceInfo) -> dict: - cc = d.compliance_config - return { - "serial": d.serial, - "firmware_version": d.firmware_version, - "dsp_version": d.dsp_version, - "model": d.model, - "event_count": d.event_count, - # compliance config fields (None if 1A read failed) - "setup_name": cc.setup_name if cc else None, - "sample_rate": cc.sample_rate if cc else None, - "record_time": cc.record_time if cc else None, - "trigger_level_geo": cc.trigger_level_geo if cc else None, - "alarm_level_geo": cc.alarm_level_geo if cc else None, - "max_range_geo": cc.max_range_geo if cc else None, - "project": cc.project if cc else None, - "client": cc.client if cc else None, - "operator": cc.operator if cc else None, - "sensor_location": cc.sensor_location if cc else None, - } - - -def _event_to_dict(e: Event) -> dict: - pv = e.peak_values - pi = e.project_info - peaks = {} - if pv: - peaks = { - "transverse": pv.tran, - "vertical": pv.vert, - "longitudinal": pv.long, - "vector_sum": pv.peak_vector_sum, - "mic": pv.micl, - } - samples = {} - if e.raw_samples: - samples = { - ch: vals[:20] # first 20 sample-sets to keep the file sane - for ch, vals in e.raw_samples.items() - } - samples["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform" - return { - "timestamp": str(e.timestamp) if e.timestamp else None, - "project": pi.project if pi else None, - "client": pi.client if pi else None, - "operator": pi.operator if pi else None, - "sensor_location": pi.sensor_location if pi else None, - "peaks": peaks, - "raw_samples_preview": samples, - } - - -def _build_waveform_blob(e: Event) -> Optional[str]: - """ - Serialise a downloaded event's full waveform data as a JSON string for - storage in the DB waveform_blob column. - - Returns the same shape as GET /device/event/{index}/waveform so the - waveform viewer can consume either source without modification. - Returns None if the event has no raw_samples (e.g. metadata-only download). - """ - raw = e.raw_samples or {} - if not raw: - return None - - pv = e.peak_values - peak_values = None - if pv: - # Key names must match sfm/server.py _serialise_peak_values() so the - # waveform viewer (which reads tran_in_s / vert_in_s / long_in_s) works - # identically in both live-device mode and DB mode. - peak_values = { - "tran_in_s": pv.tran, - "vert_in_s": pv.vert, - "long_in_s": pv.long, - "micl_psi": pv.micl, - "peak_vector_sum": pv.peak_vector_sum, - } - - ts = e.timestamp - timestamp_str = ( - f"{ts.year:04d}-{ts.month:02d}-{ts.day:02d}T" - f"{ts.hour:02d}:{ts.minute:02d}:{ts.second:02d}" - if ts else None - ) - - blob = { - "index": e.index, - "record_type": e.record_type, - "timestamp": timestamp_str, - "total_samples": e.total_samples, - "pretrig_samples": e.pretrig_samples, - "rectime_seconds": e.rectime_seconds, - "samples_decoded": len(raw.get("Tran", [])), - "sample_rate": e.sample_rate, - "peak_values": peak_values, - "channels": raw, - } - return json.dumps(blob) - - -def _monitor_log_entry_to_dict(e: MonitorLogEntry) -> dict: - return { - "key": e.key, - "start_time": e.start_time.isoformat() if e.start_time else None, - "stop_time": e.stop_time.isoformat() if e.stop_time else None, - "duration_seconds": e.duration_seconds, - "serial": e.serial, - "geo_threshold_ips": e.geo_threshold_ips, - } - - -# ── Main server loop ─────────────────────────────────────────────────────────── - -def serve(args: argparse.Namespace) -> None: - output_dir = Path(args.output) - output_dir.mkdir(parents=True, exist_ok=True) - state_path = output_dir / "ach_state.json" - db = SeismoDb(output_dir / "seismo_relay.db") - - server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_sock.bind(("0.0.0.0", args.port)) - server_sock.listen(5) - # Wake up every second so Ctrl-C is handled promptly on Windows. - # Without this, accept() blocks indefinitely and ignores KeyboardInterrupt. - server_sock.settimeout(1.0) - - max_ev = args.max_events - print(f"\n{'='*60}") - print(f" ACH inbound server listening on 0.0.0.0:{args.port}") - print(f" Output: {output_dir.resolve()}/ach_inbound_/") - print(f" State file: {state_path}") - print(f" Max events per session: {max_ev if max_ev else 'unlimited'}") - print(f" Clear device after download: {'YES' if args.clear_after_download else 'no'}") - print(f" Restart monitoring after download: {'YES' if args.restart_monitoring else 'no'}") - print(f"{'='*60}") - print(f"\n Point your test unit's ACEmanager call-home settings to:") - print(f" Remote Host: ") - print(f" Remote Port: {args.port}") - print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n") - - allow_ips = set(args.allow_ips) - if allow_ips: - print(f" Allowlist: {', '.join(sorted(allow_ips))}") - else: - print(" Allowlist: NONE -- accepting all IPs (add --allow-ip to restrict)") - - try: - while True: - try: - client_sock, addr = server_sock.accept() - except socket.timeout: - continue # no connection this second; loop back and check for Ctrl-C - try: - peer_ip = addr[0] - peer = f"{addr[0]}:{addr[1]}" - - if allow_ips and peer_ip not in allow_ips: - log.info("Rejected connection from %s (not in allowlist)", peer) - client_sock.close() - continue - - log.info("Accepted connection from %s", peer) - session = AchSession( - sock=client_sock, - peer=peer, - output_dir=output_dir, - timeout=args.timeout, - events_only=args.events_only, - max_events=max_ev, - state_path=state_path, - db=db, - clear_after_download=args.clear_after_download, - restart_monitoring=args.restart_monitoring, - ) - t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}") - t.start() - except KeyboardInterrupt: - raise - except Exception as exc: - log.error("Accept error: %s", exc) - finally: - server_sock.close() - print("\nServer stopped.") - - -def parse_args() -> argparse.Namespace: - p = argparse.ArgumentParser( - description="Minimal inbound ACH server — speak BW protocol to calling MiniMate Plus units.", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=__doc__, - ) - p.add_argument( - "--port", "-p", - type=int, - default=12345, - help="Port to listen on (default: 12345).", - ) - p.add_argument( - "--output", "-o", - default=str(Path(__file__).parent / "captures"), - metavar="DIR", - help="Directory to write session captures (default: bridges/captures/).", - ) - p.add_argument( - "--timeout", "-t", - type=float, - default=30.0, - help="Protocol receive timeout in seconds (default: 30.0).", - ) - p.add_argument( - "--events-only", - action="store_true", - help="Skip the device-info step and go straight to event download.", - ) - p.add_argument( - "--max-events", - type=int, - default=None, - metavar="N", - help=( - "Safety cap: download at most N events per session (default: unlimited). " - "Useful if a unit has many old events stored — prevents a very long first run." - ), - ) - p.add_argument( - "--allow-ip", - metavar="IP", - action="append", - dest="allow_ips", - default=[], - help=( - "Only accept connections from this IP address (repeat for multiple). " - "Example: --allow-ip 63.43.212.232 " - "If not specified, all IPs are accepted (not recommended for public servers)." - ), - ) - p.add_argument( - "--restart-monitoring", - action="store_true", - default=False, - help=( - "After downloading events, send SUB 0x96 (start monitoring) before " - "disconnecting. Required for RV55 units whose firmware does not assert " - "DCD on disconnect — without this the unit stays idle after a call-home." - ), - ) - p.add_argument( - "--clear-after-download", - action="store_true", - default=False, - help=( - "After successfully downloading new events, erase all events from the " - "device memory (SUB 0xA3 → 0x1C → 0x06 → 0xA2 sequence, confirmed from " - "4-11-26 MITM capture). Only fires when at least one new event was saved. " - "This mirrors the standard Blastware ACH workflow." - ), - ) - p.add_argument( - "--verbose", "-v", - action="store_true", - help="Enable debug logging.", - ) - return p.parse_args() - - -if __name__ == "__main__": - args = parse_args() - logging.basicConfig( - level=logging.DEBUG if args.verbose else logging.INFO, - format="%(asctime)s %(levelname)-7s %(name)s %(message)s", - ) - try: - serve(args) - except KeyboardInterrupt: - print("\nStopped.") +#!/usr/bin/env python3 +""" +ach_server.py — Minimal inbound ACH (Auto Call Home) server for MiniMate Plus. + +This IS your test server. Run it on any machine on the same network, point a +unit's ACEmanager call-home destination at it, and it will speak the full BW +protocol to the device: handshake, pull device info, download all events, save +everything as JSON. + +The key thing this script tells you that no amount of packet sniffing can: + - Does the device speak first (push) or wait for us to send POLL (pull)? + +If startup() completes normally → it's pull protocol, same as Blastware. +If startup() times out → the device sent something first; check raw_rx.bin. + +Usage +----- + python bridges/ach_server.py [--port 12345] [--output bridges/captures/] + +Setup +----- + 1. Run this script on a machine on your local network. + 2. In ACEmanager → Application → ALEOS Application Framework (or equivalent) + find the Call Home / ACH settings. Set: + Remote Host: + Remote Port: 12345 + 3. Trigger the unit (wait for a vibration event, or use the manual call-home + button if your firmware version has one). + 4. The unit connects. This script handshakes, downloads all events, + and saves a timestamped session directory. + +Output per session +------------------ + bridges/captures/ach_inbound_/ + device_info.json — serial number, firmware version, calibration date, etc. + events.json — all events: timestamp, PPV per channel, peaks, metadata + raw_rx_.bin — raw bytes from the device (S3 side) for Analyzer + session_.log — detailed protocol log + +What to look for +---------------- + Push vs pull: Check session_.log. If the first line after "Connected" + shows bytes arriving BEFORE the POLL probe was sent, it's push. If POLL + gets a clean response, it's pull. + + Frequency: Look at raw_rx.bin in the Analyzer. SUB 5A (0xA5 responses) carry + bulk waveform data — if frequency is sent pre-computed there will be float32 + values before the ADC sample blocks. + + ACH-specific framing: Does the unit send anything extra before the DLE+STX + framing starts? raw_rx.bin will show raw bytes including any preamble. +""" + +from __future__ import annotations + +import argparse +import datetime +import json +import logging +import socket +import sys +import threading +from pathlib import Path +from typing import Optional + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from minimateplus.transport import SocketTransport +from minimateplus.client import MiniMateClient +from minimateplus.models import DeviceInfo, Event, MonitorLogEntry +from sfm.database import SeismoDb + +log = logging.getLogger("ach_server") + +# ── Per-unit state (downloaded-key set) ─────────────────────────────────────── +# Persisted as /ach_state.json +# Format: +# { +# "BE11529": { +# "downloaded_keys": ["01110000", "0111245a"], # hex keys already on disk +# "max_downloaded_key": "0111245a", # highest key ever seen +# "last_seen": "2026-04-11T01:04:36" +# } +# } +# +# Key-based deduplication works well within a single "key generation" (between +# erases). After the device memory is erased the event counter resets to +# 0x01110000, so the first new event has the SAME key as the very first event +# we ever downloaded. We detect this situation with max_downloaded_key: +# +# if max(current_device_keys) < max_downloaded_key +# → device was wiped and keys have restarted → treat all device keys as new +# +# After our own erase (--clear-after-download) we also explicitly clear +# downloaded_keys and max_downloaded_key so the next session starts fresh. + +_state_lock = threading.Lock() + + +def _load_state(state_path: Path) -> dict: + if state_path.exists(): + try: + with open(state_path) as f: + return json.load(f) + except Exception: + pass + return {} + + +def _save_state(state_path: Path, state: dict) -> None: + with _state_lock: + with open(state_path, "w") as f: + json.dump(state, f, indent=2) + + +# ── Per-session handler ──────────────────────────────────────────────────────── + +class AchSession: + """ + Handles one inbound unit connection in its own thread. + Wraps the socket in a SocketTransport → MiniMateClient, then runs the + standard connect → get_device_info → get_events sequence. + + State tracking (ach_state.json in output_dir): + On each successful download we record the SET of event keys downloaded. + On the next call-home we compare: if all device keys are already in the + set, there's nothing new. If any key is new (including after the device + was wiped and re-recorded), we download and save only those events. + """ + + def __init__( + self, + sock: socket.socket, + peer: str, + output_dir: Path, + timeout: float, + events_only: bool, + max_events: Optional[int], + state_path: Path, + db: "SeismoDb", + clear_after_download: bool = False, + restart_monitoring: bool = False, + ) -> None: + self.sock = sock + self.peer = peer + self.output_dir = output_dir + self.timeout = timeout + self.events_only = events_only + self.max_events = max_events + self.state_path = state_path + self.db = db + self.clear_after_download = clear_after_download + self.restart_monitoring = restart_monitoring + + def run(self) -> None: + ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + + # Session dir and file handler are created lazily — only after startup + # succeeds. This prevents internet scanners and dropped connections from + # littering the output directory with empty session folders. + try: + self._run_inner(ts) + except Exception as exc: + log.error("Session failed (%s): %s", self.peer, exc, exc_info=True) + finally: + try: + self.sock.close() + except Exception: + pass + + def _run_inner(self, ts: str) -> None: + transport = SocketTransport(self.sock, peer=self.peer) + + # Collect raw bytes in memory until startup succeeds, then flush to disk. + raw_buf: list[bytes] = [] + _orig_read = transport.read + + def tapped_read(n: int) -> bytes: + data = _orig_read(n) + if data: + raw_buf.append(data) + return data + + transport.read = tapped_read # type: ignore[method-assign] + + serial: Optional[str] = None + + # ── Step 1: startup handshake ───────────────────────────────────────── + # Do this BEFORE creating the session directory so that scanner probes + # and dropped connections leave no trace on disk. + try: + from minimateplus.protocol import MiniMateProtocol + client = MiniMateClient(transport=transport, timeout=self.timeout) + client.open() + proto = MiniMateProtocol(transport, recv_timeout=self.timeout) + proto.startup() + except Exception as exc: + log.warning("Startup failed from %s: %s -- ignoring", self.peer, exc) + return # no session dir created + + # Startup succeeded — this is a real unit. Create session dir now. + session_dir = self.output_dir / f"ach_inbound_{ts}" + session_dir.mkdir(parents=True, exist_ok=True) + log_path = session_dir / f"session_{ts}.log" + raw_path = session_dir / f"raw_rx_{ts}.bin" + + # Flush buffered raw bytes to file and switch to direct file writes. + raw_fh = open(raw_path, "wb") + for chunk in raw_buf: + raw_fh.write(chunk) + raw_buf.clear() + + def tapped_read_file(n: int) -> bytes: + data = _orig_read(n) + if data: + raw_fh.write(data) + raw_fh.flush() + return data + + transport.read = tapped_read_file # type: ignore[method-assign] + + # Wire up file handler now that the session dir exists. + fh = logging.FileHandler(log_path, encoding="utf-8") + fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s")) + root_logger = logging.getLogger() + root_logger.addHandler(fh) + + try: + # ── Step 2: device info ─────────────────────────────────────────── + device_info = None + if not self.events_only: + log.info("Step 2/3: reading device info") + try: + device_info = client.connect() + serial = device_info.serial + _save_json(session_dir / "device_info.json", _device_info_to_dict(device_info)) + log.info( + " [OK] Device: serial=%s firmware=%s model=%s events=%d", + serial, + device_info.firmware_version, + device_info.model, + device_info.event_count or 0, + ) + except Exception as exc: + log.error(" [FAIL] Device info failed: %s", exc) + else: + log.info("Step 2/3: skipping device info (--events-only)") + + # ── Step 3: check for new events by comparing key sets ──────────── + log.info("Step 3/3: checking for new events") + + state = _load_state(self.state_path) + unit_key = serial or self.peer # fall back to IP if no serial + unit_state = state.get(unit_key, {}) + seen_keys: set[str] = set(unit_state.get("downloaded_keys", [])) + # Highest event key ever downloaded from this unit (hex string, 8 chars). + # Used to detect post-erase key reuse — see comment block above. + max_seen_key: str = unit_state.get("max_downloaded_key", "00000000") + + # Walk the event index (browse-mode, no 5A) to get the actual current + # key list. The SUB 08 event_count field is a lifetime "total events + # ever recorded" counter that does NOT decrement on erase — confirmed + # 2026-04-13. list_event_keys() via the 1E/1F chain is the only + # reliable way to know what is actually stored on the device right now. + log.info(" Checking device key list (browse walk, no waveform download)...") + try: + device_keys = client.list_event_keys() + except Exception as exc: + log.warning(" list_event_keys failed: %s -- falling back to full download", exc) + device_keys = None + + # Use the walk result as our authoritative current count. + current_count = len(device_keys) if device_keys is not None else 0 + + log.info(" Unit has %d stored event(s); %d key(s) previously downloaded", + current_count, len(seen_keys)) + + if device_keys is not None and current_count == 0: + log.info(" [OK] No events on device -- nothing to download") + log.info("Session complete (no events) -> %s", session_dir) + return + + if device_keys is not None: + # ── Post-erase detection ────────────────────────────────────── + # After the device memory is erased, new events start from key + # 01110000 again — the same keys we already downloaded. Detect + # this by comparing the device's current highest key against the + # historical maximum. If the device has rolled back below our + # high-water mark, its counter was reset and we must treat all + # its keys as new, regardless of what seen_keys contains. + if device_keys and max_seen_key != "00000000": + max_device_key = max(device_keys) # lexicographic; safe because + # keys share the same 4-char prefix + if max_device_key < max_seen_key: + log.info( + " Post-erase reset detected: " + "device max key %s < historical max %s " + "-- treating all device keys as new", + max_device_key, max_seen_key, + ) + seen_keys = set() # discard stale dedup info for this session + + new_key_set = set(device_keys) - seen_keys + log.info(" Device has %d key(s): %d new, %d already seen", + len(device_keys), len(new_key_set), len(device_keys) - len(new_key_set)) + if not new_key_set: + log.info(" [OK] All events already downloaded -- nothing to do") + # Refresh state timestamp; preserve max_seen_key unchanged. + state[unit_key] = { + "downloaded_keys": sorted(seen_keys | set(device_keys)), + "max_downloaded_key": max_seen_key, + "last_seen": datetime.datetime.now().isoformat(), + "serial": serial, + "peer": self.peer, + } + _save_state(self.state_path, state) + + # ── Erase even when no new events (if requested) ────────── + # Blastware ACH always erases after every session — even when + # nothing new was downloaded. Without the erase the device + # still sees stored events in its memory and immediately + # retries the call-home, causing the looping we observed. + # Only erase when device actually has events stored; skip + # the erase if device_keys is empty (nothing to erase). + if self.clear_after_download and device_keys: + log.info( + " Clearing device memory (--clear-after-download, " + "no new events but device has %d stored)...", + len(device_keys), + ) + try: + client.delete_all_events() + log.info(" [OK] Device memory cleared") + # Reset state so the next session starts fresh. + state[unit_key] = { + "downloaded_keys": [], + "max_downloaded_key": "00000000", + "last_seen": datetime.datetime.now().isoformat(), + "serial": serial, + "peer": self.peer, + } + _save_state(self.state_path, state) + except Exception as exc: + log.error( + " [WARN] Event deletion failed: %s -- events NOT cleared", + exc, + ) + + log.info("Session complete (no new events) -> %s", session_dir) + return + else: + new_key_set = None # unknown; proceed with full download + + # Apply max_events cap + # stop_idx: when we know the count from list_event_keys, use it as + # an upper bound. When list_event_keys failed (device_keys is None), + # pass None — get_events will run until the null sentinel naturally. + stop_idx: Optional[int] = (current_count - 1) if device_keys is not None else None + if self.max_events is not None: + cap = self.max_events - 1 + stop_idx = cap if stop_idx is None else min(stop_idx, cap) + if device_keys is not None and self.max_events < current_count: + log.warning( + " max_events=%d cap: will download events 0-%d only " + "(unit has %d total)", + self.max_events, stop_idx, current_count, + ) + + try: + all_events = client.get_events( + full_waveform=True, + stop_after_index=stop_idx, + skip_waveform_for_keys=seen_keys if seen_keys else None, + compliance_config=device_info.compliance_config if device_info else None, + ) + + # Filter to events whose keys we haven't saved before. + new_events = [ + e for e in all_events + if e._waveform_key is None + or e._waveform_key.hex() not in seen_keys + ] + skipped = len(all_events) - len(new_events) + + log.info(" [OK] Downloaded %d event(s): %d new, %d skipped (already seen)", + len(all_events), len(new_events), skipped) + if skipped: + log.info(" (skipped %d already-downloaded event(s))", skipped) + + if new_events: + _save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events]) + + for ev in new_events: + pv = ev.peak_values + pi = ev.project_info + key_hex = ev._waveform_key.hex() if ev._waveform_key else "????????" + log.info( + " NEW [%s] %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f project=%r", + key_hex, + str(ev.timestamp) if ev.timestamp else "?", + pv.tran if pv else 0, + pv.vert if pv else 0, + pv.long if pv else 0, + pv.peak_vector_sum if pv else 0, + pi.project if pi else "", + ) + else: + log.info(" [OK] No new events since last call-home -- nothing to save") + + # ── Monitor log entries (partial records / continuous monitoring) ── + # Browse walk (0A + 1F only) to collect monitor log entries for + # recording intervals where no threshold was crossed. This is a + # second 1E-based pass over the device's record list, separate from + # the get_events() download loop above. + log.info(" Collecting monitor log entries (browse walk)...") + new_monitor_entries: list[MonitorLogEntry] = [] + try: + new_monitor_entries = client.get_monitor_log_entries( + skip_keys=seen_keys if seen_keys else None, + ) + if new_monitor_entries: + _save_json( + session_dir / "monitor_log.json", + [_monitor_log_entry_to_dict(e) for e in new_monitor_entries], + ) + log.info( + " [OK] %d new monitor log entry(s) saved", + len(new_monitor_entries), + ) + for ml in new_monitor_entries: + log.info( + " MONLOG [%s] %s → %s (%s)", + ml.key, + ml.start_time.isoformat() if ml.start_time else "?", + ml.stop_time.isoformat() if ml.stop_time else "?", + f"{ml.duration_seconds:.0f}s" if ml.duration_seconds is not None else "?s", + ) + else: + log.info(" [OK] No new monitor log entries") + except Exception as exc: + log.warning( + " [WARN] Monitor log collection failed: %s -- continuing", + exc, + ) + + # ── Persist to SQLite DB ───────────────────────────────────── + _session_start = datetime.datetime.now() + try: + # Build waveform blobs for events that have full raw_samples + _waveform_blobs = {} + for _ev in new_events: + if _ev._waveform_key and _ev.raw_samples: + _blob = _build_waveform_blob(_ev) + if _blob: + _waveform_blobs[_ev._waveform_key.hex()] = _blob + if _waveform_blobs: + log.info(" [DB] waveform blobs prepared: %d", len(_waveform_blobs)) + + _ev_ins, _ev_skip = self.db.insert_events( + new_events, serial=serial or self.peer, session_id=None, + waveform_blobs=_waveform_blobs, + ) + _ml_ins, _ml_skip = self.db.insert_monitor_log( + new_monitor_entries, session_id=None + ) + _session_id = self.db.insert_ach_session( + serial=serial or self.peer, + peer=self.peer, + events_downloaded=_ev_ins, + monitor_entries=_ml_ins, + duration_seconds=(datetime.datetime.now() - _session_start).total_seconds(), + session_time=_session_start, + ) + log.info( + " [DB] session=%s events +%d (skip %d) monitor +%d (skip %d)", + _session_id[:8], _ev_ins, _ev_skip, _ml_ins, _ml_skip, + ) + except Exception as exc: + log.warning(" [WARN] DB write failed: %s -- continuing", exc) + + # ── Optional: erase device memory after successful download ──── + erased_successfully = False + if self.clear_after_download and new_events: + log.info(" Clearing device memory (--clear-after-download)...") + try: + client.delete_all_events() + log.info(" [OK] Device memory cleared") + erased_successfully = True + except Exception as exc: + log.error( + " [WARN] Event deletion failed: %s -- events NOT cleared", + exc, + ) + + # ── Update persistent state ─────────────────────────────────── + # Include both triggered-event keys and monitor-log keys in the + # downloaded set so they are not re-processed on the next call-home. + current_event_keys = [ + e._waveform_key.hex() + for e in all_events + if e._waveform_key is not None + ] + current_monitor_keys = [e.key for e in new_monitor_entries] + current_keys = current_event_keys + current_monitor_keys + + if erased_successfully: + # Device memory is clear. Reset downloaded_keys and the + # high-water mark so the next call-home starts fresh and + # doesn't mis-identify the recycled key 01110000 as "seen". + updated_keys = [] + new_max_key = "00000000" + log.info( + " State reset after erase -- next session will download " + "from key 0 (device counter resets after erase)" + ) + else: + # Normal (no erase): union of previously-seen + all keys on + # device now. Includes already-seen survivors so we never + # re-download them if the device somehow keeps old records. + updated_keys = sorted(set(seen_keys) | set(current_keys)) + new_max_key = updated_keys[-1] if updated_keys else max_seen_key + + state[unit_key] = { + "downloaded_keys": updated_keys, + "max_downloaded_key": new_max_key, + "last_seen": datetime.datetime.now().isoformat(), + "serial": serial, + "peer": self.peer, + } + _save_state(self.state_path, state) + + except Exception as exc: + log.error(" [FAIL] Event download failed: %s", exc, exc_info=True) + + # ── Optional: restart monitoring after successful download ───────── + if self.restart_monitoring: + log.info(" Restarting monitoring on device (--restart-monitoring)...") + try: + client.start_monitoring() + log.info(" [OK] Monitoring restarted") + except Exception as exc: + log.warning(" [WARN] Failed to restart monitoring: %s", exc) + + finally: + raw_fh.close() + client.close() # closes transport / socket cleanly + root_logger.removeHandler(fh) + fh.close() + + log.info("Session complete -> %s", session_dir) + log.info("="*60) + + +# ── JSON helpers ─────────────────────────────────────────────────────────────── + +def _save_json(path: Path, obj: object) -> None: + with open(path, "w") as f: + json.dump(obj, f, indent=2, default=str) + log.debug("Saved %s", path) + + +def _device_info_to_dict(d: DeviceInfo) -> dict: + cc = d.compliance_config + return { + "serial": d.serial, + "firmware_version": d.firmware_version, + "dsp_version": d.dsp_version, + "model": d.model, + "event_count": d.event_count, + # compliance config fields (None if 1A read failed) + "setup_name": cc.setup_name if cc else None, + "sample_rate": cc.sample_rate if cc else None, + "record_time": cc.record_time if cc else None, + "trigger_level_geo": cc.trigger_level_geo if cc else None, + "alarm_level_geo": cc.alarm_level_geo if cc else None, + "max_range_geo": cc.max_range_geo if cc else None, + "project": cc.project if cc else None, + "client": cc.client if cc else None, + "operator": cc.operator if cc else None, + "sensor_location": cc.sensor_location if cc else None, + } + + +def _event_to_dict(e: Event) -> dict: + pv = e.peak_values + pi = e.project_info + peaks = {} + if pv: + peaks = { + "transverse": pv.tran, + "vertical": pv.vert, + "longitudinal": pv.long, + "vector_sum": pv.peak_vector_sum, + "mic": pv.micl, + } + samples = {} + if e.raw_samples: + samples = { + ch: vals[:20] # first 20 sample-sets to keep the file sane + for ch, vals in e.raw_samples.items() + } + samples["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform" + return { + "timestamp": str(e.timestamp) if e.timestamp else None, + "project": pi.project if pi else None, + "client": pi.client if pi else None, + "operator": pi.operator if pi else None, + "sensor_location": pi.sensor_location if pi else None, + "peaks": peaks, + "raw_samples_preview": samples, + } + + +def _build_waveform_blob(e: Event) -> Optional[str]: + """ + Serialise a downloaded event's full waveform data as a JSON string for + storage in the DB waveform_blob column. + + Returns the same shape as GET /device/event/{index}/waveform so the + waveform viewer can consume either source without modification. + Returns None if the event has no raw_samples (e.g. metadata-only download). + """ + raw = e.raw_samples or {} + if not raw: + return None + + pv = e.peak_values + peak_values = None + if pv: + # Key names must match sfm/server.py _serialise_peak_values() so the + # waveform viewer (which reads tran_in_s / vert_in_s / long_in_s) works + # identically in both live-device mode and DB mode. + peak_values = { + "tran_in_s": pv.tran, + "vert_in_s": pv.vert, + "long_in_s": pv.long, + "micl_psi": pv.micl, + "peak_vector_sum": pv.peak_vector_sum, + } + + ts = e.timestamp + timestamp_str = ( + f"{ts.year:04d}-{ts.month:02d}-{ts.day:02d}T" + f"{ts.hour:02d}:{ts.minute:02d}:{ts.second:02d}" + if ts else None + ) + + blob = { + "index": e.index, + "record_type": e.record_type, + "timestamp": timestamp_str, + "total_samples": e.total_samples, + "pretrig_samples": e.pretrig_samples, + "rectime_seconds": e.rectime_seconds, + "samples_decoded": len(raw.get("Tran", [])), + "sample_rate": e.sample_rate, + "peak_values": peak_values, + "channels": raw, + } + return json.dumps(blob) + + +def _monitor_log_entry_to_dict(e: MonitorLogEntry) -> dict: + return { + "key": e.key, + "start_time": e.start_time.isoformat() if e.start_time else None, + "stop_time": e.stop_time.isoformat() if e.stop_time else None, + "duration_seconds": e.duration_seconds, + "serial": e.serial, + "geo_threshold_ips": e.geo_threshold_ips, + } + + +# ── Main server loop ─────────────────────────────────────────────────────────── + +def serve(args: argparse.Namespace) -> None: + output_dir = Path(args.output) + output_dir.mkdir(parents=True, exist_ok=True) + state_path = output_dir / "ach_state.json" + db = SeismoDb(output_dir / "seismo_relay.db") + + server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_sock.bind(("0.0.0.0", args.port)) + server_sock.listen(5) + # Wake up every second so Ctrl-C is handled promptly on Windows. + # Without this, accept() blocks indefinitely and ignores KeyboardInterrupt. + server_sock.settimeout(1.0) + + max_ev = args.max_events + print(f"\n{'='*60}") + print(f" ACH inbound server listening on 0.0.0.0:{args.port}") + print(f" Output: {output_dir.resolve()}/ach_inbound_/") + print(f" State file: {state_path}") + print(f" Max events per session: {max_ev if max_ev else 'unlimited'}") + print(f" Clear device after download: {'YES' if args.clear_after_download else 'no'}") + print(f" Restart monitoring after download: {'YES' if args.restart_monitoring else 'no'}") + print(f"{'='*60}") + print(f"\n Point your test unit's ACEmanager call-home settings to:") + print(f" Remote Host: ") + print(f" Remote Port: {args.port}") + print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n") + + allow_ips = set(args.allow_ips) + if allow_ips: + print(f" Allowlist: {', '.join(sorted(allow_ips))}") + else: + print(" Allowlist: NONE -- accepting all IPs (add --allow-ip to restrict)") + + try: + while True: + try: + client_sock, addr = server_sock.accept() + except socket.timeout: + continue # no connection this second; loop back and check for Ctrl-C + try: + peer_ip = addr[0] + peer = f"{addr[0]}:{addr[1]}" + + if allow_ips and peer_ip not in allow_ips: + log.info("Rejected connection from %s (not in allowlist)", peer) + client_sock.close() + continue + + log.info("Accepted connection from %s", peer) + session = AchSession( + sock=client_sock, + peer=peer, + output_dir=output_dir, + timeout=args.timeout, + events_only=args.events_only, + max_events=max_ev, + state_path=state_path, + db=db, + clear_after_download=args.clear_after_download, + restart_monitoring=args.restart_monitoring, + ) + t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}") + t.start() + except KeyboardInterrupt: + raise + except Exception as exc: + log.error("Accept error: %s", exc) + finally: + server_sock.close() + print("\nServer stopped.") + + +def parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser( + description="Minimal inbound ACH server — speak BW protocol to calling MiniMate Plus units.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + p.add_argument( + "--port", "-p", + type=int, + default=12345, + help="Port to listen on (default: 12345).", + ) + p.add_argument( + "--output", "-o", + default=str(Path(__file__).parent / "captures"), + metavar="DIR", + help="Directory to write session captures (default: bridges/captures/).", + ) + p.add_argument( + "--timeout", "-t", + type=float, + default=30.0, + help="Protocol receive timeout in seconds (default: 30.0).", + ) + p.add_argument( + "--events-only", + action="store_true", + help="Skip the device-info step and go straight to event download.", + ) + p.add_argument( + "--max-events", + type=int, + default=None, + metavar="N", + help=( + "Safety cap: download at most N events per session (default: unlimited). " + "Useful if a unit has many old events stored — prevents a very long first run." + ), + ) + p.add_argument( + "--allow-ip", + metavar="IP", + action="append", + dest="allow_ips", + default=[], + help=( + "Only accept connections from this IP address (repeat for multiple). " + "Example: --allow-ip 63.43.212.232 " + "If not specified, all IPs are accepted (not recommended for public servers)." + ), + ) + p.add_argument( + "--restart-monitoring", + action="store_true", + default=False, + help=( + "After downloading events, send SUB 0x96 (start monitoring) before " + "disconnecting. Required for RV55 units whose firmware does not assert " + "DCD on disconnect — without this the unit stays idle after a call-home." + ), + ) + p.add_argument( + "--clear-after-download", + action="store_true", + default=False, + help=( + "After successfully downloading new events, erase all events from the " + "device memory (SUB 0xA3 → 0x1C → 0x06 → 0xA2 sequence, confirmed from " + "4-11-26 MITM capture). Only fires when at least one new event was saved. " + "This mirrors the standard Blastware ACH workflow." + ), + ) + p.add_argument( + "--verbose", "-v", + action="store_true", + help="Enable debug logging.", + ) + return p.parse_args() + + +if __name__ == "__main__": + args = parse_args() + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s %(levelname)-7s %(name)s %(message)s", + ) + try: + serve(args) + except KeyboardInterrupt: + print("\nStopped.") diff --git a/sfm/database.py b/sfm/database.py index 01758a7..12793fe 100644 --- a/sfm/database.py +++ b/sfm/database.py @@ -1,524 +1,524 @@ -""" -sfm/database.py — SQLite persistence layer for seismo-relay. - -Three tables, all keyed by unit serial number: - - ach_sessions — one row per inbound ACH call-home - events — one row per triggered waveform event (deduped by serial+timestamp) - monitor_log — one row per monitoring interval (deduped by serial+start_time) - -The DB file lives at: - /seismo_relay.db (default: bridges/captures/seismo_relay.db) - -Usage ------ - from sfm.database import SeismoDb - - db = SeismoDb("bridges/captures/seismo_relay.db") - - # Write a call-home session - session_id = db.insert_ach_session(serial="BE11529", peer="1.2.3.4:51920", - events_downloaded=3, monitor_entries=2, - duration_seconds=47.3) - - # Write events (silently skips duplicates) - db.insert_events(events, serial="BE11529", session_id=session_id) - - # Write monitor log entries - db.insert_monitor_log(entries, session_id=session_id) - - # Query - rows = db.query_events(serial="BE11529", from_dt=datetime(...), to_dt=datetime(...)) -""" - -from __future__ import annotations - -import datetime -import logging -import sqlite3 -import uuid -from pathlib import Path -from typing import Optional - -from minimateplus.models import Event, MonitorLogEntry - -log = logging.getLogger("sfm.database") - -# ── Schema ───────────────────────────────────────────────────────────────────── - -_SCHEMA = """ -PRAGMA journal_mode = WAL; -PRAGMA foreign_keys = ON; - -CREATE TABLE IF NOT EXISTS ach_sessions ( - id TEXT PRIMARY KEY, -- UUID - serial TEXT NOT NULL, - session_time TEXT NOT NULL, -- ISO-8601 UTC - peer TEXT, -- "ip:port" - events_downloaded INTEGER NOT NULL DEFAULT 0, - monitor_entries INTEGER NOT NULL DEFAULT 0, - duration_seconds REAL, - created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) -); -CREATE INDEX IF NOT EXISTS idx_ach_sessions_serial ON ach_sessions(serial); -CREATE INDEX IF NOT EXISTS idx_ach_sessions_time ON ach_sessions(session_time); - -CREATE TABLE IF NOT EXISTS events ( - id TEXT PRIMARY KEY, -- UUID - serial TEXT NOT NULL, - waveform_key TEXT NOT NULL, -- 8-hex device key (dedup field) - session_id TEXT, -- FK → ach_sessions.id - timestamp TEXT, -- ISO-8601 local time from device - tran_ppv REAL, -- in/s - vert_ppv REAL, -- in/s - long_ppv REAL, -- in/s - peak_vector_sum REAL, -- in/s - mic_ppv REAL, -- psi or dB depending on setup - project TEXT, - client TEXT, - operator TEXT, - sensor_location TEXT, - sample_rate INTEGER, - record_type TEXT, -- "single_shot" | "continuous" - false_trigger INTEGER NOT NULL DEFAULT 0, -- 0=no, 1=yes (manual flag) - waveform_blob TEXT, -- JSON waveform response (channels + metadata) - created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), - UNIQUE(serial, timestamp) -); -CREATE INDEX IF NOT EXISTS idx_events_serial ON events(serial); -CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp); -CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id); - -CREATE TABLE IF NOT EXISTS monitor_log ( - id TEXT PRIMARY KEY, -- UUID - serial TEXT NOT NULL, - waveform_key TEXT NOT NULL, -- 8-hex device key (dedup field) - session_id TEXT, -- FK → ach_sessions.id - start_time TEXT, -- ISO-8601 - stop_time TEXT, -- ISO-8601 - duration_seconds REAL, - geo_threshold_ips REAL, -- in/s - created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), - UNIQUE(serial, start_time) -); -CREATE INDEX IF NOT EXISTS idx_monitor_log_serial ON monitor_log(serial); -CREATE INDEX IF NOT EXISTS idx_monitor_log_start ON monitor_log(start_time); -CREATE INDEX IF NOT EXISTS idx_monitor_log_session ON monitor_log(session_id); -""" - - -# ── SeismoDb class ───────────────────────────────────────────────────────────── - -class SeismoDb: - """ - Thin SQLite wrapper for seismo-relay persistence. - - Thread-safe: each call opens, uses, and closes a connection with - check_same_thread=False and WAL mode enabled. For the ACH server's - single-writer / occasional-reader pattern this is more than sufficient. - """ - - def __init__(self, db_path: str | Path) -> None: - self.db_path = Path(db_path) - self.db_path.parent.mkdir(parents=True, exist_ok=True) - self._init_schema() - log.info("SeismoDb initialised at %s", self.db_path) - - # ── Internal helpers ─────────────────────────────────────────────────────── - - def _connect(self) -> sqlite3.Connection: - conn = sqlite3.connect(str(self.db_path), check_same_thread=False) - conn.row_factory = sqlite3.Row - conn.execute("PRAGMA journal_mode = WAL") - conn.execute("PRAGMA foreign_keys = ON") - return conn - - def _init_schema(self) -> None: - with self._connect() as conn: - conn.executescript(_SCHEMA) - self._migrate(conn) - - def _migrate(self, conn: sqlite3.Connection) -> None: - """Apply in-place schema migrations for existing databases.""" - - # Migration 1: change events UNIQUE from (serial, waveform_key) [or any - # waveform_key-based variant] to (serial, timestamp). - # Rationale: device key counter resets to 01110000 after every erase, so - # waveform_key is not a stable dedup field across erase cycles. The event - # timestamp (from the device clock) is the correct natural key. - row = conn.execute( - "SELECT sql FROM sqlite_master WHERE type='table' AND name='events'" - ).fetchone() - if row and "UNIQUE(serial, timestamp)" not in row[0]: - log.info("_migrate: rebuilding events table — UNIQUE(serial, timestamp)") - conn.executescript(""" - ALTER TABLE events RENAME TO events_old; - - CREATE TABLE events ( - id TEXT PRIMARY KEY, - serial TEXT NOT NULL, - waveform_key TEXT NOT NULL, - session_id TEXT, - timestamp TEXT, - tran_ppv REAL, - vert_ppv REAL, - long_ppv REAL, - peak_vector_sum REAL, - mic_ppv REAL, - project TEXT, - client TEXT, - operator TEXT, - sensor_location TEXT, - sample_rate INTEGER, - record_type TEXT, - false_trigger INTEGER NOT NULL DEFAULT 0, - created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), - UNIQUE(serial, timestamp) - ); - - INSERT OR IGNORE INTO events SELECT * FROM events_old; - DROP TABLE events_old; - - CREATE INDEX IF NOT EXISTS idx_events_serial ON events(serial); - CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp); - CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id); - """) - log.info("_migrate: events table rebuilt OK") - - # Migration 2: change monitor_log UNIQUE from (serial, waveform_key) to - # (serial, start_time) — same reasoning as events. - row = conn.execute( - "SELECT sql FROM sqlite_master WHERE type='table' AND name='monitor_log'" - ).fetchone() - if row and "UNIQUE(serial, start_time)" not in row[0]: - log.info("_migrate: rebuilding monitor_log table — UNIQUE(serial, start_time)") - conn.executescript(""" - ALTER TABLE monitor_log RENAME TO monitor_log_old; - - CREATE TABLE monitor_log ( - id TEXT PRIMARY KEY, - serial TEXT NOT NULL, - waveform_key TEXT NOT NULL, - session_id TEXT, - start_time TEXT, - stop_time TEXT, - duration_seconds REAL, - geo_threshold_ips REAL, - created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), - UNIQUE(serial, start_time) - ); - - INSERT OR IGNORE INTO monitor_log SELECT * FROM monitor_log_old; - DROP TABLE monitor_log_old; - - CREATE INDEX IF NOT EXISTS idx_monitor_log_serial ON monitor_log(serial); - CREATE INDEX IF NOT EXISTS idx_monitor_log_start ON monitor_log(start_time); - CREATE INDEX IF NOT EXISTS idx_monitor_log_session ON monitor_log(session_id); - """) - log.info("_migrate: monitor_log table rebuilt OK") - - # Migration 3: add waveform_blob column to events (nullable TEXT). - # ALter TABLE ADD COLUMN is safe in SQLite for nullable columns — no rebuild needed. - col_names = { - row[1] - for row in conn.execute("PRAGMA table_info(events)").fetchall() - } - if "waveform_blob" not in col_names: - log.info("_migrate: adding waveform_blob column to events") - conn.execute("ALTER TABLE events ADD COLUMN waveform_blob TEXT") - log.info("_migrate: waveform_blob column added OK") - - @staticmethod - def _iso(dt: Optional[datetime.datetime]) -> Optional[str]: - return dt.isoformat() if dt is not None else None - - @staticmethod - def _new_id() -> str: - return str(uuid.uuid4()) - - # ── ACH sessions ────────────────────────────────────────────────────────── - - def insert_ach_session( - self, - *, - serial: str, - peer: Optional[str] = None, - events_downloaded: int = 0, - monitor_entries: int = 0, - duration_seconds: Optional[float] = None, - session_time: Optional[datetime.datetime] = None, - ) -> str: - """Insert a new ACH session row. Returns the new session UUID.""" - sid = self._new_id() - ts = self._iso(session_time or datetime.datetime.utcnow()) - with self._connect() as conn: - conn.execute( - """ - INSERT INTO ach_sessions - (id, serial, session_time, peer, - events_downloaded, monitor_entries, duration_seconds) - VALUES (?, ?, ?, ?, ?, ?, ?) - """, - (sid, serial, ts, peer, - events_downloaded, monitor_entries, duration_seconds), - ) - log.debug("ach_session inserted: %s serial=%s events=%d monitor=%d", - sid, serial, events_downloaded, monitor_entries) - return sid - - def get_sessions( - self, - serial: Optional[str] = None, - limit: int = 50, - ) -> list[dict]: - """Return recent ACH sessions, newest first.""" - with self._connect() as conn: - if serial: - rows = conn.execute( - "SELECT * FROM ach_sessions WHERE serial=? " - "ORDER BY session_time DESC LIMIT ?", - (serial, limit), - ).fetchall() - else: - rows = conn.execute( - "SELECT * FROM ach_sessions ORDER BY session_time DESC LIMIT ?", - (limit,), - ).fetchall() - return [dict(r) for r in rows] - - # ── Events ──────────────────────────────────────────────────────────────── - - def insert_events( - self, - events: list[Event], - *, - serial: str, - session_id: Optional[str] = None, - waveform_blobs: Optional[dict[str, str]] = None, - ) -> tuple[int, int]: - """ - Insert triggered events. Silently skips duplicates (serial+timestamp). - Returns (inserted, skipped). - - waveform_blobs: optional mapping of waveform_key (hex str) → JSON string - containing the full waveform response (channels + metadata). When provided, - the blob is stored alongside the event row and is retrievable via - GET /db/events/{id}/waveform. - """ - inserted = skipped = 0 - blobs = waveform_blobs or {} - with self._connect() as conn: - for ev in events: - key = ev._waveform_key.hex() if ev._waveform_key else None - if key is None: - skipped += 1 - continue - - ts = None - if ev.timestamp: - try: - ts = datetime.datetime( - ev.timestamp.year, ev.timestamp.month, ev.timestamp.day, - ev.timestamp.hour, ev.timestamp.minute, ev.timestamp.second, - ).isoformat() - except Exception: - ts = str(ev.timestamp) - - pv = ev.peak_values - pi = ev.project_info - blob = blobs.get(key) - - try: - conn.execute( - """ - INSERT INTO events - (id, serial, waveform_key, session_id, timestamp, - tran_ppv, vert_ppv, long_ppv, peak_vector_sum, mic_ppv, - project, client, operator, sensor_location, - sample_rate, record_type, waveform_blob) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - self._new_id(), serial, key, session_id, ts, - pv.tran if pv else None, - pv.vert if pv else None, - pv.long if pv else None, - pv.peak_vector_sum if pv else None, - pv.micl if pv else None, - pi.project if pi else None, - pi.client if pi else None, - pi.operator if pi else None, - pi.sensor_location if pi else None, - ev.sample_rate, - ev.record_type, - blob, - ), - ) - inserted += 1 - except sqlite3.IntegrityError: - skipped += 1 - - log.debug("insert_events serial=%s inserted=%d skipped=%d", - serial, inserted, skipped) - return inserted, skipped - - def query_events( - self, - serial: Optional[str] = None, - from_dt: Optional[datetime.datetime] = None, - to_dt: Optional[datetime.datetime] = None, - false_trigger: Optional[bool] = None, - limit: int = 500, - offset: int = 0, - ) -> list[dict]: - """Query events with optional filters. Returns newest first.""" - clauses: list[str] = [] - params: list = [] - - if serial: - clauses.append("serial = ?") - params.append(serial) - if from_dt: - clauses.append("timestamp >= ?") - params.append(from_dt.isoformat()) - if to_dt: - clauses.append("timestamp <= ?") - params.append(to_dt.isoformat()) - if false_trigger is not None: - clauses.append("false_trigger = ?") - params.append(1 if false_trigger else 0) - - where = ("WHERE " + " AND ".join(clauses)) if clauses else "" - params += [limit, offset] - - with self._connect() as conn: - rows = conn.execute( - f"SELECT * FROM events {where} " - f"ORDER BY timestamp DESC LIMIT ? OFFSET ?", - params, - ).fetchall() - return [dict(r) for r in rows] - - def set_false_trigger(self, event_id: str, value: bool) -> bool: - """Set or clear the false_trigger flag on an event. Returns True if found.""" - with self._connect() as conn: - cur = conn.execute( - "UPDATE events SET false_trigger=? WHERE id=?", - (1 if value else 0, event_id), - ) - return cur.rowcount > 0 - - def get_event_waveform(self, event_id: str) -> tuple[bool, Optional[str]]: - """ - Return (found, waveform_blob) for a given event UUID. - - found=False means the event row doesn't exist. - found=True, blob=None means the event exists but has no stored waveform - (e.g. downloaded before waveform storage was implemented). - found=True, blob= means the full waveform JSON is available. - """ - with self._connect() as conn: - row = conn.execute( - "SELECT waveform_blob FROM events WHERE id = ?", (event_id,) - ).fetchone() - if row is None: - return False, None - return True, row["waveform_blob"] - - # ── Monitor log ─────────────────────────────────────────────────────────── - - def insert_monitor_log( - self, - entries: list[MonitorLogEntry], - *, - session_id: Optional[str] = None, - ) -> tuple[int, int]: - """ - Insert monitor log entries. Silently skips duplicates (serial+start_time). - Returns (inserted, skipped). - """ - inserted = skipped = 0 - with self._connect() as conn: - for e in entries: - try: - conn.execute( - """ - INSERT INTO monitor_log - (id, serial, waveform_key, session_id, - start_time, stop_time, duration_seconds, - geo_threshold_ips) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - self._new_id(), - e.serial or "", - e.key, - session_id, - self._iso(e.start_time), - self._iso(e.stop_time), - e.duration_seconds, - e.geo_threshold_ips, - ), - ) - inserted += 1 - except sqlite3.IntegrityError: - skipped += 1 - - log.debug("insert_monitor_log inserted=%d skipped=%d", inserted, skipped) - return inserted, skipped - - def query_monitor_log( - self, - serial: Optional[str] = None, - from_dt: Optional[datetime.datetime] = None, - to_dt: Optional[datetime.datetime] = None, - limit: int = 500, - offset: int = 0, - ) -> list[dict]: - """Query monitor log entries with optional filters. Returns newest first.""" - clauses: list[str] = [] - params: list = [] - - if serial: - clauses.append("serial = ?") - params.append(serial) - if from_dt: - clauses.append("start_time >= ?") - params.append(from_dt.isoformat()) - if to_dt: - clauses.append("start_time <= ?") - params.append(to_dt.isoformat()) - - where = ("WHERE " + " AND ".join(clauses)) if clauses else "" - params += [limit, offset] - - with self._connect() as conn: - rows = conn.execute( - f"SELECT * FROM monitor_log {where} " - f"ORDER BY start_time DESC LIMIT ? OFFSET ?", - params, - ).fetchall() - return [dict(r) for r in rows] - - # ── Fleet overview ──────────────────────────────────────────────────────── - - def query_units(self) -> list[dict]: - """ - Return one row per known serial with summary stats: - last_seen, total_events, total_monitor_entries. - """ - with self._connect() as conn: - rows = conn.execute( - """ - SELECT - s.serial, - MAX(s.session_time) AS last_seen, - SUM(s.events_downloaded) AS total_events, - SUM(s.monitor_entries) AS total_monitor_entries, - COUNT(*) AS total_sessions - FROM ach_sessions s - GROUP BY s.serial - ORDER BY last_seen DESC - """ - ).fetchall() - return [dict(r) for r in rows] +""" +sfm/database.py — SQLite persistence layer for seismo-relay. + +Three tables, all keyed by unit serial number: + + ach_sessions — one row per inbound ACH call-home + events — one row per triggered waveform event (deduped by serial+timestamp) + monitor_log — one row per monitoring interval (deduped by serial+start_time) + +The DB file lives at: + /seismo_relay.db (default: bridges/captures/seismo_relay.db) + +Usage +----- + from sfm.database import SeismoDb + + db = SeismoDb("bridges/captures/seismo_relay.db") + + # Write a call-home session + session_id = db.insert_ach_session(serial="BE11529", peer="1.2.3.4:51920", + events_downloaded=3, monitor_entries=2, + duration_seconds=47.3) + + # Write events (silently skips duplicates) + db.insert_events(events, serial="BE11529", session_id=session_id) + + # Write monitor log entries + db.insert_monitor_log(entries, session_id=session_id) + + # Query + rows = db.query_events(serial="BE11529", from_dt=datetime(...), to_dt=datetime(...)) +""" + +from __future__ import annotations + +import datetime +import logging +import sqlite3 +import uuid +from pathlib import Path +from typing import Optional + +from minimateplus.models import Event, MonitorLogEntry + +log = logging.getLogger("sfm.database") + +# ── Schema ───────────────────────────────────────────────────────────────────── + +_SCHEMA = """ +PRAGMA journal_mode = WAL; +PRAGMA foreign_keys = ON; + +CREATE TABLE IF NOT EXISTS ach_sessions ( + id TEXT PRIMARY KEY, -- UUID + serial TEXT NOT NULL, + session_time TEXT NOT NULL, -- ISO-8601 UTC + peer TEXT, -- "ip:port" + events_downloaded INTEGER NOT NULL DEFAULT 0, + monitor_entries INTEGER NOT NULL DEFAULT 0, + duration_seconds REAL, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) +); +CREATE INDEX IF NOT EXISTS idx_ach_sessions_serial ON ach_sessions(serial); +CREATE INDEX IF NOT EXISTS idx_ach_sessions_time ON ach_sessions(session_time); + +CREATE TABLE IF NOT EXISTS events ( + id TEXT PRIMARY KEY, -- UUID + serial TEXT NOT NULL, + waveform_key TEXT NOT NULL, -- 8-hex device key (dedup field) + session_id TEXT, -- FK → ach_sessions.id + timestamp TEXT, -- ISO-8601 local time from device + tran_ppv REAL, -- in/s + vert_ppv REAL, -- in/s + long_ppv REAL, -- in/s + peak_vector_sum REAL, -- in/s + mic_ppv REAL, -- psi or dB depending on setup + project TEXT, + client TEXT, + operator TEXT, + sensor_location TEXT, + sample_rate INTEGER, + record_type TEXT, -- "single_shot" | "continuous" + false_trigger INTEGER NOT NULL DEFAULT 0, -- 0=no, 1=yes (manual flag) + waveform_blob TEXT, -- JSON waveform response (channels + metadata) + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + UNIQUE(serial, timestamp) +); +CREATE INDEX IF NOT EXISTS idx_events_serial ON events(serial); +CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp); +CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id); + +CREATE TABLE IF NOT EXISTS monitor_log ( + id TEXT PRIMARY KEY, -- UUID + serial TEXT NOT NULL, + waveform_key TEXT NOT NULL, -- 8-hex device key (dedup field) + session_id TEXT, -- FK → ach_sessions.id + start_time TEXT, -- ISO-8601 + stop_time TEXT, -- ISO-8601 + duration_seconds REAL, + geo_threshold_ips REAL, -- in/s + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + UNIQUE(serial, start_time) +); +CREATE INDEX IF NOT EXISTS idx_monitor_log_serial ON monitor_log(serial); +CREATE INDEX IF NOT EXISTS idx_monitor_log_start ON monitor_log(start_time); +CREATE INDEX IF NOT EXISTS idx_monitor_log_session ON monitor_log(session_id); +""" + + +# ── SeismoDb class ───────────────────────────────────────────────────────────── + +class SeismoDb: + """ + Thin SQLite wrapper for seismo-relay persistence. + + Thread-safe: each call opens, uses, and closes a connection with + check_same_thread=False and WAL mode enabled. For the ACH server's + single-writer / occasional-reader pattern this is more than sufficient. + """ + + def __init__(self, db_path: str | Path) -> None: + self.db_path = Path(db_path) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self._init_schema() + log.info("SeismoDb initialised at %s", self.db_path) + + # ── Internal helpers ─────────────────────────────────────────────────────── + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(str(self.db_path), check_same_thread=False) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode = WAL") + conn.execute("PRAGMA foreign_keys = ON") + return conn + + def _init_schema(self) -> None: + with self._connect() as conn: + conn.executescript(_SCHEMA) + self._migrate(conn) + + def _migrate(self, conn: sqlite3.Connection) -> None: + """Apply in-place schema migrations for existing databases.""" + + # Migration 1: change events UNIQUE from (serial, waveform_key) [or any + # waveform_key-based variant] to (serial, timestamp). + # Rationale: device key counter resets to 01110000 after every erase, so + # waveform_key is not a stable dedup field across erase cycles. The event + # timestamp (from the device clock) is the correct natural key. + row = conn.execute( + "SELECT sql FROM sqlite_master WHERE type='table' AND name='events'" + ).fetchone() + if row and "UNIQUE(serial, timestamp)" not in row[0]: + log.info("_migrate: rebuilding events table — UNIQUE(serial, timestamp)") + conn.executescript(""" + ALTER TABLE events RENAME TO events_old; + + CREATE TABLE events ( + id TEXT PRIMARY KEY, + serial TEXT NOT NULL, + waveform_key TEXT NOT NULL, + session_id TEXT, + timestamp TEXT, + tran_ppv REAL, + vert_ppv REAL, + long_ppv REAL, + peak_vector_sum REAL, + mic_ppv REAL, + project TEXT, + client TEXT, + operator TEXT, + sensor_location TEXT, + sample_rate INTEGER, + record_type TEXT, + false_trigger INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + UNIQUE(serial, timestamp) + ); + + INSERT OR IGNORE INTO events SELECT * FROM events_old; + DROP TABLE events_old; + + CREATE INDEX IF NOT EXISTS idx_events_serial ON events(serial); + CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp); + CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_id); + """) + log.info("_migrate: events table rebuilt OK") + + # Migration 2: change monitor_log UNIQUE from (serial, waveform_key) to + # (serial, start_time) — same reasoning as events. + row = conn.execute( + "SELECT sql FROM sqlite_master WHERE type='table' AND name='monitor_log'" + ).fetchone() + if row and "UNIQUE(serial, start_time)" not in row[0]: + log.info("_migrate: rebuilding monitor_log table — UNIQUE(serial, start_time)") + conn.executescript(""" + ALTER TABLE monitor_log RENAME TO monitor_log_old; + + CREATE TABLE monitor_log ( + id TEXT PRIMARY KEY, + serial TEXT NOT NULL, + waveform_key TEXT NOT NULL, + session_id TEXT, + start_time TEXT, + stop_time TEXT, + duration_seconds REAL, + geo_threshold_ips REAL, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + UNIQUE(serial, start_time) + ); + + INSERT OR IGNORE INTO monitor_log SELECT * FROM monitor_log_old; + DROP TABLE monitor_log_old; + + CREATE INDEX IF NOT EXISTS idx_monitor_log_serial ON monitor_log(serial); + CREATE INDEX IF NOT EXISTS idx_monitor_log_start ON monitor_log(start_time); + CREATE INDEX IF NOT EXISTS idx_monitor_log_session ON monitor_log(session_id); + """) + log.info("_migrate: monitor_log table rebuilt OK") + + # Migration 3: add waveform_blob column to events (nullable TEXT). + # ALter TABLE ADD COLUMN is safe in SQLite for nullable columns — no rebuild needed. + col_names = { + row[1] + for row in conn.execute("PRAGMA table_info(events)").fetchall() + } + if "waveform_blob" not in col_names: + log.info("_migrate: adding waveform_blob column to events") + conn.execute("ALTER TABLE events ADD COLUMN waveform_blob TEXT") + log.info("_migrate: waveform_blob column added OK") + + @staticmethod + def _iso(dt: Optional[datetime.datetime]) -> Optional[str]: + return dt.isoformat() if dt is not None else None + + @staticmethod + def _new_id() -> str: + return str(uuid.uuid4()) + + # ── ACH sessions ────────────────────────────────────────────────────────── + + def insert_ach_session( + self, + *, + serial: str, + peer: Optional[str] = None, + events_downloaded: int = 0, + monitor_entries: int = 0, + duration_seconds: Optional[float] = None, + session_time: Optional[datetime.datetime] = None, + ) -> str: + """Insert a new ACH session row. Returns the new session UUID.""" + sid = self._new_id() + ts = self._iso(session_time or datetime.datetime.utcnow()) + with self._connect() as conn: + conn.execute( + """ + INSERT INTO ach_sessions + (id, serial, session_time, peer, + events_downloaded, monitor_entries, duration_seconds) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + (sid, serial, ts, peer, + events_downloaded, monitor_entries, duration_seconds), + ) + log.debug("ach_session inserted: %s serial=%s events=%d monitor=%d", + sid, serial, events_downloaded, monitor_entries) + return sid + + def get_sessions( + self, + serial: Optional[str] = None, + limit: int = 50, + ) -> list[dict]: + """Return recent ACH sessions, newest first.""" + with self._connect() as conn: + if serial: + rows = conn.execute( + "SELECT * FROM ach_sessions WHERE serial=? " + "ORDER BY session_time DESC LIMIT ?", + (serial, limit), + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM ach_sessions ORDER BY session_time DESC LIMIT ?", + (limit,), + ).fetchall() + return [dict(r) for r in rows] + + # ── Events ──────────────────────────────────────────────────────────────── + + def insert_events( + self, + events: list[Event], + *, + serial: str, + session_id: Optional[str] = None, + waveform_blobs: Optional[dict[str, str]] = None, + ) -> tuple[int, int]: + """ + Insert triggered events. Silently skips duplicates (serial+timestamp). + Returns (inserted, skipped). + + waveform_blobs: optional mapping of waveform_key (hex str) → JSON string + containing the full waveform response (channels + metadata). When provided, + the blob is stored alongside the event row and is retrievable via + GET /db/events/{id}/waveform. + """ + inserted = skipped = 0 + blobs = waveform_blobs or {} + with self._connect() as conn: + for ev in events: + key = ev._waveform_key.hex() if ev._waveform_key else None + if key is None: + skipped += 1 + continue + + ts = None + if ev.timestamp: + try: + ts = datetime.datetime( + ev.timestamp.year, ev.timestamp.month, ev.timestamp.day, + ev.timestamp.hour, ev.timestamp.minute, ev.timestamp.second, + ).isoformat() + except Exception: + ts = str(ev.timestamp) + + pv = ev.peak_values + pi = ev.project_info + blob = blobs.get(key) + + try: + conn.execute( + """ + INSERT INTO events + (id, serial, waveform_key, session_id, timestamp, + tran_ppv, vert_ppv, long_ppv, peak_vector_sum, mic_ppv, + project, client, operator, sensor_location, + sample_rate, record_type, waveform_blob) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + self._new_id(), serial, key, session_id, ts, + pv.tran if pv else None, + pv.vert if pv else None, + pv.long if pv else None, + pv.peak_vector_sum if pv else None, + pv.micl if pv else None, + pi.project if pi else None, + pi.client if pi else None, + pi.operator if pi else None, + pi.sensor_location if pi else None, + ev.sample_rate, + ev.record_type, + blob, + ), + ) + inserted += 1 + except sqlite3.IntegrityError: + skipped += 1 + + log.debug("insert_events serial=%s inserted=%d skipped=%d", + serial, inserted, skipped) + return inserted, skipped + + def query_events( + self, + serial: Optional[str] = None, + from_dt: Optional[datetime.datetime] = None, + to_dt: Optional[datetime.datetime] = None, + false_trigger: Optional[bool] = None, + limit: int = 500, + offset: int = 0, + ) -> list[dict]: + """Query events with optional filters. Returns newest first.""" + clauses: list[str] = [] + params: list = [] + + if serial: + clauses.append("serial = ?") + params.append(serial) + if from_dt: + clauses.append("timestamp >= ?") + params.append(from_dt.isoformat()) + if to_dt: + clauses.append("timestamp <= ?") + params.append(to_dt.isoformat()) + if false_trigger is not None: + clauses.append("false_trigger = ?") + params.append(1 if false_trigger else 0) + + where = ("WHERE " + " AND ".join(clauses)) if clauses else "" + params += [limit, offset] + + with self._connect() as conn: + rows = conn.execute( + f"SELECT * FROM events {where} " + f"ORDER BY timestamp DESC LIMIT ? OFFSET ?", + params, + ).fetchall() + return [dict(r) for r in rows] + + def set_false_trigger(self, event_id: str, value: bool) -> bool: + """Set or clear the false_trigger flag on an event. Returns True if found.""" + with self._connect() as conn: + cur = conn.execute( + "UPDATE events SET false_trigger=? WHERE id=?", + (1 if value else 0, event_id), + ) + return cur.rowcount > 0 + + def get_event_waveform(self, event_id: str) -> tuple[bool, Optional[str]]: + """ + Return (found, waveform_blob) for a given event UUID. + + found=False means the event row doesn't exist. + found=True, blob=None means the event exists but has no stored waveform + (e.g. downloaded before waveform storage was implemented). + found=True, blob= means the full waveform JSON is available. + """ + with self._connect() as conn: + row = conn.execute( + "SELECT waveform_blob FROM events WHERE id = ?", (event_id,) + ).fetchone() + if row is None: + return False, None + return True, row["waveform_blob"] + + # ── Monitor log ─────────────────────────────────────────────────────────── + + def insert_monitor_log( + self, + entries: list[MonitorLogEntry], + *, + session_id: Optional[str] = None, + ) -> tuple[int, int]: + """ + Insert monitor log entries. Silently skips duplicates (serial+start_time). + Returns (inserted, skipped). + """ + inserted = skipped = 0 + with self._connect() as conn: + for e in entries: + try: + conn.execute( + """ + INSERT INTO monitor_log + (id, serial, waveform_key, session_id, + start_time, stop_time, duration_seconds, + geo_threshold_ips) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + self._new_id(), + e.serial or "", + e.key, + session_id, + self._iso(e.start_time), + self._iso(e.stop_time), + e.duration_seconds, + e.geo_threshold_ips, + ), + ) + inserted += 1 + except sqlite3.IntegrityError: + skipped += 1 + + log.debug("insert_monitor_log inserted=%d skipped=%d", inserted, skipped) + return inserted, skipped + + def query_monitor_log( + self, + serial: Optional[str] = None, + from_dt: Optional[datetime.datetime] = None, + to_dt: Optional[datetime.datetime] = None, + limit: int = 500, + offset: int = 0, + ) -> list[dict]: + """Query monitor log entries with optional filters. Returns newest first.""" + clauses: list[str] = [] + params: list = [] + + if serial: + clauses.append("serial = ?") + params.append(serial) + if from_dt: + clauses.append("start_time >= ?") + params.append(from_dt.isoformat()) + if to_dt: + clauses.append("start_time <= ?") + params.append(to_dt.isoformat()) + + where = ("WHERE " + " AND ".join(clauses)) if clauses else "" + params += [limit, offset] + + with self._connect() as conn: + rows = conn.execute( + f"SELECT * FROM monitor_log {where} " + f"ORDER BY start_time DESC LIMIT ? OFFSET ?", + params, + ).fetchall() + return [dict(r) for r in rows] + + # ── Fleet overview ──────────────────────────────────────────────────────── + + def query_units(self) -> list[dict]: + """ + Return one row per known serial with summary stats: + last_seen, total_events, total_monitor_entries. + """ + with self._connect() as conn: + rows = conn.execute( + """ + SELECT + s.serial, + MAX(s.session_time) AS last_seen, + SUM(s.events_downloaded) AS total_events, + SUM(s.monitor_entries) AS total_monitor_entries, + COUNT(*) AS total_sessions + FROM ach_sessions s + GROUP BY s.serial + ORDER BY last_seen DESC + """ + ).fetchall() + return [dict(r) for r in rows] diff --git a/sfm/sfm_webapp.html b/sfm/sfm_webapp.html index 1e83c75..ad715a8 100644 --- a/sfm/sfm_webapp.html +++ b/sfm/sfm_webapp.html @@ -782,6 +782,12 @@ +
@@ -793,6 +799,14 @@
PVS
+ + +
@@ -1051,6 +1065,7 @@ let eventList = []; let currentEvent = 0; let charts = {}; let geoRange = 6.206; +let lastWaveformData = null; // last successfully rendered waveform payload const DBL_REF = 2.9e-9; // 20 µPa in psi — reference pressure for dBL const CHANNEL_COLORS = { Tran:'#58a6ff', Vert:'#3fb950', Long:'#d29922', Mic:'#bc8cff' }; @@ -1512,13 +1527,14 @@ function updatePeaksBar(ev) { async function loadWaveform() { if (!devHost()) { setStatus('Enter device host first.', 'error'); return; } - const idx = currentEvent; + const idx = currentEvent; + const force = document.getElementById('force-reload')?.checked ? '&force=true' : ''; document.getElementById('load-btn').disabled = true; setStatus('Fetching waveform…', 'loading'); let data; try { - const r = await fetch(`${api()}/device/event/${idx}/waveform?${deviceParams()}`); + const r = await fetch(`${api()}/device/event/${idx}/waveform?${deviceParams()}${force}`); if (!r.ok) { const e = await r.json().catch(()=>({})); throw new Error(e.detail || r.statusText); } data = await r.json(); } catch(e) { @@ -1527,46 +1543,41 @@ async function loadWaveform() { return; } + lastWaveformData = data; renderWaveform(data); document.getElementById('load-btn').disabled = false; } -function renderWaveform(data) { +// ── Shared waveform chart builder ────────────────────────────────────────────── +// Renders waveform channel charts into chartsEl, destroys+replaces instances in +// chartsStore. emptyEl (optional) is shown/hidden based on decoded sample count. +function _buildWaveformCharts(data, chartsEl, emptyEl, chartsStore) { const sr = data.sample_rate || 1024; const pretrig = data.pretrig_samples || 0; const decoded = data.samples_decoded || 0; - const total = data.total_samples || decoded; const channels = data.channels || {}; - // Status bar - const bar = document.getElementById('status-bar'); - bar.innerHTML = ''; - bar.className = 'ok'; - const ts = data.timestamp; - bar.textContent = ts ? `Event #${data.index} — ${ts.display} ` : `Event #${data.index} `; - addPill(`${data.record_type || '?'}`); - addPill(`${sr} sps`); - addPill(`${decoded.toLocaleString()} / ${total.toLocaleString()} samples`); - addPill(`pretrig ${pretrig}`); - addPill(`${data.rectime_seconds ?? '?'} s`); + // Destroy old chart instances + Object.values(chartsStore).forEach(c => c.destroy()); + for (const k in chartsStore) delete chartsStore[k]; if (decoded === 0) { - document.getElementById('empty-state').style.display = 'flex'; - document.getElementById('empty-state').querySelector('p').textContent = - data.record_type === 'Waveform' + if (emptyEl) { + emptyEl.style.display = 'flex'; + const p = emptyEl.querySelector('p'); + if (p) p.textContent = data.record_type === 'Waveform' ? 'No samples decoded — check server logs' : `Record type "${data.record_type}" — waveform not supported yet`; - document.getElementById('charts').style.display = 'none'; - Object.values(charts).forEach(c => c.destroy()); charts = {}; + } + chartsEl.style.display = 'none'; + chartsEl.innerHTML = ''; return; } const times = Array.from({length: decoded}, (_, i) => ((i - pretrig) / sr * 1000).toFixed(2)); - document.getElementById('empty-state').style.display = 'none'; - const chartsDiv = document.getElementById('charts'); - chartsDiv.style.display = 'flex'; - chartsDiv.innerHTML = ''; - Object.values(charts).forEach(c => c.destroy()); charts = {}; + if (emptyEl) emptyEl.style.display = 'none'; + chartsEl.style.display = 'flex'; + chartsEl.innerHTML = ''; const micPeakPsi = data.peak_values?.micl_psi ?? null; @@ -1618,9 +1629,9 @@ function renderWaveform(data) { const cw = document.createElement('div'); cw.className = 'chart-canvas-wrap'; const canvas = document.createElement('canvas'); - cw.appendChild(canvas); wrap.appendChild(cw); chartsDiv.appendChild(wrap); + cw.appendChild(canvas); wrap.appendChild(cw); chartsEl.appendChild(wrap); - charts[ch] = new Chart(canvas, { + chartsStore[ch] = new Chart(canvas, { type: 'line', data: { labels: rTimes, datasets: [{ data: rData, borderColor: color, borderWidth: 1, pointRadius: 0, tension: 0 }] }, options: { @@ -1655,6 +1666,64 @@ function renderWaveform(data) { } } +function renderWaveform(data) { + const sr = data.sample_rate || 1024; + const pretrig = data.pretrig_samples || 0; + const decoded = data.samples_decoded || 0; + const total = data.total_samples || decoded; + lastWaveformData = data; + + // Status bar + const bar = document.getElementById('status-bar'); + bar.innerHTML = ''; + bar.className = 'ok'; + const ts = data.timestamp; + bar.textContent = ts ? `Event #${data.index} — ${ts.display} ` : `Event #${data.index} `; + addPill(`${data.record_type || '?'}`); + addPill(`${sr} sps`); + addPill(`${decoded.toLocaleString()} / ${total.toLocaleString()} samples`); + addPill(`pretrig ${pretrig}`); + addPill(`${data.rectime_seconds ?? '?'} s`); + + _buildWaveformCharts(data, document.getElementById('charts'), document.getElementById('empty-state'), charts); + updateDebugPanel(data); +} + +// ── Debug panel population ───────────────────────────────────────────────────── +function _fillDebugPanel(data, dbg, cont) { + if (!dbg || !cont) return; + const channels = data.channels || {}; + const pv = data.peak_values || {}; + const scale = geoRange / 32767; + const geoChans = ['Tran', 'Vert', 'Long']; + + let html = '
'; + for (const ch of [...geoChans, 'Mic']) { + const raw = (channels[ch] || []).slice(0, 8); + if (raw.length === 0) continue; + const maxAbs = Math.max(...raw.map(Math.abs)); + const keyMap = { Tran:'tran_in_s', Vert:'vert_in_s', Long:'long_in_s' }; + const p0c = ch !== 'Mic' ? (pv[keyMap[ch]] ?? null) : null; + const src = p0c !== null ? `0C=${p0c.toFixed(4)}` + : `Math.max≈${(maxAbs*scale).toFixed(4)}`; + html += `
${ch} raw[0:8]: ${raw.join(', ')} peak: ${src}
`; + } + html += '
'; + + const nullPeaks = geoChans.filter(ch => (pv[{ Tran:'tran_in_s', Vert:'vert_in_s', Long:'long_in_s' }[ch]] ?? null) === null); + if (nullPeaks.length > 0) { + html += `
⚠ peak0C null for: ${nullPeaks.join(', ')} — peaks shown are Math.max of waveform samples, not 0C record
`; + } + html += `
decoded=${data.samples_decoded} total=${data.total_samples} pretrig=${data.pretrig_samples} sr=${data.sample_rate} geoRange=${geoRange.toFixed(3)}
`; + + cont.innerHTML = html; + dbg.style.display = 'block'; +} + +function updateDebugPanel(data) { + _fillDebugPanel(data, document.getElementById('debug-panel'), document.getElementById('debug-content')); +} + // ── DB tabs ──────────────────────────────────────────────────────────────────── let histLoaded = false; let unitsLoaded = false; @@ -1757,9 +1826,8 @@ async function loadHistory() { const tr = document.createElement('tr'); const pvs = ev.peak_vector_sum; const maxPPV = Math.max(ev.tran_ppv ?? 0, ev.vert_ppv ?? 0, ev.long_ppv ?? 0); - const waveformUrl = `${api()}/waveform?db_id=${encodeURIComponent(ev.id)}&api_base=${encodeURIComponent(api())}`; tr.innerHTML = ` - + ${_fmtTs(ev.timestamp)} ${ev.serial ?? '—'} ${_ppvFmt(ev.tran_ppv)} @@ -1934,9 +2002,86 @@ async function loadSessions() { } } +// ── DB waveform modal ───────────────────────────────────────────────────────── +let modalCharts = {}; + +async function openDbWaveformModal(id) { + const modal = document.getElementById('wf-modal'); + const titleEl = document.getElementById('wf-modal-title'); + const chartsEl = document.getElementById('wf-modal-charts'); + const emptyEl = document.getElementById('wf-modal-empty'); + const peaksEl = document.getElementById('wf-modal-peaks'); + const debugEl = document.getElementById('wf-modal-debug'); + + // Show modal in loading state + titleEl.textContent = 'Loading…'; + peaksEl.classList.remove('visible'); + if (debugEl) debugEl.style.display = 'none'; + chartsEl.style.display = 'none'; + chartsEl.innerHTML = ''; + emptyEl.style.display = 'flex'; + emptyEl.querySelector('p').textContent = 'Loading waveform…'; + modal.style.display = 'flex'; + + let data; + try { + const r = await fetch(`${api()}/db/events/${encodeURIComponent(id)}/waveform`); + if (!r.ok) { const e = await r.json().catch(()=>({})); throw new Error(e.detail || r.statusText); } + data = await r.json(); + } catch(e) { + emptyEl.querySelector('p').textContent = `Error: ${e.message}`; + return; + } + + // Normalize old blob peak_values keys (pre-fix ACH blobs used tran/vert/long without _in_s) + if (data.peak_values) { + const pv = data.peak_values; + if (pv.tran_in_s == null && pv.tran != null) pv.tran_in_s = pv.tran; + if (pv.vert_in_s == null && pv.vert != null) pv.vert_in_s = pv.vert; + if (pv.long_in_s == null && pv.long != null) pv.long_in_s = pv.long; + } + + // Header — DB blobs have timestamp as ISO string; live device returns {display:...} + const sr = data.sample_rate || 1024; + const decoded = data.samples_decoded || 0; + const total = data.total_samples || decoded; + const pretrig = data.pretrig_samples || 0; + let tsStr = ''; + if (data.timestamp) { + const tsDisplay = typeof data.timestamp === 'object' + ? (data.timestamp.display || String(data.timestamp)) + : new Date(data.timestamp).toLocaleString(); + tsStr = `${tsDisplay} `; + } + titleEl.innerHTML = `${tsStr}${data.record_type || '?'} · ${sr} sps · ${decoded.toLocaleString()} / ${total.toLocaleString()} samples · pretrig ${pretrig} · ${data.rectime_seconds ?? '?'} s`; + + // Peaks bar + const pv = data.peak_values || {}; + const micDbl = pv.micl_psi != null && pv.micl_psi > 0 ? 20 * Math.log10(pv.micl_psi / DBL_REF) : null; + document.getElementById('wf-mpk-tran').textContent = pv.tran_in_s != null ? `${pv.tran_in_s.toFixed(5)} in/s` : '—'; + document.getElementById('wf-mpk-vert').textContent = pv.vert_in_s != null ? `${pv.vert_in_s.toFixed(5)} in/s` : '—'; + document.getElementById('wf-mpk-long').textContent = pv.long_in_s != null ? `${pv.long_in_s.toFixed(5)} in/s` : '—'; + document.getElementById('wf-mpk-mic').textContent = micDbl != null ? `${micDbl.toFixed(1)} dBL` : '—'; + document.getElementById('wf-mpk-pvs').textContent = pv.peak_vector_sum != null ? `${pv.peak_vector_sum.toFixed(5)} in/s` : '—'; + peaksEl.classList.add('visible'); + + _buildWaveformCharts(data, chartsEl, emptyEl, modalCharts); + _fillDebugPanel(data, debugEl, document.getElementById('wf-modal-debug-content')); +} + +function closeWfModal() { + const modal = document.getElementById('wf-modal'); + if (!modal || modal.style.display === 'none') return; + modal.style.display = 'none'; + // Destroy chart instances to free canvas memory + Object.values(modalCharts).forEach(c => c.destroy()); + for (const k in modalCharts) delete modalCharts[k]; +} + // ── Keyboard shortcuts ───────────────────────────────────────────────────────── document.addEventListener('keydown', e => { if (e.target.tagName === 'INPUT' || e.target.tagName === 'SELECT') return; + if (e.key === 'Escape') { closeWfModal(); return; } if (e.key === 'ArrowLeft') { stepEvent(-1); e.preventDefault(); } if (e.key === 'ArrowRight') { stepEvent(+1); e.preventDefault(); } }); @@ -1950,5 +2095,60 @@ document.getElementById('api-base').value = window.location.origin; document.getElementById(id)?.addEventListener('keydown', e => { if (e.key === 'Enter') connectUnit(); }); }); + + +