"""
test_event_forwarder.py — unit tests for Thor Watcher's SFM event forwarder.

Covers:
  - is_event_binary() filename matching (positive + negative cases)
  - parse_event_filename() / serial_from_filename() / is_histogram_event()
  - idf_report_path() — the TXT/ subfolder convention
  - ForwardState load/save round-trip + idempotency check
  - find_pending_events() against the THORDATA/<project>/<unit>/ tree, plus
    quiescence + grace-period + re-pair logic
  - _encode_multipart() byte-level shape (boundary + headers)
  - forward_event_pair() end-to-end against a tiny stdlib HTTP server
    that mimics seismo-relay's POST /db/import/idf_file endpoint
  - seed_state_from_folder() walks the tree without POSTing
  - forward_pending() smoke test (scan, POST, state update, no re-POST)

Stdlib only — runs with `python -m pytest test_event_forwarder.py` on
Python 3.8+ (the watcher's compat target).
"""

from __future__ import annotations

import http.server
import json
import os
import tempfile
import threading
import time
import unittest
from datetime import datetime
from pathlib import Path

import event_forwarder as ef


# ── Helpers ───────────────────────────────────────────────────────────────────

# Keyword arguments shared by almost every find_pending_events() /
# forward_pending() call in this file. Individual tests override single
# entries (e.g. max_age_days=1) through _scan(**overrides).
_SCAN_DEFAULTS = {
    "max_age_days": 30,
    "quiescence_seconds": 5,
    "missing_report_grace_seconds": 60,
}

# serve_forever() polls for shutdown at this interval; the stdlib default of
# 0.5 s makes every tearDown block for up to half a second.
_SERVER_POLL_INTERVAL = 0.05


def _make_thordata(root: Path, project: str, unit: str) -> Path:
    """Create a THORDATA/<project>/<unit>/ folder pair; return unit_dir."""
    unit_dir = root / project / unit
    unit_dir.mkdir(parents=True, exist_ok=True)
    return unit_dir


def _touch_with_age(p: Path, age_seconds: float, content: bytes = b"x") -> Path:
    """Write ``content`` to ``p`` and set its atime/mtime ``age_seconds`` in the past."""
    p.write_bytes(content)
    target = time.time() - age_seconds
    os.utime(p, (target, target))
    return p


def _make_event(unit_dir: Path, name: str, age_seconds: float = 100,
                content: bytes = b"x") -> Path:
    """Create an event binary called ``name`` directly inside ``unit_dir``."""
    return _touch_with_age(unit_dir / name, age_seconds, content)


def _make_txt(unit_dir: Path, base_name: str, age_seconds: float = 100,
              content: bytes = b"r") -> Path:
    """Create the report sidecar for ``base_name`` inside ``unit_dir``/TXT/.

    The sidecar's filename is whatever ``ef.idf_report_name(base_name)`` returns.
    """
    txt_dir = unit_dir / "TXT"
    txt_dir.mkdir(exist_ok=True)
    return _touch_with_age(txt_dir / ef.idf_report_name(base_name),
                           age_seconds, content)


def _new_state(root: Path, filename: str = "fwd.json"):
    """Return an ``ef.ForwardState`` backed by ``root``/``filename``."""
    return ef.ForwardState(str(root / filename))


def _scan(root: Path, state, **overrides):
    """Call ``ef.find_pending_events`` with the file-wide default settings.

    ``overrides`` replace or extend ``_SCAN_DEFAULTS`` (e.g. ``max_per_pass=2``).
    """
    kwargs = dict(_SCAN_DEFAULTS)
    kwargs.update(overrides)
    return ef.find_pending_events(str(root), state, **kwargs)


# ── is_event_binary() ────────────────────────────────────────────────────────

