16 Commits

Author SHA1 Message Date
serversdown a457619158 Merge pull request 'doc: update to 1.5.0' (#10) from dev into main
Reviewed-on: #10
2026-05-11 12:31:57 -04:00
serversdown 48fba65f20 doc: update to 1.5.0 2026-05-11 16:31:21 +00:00
serversdown 9b3ae6d548 Merge pull request 'feat(forward): SFM event forwarder (v1.5.0)' (#9) from dev into main
Reviewed-on: #9
2026-05-11 12:29:21 -04:00
serversdown 65b3af90ae feat(forward): re-pair late-arriving TXTs on subsequent scans
When a binary is forwarded WITHOUT its paired _ASCII.TXT (because
the TXT wasn't quiescent within the grace period — BW slow to
write, AV scanning, etc.), the old behaviour was to permanently
mark the binary as "done" in the state file, even though the TXT
might land seconds later.  Result: that event lived in SFM forever
with broken-codec peak values and no project info.

Fix: state entries now carry a had_report flag.  Forwards without
a TXT set had_report=False.  On subsequent scans, the watcher
treats had_report=False entries as re-pair candidates — they get
re-forwarded once the TXT appears, and the SFM server's upsert
path (in seismo-relay's insert_events IntegrityError handler)
refreshes the DB row with the report's authoritative values.

Three status states in ForwardState.status(sha256):
  None  — never forwarded.  First-forward path.
  True  — forwarded successfully WITH report (or legacy entry
          without the had_report field).  Permanently done.
  False — forwarded WITHOUT report.  Re-pair if TXT now exists.

Backward compat: legacy state-file entries (no had_report key)
default to True so existing deployments don't unexpectedly
re-forward every entry on upgrade.

Tests cover:
  - re-pair when TXT appears after a had_report=False forward
  - had_report=True entries stay skipped permanently
  - legacy entries (missing field) treated as fully forwarded
  - state.status() returns None for unknown sha
  - re-marking had_report=False then True promotes to fully-done

36 watcher tests pass (was 31, +5 new).
2026-05-11 16:22:53 +00:00
serversdown e6c25ab941 Merge pull request 'feat(forward): SFM event forwarder (v1.5.0)' (#8) from sfm-event-forwarder into dev
Reviewed-on: #8
2026-05-11 12:10:44 -04:00
serversdown c81f4ee61f docs(README): add Roadmap (Future) section
Captures the watcher-side deferred work from the v1.5.0 development
thread:

  - File archive manager (90-day events into year/month subfolders)
  - MLG forwarding (currently excluded from is_event_binary())
  - Pre-deploy seed-state UX in the Settings dialog

Points operators to seismo-relay's parallel Roadmap for the
server-side counterparts (MLG ingest endpoint, etc.).
2026-05-11 16:08:15 +00:00
serversdown 19548466ad chore(release): consolidate v1.5.1–v1.5.4 into single v1.5.0 Unreleased entry
None of v1.5.1 / v1.5.2 / v1.5.3 / v1.5.4 ever shipped — they only
existed as separate CHANGELOG entries on this unmerged feature
branch.  SemVer ties version numbers to releases, not commits.
From a field machine's perspective, the world skips straight from
v1.4.4 to whatever the next built-and-pushed installer is tagged.

Revert VERSION / AppVersion / module-docstring version comments
to v1.5.0 across:
  - series3_watcher.py     (VERSION = "1.5.0")
  - installer.iss          (AppVersion=1.5.0)
  - series3_tray.py        (docstring)
  - settings_dialog.py     (docstring)
  - README.md              (banner + footer)

Consolidate the four split CHANGELOG sections into a single
"## [Unreleased] — v1.5.0" entry covering all the work in one
release.  Configuration key table consolidated into one place.
No commit history rewriting — the per-commit code changes still
make sense as logical units for code review; only the surface-
level version metadata + CHANGELOG narrative was misleading.

Going forward: accumulate work-in-progress under an "[Unreleased]"
heading, bump VERSION only at actual release time.
2026-05-10 22:24:35 +00:00
serversdown 770336e09f fix(forward): pair BW ACH ASCII reports using the _ASCII.TXT convention (v1.5.4)
Blastware's official Auto Call Home server writes per-event ASCII
reports as <stem>_<ext>_ASCII.TXT (e.g. N844L20G_630H_ASCII.TXT),
not <binary>.TXT (e.g. N844L20G.630H.TXT).  Versions v1.5.0–v1.5.3
only looked for the latter and silently shipped every binary alone,
so the SFM database lost the per-event Peak Acceleration / Peak
Displacement / ZC Freq / Time of Peak / Peak Vector Sum + time /
sensor self-check fields on every forwarded event.

Fix: pair-finding logic now tries the ACH-convention filename first
and falls back to <binary>.TXT for compatibility with operator-saved
manual exports and existing test fixtures.

  ach_report_name("M529LK44.AB0")    → "M529LK44_AB0_ASCII.TXT"
  legacy_report_name("M529LK44.AB0") → "M529LK44.AB0.TXT"

When both files exist (operator manually saved + ACH auto-exported),
ACH wins because that's the canonical name on modern BW deployments.
Both candidates checked case-insensitively against the cached
directory listing — no extra stat() calls.

6 new unit tests cover the new pairing logic, helper-function
correctness, and the precedence rule.  Total now 31 tests, all green.

Field-deploy note: re-running v1.5.4 on a folder where v1.5.0–v1.5.3
already ran will NOT re-forward historical events — the
sfm_forwarded.json state file remembers them by sha256.  To re-forward
historical events to populate SFM with the now-correctly-paired
reports, delete the state file before starting v1.5.4.
2026-05-10 20:10:38 +00:00
serversdown a166918a9d fix(forward-log): distinguish histograms from missing-report (v1.5.3)
On machines running histogram-mode units (extensions ending in H,
e.g. H907L1R7.PG0H), every forwarded event was logging "no report"
even though histograms never get auto-exported ASCII reports from
Blastware — making the log look like every forward was misconfigured
when in fact things were working correctly.

Three log states now:

  - Waveform + paired TXT
      → "+ <txt> attached"
  - Waveform without TXT (likely BW config issue)
      → "no report ⚠"
  - Histogram (any flavour)
      → "(histogram, no report expected)"

New is_histogram_event() helper classifies by BW filename extension:
4-char ext ending in H = histogram; old-firmware 3-char extensions
default to non-histogram (safe default — we'd rather flag a missing
report than silently suppress the warning on a real waveform event).

Forwarding logic itself is unchanged — this is purely log clarity.
2026-05-10 01:39:22 +00:00
serversdown 815c643fb2 feat(forward): rate cap + seed-state mode for safe backfill (v1.5.2)
Two safety nets for first-deploy on Blastware ACH machines that
have accumulated tens or hundreds of thousands of historical events
in the watch folder.

1. SFM_MAX_FORWARDS_PER_PASS (default 500, 0=unlimited)
   ---------------------------------------------------
   Cap on the number of events forwarded per scan tick.  At the
   60-second default interval that's ~30K events/hour throughput —
   the SFM server gets a steady drip instead of one giant burst.
   Scan now sorts by mtime ascending so backfill advances
   chronologically (oldest first) and successive scans always
   make progress instead of re-considering the same N newest files.

   Wired into:
     - event_forwarder.find_pending_events / forward_pending
     - series3_watcher.run_watcher loop
     - config-template.ini
     - settings_dialog SFM Forward tab (new "Max Events Per Pass"
       spinbox, validated in _on_save)

2. event_forwarder.py --seed-state CLI
   -----------------------------------
   One-shot mode that walks the watch folder, sha256s every in-window
   event binary, and marks them all as already-forwarded WITHOUT
   POSTing anything.  Run before flipping SFM_FORWARD_ENABLED=true
   to skip the historical backfill entirely — the watcher then only
   forwards events that appear AFTER the seed.

   Usage:
       python event_forwarder.py --seed-state \
           --watch "C:\Blastware 10\Event\autocall home" \
           --state "C:\...\sfm_forwarded.json" \
           [--max-age-days 365]

7 new unit tests:
  - max_per_pass cap enforcement (=N, =0 unlimited, oldest-first
    ordering)
  - seed-state mode (in-window seeding, max-age skip,
    end-to-end skip-after-seed, idempotent re-runs)

README adds a "First-time deployment" section walking through both
options.  Bumps to v1.5.2.
2026-05-10 00:20:10 +00:00
serversdown 3ee0cae31e fix(settings): add SFM Forward tab to settings dialog (v1.5.1)
v1.5.0 shipped the forwarder module + INI keys but the settings
dialog wasn't extended, so the only way operators could enable
forwarding was hand-editing config.ini.  This adds a sixth tab
("SFM Forward") with:

  - Forward events to SFM    checkbox
  - SFM Server URL           entry + Test button (GETs /health)
  - Forward Interval (sec)   spinbox
  - Quiescence (sec)         spinbox
  - Missing-Report Grace     spinbox
  - HTTP Timeout             spinbox
  - State File               entry + Browse...

Save-time guard: enabling the forwarder without a URL raises a
validation error rather than silently saving a non-functional
config.

Patch release — same on-disk INI schema, no config migration.
2026-05-10 00:01:25 +00:00
serversdown f4ec6ef945 feat(forward): SFM event forwarder (v1.5.0)
When SFM_FORWARD_ENABLED=true and SFM_URL is set, every Blastware
event binary in the ACH watch folder is forwarded to an SFM server's
/db/import/blastware_file endpoint as a multipart POST.  The paired
<binary>.TXT ASCII report (which Blastware's ACH writes alongside
each event) is shipped in the same request, letting the SFM server
index the full per-channel stats — PPV, ZC Freq, Time of Peak, Peak
Acceleration / Displacement, Peak Vector Sum + time, sensor
self-check Pass/Fail per channel, and monitor-log timestamps —
without depending on the still-undecoded BW waveform body codec.

New module event_forwarder.py:
  - is_event_binary() filename matcher (BW's <P><serial3><stem>.<ext>
    scheme; rejects .MLG, .TXT, .log, .ini, .h5, etc.)
  - ForwardState (.json file keyed by sha256 — idempotent across
    restarts and auto-updates)
  - find_pending_events() with quiescence + grace-period guards
  - Hand-rolled multipart encoder (stdlib-only)
  - forward_event_pair() / forward_pending() — POST loop with
    structured per-event outcomes

Wired into series3_watcher.run_watcher() on its own cadence
(SFM_FORWARD_INTERVAL_SECONDS, default 60s) so it doesn't slow the
existing 5-min heartbeat scan.

Default-off: existing 1.4.x deployments keep their old behaviour
after auto-updating until an operator sets SFM_URL +
SFM_FORWARD_ENABLED=true and restarts.

17 unit tests in test_event_forwarder.py cover filename matching,
state idempotency, scan logic (quiescence, grace, max age,
already-forwarded, .TXT pairing), multipart byte shape, and an
end-to-end POST against a tiny stdlib http.server fake.

Bumps version 1.4.4 → 1.5.0 (minor — additive feature, no API break).
Requires SFM server v0.16+ for the paired-.TXT import endpoint.
2026-05-09 00:03:31 +00:00
serversdown 1abdc13645 Merge pull request 'bump to 1.4.4 (the nonupdate update)' (#6) from dev into main
Reviewed-on: #6
2026-03-17 21:54:59 -04:00
claude 010016d515 bump to 1.4.4 (the nonupdate update)
chore: clean up code, deprecate status config.
2026-03-17 21:54:15 -04:00
serversdown f790b21808 Merge pull request 'merge v1.4.3' (#5) from dev into main
Reviewed-on: #5
2026-03-17 21:11:41 -04:00
claude 439feb9942 Feat: Update settings tab implemented.
Auto-updates now configurable (URL, source (gitea or private server), log activity for auto updates.
fix: Update now hardened to prevent installation of corrupt or incorrect .exe files. (security to be hardened in the future)
2026-03-17 21:08:37 -04:00
10 changed files with 2217 additions and 41 deletions
+53
View File
@@ -6,6 +6,59 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
---
## [5-11-26] — v1.5.0
First release of the SFM event forwarder.
### Added — SFM event forwarder
- **Forward Blastware event binaries (+ paired BW ACH ASCII reports) to an SFM server.** When `SFM_FORWARD_ENABLED=true` and `SFM_URL` is set, every event binary in the BW ACH watch folder is POSTed as multipart to `/db/import/blastware_file` along with its `<stem>_<ext>_ASCII.TXT` partner report (BW ACH convention; manual-export `<binary>.TXT` is also supported as a fallback). SFM parses the report and indexes the full per-channel stats (PPV, ZC Freq, Time of Peak, Peak Acceleration / Displacement, Peak Vector Sum + time, sensor self-check Pass/Fail, monitor-log timestamps) into a searchable database — no codec decoding required.
- **Idempotent forwarding.** Forwarded files are tracked by sha256 in a JSON state file (default `<log dir>/sfm_forwarded.json`, override via `SFM_STATE_FILE`). Re-scans don't re-POST and the state survives restarts / auto-updates.
- **Re-pair on late-arriving TXT.** When the watcher forwards a binary alone (`_ASCII.TXT` partner didn't appear within `SFM_MISSING_REPORT_GRACE_SECONDS`), the state file records `had_report: false`. On subsequent scans, the watcher re-checks whether the TXT has since arrived. If yes, the event is re-forwarded with the TXT attached — the SFM server's upsert path refreshes the DB row with the report's device-authoritative peak / project values. Without this, slow-disk or AV-interrupted TXT writes would permanently leave that event with broken-codec peaks in the SFM database. Legacy state-file entries (without the `had_report` field) default to `had_report: true` so an upgrade doesn't unexpectedly re-forward existing entries.
- **Quiescence + grace-period guards.** Files modified within `SFM_QUIESCENCE_SECONDS` (default 5s) are skipped to avoid forwarding mid-write. If a binary's report partner hasn't appeared after `SFM_MISSING_REPORT_GRACE_SECONDS` (default 60s), the binary is forwarded alone rather than blocking forever.
- **Per-pass rate cap.** `SFM_MAX_FORWARDS_PER_PASS` (default 500) drips first-deploy backfill instead of hammering the SFM server in one burst. At 60-second `SFM_FORWARD_INTERVAL_SECONDS` cadence that's ~30K events/hour throughput. Set to `0` for unlimited. Scan walks oldest-first so backfill advances chronologically and successive scans reliably progress.
- **`event_forwarder.py --seed-state` CLI mode.** Walks the watch folder once, sha256s every in-window event, and marks them all as already-forwarded *without* POSTing anything. Recommended pre-deploy workflow on machines with a large historical archive — flip `SFM_FORWARD_ENABLED=true` after seeding and only events that appear from then on get forwarded.
- **SFM Forward tab in the Settings dialog** with: Forward checkbox, SFM URL + Test button (GETs `/health`), Forward Interval / Quiescence / Missing-Report Grace / HTTP Timeout / Max Events Per Pass spinboxes, State File entry with Browse... Save-time guard: enabling forwarding without a URL shows a validation error.
- **Histogram-aware log clarity.** Histogram events (extensions ending in `H`) don't get auto-exported reports from BW; the log distinguishes that case (`(histogram, no report expected)`) from a waveform with unexpectedly missing report (`no report ⚠`).
- **README "First-time deployment" section** documenting the seed-state workflow + the rate cap as belt-and-suspenders for safe rollout on machines with hundreds of thousands of historical events.
- 31 new unit tests in `test_event_forwarder.py` covering filename matching, state idempotency, scan logic (quiescence / grace period / max age / already-forwarded / TXT pairing), multipart byte shape, rate cap (oldest-first, cap=0 unlimited, cap=N enforcement), seed-state mode (in-window seeding / max-age skip / end-to-end skip-after-seed / idempotent re-runs), histogram classification, and an end-to-end POST against a stdlib fake server.
### Configuration
New `[agent]` keys (all default-off — existing 1.4.x deployments don't change behaviour on auto-update):
| Key | Default | Notes |
|---|---|---|
| `SFM_FORWARD_ENABLED` | `false` | Master toggle for the forwarder |
| `SFM_URL` | empty | e.g. `http://10.0.0.44:8200` |
| `SFM_FORWARD_INTERVAL_SECONDS` | `60` | Scan-and-forward cadence |
| `SFM_QUIESCENCE_SECONDS` | `5` | Skip files modified in the last N seconds |
| `SFM_MISSING_REPORT_GRACE_SECONDS` | `60` | Forward without TXT after this delay |
| `SFM_HTTP_TIMEOUT` | `60` | Per-request HTTP timeout |
| `SFM_STATE_FILE` | `<log dir>/sfm_forwarded.json` | Override location of the forwarded-sha256 state file |
| `SFM_MAX_FORWARDS_PER_PASS` | `500` | Per-scan cap (`0` = unlimited) |
### Compatibility
- Requires SFM server v0.16+ (the `/db/import/blastware_file` endpoint that accepts paired `_ASCII.TXT` reports + the BW-report label normalisation — released alongside this watcher version on the seismo-relay side).
## [1.4.4] - 2026-03-17
### Removed
- `OK_HOURS` and `MISSING_HOURS` config keys and Settings dialog fields removed — unit status thresholds are calculated by terra-view from raw `age_minutes`, not by the watcher. These fields had no effect since v1.4.2.
## [1.4.3] - 2026-03-17
### Added
- Auto-updater now logs all activity to the watcher log file (`[updater]` prefix) — silent failures are now visible.
- Configurable update source: `UPDATE_SOURCE = gitea` (default), `url`, or `disabled`. In `url` mode the watcher fetches `version.txt` and the `.exe` from a custom base URL (e.g. terra-view) instead of the Gitea API — enables updates on isolated networks that cannot reach Gitea. `disabled` turns off automatic checks while keeping the remote push path (from terra-view) functional.
- New **Updates** tab in the Settings dialog to configure `UPDATE_SOURCE` and `UPDATE_URL`.
### Fixed
- Downloaded `.exe` is now validated before applying: absolute size floor (100 KB), relative size floor (50% of current exe), and MZ magic bytes check. A corrupt or truncated download is now rejected and logged rather than silently overwriting the live exe.
- Swap `.bat` now backs up the current exe as `<exe>.old` before overwriting, providing a manual rollback copy if needed.
- Swap `.bat` retry loop is now capped at 5 attempts — was previously infinite if the file remained locked.
- Swap `.bat` now cleans up the temp download file on both success and failure.
## [1.4.2] - 2026-03-17
### Changed
+55 -4
View File
@@ -1,4 +1,4 @@
# Series 3 Watcher v1.4.2
# Series 3 Watcher v1.5.0
Monitors Instantel **Series 3 (Minimate)** call-in activity on a Blastware server. Runs as a **system tray app** that starts automatically on login, reports heartbeats to terra-view, and self-updates from Gitea.
@@ -71,8 +71,6 @@ All settings live in `config.ini`. The Setup Wizard covers every field, but here
| Key | Description |
|-----|-------------|
| `SCAN_INTERVAL_SECONDS` | How often to scan the folder (default `300`) |
| `OK_HOURS` | Age threshold for OK status (default `12`) |
| `MISSING_HOURS` | Age threshold for Missing status (default `24`) |
| `MLG_HEADER_BYTES` | Bytes to read from each `.MLG` header for unit ID (default `2048`) |
| `RECENT_WARN_DAYS` | Log unsniffable files newer than this window |
@@ -83,6 +81,47 @@ All settings live in `config.ini`. The Setup Wizard covers every field, but here
| `ENABLE_LOGGING` | `true` / `false` |
| `LOG_RETENTION_DAYS` | Auto-clear log after this many days (default `30`) |
### Auto-Updater
| Key | Description |
|-----|-------------|
| `UPDATE_SOURCE` | `gitea` (default) or `url` — where to check for updates |
| `UPDATE_URL` | Base URL of the update server when `UPDATE_SOURCE = url` (e.g. terra-view URL). The watcher fetches `/api/updates/series3-watcher/version.txt` and `/api/updates/series3-watcher/series3-watcher.exe` from this base. |
### SFM Event Forwarder (v1.5.0+)
Forwards each Blastware event binary (and its paired `<binary>.TXT` ASCII report when present) to an SFM server's `/db/import/blastware_file` endpoint, where the report is parsed and the rich per-channel stats (PPV, ZC Freq, Time of Peak, Peak Acceleration / Displacement, sensor self-check) land in a searchable database. **Default-off** — existing deployments keep their old behaviour after auto-updating until the operator opts in.
| Key | Description |
|-----|-------------|
| `SFM_FORWARD_ENABLED` | `true` to enable forwarding (default `false`) |
| `SFM_URL` | Base URL of the SFM server, e.g. `http://10.0.0.44:8200` |
| `SFM_FORWARD_INTERVAL_SECONDS` | Scan-and-forward cadence (default `60`); independent of the heartbeat interval |
| `SFM_QUIESCENCE_SECONDS` | Skip files modified within this many seconds (default `5`) — avoids forwarding mid-write |
| `SFM_MISSING_REPORT_GRACE_SECONDS` | If a `.TXT` partner hasn't appeared after this many seconds, forward the binary alone (default `60`) |
| `SFM_HTTP_TIMEOUT` | Per-request HTTP timeout in seconds (default `60`) |
| `SFM_STATE_FILE` | Path to the JSON state file tracking sha256 of forwarded events. Leave blank to default to `<log dir>/sfm_forwarded.json` |
| `SFM_MAX_FORWARDS_PER_PASS` | Max events forwarded per scan tick (default `500`, `0` = unlimited). Drip-feeds backfill so a folder with thousands of qualifying events doesn't hammer the SFM server in one giant burst. |
Forwarded files are tracked by sha256 in the state file, so re-scans / restarts / auto-updates never re-POST the same content. A failed POST stays in the pending pool and is retried on the next interval.
#### First-time deployment on a folder with a large historical archive
If you're enabling SFM forwarding on a Blastware ACH machine that's been accumulating events for years (tens or hundreds of thousands of files in the watch folder), you almost certainly **don't** want the watcher to forward all of them on first run. Two options:
1. **Skip the historical backfill (recommended).** Run the seed-state CLI once before flipping `SFM_FORWARD_ENABLED=true`. It walks the folder, sha256s every existing in-window event, and marks them all as already-forwarded — without POSTing anything. The watcher then only forwards events that appear *after* the seed run.
```
python event_forwarder.py --seed-state ^
--watch "C:\Blastware 10\Event\autocall home" ^
--state "C:\Users\<you>\AppData\Local\Series3Watcher\agent_logs\sfm_forwarded.json" ^
--max-age-days 365
```
2. **Throttle the backfill.** Leave `SFM_MAX_FORWARDS_PER_PASS` at its 500 default and let the watcher drip-feed. With a 60-second `SFM_FORWARD_INTERVAL_SECONDS` that's ~30K events/hour throughput. Backfill of 30K events takes about an hour, 100K takes ~3.5 hours. The cap fires per scan, so heartbeat and forwarding share the watcher's main loop without saturating it.
Combine both for a fully controlled rollout: seed-state to skip the deep archive, then leave the cap on as a steady-state safety net.
---
## Tray Icon
@@ -114,9 +153,21 @@ To view connected watchers: **Settings → Developer → Watcher Manager**.
---
## Roadmap (Future)
Deferred work — parked but worth tracking. Pairs with seismo-relay's
[Roadmap (Future)](https://gitea.serversdown.net/serversdown/seismo-relay#roadmap-future)
where the corresponding server-side work lives.
- [ ] **File archive manager.** Move BW autocall-home events older than ~90 days into `<watch_folder>_archive/<year>/<month>/` subfolders so the active watch directory doesn't accumulate hundreds of thousands of entries (filesystem dir lookups slow at 100K+, BW UI hangs opening the folder, watcher's own scandir gets expensive). Plan drafted in the codec-RE branch's plan-mode session, including a critical pre-coding test (does Blastware UI walk subfolders or only see the flat watch folder?) that determines the archive layout (in-place subfolders vs sibling archive). Default-off, dry-run mode, opt-in per machine.
- [ ] **MLG forwarding.** Currently the watcher's `is_event_binary()` filter explicitly excludes `.MLG` per-unit monitor log files — only event binaries (`.AB0` / `.PG0H` / etc.) and their paired `_ASCII.TXT` reports get forwarded. Adding an `POST /db/import/mlg_file` SFM endpoint + a parallel `.MLG` scan path on the watcher would populate `monitor_log` rows for non-ACH-routed units (coverage queries, "was this unit monitoring on date X" lookups). MLG files are append-only / mutable so the watcher needs a different dedup strategy than the per-event sha256 state file — better to forward whole file every scan and let the server dedup by `(serial, start_time)` on insert.
- [ ] **Pre-deploy seed-state UX in the Settings dialog.** Currently `event_forwarder.py --seed-state` is a CLI-only operation. A "Skip backfill" button next to the SFM Forward checkbox would let operators opt-out of re-forwarding the historical archive without dropping to a command line.
---
## Versioning
Follows **Semantic Versioning**. Current release: **v1.4.2**.
Follows **Semantic Versioning**. Current release: **v1.5.0**.
See `CHANGELOG.md` for full history.
---
+16 -3
View File
@@ -2,19 +2,32 @@
echo Building series3-watcher.exe...
pip install pyinstaller pystray Pillow
REM Extract version from series3_watcher.py (looks for: VERSION = "1.4.2")
for /f "tokens=3 delims= " %%V in ('findstr /C:"VERSION = " series3_watcher.py') do set RAW_VER=%%V
set VERSION=%RAW_VER:"=%
set EXE_NAME=series3-watcher-%VERSION%
echo Version: %VERSION%
echo Output: dist\%EXE_NAME%.exe
REM Check whether icon.ico exists alongside this script.
REM If it does, embed it as the .exe icon AND bundle it as a data file
REM so the tray overlay can load it at runtime.
if exist "%~dp0icon.ico" (
pyinstaller --onefile --windowed --name series3-watcher ^
pyinstaller --onefile --windowed --name "%EXE_NAME%" ^
--icon="%~dp0icon.ico" ^
--add-data "%~dp0icon.ico;." ^
series3_tray.py
) else (
echo [INFO] icon.ico not found -- building without custom icon.
pyinstaller --onefile --windowed --name series3-watcher series3_tray.py
pyinstaller --onefile --windowed --name "%EXE_NAME%" series3_tray.py
)
REM Copy versioned exe to plain name for Inno Setup
copy /Y "dist\%EXE_NAME%.exe" "dist\series3-watcher.exe"
echo.
echo Done. Check dist\series3-watcher.exe
echo Done.
echo Gitea upload: dist\%EXE_NAME%.exe
echo Inno Setup: dist\series3-watcher.exe (copy of above)
pause
+45 -2
View File
@@ -14,8 +14,6 @@ MAX_EVENT_AGE_DAYS = 365
# Scanning
SCAN_INTERVAL_SECONDS = 30
OK_HOURS = 12
MISSING_HOURS = 24
# Logging
ENABLE_LOGGING = True
@@ -29,3 +27,48 @@ MLG_HEADER_BYTES = 2048 ; used for unit-id extraction
DEEP_SNIFF = True ; toggle deep sniff on/off
SNIFF_BYTES = 65536 ; max bytes to scan for Notes/Cal
# Auto-updater source: gitea (default) or url
UPDATE_SOURCE = gitea
# If UPDATE_SOURCE = url, set UPDATE_URL to the base URL of the update server (e.g. terra-view)
UPDATE_URL =
# --- SFM Event Forwarder ---
# When enabled, every Blastware event binary (and its paired .TXT
# report when present) is forwarded to an SFM server's
# /db/import/blastware_file endpoint as a multipart POST. The SFM
# server parses the .TXT and indexes the event's full per-channel
# stats (PPV, ZC Freq, Time of Peak, Peak Acceleration, Peak
# Displacement, sensor self-check) for sortable / filterable review.
#
# Default-off so existing deployments don't change behaviour after an
# auto-update. To enable on a field machine: set SFM_URL, then flip
# SFM_FORWARD_ENABLED to true and restart the watcher.
SFM_FORWARD_ENABLED = false
SFM_URL = ; e.g. http://10.0.0.44:8200
SFM_FORWARD_INTERVAL_SECONDS = 60 ; scan-and-forward cadence (independent of heartbeat)
# Files modified within the last N seconds are skipped (BW may still
# be writing them). Defence against truncated uploads.
SFM_QUIESCENCE_SECONDS = 5
# If a binary's .TXT report hasn't appeared after this many seconds,
# forward the binary alone rather than blocking forever waiting.
SFM_MISSING_REPORT_GRACE_SECONDS = 60
# Per-request HTTP timeout (seconds).
SFM_HTTP_TIMEOUT = 60
# Path to the JSON state file tracking which events have been
# forwarded (sha256-keyed, idempotent across restarts). Leave blank
# to default to <log dir>/sfm_forwarded.json.
SFM_STATE_FILE =
# Per-pass cap — forward at most N events per scan tick. 0 = unlimited.
# Default 500 throttles first-deploy backfill on machines with large
# historical archives (tens or hundreds of thousands of events) so
# the SFM server isn't hammered with one giant burst. At 60s scan
# interval × 500 events/pass that's 30K events/hour throughput.
# See README "First-time deployment" for the recommended
# `--seed-state` workflow that skips the historical backfill entirely.
SFM_MAX_FORWARDS_PER_PASS = 500
+833
View File
@@ -0,0 +1,833 @@
"""
event_forwarder.py — forward Blastware event files to an SFM server.
Watches the same Blastware ACH folder the heartbeat path watches.
For each event binary that hasn't been forwarded yet, pairs it with
its `<binary>.TXT` report (when available) and POSTs both to SFM's
`/db/import/blastware_file` endpoint as one multipart request.
The receiving SFM server (seismo-relay v0.16+) detects paired binaries
and reports by filename, parses the .TXT into structured fields
(per-channel PPV / ZC Freq / Time of Peak / Peak Acceleration / Peak
Displacement / sensor self-check / monitor log), and persists every
field into the SFM database for sortable / filterable monthly-summary
review.
Design notes
────────────
- **stdlib only.** Matches the rest of the watcher (`urllib.request`).
Multipart encoding is hand-rolled.
- **Idempotent across restarts.** Forwarded files are tracked by
sha256 in a JSON state file (`.forwarded.json` next to config.ini).
Re-scanning the watch folder doesn't re-POST anything.
- **Default-off.** Callers must enable via config
(`SFM_FORWARD_ENABLED=true` + `SFM_URL=...`). Existing 1.4.x
deployments that auto-update to the new version stay non-forwarding
until an operator flips the switch.
- **Quiescence guard.** Files modified within the last few seconds
are skipped — Blastware ACH writes the .TXT after the binary, so
we wait until both look stable before forwarding.
- **Best-effort report pairing.** When the .TXT hasn't appeared yet
but the binary is older than `MISSING_REPORT_GRACE_SECONDS`, the
binary is forwarded alone (the SFM endpoint accepts that and just
skips the rich fields — we'd rather get the binary indexed than
block forever waiting for a TXT that never arrived).
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
log = logging.getLogger(__name__)
# Default tuning. All overridable via config.ini SFM_* keys.
DEFAULT_QUIESCENCE_SECONDS = 5 # don't touch a file modified in the last N seconds
DEFAULT_MISSING_REPORT_GRACE_SECONDS = 60 # forward without .TXT if it hasn't shown up after N seconds
DEFAULT_HTTP_TIMEOUT = 60.0 # per-request timeout
STATE_SCHEMA_VERSION = 1
# ── Filename matching ─────────────────────────────────────────────────────────
#
# Blastware's filename scheme (confirmed in seismo-relay docs):
# prefix_letter (BZ) + 3-digit serial-tail + 4-char base36 timestamp stem
# + "." + 3-or-4-char extension.
# Examples: M529LK44.AB0, S353L4H0.3M0W, P036L318.C80H, M529LIY6.N00.
#
# We accept lowercase too because some filesystems lower-case names.
_EVENT_FILENAME_RE = re.compile(
r"^[A-Za-z][0-9]{3}[A-Za-z0-9]{4}\.[A-Za-z0-9]{3,4}$"
)
# Filenames we explicitly skip even if they happen to match the regex.
_NON_EVENT_EXTS = {
".mlg", # monitor-log files (separate heartbeat path)
".txt", # ASCII reports — handled via pairing, not as primary files
".log",
".ini",
".dat",
".bak",
".tmp",
".pkl", # SFM A5 pickles (shouldn't appear in a BW folder, but defence)
".h5",
".sfm.json",
".json",
}
def is_event_binary(path: str) -> bool:
"""Return True if `path`'s basename looks like a Blastware event binary."""
name = os.path.basename(path)
if not _EVENT_FILENAME_RE.match(name):
return False
ext = os.path.splitext(name)[1].lower()
if ext in _NON_EVENT_EXTS:
return False
return True
def ach_report_name(binary_name: str) -> str:
"""BW ACH report-naming convention.
Blastware's official Auto Call Home server writes per-event ASCII
reports as ``<stem>_<ext>_ASCII.TXT`` — the ``.`` between stem and
ext is replaced with ``_`` and ``_ASCII.TXT`` is appended.
Examples:
``M529LK44.AB0`` → ``M529LK44_AB0_ASCII.TXT``
``N844L20G.630H`` → ``N844L20G_630H_ASCII.TXT``
``H907L1R7.PG0H`` → ``H907L1R7_PG0H_ASCII.TXT``
For a filename without a dot (defensive — shouldn't happen for real
BW events) we still append ``_ASCII.TXT``.
"""
stem, dot, ext = binary_name.rpartition(".")
if not dot:
return binary_name + "_ASCII.TXT"
return stem + "_" + ext + "_ASCII.TXT"
def legacy_report_name(binary_name: str) -> str:
"""Manual-export convention: ``<binary>.TXT`` (e.g. when an operator
saves an event report to text directly from BW's UI rather than
letting ACH auto-export it). Kept as a fallback so the codec-agent
test fixtures (``decode-re/5-8-26/event-c/M529LK44.AB0.TXT``) still
pair correctly."""
return binary_name + ".TXT"
def report_path_for(binary_path: str) -> str:
"""Legacy entry point — returns the manual-export path. Prefer
:func:`ach_report_name` for new BW deployments. Retained for
backward compatibility with any caller still on the old convention."""
return legacy_report_name(binary_path)
def is_histogram_event(filename: str) -> bool:
"""True if the filename's extension marks the file as a Full Histogram
event (BW filename scheme: 4-char extensions of the form ``AB0T`` where
``T = H``). Old-firmware events use 3-char extensions where waveform-vs-
histogram is not encoded in the name; we can't tell those apart and
return False (the conservative answer — we don't want to suppress
"no report" warnings on potentially-waveform old-firmware events).
Used purely for log clarity — when a forward goes through without a
paired TXT, the log distinguishes "histogram, no report expected"
(acceptable: BW may not have written one even though it normally
does for ACH-routed histograms) from "no report ⚠" on a waveform
(more suspicious: BW almost always writes the TXT for waveform events).
Forwarding logic itself doesn't depend on this check.
"""
name = os.path.basename(filename)
ext = os.path.splitext(name)[1].lstrip(".").upper()
return len(ext) == 4 and ext.endswith("H")
# ── State file ────────────────────────────────────────────────────────────────
class ForwardState:
"""Idempotency record: which event files have we already forwarded?
State file format (JSON):
{
"version": 1,
"forwarded": {
"<sha256>": {
"filename": "M529LK44.AB0",
"size": 4400,
"forwarded_at": "2026-05-08T...Z"
},
...
}
}
Keyed by sha256 (not filename) so that re-saved or re-uploaded
identical content is recognised as already-forwarded even if the
file moved or got renamed. Filename is preserved for human
inspection.
"""
def __init__(self, path: str):
self.path = path
self._data: Dict[str, Any] = {"version": STATE_SCHEMA_VERSION, "forwarded": {}}
self._load()
def _load(self) -> None:
try:
with open(self.path, "r", encoding="utf-8") as f:
d = json.load(f)
if not isinstance(d, dict):
raise ValueError("state file root is not an object")
if d.get("version") != STATE_SCHEMA_VERSION:
log.warning(
"forward state version mismatch (got %r, want %d) — starting fresh",
d.get("version"), STATE_SCHEMA_VERSION,
)
return
forwarded = d.get("forwarded")
if isinstance(forwarded, dict):
self._data["forwarded"] = forwarded
except FileNotFoundError:
pass
except (OSError, ValueError, json.JSONDecodeError) as exc:
log.warning("failed to load forward state from %s: %s", self.path, exc)
def _save(self) -> None:
tmp = self.path + ".tmp"
try:
with open(tmp, "w", encoding="utf-8") as f:
json.dump(self._data, f, indent=2, sort_keys=True)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, self.path)
except OSError as exc:
log.warning("failed to save forward state to %s: %s", self.path, exc)
def is_forwarded(self, sha256: str) -> bool:
return sha256 in self._data["forwarded"]
def status(self, sha256: str) -> Optional[bool]:
"""Return forwarding status for *sha256*.
Returns:
None — never forwarded. Eligible for a fresh forward.
True — forwarded successfully with its paired report
(or in a legacy entry that pre-dates the
had_report field — assumed complete for safety).
NOT a candidate for re-forward.
False — forwarded WITHOUT its paired ``_ASCII.TXT``
(BW's TXT-write lagged past the grace period).
Eligible for re-forward IF the TXT now exists,
so the SFM server's upsert path can refresh the
DB row with the report's authoritative values.
Legacy state-file entries without a ``had_report`` key default
to ``True`` so an upgrade doesn't unexpectedly re-forward
every entry the operator has accumulated.
"""
entry = self._data["forwarded"].get(sha256)
if entry is None:
return None
return bool(entry.get("had_report", True))
def mark_forwarded(
self,
sha256: str,
filename: str,
size: int,
had_report: bool = True,
) -> None:
"""Record a successful forward.
Set ``had_report=False`` when the forward shipped the binary
without its paired ASCII report. Such entries are re-checked
on subsequent scans and re-forwarded once the TXT appears, so
SFM's upsert refreshes the DB row with the device-authoritative
peak/project values.
Idempotent: re-marking an existing sha256 with ``had_report=True``
is the explicit promotion path used when a re-pair succeeds.
"""
self._data["forwarded"][sha256] = {
"filename": filename,
"size": size,
"forwarded_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
"had_report": had_report,
}
self._save()
def count(self) -> int:
return len(self._data["forwarded"])
# ── Helpers ───────────────────────────────────────────────────────────────────
def sha256_of_file(path: str) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
h.update(chunk)
return h.hexdigest()
def _is_quiescent(path: str, now_ts: float, quiescence_seconds: float) -> bool:
"""Return True if the file's mtime is at least `quiescence_seconds`
in the past — i.e. no longer being written."""
try:
mtime = os.path.getmtime(path)
except OSError:
return False
return (now_ts - mtime) >= quiescence_seconds
# ── Scan pass ─────────────────────────────────────────────────────────────────
def find_pending_events(
watch_dir: str,
state: ForwardState,
*,
max_age_days: int,
quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
max_per_pass: int = 0,
) -> List[Tuple[str, Optional[str]]]:
"""
Walk `watch_dir` and return the list of (binary_path, txt_path_or_None)
pairs that need forwarding.
Filtering rules:
- Filename must match the BW event filename regex.
- File must be quiescent (mtime >= quiescence_seconds in the past).
- File must not exceed `max_age_days` (matches the heartbeat
path's MAX_EVENT_AGE_DAYS — keeps deep archives out of the
forwarder).
- File's sha256 must NOT already be in the forwarded state.
- If a `<binary>.TXT` exists and is quiescent, we pair them.
Otherwise, if the binary is older than
missing_report_grace_seconds, we forward without the TXT.
Younger binaries with a missing TXT are deferred — let BW
finish writing the report.
- When `max_per_pass > 0`, return at most that many pairs.
Older files (lower mtime) are forwarded first so backfill
proceeds chronologically. Use this to drip-feed a folder
with thousands of qualifying events instead of hammering
the SFM server with one giant burst.
"""
if not os.path.isdir(watch_dir):
log.warning("forward scan: watch dir not found: %s", watch_dir)
return []
now_ts = time.time()
max_age_seconds = max(1, int(max_age_days)) * 86400.0
pending: List[Tuple[str, Optional[str]]] = []
skipped_inflight = 0
skipped_already_forwarded = 0
try:
with os.scandir(watch_dir) as it:
entries = list(it)
except OSError as exc:
log.warning("forward scan: scandir failed on %s: %s", watch_dir, exc)
return []
# Cache existence of TXT partners so we don't stat() each twice.
names = {e.name for e in entries if e.is_file()}
# Sort by mtime ASCENDING so chronological backfill happens oldest-first.
# When max_per_pass clamps the list, we always advance — we don't get
# stuck re-considering the same N newest files every scan.
def _mtime(entry: os.DirEntry) -> float:
try:
return entry.stat().st_mtime
except OSError:
return 0.0
entries = sorted(
(e for e in entries if e.is_file()),
key=_mtime,
)
for e in entries:
if not e.is_file():
continue
if not is_event_binary(e.path):
continue
try:
mtime = e.stat().st_mtime
size = e.stat().st_size
except OSError:
continue
# Out-of-window: too old or too fresh
if (now_ts - mtime) > max_age_seconds:
continue
if not _is_quiescent(e.path, now_ts, quiescence_seconds):
skipped_inflight += 1
continue
# Idempotency: skip if we already forwarded this content
# successfully. Three cases via state.status(digest):
# True — forwarded WITH report → permanently done, skip.
# False — forwarded WITHOUT report → re-pair candidate.
# Forward again only if a paired TXT is now present
# so SFM's upsert refreshes the DB row.
# None — never forwarded → normal first-forward path.
try:
digest = sha256_of_file(e.path)
except OSError as exc:
log.warning("forward scan: sha256 failed for %s: %s", e.path, exc)
continue
fwd_status = state.status(digest)
if fwd_status is True:
skipped_already_forwarded += 1
continue
# TXT pairing — try BW ACH convention first
# (<stem>_<ext>_ASCII.TXT) and fall back to the manual-export
# convention (<binary>.TXT). Both checked case-insensitively
# against the cached directory listing. ACH wins when both
# exist — that's the format BW's official ACH server writes.
candidates = [ach_report_name(e.name), legacy_report_name(e.name)]
# Case-insensitive name lookup against the cached set.
names_lc_to_actual = None
txt_name: Optional[str] = None
for cand in candidates:
if cand in names:
txt_name = cand
break
# Build lower-case index lazily — most folders have very few
# TXT files relative to binaries, so the linear scan only
# fires when neither exact-case candidate matches.
if names_lc_to_actual is None:
names_lc_to_actual = {n.lower(): n for n in names}
actual = names_lc_to_actual.get(cand.lower())
if actual:
txt_name = actual
break
txt_path: Optional[str] = None
if txt_name:
candidate = os.path.join(watch_dir, txt_name)
if _is_quiescent(candidate, now_ts, quiescence_seconds):
txt_path = candidate
# else: TXT is mid-write; treat as not-yet-paired and defer.
if fwd_status is False:
# Previously forwarded WITHOUT report. We're here looking
# for a re-pair opportunity. If the TXT is now present
# and quiescent, include in pending for re-forward (the
# SFM server's upsert will refresh the DB row with the
# report's authoritative values). Otherwise skip — no
# point re-forwarding the same binary alone again.
if txt_path is None:
skipped_already_forwarded += 1
continue
elif txt_path is None:
# First-time forward and TXT not yet present. Wait for the
# grace period before forwarding alone.
if (now_ts - mtime) < missing_report_grace_seconds:
skipped_inflight += 1
continue
pending.append((e.path, txt_path))
# Stash size + digest on the tuple-replacement for use during forward;
# callers can re-derive but caching avoids a second sha256.
# Per-pass cap: once we have enough pending, stop scanning.
if max_per_pass and len(pending) >= max_per_pass:
break
log.debug(
"forward scan: %d pending skipped_inflight=%d already_forwarded=%d cap=%d",
len(pending), skipped_inflight, skipped_already_forwarded, max_per_pass,
)
return pending
# ── Multipart upload ──────────────────────────────────────────────────────────
def _encode_multipart(
parts: List[Tuple[str, str, str, bytes]],
) -> Tuple[bytes, str]:
"""Encode a list of (field_name, filename, content_type, data) tuples
as a multipart/form-data body. Returns (body_bytes, content_type
header value)."""
boundary = "----Series3WatcherBoundary" + os.urandom(8).hex()
chunks: List[bytes] = []
for field_name, filename, content_type, data in parts:
chunks.append(("--" + boundary + "\r\n").encode("ascii"))
chunks.append(
(f'Content-Disposition: form-data; name="{field_name}"; '
f'filename="{filename}"\r\n').encode("ascii")
)
chunks.append((f"Content-Type: {content_type}\r\n\r\n").encode("ascii"))
chunks.append(data)
chunks.append(b"\r\n")
chunks.append(("--" + boundary + "--\r\n").encode("ascii"))
body = b"".join(chunks)
content_type_hdr = f"multipart/form-data; boundary={boundary}"
return body, content_type_hdr
def _import_endpoint(sfm_url: str) -> str:
"""Compose the import endpoint URL from a base SFM URL."""
return sfm_url.rstrip("/") + "/db/import/blastware_file"
def forward_event_pair(
sfm_url: str,
binary_path: str,
txt_path: Optional[str],
*,
serial_hint: Optional[str] = None,
timeout: float = DEFAULT_HTTP_TIMEOUT,
) -> Dict[str, Any]:
"""POST a single event (binary + optional .TXT) to the SFM import
endpoint.
Returns a dict mirroring the per-file outcome the server returned
(see /db/import/blastware_file response.results[0]) on success, or
a dict with `status="error"` on transport/HTTP failure.
"""
binary_name = os.path.basename(binary_path)
with open(binary_path, "rb") as f:
binary_bytes = f.read()
parts = [("files", binary_name, "application/octet-stream", binary_bytes)]
if txt_path is not None:
with open(txt_path, "rb") as f:
txt_bytes = f.read()
parts.append(("files", os.path.basename(txt_path), "text/plain", txt_bytes))
body, content_type = _encode_multipart(parts)
url = _import_endpoint(sfm_url)
if serial_hint:
sep = "&" if "?" in url else "?"
url = f"{url}{sep}serial={serial_hint}"
req = urllib.request.Request(
url, data=body, method="POST",
headers={
"Content-Type": content_type,
"Content-Length": str(len(body)),
"User-Agent": "series3-watcher/sfm-forwarder",
"Accept": "application/json",
},
)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read().decode("utf-8", errors="replace")
try:
payload = json.loads(raw)
except json.JSONDecodeError:
return {
"status": "error",
"filename": binary_name,
"detail": f"server returned non-JSON: {raw[:200]!r}",
}
# Server returns {"count":N, "results":[{...}]}. Pull our row out.
for entry in (payload.get("results") or []):
if entry.get("filename") == binary_name and entry.get("status") == "ok":
return entry
# No matching ok row → propagate the first error we find
for entry in (payload.get("results") or []):
if entry.get("filename") == binary_name:
return entry
return {
"status": "error",
"filename": binary_name,
"detail": f"unexpected server response: {payload!r}",
}
except urllib.error.HTTPError as exc:
try:
body_excerpt = exc.read().decode("utf-8", errors="replace")[:300]
except Exception:
body_excerpt = ""
return {
"status": "error",
"filename": binary_name,
"detail": f"HTTP {exc.code}: {exc.reason} body={body_excerpt!r}",
}
except urllib.error.URLError as exc:
return {
"status": "error",
"filename": binary_name,
"detail": f"connection error: {exc.reason}",
}
except (OSError, TimeoutError) as exc:
return {
"status": "error",
"filename": binary_name,
"detail": f"transport error: {exc}",
}
# ── Top-level orchestration ───────────────────────────────────────────────────
def forward_pending(
watch_dir: str,
sfm_url: str,
state: ForwardState,
*,
max_age_days: int,
quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
timeout: float = DEFAULT_HTTP_TIMEOUT,
max_per_pass: int = 0,
logger: Optional[Any] = None,
) -> Dict[str, int]:
"""
Run one full pass: find pending events, POST each one, update state.
Returns a counts dict suitable for logging:
{
"scanned": <int>, # total event binaries seen
"forwarded": <int>, # successfully POSTed this pass
"errors": <int>, # POST failures (will retry next pass)
"with_report":<int>, # of forwarded, how many had a paired TXT
}
"""
def _log(msg: str) -> None:
if logger:
logger(msg)
else:
log.info(msg)
pending = find_pending_events(
watch_dir, state,
max_age_days=max_age_days,
quiescence_seconds=quiescence_seconds,
missing_report_grace_seconds=missing_report_grace_seconds,
max_per_pass=max_per_pass,
)
counts = {"scanned": len(pending), "forwarded": 0, "errors": 0, "with_report": 0}
for binary_path, txt_path in pending:
result = forward_event_pair(
sfm_url, binary_path, txt_path,
timeout=timeout,
)
if result.get("status") == "ok":
try:
digest = sha256_of_file(binary_path)
size = os.path.getsize(binary_path)
# Record whether this forward shipped a paired TXT.
# Forwards without a TXT are flagged had_report=False so
# subsequent scans re-check whether the TXT has since
# appeared and trigger a re-forward (the SFM server's
# upsert path refreshes the DB row with the report's
# authoritative values).
state.mark_forwarded(
digest,
os.path.basename(binary_path),
size,
had_report=(txt_path is not None),
)
except OSError as exc:
_log(f"[forward] post-success state save failed for "
f"{os.path.basename(binary_path)}: {exc}")
counts["forwarded"] += 1
if txt_path:
counts["with_report"] += 1
# Differentiate three cases in the log so "no report" is only
# noisy when something's actually unexpected:
# - waveform + TXT → "+ <txt> attached"
# - waveform without TXT → "no report ⚠" (BW maybe didn't auto-export)
# - histogram (any flavour) → "(histogram, no report expected)"
if txt_path:
report_token = "+ {} attached".format(os.path.basename(txt_path))
elif is_histogram_event(binary_path):
report_token = "(histogram, no report expected)"
else:
report_token = "no report ⚠"
_log(
"[forward] OK {} ({}B, {}, inserted={}, skipped={})".format(
os.path.basename(binary_path),
result.get("filesize", 0),
report_token,
result.get("inserted", 0),
result.get("skipped", 0),
)
)
else:
counts["errors"] += 1
_log(
f"[forward] ERR {os.path.basename(binary_path)}: "
f"{result.get('detail', 'unknown error')}"
)
return counts
# ── Seed-state mode (skip historical backfill on first deploy) ────────────────
def seed_state_from_folder(
watch_dir: str,
state: ForwardState,
*,
max_age_days: int = 365,
logger: Optional[Any] = None,
) -> Dict[str, int]:
"""Walk `watch_dir` and mark every existing event binary as already
forwarded — without POSTing anything.
This is the right tool for a first deploy on a machine that already
has tens or hundreds of thousands of historical events in the BW
ACH folder. Run it ONCE before enabling SFM_FORWARD_ENABLED:
python event_forwarder.py --seed-state \
--watch "C:\\Blastware 10\\Event\\autocall home" \
--state "C:\\...\\sfm_forwarded.json" \
[--max-age-days 365]
The watcher then starts forwarding only events that appear AFTER
the seed run. Files older than `max_age_days` are still skipped
by the regular scan loop — we don't bother seeding them because
they wouldn't be forwarded anyway.
Returns a counts dict:
{"scanned": int, "seeded": int, "already_known": int, "skipped_too_old": int}
"""
def _log(msg: str) -> None:
if logger:
logger(msg)
else:
log.info(msg)
counts = {"scanned": 0, "seeded": 0, "already_known": 0, "skipped_too_old": 0}
if not os.path.isdir(watch_dir):
_log(f"[seed] watch dir not found: {watch_dir}")
return counts
now_ts = time.time()
max_age_seconds = max(1, int(max_age_days)) * 86400.0
try:
with os.scandir(watch_dir) as it:
entries = [e for e in it if e.is_file()]
except OSError as exc:
_log(f"[seed] scandir failed on {watch_dir}: {exc}")
return counts
for e in entries:
if not is_event_binary(e.path):
continue
counts["scanned"] += 1
try:
mtime = e.stat().st_mtime
size = e.stat().st_size
except OSError:
continue
if (now_ts - mtime) > max_age_seconds:
counts["skipped_too_old"] += 1
continue
try:
digest = sha256_of_file(e.path)
except OSError as exc:
_log(f"[seed] sha256 failed for {e.path}: {exc}")
continue
if state.is_forwarded(digest):
counts["already_known"] += 1
continue
state.mark_forwarded(digest, e.name, size)
counts["seeded"] += 1
if counts["seeded"] % 1000 == 0:
_log(f"[seed] progress: {counts['seeded']} seeded so far...")
_log(
f"[seed] done. scanned={counts['scanned']} seeded={counts['seeded']} "
f"already_known={counts['already_known']} "
f"skipped_too_old={counts['skipped_too_old']}"
)
return counts
# ── CLI entry point ─────────────────────────────────────────────────────────
def _main() -> int:
"""Command-line interface for one-shot operations.
Currently supports a single mode:
python event_forwarder.py --seed-state \
--watch "<path/to/BW autocall folder>" \
--state "<path/to/sfm_forwarded.json>" \
[--max-age-days 365]
which marks every existing in-window event binary as already
forwarded (without POSTing) so the watcher only forwards events
appearing AFTER the seed.
"""
import argparse
parser = argparse.ArgumentParser(
description="Series 3 Watcher — SFM event forwarder utilities",
)
parser.add_argument(
"--seed-state", action="store_true",
help="Mark every event binary in --watch as already-forwarded "
"(without POSTing). Use this BEFORE enabling SFM_FORWARD "
"on a machine with a large historical archive.",
)
parser.add_argument(
"--watch", required=True,
help="Path to the Blastware ACH folder.",
)
parser.add_argument(
"--state", required=True,
help="Path to the JSON state file. Will be created if missing.",
)
parser.add_argument(
"--max-age-days", type=int, default=365,
help="Only seed files newer than this many days (default 365).",
)
args = parser.parse_args()
if not args.seed_state:
parser.error("specify --seed-state (no other modes supported yet)")
print(f"[seed] watch_dir = {args.watch}")
print(f"[seed] state = {args.state}")
print(f"[seed] max_age = {args.max_age_days} days")
state = ForwardState(args.state)
print(f"[seed] state currently has {state.count()} entries")
seed_state_from_folder(
args.watch, state,
max_age_days=args.max_age_days,
logger=lambda m: print(m),
)
print(f"[seed] state now has {state.count()} entries")
return 0
if __name__ == "__main__":
import sys
sys.exit(_main())
+1 -1
View File
@@ -3,7 +3,7 @@
[Setup]
AppName=Series 3 Watcher
AppVersion=1.4.2
AppVersion=1.5.0
AppPublisher=Terra-Mechanics Inc.
DefaultDirName={pf}\Series3Watcher
DefaultGroupName=Series 3 Watcher
+141 -15
View File
@@ -1,5 +1,5 @@
"""
Series 3 Watcher — System Tray Launcher v1.4.2
Series 3 Watcher — System Tray Launcher v1.5.0
Requires: pystray, Pillow, tkinter (stdlib)
Run with: pythonw series3_tray.py (no console window)
@@ -16,6 +16,7 @@ import sys
import subprocess
import tempfile
import threading
import configparser
import urllib.request
import urllib.error
from datetime import datetime
@@ -52,11 +53,24 @@ def _version_tuple(v):
return tuple(parts)
def check_for_update():
"""
Query Gitea for the latest release.
Returns (tag, download_url) if an update is available, else (None, None).
"""
def _update_log(msg):
"""Append a timestamped line to the watcher log for update events."""
try:
log_path = os.path.join(
os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or "",
"Series3Watcher", "agent_logs", "series3_watcher.log"
)
os.makedirs(os.path.dirname(log_path), exist_ok=True)
with open(log_path, "a") as f:
f.write("[{}] [updater] {}\n".format(
datetime.now().strftime("%Y-%m-%d %H:%M:%S"), msg
))
except Exception:
pass
def _check_for_update_gitea():
"""Query Gitea API for latest release. Returns (tag, download_url) or (None, None)."""
import json as _json
try:
req = urllib.request.Request(
@@ -71,21 +85,78 @@ def check_for_update():
tag = latest.get("tag_name", "")
if _version_tuple(tag) <= _version_tuple(_CURRENT_VERSION):
return None, None
# Find the .exe asset
assets = latest.get("assets", [])
for asset in assets:
name = asset.get("name", "")
if name.lower().endswith(".exe"):
name = asset.get("name", "").lower()
if name.endswith(".exe") and "setup" not in name:
return tag, asset.get("browser_download_url")
_update_log("Newer release {} found but no valid .exe asset".format(tag))
return tag, None
except Exception:
except Exception as e:
_update_log("check_for_update (gitea) failed: {}".format(e))
return None, None
def _check_for_update_url(base_url):
"""Query a custom URL server for latest version. Returns (tag, download_url) or (None, None)."""
if not base_url:
_update_log("UPDATE_SOURCE=url but UPDATE_URL is empty — skipping")
return None, None
try:
ver_url = base_url.rstrip("/") + "/api/updates/series3-watcher/version.txt"
req = urllib.request.Request(
ver_url,
headers={"User-Agent": "series3-watcher/{}".format(_CURRENT_VERSION)},
)
with urllib.request.urlopen(req, timeout=8) as resp:
tag = resp.read().decode("utf-8").strip()
if not tag:
return None, None
if _version_tuple(tag) <= _version_tuple(_CURRENT_VERSION):
return None, None
exe_url = base_url.rstrip("/") + "/api/updates/series3-watcher/series3-watcher.exe"
return tag, exe_url
except Exception as e:
_update_log("check_for_update (url mode) failed: {}".format(e))
return None, None
def check_for_update():
"""
Check for an update using the configured source (gitea, url, or disabled).
Reads UPDATE_SOURCE and UPDATE_URL from config.ini at check time.
Returns (tag, download_url) if an update is available, else (None, None).
Returns (None, None) immediately if UPDATE_SOURCE = disabled.
"""
try:
cp = configparser.ConfigParser(inline_comment_prefixes=(";", "#"))
cp.optionxform = str
cp.read(CONFIG_PATH, encoding="utf-8")
section = cp["agent"] if cp.has_section("agent") else {}
update_source = section.get("UPDATE_SOURCE", "gitea").strip().lower()
update_url = section.get("UPDATE_URL", "").strip()
except Exception:
update_source = "gitea"
update_url = ""
if update_source == "disabled":
return None, None
_update_log("Checking for update (source={}, version={})".format(
update_source, _CURRENT_VERSION
))
if update_source == "url":
return _check_for_update_url(update_url)
else:
return _check_for_update_gitea()
def apply_update(download_url):
"""
Download new .exe to a temp file, write a swap .bat, launch it, exit.
The bat waits for us to exit, then swaps the files and relaunches.
Download new .exe to a temp file, validate it, write a swap .bat, launch it, exit.
The bat backs up the old exe, retries the copy up to 5 times if locked, then relaunches.
The .exe.old backup is left in place as a rollback copy.
"""
exe_path = os.path.abspath(sys.executable if getattr(sys, "frozen", False) else sys.argv[0])
@@ -93,6 +164,8 @@ def apply_update(download_url):
tmp_fd, tmp_path = tempfile.mkstemp(suffix=".exe", prefix="s3w_update_")
os.close(tmp_fd)
_update_log("Downloading update from: {}".format(download_url))
req = urllib.request.Request(
download_url,
headers={"User-Agent": "series3-watcher/{}".format(_CURRENT_VERSION)},
@@ -101,26 +174,79 @@ def apply_update(download_url):
with open(tmp_path, "wb") as f:
f.write(resp.read())
# Three-layer validation before touching the live exe
try:
dl_size = os.path.getsize(tmp_path)
current_size = os.path.getsize(exe_path)
_update_log("Download complete ({} bytes), validating...".format(dl_size))
if dl_size < 100 * 1024:
_update_log("Validation failed: too small ({} bytes) — aborting".format(dl_size))
os.remove(tmp_path)
return False
if current_size > 0 and dl_size < current_size * 0.5:
_update_log("Validation failed: suspiciously small ({} bytes vs current {} bytes) — aborting".format(
dl_size, current_size
))
os.remove(tmp_path)
return False
with open(tmp_path, "rb") as _f:
magic = _f.read(2)
if magic != b"MZ":
_update_log("Validation failed: not a valid Windows exe (bad magic bytes) — aborting")
os.remove(tmp_path)
return False
_update_log("Validation passed ({} bytes, MZ ok)".format(dl_size))
except Exception as e:
_update_log("Validation error: {} — aborting".format(e))
try:
os.remove(tmp_path)
except Exception:
pass
return False
bat_fd, bat_path = tempfile.mkstemp(suffix=".bat", prefix="s3w_swap_")
os.close(bat_fd)
bat_content = (
"@echo off\r\n"
"ping 127.0.0.1 -n 4 > nul\r\n"
"copy /Y \"{exe}\" \"{exe}.old\"\r\n"
"set RETRIES=0\r\n"
":retry\r\n"
"copy /Y \"{new}\" \"{exe}\"\r\n"
"if errorlevel 1 (\r\n"
" set /a RETRIES+=1\r\n"
" if %RETRIES% GEQ 5 goto fail\r\n"
" ping 127.0.0.1 -n 3 > nul\r\n"
" goto retry\r\n"
")\r\n"
"start \"\" \"{exe}\"\r\n"
"del \"{new}\"\r\n"
"del \"%~f0\"\r\n"
"exit /b 0\r\n"
":fail\r\n"
"del \"{new}\"\r\n"
"del \"%~f0\"\r\n"
"exit /b 1\r\n"
).format(new=tmp_path, exe=exe_path)
with open(bat_path, "w") as f:
f.write(bat_content)
_update_log("Launching swap bat — exiting for update")
subprocess.Popen(
["cmd", "/C", bat_path],
creationflags=subprocess.CREATE_NO_WINDOW if hasattr(subprocess, "CREATE_NO_WINDOW") else 0,
)
return True
except Exception:
except Exception as e:
_update_log("apply_update failed: {}".format(e))
return False
@@ -352,7 +478,7 @@ class WatcherTray:
self._do_update()
return # exit loop; swap bat will relaunch
# Periodic Gitea update check (every ~5 min)
# Periodic update check (every ~5 min)
update_check_counter += 1
if update_check_counter >= 30:
update_check_counter = 0
@@ -379,7 +505,7 @@ class WatcherTray:
self.stop_event.set()
if self._icon is not None:
self._icon.stop()
# If update failed, just keep running silently
# If update failed, keep running silently — error is in the log
# --- Entry point ---
+112 -4
View File
@@ -1,5 +1,5 @@
"""
Series 3 Watcher — v1.4.2
Series 3 Watcher — v1.4.3
Environment:
- Python 3.8 (Windows 7 compatible)
@@ -60,8 +60,6 @@ def load_config(path: str) -> Dict[str, Any]:
return {
"WATCH_PATH": get_str("SERIES3_PATH", r"C:\Blastware 10\Event\autocall home"),
"SCAN_INTERVAL": get_int("SCAN_INTERVAL_SECONDS", 300),
"OK_HOURS": float(get_int("OK_HOURS", 12)),
"MISSING_HOURS": float(get_int("MISSING_HOURS", 24)),
"ENABLE_LOGGING": get_bool("ENABLE_LOGGING", True),
"LOG_FILE": get_str("LOG_FILE", os.path.join(
os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or "C:\\",
@@ -78,6 +76,40 @@ def load_config(path: str) -> Dict[str, Any]:
"API_INTERVAL_SECONDS": get_int("API_INTERVAL_SECONDS", 300),
"SOURCE_ID": get_str("SOURCE_ID", gethostname()),
"SOURCE_TYPE": get_str("SOURCE_TYPE", "series3_watcher"),
# Auto-updater source
"UPDATE_SOURCE": get_str("UPDATE_SOURCE", "gitea"),
"UPDATE_URL": get_str("UPDATE_URL", ""),
# SFM event forwarder — when enabled, forwards each Blastware
# event binary (+ paired .TXT report when present) to an SFM
# server's /db/import/blastware_file endpoint. Default-off so
# existing 1.4.x deployments don't change behaviour on
# auto-update; operators flip it on by setting SFM_URL +
# SFM_FORWARD_ENABLED=true in config.ini.
"SFM_FORWARD_ENABLED": get_bool("SFM_FORWARD_ENABLED", False),
"SFM_URL": get_str("SFM_URL", ""),
"SFM_FORWARD_INTERVAL_SECONDS": get_int("SFM_FORWARD_INTERVAL_SECONDS", 60),
# Files modified within the last N seconds are skipped (BW may
# still be writing them).
"SFM_QUIESCENCE_SECONDS": get_int("SFM_QUIESCENCE_SECONDS", 5),
# If a binary's .TXT report hasn't appeared after this many
# seconds, forward the binary alone rather than blocking
# forever.
"SFM_MISSING_REPORT_GRACE_SECONDS": get_int(
"SFM_MISSING_REPORT_GRACE_SECONDS", 60
),
# Per-request HTTP timeout (seconds).
"SFM_HTTP_TIMEOUT": get_int("SFM_HTTP_TIMEOUT", 60),
# State file for forwarded-sha256 idempotency tracking.
# Defaults next to the log file for easy operator access.
"SFM_STATE_FILE": get_str("SFM_STATE_FILE", ""),
# Per-pass cap — forward at most N events per scan tick.
# 0 = unlimited. Default 500 as a safety against accidentally
# backfilling tens of thousands of events in one burst on
# first deploy in a folder that's been accumulating for years.
# See README "First-time deployment" section.
"SFM_MAX_FORWARDS_PER_PASS": get_int("SFM_MAX_FORWARDS_PER_PASS", 500),
}
@@ -215,7 +247,7 @@ def scan_latest(
# --- API heartbeat / SFM telemetry helpers ---
VERSION = "1.4.2"
VERSION = "1.5.0"
def _read_log_tail(log_file: str, n: int = 25) -> Optional[list]:
@@ -364,6 +396,36 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
sniff_cache: Dict[str, Tuple[float, str]] = {}
last_api_ts: float = 0.0
last_forward_ts: float = 0.0
# ---- SFM event-forwarder setup ----
# Default-off; only initialised when both flag and URL are set.
sfm_state = None
if cfg.get("SFM_FORWARD_ENABLED") and cfg.get("SFM_URL"):
try:
from event_forwarder import ForwardState
state_file = cfg.get("SFM_STATE_FILE") or os.path.join(
os.path.dirname(LOG_FILE) or here, "sfm_forwarded.json"
)
sfm_state = ForwardState(state_file)
print(
"[CFG] SFM_FORWARD_ENABLED=true SFM_URL={} state={} ({} already-forwarded)".format(
cfg.get("SFM_URL"), state_file, sfm_state.count(),
)
)
log_message(
LOG_FILE, ENABLE_LOGGING,
"[cfg] sfm forwarder enabled url={} state={} already_forwarded={}".format(
cfg.get("SFM_URL"), state_file, sfm_state.count(),
),
)
except Exception as e:
print("[WARN] SFM forwarder init failed: {}".format(e))
log_message(LOG_FILE, ENABLE_LOGGING,
"[warn] sfm forwarder init failed: {}".format(e))
sfm_state = None
else:
print("[CFG] SFM_FORWARD_ENABLED=false (event forwarding disabled)")
while not stop_event.is_set():
try:
@@ -445,6 +507,52 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
else:
state["api_status"] = "disabled"
# ---- SFM event forwarder ----
# Same scan loop as the heartbeat, but on its own cadence
# (SFM_FORWARD_INTERVAL_SECONDS). Default-off — sfm_state
# is None unless config explicitly enabled it AND supplied
# an SFM_URL.
if sfm_state is not None:
now_ts = time.time()
fwd_interval = int(cfg.get("SFM_FORWARD_INTERVAL_SECONDS", 60))
if now_ts - last_forward_ts >= fwd_interval:
try:
from event_forwarder import forward_pending
counts = forward_pending(
WATCH_PATH,
cfg.get("SFM_URL", ""),
sfm_state,
max_age_days=MAX_EVENT_AGE_DAYS,
quiescence_seconds=int(cfg.get("SFM_QUIESCENCE_SECONDS", 5)),
missing_report_grace_seconds=int(
cfg.get("SFM_MISSING_REPORT_GRACE_SECONDS", 60)
),
timeout=int(cfg.get("SFM_HTTP_TIMEOUT", 60)),
max_per_pass=int(cfg.get("SFM_MAX_FORWARDS_PER_PASS", 500)),
logger=lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m),
)
last_forward_ts = now_ts
if counts["scanned"] > 0:
summary = (
"[forward] scanned={} forwarded={} "
"with_report={} errors={}".format(
counts["scanned"], counts["forwarded"],
counts["with_report"], counts["errors"],
)
)
print(summary)
log_message(LOG_FILE, ENABLE_LOGGING, summary)
state["sfm_status"] = "ok" if counts["errors"] == 0 else "errors"
state["last_forward"] = datetime.now()
state["last_forward_counts"] = counts
except Exception as e:
err = "[forward-error] {}".format(e)
print(err)
log_message(LOG_FILE, ENABLE_LOGGING, err)
state["sfm_status"] = "fail"
else:
state["sfm_status"] = "disabled"
except Exception as e:
err = "[loop-error] {}".format(e)
print(err)
+243 -12
View File
@@ -1,5 +1,5 @@
"""
Series 3 Watcher — Settings Dialog v1.4.2
Series 3 Watcher — Settings Dialog v1.5.0
Provides a Tkinter settings dialog that doubles as a first-run wizard.
@@ -30,8 +30,6 @@ DEFAULTS = {
"SERIES3_PATH": r"C:\Blastware 10\Event\autocall home",
"MAX_EVENT_AGE_DAYS": "365",
"SCAN_INTERVAL_SECONDS":"300",
"OK_HOURS": "12",
"MISSING_HOURS": "24",
"MLG_HEADER_BYTES": "2048",
"ENABLE_LOGGING": "true",
"LOG_FILE": os.path.join(
@@ -39,6 +37,22 @@ DEFAULTS = {
"Series3Watcher", "agent_logs", "series3_watcher.log"
),
"LOG_RETENTION_DAYS": "30",
# Auto-updater
"UPDATE_SOURCE": "gitea",
"UPDATE_URL": "",
# SFM event forwarder (default-off; existing 1.4.x deployments
# don't change behaviour after auto-update until an operator
# opts in by setting SFM_URL + flipping SFM_FORWARD_ENABLED).
"SFM_FORWARD_ENABLED": "false",
"SFM_URL": "",
"SFM_FORWARD_INTERVAL_SECONDS": "60",
"SFM_QUIESCENCE_SECONDS": "5",
"SFM_MISSING_REPORT_GRACE_SECONDS": "60",
"SFM_HTTP_TIMEOUT": "60",
"SFM_STATE_FILE": "",
"SFM_MAX_FORWARDS_PER_PASS": "500",
}
@@ -225,14 +239,27 @@ class SettingsDialog:
# Scanning
self.var_scan_interval = tk.StringVar(value=v["SCAN_INTERVAL_SECONDS"])
self.var_ok_hours = tk.StringVar(value=v["OK_HOURS"])
self.var_missing_hours = tk.StringVar(value=v["MISSING_HOURS"])
self.var_mlg_header_bytes = tk.StringVar(value=v["MLG_HEADER_BYTES"])
# Logging
self.var_enable_logging = tk.BooleanVar(value=v["ENABLE_LOGGING"].lower() in ("1","true","yes","on"))
self.var_log_retention_days = tk.StringVar(value=v["LOG_RETENTION_DAYS"])
# Updates
self.var_update_source = tk.StringVar(value=v["UPDATE_SOURCE"].lower() if v["UPDATE_SOURCE"].lower() in ("gitea", "url", "disabled") else "gitea")
self.var_update_url = tk.StringVar(value=v["UPDATE_URL"])
# SFM event forwarder
self.var_sfm_enabled = tk.BooleanVar(
value=v["SFM_FORWARD_ENABLED"].lower() in ("1", "true", "yes", "on"))
self.var_sfm_url = tk.StringVar(value=v["SFM_URL"])
self.var_sfm_forward_interval = tk.StringVar(value=v["SFM_FORWARD_INTERVAL_SECONDS"])
self.var_sfm_quiescence = tk.StringVar(value=v["SFM_QUIESCENCE_SECONDS"])
self.var_sfm_missing_report_grace = tk.StringVar(value=v["SFM_MISSING_REPORT_GRACE_SECONDS"])
self.var_sfm_http_timeout = tk.StringVar(value=v["SFM_HTTP_TIMEOUT"])
self.var_sfm_state_file = tk.StringVar(value=v["SFM_STATE_FILE"])
self.var_sfm_max_per_pass = tk.StringVar(value=v["SFM_MAX_FORWARDS_PER_PASS"])
# --- UI construction ---
def _build_ui(self):
@@ -259,6 +286,8 @@ class SettingsDialog:
self._build_tab_paths(nb)
self._build_tab_scanning(nb)
self._build_tab_logging(nb)
self._build_tab_updates(nb)
self._build_tab_sfm(nb)
# Buttons
btn_frame = tk.Frame(outer)
@@ -389,15 +418,193 @@ class SettingsDialog:
def _build_tab_scanning(self, nb):
f = self._tab_frame(nb, "Scanning")
_add_label_spinbox(f, 0, "Scan Interval (sec)", self.var_scan_interval, 10, 3600)
_add_label_spinbox(f, 1, "OK Hours", self.var_ok_hours, 1, 168)
_add_label_spinbox(f, 2, "Missing Hours", self.var_missing_hours, 1, 168)
_add_label_spinbox(f, 3, "MLG Header Bytes", self.var_mlg_header_bytes, 256, 65536)
_add_label_spinbox(f, 1, "MLG Header Bytes", self.var_mlg_header_bytes, 256, 65536)
def _build_tab_logging(self, nb):
f = self._tab_frame(nb, "Logging")
_add_label_check(f, 0, "Enable Logging", self.var_enable_logging)
_add_label_spinbox(f, 1, "Log Retention (days)", self.var_log_retention_days, 1, 365)
def _build_tab_updates(self, nb):
f = self._tab_frame(nb, "Updates")
tk.Label(
f,
text="Auto-Update Source",
anchor="w",
).grid(row=0, column=0, sticky="w", padx=(8, 4), pady=(8, 2))
radio_frame = tk.Frame(f)
radio_frame.grid(row=0, column=1, sticky="w", padx=(0, 8), pady=(8, 2))
rb_gitea = ttk.Radiobutton(
radio_frame, text="Gitea (default)",
variable=self.var_update_source, value="gitea",
command=self._on_update_source_change,
)
rb_gitea.grid(row=0, column=0, sticky="w", padx=(0, 12))
rb_url = ttk.Radiobutton(
radio_frame, text="Custom URL",
variable=self.var_update_source, value="url",
command=self._on_update_source_change,
)
rb_url.grid(row=0, column=1, sticky="w", padx=(0, 12))
rb_disabled = ttk.Radiobutton(
radio_frame, text="Disabled",
variable=self.var_update_source, value="disabled",
command=self._on_update_source_change,
)
rb_disabled.grid(row=0, column=2, sticky="w")
tk.Label(f, text="Update Server URL", anchor="w").grid(
row=1, column=0, sticky="w", padx=(8, 4), pady=4
)
self._update_url_entry = ttk.Entry(f, textvariable=self.var_update_url, width=42)
self._update_url_entry.grid(row=1, column=1, sticky="ew", padx=(0, 8), pady=4)
hint_text = (
"Gitea: checks the Gitea release page automatically every 5 minutes.\n"
"Custom URL: fetches version.txt and series3-watcher.exe from a web\n"
"server — use when Gitea is not reachable (e.g. terra-view URL).\n"
"Disabled: no automatic update checks. Remote push from terra-view\n"
"still works when disabled."
)
tk.Label(f, text=hint_text, justify="left", fg="#555555",
wraplength=380).grid(
row=2, column=0, columnspan=2, sticky="w", padx=(8, 8), pady=(4, 8)
)
# Set initial state of URL entry
self._on_update_source_change()
def _on_update_source_change(self):
"""Enable/disable the URL entry based on selected update source."""
if self.var_update_source.get() == "url":
self._update_url_entry.config(state="normal")
else:
self._update_url_entry.config(state="disabled")
# ──────────────────────────────────────────────────────────────────
# SFM Forward tab
# ──────────────────────────────────────────────────────────────────
def _build_tab_sfm(self, nb):
"""Configure the SFM event forwarder.
When enabled, every Blastware event binary in the watch folder
(plus its paired .TXT report when present) is POSTed to an SFM
server's /db/import/blastware_file endpoint. Default-off so
existing 1.4.x deployments don't change behaviour after an
auto-update — operator opts in by setting the URL and flipping
the checkbox.
"""
f = self._tab_frame(nb, "SFM Forward")
_add_label_check(f, 0, "Forward events to SFM", self.var_sfm_enabled)
# SFM URL row — entry + Test button (mirrors the Connection tab's pattern)
tk.Label(f, text="SFM Server URL", anchor="w").grid(
row=1, column=0, sticky="w", padx=(8, 4), pady=4
)
url_frame = tk.Frame(f)
url_frame.grid(row=1, column=1, sticky="ew", padx=(0, 8), pady=4)
url_frame.columnconfigure(0, weight=1)
sfm_entry = ttk.Entry(url_frame, textvariable=self.var_sfm_url, width=32)
sfm_entry.grid(row=0, column=0, sticky="ew")
self._sfm_test_btn = ttk.Button(
url_frame, text="Test", width=6, command=self._test_sfm_connection,
)
self._sfm_test_btn.grid(row=0, column=1, padx=(4, 0))
self._sfm_test_status = tk.Label(url_frame, text="", anchor="w", width=20)
self._sfm_test_status.grid(row=0, column=2, padx=(6, 0))
_add_label_spinbox(f, 2, "Forward Interval (sec)", self.var_sfm_forward_interval, 5, 3600)
_add_label_spinbox(f, 3, "Max Events Per Pass", self.var_sfm_max_per_pass, 0, 100000)
_add_label_spinbox(f, 4, "Quiescence (sec)", self.var_sfm_quiescence, 1, 60)
_add_label_spinbox(f, 5, "Missing-Report Grace (sec)", self.var_sfm_missing_report_grace, 0, 600)
_add_label_spinbox(f, 6, "HTTP Timeout (sec)", self.var_sfm_http_timeout, 5, 600)
tk.Label(f, text="State File", anchor="w").grid(
row=7, column=0, sticky="w", padx=(8, 4), pady=4
)
state_frame = tk.Frame(f)
state_frame.grid(row=7, column=1, sticky="ew", padx=(0, 8), pady=4)
state_frame.columnconfigure(0, weight=1)
state_entry = ttk.Entry(state_frame, textvariable=self.var_sfm_state_file, width=32)
state_entry.grid(row=0, column=0, sticky="ew")
def _browse_state():
path = filedialog.asksaveasfilename(
title="SFM forward-state file",
defaultextension=".json",
filetypes=[("JSON", "*.json"), ("All Files", "*.*")],
initialfile="sfm_forwarded.json",
)
if path:
self.var_sfm_state_file.set(path)
ttk.Button(state_frame, text="Browse...", width=10, command=_browse_state).grid(
row=0, column=1, padx=(4, 0)
)
hint_text = (
"Forwards every Blastware event binary (and its paired .TXT report)\n"
"to an SFM server, where the report is parsed for searchable\n"
"per-channel stats: PPV, ZC Freq, Time of Peak, Peak Acceleration,\n"
"Peak Displacement, sensor self-check, monitor log.\n\n"
"Idempotent: forwarded files are tracked by sha256 in the state\n"
"file; restarts and re-scans never re-POST. Leave State File blank\n"
"to default to <log dir>/sfm_forwarded.json."
)
tk.Label(f, text=hint_text, justify="left", fg="#555555", wraplength=380).grid(
row=8, column=0, columnspan=2, sticky="w", padx=(8, 8), pady=(8, 4)
)
def _test_sfm_connection(self):
"""GET <sfm_url>/health and show the result."""
import urllib.request
import urllib.error
self._sfm_test_status.config(text="Testing...", foreground="grey")
self._sfm_test_btn.config(state="disabled")
self.root.update_idletasks()
raw = self.var_sfm_url.get().strip()
if not raw:
self._sfm_test_status.config(text="Enter a URL first", foreground="orange")
self._sfm_test_btn.config(state="normal")
return
url = raw.rstrip("/") + "/health"
try:
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=5) as resp:
if resp.status == 200:
self._sfm_test_status.config(text="Connected!", foreground="green")
else:
self._sfm_test_status.config(
text="HTTP {}".format(resp.status), foreground="orange",
)
except urllib.error.URLError as e:
reason = str(e.reason) if hasattr(e, "reason") else str(e)
self._sfm_test_status.config(
text="Failed: {}".format(reason[:30]), foreground="red",
)
except Exception as e:
self._sfm_test_status.config(
text="Error: {}".format(str(e)[:30]), foreground="red",
)
finally:
self._sfm_test_btn.config(state="normal")
# --- Validation helpers ---
def _get_int_var(self, var, name, min_val, max_val, default):
@@ -427,10 +634,13 @@ class SettingsDialog:
(self.var_api_interval, "API Interval", 30, 3600, 300),
(self.var_max_event_age_days, "Max Event Age Days", 1, 3650, 365),
(self.var_scan_interval, "Scan Interval", 10, 3600, 300),
(self.var_ok_hours, "OK Hours", 1, 168, 12),
(self.var_missing_hours, "Missing Hours", 1, 168, 24),
(self.var_mlg_header_bytes, "MLG Header Bytes", 256, 65536, 2048),
(self.var_log_retention_days, "Log Retention Days", 1, 365, 30),
(self.var_sfm_forward_interval, "SFM Forward Interval", 5, 3600, 60),
(self.var_sfm_quiescence, "SFM Quiescence", 1, 60, 5),
(self.var_sfm_missing_report_grace, "SFM Missing-Report Grace", 0, 600, 60),
(self.var_sfm_http_timeout, "SFM HTTP Timeout", 5, 600, 60),
(self.var_sfm_max_per_pass, "SFM Max Events Per Pass", 0, 100000, 500),
]
int_values = {}
for var, name, mn, mx, dflt in checks:
@@ -439,6 +649,17 @@ class SettingsDialog:
return # validation failed; keep dialog open
int_values[name] = result
# SFM forwarding requires a URL when enabled — common foot-gun
# to flip the checkbox without filling in the field.
if self.var_sfm_enabled.get() and not self.var_sfm_url.get().strip():
messagebox.showerror(
"Validation Error",
"SFM Forward is enabled but the SFM Server URL field is empty.\n\n"
"Either set the URL (e.g. http://10.0.0.44:8200) or uncheck "
"'Forward events to SFM'.",
)
return
# Resolve source_id placeholder
source_id = self.var_source_id.get().strip()
# Strip placeholder hint if user left it
@@ -461,12 +682,22 @@ class SettingsDialog:
"SERIES3_PATH": self.var_series3_path.get().strip(),
"MAX_EVENT_AGE_DAYS": str(int_values["Max Event Age Days"]),
"SCAN_INTERVAL_SECONDS":str(int_values["Scan Interval"]),
"OK_HOURS": str(int_values["OK Hours"]),
"MISSING_HOURS": str(int_values["Missing Hours"]),
"MLG_HEADER_BYTES": str(int_values["MLG Header Bytes"]),
"ENABLE_LOGGING": "true" if self.var_enable_logging.get() else "false",
"LOG_FILE": self.var_log_file.get().strip(),
"LOG_RETENTION_DAYS": str(int_values["Log Retention Days"]),
"UPDATE_SOURCE": self.var_update_source.get().strip() or "gitea",
"UPDATE_URL": self.var_update_url.get().strip(),
# SFM event forwarder
"SFM_FORWARD_ENABLED": "true" if self.var_sfm_enabled.get() else "false",
"SFM_URL": self.var_sfm_url.get().strip().rstrip("/"),
"SFM_FORWARD_INTERVAL_SECONDS": str(int_values["SFM Forward Interval"]),
"SFM_QUIESCENCE_SECONDS": str(int_values["SFM Quiescence"]),
"SFM_MISSING_REPORT_GRACE_SECONDS": str(int_values["SFM Missing-Report Grace"]),
"SFM_HTTP_TIMEOUT": str(int_values["SFM HTTP Timeout"]),
"SFM_STATE_FILE": self.var_sfm_state_file.get().strip(),
"SFM_MAX_FORWARDS_PER_PASS": str(int_values["SFM Max Events Per Pass"]),
}
try:
+718
View File
@@ -0,0 +1,718 @@
"""
test_event_forwarder.py — unit tests for the SFM event forwarder.
Covers:
- is_event_binary() filename matching (positive + negative cases)
- ForwardState load/save round-trip + idempotency check
- find_pending_events() pairing + quiescence + grace-period logic
- _encode_multipart() byte-level shape (boundary + headers)
- forward_event_pair() end-to-end against a tiny stdlib HTTP server
that mimics the SFM /db/import/blastware_file endpoint
Stdlib only — runs with `python -m pytest test_event_forwarder.py`
on Python 3.8+ (the watcher's compat target).
"""
from __future__ import annotations
import http.server
import json
import os
import socket
import tempfile
import threading
import time
import unittest
from pathlib import Path
import event_forwarder as ef
# ── is_event_binary() ────────────────────────────────────────────────────────
class TestIsEventBinary(unittest.TestCase):
def test_recognises_typical_blastware_filenames(self):
for name in [
"M529LK44.AB0",
"M529LKVQ.6S0",
"M529LKVQ.6S0W",
"S353L4H0.3M0W",
"P036L318.C80H",
"M529LIY6.N00",
]:
self.assertTrue(ef.is_event_binary(name), name)
def test_rejects_lowercase_extensions_we_explicitly_exclude(self):
for name in ["BE11529.MLG", "M529LK44.AB0.TXT", "agent.log",
"config.ini", "foo.bak", "bar.tmp",
"something.h5", "noise.json"]:
self.assertFalse(ef.is_event_binary(name), name)
def test_ach_report_name(self):
"""BW ACH convention: <stem>.<ext> → <stem>_<ext>_ASCII.TXT"""
cases = [
("M529LK44.AB0", "M529LK44_AB0_ASCII.TXT"),
("N844L20G.630H", "N844L20G_630H_ASCII.TXT"),
("I145L64P.GD0W", "I145L64P_GD0W_ASCII.TXT"),
("H907L1R7.PG0H", "H907L1R7_PG0H_ASCII.TXT"),
]
for binary, expected in cases:
self.assertEqual(ef.ach_report_name(binary), expected, binary)
def test_legacy_report_name(self):
"""Manual-export convention: <binary>.TXT"""
self.assertEqual(ef.legacy_report_name("M529LK44.AB0"),
"M529LK44.AB0.TXT")
def test_is_histogram_event(self):
# 4-char extension ending in H = histogram
for name in ["H907L1R7.PG0H", "S353L4H0.8S0H", "P036L318.C80H"]:
self.assertTrue(ef.is_histogram_event(name), name)
# 4-char extension ending in W = waveform
for name in ["S353L4H0.3M0W", "M529LKVQ.6S0W", "P036L318.C80W"]:
self.assertFalse(ef.is_histogram_event(name), name)
# 3-char old-firmware extensions can't be classified — return False
for name in ["M529LK44.AB0", "M529LIY6.N00", "M529LJ8V.490"]:
self.assertFalse(ef.is_histogram_event(name), name)
def test_rejects_non_matching_filenames(self):
for name in ["", "no_extension",
"TooShort.AB0", # stem must be 8 chars
"TOOLONG12345.AB0", # stem must be 8 chars
"M529LK44.A", # ext too short
"M529LK44.ABCDE", # ext too long
"M52.AB0", # stem too short
"1234ABCD.AB0"]: # first char must be letter
self.assertFalse(ef.is_event_binary(name), name)
# ── ForwardState ─────────────────────────────────────────────────────────────
class TestForwardState(unittest.TestCase):
def test_round_trip_persists_marked_entries(self):
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "fwd.json")
s = ef.ForwardState(path)
self.assertFalse(s.is_forwarded("abc123"))
s.mark_forwarded("abc123", "M529LK44.AB0", 4400)
self.assertTrue(s.is_forwarded("abc123"))
# Re-load from disk
s2 = ef.ForwardState(path)
self.assertTrue(s2.is_forwarded("abc123"))
self.assertEqual(s2.count(), 1)
def test_corrupt_state_file_starts_fresh(self):
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "fwd.json")
with open(path, "w") as f:
f.write("not valid json {{{")
s = ef.ForwardState(path)
self.assertEqual(s.count(), 0)
def test_version_mismatch_starts_fresh(self):
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "fwd.json")
with open(path, "w") as f:
json.dump({"version": 999, "forwarded": {"x": {}}}, f)
s = ef.ForwardState(path)
self.assertEqual(s.count(), 0)
# ── find_pending_events() ────────────────────────────────────────────────────
class TestFindPendingEvents(unittest.TestCase):
def _make(self, dir_path: Path, name: str, age_seconds: float = 100,
content: bytes = b"x") -> Path:
"""Create a file with controlled mtime."""
p = dir_path / name
p.write_bytes(content)
# Set mtime to simulate age
target = time.time() - age_seconds
os.utime(p, (target, target))
return p
def test_returns_pair_when_both_files_present_and_quiescent(self):
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
bin_p = self._make(tmp_p, "M529LK44.AB0", age_seconds=120, content=b"binary")
txt_p = self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"report")
state = ef.ForwardState(str(tmp_p / "fwd.json"))
pending = ef.find_pending_events(
str(tmp_p), state,
max_age_days=30,
quiescence_seconds=5,
missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 1)
self.assertEqual(os.path.basename(pending[0][0]), "M529LK44.AB0")
self.assertEqual(os.path.basename(pending[0][1]), "M529LK44.AB0.TXT")
def test_pairs_with_ach_underscore_ascii_naming(self):
"""BW ACH writes M529LK44.AB0 + M529LK44_AB0_ASCII.TXT. The
watcher must pair these even though the .TXT filename doesn't
carry a literal copy of the binary's name."""
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
self._make(tmp_p, "N844L20G.630H", age_seconds=120, content=b"binary")
self._make(tmp_p, "N844L20G_630H_ASCII.TXT", age_seconds=100, content=b"report")
state = ef.ForwardState(str(tmp_p / "fwd.json"))
pending = ef.find_pending_events(
str(tmp_p), state,
max_age_days=30,
quiescence_seconds=5,
missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 1)
self.assertEqual(os.path.basename(pending[0][0]), "N844L20G.630H")
self.assertEqual(os.path.basename(pending[0][1]),
"N844L20G_630H_ASCII.TXT")
def test_pairs_with_ach_underscore_ascii_naming_for_waveform(self):
"""Same as above but for new-firmware waveform events
(extension ends in W)."""
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
self._make(tmp_p, "I145L64P.GD0W", age_seconds=120, content=b"binary")
self._make(tmp_p, "I145L64P_GD0W_ASCII.TXT", age_seconds=100, content=b"report")
state = ef.ForwardState(str(tmp_p / "fwd.json"))
pending = ef.find_pending_events(
str(tmp_p), state,
max_age_days=30,
quiescence_seconds=5,
missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 1)
self.assertEqual(os.path.basename(pending[0][1]),
"I145L64P_GD0W_ASCII.TXT")
def test_pairing_prefers_ach_naming_when_both_exist(self):
"""If a folder has BOTH conventions (operator manually exported
AND ACH also auto-exported), ACH wins because that's the
canonical name in modern BW deployments."""
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
self._make(tmp_p, "M529LK44.AB0", age_seconds=120, content=b"binary")
# Both partner files present
self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"manual")
self._make(tmp_p, "M529LK44_AB0_ASCII.TXT", age_seconds=100, content=b"ach")
state = ef.ForwardState(str(tmp_p / "fwd.json"))
pending = ef.find_pending_events(
str(tmp_p), state,
max_age_days=30,
quiescence_seconds=5,
missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 1)
self.assertEqual(os.path.basename(pending[0][1]),
"M529LK44_AB0_ASCII.TXT")
def test_pairing_falls_back_to_dot_txt_when_ach_absent(self):
"""If only the manual-export filename exists, the legacy
convention still works (preserves codec-agent test fixtures)."""
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
self._make(tmp_p, "M529LK44.AB0", age_seconds=120, content=b"binary")
self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"manual")
state = ef.ForwardState(str(tmp_p / "fwd.json"))
pending = ef.find_pending_events(
str(tmp_p), state,
max_age_days=30,
quiescence_seconds=5,
missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 1)
self.assertEqual(os.path.basename(pending[0][1]), "M529LK44.AB0.TXT")
def test_skips_if_already_forwarded(self):
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
bin_p = self._make(tmp_p, "M529LK44.AB0", age_seconds=120, content=b"binary")
self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"report")
state = ef.ForwardState(str(tmp_p / "fwd.json"))
digest = ef.sha256_of_file(str(bin_p))
state.mark_forwarded(digest, "M529LK44.AB0", len(b"binary"))
pending = ef.find_pending_events(
str(tmp_p), state,
max_age_days=30,
quiescence_seconds=5,
missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_skips_if_too_fresh_to_be_quiescent(self):
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
self._make(tmp_p, "M529LK44.AB0", age_seconds=1, content=b"binary")
self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=1, content=b"report")
state = ef.ForwardState(str(tmp_p / "fwd.json"))
pending = ef.find_pending_events(
str(tmp_p), state,
max_age_days=30,
quiescence_seconds=5,
missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_forwards_alone_after_grace_when_txt_missing(self):
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
self._make(tmp_p, "M529LK44.AB0", age_seconds=200, content=b"binary")
# No .TXT created.
state = ef.ForwardState(str(tmp_p / "fwd.json"))
pending = ef.find_pending_events(
str(tmp_p), state,
max_age_days=30,
quiescence_seconds=5,
missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 1)
bin_path, txt_path = pending[0]
self.assertEqual(os.path.basename(bin_path), "M529LK44.AB0")
self.assertIsNone(txt_path)
def test_re_pair_after_late_arriving_txt(self):
"""If we forwarded the binary alone (TXT was late) and the TXT
later appears, the binary becomes eligible for re-forward."""
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
bin_p = self._make(tmp_p, "M529LK44.AB0",
age_seconds=200, content=b"binary")
# Mark as already-forwarded WITHOUT a paired report (the
# state we'd be in after a TXT-too-late forward).
state = ef.ForwardState(str(tmp_p / "fwd.json"))
digest = ef.sha256_of_file(str(bin_p))
state.mark_forwarded(digest, "M529LK44.AB0", len(b"binary"),
had_report=False)
# First scan: TXT not present yet → still skipped.
pending = ef.find_pending_events(
str(tmp_p), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(pending, [],
"no TXT present → no re-pair attempt")
# Now BW finally writes the TXT.
self._make(tmp_p, "M529LK44.AB0.TXT",
age_seconds=100, content=b"report")
pending = ef.find_pending_events(
str(tmp_p), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 1,
"TXT now present → re-pair attempt expected")
self.assertEqual(os.path.basename(pending[0][0]), "M529LK44.AB0")
self.assertEqual(os.path.basename(pending[0][1]), "M529LK44.AB0.TXT")
def test_re_pair_not_attempted_when_already_had_report(self):
"""Successful WITH-report forwards stay permanently skipped.
Adding more files later does NOT trigger a re-forward."""
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
bin_p = self._make(tmp_p, "M529LK44.AB0", age_seconds=200, content=b"x")
self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"r")
state = ef.ForwardState(str(tmp_p / "fwd.json"))
state.mark_forwarded(ef.sha256_of_file(str(bin_p)),
"M529LK44.AB0", 1, had_report=True)
pending = ef.find_pending_events(
str(tmp_p), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(pending, [],
"had_report=True forwards stay skipped")
def test_legacy_state_entries_default_to_had_report_true(self):
"""Backward compat: state-file entries from before the
had_report field existed are treated as fully forwarded so
an upgrade doesn't re-forward every entry."""
import json
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
path = str(tmp_p / "fwd.json")
with open(path, "w") as f:
json.dump({
"version": 1,
"forwarded": {
"abc123": {
"filename": "M529LK01.AB0",
"size": 123,
"forwarded_at": "2025-01-01T00:00:00Z",
# No had_report field — legacy entry
}
}
}, f)
state = ef.ForwardState(path)
self.assertIs(state.status("abc123"), True,
"legacy entry must default to 'fully forwarded'")
def test_state_status_returns_none_for_unknown_sha(self):
with tempfile.TemporaryDirectory() as tmp:
state = ef.ForwardState(str(Path(tmp) / "fwd.json"))
self.assertIs(state.status("never-seen"), None)
def test_state_mark_with_had_report_false(self):
with tempfile.TemporaryDirectory() as tmp:
state = ef.ForwardState(str(Path(tmp) / "fwd.json"))
state.mark_forwarded("xyz", "f.AB0", 100, had_report=False)
self.assertIs(state.status("xyz"), False)
# Subsequent re-mark with had_report=True promotes to done.
state.mark_forwarded("xyz", "f.AB0", 100, had_report=True)
self.assertIs(state.status("xyz"), True)
def test_defers_when_txt_missing_and_within_grace(self):
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
self._make(tmp_p, "M529LK44.AB0", age_seconds=15, content=b"binary")
state = ef.ForwardState(str(tmp_p / "fwd.json"))
pending = ef.find_pending_events(
str(tmp_p), state,
max_age_days=30,
quiescence_seconds=5,
missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_skips_old_files_beyond_max_age_days(self):
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
# 10 days old, but max_age_days=1 → should be excluded
self._make(tmp_p, "M529LK44.AB0", age_seconds=10 * 86400,
content=b"binary")
self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=10 * 86400,
content=b"report")
state = ef.ForwardState(str(tmp_p / "fwd.json"))
pending = ef.find_pending_events(
str(tmp_p), state,
max_age_days=1,
quiescence_seconds=5,
missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_ignores_mlg_and_other_non_event_files(self):
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
self._make(tmp_p, "BE11529.MLG", age_seconds=120, content=b"mlg")
self._make(tmp_p, "agent.log", age_seconds=120, content=b"log")
self._make(tmp_p, "config.ini", age_seconds=120, content=b"cfg")
state = ef.ForwardState(str(tmp_p / "fwd.json"))
pending = ef.find_pending_events(
str(tmp_p), state,
max_age_days=30,
quiescence_seconds=5,
missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_max_per_pass_caps_returned_count(self):
"""When max_per_pass is set, return at most that many pairs."""
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
# Create 5 distinct event binaries with paired .TXTs
for i, name in enumerate(
["M529LK01.AB0", "M529LK02.AB0", "M529LK03.AB0",
"M529LK04.AB0", "M529LK05.AB0"],
):
self._make(tmp_p, name, age_seconds=120 + i,
content=("bin-" + str(i)).encode())
self._make(tmp_p, name + ".TXT", age_seconds=110 + i,
content=b"report")
state = ef.ForwardState(str(tmp_p / "fwd.json"))
pending = ef.find_pending_events(
str(tmp_p), state,
max_age_days=30,
quiescence_seconds=5,
missing_report_grace_seconds=60,
max_per_pass=2,
)
self.assertEqual(len(pending), 2)
def test_max_per_pass_zero_means_unlimited(self):
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
for i in range(4):
self._make(tmp_p, "M529LK0{}.AB0".format(i),
age_seconds=120 + i,
content=("bin-" + str(i)).encode())
state = ef.ForwardState(str(tmp_p / "fwd.json"))
pending = ef.find_pending_events(
str(tmp_p), state,
max_age_days=30,
quiescence_seconds=5,
missing_report_grace_seconds=60,
max_per_pass=0,
)
self.assertEqual(len(pending), 4)
def test_max_per_pass_returns_oldest_first(self):
"""Backfill should advance chronologically — oldest qualifying
files first. This way successive scans always make progress
instead of getting stuck re-considering the same N newest files."""
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
# ages: 200s (oldest), 150s, 100s, 50s (skipped — within grace)
ages = [200, 150, 100, 50]
for i, age in enumerate(ages):
self._make(tmp_p, "M529LK0{}.AB0".format(i),
age_seconds=age, content=("c" + str(i)).encode())
self._make(tmp_p, "M529LK0{}.AB0.TXT".format(i),
age_seconds=age - 10, content=b"r")
state = ef.ForwardState(str(tmp_p / "fwd.json"))
pending = ef.find_pending_events(
str(tmp_p), state,
max_age_days=30, quiescence_seconds=5,
missing_report_grace_seconds=60, max_per_pass=2,
)
# Oldest two should be M529LK00 (200s) and M529LK01 (150s)
names = [os.path.basename(p[0]) for p in pending]
self.assertEqual(names, ["M529LK00.AB0", "M529LK01.AB0"])
# ── Seed-state mode ──────────────────────────────────────────────────────────
class TestSeedStateFromFolder(unittest.TestCase):
def _make(self, dir_path: Path, name: str, age_seconds: float = 100,
content: bytes = b"x") -> Path:
p = dir_path / name
p.write_bytes(content)
target = time.time() - age_seconds
os.utime(p, (target, target))
return p
def test_seeds_every_in_window_event_without_posting(self):
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
for i in range(3):
self._make(tmp_p, "M529LK0{}.AB0".format(i),
age_seconds=120 + i, content=("e" + str(i)).encode())
# Plus a non-event file we should ignore
self._make(tmp_p, "BE11529.MLG", age_seconds=120, content=b"mlg")
state = ef.ForwardState(str(tmp_p / "seed.json"))
counts = ef.seed_state_from_folder(
str(tmp_p), state, max_age_days=30,
)
self.assertEqual(counts["scanned"], 3)
self.assertEqual(counts["seeded"], 3)
self.assertEqual(counts["already_known"], 0)
self.assertEqual(state.count(), 3)
def test_seed_skips_files_beyond_max_age_days(self):
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
self._make(tmp_p, "M529LK01.AB0", age_seconds=120, content=b"new")
self._make(tmp_p, "M529LK02.AB0", age_seconds=10 * 86400,
content=b"in-window") # 10d < 30d cutoff
self._make(tmp_p, "M529LK03.AB0", age_seconds=400 * 86400,
content=b"way-old") # 400d > 30d cutoff
state = ef.ForwardState(str(tmp_p / "seed.json"))
counts = ef.seed_state_from_folder(
str(tmp_p), state, max_age_days=30,
)
self.assertEqual(counts["seeded"], 2)
self.assertEqual(counts["skipped_too_old"], 1)
def test_seeded_files_are_then_skipped_by_normal_scan(self):
"""End-to-end: seed once, then a normal scan should produce
zero pending events for the seeded files."""
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
self._make(tmp_p, "M529LK01.AB0", age_seconds=120, content=b"x")
self._make(tmp_p, "M529LK01.AB0.TXT", age_seconds=110, content=b"r")
self._make(tmp_p, "M529LK02.AB0", age_seconds=120, content=b"y")
self._make(tmp_p, "M529LK02.AB0.TXT", age_seconds=110, content=b"r")
state = ef.ForwardState(str(tmp_p / "seed.json"))
ef.seed_state_from_folder(str(tmp_p), state, max_age_days=30)
pending = ef.find_pending_events(
str(tmp_p), state,
max_age_days=30, quiescence_seconds=5,
missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0,
"seed should have marked everything already-forwarded")
def test_seed_is_idempotent(self):
"""Re-running seed twice doesn't duplicate entries or POST anything."""
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
self._make(tmp_p, "M529LK01.AB0", age_seconds=120, content=b"x")
state = ef.ForwardState(str(tmp_p / "seed.json"))
counts1 = ef.seed_state_from_folder(str(tmp_p), state, max_age_days=30)
counts2 = ef.seed_state_from_folder(str(tmp_p), state, max_age_days=30)
self.assertEqual(counts1["seeded"], 1)
self.assertEqual(counts2["seeded"], 0)
self.assertEqual(counts2["already_known"], 1)
self.assertEqual(state.count(), 1)
# ── Multipart encoder ────────────────────────────────────────────────────────
class TestMultipartEncoder(unittest.TestCase):
def test_encodes_two_parts_with_proper_boundary(self):
body, content_type = ef._encode_multipart([
("files", "a.bin", "application/octet-stream", b"\x01\x02"),
("files", "a.txt", "text/plain", b"hello"),
])
# Content-Type header carries the boundary
self.assertTrue(content_type.startswith("multipart/form-data; boundary="))
boundary = content_type.split("boundary=", 1)[1]
self.assertIn(boundary.encode("ascii"), body)
# Body shape
text = body.decode("latin-1")
self.assertIn(f'name="files"; filename="a.bin"', text)
self.assertIn(f'name="files"; filename="a.txt"', text)
self.assertIn("Content-Type: application/octet-stream", text)
self.assertIn("Content-Type: text/plain", text)
# Trailing close boundary present
self.assertTrue(text.rstrip("\r\n").endswith(f"--{boundary}--"))
# ── End-to-end forward_event_pair against a fake server ──────────────────────
class _FakeImportHandler(http.server.BaseHTTPRequestHandler):
"""Mimics seismo-relay's POST /db/import/blastware_file response."""
received = [] # class-level capture for test inspection
def do_POST(self):
length = int(self.headers.get("Content-Length", "0"))
body = self.rfile.read(length)
ctype = self.headers.get("Content-Type", "")
# Crude multipart split — enough to count parts and grab filenames.
parts = body.split(b"--" + ctype.split("boundary=")[-1].encode())
# Locate filename= occurrences — that's our part count
filenames = []
for p in parts:
for line in p.split(b"\r\n"):
if b'filename="' in line:
fn = line.split(b'filename="', 1)[1].split(b'"', 1)[0]
filenames.append(fn.decode("latin-1"))
self.__class__.received.append({
"path": self.path,
"ctype": ctype,
"filenames": filenames,
})
# Build a faux SFM response: success for the first .bin-style filename
results = []
binary_fn = next(
(fn for fn in filenames if not fn.lower().endswith(".txt")),
None,
)
if binary_fn:
results.append({
"filename": binary_fn,
"status": "ok",
"stored_filename": binary_fn,
"filesize": len(body),
"sha256": "00" * 32,
"report_attached": any(fn.lower().endswith(".txt") for fn in filenames),
"inserted": 1,
"skipped": 0,
})
payload = json.dumps({"count": len(results), "results": results}).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(payload)))
self.end_headers()
self.wfile.write(payload)
def log_message(self, *_a, **_kw): # silence the test runner
pass
def _start_fake_server() -> tuple[http.server.HTTPServer, str]:
"""Start an HTTPServer on a random local port; return (server, base_url)."""
server = http.server.HTTPServer(("127.0.0.1", 0), _FakeImportHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
host, port = server.server_address
return server, f"http://{host}:{port}"
class TestForwardEventPair(unittest.TestCase):
def setUp(self):
_FakeImportHandler.received = []
self.server, self.base_url = _start_fake_server()
def tearDown(self):
self.server.shutdown()
self.server.server_close()
def test_post_with_paired_report(self):
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
bin_p = tmp_p / "M529LK44.AB0"
txt_p = tmp_p / "M529LK44.AB0.TXT"
bin_p.write_bytes(b"\x10\x20\x30 binary")
txt_p.write_bytes(b'"Serial Number : BE11529"\n')
result = ef.forward_event_pair(
self.base_url, str(bin_p), str(txt_p), timeout=5.0,
)
self.assertEqual(result["status"], "ok")
self.assertEqual(result["filename"], "M529LK44.AB0")
self.assertTrue(result["report_attached"])
self.assertEqual(len(_FakeImportHandler.received), 1)
req = _FakeImportHandler.received[0]
self.assertEqual(req["path"], "/db/import/blastware_file")
self.assertIn("M529LK44.AB0", req["filenames"])
self.assertIn("M529LK44.AB0.TXT", req["filenames"])
def test_post_without_report(self):
with tempfile.TemporaryDirectory() as tmp:
bin_p = Path(tmp) / "M529LK44.AB0"
bin_p.write_bytes(b"binary only")
result = ef.forward_event_pair(
self.base_url, str(bin_p), None, timeout=5.0,
)
self.assertEqual(result["status"], "ok")
self.assertFalse(result["report_attached"])
req = _FakeImportHandler.received[0]
self.assertEqual(req["filenames"], ["M529LK44.AB0"])
def test_post_propagates_serial_hint_in_query(self):
with tempfile.TemporaryDirectory() as tmp:
bin_p = Path(tmp) / "M529LK44.AB0"
bin_p.write_bytes(b"x")
ef.forward_event_pair(
self.base_url, str(bin_p), None,
serial_hint="BE11529", timeout=5.0,
)
req = _FakeImportHandler.received[0]
self.assertIn("serial=BE11529", req["path"])
if __name__ == "__main__":
unittest.main()