diff --git a/CHANGELOG.md b/CHANGELOG.md index 174b4dc..133eb76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,41 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 --- +## [Unreleased] — v1.5.0 + +First release of the SFM event forwarder. + +### Added — SFM event forwarder +- **Forward Blastware event binaries (+ paired BW ACH ASCII reports) to an SFM server.** When `SFM_FORWARD_ENABLED=true` and `SFM_URL` is set, every event binary in the BW ACH watch folder is POSTed as multipart to `/db/import/blastware_file` along with its `__ASCII.TXT` partner report (BW ACH convention; manual-export `.TXT` is also supported as a fallback). SFM parses the report and indexes the full per-channel stats (PPV, ZC Freq, Time of Peak, Peak Acceleration / Displacement, Peak Vector Sum + time, sensor self-check Pass/Fail, monitor-log timestamps) into a searchable database — no codec decoding required. +- **Idempotent forwarding.** Forwarded files are tracked by sha256 in a JSON state file (default `/sfm_forwarded.json`, override via `SFM_STATE_FILE`). Re-scans don't re-POST and the state survives restarts / auto-updates. +- **Re-pair on late-arriving TXT.** When the watcher forwards a binary alone (`_ASCII.TXT` partner didn't appear within `SFM_MISSING_REPORT_GRACE_SECONDS`), the state file records `had_report: false`. On subsequent scans, the watcher re-checks whether the TXT has since arrived. If yes, the event is re-forwarded with the TXT attached — the SFM server's upsert path refreshes the DB row with the report's device-authoritative peak / project values. Without this, slow-disk or AV-interrupted TXT writes would permanently leave that event with broken-codec peaks in the SFM database. Legacy state-file entries (without the `had_report` field) default to `had_report: true` so an upgrade doesn't unexpectedly re-forward existing entries. +- **Quiescence + grace-period guards.** Files modified within `SFM_QUIESCENCE_SECONDS` (default 5s) are skipped to avoid forwarding mid-write. If a binary's report partner hasn't appeared after `SFM_MISSING_REPORT_GRACE_SECONDS` (default 60s), the binary is forwarded alone rather than blocking forever. +- **Per-pass rate cap.** `SFM_MAX_FORWARDS_PER_PASS` (default 500) drips first-deploy backfill instead of hammering the SFM server in one burst. At 60-second `SFM_FORWARD_INTERVAL_SECONDS` cadence that's ~30K events/hour throughput. Set to `0` for unlimited. Scan walks oldest-first so backfill advances chronologically and successive scans reliably progress. +- **`event_forwarder.py --seed-state` CLI mode.** Walks the watch folder once, sha256s every in-window event, and marks them all as already-forwarded *without* POSTing anything. Recommended pre-deploy workflow on machines with a large historical archive — flip `SFM_FORWARD_ENABLED=true` after seeding and only events that appear from then on get forwarded. +- **SFM Forward tab in the Settings dialog** with: Forward checkbox, SFM URL + Test button (GETs `/health`), Forward Interval / Quiescence / Missing-Report Grace / HTTP Timeout / Max Events Per Pass spinboxes, State File entry with Browse... Save-time guard: enabling forwarding without a URL shows a validation error. +- **Histogram-aware log clarity.** Histogram events (extensions ending in `H`) don't get auto-exported reports from BW; the log distinguishes that case (`(histogram, no report expected)`) from a waveform with unexpectedly missing report (`no report ⚠`). +- **README "First-time deployment" section** documenting the seed-state workflow + the rate cap as belt-and-suspenders for safe rollout on machines with hundreds of thousands of historical events. +- 31 new unit tests in `test_event_forwarder.py` covering filename matching, state idempotency, scan logic (quiescence / grace period / max age / already-forwarded / TXT pairing), multipart byte shape, rate cap (oldest-first, cap=0 unlimited, cap=N enforcement), seed-state mode (in-window seeding / max-age skip / end-to-end skip-after-seed / idempotent re-runs), histogram classification, and an end-to-end POST against a stdlib fake server. + +### Configuration + +New `[agent]` keys (all default-off — existing 1.4.x deployments don't change behaviour on auto-update): + +| Key | Default | Notes | +|---|---|---| +| `SFM_FORWARD_ENABLED` | `false` | Master toggle for the forwarder | +| `SFM_URL` | empty | e.g. `http://10.0.0.44:8200` | +| `SFM_FORWARD_INTERVAL_SECONDS` | `60` | Scan-and-forward cadence | +| `SFM_QUIESCENCE_SECONDS` | `5` | Skip files modified in the last N seconds | +| `SFM_MISSING_REPORT_GRACE_SECONDS` | `60` | Forward without TXT after this delay | +| `SFM_HTTP_TIMEOUT` | `60` | Per-request HTTP timeout | +| `SFM_STATE_FILE` | `/sfm_forwarded.json` | Override location of the forwarded-sha256 state file | +| `SFM_MAX_FORWARDS_PER_PASS` | `500` | Per-scan cap (`0` = unlimited) | + +### Compatibility + +- Requires SFM server v0.16+ (the `/db/import/blastware_file` endpoint that accepts paired `_ASCII.TXT` reports + the BW-report label normalisation — released alongside this watcher version on the seismo-relay side). + ## [1.4.4] - 2026-03-17 ### Removed diff --git a/README.md b/README.md index 4f5af2e..726fd29 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Series 3 Watcher v1.4.4 +# Series 3 Watcher v1.5.0 Monitors Instantel **Series 3 (Minimate)** call-in activity on a Blastware server. Runs as a **system tray app** that starts automatically on login, reports heartbeats to terra-view, and self-updates from Gitea. @@ -88,6 +88,40 @@ All settings live in `config.ini`. The Setup Wizard covers every field, but here | `UPDATE_SOURCE` | `gitea` (default) or `url` — where to check for updates | | `UPDATE_URL` | Base URL of the update server when `UPDATE_SOURCE = url` (e.g. terra-view URL). The watcher fetches `/api/updates/series3-watcher/version.txt` and `/api/updates/series3-watcher/series3-watcher.exe` from this base. | +### SFM Event Forwarder (v1.5.0+) + +Forwards each Blastware event binary (and its paired `.TXT` ASCII report when present) to an SFM server's `/db/import/blastware_file` endpoint, where the report is parsed and the rich per-channel stats (PPV, ZC Freq, Time of Peak, Peak Acceleration / Displacement, sensor self-check) land in a searchable database. **Default-off** — existing deployments keep their old behaviour after auto-updating until the operator opts in. + +| Key | Description | +|-----|-------------| +| `SFM_FORWARD_ENABLED` | `true` to enable forwarding (default `false`) | +| `SFM_URL` | Base URL of the SFM server, e.g. `http://10.0.0.44:8200` | +| `SFM_FORWARD_INTERVAL_SECONDS` | Scan-and-forward cadence (default `60`); independent of the heartbeat interval | +| `SFM_QUIESCENCE_SECONDS` | Skip files modified within this many seconds (default `5`) — avoids forwarding mid-write | +| `SFM_MISSING_REPORT_GRACE_SECONDS` | If a `.TXT` partner hasn't appeared after this many seconds, forward the binary alone (default `60`) | +| `SFM_HTTP_TIMEOUT` | Per-request HTTP timeout in seconds (default `60`) | +| `SFM_STATE_FILE` | Path to the JSON state file tracking sha256 of forwarded events. Leave blank to default to `/sfm_forwarded.json` | +| `SFM_MAX_FORWARDS_PER_PASS` | Max events forwarded per scan tick (default `500`, `0` = unlimited). Drip-feeds backfill so a folder with thousands of qualifying events doesn't hammer the SFM server in one giant burst. | + +Forwarded files are tracked by sha256 in the state file, so re-scans / restarts / auto-updates never re-POST the same content. A failed POST stays in the pending pool and is retried on the next interval. + +#### First-time deployment on a folder with a large historical archive + +If you're enabling SFM forwarding on a Blastware ACH machine that's been accumulating events for years (tens or hundreds of thousands of files in the watch folder), you almost certainly **don't** want the watcher to forward all of them on first run. Two options: + +1. **Skip the historical backfill (recommended).** Run the seed-state CLI once before flipping `SFM_FORWARD_ENABLED=true`. It walks the folder, sha256s every existing in-window event, and marks them all as already-forwarded — without POSTing anything. The watcher then only forwards events that appear *after* the seed run. + + ``` + python event_forwarder.py --seed-state ^ + --watch "C:\Blastware 10\Event\autocall home" ^ + --state "C:\Users\\AppData\Local\Series3Watcher\agent_logs\sfm_forwarded.json" ^ + --max-age-days 365 + ``` + +2. **Throttle the backfill.** Leave `SFM_MAX_FORWARDS_PER_PASS` at its 500 default and let the watcher drip-feed. With a 60-second `SFM_FORWARD_INTERVAL_SECONDS` that's ~30K events/hour throughput. Backfill of 30K events takes about an hour, 100K takes ~3.5 hours. The cap fires per scan, so heartbeat and forwarding share the watcher's main loop without saturating it. + +Combine both for a fully controlled rollout: seed-state to skip the deep archive, then leave the cap on as a steady-state safety net. + --- ## Tray Icon @@ -119,9 +153,21 @@ To view connected watchers: **Settings → Developer → Watcher Manager**. --- +## Roadmap (Future) + +Deferred work — parked but worth tracking. Pairs with seismo-relay's +[Roadmap (Future)](https://gitea.serversdown.net/serversdown/seismo-relay#roadmap-future) +where the corresponding server-side work lives. + +- [ ] **File archive manager.** Move BW autocall-home events older than ~90 days into `_archive///` subfolders so the active watch directory doesn't accumulate hundreds of thousands of entries (filesystem dir lookups slow at 100K+, BW UI hangs opening the folder, watcher's own scandir gets expensive). Plan drafted in the codec-RE branch's plan-mode session, including a critical pre-coding test (does Blastware UI walk subfolders or only see the flat watch folder?) that determines the archive layout (in-place subfolders vs sibling archive). Default-off, dry-run mode, opt-in per machine. +- [ ] **MLG forwarding.** Currently the watcher's `is_event_binary()` filter explicitly excludes `.MLG` per-unit monitor log files — only event binaries (`.AB0` / `.PG0H` / etc.) and their paired `_ASCII.TXT` reports get forwarded. Adding an `POST /db/import/mlg_file` SFM endpoint + a parallel `.MLG` scan path on the watcher would populate `monitor_log` rows for non-ACH-routed units (coverage queries, "was this unit monitoring on date X" lookups). MLG files are append-only / mutable so the watcher needs a different dedup strategy than the per-event sha256 state file — better to forward whole file every scan and let the server dedup by `(serial, start_time)` on insert. +- [ ] **Pre-deploy seed-state UX in the Settings dialog.** Currently `event_forwarder.py --seed-state` is a CLI-only operation. A "Skip backfill" button next to the SFM Forward checkbox would let operators opt-out of re-forwarding the historical archive without dropping to a command line. + +--- + ## Versioning -Follows **Semantic Versioning**. Current release: **v1.4.4**. +Follows **Semantic Versioning**. Current release: **v1.5.0**. See `CHANGELOG.md` for full history. --- diff --git a/config-template.ini b/config-template.ini index fd03b34..45799c9 100644 --- a/config-template.ini +++ b/config-template.ini @@ -32,3 +32,43 @@ UPDATE_SOURCE = gitea # If UPDATE_SOURCE = url, set UPDATE_URL to the base URL of the update server (e.g. terra-view) UPDATE_URL = +# --- SFM Event Forwarder --- +# When enabled, every Blastware event binary (and its paired .TXT +# report when present) is forwarded to an SFM server's +# /db/import/blastware_file endpoint as a multipart POST. The SFM +# server parses the .TXT and indexes the event's full per-channel +# stats (PPV, ZC Freq, Time of Peak, Peak Acceleration, Peak +# Displacement, sensor self-check) for sortable / filterable review. +# +# Default-off so existing deployments don't change behaviour after an +# auto-update. To enable on a field machine: set SFM_URL, then flip +# SFM_FORWARD_ENABLED to true and restart the watcher. +SFM_FORWARD_ENABLED = false +SFM_URL = ; e.g. http://10.0.0.44:8200 +SFM_FORWARD_INTERVAL_SECONDS = 60 ; scan-and-forward cadence (independent of heartbeat) + +# Files modified within the last N seconds are skipped (BW may still +# be writing them). Defence against truncated uploads. +SFM_QUIESCENCE_SECONDS = 5 + +# If a binary's .TXT report hasn't appeared after this many seconds, +# forward the binary alone rather than blocking forever waiting. +SFM_MISSING_REPORT_GRACE_SECONDS = 60 + +# Per-request HTTP timeout (seconds). +SFM_HTTP_TIMEOUT = 60 + +# Path to the JSON state file tracking which events have been +# forwarded (sha256-keyed, idempotent across restarts). Leave blank +# to default to /sfm_forwarded.json. +SFM_STATE_FILE = + +# Per-pass cap — forward at most N events per scan tick. 0 = unlimited. +# Default 500 throttles first-deploy backfill on machines with large +# historical archives (tens or hundreds of thousands of events) so +# the SFM server isn't hammered with one giant burst. At 60s scan +# interval × 500 events/pass that's 30K events/hour throughput. +# See README "First-time deployment" for the recommended +# `--seed-state` workflow that skips the historical backfill entirely. +SFM_MAX_FORWARDS_PER_PASS = 500 + diff --git a/event_forwarder.py b/event_forwarder.py new file mode 100644 index 0000000..cbc6d2b --- /dev/null +++ b/event_forwarder.py @@ -0,0 +1,833 @@ +""" +event_forwarder.py — forward Blastware event files to an SFM server. + +Watches the same Blastware ACH folder the heartbeat path watches. +For each event binary that hasn't been forwarded yet, pairs it with +its `.TXT` report (when available) and POSTs both to SFM's +`/db/import/blastware_file` endpoint as one multipart request. + +The receiving SFM server (seismo-relay v0.16+) detects paired binaries +and reports by filename, parses the .TXT into structured fields +(per-channel PPV / ZC Freq / Time of Peak / Peak Acceleration / Peak +Displacement / sensor self-check / monitor log), and persists every +field into the SFM database for sortable / filterable monthly-summary +review. + +Design notes +──────────── +- **stdlib only.** Matches the rest of the watcher (`urllib.request`). + Multipart encoding is hand-rolled. +- **Idempotent across restarts.** Forwarded files are tracked by + sha256 in a JSON state file (`.forwarded.json` next to config.ini). + Re-scanning the watch folder doesn't re-POST anything. +- **Default-off.** Callers must enable via config + (`SFM_FORWARD_ENABLED=true` + `SFM_URL=...`). Existing 1.4.x + deployments that auto-update to the new version stay non-forwarding + until an operator flips the switch. +- **Quiescence guard.** Files modified within the last few seconds + are skipped — Blastware ACH writes the .TXT after the binary, so + we wait until both look stable before forwarding. +- **Best-effort report pairing.** When the .TXT hasn't appeared yet + but the binary is older than `MISSING_REPORT_GRACE_SECONDS`, the + binary is forwarded alone (the SFM endpoint accepts that and just + skips the rich fields — we'd rather get the binary indexed than + block forever waiting for a TXT that never arrived). +""" + +from __future__ import annotations + +import hashlib +import json +import logging +import os +import re +import time +import urllib.error +import urllib.request +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Tuple + + +log = logging.getLogger(__name__) + +# Default tuning. All overridable via config.ini SFM_* keys. +DEFAULT_QUIESCENCE_SECONDS = 5 # don't touch a file modified in the last N seconds +DEFAULT_MISSING_REPORT_GRACE_SECONDS = 60 # forward without .TXT if it hasn't shown up after N seconds +DEFAULT_HTTP_TIMEOUT = 60.0 # per-request timeout +STATE_SCHEMA_VERSION = 1 + + +# ── Filename matching ───────────────────────────────────────────────────────── +# +# Blastware's filename scheme (confirmed in seismo-relay docs): +# prefix_letter (B–Z) + 3-digit serial-tail + 4-char base36 timestamp stem +# + "." + 3-or-4-char extension. +# Examples: M529LK44.AB0, S353L4H0.3M0W, P036L318.C80H, M529LIY6.N00. +# +# We accept lowercase too because some filesystems lower-case names. +_EVENT_FILENAME_RE = re.compile( + r"^[A-Za-z][0-9]{3}[A-Za-z0-9]{4}\.[A-Za-z0-9]{3,4}$" +) + +# Filenames we explicitly skip even if they happen to match the regex. +_NON_EVENT_EXTS = { + ".mlg", # monitor-log files (separate heartbeat path) + ".txt", # ASCII reports — handled via pairing, not as primary files + ".log", + ".ini", + ".dat", + ".bak", + ".tmp", + ".pkl", # SFM A5 pickles (shouldn't appear in a BW folder, but defence) + ".h5", + ".sfm.json", + ".json", +} + + +def is_event_binary(path: str) -> bool: + """Return True if `path`'s basename looks like a Blastware event binary.""" + name = os.path.basename(path) + if not _EVENT_FILENAME_RE.match(name): + return False + ext = os.path.splitext(name)[1].lower() + if ext in _NON_EVENT_EXTS: + return False + return True + + +def ach_report_name(binary_name: str) -> str: + """BW ACH report-naming convention. + + Blastware's official Auto Call Home server writes per-event ASCII + reports as ``__ASCII.TXT`` — the ``.`` between stem and + ext is replaced with ``_`` and ``_ASCII.TXT`` is appended. + + Examples: + ``M529LK44.AB0`` → ``M529LK44_AB0_ASCII.TXT`` + ``N844L20G.630H`` → ``N844L20G_630H_ASCII.TXT`` + ``H907L1R7.PG0H`` → ``H907L1R7_PG0H_ASCII.TXT`` + + For a filename without a dot (defensive — shouldn't happen for real + BW events) we still append ``_ASCII.TXT``. + """ + stem, dot, ext = binary_name.rpartition(".") + if not dot: + return binary_name + "_ASCII.TXT" + return stem + "_" + ext + "_ASCII.TXT" + + +def legacy_report_name(binary_name: str) -> str: + """Manual-export convention: ``.TXT`` (e.g. when an operator + saves an event report to text directly from BW's UI rather than + letting ACH auto-export it). Kept as a fallback so the codec-agent + test fixtures (``decode-re/5-8-26/event-c/M529LK44.AB0.TXT``) still + pair correctly.""" + return binary_name + ".TXT" + + +def report_path_for(binary_path: str) -> str: + """Legacy entry point — returns the manual-export path. Prefer + :func:`ach_report_name` for new BW deployments. Retained for + backward compatibility with any caller still on the old convention.""" + return legacy_report_name(binary_path) + + +def is_histogram_event(filename: str) -> bool: + """True if the filename's extension marks the file as a Full Histogram + event (BW filename scheme: 4-char extensions of the form ``AB0T`` where + ``T = H``). Old-firmware events use 3-char extensions where waveform-vs- + histogram is not encoded in the name; we can't tell those apart and + return False (the conservative answer — we don't want to suppress + "no report" warnings on potentially-waveform old-firmware events). + + Used purely for log clarity — when a forward goes through without a + paired TXT, the log distinguishes "histogram, no report expected" + (acceptable: BW may not have written one even though it normally + does for ACH-routed histograms) from "no report ⚠" on a waveform + (more suspicious: BW almost always writes the TXT for waveform events). + Forwarding logic itself doesn't depend on this check. + """ + name = os.path.basename(filename) + ext = os.path.splitext(name)[1].lstrip(".").upper() + return len(ext) == 4 and ext.endswith("H") + + +# ── State file ──────────────────────────────────────────────────────────────── + + +class ForwardState: + """Idempotency record: which event files have we already forwarded? + + State file format (JSON): + + { + "version": 1, + "forwarded": { + "": { + "filename": "M529LK44.AB0", + "size": 4400, + "forwarded_at": "2026-05-08T...Z" + }, + ... + } + } + + Keyed by sha256 (not filename) so that re-saved or re-uploaded + identical content is recognised as already-forwarded even if the + file moved or got renamed. Filename is preserved for human + inspection. + """ + + def __init__(self, path: str): + self.path = path + self._data: Dict[str, Any] = {"version": STATE_SCHEMA_VERSION, "forwarded": {}} + self._load() + + def _load(self) -> None: + try: + with open(self.path, "r", encoding="utf-8") as f: + d = json.load(f) + if not isinstance(d, dict): + raise ValueError("state file root is not an object") + if d.get("version") != STATE_SCHEMA_VERSION: + log.warning( + "forward state version mismatch (got %r, want %d) — starting fresh", + d.get("version"), STATE_SCHEMA_VERSION, + ) + return + forwarded = d.get("forwarded") + if isinstance(forwarded, dict): + self._data["forwarded"] = forwarded + except FileNotFoundError: + pass + except (OSError, ValueError, json.JSONDecodeError) as exc: + log.warning("failed to load forward state from %s: %s", self.path, exc) + + def _save(self) -> None: + tmp = self.path + ".tmp" + try: + with open(tmp, "w", encoding="utf-8") as f: + json.dump(self._data, f, indent=2, sort_keys=True) + f.flush() + os.fsync(f.fileno()) + os.replace(tmp, self.path) + except OSError as exc: + log.warning("failed to save forward state to %s: %s", self.path, exc) + + def is_forwarded(self, sha256: str) -> bool: + return sha256 in self._data["forwarded"] + + def status(self, sha256: str) -> Optional[bool]: + """Return forwarding status for *sha256*. + + Returns: + None — never forwarded. Eligible for a fresh forward. + True — forwarded successfully with its paired report + (or in a legacy entry that pre-dates the + had_report field — assumed complete for safety). + NOT a candidate for re-forward. + False — forwarded WITHOUT its paired ``_ASCII.TXT`` + (BW's TXT-write lagged past the grace period). + Eligible for re-forward IF the TXT now exists, + so the SFM server's upsert path can refresh the + DB row with the report's authoritative values. + + Legacy state-file entries without a ``had_report`` key default + to ``True`` so an upgrade doesn't unexpectedly re-forward + every entry the operator has accumulated. + """ + entry = self._data["forwarded"].get(sha256) + if entry is None: + return None + return bool(entry.get("had_report", True)) + + def mark_forwarded( + self, + sha256: str, + filename: str, + size: int, + had_report: bool = True, + ) -> None: + """Record a successful forward. + + Set ``had_report=False`` when the forward shipped the binary + without its paired ASCII report. Such entries are re-checked + on subsequent scans and re-forwarded once the TXT appears, so + SFM's upsert refreshes the DB row with the device-authoritative + peak/project values. + + Idempotent: re-marking an existing sha256 with ``had_report=True`` + is the explicit promotion path used when a re-pair succeeds. + """ + self._data["forwarded"][sha256] = { + "filename": filename, + "size": size, + "forwarded_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "had_report": had_report, + } + self._save() + + def count(self) -> int: + return len(self._data["forwarded"]) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + + +def sha256_of_file(path: str) -> str: + h = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(65536), b""): + h.update(chunk) + return h.hexdigest() + + +def _is_quiescent(path: str, now_ts: float, quiescence_seconds: float) -> bool: + """Return True if the file's mtime is at least `quiescence_seconds` + in the past — i.e. no longer being written.""" + try: + mtime = os.path.getmtime(path) + except OSError: + return False + return (now_ts - mtime) >= quiescence_seconds + + +# ── Scan pass ───────────────────────────────────────────────────────────────── + + +def find_pending_events( + watch_dir: str, + state: ForwardState, + *, + max_age_days: int, + quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS, + missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS, + max_per_pass: int = 0, +) -> List[Tuple[str, Optional[str]]]: + """ + Walk `watch_dir` and return the list of (binary_path, txt_path_or_None) + pairs that need forwarding. + + Filtering rules: + - Filename must match the BW event filename regex. + - File must be quiescent (mtime >= quiescence_seconds in the past). + - File must not exceed `max_age_days` (matches the heartbeat + path's MAX_EVENT_AGE_DAYS — keeps deep archives out of the + forwarder). + - File's sha256 must NOT already be in the forwarded state. + - If a `.TXT` exists and is quiescent, we pair them. + Otherwise, if the binary is older than + missing_report_grace_seconds, we forward without the TXT. + Younger binaries with a missing TXT are deferred — let BW + finish writing the report. + - When `max_per_pass > 0`, return at most that many pairs. + Older files (lower mtime) are forwarded first so backfill + proceeds chronologically. Use this to drip-feed a folder + with thousands of qualifying events instead of hammering + the SFM server with one giant burst. + """ + if not os.path.isdir(watch_dir): + log.warning("forward scan: watch dir not found: %s", watch_dir) + return [] + + now_ts = time.time() + max_age_seconds = max(1, int(max_age_days)) * 86400.0 + + pending: List[Tuple[str, Optional[str]]] = [] + skipped_inflight = 0 + skipped_already_forwarded = 0 + + try: + with os.scandir(watch_dir) as it: + entries = list(it) + except OSError as exc: + log.warning("forward scan: scandir failed on %s: %s", watch_dir, exc) + return [] + + # Cache existence of TXT partners so we don't stat() each twice. + names = {e.name for e in entries if e.is_file()} + + # Sort by mtime ASCENDING so chronological backfill happens oldest-first. + # When max_per_pass clamps the list, we always advance — we don't get + # stuck re-considering the same N newest files every scan. + def _mtime(entry: os.DirEntry) -> float: + try: + return entry.stat().st_mtime + except OSError: + return 0.0 + + entries = sorted( + (e for e in entries if e.is_file()), + key=_mtime, + ) + + for e in entries: + if not e.is_file(): + continue + if not is_event_binary(e.path): + continue + + try: + mtime = e.stat().st_mtime + size = e.stat().st_size + except OSError: + continue + + # Out-of-window: too old or too fresh + if (now_ts - mtime) > max_age_seconds: + continue + if not _is_quiescent(e.path, now_ts, quiescence_seconds): + skipped_inflight += 1 + continue + + # Idempotency: skip if we already forwarded this content + # successfully. Three cases via state.status(digest): + # True — forwarded WITH report → permanently done, skip. + # False — forwarded WITHOUT report → re-pair candidate. + # Forward again only if a paired TXT is now present + # so SFM's upsert refreshes the DB row. + # None — never forwarded → normal first-forward path. + try: + digest = sha256_of_file(e.path) + except OSError as exc: + log.warning("forward scan: sha256 failed for %s: %s", e.path, exc) + continue + fwd_status = state.status(digest) + if fwd_status is True: + skipped_already_forwarded += 1 + continue + + # TXT pairing — try BW ACH convention first + # (__ASCII.TXT) and fall back to the manual-export + # convention (.TXT). Both checked case-insensitively + # against the cached directory listing. ACH wins when both + # exist — that's the format BW's official ACH server writes. + candidates = [ach_report_name(e.name), legacy_report_name(e.name)] + + # Case-insensitive name lookup against the cached set. + names_lc_to_actual = None + txt_name: Optional[str] = None + for cand in candidates: + if cand in names: + txt_name = cand + break + # Build lower-case index lazily — most folders have very few + # TXT files relative to binaries, so the linear scan only + # fires when neither exact-case candidate matches. + if names_lc_to_actual is None: + names_lc_to_actual = {n.lower(): n for n in names} + actual = names_lc_to_actual.get(cand.lower()) + if actual: + txt_name = actual + break + + txt_path: Optional[str] = None + if txt_name: + candidate = os.path.join(watch_dir, txt_name) + if _is_quiescent(candidate, now_ts, quiescence_seconds): + txt_path = candidate + # else: TXT is mid-write; treat as not-yet-paired and defer. + + if fwd_status is False: + # Previously forwarded WITHOUT report. We're here looking + # for a re-pair opportunity. If the TXT is now present + # and quiescent, include in pending for re-forward (the + # SFM server's upsert will refresh the DB row with the + # report's authoritative values). Otherwise skip — no + # point re-forwarding the same binary alone again. + if txt_path is None: + skipped_already_forwarded += 1 + continue + elif txt_path is None: + # First-time forward and TXT not yet present. Wait for the + # grace period before forwarding alone. + if (now_ts - mtime) < missing_report_grace_seconds: + skipped_inflight += 1 + continue + + pending.append((e.path, txt_path)) + # Stash size + digest on the tuple-replacement for use during forward; + # callers can re-derive but caching avoids a second sha256. + + # Per-pass cap: once we have enough pending, stop scanning. + if max_per_pass and len(pending) >= max_per_pass: + break + + log.debug( + "forward scan: %d pending skipped_inflight=%d already_forwarded=%d cap=%d", + len(pending), skipped_inflight, skipped_already_forwarded, max_per_pass, + ) + return pending + + +# ── Multipart upload ────────────────────────────────────────────────────────── + + +def _encode_multipart( + parts: List[Tuple[str, str, str, bytes]], +) -> Tuple[bytes, str]: + """Encode a list of (field_name, filename, content_type, data) tuples + as a multipart/form-data body. Returns (body_bytes, content_type + header value).""" + boundary = "----Series3WatcherBoundary" + os.urandom(8).hex() + chunks: List[bytes] = [] + for field_name, filename, content_type, data in parts: + chunks.append(("--" + boundary + "\r\n").encode("ascii")) + chunks.append( + (f'Content-Disposition: form-data; name="{field_name}"; ' + f'filename="{filename}"\r\n').encode("ascii") + ) + chunks.append((f"Content-Type: {content_type}\r\n\r\n").encode("ascii")) + chunks.append(data) + chunks.append(b"\r\n") + chunks.append(("--" + boundary + "--\r\n").encode("ascii")) + body = b"".join(chunks) + content_type_hdr = f"multipart/form-data; boundary={boundary}" + return body, content_type_hdr + + +def _import_endpoint(sfm_url: str) -> str: + """Compose the import endpoint URL from a base SFM URL.""" + return sfm_url.rstrip("/") + "/db/import/blastware_file" + + +def forward_event_pair( + sfm_url: str, + binary_path: str, + txt_path: Optional[str], + *, + serial_hint: Optional[str] = None, + timeout: float = DEFAULT_HTTP_TIMEOUT, +) -> Dict[str, Any]: + """POST a single event (binary + optional .TXT) to the SFM import + endpoint. + + Returns a dict mirroring the per-file outcome the server returned + (see /db/import/blastware_file response.results[0]) on success, or + a dict with `status="error"` on transport/HTTP failure. + """ + binary_name = os.path.basename(binary_path) + with open(binary_path, "rb") as f: + binary_bytes = f.read() + + parts = [("files", binary_name, "application/octet-stream", binary_bytes)] + if txt_path is not None: + with open(txt_path, "rb") as f: + txt_bytes = f.read() + parts.append(("files", os.path.basename(txt_path), "text/plain", txt_bytes)) + + body, content_type = _encode_multipart(parts) + + url = _import_endpoint(sfm_url) + if serial_hint: + sep = "&" if "?" in url else "?" + url = f"{url}{sep}serial={serial_hint}" + + req = urllib.request.Request( + url, data=body, method="POST", + headers={ + "Content-Type": content_type, + "Content-Length": str(len(body)), + "User-Agent": "series3-watcher/sfm-forwarder", + "Accept": "application/json", + }, + ) + + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: + raw = resp.read().decode("utf-8", errors="replace") + try: + payload = json.loads(raw) + except json.JSONDecodeError: + return { + "status": "error", + "filename": binary_name, + "detail": f"server returned non-JSON: {raw[:200]!r}", + } + # Server returns {"count":N, "results":[{...}]}. Pull our row out. + for entry in (payload.get("results") or []): + if entry.get("filename") == binary_name and entry.get("status") == "ok": + return entry + # No matching ok row → propagate the first error we find + for entry in (payload.get("results") or []): + if entry.get("filename") == binary_name: + return entry + return { + "status": "error", + "filename": binary_name, + "detail": f"unexpected server response: {payload!r}", + } + except urllib.error.HTTPError as exc: + try: + body_excerpt = exc.read().decode("utf-8", errors="replace")[:300] + except Exception: + body_excerpt = "" + return { + "status": "error", + "filename": binary_name, + "detail": f"HTTP {exc.code}: {exc.reason} body={body_excerpt!r}", + } + except urllib.error.URLError as exc: + return { + "status": "error", + "filename": binary_name, + "detail": f"connection error: {exc.reason}", + } + except (OSError, TimeoutError) as exc: + return { + "status": "error", + "filename": binary_name, + "detail": f"transport error: {exc}", + } + + +# ── Top-level orchestration ─────────────────────────────────────────────────── + + +def forward_pending( + watch_dir: str, + sfm_url: str, + state: ForwardState, + *, + max_age_days: int, + quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS, + missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS, + timeout: float = DEFAULT_HTTP_TIMEOUT, + max_per_pass: int = 0, + logger: Optional[Any] = None, +) -> Dict[str, int]: + """ + Run one full pass: find pending events, POST each one, update state. + + Returns a counts dict suitable for logging: + + { + "scanned": , # total event binaries seen + "forwarded": , # successfully POSTed this pass + "errors": , # POST failures (will retry next pass) + "with_report":, # of forwarded, how many had a paired TXT + } + """ + def _log(msg: str) -> None: + if logger: + logger(msg) + else: + log.info(msg) + + pending = find_pending_events( + watch_dir, state, + max_age_days=max_age_days, + quiescence_seconds=quiescence_seconds, + missing_report_grace_seconds=missing_report_grace_seconds, + max_per_pass=max_per_pass, + ) + + counts = {"scanned": len(pending), "forwarded": 0, "errors": 0, "with_report": 0} + + for binary_path, txt_path in pending: + result = forward_event_pair( + sfm_url, binary_path, txt_path, + timeout=timeout, + ) + if result.get("status") == "ok": + try: + digest = sha256_of_file(binary_path) + size = os.path.getsize(binary_path) + # Record whether this forward shipped a paired TXT. + # Forwards without a TXT are flagged had_report=False so + # subsequent scans re-check whether the TXT has since + # appeared and trigger a re-forward (the SFM server's + # upsert path refreshes the DB row with the report's + # authoritative values). + state.mark_forwarded( + digest, + os.path.basename(binary_path), + size, + had_report=(txt_path is not None), + ) + except OSError as exc: + _log(f"[forward] post-success state save failed for " + f"{os.path.basename(binary_path)}: {exc}") + counts["forwarded"] += 1 + if txt_path: + counts["with_report"] += 1 + + # Differentiate three cases in the log so "no report" is only + # noisy when something's actually unexpected: + # - waveform + TXT → "+ attached" + # - waveform without TXT → "no report ⚠" (BW maybe didn't auto-export) + # - histogram (any flavour) → "(histogram, no report expected)" + if txt_path: + report_token = "+ {} attached".format(os.path.basename(txt_path)) + elif is_histogram_event(binary_path): + report_token = "(histogram, no report expected)" + else: + report_token = "no report ⚠" + + _log( + "[forward] OK {} ({}B, {}, inserted={}, skipped={})".format( + os.path.basename(binary_path), + result.get("filesize", 0), + report_token, + result.get("inserted", 0), + result.get("skipped", 0), + ) + ) + else: + counts["errors"] += 1 + _log( + f"[forward] ERR {os.path.basename(binary_path)}: " + f"{result.get('detail', 'unknown error')}" + ) + + return counts + + +# ── Seed-state mode (skip historical backfill on first deploy) ──────────────── + + +def seed_state_from_folder( + watch_dir: str, + state: ForwardState, + *, + max_age_days: int = 365, + logger: Optional[Any] = None, +) -> Dict[str, int]: + """Walk `watch_dir` and mark every existing event binary as already + forwarded — without POSTing anything. + + This is the right tool for a first deploy on a machine that already + has tens or hundreds of thousands of historical events in the BW + ACH folder. Run it ONCE before enabling SFM_FORWARD_ENABLED: + + python event_forwarder.py --seed-state \ + --watch "C:\\Blastware 10\\Event\\autocall home" \ + --state "C:\\...\\sfm_forwarded.json" \ + [--max-age-days 365] + + The watcher then starts forwarding only events that appear AFTER + the seed run. Files older than `max_age_days` are still skipped + by the regular scan loop — we don't bother seeding them because + they wouldn't be forwarded anyway. + + Returns a counts dict: + {"scanned": int, "seeded": int, "already_known": int, "skipped_too_old": int} + """ + def _log(msg: str) -> None: + if logger: + logger(msg) + else: + log.info(msg) + + counts = {"scanned": 0, "seeded": 0, "already_known": 0, "skipped_too_old": 0} + + if not os.path.isdir(watch_dir): + _log(f"[seed] watch dir not found: {watch_dir}") + return counts + + now_ts = time.time() + max_age_seconds = max(1, int(max_age_days)) * 86400.0 + + try: + with os.scandir(watch_dir) as it: + entries = [e for e in it if e.is_file()] + except OSError as exc: + _log(f"[seed] scandir failed on {watch_dir}: {exc}") + return counts + + for e in entries: + if not is_event_binary(e.path): + continue + counts["scanned"] += 1 + try: + mtime = e.stat().st_mtime + size = e.stat().st_size + except OSError: + continue + if (now_ts - mtime) > max_age_seconds: + counts["skipped_too_old"] += 1 + continue + try: + digest = sha256_of_file(e.path) + except OSError as exc: + _log(f"[seed] sha256 failed for {e.path}: {exc}") + continue + if state.is_forwarded(digest): + counts["already_known"] += 1 + continue + state.mark_forwarded(digest, e.name, size) + counts["seeded"] += 1 + if counts["seeded"] % 1000 == 0: + _log(f"[seed] progress: {counts['seeded']} seeded so far...") + + _log( + f"[seed] done. scanned={counts['scanned']} seeded={counts['seeded']} " + f"already_known={counts['already_known']} " + f"skipped_too_old={counts['skipped_too_old']}" + ) + return counts + + +# ── CLI entry point ───────────────────────────────────────────────────────── + + +def _main() -> int: + """Command-line interface for one-shot operations. + + Currently supports a single mode: + + python event_forwarder.py --seed-state \ + --watch "" \ + --state "" \ + [--max-age-days 365] + + which marks every existing in-window event binary as already + forwarded (without POSTing) so the watcher only forwards events + appearing AFTER the seed. + """ + import argparse + parser = argparse.ArgumentParser( + description="Series 3 Watcher — SFM event forwarder utilities", + ) + parser.add_argument( + "--seed-state", action="store_true", + help="Mark every event binary in --watch as already-forwarded " + "(without POSTing). Use this BEFORE enabling SFM_FORWARD " + "on a machine with a large historical archive.", + ) + parser.add_argument( + "--watch", required=True, + help="Path to the Blastware ACH folder.", + ) + parser.add_argument( + "--state", required=True, + help="Path to the JSON state file. Will be created if missing.", + ) + parser.add_argument( + "--max-age-days", type=int, default=365, + help="Only seed files newer than this many days (default 365).", + ) + args = parser.parse_args() + + if not args.seed_state: + parser.error("specify --seed-state (no other modes supported yet)") + + print(f"[seed] watch_dir = {args.watch}") + print(f"[seed] state = {args.state}") + print(f"[seed] max_age = {args.max_age_days} days") + + state = ForwardState(args.state) + print(f"[seed] state currently has {state.count()} entries") + seed_state_from_folder( + args.watch, state, + max_age_days=args.max_age_days, + logger=lambda m: print(m), + ) + print(f"[seed] state now has {state.count()} entries") + return 0 + + +if __name__ == "__main__": + import sys + sys.exit(_main()) diff --git a/installer.iss b/installer.iss index 1dd056a..1625c87 100644 --- a/installer.iss +++ b/installer.iss @@ -3,7 +3,7 @@ [Setup] AppName=Series 3 Watcher -AppVersion=1.4.4 +AppVersion=1.5.0 AppPublisher=Terra-Mechanics Inc. DefaultDirName={pf}\Series3Watcher DefaultGroupName=Series 3 Watcher diff --git a/series3_tray.py b/series3_tray.py index 2914440..555a2f6 100644 --- a/series3_tray.py +++ b/series3_tray.py @@ -1,5 +1,5 @@ """ -Series 3 Watcher — System Tray Launcher v1.4.4 +Series 3 Watcher — System Tray Launcher v1.5.0 Requires: pystray, Pillow, tkinter (stdlib) Run with: pythonw series3_tray.py (no console window) diff --git a/series3_watcher.py b/series3_watcher.py index b5881cf..9d7eb98 100644 --- a/series3_watcher.py +++ b/series3_watcher.py @@ -80,6 +80,36 @@ def load_config(path: str) -> Dict[str, Any]: # Auto-updater source "UPDATE_SOURCE": get_str("UPDATE_SOURCE", "gitea"), "UPDATE_URL": get_str("UPDATE_URL", ""), + + # SFM event forwarder — when enabled, forwards each Blastware + # event binary (+ paired .TXT report when present) to an SFM + # server's /db/import/blastware_file endpoint. Default-off so + # existing 1.4.x deployments don't change behaviour on + # auto-update; operators flip it on by setting SFM_URL + + # SFM_FORWARD_ENABLED=true in config.ini. + "SFM_FORWARD_ENABLED": get_bool("SFM_FORWARD_ENABLED", False), + "SFM_URL": get_str("SFM_URL", ""), + "SFM_FORWARD_INTERVAL_SECONDS": get_int("SFM_FORWARD_INTERVAL_SECONDS", 60), + # Files modified within the last N seconds are skipped (BW may + # still be writing them). + "SFM_QUIESCENCE_SECONDS": get_int("SFM_QUIESCENCE_SECONDS", 5), + # If a binary's .TXT report hasn't appeared after this many + # seconds, forward the binary alone rather than blocking + # forever. + "SFM_MISSING_REPORT_GRACE_SECONDS": get_int( + "SFM_MISSING_REPORT_GRACE_SECONDS", 60 + ), + # Per-request HTTP timeout (seconds). + "SFM_HTTP_TIMEOUT": get_int("SFM_HTTP_TIMEOUT", 60), + # State file for forwarded-sha256 idempotency tracking. + # Defaults next to the log file for easy operator access. + "SFM_STATE_FILE": get_str("SFM_STATE_FILE", ""), + # Per-pass cap — forward at most N events per scan tick. + # 0 = unlimited. Default 500 as a safety against accidentally + # backfilling tens of thousands of events in one burst on + # first deploy in a folder that's been accumulating for years. + # See README "First-time deployment" section. + "SFM_MAX_FORWARDS_PER_PASS": get_int("SFM_MAX_FORWARDS_PER_PASS", 500), } @@ -217,7 +247,7 @@ def scan_latest( # --- API heartbeat / SFM telemetry helpers --- -VERSION = "1.4.4" +VERSION = "1.5.0" def _read_log_tail(log_file: str, n: int = 25) -> Optional[list]: @@ -366,6 +396,36 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None: sniff_cache: Dict[str, Tuple[float, str]] = {} last_api_ts: float = 0.0 + last_forward_ts: float = 0.0 + + # ---- SFM event-forwarder setup ---- + # Default-off; only initialised when both flag and URL are set. + sfm_state = None + if cfg.get("SFM_FORWARD_ENABLED") and cfg.get("SFM_URL"): + try: + from event_forwarder import ForwardState + state_file = cfg.get("SFM_STATE_FILE") or os.path.join( + os.path.dirname(LOG_FILE) or here, "sfm_forwarded.json" + ) + sfm_state = ForwardState(state_file) + print( + "[CFG] SFM_FORWARD_ENABLED=true SFM_URL={} state={} ({} already-forwarded)".format( + cfg.get("SFM_URL"), state_file, sfm_state.count(), + ) + ) + log_message( + LOG_FILE, ENABLE_LOGGING, + "[cfg] sfm forwarder enabled url={} state={} already_forwarded={}".format( + cfg.get("SFM_URL"), state_file, sfm_state.count(), + ), + ) + except Exception as e: + print("[WARN] SFM forwarder init failed: {}".format(e)) + log_message(LOG_FILE, ENABLE_LOGGING, + "[warn] sfm forwarder init failed: {}".format(e)) + sfm_state = None + else: + print("[CFG] SFM_FORWARD_ENABLED=false (event forwarding disabled)") while not stop_event.is_set(): try: @@ -447,6 +507,52 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None: else: state["api_status"] = "disabled" + # ---- SFM event forwarder ---- + # Same scan loop as the heartbeat, but on its own cadence + # (SFM_FORWARD_INTERVAL_SECONDS). Default-off — sfm_state + # is None unless config explicitly enabled it AND supplied + # an SFM_URL. + if sfm_state is not None: + now_ts = time.time() + fwd_interval = int(cfg.get("SFM_FORWARD_INTERVAL_SECONDS", 60)) + if now_ts - last_forward_ts >= fwd_interval: + try: + from event_forwarder import forward_pending + counts = forward_pending( + WATCH_PATH, + cfg.get("SFM_URL", ""), + sfm_state, + max_age_days=MAX_EVENT_AGE_DAYS, + quiescence_seconds=int(cfg.get("SFM_QUIESCENCE_SECONDS", 5)), + missing_report_grace_seconds=int( + cfg.get("SFM_MISSING_REPORT_GRACE_SECONDS", 60) + ), + timeout=int(cfg.get("SFM_HTTP_TIMEOUT", 60)), + max_per_pass=int(cfg.get("SFM_MAX_FORWARDS_PER_PASS", 500)), + logger=lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m), + ) + last_forward_ts = now_ts + if counts["scanned"] > 0: + summary = ( + "[forward] scanned={} forwarded={} " + "with_report={} errors={}".format( + counts["scanned"], counts["forwarded"], + counts["with_report"], counts["errors"], + ) + ) + print(summary) + log_message(LOG_FILE, ENABLE_LOGGING, summary) + state["sfm_status"] = "ok" if counts["errors"] == 0 else "errors" + state["last_forward"] = datetime.now() + state["last_forward_counts"] = counts + except Exception as e: + err = "[forward-error] {}".format(e) + print(err) + log_message(LOG_FILE, ENABLE_LOGGING, err) + state["sfm_status"] = "fail" + else: + state["sfm_status"] = "disabled" + except Exception as e: err = "[loop-error] {}".format(e) print(err) diff --git a/settings_dialog.py b/settings_dialog.py index e06a4cc..0b22531 100644 --- a/settings_dialog.py +++ b/settings_dialog.py @@ -1,5 +1,5 @@ """ -Series 3 Watcher — Settings Dialog v1.4.4 +Series 3 Watcher — Settings Dialog v1.5.0 Provides a Tkinter settings dialog that doubles as a first-run wizard. @@ -41,6 +41,18 @@ DEFAULTS = { # Auto-updater "UPDATE_SOURCE": "gitea", "UPDATE_URL": "", + + # SFM event forwarder (default-off; existing 1.4.x deployments + # don't change behaviour after auto-update until an operator + # opts in by setting SFM_URL + flipping SFM_FORWARD_ENABLED). + "SFM_FORWARD_ENABLED": "false", + "SFM_URL": "", + "SFM_FORWARD_INTERVAL_SECONDS": "60", + "SFM_QUIESCENCE_SECONDS": "5", + "SFM_MISSING_REPORT_GRACE_SECONDS": "60", + "SFM_HTTP_TIMEOUT": "60", + "SFM_STATE_FILE": "", + "SFM_MAX_FORWARDS_PER_PASS": "500", } @@ -237,6 +249,17 @@ class SettingsDialog: self.var_update_source = tk.StringVar(value=v["UPDATE_SOURCE"].lower() if v["UPDATE_SOURCE"].lower() in ("gitea", "url", "disabled") else "gitea") self.var_update_url = tk.StringVar(value=v["UPDATE_URL"]) + # SFM event forwarder + self.var_sfm_enabled = tk.BooleanVar( + value=v["SFM_FORWARD_ENABLED"].lower() in ("1", "true", "yes", "on")) + self.var_sfm_url = tk.StringVar(value=v["SFM_URL"]) + self.var_sfm_forward_interval = tk.StringVar(value=v["SFM_FORWARD_INTERVAL_SECONDS"]) + self.var_sfm_quiescence = tk.StringVar(value=v["SFM_QUIESCENCE_SECONDS"]) + self.var_sfm_missing_report_grace = tk.StringVar(value=v["SFM_MISSING_REPORT_GRACE_SECONDS"]) + self.var_sfm_http_timeout = tk.StringVar(value=v["SFM_HTTP_TIMEOUT"]) + self.var_sfm_state_file = tk.StringVar(value=v["SFM_STATE_FILE"]) + self.var_sfm_max_per_pass = tk.StringVar(value=v["SFM_MAX_FORWARDS_PER_PASS"]) + # --- UI construction --- def _build_ui(self): @@ -264,6 +287,7 @@ class SettingsDialog: self._build_tab_scanning(nb) self._build_tab_logging(nb) self._build_tab_updates(nb) + self._build_tab_sfm(nb) # Buttons btn_frame = tk.Frame(outer) @@ -462,6 +486,124 @@ class SettingsDialog: else: self._update_url_entry.config(state="disabled") + # ────────────────────────────────────────────────────────────────── + # SFM Forward tab + # ────────────────────────────────────────────────────────────────── + + def _build_tab_sfm(self, nb): + """Configure the SFM event forwarder. + + When enabled, every Blastware event binary in the watch folder + (plus its paired .TXT report when present) is POSTed to an SFM + server's /db/import/blastware_file endpoint. Default-off so + existing 1.4.x deployments don't change behaviour after an + auto-update — operator opts in by setting the URL and flipping + the checkbox. + """ + f = self._tab_frame(nb, "SFM Forward") + + _add_label_check(f, 0, "Forward events to SFM", self.var_sfm_enabled) + + # SFM URL row — entry + Test button (mirrors the Connection tab's pattern) + tk.Label(f, text="SFM Server URL", anchor="w").grid( + row=1, column=0, sticky="w", padx=(8, 4), pady=4 + ) + url_frame = tk.Frame(f) + url_frame.grid(row=1, column=1, sticky="ew", padx=(0, 8), pady=4) + url_frame.columnconfigure(0, weight=1) + + sfm_entry = ttk.Entry(url_frame, textvariable=self.var_sfm_url, width=32) + sfm_entry.grid(row=0, column=0, sticky="ew") + + self._sfm_test_btn = ttk.Button( + url_frame, text="Test", width=6, command=self._test_sfm_connection, + ) + self._sfm_test_btn.grid(row=0, column=1, padx=(4, 0)) + + self._sfm_test_status = tk.Label(url_frame, text="", anchor="w", width=20) + self._sfm_test_status.grid(row=0, column=2, padx=(6, 0)) + + _add_label_spinbox(f, 2, "Forward Interval (sec)", self.var_sfm_forward_interval, 5, 3600) + _add_label_spinbox(f, 3, "Max Events Per Pass", self.var_sfm_max_per_pass, 0, 100000) + _add_label_spinbox(f, 4, "Quiescence (sec)", self.var_sfm_quiescence, 1, 60) + _add_label_spinbox(f, 5, "Missing-Report Grace (sec)", self.var_sfm_missing_report_grace, 0, 600) + _add_label_spinbox(f, 6, "HTTP Timeout (sec)", self.var_sfm_http_timeout, 5, 600) + + tk.Label(f, text="State File", anchor="w").grid( + row=7, column=0, sticky="w", padx=(8, 4), pady=4 + ) + state_frame = tk.Frame(f) + state_frame.grid(row=7, column=1, sticky="ew", padx=(0, 8), pady=4) + state_frame.columnconfigure(0, weight=1) + + state_entry = ttk.Entry(state_frame, textvariable=self.var_sfm_state_file, width=32) + state_entry.grid(row=0, column=0, sticky="ew") + + def _browse_state(): + path = filedialog.asksaveasfilename( + title="SFM forward-state file", + defaultextension=".json", + filetypes=[("JSON", "*.json"), ("All Files", "*.*")], + initialfile="sfm_forwarded.json", + ) + if path: + self.var_sfm_state_file.set(path) + + ttk.Button(state_frame, text="Browse...", width=10, command=_browse_state).grid( + row=0, column=1, padx=(4, 0) + ) + + hint_text = ( + "Forwards every Blastware event binary (and its paired .TXT report)\n" + "to an SFM server, where the report is parsed for searchable\n" + "per-channel stats: PPV, ZC Freq, Time of Peak, Peak Acceleration,\n" + "Peak Displacement, sensor self-check, monitor log.\n\n" + "Idempotent: forwarded files are tracked by sha256 in the state\n" + "file; restarts and re-scans never re-POST. Leave State File blank\n" + "to default to /sfm_forwarded.json." + ) + tk.Label(f, text=hint_text, justify="left", fg="#555555", wraplength=380).grid( + row=8, column=0, columnspan=2, sticky="w", padx=(8, 8), pady=(8, 4) + ) + + def _test_sfm_connection(self): + """GET /health and show the result.""" + import urllib.request + import urllib.error + + self._sfm_test_status.config(text="Testing...", foreground="grey") + self._sfm_test_btn.config(state="disabled") + self.root.update_idletasks() + + raw = self.var_sfm_url.get().strip() + if not raw: + self._sfm_test_status.config(text="Enter a URL first", foreground="orange") + self._sfm_test_btn.config(state="normal") + return + + url = raw.rstrip("/") + "/health" + + try: + req = urllib.request.Request(url) + with urllib.request.urlopen(req, timeout=5) as resp: + if resp.status == 200: + self._sfm_test_status.config(text="Connected!", foreground="green") + else: + self._sfm_test_status.config( + text="HTTP {}".format(resp.status), foreground="orange", + ) + except urllib.error.URLError as e: + reason = str(e.reason) if hasattr(e, "reason") else str(e) + self._sfm_test_status.config( + text="Failed: {}".format(reason[:30]), foreground="red", + ) + except Exception as e: + self._sfm_test_status.config( + text="Error: {}".format(str(e)[:30]), foreground="red", + ) + finally: + self._sfm_test_btn.config(state="normal") + # --- Validation helpers --- @@ -494,6 +636,11 @@ class SettingsDialog: (self.var_scan_interval, "Scan Interval", 10, 3600, 300), (self.var_mlg_header_bytes, "MLG Header Bytes", 256, 65536, 2048), (self.var_log_retention_days, "Log Retention Days", 1, 365, 30), + (self.var_sfm_forward_interval, "SFM Forward Interval", 5, 3600, 60), + (self.var_sfm_quiescence, "SFM Quiescence", 1, 60, 5), + (self.var_sfm_missing_report_grace, "SFM Missing-Report Grace", 0, 600, 60), + (self.var_sfm_http_timeout, "SFM HTTP Timeout", 5, 600, 60), + (self.var_sfm_max_per_pass, "SFM Max Events Per Pass", 0, 100000, 500), ] int_values = {} for var, name, mn, mx, dflt in checks: @@ -502,6 +649,17 @@ class SettingsDialog: return # validation failed; keep dialog open int_values[name] = result + # SFM forwarding requires a URL when enabled — common foot-gun + # to flip the checkbox without filling in the field. + if self.var_sfm_enabled.get() and not self.var_sfm_url.get().strip(): + messagebox.showerror( + "Validation Error", + "SFM Forward is enabled but the SFM Server URL field is empty.\n\n" + "Either set the URL (e.g. http://10.0.0.44:8200) or uncheck " + "'Forward events to SFM'.", + ) + return + # Resolve source_id placeholder source_id = self.var_source_id.get().strip() # Strip placeholder hint if user left it @@ -530,6 +688,16 @@ class SettingsDialog: "LOG_RETENTION_DAYS": str(int_values["Log Retention Days"]), "UPDATE_SOURCE": self.var_update_source.get().strip() or "gitea", "UPDATE_URL": self.var_update_url.get().strip(), + + # SFM event forwarder + "SFM_FORWARD_ENABLED": "true" if self.var_sfm_enabled.get() else "false", + "SFM_URL": self.var_sfm_url.get().strip().rstrip("/"), + "SFM_FORWARD_INTERVAL_SECONDS": str(int_values["SFM Forward Interval"]), + "SFM_QUIESCENCE_SECONDS": str(int_values["SFM Quiescence"]), + "SFM_MISSING_REPORT_GRACE_SECONDS": str(int_values["SFM Missing-Report Grace"]), + "SFM_HTTP_TIMEOUT": str(int_values["SFM HTTP Timeout"]), + "SFM_STATE_FILE": self.var_sfm_state_file.get().strip(), + "SFM_MAX_FORWARDS_PER_PASS": str(int_values["SFM Max Events Per Pass"]), } try: diff --git a/test_event_forwarder.py b/test_event_forwarder.py new file mode 100644 index 0000000..e425fe6 --- /dev/null +++ b/test_event_forwarder.py @@ -0,0 +1,718 @@ +""" +test_event_forwarder.py — unit tests for the SFM event forwarder. + +Covers: + - is_event_binary() filename matching (positive + negative cases) + - ForwardState load/save round-trip + idempotency check + - find_pending_events() pairing + quiescence + grace-period logic + - _encode_multipart() byte-level shape (boundary + headers) + - forward_event_pair() end-to-end against a tiny stdlib HTTP server + that mimics the SFM /db/import/blastware_file endpoint + +Stdlib only — runs with `python -m pytest test_event_forwarder.py` +on Python 3.8+ (the watcher's compat target). +""" + +from __future__ import annotations + +import http.server +import json +import os +import socket +import tempfile +import threading +import time +import unittest +from pathlib import Path + +import event_forwarder as ef + + +# ── is_event_binary() ──────────────────────────────────────────────────────── + + +class TestIsEventBinary(unittest.TestCase): + + def test_recognises_typical_blastware_filenames(self): + for name in [ + "M529LK44.AB0", + "M529LKVQ.6S0", + "M529LKVQ.6S0W", + "S353L4H0.3M0W", + "P036L318.C80H", + "M529LIY6.N00", + ]: + self.assertTrue(ef.is_event_binary(name), name) + + def test_rejects_lowercase_extensions_we_explicitly_exclude(self): + for name in ["BE11529.MLG", "M529LK44.AB0.TXT", "agent.log", + "config.ini", "foo.bak", "bar.tmp", + "something.h5", "noise.json"]: + self.assertFalse(ef.is_event_binary(name), name) + + def test_ach_report_name(self): + """BW ACH convention: .__ASCII.TXT""" + cases = [ + ("M529LK44.AB0", "M529LK44_AB0_ASCII.TXT"), + ("N844L20G.630H", "N844L20G_630H_ASCII.TXT"), + ("I145L64P.GD0W", "I145L64P_GD0W_ASCII.TXT"), + ("H907L1R7.PG0H", "H907L1R7_PG0H_ASCII.TXT"), + ] + for binary, expected in cases: + self.assertEqual(ef.ach_report_name(binary), expected, binary) + + def test_legacy_report_name(self): + """Manual-export convention: .TXT""" + self.assertEqual(ef.legacy_report_name("M529LK44.AB0"), + "M529LK44.AB0.TXT") + + def test_is_histogram_event(self): + # 4-char extension ending in H = histogram + for name in ["H907L1R7.PG0H", "S353L4H0.8S0H", "P036L318.C80H"]: + self.assertTrue(ef.is_histogram_event(name), name) + # 4-char extension ending in W = waveform + for name in ["S353L4H0.3M0W", "M529LKVQ.6S0W", "P036L318.C80W"]: + self.assertFalse(ef.is_histogram_event(name), name) + # 3-char old-firmware extensions can't be classified — return False + for name in ["M529LK44.AB0", "M529LIY6.N00", "M529LJ8V.490"]: + self.assertFalse(ef.is_histogram_event(name), name) + + def test_rejects_non_matching_filenames(self): + for name in ["", "no_extension", + "TooShort.AB0", # stem must be 8 chars + "TOOLONG12345.AB0", # stem must be 8 chars + "M529LK44.A", # ext too short + "M529LK44.ABCDE", # ext too long + "M52.AB0", # stem too short + "1234ABCD.AB0"]: # first char must be letter + self.assertFalse(ef.is_event_binary(name), name) + + +# ── ForwardState ───────────────────────────────────────────────────────────── + + +class TestForwardState(unittest.TestCase): + + def test_round_trip_persists_marked_entries(self): + with tempfile.TemporaryDirectory() as tmp: + path = os.path.join(tmp, "fwd.json") + s = ef.ForwardState(path) + self.assertFalse(s.is_forwarded("abc123")) + s.mark_forwarded("abc123", "M529LK44.AB0", 4400) + self.assertTrue(s.is_forwarded("abc123")) + + # Re-load from disk + s2 = ef.ForwardState(path) + self.assertTrue(s2.is_forwarded("abc123")) + self.assertEqual(s2.count(), 1) + + def test_corrupt_state_file_starts_fresh(self): + with tempfile.TemporaryDirectory() as tmp: + path = os.path.join(tmp, "fwd.json") + with open(path, "w") as f: + f.write("not valid json {{{") + s = ef.ForwardState(path) + self.assertEqual(s.count(), 0) + + def test_version_mismatch_starts_fresh(self): + with tempfile.TemporaryDirectory() as tmp: + path = os.path.join(tmp, "fwd.json") + with open(path, "w") as f: + json.dump({"version": 999, "forwarded": {"x": {}}}, f) + s = ef.ForwardState(path) + self.assertEqual(s.count(), 0) + + +# ── find_pending_events() ──────────────────────────────────────────────────── + + +class TestFindPendingEvents(unittest.TestCase): + + def _make(self, dir_path: Path, name: str, age_seconds: float = 100, + content: bytes = b"x") -> Path: + """Create a file with controlled mtime.""" + p = dir_path / name + p.write_bytes(content) + # Set mtime to simulate age + target = time.time() - age_seconds + os.utime(p, (target, target)) + return p + + def test_returns_pair_when_both_files_present_and_quiescent(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + bin_p = self._make(tmp_p, "M529LK44.AB0", age_seconds=120, content=b"binary") + txt_p = self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"report") + + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 1) + self.assertEqual(os.path.basename(pending[0][0]), "M529LK44.AB0") + self.assertEqual(os.path.basename(pending[0][1]), "M529LK44.AB0.TXT") + + def test_pairs_with_ach_underscore_ascii_naming(self): + """BW ACH writes M529LK44.AB0 + M529LK44_AB0_ASCII.TXT. The + watcher must pair these even though the .TXT filename doesn't + carry a literal copy of the binary's name.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "N844L20G.630H", age_seconds=120, content=b"binary") + self._make(tmp_p, "N844L20G_630H_ASCII.TXT", age_seconds=100, content=b"report") + + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 1) + self.assertEqual(os.path.basename(pending[0][0]), "N844L20G.630H") + self.assertEqual(os.path.basename(pending[0][1]), + "N844L20G_630H_ASCII.TXT") + + def test_pairs_with_ach_underscore_ascii_naming_for_waveform(self): + """Same as above but for new-firmware waveform events + (extension ends in W).""" + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "I145L64P.GD0W", age_seconds=120, content=b"binary") + self._make(tmp_p, "I145L64P_GD0W_ASCII.TXT", age_seconds=100, content=b"report") + + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 1) + self.assertEqual(os.path.basename(pending[0][1]), + "I145L64P_GD0W_ASCII.TXT") + + def test_pairing_prefers_ach_naming_when_both_exist(self): + """If a folder has BOTH conventions (operator manually exported + AND ACH also auto-exported), ACH wins because that's the + canonical name in modern BW deployments.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "M529LK44.AB0", age_seconds=120, content=b"binary") + # Both partner files present + self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"manual") + self._make(tmp_p, "M529LK44_AB0_ASCII.TXT", age_seconds=100, content=b"ach") + + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 1) + self.assertEqual(os.path.basename(pending[0][1]), + "M529LK44_AB0_ASCII.TXT") + + def test_pairing_falls_back_to_dot_txt_when_ach_absent(self): + """If only the manual-export filename exists, the legacy + convention still works (preserves codec-agent test fixtures).""" + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "M529LK44.AB0", age_seconds=120, content=b"binary") + self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"manual") + + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 1) + self.assertEqual(os.path.basename(pending[0][1]), "M529LK44.AB0.TXT") + + def test_skips_if_already_forwarded(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + bin_p = self._make(tmp_p, "M529LK44.AB0", age_seconds=120, content=b"binary") + self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"report") + + state = ef.ForwardState(str(tmp_p / "fwd.json")) + digest = ef.sha256_of_file(str(bin_p)) + state.mark_forwarded(digest, "M529LK44.AB0", len(b"binary")) + + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0) + + def test_skips_if_too_fresh_to_be_quiescent(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "M529LK44.AB0", age_seconds=1, content=b"binary") + self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=1, content=b"report") + + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0) + + def test_forwards_alone_after_grace_when_txt_missing(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "M529LK44.AB0", age_seconds=200, content=b"binary") + # No .TXT created. + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 1) + bin_path, txt_path = pending[0] + self.assertEqual(os.path.basename(bin_path), "M529LK44.AB0") + self.assertIsNone(txt_path) + + def test_re_pair_after_late_arriving_txt(self): + """If we forwarded the binary alone (TXT was late) and the TXT + later appears, the binary becomes eligible for re-forward.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + bin_p = self._make(tmp_p, "M529LK44.AB0", + age_seconds=200, content=b"binary") + # Mark as already-forwarded WITHOUT a paired report (the + # state we'd be in after a TXT-too-late forward). + state = ef.ForwardState(str(tmp_p / "fwd.json")) + digest = ef.sha256_of_file(str(bin_p)) + state.mark_forwarded(digest, "M529LK44.AB0", len(b"binary"), + had_report=False) + + # First scan: TXT not present yet → still skipped. + pending = ef.find_pending_events( + str(tmp_p), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(pending, [], + "no TXT present → no re-pair attempt") + + # Now BW finally writes the TXT. + self._make(tmp_p, "M529LK44.AB0.TXT", + age_seconds=100, content=b"report") + pending = ef.find_pending_events( + str(tmp_p), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 1, + "TXT now present → re-pair attempt expected") + self.assertEqual(os.path.basename(pending[0][0]), "M529LK44.AB0") + self.assertEqual(os.path.basename(pending[0][1]), "M529LK44.AB0.TXT") + + def test_re_pair_not_attempted_when_already_had_report(self): + """Successful WITH-report forwards stay permanently skipped. + Adding more files later does NOT trigger a re-forward.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + bin_p = self._make(tmp_p, "M529LK44.AB0", age_seconds=200, content=b"x") + self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"r") + state = ef.ForwardState(str(tmp_p / "fwd.json")) + state.mark_forwarded(ef.sha256_of_file(str(bin_p)), + "M529LK44.AB0", 1, had_report=True) + pending = ef.find_pending_events( + str(tmp_p), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(pending, [], + "had_report=True forwards stay skipped") + + def test_legacy_state_entries_default_to_had_report_true(self): + """Backward compat: state-file entries from before the + had_report field existed are treated as fully forwarded so + an upgrade doesn't re-forward every entry.""" + import json + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + path = str(tmp_p / "fwd.json") + with open(path, "w") as f: + json.dump({ + "version": 1, + "forwarded": { + "abc123": { + "filename": "M529LK01.AB0", + "size": 123, + "forwarded_at": "2025-01-01T00:00:00Z", + # No had_report field — legacy entry + } + } + }, f) + state = ef.ForwardState(path) + self.assertIs(state.status("abc123"), True, + "legacy entry must default to 'fully forwarded'") + + def test_state_status_returns_none_for_unknown_sha(self): + with tempfile.TemporaryDirectory() as tmp: + state = ef.ForwardState(str(Path(tmp) / "fwd.json")) + self.assertIs(state.status("never-seen"), None) + + def test_state_mark_with_had_report_false(self): + with tempfile.TemporaryDirectory() as tmp: + state = ef.ForwardState(str(Path(tmp) / "fwd.json")) + state.mark_forwarded("xyz", "f.AB0", 100, had_report=False) + self.assertIs(state.status("xyz"), False) + # Subsequent re-mark with had_report=True promotes to done. + state.mark_forwarded("xyz", "f.AB0", 100, had_report=True) + self.assertIs(state.status("xyz"), True) + + def test_defers_when_txt_missing_and_within_grace(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "M529LK44.AB0", age_seconds=15, content=b"binary") + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0) + + def test_skips_old_files_beyond_max_age_days(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + # 10 days old, but max_age_days=1 → should be excluded + self._make(tmp_p, "M529LK44.AB0", age_seconds=10 * 86400, + content=b"binary") + self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=10 * 86400, + content=b"report") + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=1, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0) + + def test_ignores_mlg_and_other_non_event_files(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "BE11529.MLG", age_seconds=120, content=b"mlg") + self._make(tmp_p, "agent.log", age_seconds=120, content=b"log") + self._make(tmp_p, "config.ini", age_seconds=120, content=b"cfg") + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0) + + def test_max_per_pass_caps_returned_count(self): + """When max_per_pass is set, return at most that many pairs.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + # Create 5 distinct event binaries with paired .TXTs + for i, name in enumerate( + ["M529LK01.AB0", "M529LK02.AB0", "M529LK03.AB0", + "M529LK04.AB0", "M529LK05.AB0"], + ): + self._make(tmp_p, name, age_seconds=120 + i, + content=("bin-" + str(i)).encode()) + self._make(tmp_p, name + ".TXT", age_seconds=110 + i, + content=b"report") + + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + max_per_pass=2, + ) + self.assertEqual(len(pending), 2) + + def test_max_per_pass_zero_means_unlimited(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + for i in range(4): + self._make(tmp_p, "M529LK0{}.AB0".format(i), + age_seconds=120 + i, + content=("bin-" + str(i)).encode()) + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + max_per_pass=0, + ) + self.assertEqual(len(pending), 4) + + def test_max_per_pass_returns_oldest_first(self): + """Backfill should advance chronologically — oldest qualifying + files first. This way successive scans always make progress + instead of getting stuck re-considering the same N newest files.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + # ages: 200s (oldest), 150s, 100s, 50s (skipped — within grace) + ages = [200, 150, 100, 50] + for i, age in enumerate(ages): + self._make(tmp_p, "M529LK0{}.AB0".format(i), + age_seconds=age, content=("c" + str(i)).encode()) + self._make(tmp_p, "M529LK0{}.AB0.TXT".format(i), + age_seconds=age - 10, content=b"r") + + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, quiescence_seconds=5, + missing_report_grace_seconds=60, max_per_pass=2, + ) + # Oldest two should be M529LK00 (200s) and M529LK01 (150s) + names = [os.path.basename(p[0]) for p in pending] + self.assertEqual(names, ["M529LK00.AB0", "M529LK01.AB0"]) + + +# ── Seed-state mode ────────────────────────────────────────────────────────── + + +class TestSeedStateFromFolder(unittest.TestCase): + + def _make(self, dir_path: Path, name: str, age_seconds: float = 100, + content: bytes = b"x") -> Path: + p = dir_path / name + p.write_bytes(content) + target = time.time() - age_seconds + os.utime(p, (target, target)) + return p + + def test_seeds_every_in_window_event_without_posting(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + for i in range(3): + self._make(tmp_p, "M529LK0{}.AB0".format(i), + age_seconds=120 + i, content=("e" + str(i)).encode()) + # Plus a non-event file we should ignore + self._make(tmp_p, "BE11529.MLG", age_seconds=120, content=b"mlg") + + state = ef.ForwardState(str(tmp_p / "seed.json")) + counts = ef.seed_state_from_folder( + str(tmp_p), state, max_age_days=30, + ) + self.assertEqual(counts["scanned"], 3) + self.assertEqual(counts["seeded"], 3) + self.assertEqual(counts["already_known"], 0) + self.assertEqual(state.count(), 3) + + def test_seed_skips_files_beyond_max_age_days(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "M529LK01.AB0", age_seconds=120, content=b"new") + self._make(tmp_p, "M529LK02.AB0", age_seconds=10 * 86400, + content=b"in-window") # 10d < 30d cutoff + self._make(tmp_p, "M529LK03.AB0", age_seconds=400 * 86400, + content=b"way-old") # 400d > 30d cutoff + + state = ef.ForwardState(str(tmp_p / "seed.json")) + counts = ef.seed_state_from_folder( + str(tmp_p), state, max_age_days=30, + ) + self.assertEqual(counts["seeded"], 2) + self.assertEqual(counts["skipped_too_old"], 1) + + def test_seeded_files_are_then_skipped_by_normal_scan(self): + """End-to-end: seed once, then a normal scan should produce + zero pending events for the seeded files.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "M529LK01.AB0", age_seconds=120, content=b"x") + self._make(tmp_p, "M529LK01.AB0.TXT", age_seconds=110, content=b"r") + self._make(tmp_p, "M529LK02.AB0", age_seconds=120, content=b"y") + self._make(tmp_p, "M529LK02.AB0.TXT", age_seconds=110, content=b"r") + + state = ef.ForwardState(str(tmp_p / "seed.json")) + ef.seed_state_from_folder(str(tmp_p), state, max_age_days=30) + + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0, + "seed should have marked everything already-forwarded") + + def test_seed_is_idempotent(self): + """Re-running seed twice doesn't duplicate entries or POST anything.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "M529LK01.AB0", age_seconds=120, content=b"x") + + state = ef.ForwardState(str(tmp_p / "seed.json")) + counts1 = ef.seed_state_from_folder(str(tmp_p), state, max_age_days=30) + counts2 = ef.seed_state_from_folder(str(tmp_p), state, max_age_days=30) + self.assertEqual(counts1["seeded"], 1) + self.assertEqual(counts2["seeded"], 0) + self.assertEqual(counts2["already_known"], 1) + self.assertEqual(state.count(), 1) + + +# ── Multipart encoder ──────────────────────────────────────────────────────── + + +class TestMultipartEncoder(unittest.TestCase): + + def test_encodes_two_parts_with_proper_boundary(self): + body, content_type = ef._encode_multipart([ + ("files", "a.bin", "application/octet-stream", b"\x01\x02"), + ("files", "a.txt", "text/plain", b"hello"), + ]) + # Content-Type header carries the boundary + self.assertTrue(content_type.startswith("multipart/form-data; boundary=")) + boundary = content_type.split("boundary=", 1)[1] + self.assertIn(boundary.encode("ascii"), body) + + # Body shape + text = body.decode("latin-1") + self.assertIn(f'name="files"; filename="a.bin"', text) + self.assertIn(f'name="files"; filename="a.txt"', text) + self.assertIn("Content-Type: application/octet-stream", text) + self.assertIn("Content-Type: text/plain", text) + # Trailing close boundary present + self.assertTrue(text.rstrip("\r\n").endswith(f"--{boundary}--")) + + +# ── End-to-end forward_event_pair against a fake server ────────────────────── + + +class _FakeImportHandler(http.server.BaseHTTPRequestHandler): + """Mimics seismo-relay's POST /db/import/blastware_file response.""" + received = [] # class-level capture for test inspection + + def do_POST(self): + length = int(self.headers.get("Content-Length", "0")) + body = self.rfile.read(length) + ctype = self.headers.get("Content-Type", "") + + # Crude multipart split — enough to count parts and grab filenames. + parts = body.split(b"--" + ctype.split("boundary=")[-1].encode()) + # Locate filename= occurrences — that's our part count + filenames = [] + for p in parts: + for line in p.split(b"\r\n"): + if b'filename="' in line: + fn = line.split(b'filename="', 1)[1].split(b'"', 1)[0] + filenames.append(fn.decode("latin-1")) + + self.__class__.received.append({ + "path": self.path, + "ctype": ctype, + "filenames": filenames, + }) + + # Build a faux SFM response: success for the first .bin-style filename + results = [] + binary_fn = next( + (fn for fn in filenames if not fn.lower().endswith(".txt")), + None, + ) + if binary_fn: + results.append({ + "filename": binary_fn, + "status": "ok", + "stored_filename": binary_fn, + "filesize": len(body), + "sha256": "00" * 32, + "report_attached": any(fn.lower().endswith(".txt") for fn in filenames), + "inserted": 1, + "skipped": 0, + }) + + payload = json.dumps({"count": len(results), "results": results}).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(payload))) + self.end_headers() + self.wfile.write(payload) + + def log_message(self, *_a, **_kw): # silence the test runner + pass + + +def _start_fake_server() -> tuple[http.server.HTTPServer, str]: + """Start an HTTPServer on a random local port; return (server, base_url).""" + server = http.server.HTTPServer(("127.0.0.1", 0), _FakeImportHandler) + threading.Thread(target=server.serve_forever, daemon=True).start() + host, port = server.server_address + return server, f"http://{host}:{port}" + + +class TestForwardEventPair(unittest.TestCase): + + def setUp(self): + _FakeImportHandler.received = [] + self.server, self.base_url = _start_fake_server() + + def tearDown(self): + self.server.shutdown() + self.server.server_close() + + def test_post_with_paired_report(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + bin_p = tmp_p / "M529LK44.AB0" + txt_p = tmp_p / "M529LK44.AB0.TXT" + bin_p.write_bytes(b"\x10\x20\x30 binary") + txt_p.write_bytes(b'"Serial Number : BE11529"\n') + + result = ef.forward_event_pair( + self.base_url, str(bin_p), str(txt_p), timeout=5.0, + ) + self.assertEqual(result["status"], "ok") + self.assertEqual(result["filename"], "M529LK44.AB0") + self.assertTrue(result["report_attached"]) + + self.assertEqual(len(_FakeImportHandler.received), 1) + req = _FakeImportHandler.received[0] + self.assertEqual(req["path"], "/db/import/blastware_file") + self.assertIn("M529LK44.AB0", req["filenames"]) + self.assertIn("M529LK44.AB0.TXT", req["filenames"]) + + def test_post_without_report(self): + with tempfile.TemporaryDirectory() as tmp: + bin_p = Path(tmp) / "M529LK44.AB0" + bin_p.write_bytes(b"binary only") + + result = ef.forward_event_pair( + self.base_url, str(bin_p), None, timeout=5.0, + ) + self.assertEqual(result["status"], "ok") + self.assertFalse(result["report_attached"]) + req = _FakeImportHandler.received[0] + self.assertEqual(req["filenames"], ["M529LK44.AB0"]) + + def test_post_propagates_serial_hint_in_query(self): + with tempfile.TemporaryDirectory() as tmp: + bin_p = Path(tmp) / "M529LK44.AB0" + bin_p.write_bytes(b"x") + ef.forward_event_pair( + self.base_url, str(bin_p), None, + serial_hint="BE11529", timeout=5.0, + ) + req = _FakeImportHandler.received[0] + self.assertIn("serial=BE11529", req["path"]) + + +if __name__ == "__main__": + unittest.main()