class TestIsEventBinary(unittest.TestCase):
    """Filename-level helpers: matching, parsing, and report-path derivation."""

    def test_recognises_typical_thor_filenames(self):
        for name in [
            "UM11719_20231219163444.IDFH",
            "UM11719_20231219162723.IDFW",
            "BE9439_20200713124251.IDFH",
            "UM13981_20220808082418.IDFH",
            # case-insensitive
            "um11719_20231219163444.idfh",
        ]:
            # subTest lets runners that support it report every failing name,
            # not just the first one.
            with self.subTest(name=name):
                self.assertTrue(ef.is_event_binary(name), name)

    def test_rejects_non_event_extensions(self):
        for name in [
            "UM11719_20231219163436.MLG",        # monitor log
            "UM11719_20231219163444.IDFH.txt",   # report sidecar
            "UM11719_20231219164135.IDFW.CDB",   # cache database variant
            "UM11719_20231219164135.IDFH.CDB",
            "agent.log",
            "config.json",
            "foo.bak",
            "bar.tmp",
            "UM11719_20231219163444.csv",
            "UM11719_20231219163444.pdf",
            "UM11719_20231219163444.html",
            "UM11719_20231219163444.xml",
        ]:
            with self.subTest(name=name):
                self.assertFalse(ef.is_event_binary(name), name)

    def test_rejects_malformed_filenames(self):
        for name in [
            "",
            "no_extension",
            "UM_20231219163444.IDFH",        # missing serial digits
            "1234_20231219163444.IDFH",      # serial must start with letters
            "UM11719_2023121916.IDFH",       # short timestamp
            "UM11719_20231219163444.IDFX",   # wrong kind
            "UM11719-20231219163444.IDFH",   # wrong separator
        ]:
            with self.subTest(name=name):
                self.assertFalse(ef.is_event_binary(name), name)

    def test_parse_event_filename(self):
        parsed = ef.parse_event_filename("UM11719_20231219163444.IDFW")
        self.assertIsNotNone(parsed)
        serial, ts, kind = parsed
        self.assertEqual(serial, "UM11719")
        self.assertEqual(ts, datetime(2023, 12, 19, 16, 34, 44))
        self.assertEqual(kind, "IDFW")

    def test_serial_from_filename(self):
        self.assertEqual(
            ef.serial_from_filename("UM11719_20231219163444.IDFH"), "UM11719")
        self.assertEqual(
            ef.serial_from_filename("BE9439_20200713124251.IDFH"), "BE9439")
        self.assertIsNone(ef.serial_from_filename("not_an_event.bin"))

    def test_idf_report_path_uses_txt_subfolder(self):
        binary = "/foo/THORDATA/Project A/UM11719/UM11719_20231219163444.IDFW"
        self.assertEqual(
            ef.idf_report_path(binary),
            os.path.join("/foo/THORDATA/Project A/UM11719", "TXT",
                         "UM11719_20231219163444.IDFW.txt"),
        )

    def test_is_histogram_event(self):
        self.assertTrue(ef.is_histogram_event("UM11719_20231219163444.IDFH"))
        self.assertTrue(ef.is_histogram_event("um11719_20231219163444.idfh"))
        self.assertFalse(ef.is_histogram_event("UM11719_20231219162723.IDFW"))


# ── ForwardState ─────────────────────────────────────────────────────────────

class TestForwardState(unittest.TestCase):
    """Persistence and status semantics of ``ef.ForwardState``."""

    def test_round_trip_persists_marked_entries(self):
        with tempfile.TemporaryDirectory() as tmp:
            path = os.path.join(tmp, "fwd.json")
            s = ef.ForwardState(path)
            self.assertFalse(s.is_forwarded("abc123"))
            s.mark_forwarded("abc123", "UM11719_20231219163444.IDFW", 8800)
            self.assertTrue(s.is_forwarded("abc123"))

            # Re-load from disk
            s2 = ef.ForwardState(path)
            self.assertTrue(s2.is_forwarded("abc123"))
            self.assertEqual(s2.count(), 1)

    def test_corrupt_state_file_starts_fresh(self):
        with tempfile.TemporaryDirectory() as tmp:
            path = os.path.join(tmp, "fwd.json")
            with open(path, "w", encoding="utf-8") as f:
                f.write("not valid json {{{")
            s = ef.ForwardState(path)
            self.assertEqual(s.count(), 0)

    def test_version_mismatch_starts_fresh(self):
        with tempfile.TemporaryDirectory() as tmp:
            path = os.path.join(tmp, "fwd.json")
            with open(path, "w", encoding="utf-8") as f:
                json.dump({"version": 999, "forwarded": {"x": {}}}, f)
            s = ef.ForwardState(path)
            self.assertEqual(s.count(), 0)

    def test_legacy_entries_default_to_had_report_true(self):
        with tempfile.TemporaryDirectory() as tmp:
            path = os.path.join(tmp, "fwd.json")
            with open(path, "w", encoding="utf-8") as f:
                json.dump({
                    "version": 1,
                    "forwarded": {
                        "abc123": {
                            "filename": "UM11719_20231219163444.IDFW",
                            "size": 123,
                            "forwarded_at": "2025-01-01T00:00:00Z",
                            # No had_report field — legacy entry
                        }
                    }
                }, f)
            state = ef.ForwardState(path)
            self.assertIs(state.status("abc123"), True)

    def test_status_returns_none_for_unknown_sha(self):
        with tempfile.TemporaryDirectory() as tmp:
            state = _new_state(Path(tmp))
            self.assertIsNone(state.status("never-seen"))

    def test_mark_with_had_report_false_then_promote(self):
        with tempfile.TemporaryDirectory() as tmp:
            state = _new_state(Path(tmp))
            state.mark_forwarded("xyz", "UM11719_20231219163444.IDFW", 100,
                                 had_report=False)
            self.assertIs(state.status("xyz"), False)
            state.mark_forwarded("xyz", "UM11719_20231219163444.IDFW", 100,
                                 had_report=True)
            self.assertIs(state.status("xyz"), True)


# ── find_pending_events() ────────────────────────────────────────────────────

class TestFindPendingEvents(unittest.TestCase):
    """Tree scanning: pairing, quiescence, grace period, caps and ordering."""

    def test_returns_pair_when_both_files_present_and_quiescent(self):
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit_dir = _make_thordata(root, "Project A", "UM11719")
            _make_event(unit_dir, "UM11719_20231219163444.IDFW",
                        age_seconds=120, content=b"binary")
            _make_txt(unit_dir, "UM11719_20231219163444.IDFW",
                      age_seconds=100, content=b"report")

            pending = _scan(root, _new_state(root))
            self.assertEqual(len(pending), 1)
            self.assertEqual(os.path.basename(pending[0][0]),
                             "UM11719_20231219163444.IDFW")
            self.assertEqual(os.path.basename(pending[0][1]),
                             "UM11719_20231219163444.IDFW.txt")

    def test_idfh_and_idfw_are_separate_events(self):
        """A single timestamp produces both .IDFH and .IDFW — they forward
        as two independent events with their own state entries."""
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit_dir = _make_thordata(root, "Project A", "UM11719")
            _make_event(unit_dir, "UM11719_20231219163444.IDFH",
                        age_seconds=120, content=b"histogram")
            _make_event(unit_dir, "UM11719_20231219163444.IDFW",
                        age_seconds=120, content=b"waveform")
            _make_txt(unit_dir, "UM11719_20231219163444.IDFH",
                      age_seconds=100, content=b"hreport")
            _make_txt(unit_dir, "UM11719_20231219163444.IDFW",
                      age_seconds=100, content=b"wreport")

            pending = _scan(root, _new_state(root))
            self.assertEqual(len(pending), 2)
            names = sorted(os.path.basename(p[0]) for p in pending)
            self.assertEqual(names, [
                "UM11719_20231219163444.IDFH",
                "UM11719_20231219163444.IDFW",
            ])

    def test_pairing_when_txt_is_in_unit_root_does_not_match(self):
        """Sidecars MUST live in the TXT/ subfolder. A stray .txt next to
        the binary is not the canonical location and should not be picked
        up as a sidecar."""
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit_dir = _make_thordata(root, "Project A", "UM11719")
            _make_event(unit_dir, "UM11719_20231219163444.IDFW",
                        age_seconds=200, content=b"bin")
            # .txt is in the unit dir, not unit/TXT/
            _touch_with_age(unit_dir / "UM11719_20231219163444.IDFW.txt",
                            age_seconds=100, content=b"misplaced")

            pending = _scan(root, _new_state(root))
            # Forward proceeds (grace period elapsed), but WITHOUT pairing
            self.assertEqual(len(pending), 1)
            self.assertIsNone(pending[0][1])

    def test_skips_if_already_forwarded(self):
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit_dir = _make_thordata(root, "Project A", "UM11719")
            bin_p = _make_event(unit_dir, "UM11719_20231219163444.IDFW",
                                age_seconds=120, content=b"binary")
            _make_txt(unit_dir, "UM11719_20231219163444.IDFW",
                      age_seconds=100, content=b"report")

            state = _new_state(root)
            digest = ef.sha256_of_file(str(bin_p))
            state.mark_forwarded(digest, "UM11719_20231219163444.IDFW",
                                 len(b"binary"))

            pending = _scan(root, state)
            self.assertEqual(len(pending), 0)

    def test_skips_if_too_fresh_to_be_quiescent(self):
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit_dir = _make_thordata(root, "Project A", "UM11719")
            # Both files are 1 s old, younger than quiescence_seconds=5.
            _make_event(unit_dir, "UM11719_20231219163444.IDFW",
                        age_seconds=1, content=b"binary")
            _make_txt(unit_dir, "UM11719_20231219163444.IDFW",
                      age_seconds=1, content=b"report")

            pending = _scan(root, _new_state(root))
            self.assertEqual(len(pending), 0)

    def test_forwards_alone_after_grace_when_txt_missing(self):
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit_dir = _make_thordata(root, "Project A", "UM11719")
            # 200 s old with no sidecar: past missing_report_grace_seconds=60.
            _make_event(unit_dir, "UM11719_20231219163444.IDFH",
                        age_seconds=200, content=b"binary")

            pending = _scan(root, _new_state(root))
            self.assertEqual(len(pending), 1)
            bin_path, txt_path = pending[0]
            self.assertEqual(os.path.basename(bin_path),
                             "UM11719_20231219163444.IDFH")
            self.assertIsNone(txt_path)

    def test_re_pair_after_late_arriving_txt(self):
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit_dir = _make_thordata(root, "Project A", "UM11719")
            bin_p = _make_event(unit_dir, "UM11719_20231219163444.IDFW",
                                age_seconds=200, content=b"binary")

            state = _new_state(root)
            digest = ef.sha256_of_file(str(bin_p))
            state.mark_forwarded(digest, "UM11719_20231219163444.IDFW",
                                 len(b"binary"), had_report=False)

            # First scan: TXT not present → still skipped.
            pending = _scan(root, state)
            self.assertEqual(pending, [])

            # TXT finally appears.
            _make_txt(unit_dir, "UM11719_20231219163444.IDFW",
                      age_seconds=100, content=b"report")
            pending = _scan(root, state)
            self.assertEqual(len(pending), 1)
            self.assertEqual(os.path.basename(pending[0][1]),
                             "UM11719_20231219163444.IDFW.txt")

    def test_defers_when_txt_missing_and_within_grace(self):
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit_dir = _make_thordata(root, "Project A", "UM11719")
            # 15 s old: quiescent (>5 s) but still inside the 60 s grace window.
            _make_event(unit_dir, "UM11719_20231219163444.IDFW",
                        age_seconds=15, content=b"binary")

            pending = _scan(root, _new_state(root))
            self.assertEqual(len(pending), 0)

    def test_skips_old_files_beyond_max_age_days(self):
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit_dir = _make_thordata(root, "Project A", "UM11719")
            _make_event(unit_dir, "UM11719_20231219163444.IDFW",
                        age_seconds=10 * 86400, content=b"binary")
            _make_txt(unit_dir, "UM11719_20231219163444.IDFW",
                      age_seconds=10 * 86400, content=b"report")

            pending = _scan(root, _new_state(root), max_age_days=1)
            self.assertEqual(len(pending), 0)

    def test_ignores_mlg_and_other_non_event_files(self):
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit_dir = _make_thordata(root, "Project A", "UM11719")
            _make_event(unit_dir, "UM11719_20231219163436.MLG",
                        age_seconds=120, content=b"mlg")
            _make_event(unit_dir, "UM11719_20231219164135.IDFW.CDB",
                        age_seconds=120, content=b"cache")
            _touch_with_age(unit_dir / "agent.log",
                            age_seconds=120, content=b"log")

            pending = _scan(root, _new_state(root))
            self.assertEqual(len(pending), 0)

    def test_walks_multiple_projects_and_units(self):
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit_a = _make_thordata(root, "Project A", "UM11719")
            unit_b = _make_thordata(root, "Project B", "BE9439")
            _make_event(unit_a, "UM11719_20231219163444.IDFW",
                        age_seconds=200, content=b"a")
            _make_event(unit_b, "BE9439_20200713131747.IDFW",
                        age_seconds=200, content=b"b")
            _make_txt(unit_a, "UM11719_20231219163444.IDFW",
                      age_seconds=100, content=b"ar")
            _make_txt(unit_b, "BE9439_20200713131747.IDFW",
                      age_seconds=100, content=b"br")

            pending = _scan(
                root, _new_state(root),
                max_age_days=10000,  # BE event is from 2020
            )
            self.assertEqual(len(pending), 2)
            names = sorted(os.path.basename(p[0]) for p in pending)
            self.assertEqual(names, [
                "BE9439_20200713131747.IDFW",
                "UM11719_20231219163444.IDFW",
            ])

    def test_max_per_pass_caps_returned_count(self):
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit = _make_thordata(root, "Project A", "UM11719")
            for i in range(5):
                name = f"UM11719_2023121916344{i}.IDFW"
                # Distinct content per file so each binary hashes differently.
                _make_event(unit, name, age_seconds=120 + i,
                            content=f"bin-{i}".encode())
                _make_txt(unit, name, age_seconds=110 + i, content=b"report")

            pending = _scan(root, _new_state(root), max_per_pass=2)
            self.assertEqual(len(pending), 2)

    def test_max_per_pass_returns_oldest_first(self):
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit = _make_thordata(root, "Project A", "UM11719")
            ages = [200, 150, 100, 50]
            for i, age in enumerate(ages):
                name = f"UM11719_2023121916344{i}.IDFW"
                _make_event(unit, name, age_seconds=age,
                            content=f"c{i}".encode())
                _make_txt(unit, name, age_seconds=max(1, age - 10),
                          content=b"r")

            pending = _scan(root, _new_state(root), max_per_pass=2)
            names = [os.path.basename(p[0]) for p in pending]
            # Oldest two should be index 0 (200s) and 1 (150s)
            self.assertEqual(names, [
                "UM11719_20231219163440.IDFW",
                "UM11719_20231219163441.IDFW",
            ])


