""" test_event_forwarder.py — unit tests for the SFM event forwarder. Covers: - is_event_binary() filename matching (positive + negative cases) - ForwardState load/save round-trip + idempotency check - find_pending_events() pairing + quiescence + grace-period logic - _encode_multipart() byte-level shape (boundary + headers) - forward_event_pair() end-to-end against a tiny stdlib HTTP server that mimics the SFM /db/import/blastware_file endpoint Stdlib only — runs with `python -m pytest test_event_forwarder.py` on Python 3.8+ (the watcher's compat target). """ from __future__ import annotations import http.server import json import os import socket import tempfile import threading import time import unittest from pathlib import Path import event_forwarder as ef # ── is_event_binary() ──────────────────────────────────────────────────────── class TestIsEventBinary(unittest.TestCase): def test_recognises_typical_blastware_filenames(self): for name in [ "M529LK44.AB0", "M529LKVQ.6S0", "M529LKVQ.6S0W", "S353L4H0.3M0W", "P036L318.C80H", "M529LIY6.N00", ]: self.assertTrue(ef.is_event_binary(name), name) def test_rejects_lowercase_extensions_we_explicitly_exclude(self): for name in ["BE11529.MLG", "M529LK44.AB0.TXT", "agent.log", "config.ini", "foo.bak", "bar.tmp", "something.h5", "noise.json"]: self.assertFalse(ef.is_event_binary(name), name) def test_ach_report_name(self): """BW ACH convention: .__ASCII.TXT""" cases = [ ("M529LK44.AB0", "M529LK44_AB0_ASCII.TXT"), ("N844L20G.630H", "N844L20G_630H_ASCII.TXT"), ("I145L64P.GD0W", "I145L64P_GD0W_ASCII.TXT"), ("H907L1R7.PG0H", "H907L1R7_PG0H_ASCII.TXT"), ] for binary, expected in cases: self.assertEqual(ef.ach_report_name(binary), expected, binary) def test_legacy_report_name(self): """Manual-export convention: .TXT""" self.assertEqual(ef.legacy_report_name("M529LK44.AB0"), "M529LK44.AB0.TXT") def test_is_histogram_event(self): # 4-char extension ending in H = histogram for name in ["H907L1R7.PG0H", "S353L4H0.8S0H", "P036L318.C80H"]: self.assertTrue(ef.is_histogram_event(name), name) # 4-char extension ending in W = waveform for name in ["S353L4H0.3M0W", "M529LKVQ.6S0W", "P036L318.C80W"]: self.assertFalse(ef.is_histogram_event(name), name) # 3-char old-firmware extensions can't be classified — return False for name in ["M529LK44.AB0", "M529LIY6.N00", "M529LJ8V.490"]: self.assertFalse(ef.is_histogram_event(name), name) def test_rejects_non_matching_filenames(self): for name in ["", "no_extension", "TooShort.AB0", # stem must be 8 chars "TOOLONG12345.AB0", # stem must be 8 chars "M529LK44.A", # ext too short "M529LK44.ABCDE", # ext too long "M52.AB0", # stem too short "1234ABCD.AB0"]: # first char must be letter self.assertFalse(ef.is_event_binary(name), name) # ── ForwardState ───────────────────────────────────────────────────────────── class TestForwardState(unittest.TestCase): def test_round_trip_persists_marked_entries(self): with tempfile.TemporaryDirectory() as tmp: path = os.path.join(tmp, "fwd.json") s = ef.ForwardState(path) self.assertFalse(s.is_forwarded("abc123")) s.mark_forwarded("abc123", "M529LK44.AB0", 4400) self.assertTrue(s.is_forwarded("abc123")) # Re-load from disk s2 = ef.ForwardState(path) self.assertTrue(s2.is_forwarded("abc123")) self.assertEqual(s2.count(), 1) def test_corrupt_state_file_starts_fresh(self): with tempfile.TemporaryDirectory() as tmp: path = os.path.join(tmp, "fwd.json") with open(path, "w") as f: f.write("not valid json {{{") s = ef.ForwardState(path) self.assertEqual(s.count(), 0) def test_version_mismatch_starts_fresh(self): with tempfile.TemporaryDirectory() as tmp: path = os.path.join(tmp, "fwd.json") with open(path, "w") as f: json.dump({"version": 999, "forwarded": {"x": {}}}, f) s = ef.ForwardState(path) self.assertEqual(s.count(), 0) # ── find_pending_events() ──────────────────────────────────────────────────── class TestFindPendingEvents(unittest.TestCase): def _make(self, dir_path: Path, name: str, age_seconds: float = 100, content: bytes = b"x") -> Path: """Create a file with controlled mtime.""" p = dir_path / name p.write_bytes(content) # Set mtime to simulate age target = time.time() - age_seconds os.utime(p, (target, target)) return p def test_returns_pair_when_both_files_present_and_quiescent(self): with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) bin_p = self._make(tmp_p, "M529LK44.AB0", age_seconds=120, content=b"binary") txt_p = self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"report") state = ef.ForwardState(str(tmp_p / "fwd.json")) pending = ef.find_pending_events( str(tmp_p), state, max_age_days=30, quiescence_seconds=5, missing_report_grace_seconds=60, ) self.assertEqual(len(pending), 1) self.assertEqual(os.path.basename(pending[0][0]), "M529LK44.AB0") self.assertEqual(os.path.basename(pending[0][1]), "M529LK44.AB0.TXT") def test_pairs_with_ach_underscore_ascii_naming(self): """BW ACH writes M529LK44.AB0 + M529LK44_AB0_ASCII.TXT. The watcher must pair these even though the .TXT filename doesn't carry a literal copy of the binary's name.""" with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) self._make(tmp_p, "N844L20G.630H", age_seconds=120, content=b"binary") self._make(tmp_p, "N844L20G_630H_ASCII.TXT", age_seconds=100, content=b"report") state = ef.ForwardState(str(tmp_p / "fwd.json")) pending = ef.find_pending_events( str(tmp_p), state, max_age_days=30, quiescence_seconds=5, missing_report_grace_seconds=60, ) self.assertEqual(len(pending), 1) self.assertEqual(os.path.basename(pending[0][0]), "N844L20G.630H") self.assertEqual(os.path.basename(pending[0][1]), "N844L20G_630H_ASCII.TXT") def test_pairs_with_ach_underscore_ascii_naming_for_waveform(self): """Same as above but for new-firmware waveform events (extension ends in W).""" with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) self._make(tmp_p, "I145L64P.GD0W", age_seconds=120, content=b"binary") self._make(tmp_p, "I145L64P_GD0W_ASCII.TXT", age_seconds=100, content=b"report") state = ef.ForwardState(str(tmp_p / "fwd.json")) pending = ef.find_pending_events( str(tmp_p), state, max_age_days=30, quiescence_seconds=5, missing_report_grace_seconds=60, ) self.assertEqual(len(pending), 1) self.assertEqual(os.path.basename(pending[0][1]), "I145L64P_GD0W_ASCII.TXT") def test_pairing_prefers_ach_naming_when_both_exist(self): """If a folder has BOTH conventions (operator manually exported AND ACH also auto-exported), ACH wins because that's the canonical name in modern BW deployments.""" with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) self._make(tmp_p, "M529LK44.AB0", age_seconds=120, content=b"binary") # Both partner files present self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"manual") self._make(tmp_p, "M529LK44_AB0_ASCII.TXT", age_seconds=100, content=b"ach") state = ef.ForwardState(str(tmp_p / "fwd.json")) pending = ef.find_pending_events( str(tmp_p), state, max_age_days=30, quiescence_seconds=5, missing_report_grace_seconds=60, ) self.assertEqual(len(pending), 1) self.assertEqual(os.path.basename(pending[0][1]), "M529LK44_AB0_ASCII.TXT") def test_pairing_falls_back_to_dot_txt_when_ach_absent(self): """If only the manual-export filename exists, the legacy convention still works (preserves codec-agent test fixtures).""" with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) self._make(tmp_p, "M529LK44.AB0", age_seconds=120, content=b"binary") self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"manual") state = ef.ForwardState(str(tmp_p / "fwd.json")) pending = ef.find_pending_events( str(tmp_p), state, max_age_days=30, quiescence_seconds=5, missing_report_grace_seconds=60, ) self.assertEqual(len(pending), 1) self.assertEqual(os.path.basename(pending[0][1]), "M529LK44.AB0.TXT") def test_skips_if_already_forwarded(self): with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) bin_p = self._make(tmp_p, "M529LK44.AB0", age_seconds=120, content=b"binary") self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"report") state = ef.ForwardState(str(tmp_p / "fwd.json")) digest = ef.sha256_of_file(str(bin_p)) state.mark_forwarded(digest, "M529LK44.AB0", len(b"binary")) pending = ef.find_pending_events( str(tmp_p), state, max_age_days=30, quiescence_seconds=5, missing_report_grace_seconds=60, ) self.assertEqual(len(pending), 0) def test_skips_if_too_fresh_to_be_quiescent(self): with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) self._make(tmp_p, "M529LK44.AB0", age_seconds=1, content=b"binary") self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=1, content=b"report") state = ef.ForwardState(str(tmp_p / "fwd.json")) pending = ef.find_pending_events( str(tmp_p), state, max_age_days=30, quiescence_seconds=5, missing_report_grace_seconds=60, ) self.assertEqual(len(pending), 0) def test_forwards_alone_after_grace_when_txt_missing(self): with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) self._make(tmp_p, "M529LK44.AB0", age_seconds=200, content=b"binary") # No .TXT created. state = ef.ForwardState(str(tmp_p / "fwd.json")) pending = ef.find_pending_events( str(tmp_p), state, max_age_days=30, quiescence_seconds=5, missing_report_grace_seconds=60, ) self.assertEqual(len(pending), 1) bin_path, txt_path = pending[0] self.assertEqual(os.path.basename(bin_path), "M529LK44.AB0") self.assertIsNone(txt_path) def test_defers_when_txt_missing_and_within_grace(self): with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) self._make(tmp_p, "M529LK44.AB0", age_seconds=15, content=b"binary") state = ef.ForwardState(str(tmp_p / "fwd.json")) pending = ef.find_pending_events( str(tmp_p), state, max_age_days=30, quiescence_seconds=5, missing_report_grace_seconds=60, ) self.assertEqual(len(pending), 0) def test_skips_old_files_beyond_max_age_days(self): with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) # 10 days old, but max_age_days=1 → should be excluded self._make(tmp_p, "M529LK44.AB0", age_seconds=10 * 86400, content=b"binary") self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=10 * 86400, content=b"report") state = ef.ForwardState(str(tmp_p / "fwd.json")) pending = ef.find_pending_events( str(tmp_p), state, max_age_days=1, quiescence_seconds=5, missing_report_grace_seconds=60, ) self.assertEqual(len(pending), 0) def test_ignores_mlg_and_other_non_event_files(self): with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) self._make(tmp_p, "BE11529.MLG", age_seconds=120, content=b"mlg") self._make(tmp_p, "agent.log", age_seconds=120, content=b"log") self._make(tmp_p, "config.ini", age_seconds=120, content=b"cfg") state = ef.ForwardState(str(tmp_p / "fwd.json")) pending = ef.find_pending_events( str(tmp_p), state, max_age_days=30, quiescence_seconds=5, missing_report_grace_seconds=60, ) self.assertEqual(len(pending), 0) def test_max_per_pass_caps_returned_count(self): """When max_per_pass is set, return at most that many pairs.""" with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) # Create 5 distinct event binaries with paired .TXTs for i, name in enumerate( ["M529LK01.AB0", "M529LK02.AB0", "M529LK03.AB0", "M529LK04.AB0", "M529LK05.AB0"], ): self._make(tmp_p, name, age_seconds=120 + i, content=("bin-" + str(i)).encode()) self._make(tmp_p, name + ".TXT", age_seconds=110 + i, content=b"report") state = ef.ForwardState(str(tmp_p / "fwd.json")) pending = ef.find_pending_events( str(tmp_p), state, max_age_days=30, quiescence_seconds=5, missing_report_grace_seconds=60, max_per_pass=2, ) self.assertEqual(len(pending), 2) def test_max_per_pass_zero_means_unlimited(self): with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) for i in range(4): self._make(tmp_p, "M529LK0{}.AB0".format(i), age_seconds=120 + i, content=("bin-" + str(i)).encode()) state = ef.ForwardState(str(tmp_p / "fwd.json")) pending = ef.find_pending_events( str(tmp_p), state, max_age_days=30, quiescence_seconds=5, missing_report_grace_seconds=60, max_per_pass=0, ) self.assertEqual(len(pending), 4) def test_max_per_pass_returns_oldest_first(self): """Backfill should advance chronologically — oldest qualifying files first. This way successive scans always make progress instead of getting stuck re-considering the same N newest files.""" with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) # ages: 200s (oldest), 150s, 100s, 50s (skipped — within grace) ages = [200, 150, 100, 50] for i, age in enumerate(ages): self._make(tmp_p, "M529LK0{}.AB0".format(i), age_seconds=age, content=("c" + str(i)).encode()) self._make(tmp_p, "M529LK0{}.AB0.TXT".format(i), age_seconds=age - 10, content=b"r") state = ef.ForwardState(str(tmp_p / "fwd.json")) pending = ef.find_pending_events( str(tmp_p), state, max_age_days=30, quiescence_seconds=5, missing_report_grace_seconds=60, max_per_pass=2, ) # Oldest two should be M529LK00 (200s) and M529LK01 (150s) names = [os.path.basename(p[0]) for p in pending] self.assertEqual(names, ["M529LK00.AB0", "M529LK01.AB0"]) # ── Seed-state mode ────────────────────────────────────────────────────────── class TestSeedStateFromFolder(unittest.TestCase): def _make(self, dir_path: Path, name: str, age_seconds: float = 100, content: bytes = b"x") -> Path: p = dir_path / name p.write_bytes(content) target = time.time() - age_seconds os.utime(p, (target, target)) return p def test_seeds_every_in_window_event_without_posting(self): with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) for i in range(3): self._make(tmp_p, "M529LK0{}.AB0".format(i), age_seconds=120 + i, content=("e" + str(i)).encode()) # Plus a non-event file we should ignore self._make(tmp_p, "BE11529.MLG", age_seconds=120, content=b"mlg") state = ef.ForwardState(str(tmp_p / "seed.json")) counts = ef.seed_state_from_folder( str(tmp_p), state, max_age_days=30, ) self.assertEqual(counts["scanned"], 3) self.assertEqual(counts["seeded"], 3) self.assertEqual(counts["already_known"], 0) self.assertEqual(state.count(), 3) def test_seed_skips_files_beyond_max_age_days(self): with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) self._make(tmp_p, "M529LK01.AB0", age_seconds=120, content=b"new") self._make(tmp_p, "M529LK02.AB0", age_seconds=10 * 86400, content=b"in-window") # 10d < 30d cutoff self._make(tmp_p, "M529LK03.AB0", age_seconds=400 * 86400, content=b"way-old") # 400d > 30d cutoff state = ef.ForwardState(str(tmp_p / "seed.json")) counts = ef.seed_state_from_folder( str(tmp_p), state, max_age_days=30, ) self.assertEqual(counts["seeded"], 2) self.assertEqual(counts["skipped_too_old"], 1) def test_seeded_files_are_then_skipped_by_normal_scan(self): """End-to-end: seed once, then a normal scan should produce zero pending events for the seeded files.""" with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) self._make(tmp_p, "M529LK01.AB0", age_seconds=120, content=b"x") self._make(tmp_p, "M529LK01.AB0.TXT", age_seconds=110, content=b"r") self._make(tmp_p, "M529LK02.AB0", age_seconds=120, content=b"y") self._make(tmp_p, "M529LK02.AB0.TXT", age_seconds=110, content=b"r") state = ef.ForwardState(str(tmp_p / "seed.json")) ef.seed_state_from_folder(str(tmp_p), state, max_age_days=30) pending = ef.find_pending_events( str(tmp_p), state, max_age_days=30, quiescence_seconds=5, missing_report_grace_seconds=60, ) self.assertEqual(len(pending), 0, "seed should have marked everything already-forwarded") def test_seed_is_idempotent(self): """Re-running seed twice doesn't duplicate entries or POST anything.""" with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) self._make(tmp_p, "M529LK01.AB0", age_seconds=120, content=b"x") state = ef.ForwardState(str(tmp_p / "seed.json")) counts1 = ef.seed_state_from_folder(str(tmp_p), state, max_age_days=30) counts2 = ef.seed_state_from_folder(str(tmp_p), state, max_age_days=30) self.assertEqual(counts1["seeded"], 1) self.assertEqual(counts2["seeded"], 0) self.assertEqual(counts2["already_known"], 1) self.assertEqual(state.count(), 1) # ── Multipart encoder ──────────────────────────────────────────────────────── class TestMultipartEncoder(unittest.TestCase): def test_encodes_two_parts_with_proper_boundary(self): body, content_type = ef._encode_multipart([ ("files", "a.bin", "application/octet-stream", b"\x01\x02"), ("files", "a.txt", "text/plain", b"hello"), ]) # Content-Type header carries the boundary self.assertTrue(content_type.startswith("multipart/form-data; boundary=")) boundary = content_type.split("boundary=", 1)[1] self.assertIn(boundary.encode("ascii"), body) # Body shape text = body.decode("latin-1") self.assertIn(f'name="files"; filename="a.bin"', text) self.assertIn(f'name="files"; filename="a.txt"', text) self.assertIn("Content-Type: application/octet-stream", text) self.assertIn("Content-Type: text/plain", text) # Trailing close boundary present self.assertTrue(text.rstrip("\r\n").endswith(f"--{boundary}--")) # ── End-to-end forward_event_pair against a fake server ────────────────────── class _FakeImportHandler(http.server.BaseHTTPRequestHandler): """Mimics seismo-relay's POST /db/import/blastware_file response.""" received = [] # class-level capture for test inspection def do_POST(self): length = int(self.headers.get("Content-Length", "0")) body = self.rfile.read(length) ctype = self.headers.get("Content-Type", "") # Crude multipart split — enough to count parts and grab filenames. parts = body.split(b"--" + ctype.split("boundary=")[-1].encode()) # Locate filename= occurrences — that's our part count filenames = [] for p in parts: for line in p.split(b"\r\n"): if b'filename="' in line: fn = line.split(b'filename="', 1)[1].split(b'"', 1)[0] filenames.append(fn.decode("latin-1")) self.__class__.received.append({ "path": self.path, "ctype": ctype, "filenames": filenames, }) # Build a faux SFM response: success for the first .bin-style filename results = [] binary_fn = next( (fn for fn in filenames if not fn.lower().endswith(".txt")), None, ) if binary_fn: results.append({ "filename": binary_fn, "status": "ok", "stored_filename": binary_fn, "filesize": len(body), "sha256": "00" * 32, "report_attached": any(fn.lower().endswith(".txt") for fn in filenames), "inserted": 1, "skipped": 0, }) payload = json.dumps({"count": len(results), "results": results}).encode() self.send_response(200) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(payload))) self.end_headers() self.wfile.write(payload) def log_message(self, *_a, **_kw): # silence the test runner pass def _start_fake_server() -> tuple[http.server.HTTPServer, str]: """Start an HTTPServer on a random local port; return (server, base_url).""" server = http.server.HTTPServer(("127.0.0.1", 0), _FakeImportHandler) threading.Thread(target=server.serve_forever, daemon=True).start() host, port = server.server_address return server, f"http://{host}:{port}" class TestForwardEventPair(unittest.TestCase): def setUp(self): _FakeImportHandler.received = [] self.server, self.base_url = _start_fake_server() def tearDown(self): self.server.shutdown() self.server.server_close() def test_post_with_paired_report(self): with tempfile.TemporaryDirectory() as tmp: tmp_p = Path(tmp) bin_p = tmp_p / "M529LK44.AB0" txt_p = tmp_p / "M529LK44.AB0.TXT" bin_p.write_bytes(b"\x10\x20\x30 binary") txt_p.write_bytes(b'"Serial Number : BE11529"\n') result = ef.forward_event_pair( self.base_url, str(bin_p), str(txt_p), timeout=5.0, ) self.assertEqual(result["status"], "ok") self.assertEqual(result["filename"], "M529LK44.AB0") self.assertTrue(result["report_attached"]) self.assertEqual(len(_FakeImportHandler.received), 1) req = _FakeImportHandler.received[0] self.assertEqual(req["path"], "/db/import/blastware_file") self.assertIn("M529LK44.AB0", req["filenames"]) self.assertIn("M529LK44.AB0.TXT", req["filenames"]) def test_post_without_report(self): with tempfile.TemporaryDirectory() as tmp: bin_p = Path(tmp) / "M529LK44.AB0" bin_p.write_bytes(b"binary only") result = ef.forward_event_pair( self.base_url, str(bin_p), None, timeout=5.0, ) self.assertEqual(result["status"], "ok") self.assertFalse(result["report_attached"]) req = _FakeImportHandler.received[0] self.assertEqual(req["filenames"], ["M529LK44.AB0"]) def test_post_propagates_serial_hint_in_query(self): with tempfile.TemporaryDirectory() as tmp: bin_p = Path(tmp) / "M529LK44.AB0" bin_p.write_bytes(b"x") ef.forward_event_pair( self.base_url, str(bin_p), None, serial_hint="BE11529", timeout=5.0, ) req = _FakeImportHandler.received[0] self.assertIn("serial=BE11529", req["path"]) if __name__ == "__main__": unittest.main()