From 815c643fb290e2d3f7bbd31a076eb4b483af6d9d Mon Sep 17 00:00:00 2001 From: serversdown Date: Sun, 10 May 2026 00:20:10 +0000 Subject: [PATCH] feat(forward): rate cap + seed-state mode for safe backfill (v1.5.2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two safety nets for first-deploy on Blastware ACH machines that have accumulated tens or hundreds of thousands of historical events in the watch folder. 1. SFM_MAX_FORWARDS_PER_PASS (default 500, 0=unlimited) --------------------------------------------------- Cap on the number of events forwarded per scan tick. At the 60-second default interval that's ~30K events/hour throughput — the SFM server gets a steady drip instead of one giant burst. Scan now sorts by mtime ascending so backfill advances chronologically (oldest first) and successive scans always make progress instead of re-considering the same N newest files. Wired into: - event_forwarder.find_pending_events / forward_pending - series3_watcher.run_watcher loop - config-template.ini - settings_dialog SFM Forward tab (new "Max Events Per Pass" spinbox, validated in _on_save) 2. event_forwarder.py --seed-state CLI ----------------------------------- One-shot mode that walks the watch folder, sha256s every in-window event binary, and marks them all as already-forwarded WITHOUT POSTing anything. Run before flipping SFM_FORWARD_ENABLED=true to skip the historical backfill entirely — the watcher then only forwards events that appear AFTER the seed. Usage: python event_forwarder.py --seed-state \ --watch "C:\Blastware 10\Event\autocall home" \ --state "C:\...\sfm_forwarded.json" \ [--max-age-days 365] 7 new unit tests: - max_per_pass cap enforcement (=N, =0 unlimited, oldest-first ordering) - seed-state mode (in-window seeding, max-age skip, end-to-end skip-after-seed, idempotent re-runs) README adds a "First-time deployment" section walking through both options. Bumps to v1.5.2. --- CHANGELOG.md | 13 +++ README.md | 24 +++++- config-template.ini | 9 ++ event_forwarder.py | 179 +++++++++++++++++++++++++++++++++++++++- installer.iss | 2 +- series3_tray.py | 2 +- series3_watcher.py | 9 +- settings_dialog.py | 19 +++-- test_event_forwarder.py | 148 +++++++++++++++++++++++++++++++++ 9 files changed, 390 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a56f7d7..6d0232b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 --- +## [1.5.2] - 2026-05-10 + +### Added +- **`SFM_MAX_FORWARDS_PER_PASS` rate cap.** Default 500 events per scan tick (60-second interval = ~30K events/hour). Drips first-deploy backfill instead of hammering the SFM server with one giant burst on machines that have hundreds of thousands of historical events in the BW ACH folder. `0` = unlimited (preserves the 1.5.0 behaviour for ops who want it). +- **`event_forwarder.py --seed-state` CLI mode.** Walks the watch folder once, sha256s every in-window event, and marks them all as already-forwarded *without* POSTing anything. The recommended pre-deploy workflow on machines with a large historical archive — flip `SFM_FORWARD_ENABLED=true` after seeding and only events that appear *from then on* get forwarded. +- New "Max Events Per Pass" spinbox in the Settings dialog's SFM Forward tab. +- README "First-time deployment" section documenting both options. +- 7 new unit tests covering the cap (oldest-first ordering, cap=0 unlimited, cap=N enforcement) and the seed-state mode (skips out-of-window files, idempotent across re-runs, end-to-end skip-after-seed). + +### Behaviour change + +The scan loop now sorts entries by mtime ascending before walking, so backfill always advances chronologically (oldest qualifying event first). Without the cap the visible behaviour is identical; with the cap it means each scan reliably advances and we never get stuck re-considering the same N newest files. + ## [1.5.1] - 2026-05-10 ### Added diff --git a/README.md b/README.md index 73c567c..cdfa706 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Series 3 Watcher v1.5.1 +# Series 3 Watcher v1.5.2 Monitors Instantel **Series 3 (Minimate)** call-in activity on a Blastware server. Runs as a **system tray app** that starts automatically on login, reports heartbeats to terra-view, and self-updates from Gitea. @@ -88,7 +88,7 @@ All settings live in `config.ini`. The Setup Wizard covers every field, but here | `UPDATE_SOURCE` | `gitea` (default) or `url` — where to check for updates | | `UPDATE_URL` | Base URL of the update server when `UPDATE_SOURCE = url` (e.g. terra-view URL). The watcher fetches `/api/updates/series3-watcher/version.txt` and `/api/updates/series3-watcher/series3-watcher.exe` from this base. | -### SFM Event Forwarder (v1.5.1+) +### SFM Event Forwarder (v1.5.2+) Forwards each Blastware event binary (and its paired `.TXT` ASCII report when present) to an SFM server's `/db/import/blastware_file` endpoint, where the report is parsed and the rich per-channel stats (PPV, ZC Freq, Time of Peak, Peak Acceleration / Displacement, sensor self-check) land in a searchable database. **Default-off** — existing deployments keep their old behaviour after auto-updating until the operator opts in. @@ -101,9 +101,27 @@ Forwards each Blastware event binary (and its paired `.TXT` ASCII report | `SFM_MISSING_REPORT_GRACE_SECONDS` | If a `.TXT` partner hasn't appeared after this many seconds, forward the binary alone (default `60`) | | `SFM_HTTP_TIMEOUT` | Per-request HTTP timeout in seconds (default `60`) | | `SFM_STATE_FILE` | Path to the JSON state file tracking sha256 of forwarded events. Leave blank to default to `/sfm_forwarded.json` | +| `SFM_MAX_FORWARDS_PER_PASS` | Max events forwarded per scan tick (default `500`, `0` = unlimited). Drip-feeds backfill so a folder with thousands of qualifying events doesn't hammer the SFM server in one giant burst. | Forwarded files are tracked by sha256 in the state file, so re-scans / restarts / auto-updates never re-POST the same content. A failed POST stays in the pending pool and is retried on the next interval. +#### First-time deployment on a folder with a large historical archive + +If you're enabling SFM forwarding on a Blastware ACH machine that's been accumulating events for years (tens or hundreds of thousands of files in the watch folder), you almost certainly **don't** want the watcher to forward all of them on first run. Two options: + +1. **Skip the historical backfill (recommended).** Run the seed-state CLI once before flipping `SFM_FORWARD_ENABLED=true`. It walks the folder, sha256s every existing in-window event, and marks them all as already-forwarded — without POSTing anything. The watcher then only forwards events that appear *after* the seed run. + + ``` + python event_forwarder.py --seed-state ^ + --watch "C:\Blastware 10\Event\autocall home" ^ + --state "C:\Users\\AppData\Local\Series3Watcher\agent_logs\sfm_forwarded.json" ^ + --max-age-days 365 + ``` + +2. **Throttle the backfill.** Leave `SFM_MAX_FORWARDS_PER_PASS` at its 500 default and let the watcher drip-feed. With a 60-second `SFM_FORWARD_INTERVAL_SECONDS` that's ~30K events/hour throughput. Backfill of 30K events takes about an hour, 100K takes ~3.5 hours. The cap fires per scan, so heartbeat and forwarding share the watcher's main loop without saturating it. + +Combine both for a fully controlled rollout: seed-state to skip the deep archive, then leave the cap on as a steady-state safety net. + --- ## Tray Icon @@ -137,7 +155,7 @@ To view connected watchers: **Settings → Developer → Watcher Manager**. ## Versioning -Follows **Semantic Versioning**. Current release: **v1.5.1**. +Follows **Semantic Versioning**. Current release: **v1.5.2**. See `CHANGELOG.md` for full history. --- diff --git a/config-template.ini b/config-template.ini index 29ca234..45799c9 100644 --- a/config-template.ini +++ b/config-template.ini @@ -63,3 +63,12 @@ SFM_HTTP_TIMEOUT = 60 # to default to /sfm_forwarded.json. SFM_STATE_FILE = +# Per-pass cap — forward at most N events per scan tick. 0 = unlimited. +# Default 500 throttles first-deploy backfill on machines with large +# historical archives (tens or hundreds of thousands of events) so +# the SFM server isn't hammered with one giant burst. At 60s scan +# interval × 500 events/pass that's 30K events/hour throughput. +# See README "First-time deployment" for the recommended +# `--seed-state` workflow that skips the historical backfill entirely. +SFM_MAX_FORWARDS_PER_PASS = 500 + diff --git a/event_forwarder.py b/event_forwarder.py index 9a3a08a..f2c971a 100644 --- a/event_forwarder.py +++ b/event_forwarder.py @@ -209,6 +209,7 @@ def find_pending_events( max_age_days: int, quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS, missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS, + max_per_pass: int = 0, ) -> List[Tuple[str, Optional[str]]]: """ Walk `watch_dir` and return the list of (binary_path, txt_path_or_None) @@ -226,6 +227,11 @@ def find_pending_events( missing_report_grace_seconds, we forward without the TXT. Younger binaries with a missing TXT are deferred — let BW finish writing the report. + - When `max_per_pass > 0`, return at most that many pairs. + Older files (lower mtime) are forwarded first so backfill + proceeds chronologically. Use this to drip-feed a folder + with thousands of qualifying events instead of hammering + the SFM server with one giant burst. """ if not os.path.isdir(watch_dir): log.warning("forward scan: watch dir not found: %s", watch_dir) @@ -248,6 +254,20 @@ def find_pending_events( # Cache existence of TXT partners so we don't stat() each twice. names = {e.name for e in entries if e.is_file()} + # Sort by mtime ASCENDING so chronological backfill happens oldest-first. + # When max_per_pass clamps the list, we always advance — we don't get + # stuck re-considering the same N newest files every scan. + def _mtime(entry: os.DirEntry) -> float: + try: + return entry.stat().st_mtime + except OSError: + return 0.0 + + entries = sorted( + (e for e in entries if e.is_file()), + key=_mtime, + ) + for e in entries: if not e.is_file(): continue @@ -302,9 +322,13 @@ def find_pending_events( # Stash size + digest on the tuple-replacement for use during forward; # callers can re-derive but caching avoids a second sha256. + # Per-pass cap: once we have enough pending, stop scanning. + if max_per_pass and len(pending) >= max_per_pass: + break + log.debug( - "forward scan: %d pending skipped_inflight=%d already_forwarded=%d", - len(pending), skipped_inflight, skipped_already_forwarded, + "forward scan: %d pending skipped_inflight=%d already_forwarded=%d cap=%d", + len(pending), skipped_inflight, skipped_already_forwarded, max_per_pass, ) return pending @@ -442,6 +466,7 @@ def forward_pending( quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS, missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS, timeout: float = DEFAULT_HTTP_TIMEOUT, + max_per_pass: int = 0, logger: Optional[Any] = None, ) -> Dict[str, int]: """ @@ -467,6 +492,7 @@ def forward_pending( max_age_days=max_age_days, quiescence_seconds=quiescence_seconds, missing_report_grace_seconds=missing_report_grace_seconds, + max_per_pass=max_per_pass, ) counts = {"scanned": len(pending), "forwarded": 0, "errors": 0, "with_report": 0} @@ -502,3 +528,152 @@ def forward_pending( ) return counts + + +# ── Seed-state mode (skip historical backfill on first deploy) ──────────────── + + +def seed_state_from_folder( + watch_dir: str, + state: ForwardState, + *, + max_age_days: int = 365, + logger: Optional[Any] = None, +) -> Dict[str, int]: + """Walk `watch_dir` and mark every existing event binary as already + forwarded — without POSTing anything. + + This is the right tool for a first deploy on a machine that already + has tens or hundreds of thousands of historical events in the BW + ACH folder. Run it ONCE before enabling SFM_FORWARD_ENABLED: + + python event_forwarder.py --seed-state \ + --watch "C:\\Blastware 10\\Event\\autocall home" \ + --state "C:\\...\\sfm_forwarded.json" \ + [--max-age-days 365] + + The watcher then starts forwarding only events that appear AFTER + the seed run. Files older than `max_age_days` are still skipped + by the regular scan loop — we don't bother seeding them because + they wouldn't be forwarded anyway. + + Returns a counts dict: + {"scanned": int, "seeded": int, "already_known": int, "skipped_too_old": int} + """ + def _log(msg: str) -> None: + if logger: + logger(msg) + else: + log.info(msg) + + counts = {"scanned": 0, "seeded": 0, "already_known": 0, "skipped_too_old": 0} + + if not os.path.isdir(watch_dir): + _log(f"[seed] watch dir not found: {watch_dir}") + return counts + + now_ts = time.time() + max_age_seconds = max(1, int(max_age_days)) * 86400.0 + + try: + with os.scandir(watch_dir) as it: + entries = [e for e in it if e.is_file()] + except OSError as exc: + _log(f"[seed] scandir failed on {watch_dir}: {exc}") + return counts + + for e in entries: + if not is_event_binary(e.path): + continue + counts["scanned"] += 1 + try: + mtime = e.stat().st_mtime + size = e.stat().st_size + except OSError: + continue + if (now_ts - mtime) > max_age_seconds: + counts["skipped_too_old"] += 1 + continue + try: + digest = sha256_of_file(e.path) + except OSError as exc: + _log(f"[seed] sha256 failed for {e.path}: {exc}") + continue + if state.is_forwarded(digest): + counts["already_known"] += 1 + continue + state.mark_forwarded(digest, e.name, size) + counts["seeded"] += 1 + if counts["seeded"] % 1000 == 0: + _log(f"[seed] progress: {counts['seeded']} seeded so far...") + + _log( + f"[seed] done. scanned={counts['scanned']} seeded={counts['seeded']} " + f"already_known={counts['already_known']} " + f"skipped_too_old={counts['skipped_too_old']}" + ) + return counts + + +# ── CLI entry point ───────────────────────────────────────────────────────── + + +def _main() -> int: + """Command-line interface for one-shot operations. + + Currently supports a single mode: + + python event_forwarder.py --seed-state \ + --watch "" \ + --state "" \ + [--max-age-days 365] + + which marks every existing in-window event binary as already + forwarded (without POSTing) so the watcher only forwards events + appearing AFTER the seed. + """ + import argparse + parser = argparse.ArgumentParser( + description="Series 3 Watcher — SFM event forwarder utilities", + ) + parser.add_argument( + "--seed-state", action="store_true", + help="Mark every event binary in --watch as already-forwarded " + "(without POSTing). Use this BEFORE enabling SFM_FORWARD " + "on a machine with a large historical archive.", + ) + parser.add_argument( + "--watch", required=True, + help="Path to the Blastware ACH folder.", + ) + parser.add_argument( + "--state", required=True, + help="Path to the JSON state file. Will be created if missing.", + ) + parser.add_argument( + "--max-age-days", type=int, default=365, + help="Only seed files newer than this many days (default 365).", + ) + args = parser.parse_args() + + if not args.seed_state: + parser.error("specify --seed-state (no other modes supported yet)") + + print(f"[seed] watch_dir = {args.watch}") + print(f"[seed] state = {args.state}") + print(f"[seed] max_age = {args.max_age_days} days") + + state = ForwardState(args.state) + print(f"[seed] state currently has {state.count()} entries") + seed_state_from_folder( + args.watch, state, + max_age_days=args.max_age_days, + logger=lambda m: print(m), + ) + print(f"[seed] state now has {state.count()} entries") + return 0 + + +if __name__ == "__main__": + import sys + sys.exit(_main()) diff --git a/installer.iss b/installer.iss index d16051f..f910505 100644 --- a/installer.iss +++ b/installer.iss @@ -3,7 +3,7 @@ [Setup] AppName=Series 3 Watcher -AppVersion=1.5.1 +AppVersion=1.5.2 AppPublisher=Terra-Mechanics Inc. DefaultDirName={pf}\Series3Watcher DefaultGroupName=Series 3 Watcher diff --git a/series3_tray.py b/series3_tray.py index 142530a..b9e7fd7 100644 --- a/series3_tray.py +++ b/series3_tray.py @@ -1,5 +1,5 @@ """ -Series 3 Watcher — System Tray Launcher v1.5.1 +Series 3 Watcher — System Tray Launcher v1.5.2 Requires: pystray, Pillow, tkinter (stdlib) Run with: pythonw series3_tray.py (no console window) diff --git a/series3_watcher.py b/series3_watcher.py index 63cdfeb..43dd666 100644 --- a/series3_watcher.py +++ b/series3_watcher.py @@ -104,6 +104,12 @@ def load_config(path: str) -> Dict[str, Any]: # State file for forwarded-sha256 idempotency tracking. # Defaults next to the log file for easy operator access. "SFM_STATE_FILE": get_str("SFM_STATE_FILE", ""), + # Per-pass cap — forward at most N events per scan tick. + # 0 = unlimited. Default 500 as a safety against accidentally + # backfilling tens of thousands of events in one burst on + # first deploy in a folder that's been accumulating for years. + # See README "First-time deployment" section. + "SFM_MAX_FORWARDS_PER_PASS": get_int("SFM_MAX_FORWARDS_PER_PASS", 500), } @@ -241,7 +247,7 @@ def scan_latest( # --- API heartbeat / SFM telemetry helpers --- -VERSION = "1.5.1" +VERSION = "1.5.2" def _read_log_tail(log_file: str, n: int = 25) -> Optional[list]: @@ -522,6 +528,7 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None: cfg.get("SFM_MISSING_REPORT_GRACE_SECONDS", 60) ), timeout=int(cfg.get("SFM_HTTP_TIMEOUT", 60)), + max_per_pass=int(cfg.get("SFM_MAX_FORWARDS_PER_PASS", 500)), logger=lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m), ) last_forward_ts = now_ts diff --git a/settings_dialog.py b/settings_dialog.py index 4a3a987..1ca02b9 100644 --- a/settings_dialog.py +++ b/settings_dialog.py @@ -1,5 +1,5 @@ """ -Series 3 Watcher — Settings Dialog v1.5.1 +Series 3 Watcher — Settings Dialog v1.5.2 Provides a Tkinter settings dialog that doubles as a first-run wizard. @@ -52,6 +52,7 @@ DEFAULTS = { "SFM_MISSING_REPORT_GRACE_SECONDS": "60", "SFM_HTTP_TIMEOUT": "60", "SFM_STATE_FILE": "", + "SFM_MAX_FORWARDS_PER_PASS": "500", } @@ -257,6 +258,7 @@ class SettingsDialog: self.var_sfm_missing_report_grace = tk.StringVar(value=v["SFM_MISSING_REPORT_GRACE_SECONDS"]) self.var_sfm_http_timeout = tk.StringVar(value=v["SFM_HTTP_TIMEOUT"]) self.var_sfm_state_file = tk.StringVar(value=v["SFM_STATE_FILE"]) + self.var_sfm_max_per_pass = tk.StringVar(value=v["SFM_MAX_FORWARDS_PER_PASS"]) # --- UI construction --- @@ -522,15 +524,16 @@ class SettingsDialog: self._sfm_test_status.grid(row=0, column=2, padx=(6, 0)) _add_label_spinbox(f, 2, "Forward Interval (sec)", self.var_sfm_forward_interval, 5, 3600) - _add_label_spinbox(f, 3, "Quiescence (sec)", self.var_sfm_quiescence, 1, 60) - _add_label_spinbox(f, 4, "Missing-Report Grace (sec)", self.var_sfm_missing_report_grace, 0, 600) - _add_label_spinbox(f, 5, "HTTP Timeout (sec)", self.var_sfm_http_timeout, 5, 600) + _add_label_spinbox(f, 3, "Max Events Per Pass", self.var_sfm_max_per_pass, 0, 100000) + _add_label_spinbox(f, 4, "Quiescence (sec)", self.var_sfm_quiescence, 1, 60) + _add_label_spinbox(f, 5, "Missing-Report Grace (sec)", self.var_sfm_missing_report_grace, 0, 600) + _add_label_spinbox(f, 6, "HTTP Timeout (sec)", self.var_sfm_http_timeout, 5, 600) tk.Label(f, text="State File", anchor="w").grid( - row=6, column=0, sticky="w", padx=(8, 4), pady=4 + row=7, column=0, sticky="w", padx=(8, 4), pady=4 ) state_frame = tk.Frame(f) - state_frame.grid(row=6, column=1, sticky="ew", padx=(0, 8), pady=4) + state_frame.grid(row=7, column=1, sticky="ew", padx=(0, 8), pady=4) state_frame.columnconfigure(0, weight=1) state_entry = ttk.Entry(state_frame, textvariable=self.var_sfm_state_file, width=32) @@ -560,7 +563,7 @@ class SettingsDialog: "to default to /sfm_forwarded.json." ) tk.Label(f, text=hint_text, justify="left", fg="#555555", wraplength=380).grid( - row=7, column=0, columnspan=2, sticky="w", padx=(8, 8), pady=(8, 4) + row=8, column=0, columnspan=2, sticky="w", padx=(8, 8), pady=(8, 4) ) def _test_sfm_connection(self): @@ -637,6 +640,7 @@ class SettingsDialog: (self.var_sfm_quiescence, "SFM Quiescence", 1, 60, 5), (self.var_sfm_missing_report_grace, "SFM Missing-Report Grace", 0, 600, 60), (self.var_sfm_http_timeout, "SFM HTTP Timeout", 5, 600, 60), + (self.var_sfm_max_per_pass, "SFM Max Events Per Pass", 0, 100000, 500), ] int_values = {} for var, name, mn, mx, dflt in checks: @@ -693,6 +697,7 @@ class SettingsDialog: "SFM_MISSING_REPORT_GRACE_SECONDS": str(int_values["SFM Missing-Report Grace"]), "SFM_HTTP_TIMEOUT": str(int_values["SFM HTTP Timeout"]), "SFM_STATE_FILE": self.var_sfm_state_file.get().strip(), + "SFM_MAX_FORWARDS_PER_PASS": str(int_values["SFM Max Events Per Pass"]), } try: diff --git a/test_event_forwarder.py b/test_event_forwarder.py index df4268f..8f0de8d 100644 --- a/test_event_forwarder.py +++ b/test_event_forwarder.py @@ -223,6 +223,154 @@ class TestFindPendingEvents(unittest.TestCase): ) self.assertEqual(len(pending), 0) + def test_max_per_pass_caps_returned_count(self): + """When max_per_pass is set, return at most that many pairs.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + # Create 5 distinct event binaries with paired .TXTs + for i, name in enumerate( + ["M529LK01.AB0", "M529LK02.AB0", "M529LK03.AB0", + "M529LK04.AB0", "M529LK05.AB0"], + ): + self._make(tmp_p, name, age_seconds=120 + i, + content=("bin-" + str(i)).encode()) + self._make(tmp_p, name + ".TXT", age_seconds=110 + i, + content=b"report") + + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + max_per_pass=2, + ) + self.assertEqual(len(pending), 2) + + def test_max_per_pass_zero_means_unlimited(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + for i in range(4): + self._make(tmp_p, "M529LK0{}.AB0".format(i), + age_seconds=120 + i, + content=("bin-" + str(i)).encode()) + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, + quiescence_seconds=5, + missing_report_grace_seconds=60, + max_per_pass=0, + ) + self.assertEqual(len(pending), 4) + + def test_max_per_pass_returns_oldest_first(self): + """Backfill should advance chronologically — oldest qualifying + files first. This way successive scans always make progress + instead of getting stuck re-considering the same N newest files.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + # ages: 200s (oldest), 150s, 100s, 50s (skipped — within grace) + ages = [200, 150, 100, 50] + for i, age in enumerate(ages): + self._make(tmp_p, "M529LK0{}.AB0".format(i), + age_seconds=age, content=("c" + str(i)).encode()) + self._make(tmp_p, "M529LK0{}.AB0.TXT".format(i), + age_seconds=age - 10, content=b"r") + + state = ef.ForwardState(str(tmp_p / "fwd.json")) + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, quiescence_seconds=5, + missing_report_grace_seconds=60, max_per_pass=2, + ) + # Oldest two should be M529LK00 (200s) and M529LK01 (150s) + names = [os.path.basename(p[0]) for p in pending] + self.assertEqual(names, ["M529LK00.AB0", "M529LK01.AB0"]) + + +# ── Seed-state mode ────────────────────────────────────────────────────────── + + +class TestSeedStateFromFolder(unittest.TestCase): + + def _make(self, dir_path: Path, name: str, age_seconds: float = 100, + content: bytes = b"x") -> Path: + p = dir_path / name + p.write_bytes(content) + target = time.time() - age_seconds + os.utime(p, (target, target)) + return p + + def test_seeds_every_in_window_event_without_posting(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + for i in range(3): + self._make(tmp_p, "M529LK0{}.AB0".format(i), + age_seconds=120 + i, content=("e" + str(i)).encode()) + # Plus a non-event file we should ignore + self._make(tmp_p, "BE11529.MLG", age_seconds=120, content=b"mlg") + + state = ef.ForwardState(str(tmp_p / "seed.json")) + counts = ef.seed_state_from_folder( + str(tmp_p), state, max_age_days=30, + ) + self.assertEqual(counts["scanned"], 3) + self.assertEqual(counts["seeded"], 3) + self.assertEqual(counts["already_known"], 0) + self.assertEqual(state.count(), 3) + + def test_seed_skips_files_beyond_max_age_days(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "M529LK01.AB0", age_seconds=120, content=b"new") + self._make(tmp_p, "M529LK02.AB0", age_seconds=10 * 86400, + content=b"in-window") # 10d < 30d cutoff + self._make(tmp_p, "M529LK03.AB0", age_seconds=400 * 86400, + content=b"way-old") # 400d > 30d cutoff + + state = ef.ForwardState(str(tmp_p / "seed.json")) + counts = ef.seed_state_from_folder( + str(tmp_p), state, max_age_days=30, + ) + self.assertEqual(counts["seeded"], 2) + self.assertEqual(counts["skipped_too_old"], 1) + + def test_seeded_files_are_then_skipped_by_normal_scan(self): + """End-to-end: seed once, then a normal scan should produce + zero pending events for the seeded files.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "M529LK01.AB0", age_seconds=120, content=b"x") + self._make(tmp_p, "M529LK01.AB0.TXT", age_seconds=110, content=b"r") + self._make(tmp_p, "M529LK02.AB0", age_seconds=120, content=b"y") + self._make(tmp_p, "M529LK02.AB0.TXT", age_seconds=110, content=b"r") + + state = ef.ForwardState(str(tmp_p / "seed.json")) + ef.seed_state_from_folder(str(tmp_p), state, max_age_days=30) + + pending = ef.find_pending_events( + str(tmp_p), state, + max_age_days=30, quiescence_seconds=5, + missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0, + "seed should have marked everything already-forwarded") + + def test_seed_is_idempotent(self): + """Re-running seed twice doesn't duplicate entries or POST anything.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + self._make(tmp_p, "M529LK01.AB0", age_seconds=120, content=b"x") + + state = ef.ForwardState(str(tmp_p / "seed.json")) + counts1 = ef.seed_state_from_folder(str(tmp_p), state, max_age_days=30) + counts2 = ef.seed_state_from_folder(str(tmp_p), state, max_age_days=30) + self.assertEqual(counts1["seeded"], 1) + self.assertEqual(counts2["seeded"], 0) + self.assertEqual(counts2["already_known"], 1) + self.assertEqual(state.count(), 1) + # ── Multipart encoder ────────────────────────────────────────────────────────