# ── Seed-state mode ──────────────────────────────────────────────────────────

class TestSeedStateFromFolder(unittest.TestCase):
    """``ef.seed_state_from_folder`` records events without forwarding them."""

    def test_seeds_every_in_window_event_without_posting(self):
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit = _make_thordata(root, "Project A", "UM11719")
            for i in range(3):
                _make_event(unit, f"UM11719_2023121916344{i}.IDFW",
                            age_seconds=120 + i, content=f"e{i}".encode())
            # Ignored
            _make_event(unit, "UM11719_20231219163436.MLG",
                        age_seconds=120, content=b"mlg")

            state = _new_state(root, "seed.json")
            counts = ef.seed_state_from_folder(str(root), state, max_age_days=30)
            self.assertEqual(counts["scanned"], 3)
            self.assertEqual(counts["seeded"], 3)
            self.assertEqual(counts["already_known"], 0)
            self.assertEqual(state.count(), 3)

    def test_seeded_files_are_then_skipped_by_normal_scan(self):
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit = _make_thordata(root, "Project A", "UM11719")
            _make_event(unit, "UM11719_20231219163444.IDFW",
                        age_seconds=120, content=b"x")
            _make_txt(unit, "UM11719_20231219163444.IDFW",
                      age_seconds=110, content=b"r")
            _make_event(unit, "UM11719_20231219163444.IDFH",
                        age_seconds=120, content=b"y")
            _make_txt(unit, "UM11719_20231219163444.IDFH",
                      age_seconds=110, content=b"r")

            state = _new_state(root, "seed.json")
            ef.seed_state_from_folder(str(root), state, max_age_days=30)

            pending = _scan(root, state)
            self.assertEqual(len(pending), 0)

    def test_seed_is_idempotent(self):
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit = _make_thordata(root, "Project A", "UM11719")
            _make_event(unit, "UM11719_20231219163444.IDFW",
                        age_seconds=120, content=b"x")

            state = _new_state(root, "seed.json")
            counts1 = ef.seed_state_from_folder(str(root), state, max_age_days=30)
            counts2 = ef.seed_state_from_folder(str(root), state, max_age_days=30)
            self.assertEqual(counts1["seeded"], 1)
            self.assertEqual(counts2["seeded"], 0)
            self.assertEqual(counts2["already_known"], 1)
            self.assertEqual(state.count(), 1)


