feat(forward): re-pair late-arriving TXTs on subsequent scans

When a binary is forwarded WITHOUT its paired _ASCII.TXT (because
the TXT wasn't quiescent within the grace period: BW slow to
write, AV scanning, etc.), the old behaviour was to permanently
mark the binary as "done" in the state file, even though the TXT
might land seconds later. Result: that event lived in SFM forever
with broken-codec peak values and no project info.

Fix: state entries now carry a had_report flag. Forwards without
a TXT set had_report=False. On subsequent scans, the watcher
treats had_report=False entries as re-pair candidates: they are
re-forwarded once the TXT appears, and the SFM server's upsert
path (in seismo-relay's insert_events IntegrityError handler)
refreshes the DB row with the report's authoritative values.

Three status states in ForwardState.status(sha256):
  None:  never forwarded. First-forward path.
  True:  forwarded successfully WITH report (or legacy entry
         without the had_report field). Permanently done.
  False: forwarded WITHOUT report. Re-pair if TXT now exists.

Backward compat: legacy state-file entries (no had_report key)
default to True so existing deployments don't unexpectedly
re-forward every entry on upgrade.

Tests cover:
- re-pair when TXT appears after a had_report=False forward
- had_report=True entries stay skipped permanently
- legacy entries (missing field) treated as fully forwarded
- state.status() returns None for unknown sha
- re-marking had_report=False then True promotes to fully-done

36 watcher tests pass (was 31, +5 new).
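
As a rough illustration of the three-state lookup and the per-scan decision described in this message, here is a minimal, self-contained sketch. It is not the code from this commit: `status` is written as a free function over a plain dict rather than a `ForwardState` method, and `scan_action` with its string labels is a hypothetical helper invented for the example. Only the `had_report` key, its default of `True` for legacy entries, and the 60-second grace default are taken from the change itself.

```python
from typing import Dict, Optional


def status(forwarded: Dict[str, dict], sha256: str) -> Optional[bool]:
    """Tri-state lookup: None = never forwarded, True = done,
    False = forwarded without a report (re-pair candidate)."""
    entry = forwarded.get(sha256)
    if entry is None:
        return None
    # Legacy entries have no had_report key and count as complete.
    return bool(entry.get("had_report", True))


def scan_action(fwd_status: Optional[bool], txt_present: bool,
                age_seconds: float, grace_seconds: float = 60.0) -> str:
    """Hypothetical helper: name the branch a scan would take.

    txt_present is assumed to mean "paired TXT exists and is quiescent".
    """
    if fwd_status is True:
        return "skip"                      # permanently done
    if fwd_status is False:
        # Re-forward only when the TXT has finally shown up.
        return "re-forward" if txt_present else "skip"
    if txt_present:
        return "forward-with-report"       # normal first forward
    if age_seconds < grace_seconds:
        return "defer"                     # still inside the grace period
    return "forward-alone"                 # grace expired, ship binary only


if __name__ == "__main__":
    state = {
        "legacy": {"filename": "M529LK01.AB0", "size": 123},
        "late": {"filename": "M529LK44.AB0", "size": 6, "had_report": False},
    }
    for sha in ("legacy", "late", "unknown"):
        print(sha, status(state, sha))
```

Returning `Optional[bool]` keeps the state file format flat (one boolean per entry) while still letting the scan distinguish "unknown" from "forwarded without a report"; callers have to compare with `is True` / `is False` rather than relying on truthiness.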
@@ -13,6 +13,7 @@ First release of the SFM event forwarder.
 ### Added — SFM event forwarder
 - **Forward Blastware event binaries (+ paired BW ACH ASCII reports) to an SFM server.** When `SFM_FORWARD_ENABLED=true` and `SFM_URL` is set, every event binary in the BW ACH watch folder is POSTed as multipart to `/db/import/blastware_file` along with its `<stem>_<ext>_ASCII.TXT` partner report (BW ACH convention; manual-export `<binary>.TXT` is also supported as a fallback). SFM parses the report and indexes the full per-channel stats (PPV, ZC Freq, Time of Peak, Peak Acceleration / Displacement, Peak Vector Sum + time, sensor self-check Pass/Fail, monitor-log timestamps) into a searchable database — no codec decoding required.
 - **Idempotent forwarding.** Forwarded files are tracked by sha256 in a JSON state file (default `<log dir>/sfm_forwarded.json`, override via `SFM_STATE_FILE`). Re-scans don't re-POST and the state survives restarts / auto-updates.
+- **Re-pair on late-arriving TXT.** When the watcher forwards a binary alone (`_ASCII.TXT` partner didn't appear within `SFM_MISSING_REPORT_GRACE_SECONDS`), the state file records `had_report: false`. On subsequent scans, the watcher re-checks whether the TXT has since arrived. If yes, the event is re-forwarded with the TXT attached — the SFM server's upsert path refreshes the DB row with the report's device-authoritative peak / project values. Without this, slow-disk or AV-interrupted TXT writes would permanently leave that event with broken-codec peaks in the SFM database. Legacy state-file entries (without the `had_report` field) default to `had_report: true` so an upgrade doesn't unexpectedly re-forward existing entries.
 - **Quiescence + grace-period guards.** Files modified within `SFM_QUIESCENCE_SECONDS` (default 5s) are skipped to avoid forwarding mid-write. If a binary's report partner hasn't appeared after `SFM_MISSING_REPORT_GRACE_SECONDS` (default 60s), the binary is forwarded alone rather than blocking forever.
 - **Per-pass rate cap.** `SFM_MAX_FORWARDS_PER_PASS` (default 500) drips first-deploy backfill instead of hammering the SFM server in one burst. At 60-second `SFM_FORWARD_INTERVAL_SECONDS` cadence that's ~30K events/hour throughput. Set to `0` for unlimited. Scan walks oldest-first so backfill advances chronologically and successive scans reliably progress.
 - **`event_forwarder.py --seed-state` CLI mode.** Walks the watch folder once, sha256s every in-window event, and marks them all as already-forwarded *without* POSTing anything. Recommended pre-deploy workflow on machines with a large historical archive — flip `SFM_FORWARD_ENABLED=true` after seeding and only events that appear from then on get forwarded.
+75
-5
@@ -218,11 +218,53 @@ class ForwardState:
     def is_forwarded(self, sha256: str) -> bool:
         return sha256 in self._data["forwarded"]
 
-    def mark_forwarded(self, sha256: str, filename: str, size: int) -> None:
+    def status(self, sha256: str) -> Optional[bool]:
+        """Return forwarding status for *sha256*.
+
+        Returns:
+            None — never forwarded. Eligible for a fresh forward.
+            True — forwarded successfully with its paired report
+                (or in a legacy entry that pre-dates the
+                had_report field — assumed complete for safety).
+                NOT a candidate for re-forward.
+            False — forwarded WITHOUT its paired ``_ASCII.TXT``
+                (BW's TXT-write lagged past the grace period).
+                Eligible for re-forward IF the TXT now exists,
+                so the SFM server's upsert path can refresh the
+                DB row with the report's authoritative values.
+
+        Legacy state-file entries without a ``had_report`` key default
+        to ``True`` so an upgrade doesn't unexpectedly re-forward
+        every entry the operator has accumulated.
+        """
+        entry = self._data["forwarded"].get(sha256)
+        if entry is None:
+            return None
+        return bool(entry.get("had_report", True))
+
+    def mark_forwarded(
+        self,
+        sha256: str,
+        filename: str,
+        size: int,
+        had_report: bool = True,
+    ) -> None:
+        """Record a successful forward.
+
+        Set ``had_report=False`` when the forward shipped the binary
+        without its paired ASCII report. Such entries are re-checked
+        on subsequent scans and re-forwarded once the TXT appears, so
+        SFM's upsert refreshes the DB row with the device-authoritative
+        peak/project values.
+
+        Idempotent: re-marking an existing sha256 with ``had_report=True``
+        is the explicit promotion path used when a re-pair succeeds.
+        """
         self._data["forwarded"][sha256] = {
             "filename": filename,
             "size": size,
             "forwarded_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
+            "had_report": had_report,
         }
         self._save()
 
@@ -340,12 +382,19 @@ def find_pending_events(
             continue
 
         # Idempotency: skip if we already forwarded this content
+        # successfully. Three cases via state.status(digest):
+        #   True — forwarded WITH report → permanently done, skip.
+        #   False — forwarded WITHOUT report → re-pair candidate.
+        #           Forward again only if a paired TXT is now present
+        #           so SFM's upsert refreshes the DB row.
+        #   None — never forwarded → normal first-forward path.
         try:
             digest = sha256_of_file(e.path)
         except OSError as exc:
             log.warning("forward scan: sha256 failed for %s: %s", e.path, exc)
             continue
-        if state.is_forwarded(digest):
+        fwd_status = state.status(digest)
+        if fwd_status is True:
             skipped_already_forwarded += 1
             continue
 
@@ -380,9 +429,19 @@ def find_pending_events(
                 txt_path = candidate
                 # else: TXT is mid-write; treat as not-yet-paired and defer.
 
-        if txt_path is None:
-            # No TXT (or not yet quiescent). Wait for the grace
-            # period before forwarding alone.
+        if fwd_status is False:
+            # Previously forwarded WITHOUT report. We're here looking
+            # for a re-pair opportunity. If the TXT is now present
+            # and quiescent, include in pending for re-forward (the
+            # SFM server's upsert will refresh the DB row with the
+            # report's authoritative values). Otherwise skip — no
+            # point re-forwarding the same binary alone again.
+            if txt_path is None:
+                skipped_already_forwarded += 1
+                continue
+        elif txt_path is None:
+            # First-time forward and TXT not yet present. Wait for the
+            # grace period before forwarding alone.
             if (now_ts - mtime) < missing_report_grace_seconds:
                 skipped_inflight += 1
                 continue
@@ -575,7 +634,18 @@ def forward_pending(
             try:
                 digest = sha256_of_file(binary_path)
                 size = os.path.getsize(binary_path)
-                state.mark_forwarded(digest, os.path.basename(binary_path), size)
+                # Record whether this forward shipped a paired TXT.
+                # Forwards without a TXT are flagged had_report=False so
+                # subsequent scans re-check whether the TXT has since
+                # appeared and trigger a re-forward (the SFM server's
+                # upsert path refreshes the DB row with the report's
+                # authoritative values).
+                state.mark_forwarded(
+                    digest,
+                    os.path.basename(binary_path),
+                    size,
+                    had_report=(txt_path is not None),
+                )
             except OSError as exc:
                 _log(f"[forward] post-success state save failed for "
                      f"{os.path.basename(binary_path)}: {exc}")
@@ -285,6 +285,95 @@ class TestFindPendingEvents(unittest.TestCase):
             self.assertEqual(os.path.basename(bin_path), "M529LK44.AB0")
             self.assertIsNone(txt_path)
 
+    def test_re_pair_after_late_arriving_txt(self):
+        """If we forwarded the binary alone (TXT was late) and the TXT
+        later appears, the binary becomes eligible for re-forward."""
+        with tempfile.TemporaryDirectory() as tmp:
+            tmp_p = Path(tmp)
+            bin_p = self._make(tmp_p, "M529LK44.AB0",
+                               age_seconds=200, content=b"binary")
+            # Mark as already-forwarded WITHOUT a paired report (the
+            # state we'd be in after a TXT-too-late forward).
+            state = ef.ForwardState(str(tmp_p / "fwd.json"))
+            digest = ef.sha256_of_file(str(bin_p))
+            state.mark_forwarded(digest, "M529LK44.AB0", len(b"binary"),
+                                 had_report=False)
+
+            # First scan: TXT not present yet → still skipped.
+            pending = ef.find_pending_events(
+                str(tmp_p), state, max_age_days=30,
+                quiescence_seconds=5, missing_report_grace_seconds=60,
+            )
+            self.assertEqual(pending, [],
+                             "no TXT present → no re-pair attempt")
+
+            # Now BW finally writes the TXT.
+            self._make(tmp_p, "M529LK44.AB0.TXT",
+                       age_seconds=100, content=b"report")
+            pending = ef.find_pending_events(
+                str(tmp_p), state, max_age_days=30,
+                quiescence_seconds=5, missing_report_grace_seconds=60,
+            )
+            self.assertEqual(len(pending), 1,
+                             "TXT now present → re-pair attempt expected")
+            self.assertEqual(os.path.basename(pending[0][0]), "M529LK44.AB0")
+            self.assertEqual(os.path.basename(pending[0][1]), "M529LK44.AB0.TXT")
+
+    def test_re_pair_not_attempted_when_already_had_report(self):
+        """Successful WITH-report forwards stay permanently skipped.
+        Adding more files later does NOT trigger a re-forward."""
+        with tempfile.TemporaryDirectory() as tmp:
+            tmp_p = Path(tmp)
+            bin_p = self._make(tmp_p, "M529LK44.AB0", age_seconds=200, content=b"x")
+            self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"r")
+            state = ef.ForwardState(str(tmp_p / "fwd.json"))
+            state.mark_forwarded(ef.sha256_of_file(str(bin_p)),
+                                 "M529LK44.AB0", 1, had_report=True)
+            pending = ef.find_pending_events(
+                str(tmp_p), state, max_age_days=30,
+                quiescence_seconds=5, missing_report_grace_seconds=60,
+            )
+            self.assertEqual(pending, [],
+                             "had_report=True forwards stay skipped")
+
+    def test_legacy_state_entries_default_to_had_report_true(self):
+        """Backward compat: state-file entries from before the
+        had_report field existed are treated as fully forwarded so
+        an upgrade doesn't re-forward every entry."""
+        import json
+        with tempfile.TemporaryDirectory() as tmp:
+            tmp_p = Path(tmp)
+            path = str(tmp_p / "fwd.json")
+            with open(path, "w") as f:
+                json.dump({
+                    "version": 1,
+                    "forwarded": {
+                        "abc123": {
+                            "filename": "M529LK01.AB0",
+                            "size": 123,
+                            "forwarded_at": "2025-01-01T00:00:00Z",
+                            # No had_report field — legacy entry
+                        }
+                    }
+                }, f)
+            state = ef.ForwardState(path)
+            self.assertIs(state.status("abc123"), True,
+                          "legacy entry must default to 'fully forwarded'")
+
+    def test_state_status_returns_none_for_unknown_sha(self):
+        with tempfile.TemporaryDirectory() as tmp:
+            state = ef.ForwardState(str(Path(tmp) / "fwd.json"))
+            self.assertIs(state.status("never-seen"), None)
+
+    def test_state_mark_with_had_report_false(self):
+        with tempfile.TemporaryDirectory() as tmp:
+            state = ef.ForwardState(str(Path(tmp) / "fwd.json"))
+            state.mark_forwarded("xyz", "f.AB0", 100, had_report=False)
+            self.assertIs(state.status("xyz"), False)
+            # Subsequent re-mark with had_report=True promotes to done.
+            state.mark_forwarded("xyz", "f.AB0", 100, had_report=True)
+            self.assertIs(state.status("xyz"), True)
+
     def test_defers_when_txt_missing_and_within_grace(self):
         with tempfile.TemporaryDirectory() as tmp:
             tmp_p = Path(tmp)