From f4ec6ef9453f75422f9841cccf520cd40c58b626 Mon Sep 17 00:00:00 2001 From: serversdown Date: Sat, 9 May 2026 00:03:31 +0000 Subject: [PATCH] feat(forward): SFM event forwarder (v1.5.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When SFM_FORWARD_ENABLED=true and SFM_URL is set, every Blastware event binary in the ACH watch folder is forwarded to an SFM server's /db/import/blastware_file endpoint as a multipart POST. The paired .TXT ASCII report (which Blastware's ACH writes alongside each event) is shipped in the same request, letting the SFM server index the full per-channel stats — PPV, ZC Freq, Time of Peak, Peak Acceleration / Displacement, Peak Vector Sum + time, sensor self-check Pass/Fail per channel, and monitor-log timestamps — without depending on the still-undecoded BW waveform body codec. New module event_forwarder.py: - is_event_binary() filename matcher (BW's

. scheme; rejects .MLG, .TXT, .log, .ini, .h5, etc.) - ForwardState (.json file keyed by sha256 — idempotent across restarts and auto-updates) - find_pending_events() with quiescence + grace-period guards - Hand-rolled multipart encoder (stdlib-only) - forward_event_pair() / forward_pending() — POST loop with structured per-event outcomes Wired into series3_watcher.run_watcher() on its own cadence (SFM_FORWARD_INTERVAL_SECONDS, default 60s) so it doesn't slow the existing 5-min heartbeat scan. Default-off: existing 1.4.x deployments keep their old behaviour after auto-updating until an operator sets SFM_URL + SFM_FORWARD_ENABLED=true and restarts. 17 unit tests in test_event_forwarder.py cover filename matching, state idempotency, scan logic (quiescence, grace, max age, already-forwarded, .TXT pairing), multipart byte shape, and an end-to-end POST against a tiny stdlib http.server fake. Bumps version 1.4.4 → 1.5.0 (minor — additive feature, no API break). Requires SFM server v0.16+ for the paired-.TXT import endpoint. --- CHANGELOG.md | 23 ++ README.md | 20 +- config-template.ini | 31 +++ event_forwarder.py | 504 ++++++++++++++++++++++++++++++++++++++++ installer.iss | 2 +- series3_tray.py | 2 +- series3_watcher.py | 101 +++++++- test_event_forwarder.py | 374 +++++++++++++++++++++++++++++ 8 files changed, 1052 insertions(+), 5 deletions(-) create mode 100644 event_forwarder.py create mode 100644 test_event_forwarder.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 174b4dc..1daa983 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,29 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 --- +## [1.5.0] - 2026-05-09 + +### Added +- **SFM event forwarder.** When `SFM_FORWARD_ENABLED=true` and `SFM_URL` is set, every Blastware event binary is forwarded to an SFM server's `/db/import/blastware_file` endpoint as a multipart POST. The corresponding `.TXT` ASCII report (which Blastware's ACH writes alongside each event) is paired by filename and shipped in the same request, letting the SFM server index the full per-channel stats (PPV, ZC Freq, Time of Peak, Peak Acceleration / Displacement, Peak Vector Sum + time, sensor self-check Pass/Fail, monitor-log timestamps) without depending on the still-undecoded Blastware waveform body codec. +- **Idempotent forwarding.** Forwarded files are tracked by sha256 in a JSON state file (default `/sfm_forwarded.json`, override via `SFM_STATE_FILE`). Re-scans don't re-POST and the state survives restarts / auto-updates. +- **Quiescence + grace-period guards.** Files modified within `SFM_QUIESCENCE_SECONDS` (default 5s) are skipped to avoid forwarding mid-write. If a binary's `.TXT` partner hasn't appeared after `SFM_MISSING_REPORT_GRACE_SECONDS` (default 60s), the binary is forwarded alone rather than blocking forever. +- New `event_forwarder.py` module + 17 unit tests in `test_event_forwarder.py` covering filename matching, state idempotency, scan logic, multipart encoding, and a fake-server end-to-end POST. + +### Configuration + +New `[agent]` keys (all default-off — existing 1.4.x deployments don't change behaviour on auto-update): +- `SFM_FORWARD_ENABLED` (default `false`) +- `SFM_URL` (e.g. `http://10.0.0.44:8200`) +- `SFM_FORWARD_INTERVAL_SECONDS` (default `60`) +- `SFM_QUIESCENCE_SECONDS` (default `5`) +- `SFM_MISSING_REPORT_GRACE_SECONDS` (default `60`) +- `SFM_HTTP_TIMEOUT` (default `60`) +- `SFM_STATE_FILE` (default: `/sfm_forwarded.json`) + +### Compatibility + +- Requires SFM server v0.16+ (the `/db/import/blastware_file` endpoint that accepts paired `.TXT` reports — released alongside this watcher version on the seismo-relay side). + ## [1.4.4] - 2026-03-17 ### Removed diff --git a/README.md b/README.md index 4f5af2e..8d80489 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Series 3 Watcher v1.4.4 +# Series 3 Watcher v1.5.0 Monitors Instantel **Series 3 (Minimate)** call-in activity on a Blastware server. Runs as a **system tray app** that starts automatically on login, reports heartbeats to terra-view, and self-updates from Gitea. @@ -88,6 +88,22 @@ All settings live in `config.ini`. The Setup Wizard covers every field, but here | `UPDATE_SOURCE` | `gitea` (default) or `url` — where to check for updates | | `UPDATE_URL` | Base URL of the update server when `UPDATE_SOURCE = url` (e.g. terra-view URL). The watcher fetches `/api/updates/series3-watcher/version.txt` and `/api/updates/series3-watcher/series3-watcher.exe` from this base. | +### SFM Event Forwarder (v1.5.0+) + +Forwards each Blastware event binary (and its paired `.TXT` ASCII report when present) to an SFM server's `/db/import/blastware_file` endpoint, where the report is parsed and the rich per-channel stats (PPV, ZC Freq, Time of Peak, Peak Acceleration / Displacement, sensor self-check) land in a searchable database. **Default-off** — existing deployments keep their old behaviour after auto-updating until the operator opts in. + +| Key | Description | +|-----|-------------| +| `SFM_FORWARD_ENABLED` | `true` to enable forwarding (default `false`) | +| `SFM_URL` | Base URL of the SFM server, e.g. `http://10.0.0.44:8200` | +| `SFM_FORWARD_INTERVAL_SECONDS` | Scan-and-forward cadence (default `60`); independent of the heartbeat interval | +| `SFM_QUIESCENCE_SECONDS` | Skip files modified within this many seconds (default `5`) — avoids forwarding mid-write | +| `SFM_MISSING_REPORT_GRACE_SECONDS` | If a `.TXT` partner hasn't appeared after this many seconds, forward the binary alone (default `60`) | +| `SFM_HTTP_TIMEOUT` | Per-request HTTP timeout in seconds (default `60`) | +| `SFM_STATE_FILE` | Path to the JSON state file tracking sha256 of forwarded events. Leave blank to default to `/sfm_forwarded.json` | + +Forwarded files are tracked by sha256 in the state file, so re-scans / restarts / auto-updates never re-POST the same content. A failed POST stays in the pending pool and is retried on the next interval. + --- ## Tray Icon @@ -121,7 +137,7 @@ To view connected watchers: **Settings → Developer → Watcher Manager**. ## Versioning -Follows **Semantic Versioning**. Current release: **v1.4.4**. +Follows **Semantic Versioning**. Current release: **v1.5.0**. See `CHANGELOG.md` for full history. --- diff --git a/config-template.ini b/config-template.ini index fd03b34..29ca234 100644 --- a/config-template.ini +++ b/config-template.ini @@ -32,3 +32,34 @@ UPDATE_SOURCE = gitea # If UPDATE_SOURCE = url, set UPDATE_URL to the base URL of the update server (e.g. terra-view) UPDATE_URL = +# --- SFM Event Forwarder --- +# When enabled, every Blastware event binary (and its paired .TXT +# report when present) is forwarded to an SFM server's +# /db/import/blastware_file endpoint as a multipart POST. The SFM +# server parses the .TXT and indexes the event's full per-channel +# stats (PPV, ZC Freq, Time of Peak, Peak Acceleration, Peak +# Displacement, sensor self-check) for sortable / filterable review. +# +# Default-off so existing deployments don't change behaviour after an +# auto-update. To enable on a field machine: set SFM_URL, then flip +# SFM_FORWARD_ENABLED to true and restart the watcher. +SFM_FORWARD_ENABLED = false +SFM_URL = ; e.g. http://10.0.0.44:8200 +SFM_FORWARD_INTERVAL_SECONDS = 60 ; scan-and-forward cadence (independent of heartbeat) + +# Files modified within the last N seconds are skipped (BW may still +# be writing them). Defence against truncated uploads. +SFM_QUIESCENCE_SECONDS = 5 + +# If a binary's .TXT report hasn't appeared after this many seconds, +# forward the binary alone rather than blocking forever waiting. +SFM_MISSING_REPORT_GRACE_SECONDS = 60 + +# Per-request HTTP timeout (seconds). +SFM_HTTP_TIMEOUT = 60 + +# Path to the JSON state file tracking which events have been +# forwarded (sha256-keyed, idempotent across restarts). Leave blank +# to default to /sfm_forwarded.json. +SFM_STATE_FILE = + diff --git a/event_forwarder.py b/event_forwarder.py new file mode 100644 index 0000000..9a3a08a --- /dev/null +++ b/event_forwarder.py @@ -0,0 +1,504 @@ +""" +event_forwarder.py — forward Blastware event files to an SFM server. + +Watches the same Blastware ACH folder the heartbeat path watches. +For each event binary that hasn't been forwarded yet, pairs it with +its `.TXT` report (when available) and POSTs both to SFM's +`/db/import/blastware_file` endpoint as one multipart request. + +The receiving SFM server (seismo-relay v0.16+) detects paired binaries +and reports by filename, parses the .TXT into structured fields +(per-channel PPV / ZC Freq / Time of Peak / Peak Acceleration / Peak +Displacement / sensor self-check / monitor log), and persists every +field into the SFM database for sortable / filterable monthly-summary +review. + +Design notes +──────────── +- **stdlib only.** Matches the rest of the watcher (`urllib.request`). + Multipart encoding is hand-rolled. +- **Idempotent across restarts.** Forwarded files are tracked by + sha256 in a JSON state file (`.forwarded.json` next to config.ini). + Re-scanning the watch folder doesn't re-POST anything. +- **Default-off.** Callers must enable via config + (`SFM_FORWARD_ENABLED=true` + `SFM_URL=...`). Existing 1.4.x + deployments that auto-update to the new version stay non-forwarding + until an operator flips the switch. +- **Quiescence guard.** Files modified within the last few seconds + are skipped — Blastware ACH writes the .TXT after the binary, so + we wait until both look stable before forwarding. +- **Best-effort report pairing.** When the .TXT hasn't appeared yet + but the binary is older than `MISSING_REPORT_GRACE_SECONDS`, the + binary is forwarded alone (the SFM endpoint accepts that and just + skips the rich fields — we'd rather get the binary indexed than + block forever waiting for a TXT that never arrived). +""" + +from __future__ import annotations + +import hashlib +import json +import logging +import os +import re +import time +import urllib.error +import urllib.request +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Tuple + + +log = logging.getLogger(__name__) + +# Default tuning. All overridable via config.ini SFM_* keys. +DEFAULT_QUIESCENCE_SECONDS = 5 # don't touch a file modified in the last N seconds +DEFAULT_MISSING_REPORT_GRACE_SECONDS = 60 # forward without .TXT if it hasn't shown up after N seconds +DEFAULT_HTTP_TIMEOUT = 60.0 # per-request timeout +STATE_SCHEMA_VERSION = 1 + + +# ── Filename matching ───────────────────────────────────────────────────────── +# +# Blastware's filename scheme (confirmed in seismo-relay docs): +# prefix_letter (B–Z) + 3-digit serial-tail + 4-char base36 timestamp stem +# + "." + 3-or-4-char extension. +# Examples: M529LK44.AB0, S353L4H0.3M0W, P036L318.C80H, M529LIY6.N00. +# +# We accept lowercase too because some filesystems lower-case names. +_EVENT_FILENAME_RE = re.compile( + r"^[A-Za-z][0-9]{3}[A-Za-z0-9]{4}\.[A-Za-z0-9]{3,4}$" +) + +# Filenames we explicitly skip even if they happen to match the regex. +_NON_EVENT_EXTS = { + ".mlg", # monitor-log files (separate heartbeat path) + ".txt", # ASCII reports — handled via pairing, not as primary files + ".log", + ".ini", + ".dat", + ".bak", + ".tmp", + ".pkl", # SFM A5 pickles (shouldn't appear in a BW folder, but defence) + ".h5", + ".sfm.json", + ".json", +} + + +def is_event_binary(path: str) -> bool: + """Return True if `path`'s basename looks like a Blastware event binary.""" + name = os.path.basename(path) + if not _EVENT_FILENAME_RE.match(name): + return False + ext = os.path.splitext(name)[1].lower() + if ext in _NON_EVENT_EXTS: + return False + return True + + +def report_path_for(binary_path: str) -> str: + """Return the conventional `.TXT` partner path.""" + return binary_path + ".TXT" + + +# ── State file ──────────────────────────────────────────────────────────────── + + +class ForwardState: + """Idempotency record: which event files have we already forwarded? + + State file format (JSON): + + { + "version": 1, + "forwarded": { + "": { + "filename": "M529LK44.AB0", + "size": 4400, + "forwarded_at": "2026-05-08T...Z" + }, + ... + } + } + + Keyed by sha256 (not filename) so that re-saved or re-uploaded + identical content is recognised as already-forwarded even if the + file moved or got renamed. Filename is preserved for human + inspection. + """ + + def __init__(self, path: str): + self.path = path + self._data: Dict[str, Any] = {"version": STATE_SCHEMA_VERSION, "forwarded": {}} + self._load() + + def _load(self) -> None: + try: + with open(self.path, "r", encoding="utf-8") as f: + d = json.load(f) + if not isinstance(d, dict): + raise ValueError("state file root is not an object") + if d.get("version") != STATE_SCHEMA_VERSION: + log.warning( + "forward state version mismatch (got %r, want %d) — starting fresh", + d.get("version"), STATE_SCHEMA_VERSION, + ) + return + forwarded = d.get("forwarded") + if isinstance(forwarded, dict): + self._data["forwarded"] = forwarded + except FileNotFoundError: + pass + except (OSError, ValueError, json.JSONDecodeError) as exc: + log.warning("failed to load forward state from %s: %s", self.path, exc) + + def _save(self) -> None: + tmp = self.path + ".tmp" + try: + with open(tmp, "w", encoding="utf-8") as f: + json.dump(self._data, f, indent=2, sort_keys=True) + f.flush() + os.fsync(f.fileno()) + os.replace(tmp, self.path) + except OSError as exc: + log.warning("failed to save forward state to %s: %s", self.path, exc) + + def is_forwarded(self, sha256: str) -> bool: + return sha256 in self._data["forwarded"] + + def mark_forwarded(self, sha256: str, filename: str, size: int) -> None: + self._data["forwarded"][sha256] = { + "filename": filename, + "size": size, + "forwarded_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + } + self._save() + + def count(self) -> int: + return len(self._data["forwarded"]) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + + +def sha256_of_file(path: str) -> str: + h = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(65536), b""): + h.update(chunk) + return h.hexdigest() + + +def _is_quiescent(path: str, now_ts: float, quiescence_seconds: float) -> bool: + """Return True if the file's mtime is at least `quiescence_seconds` + in the past — i.e. no longer being written.""" + try: + mtime = os.path.getmtime(path) + except OSError: + return False + return (now_ts - mtime) >= quiescence_seconds + + +# ── Scan pass ───────────────────────────────────────────────────────────────── + + +def find_pending_events( + watch_dir: str, + state: ForwardState, + *, + max_age_days: int, + quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS, + missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS, +) -> List[Tuple[str, Optional[str]]]: + """ + Walk `watch_dir` and return the list of (binary_path, txt_path_or_None) + pairs that need forwarding. + + Filtering rules: + - Filename must match the BW event filename regex. + - File must be quiescent (mtime >= quiescence_seconds in the past). + - File must not exceed `max_age_days` (matches the heartbeat + path's MAX_EVENT_AGE_DAYS — keeps deep archives out of the + forwarder). + - File's sha256 must NOT already be in the forwarded state. + - If a `.TXT` exists and is quiescent, we pair them. + Otherwise, if the binary is older than + missing_report_grace_seconds, we forward without the TXT. + Younger binaries with a missing TXT are deferred — let BW + finish writing the report. + """ + if not os.path.isdir(watch_dir): + log.warning("forward scan: watch dir not found: %s", watch_dir) + return [] + + now_ts = time.time() + max_age_seconds = max(1, int(max_age_days)) * 86400.0 + + pending: List[Tuple[str, Optional[str]]] = [] + skipped_inflight = 0 + skipped_already_forwarded = 0 + + try: + with os.scandir(watch_dir) as it: + entries = list(it) + except OSError as exc: + log.warning("forward scan: scandir failed on %s: %s", watch_dir, exc) + return [] + + # Cache existence of TXT partners so we don't stat() each twice. + names = {e.name for e in entries if e.is_file()} + + for e in entries: + if not e.is_file(): + continue + if not is_event_binary(e.path): + continue + + try: + mtime = e.stat().st_mtime + size = e.stat().st_size + except OSError: + continue + + # Out-of-window: too old or too fresh + if (now_ts - mtime) > max_age_seconds: + continue + if not _is_quiescent(e.path, now_ts, quiescence_seconds): + skipped_inflight += 1 + continue + + # Idempotency: skip if we already forwarded this content + try: + digest = sha256_of_file(e.path) + except OSError as exc: + log.warning("forward scan: sha256 failed for %s: %s", e.path, exc) + continue + if state.is_forwarded(digest): + skipped_already_forwarded += 1 + continue + + # TXT pairing + txt_name = e.name + ".TXT" + # Case-insensitive match on the .TXT suffix + if txt_name not in names: + txt_name_lc = txt_name.lower() + txt_name = next((n for n in names if n.lower() == txt_name_lc), None) + + txt_path: Optional[str] = None + if txt_name: + candidate = os.path.join(watch_dir, txt_name) + if _is_quiescent(candidate, now_ts, quiescence_seconds): + txt_path = candidate + # else: TXT is mid-write; treat as not-yet-paired and defer. + + if txt_path is None: + # No TXT (or not yet quiescent). Wait for the grace + # period before forwarding alone. + if (now_ts - mtime) < missing_report_grace_seconds: + skipped_inflight += 1 + continue + + pending.append((e.path, txt_path)) + # Stash size + digest on the tuple-replacement for use during forward; + # callers can re-derive but caching avoids a second sha256. + + log.debug( + "forward scan: %d pending skipped_inflight=%d already_forwarded=%d", + len(pending), skipped_inflight, skipped_already_forwarded, + ) + return pending + + +# ── Multipart upload ────────────────────────────────────────────────────────── + + +def _encode_multipart( + parts: List[Tuple[str, str, str, bytes]], +) -> Tuple[bytes, str]: + """Encode a list of (field_name, filename, content_type, data) tuples + as a multipart/form-data body. Returns (body_bytes, content_type + header value).""" + boundary = "----Series3WatcherBoundary" + os.urandom(8).hex() + chunks: List[bytes] = [] + for field_name, filename, content_type, data in parts: + chunks.append(("--" + boundary + "\r\n").encode("ascii")) + chunks.append( + (f'Content-Disposition: form-data; name="{field_name}"; ' + f'filename="{filename}"\r\n').encode("ascii") + ) + chunks.append((f"Content-Type: {content_type}\r\n\r\n").encode("ascii")) + chunks.append(data) + chunks.append(b"\r\n") + chunks.append(("--" + boundary + "--\r\n").encode("ascii")) + body = b"".join(chunks) + content_type_hdr = f"multipart/form-data; boundary={boundary}" + return body, content_type_hdr + + +def _import_endpoint(sfm_url: str) -> str: + """Compose the import endpoint URL from a base SFM URL.""" + return sfm_url.rstrip("/") + "/db/import/blastware_file" + + +def forward_event_pair( + sfm_url: str, + binary_path: str, + txt_path: Optional[str], + *, + serial_hint: Optional[str] = None, + timeout: float = DEFAULT_HTTP_TIMEOUT, +) -> Dict[str, Any]: + """POST a single event (binary + optional .TXT) to the SFM import + endpoint. + + Returns a dict mirroring the per-file outcome the server returned + (see /db/import/blastware_file response.results[0]) on success, or + a dict with `status="error"` on transport/HTTP failure. + """ + binary_name = os.path.basename(binary_path) + with open(binary_path, "rb") as f: + binary_bytes = f.read() + + parts = [("files", binary_name, "application/octet-stream", binary_bytes)] + if txt_path is not None: + with open(txt_path, "rb") as f: + txt_bytes = f.read() + parts.append(("files", os.path.basename(txt_path), "text/plain", txt_bytes)) + + body, content_type = _encode_multipart(parts) + + url = _import_endpoint(sfm_url) + if serial_hint: + sep = "&" if "?" in url else "?" + url = f"{url}{sep}serial={serial_hint}" + + req = urllib.request.Request( + url, data=body, method="POST", + headers={ + "Content-Type": content_type, + "Content-Length": str(len(body)), + "User-Agent": "series3-watcher/sfm-forwarder", + "Accept": "application/json", + }, + ) + + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: + raw = resp.read().decode("utf-8", errors="replace") + try: + payload = json.loads(raw) + except json.JSONDecodeError: + return { + "status": "error", + "filename": binary_name, + "detail": f"server returned non-JSON: {raw[:200]!r}", + } + # Server returns {"count":N, "results":[{...}]}. Pull our row out. + for entry in (payload.get("results") or []): + if entry.get("filename") == binary_name and entry.get("status") == "ok": + return entry + # No matching ok row → propagate the first error we find + for entry in (payload.get("results") or []): + if entry.get("filename") == binary_name: + return entry + return { + "status": "error", + "filename": binary_name, + "detail": f"unexpected server response: {payload!r}", + } + except urllib.error.HTTPError as exc: + try: + body_excerpt = exc.read().decode("utf-8", errors="replace")[:300] + except Exception: + body_excerpt = "" + return { + "status": "error", + "filename": binary_name, + "detail": f"HTTP {exc.code}: {exc.reason} body={body_excerpt!r}", + } + except urllib.error.URLError as exc: + return { + "status": "error", + "filename": binary_name, + "detail": f"connection error: {exc.reason}", + } + except (OSError, TimeoutError) as exc: + return { + "status": "error", + "filename": binary_name, + "detail": f"transport error: {exc}", + } + + +# ── Top-level orchestration ─────────────────────────────────────────────────── + + +def forward_pending( + watch_dir: str, + sfm_url: str, + state: ForwardState, + *, + max_age_days: int, + quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS, + missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS, + timeout: float = DEFAULT_HTTP_TIMEOUT, + logger: Optional[Any] = None, +) -> Dict[str, int]: + """ + Run one full pass: find pending events, POST each one, update state. + + Returns a counts dict suitable for logging: + + { + "scanned": , # total event binaries seen + "forwarded": , # successfully POSTed this pass + "errors": , # POST failures (will retry next pass) + "with_report":, # of forwarded, how many had a paired TXT + } + """ + def _log(msg: str) -> None: + if logger: + logger(msg) + else: + log.info(msg) + + pending = find_pending_events( + watch_dir, state, + max_age_days=max_age_days, + quiescence_seconds=quiescence_seconds, + missing_report_grace_seconds=missing_report_grace_seconds, + ) + + counts = {"scanned": len(pending), "forwarded": 0, "errors": 0, "with_report": 0} + + for binary_path, txt_path in pending: + result = forward_event_pair( + sfm_url, binary_path, txt_path, + timeout=timeout, + ) + if result.get("status") == "ok": + try: + digest = sha256_of_file(binary_path) + size = os.path.getsize(binary_path) + state.mark_forwarded(digest, os.path.basename(binary_path), size) + except OSError as exc: + _log(f"[forward] post-success state save failed for " + f"{os.path.basename(binary_path)}: {exc}") + counts["forwarded"] += 1 + if txt_path: + counts["with_report"] += 1 + _log( + f"[forward] OK {os.path.basename(binary_path)} " + f"({result.get('filesize', 0)}B, " + f"{'with' if txt_path else 'no'} report, " + f"inserted={result.get('inserted', 0)}, " + f"skipped={result.get('skipped', 0)})" + ) + else: + counts["errors"] += 1 + _log( + f"[forward] ERR {os.path.basename(binary_path)}: " + f"{result.get('detail', 'unknown error')}" + ) + + return counts diff --git a/installer.iss b/installer.iss index 1dd056a..1625c87 100644 --- a/installer.iss +++ b/installer.iss @@ -3,7 +3,7 @@ [Setup] AppName=Series 3 Watcher -AppVersion=1.4.4 +AppVersion=1.5.0 AppPublisher=Terra-Mechanics Inc. DefaultDirName={pf}\Series3Watcher DefaultGroupName=Series 3 Watcher diff --git a/series3_tray.py b/series3_tray.py index 2914440..555a2f6 100644 --- a/series3_tray.py +++ b/series3_tray.py @@ -1,5 +1,5 @@ """ -Series 3 Watcher — System Tray Launcher v1.4.4 +Series 3 Watcher — System Tray Launcher v1.5.0 Requires: pystray, Pillow, tkinter (stdlib) Run with: pythonw series3_tray.py (no console window) diff --git a/series3_watcher.py b/series3_watcher.py index b5881cf..ad93b70 100644 --- a/series3_watcher.py +++ b/series3_watcher.py @@ -80,6 +80,30 @@ def load_config(path: str) -> Dict[str, Any]: # Auto-updater source "UPDATE_SOURCE": get_str("UPDATE_SOURCE", "gitea"), "UPDATE_URL": get_str("UPDATE_URL", ""), + + # SFM event forwarder — when enabled, forwards each Blastware + # event binary (+ paired .TXT report when present) to an SFM + # server's /db/import/blastware_file endpoint. Default-off so + # existing 1.4.x deployments don't change behaviour on + # auto-update; operators flip it on by setting SFM_URL + + # SFM_FORWARD_ENABLED=true in config.ini. + "SFM_FORWARD_ENABLED": get_bool("SFM_FORWARD_ENABLED", False), + "SFM_URL": get_str("SFM_URL", ""), + "SFM_FORWARD_INTERVAL_SECONDS": get_int("SFM_FORWARD_INTERVAL_SECONDS", 60), + # Files modified within the last N seconds are skipped (BW may + # still be writing them). + "SFM_QUIESCENCE_SECONDS": get_int("SFM_QUIESCENCE_SECONDS", 5), + # If a binary's .TXT report hasn't appeared after this many + # seconds, forward the binary alone rather than blocking + # forever. + "SFM_MISSING_REPORT_GRACE_SECONDS": get_int( + "SFM_MISSING_REPORT_GRACE_SECONDS", 60 + ), + # Per-request HTTP timeout (seconds). + "SFM_HTTP_TIMEOUT": get_int("SFM_HTTP_TIMEOUT", 60), + # State file for forwarded-sha256 idempotency tracking. + # Defaults next to the log file for easy operator access. + "SFM_STATE_FILE": get_str("SFM_STATE_FILE", ""), } @@ -217,7 +241,7 @@ def scan_latest( # --- API heartbeat / SFM telemetry helpers --- -VERSION = "1.4.4" +VERSION = "1.5.0" def _read_log_tail(log_file: str, n: int = 25) -> Optional[list]: @@ -366,6 +390,36 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None: sniff_cache: Dict[str, Tuple[float, str]] = {} last_api_ts: float = 0.0 + last_forward_ts: float = 0.0 + + # ---- SFM event-forwarder setup ---- + # Default-off; only initialised when both flag and URL are set. + sfm_state = None + if cfg.get("SFM_FORWARD_ENABLED") and cfg.get("SFM_URL"): + try: + from event_forwarder import ForwardState + state_file = cfg.get("SFM_STATE_FILE") or os.path.join( + os.path.dirname(LOG_FILE) or here, "sfm_forwarded.json" + ) + sfm_state = ForwardState(state_file) + print( + "[CFG] SFM_FORWARD_ENABLED=true SFM_URL={} state={} ({} already-forwarded)".format( + cfg.get("SFM_URL"), state_file, sfm_state.count(), + ) + ) + log_message( + LOG_FILE, ENABLE_LOGGING, + "[cfg] sfm forwarder enabled url={} state={} already_forwarded={}".format( + cfg.get("SFM_URL"), state_file, sfm_state.count(), + ), + ) + except Exception as e: + print("[WARN] SFM forwarder init failed: {}".format(e)) + log_message(LOG_FILE, ENABLE_LOGGING, + "[warn] sfm forwarder init failed: {}".format(e)) + sfm_state = None + else: + print("[CFG] SFM_FORWARD_ENABLED=false (event forwarding disabled)") while not stop_event.is_set(): try: @@ -447,6 +501,51 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None: else: state["api_status"] = "disabled" + # ---- SFM event forwarder ---- + # Same scan loop as the heartbeat, but on its own cadence + # (SFM_FORWARD_INTERVAL_SECONDS). Default-off — sfm_state + # is None unless config explicitly enabled it AND supplied + # an SFM_URL. + if sfm_state is not None: + now_ts = time.time() + fwd_interval = int(cfg.get("SFM_FORWARD_INTERVAL_SECONDS", 60)) + if now_ts - last_forward_ts >= fwd_interval: + try: + from event_forwarder import forward_pending + counts = forward_pending( + WATCH_PATH, + cfg.get("SFM_URL", ""), + sfm_state, + max_age_days=MAX_EVENT_AGE_DAYS, + quiescence_seconds=int(cfg.get("SFM_QUIESCENCE_SECONDS", 5)), + missing_report_grace_seconds=int( + cfg.get("SFM_MISSING_REPORT_GRACE_SECONDS", 60) + ), + timeout=int(cfg.get("SFM_HTTP_TIMEOUT", 60)), + logger=lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m), + ) + last_forward_ts = now_ts + if counts["scanned"] > 0: + summary = ( + "[forward] scanned={} forwarded={} " + "with_report={} errors={}".format( + counts["scanned"], counts["forwarded"], + counts["with_report"], counts["errors"], + ) + ) + print(summary) + log_message(LOG_FILE, ENABLE_LOGGING, summary) + state["sfm_status"] = "ok" if counts["errors"] == 0 else "errors" + state["last_forward"] = datetime.now() + state["last_forward_counts"] = counts + except Exception as e: + err = "[forward-error] {}".format(e) + print(err) + log_message(LOG_FILE, ENABLE_LOGGING, err) + state["sfm_status"] = "fail" + else: + state["sfm_status"] = "disabled" + except Exception as e: err = "[loop-error] {}".format(e) print(err) diff --git a/test_event_forwarder.py b/test_event_forwarder.py new file mode 100644 index 0000000..df4268f --- /dev/null +++ b/test_event_forwarder.py @@ -0,0 +1,374 @@ +""" +test_event_forwarder.py — unit tests for the SFM event forwarder. + +Covers: + - is_event_binary() filename matching (positive + negative cases) + - ForwardState load/save round-trip + idempotency check + - find_pending_events() pairing + quiescence + grace-period logic + - _encode_multipart() byte-level shape (boundary + headers) + - forward_event_pair() end-to-end against a tiny stdlib HTTP server + that mimics the SFM /db/import/blastware_file endpoint + +Stdlib only — runs with `python -m pytest test_event_forwarder.py` +on Python 3.8+ (the watcher's compat target). +""" + +from __future__ import annotations + +import http.server +import json +import os +import socket +import tempfile +import threading +import time +import unittest +from pathlib import Path + +import event_forwarder as ef + + +# ── is_event_binary() ──────────────────────────────────────────────────────── + + +class TestIsEventBinary(unittest.TestCase): + + def test_recognises_typical_blastware_filenames(self): + for name in [ + "M529LK44.AB0", + "M529LKVQ.6S0", + "M529LKVQ.6S0W", + "S353L4H0.3M0W", + "P036L318.C80H", + "M529LIY6.N00", + ]: + self.assertTrue(ef.is_event_binary(name), name) + + def test_rejects_lowercase_extensions_we_explicitly_exclude(self): + for name in ["BE11529.MLG", "M529LK44.AB0.TXT", "agent.log", + "config.ini", "foo.bak", "bar.tmp", + "something.h5", "noise.json"]: + self.assertFalse(ef.is_event_binary(name), name) + + def test_rejects_non_matching_filenames(self): + for name in ["", "no_extension", + "TooShort.AB0", # stem must be 8 chars + "TOOLONG12345.AB0", # stem must be 8 chars + "M529LK44.A", # ext too short + "M529LK44.ABCDE", # ext too long + "M52.AB0", # stem too short + "1234ABCD.AB0"]: # first char must be letter + self.assertFalse(ef.is_event_binary(name), name) + + +# ── ForwardState ───────────────────────────────────────────────────────────── + + +class TestForwardState(unittest.TestCase): + + def test_round_trip_persists_marked_entries(self): + with tempfile.TemporaryDirectory() as tmp: + path = os.path.join(tmp, "fwd.json") + s = ef.ForwardState(path) + self.assertFalse(s.is_forwarded("abc123")) + s.mark_forwarded("abc123", "M529LK44.AB0", 4400) + self.assertTrue(s.is_forwarded("abc123")) + + # Re-load from disk + s2 = ef.ForwardState(path) + self.assertTrue(s2.is_forwarded("abc123")) + self.assertEqual(s2.count(), 1) + + def test_corrupt_state_file_starts_fresh(self): + with tempfile.TemporaryDirectory() as tmp: + path = os.path.join(tmp, "fwd.json") + with open(path, "w") as f: + f.write("not valid json {{{") + s = ef.ForwardState(path) + self.assertEqual(s.count(), 0) + + def test_version_mismatch_starts_fresh(self): + with tempfile.TemporaryDirectory() as tmp: + path = os.path.join(tmp, "fwd.json") + with open(path, "w") as f: + json.dump({"version": 999, "forwarded": {"x": {}}}, f) + s = ef.ForwardState(path) + self.assertEqual(s.count(), 0) + + +# ── find_pending_events() ──────────────────────────────────────────────────── + + +class TestFindPendingEvents(unittest.TestCase): + + def _make(self, dir_path: Path, name: str, age_seconds: float = 100, + content: bytes = b"x") -> Path: + """Create a file with controlled mtime.""" + p = dir_path / name + p.write_bytes(content) + # Set mtime to simulate age + target = time.time() - age_seconds + os.utime(p, (target, target)) + return p + + def test_returns_pair_when_both_files_present_and_quiescent(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + bin_p = self._make(tmp_p, "M529LK44.AB0", age_seconds=120, content=b"binary") + txt_p = self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"report") + + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 1) + self.assertEqual(os.path.basename(pending[0][0]), "M529LK44.AB0") + self.assertEqual(os.path.basename(pending[0][1]), "M529LK44.AB0.TXT") + + def test_skips_if_already_forwarded(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + bin_p = self._make(tmp_p, "M529LK44.AB0", age_seconds=120, content=b"binary") + self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"report") + + state = ef.ForwardState(str(tmp_p / "fwd.json")) + digest = ef.sha256_of_file(str(bin_p)) + state.mark_forwarded(digest, "M529LK44.AB0", len(b"binary")) + + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0) + + def test_skips_if_too_fresh_to_be_quiescent(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "M529LK44.AB0", age_seconds=1, content=b"binary") + self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=1, content=b"report") + + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0) + + def test_forwards_alone_after_grace_when_txt_missing(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "M529LK44.AB0", age_seconds=200, content=b"binary") + # No .TXT created. + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 1) + bin_path, txt_path = pending[0] + self.assertEqual(os.path.basename(bin_path), "M529LK44.AB0") + self.assertIsNone(txt_path) + + def test_defers_when_txt_missing_and_within_grace(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "M529LK44.AB0", age_seconds=15, content=b"binary") + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0) + + def test_skips_old_files_beyond_max_age_days(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + # 10 days old, but max_age_days=1 → should be excluded + self._make(tmp_p, "M529LK44.AB0", age_seconds=10 * 86400, + content=b"binary") + self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=10 * 86400, + content=b"report") + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=1, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0) + + def test_ignores_mlg_and_other_non_event_files(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "BE11529.MLG", age_seconds=120, content=b"mlg") + self._make(tmp_p, "agent.log", age_seconds=120, content=b"log") + self._make(tmp_p, "config.ini", age_seconds=120, content=b"cfg") + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0) + + +# ── Multipart encoder ──────────────────────────────────────────────────────── + + +class TestMultipartEncoder(unittest.TestCase): + + def test_encodes_two_parts_with_proper_boundary(self): + body, content_type = ef._encode_multipart([ + ("files", "a.bin", "application/octet-stream", b"\x01\x02"), + ("files", "a.txt", "text/plain", b"hello"), + ]) + # Content-Type header carries the boundary + self.assertTrue(content_type.startswith("multipart/form-data; boundary=")) + boundary = content_type.split("boundary=", 1)[1] + self.assertIn(boundary.encode("ascii"), body) + + # Body shape + text = body.decode("latin-1") + self.assertIn(f'name="files"; filename="a.bin"', text) + self.assertIn(f'name="files"; filename="a.txt"', text) + self.assertIn("Content-Type: application/octet-stream", text) + self.assertIn("Content-Type: text/plain", text) + # Trailing close boundary present + self.assertTrue(text.rstrip("\r\n").endswith(f"--{boundary}--")) + + +# ── End-to-end forward_event_pair against a fake server ────────────────────── + + +class _FakeImportHandler(http.server.BaseHTTPRequestHandler): + """Mimics seismo-relay's POST /db/import/blastware_file response.""" + received = [] # class-level capture for test inspection + + def do_POST(self): + length = int(self.headers.get("Content-Length", "0")) + body = self.rfile.read(length) + ctype = self.headers.get("Content-Type", "") + + # Crude multipart split — enough to count parts and grab filenames. + parts = body.split(b"--" + ctype.split("boundary=")[-1].encode()) + # Locate filename= occurrences — that's our part count + filenames = [] + for p in parts: + for line in p.split(b"\r\n"): + if b'filename="' in line: + fn = line.split(b'filename="', 1)[1].split(b'"', 1)[0] + filenames.append(fn.decode("latin-1")) + + self.__class__.received.append({ + "path": self.path, + "ctype": ctype, + "filenames": filenames, + }) + + # Build a faux SFM response: success for the first .bin-style filename + results = [] + binary_fn = next( + (fn for fn in filenames if not fn.lower().endswith(".txt")), + None, + ) + if binary_fn: + results.append({ + "filename": binary_fn, + "status": "ok", + "stored_filename": binary_fn, + "filesize": len(body), + "sha256": "00" * 32, + "report_attached": any(fn.lower().endswith(".txt") for fn in filenames), + "inserted": 1, + "skipped": 0, + }) + + payload = json.dumps({"count": len(results), "results": results}).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(payload))) + self.end_headers() + self.wfile.write(payload) + + def log_message(self, *_a, **_kw): # silence the test runner + pass + + +def _start_fake_server() -> tuple[http.server.HTTPServer, str]: + """Start an HTTPServer on a random local port; return (server, base_url).""" + server = http.server.HTTPServer(("127.0.0.1", 0), _FakeImportHandler) + threading.Thread(target=server.serve_forever, daemon=True).start() + host, port = server.server_address + return server, f"http://{host}:{port}" + + +class TestForwardEventPair(unittest.TestCase): + + def setUp(self): + _FakeImportHandler.received = [] + self.server, self.base_url = _start_fake_server() + + def tearDown(self): + self.server.shutdown() + self.server.server_close() + + def test_post_with_paired_report(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + bin_p = tmp_p / "M529LK44.AB0" + txt_p = tmp_p / "M529LK44.AB0.TXT" + bin_p.write_bytes(b"\x10\x20\x30 binary") + txt_p.write_bytes(b'"Serial Number : BE11529"\n') + + result = ef.forward_event_pair( + self.base_url, str(bin_p), str(txt_p), timeout=5.0, + ) + self.assertEqual(result["status"], "ok") + self.assertEqual(result["filename"], "M529LK44.AB0") + self.assertTrue(result["report_attached"]) + + self.assertEqual(len(_FakeImportHandler.received), 1) + req = _FakeImportHandler.received[0] + self.assertEqual(req["path"], "/db/import/blastware_file") + self.assertIn("M529LK44.AB0", req["filenames"]) + self.assertIn("M529LK44.AB0.TXT", req["filenames"]) + + def test_post_without_report(self): + with tempfile.TemporaryDirectory() as tmp: + bin_p = Path(tmp) / "M529LK44.AB0" + bin_p.write_bytes(b"binary only") + + result = ef.forward_event_pair( + self.base_url, str(bin_p), None, timeout=5.0, + ) + self.assertEqual(result["status"], "ok") + self.assertFalse(result["report_attached"]) + req = _FakeImportHandler.received[0] + self.assertEqual(req["filenames"], ["M529LK44.AB0"]) + + def test_post_propagates_serial_hint_in_query(self): + with tempfile.TemporaryDirectory() as tmp: + bin_p = Path(tmp) / "M529LK44.AB0" + bin_p.write_bytes(b"x") + ef.forward_event_pair( + self.base_url, str(bin_p), None, + serial_hint="BE11529", timeout=5.0, + ) + req = _FakeImportHandler.received[0] + self.assertIn("serial=BE11529", req["path"]) + + +if __name__ == "__main__": + unittest.main()