# ── Multipart encoder ────────────────────────────────────────────────────────

class TestMultipartEncoder(unittest.TestCase):
    """Byte-level shape of the body produced by ``ef._encode_multipart``."""

    def test_encodes_two_parts_with_proper_boundary(self):
        body, content_type = ef._encode_multipart([
            ("files", "a.bin", "application/octet-stream", b"\x01\x02"),
            ("files", "a.txt", "text/plain", b"hello"),
        ])
        self.assertTrue(
            content_type.startswith("multipart/form-data; boundary="))
        boundary = content_type.split("boundary=", 1)[1]
        self.assertIn(boundary.encode("ascii"), body)

        # latin-1 maps every byte to one code point, so the binary part
        # cannot make the decode fail.
        text = body.decode("latin-1")
        self.assertIn('name="files"; filename="a.bin"', text)
        self.assertIn('name="files"; filename="a.txt"', text)
        self.assertIn("Content-Type: application/octet-stream", text)
        self.assertIn("Content-Type: text/plain", text)
        self.assertTrue(text.rstrip("\r\n").endswith(f"--{boundary}--"))


# ── End-to-end forward_event_pair against a fake server ──────────────────────

class _FakeImportHandler(http.server.BaseHTTPRequestHandler):
    """Mimics seismo-relay's POST /db/import/idf_file response."""

    # Class-level capture for test inspection. Rebound to a fresh list by the
    # test fixtures before every test, so requests never leak between tests.
    received = []

    def do_POST(self):
        """Record the request, then answer with one JSON result entry.

        The result describes the first uploaded filename that does not end in
        ".txt"; when there is none, ``results`` is empty.
        """
        length = int(self.headers.get("Content-Length", "0"))
        body = self.rfile.read(length)
        ctype = self.headers.get("Content-Type", "")

        # Minimal multipart parsing: split on the boundary and pull every
        # filename="..." value out of the part headers.
        parts = body.split(b"--" + ctype.split("boundary=")[-1].encode())
        filenames = []
        for p in parts:
            for line in p.split(b"\r\n"):
                if b'filename="' in line:
                    fn = line.split(b'filename="', 1)[1].split(b'"', 1)[0]
                    filenames.append(fn.decode("latin-1"))

        self.__class__.received.append({
            "path": self.path,
            "ctype": ctype,
            "filenames": filenames,
        })

        results = []
        binary_fn = next(
            (fn for fn in filenames if not fn.lower().endswith(".txt")),
            None,
        )
        if binary_fn:
            results.append({
                "filename": binary_fn,
                "status": "ok",
                "stored_filename": binary_fn,
                "filesize": len(body),
                "sha256": "00" * 32,
                "report_attached": any(
                    fn.lower().endswith(".txt") for fn in filenames),
                "inserted": 1,
                "skipped": 0,
            })

        payload = json.dumps(
            {"count": len(results), "results": results}).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, *_a, **_kw):  # silence the test runner
        pass


def _start_fake_server():
    """Start the fake import server on an ephemeral localhost port.

    Returns ``(server, base_url)``. The caller is responsible for calling
    ``server.shutdown()`` and ``server.server_close()``.
    """
    server = http.server.HTTPServer(("127.0.0.1", 0), _FakeImportHandler)
    threading.Thread(
        target=server.serve_forever,
        kwargs={"poll_interval": _SERVER_POLL_INTERVAL},
        daemon=True,
    ).start()
    host, port = server.server_address
    return server, f"http://{host}:{port}"


class _FakeServerMixin:
    """Per-test fake-server fixture shared by the end-to-end test classes.

    Deliberately not a TestCase itself, so test runners have nothing to
    collect from it. List it before ``unittest.TestCase`` in the bases.
    """

    def setUp(self):
        super().setUp()
        _FakeImportHandler.received = []
        self.server, self.base_url = _start_fake_server()

    def tearDown(self):
        self.server.shutdown()
        self.server.server_close()
        super().tearDown()


class TestForwardEventPair(_FakeServerMixin, unittest.TestCase):
    """``ef.forward_event_pair`` against the fake import endpoint."""

    def test_post_with_paired_report(self):
        with tempfile.TemporaryDirectory() as tmp:
            tmp_p = Path(tmp)
            bin_p = tmp_p / "UM11719_20231219163444.IDFW"
            txt_p = tmp_p / "UM11719_20231219163444.IDFW.txt"
            bin_p.write_bytes(b"\x10\x20\x30 binary")
            txt_p.write_bytes(b'"SerialNumber : UM11719"\n')

            result = ef.forward_event_pair(
                self.base_url, str(bin_p), str(txt_p), timeout=5.0,
            )
            self.assertEqual(result["status"], "ok")
            self.assertEqual(result["filename"], "UM11719_20231219163444.IDFW")
            self.assertTrue(result["report_attached"])

            self.assertEqual(len(_FakeImportHandler.received), 1)
            req = _FakeImportHandler.received[0]
            # Path includes the serial-hint auto-extracted from the filename
            self.assertTrue(req["path"].startswith("/db/import/idf_file"))
            self.assertIn("serial=UM11719", req["path"])
            self.assertIn("UM11719_20231219163444.IDFW", req["filenames"])
            self.assertIn("UM11719_20231219163444.IDFW.txt", req["filenames"])

    def test_post_without_report(self):
        with tempfile.TemporaryDirectory() as tmp:
            bin_p = Path(tmp) / "UM11719_20231219163444.IDFH"
            bin_p.write_bytes(b"binary only")

            result = ef.forward_event_pair(
                self.base_url, str(bin_p), None, timeout=5.0,
            )
            self.assertEqual(result["status"], "ok")
            self.assertFalse(result["report_attached"])

            # Fail with a readable message rather than an IndexError.
            self.assertTrue(_FakeImportHandler.received,
                            "no request reached the fake server")
            req = _FakeImportHandler.received[0]
            self.assertEqual(req["filenames"], ["UM11719_20231219163444.IDFH"])

    def test_explicit_serial_hint_overrides_auto(self):
        with tempfile.TemporaryDirectory() as tmp:
            bin_p = Path(tmp) / "UM11719_20231219163444.IDFW"
            bin_p.write_bytes(b"x")

            ef.forward_event_pair(
                self.base_url, str(bin_p), None,
                serial_hint="OVERRIDE99", timeout=5.0,
            )
            # Fail with a readable message rather than an IndexError.
            self.assertTrue(_FakeImportHandler.received,
                            "no request reached the fake server")
            req = _FakeImportHandler.received[0]
            self.assertIn("serial=OVERRIDE99", req["path"])


# ── forward_pending() smoke test ─────────────────────────────────────────────

class TestForwardPending(_FakeServerMixin, unittest.TestCase):
    """End-to-end: tree → find → POST → state-update → no re-POST."""

    def _run_pass(self, root: Path, state):
        """Run one ``ef.forward_pending`` pass with the file-wide defaults."""
        return ef.forward_pending(
            str(root), self.base_url, state, timeout=5.0, **_SCAN_DEFAULTS
        )

    def test_pass_then_re_pass_is_idempotent(self):
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            unit = _make_thordata(root, "Project A", "UM11719")
            _make_event(unit, "UM11719_20231219163444.IDFW",
                        age_seconds=200, content=b"binary")
            _make_txt(unit, "UM11719_20231219163444.IDFW",
                      age_seconds=100, content=b"report")
            _make_event(unit, "UM11719_20231219163444.IDFH",
                        age_seconds=200, content=b"histogram")
            _make_txt(unit, "UM11719_20231219163444.IDFH",
                      age_seconds=100, content=b"hreport")

            state = _new_state(root)
            counts = self._run_pass(root, state)
            self.assertEqual(counts["scanned"], 2)
            self.assertEqual(counts["forwarded"], 2)
            self.assertEqual(counts["errors"], 0)
            self.assertEqual(counts["with_report"], 2)
            self.assertEqual(state.count(), 2)
            self.assertEqual(len(_FakeImportHandler.received), 2)

            # Re-pass: nothing pending; no new POSTs.
            counts2 = self._run_pass(root, state)
            self.assertEqual(counts2["scanned"], 0)
            self.assertEqual(counts2["forwarded"], 0)
            self.assertEqual(len(_FakeImportHandler.received), 2)


if __name__ == "__main__":
    unittest.main()