diff --git a/CHANGELOG.md b/CHANGELOG.md index 92245e0..133eb76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ First release of the SFM event forwarder. ### Added — SFM event forwarder - **Forward Blastware event binaries (+ paired BW ACH ASCII reports) to an SFM server.** When `SFM_FORWARD_ENABLED=true` and `SFM_URL` is set, every event binary in the BW ACH watch folder is POSTed as multipart to `/db/import/blastware_file` along with its `__ASCII.TXT` partner report (BW ACH convention; manual-export `.TXT` is also supported as a fallback). SFM parses the report and indexes the full per-channel stats (PPV, ZC Freq, Time of Peak, Peak Acceleration / Displacement, Peak Vector Sum + time, sensor self-check Pass/Fail, monitor-log timestamps) into a searchable database — no codec decoding required. - **Idempotent forwarding.** Forwarded files are tracked by sha256 in a JSON state file (default `/sfm_forwarded.json`, override via `SFM_STATE_FILE`). Re-scans don't re-POST and the state survives restarts / auto-updates. +- **Re-pair on late-arriving TXT.** When the watcher forwards a binary alone (`_ASCII.TXT` partner didn't appear within `SFM_MISSING_REPORT_GRACE_SECONDS`), the state file records `had_report: false`. On subsequent scans, the watcher re-checks whether the TXT has since arrived. If yes, the event is re-forwarded with the TXT attached — the SFM server's upsert path refreshes the DB row with the report's device-authoritative peak / project values. Without this, slow-disk or AV-interrupted TXT writes would permanently leave that event with broken-codec peaks in the SFM database. Legacy state-file entries (without the `had_report` field) default to `had_report: true` so an upgrade doesn't unexpectedly re-forward existing entries. - **Quiescence + grace-period guards.** Files modified within `SFM_QUIESCENCE_SECONDS` (default 5s) are skipped to avoid forwarding mid-write. If a binary's report partner hasn't appeared after `SFM_MISSING_REPORT_GRACE_SECONDS` (default 60s), the binary is forwarded alone rather than blocking forever. - **Per-pass rate cap.** `SFM_MAX_FORWARDS_PER_PASS` (default 500) drips first-deploy backfill instead of hammering the SFM server in one burst. At 60-second `SFM_FORWARD_INTERVAL_SECONDS` cadence that's ~30K events/hour throughput. Set to `0` for unlimited. Scan walks oldest-first so backfill advances chronologically and successive scans reliably progress. - **`event_forwarder.py --seed-state` CLI mode.** Walks the watch folder once, sha256s every in-window event, and marks them all as already-forwarded *without* POSTing anything. Recommended pre-deploy workflow on machines with a large historical archive — flip `SFM_FORWARD_ENABLED=true` after seeding and only events that appear from then on get forwarded. diff --git a/event_forwarder.py b/event_forwarder.py index d2311ac..cbc6d2b 100644 --- a/event_forwarder.py +++ b/event_forwarder.py @@ -218,11 +218,53 @@ class ForwardState: def is_forwarded(self, sha256: str) -> bool: return sha256 in self._data["forwarded"] - def mark_forwarded(self, sha256: str, filename: str, size: int) -> None: + def status(self, sha256: str) -> Optional[bool]: + """Return forwarding status for *sha256*. + + Returns: + None — never forwarded. Eligible for a fresh forward. + True — forwarded successfully with its paired report + (or in a legacy entry that pre-dates the + had_report field — assumed complete for safety). + NOT a candidate for re-forward. + False — forwarded WITHOUT its paired ``_ASCII.TXT`` + (BW's TXT-write lagged past the grace period). + Eligible for re-forward IF the TXT now exists, + so the SFM server's upsert path can refresh the + DB row with the report's authoritative values. + + Legacy state-file entries without a ``had_report`` key default + to ``True`` so an upgrade doesn't unexpectedly re-forward + every entry the operator has accumulated. + """ + entry = self._data["forwarded"].get(sha256) + if entry is None: + return None + return bool(entry.get("had_report", True)) + + def mark_forwarded( + self, + sha256: str, + filename: str, + size: int, + had_report: bool = True, + ) -> None: + """Record a successful forward. + + Set ``had_report=False`` when the forward shipped the binary + without its paired ASCII report. Such entries are re-checked + on subsequent scans and re-forwarded once the TXT appears, so + SFM's upsert refreshes the DB row with the device-authoritative + peak/project values. + + Idempotent: re-marking an existing sha256 with ``had_report=True`` + is the explicit promotion path used when a re-pair succeeds. + """ self._data["forwarded"][sha256] = { "filename": filename, "size": size, "forwarded_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "had_report": had_report, } self._save() @@ -340,12 +382,19 @@ def find_pending_events( continue # Idempotency: skip if we already forwarded this content + # successfully. Three cases via state.status(digest): + # True — forwarded WITH report → permanently done, skip. + # False — forwarded WITHOUT report → re-pair candidate. + # Forward again only if a paired TXT is now present + # so SFM's upsert refreshes the DB row. + # None — never forwarded → normal first-forward path. try: digest = sha256_of_file(e.path) except OSError as exc: log.warning("forward scan: sha256 failed for %s: %s", e.path, exc) continue - if state.is_forwarded(digest): + fwd_status = state.status(digest) + if fwd_status is True: skipped_already_forwarded += 1 continue @@ -380,9 +429,19 @@ def find_pending_events( txt_path = candidate # else: TXT is mid-write; treat as not-yet-paired and defer. - if txt_path is None: - # No TXT (or not yet quiescent). Wait for the grace - # period before forwarding alone. + if fwd_status is False: + # Previously forwarded WITHOUT report. We're here looking + # for a re-pair opportunity. If the TXT is now present + # and quiescent, include in pending for re-forward (the + # SFM server's upsert will refresh the DB row with the + # report's authoritative values). Otherwise skip — no + # point re-forwarding the same binary alone again. + if txt_path is None: + skipped_already_forwarded += 1 + continue + elif txt_path is None: + # First-time forward and TXT not yet present. Wait for the + # grace period before forwarding alone. if (now_ts - mtime) < missing_report_grace_seconds: skipped_inflight += 1 continue @@ -575,7 +634,18 @@ def forward_pending( try: digest = sha256_of_file(binary_path) size = os.path.getsize(binary_path) - state.mark_forwarded(digest, os.path.basename(binary_path), size) + # Record whether this forward shipped a paired TXT. + # Forwards without a TXT are flagged had_report=False so + # subsequent scans re-check whether the TXT has since + # appeared and trigger a re-forward (the SFM server's + # upsert path refreshes the DB row with the report's + # authoritative values). + state.mark_forwarded( + digest, + os.path.basename(binary_path), + size, + had_report=(txt_path is not None), + ) except OSError as exc: _log(f"[forward] post-success state save failed for " f"{os.path.basename(binary_path)}: {exc}") diff --git a/test_event_forwarder.py b/test_event_forwarder.py index 36f9c42..e425fe6 100644 --- a/test_event_forwarder.py +++ b/test_event_forwarder.py @@ -285,6 +285,95 @@ class TestFindPendingEvents(unittest.TestCase): self.assertEqual(os.path.basename(bin_path), "M529LK44.AB0") self.assertIsNone(txt_path) + def test_re_pair_after_late_arriving_txt(self): + """If we forwarded the binary alone (TXT was late) and the TXT + later appears, the binary becomes eligible for re-forward.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + bin_p = self._make(tmp_p, "M529LK44.AB0", + age_seconds=200, content=b"binary") + # Mark as already-forwarded WITHOUT a paired report (the + # state we'd be in after a TXT-too-late forward). + state = ef.ForwardState(str(tmp_p / "fwd.json")) + digest = ef.sha256_of_file(str(bin_p)) + state.mark_forwarded(digest, "M529LK44.AB0", len(b"binary"), + had_report=False) + + # First scan: TXT not present yet → still skipped. + pending = ef.find_pending_events( + str(tmp_p), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(pending, [], + "no TXT present → no re-pair attempt") + + # Now BW finally writes the TXT. + self._make(tmp_p, "M529LK44.AB0.TXT", + age_seconds=100, content=b"report") + pending = ef.find_pending_events( + str(tmp_p), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 1, + "TXT now present → re-pair attempt expected") + self.assertEqual(os.path.basename(pending[0][0]), "M529LK44.AB0") + self.assertEqual(os.path.basename(pending[0][1]), "M529LK44.AB0.TXT") + + def test_re_pair_not_attempted_when_already_had_report(self): + """Successful WITH-report forwards stay permanently skipped. + Adding more files later does NOT trigger a re-forward.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + bin_p = self._make(tmp_p, "M529LK44.AB0", age_seconds=200, content=b"x") + self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"r") + state = ef.ForwardState(str(tmp_p / "fwd.json")) + state.mark_forwarded(ef.sha256_of_file(str(bin_p)), + "M529LK44.AB0", 1, had_report=True) + pending = ef.find_pending_events( + str(tmp_p), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(pending, [], + "had_report=True forwards stay skipped") + + def test_legacy_state_entries_default_to_had_report_true(self): + """Backward compat: state-file entries from before the + had_report field existed are treated as fully forwarded so + an upgrade doesn't re-forward every entry.""" + import json + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + path = str(tmp_p / "fwd.json") + with open(path, "w") as f: + json.dump({ + "version": 1, + "forwarded": { + "abc123": { + "filename": "M529LK01.AB0", + "size": 123, + "forwarded_at": "2025-01-01T00:00:00Z", + # No had_report field — legacy entry + } + } + }, f) + state = ef.ForwardState(path) + self.assertIs(state.status("abc123"), True, + "legacy entry must default to 'fully forwarded'") + + def test_state_status_returns_none_for_unknown_sha(self): + with tempfile.TemporaryDirectory() as tmp: + state = ef.ForwardState(str(Path(tmp) / "fwd.json")) + self.assertIs(state.status("never-seen"), None) + + def test_state_mark_with_had_report_false(self): + with tempfile.TemporaryDirectory() as tmp: + state = ef.ForwardState(str(Path(tmp) / "fwd.json")) + state.mark_forwarded("xyz", "f.AB0", 100, had_report=False) + self.assertIs(state.status("xyz"), False) + # Subsequent re-mark with had_report=True promotes to done. + state.mark_forwarded("xyz", "f.AB0", 100, had_report=True) + self.assertIs(state.status("xyz"), True) + def test_defers_when_txt_missing_and_within_grace(self): with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp)