815c643fb2
Two safety nets for first-deploy on Blastware ACH machines that
have accumulated tens or hundreds of thousands of historical events
in the watch folder.
1. SFM_MAX_FORWARDS_PER_PASS (default 500, 0=unlimited)
---------------------------------------------------
Cap on the number of events forwarded per scan tick. At the
60-second default interval that's ~30K events/hour throughput —
the SFM server gets a steady drip instead of one giant burst.
Scan now sorts by mtime ascending so backfill advances
chronologically (oldest first) and successive scans always
make progress instead of re-considering the same N newest files.
Wired into:
- event_forwarder.find_pending_events / forward_pending
- series3_watcher.run_watcher loop
- config-template.ini
- settings_dialog SFM Forward tab (new "Max Events Per Pass"
spinbox, validated in _on_save)
2. event_forwarder.py --seed-state CLI
-----------------------------------
One-shot mode that walks the watch folder, sha256s every in-window
event binary, and marks them all as already-forwarded WITHOUT
POSTing anything. Run before flipping SFM_FORWARD_ENABLED=true
to skip the historical backfill entirely — the watcher then only
forwards events that appear AFTER the seed.
Usage:
python event_forwarder.py --seed-state \
--watch "C:\Blastware 10\Event\autocall home" \
--state "C:\...\sfm_forwarded.json" \
[--max-age-days 365]
7 new unit tests:
- max_per_pass cap enforcement (=N, =0 unlimited, oldest-first
ordering)
- seed-state mode (in-window seeding, max-age skip,
end-to-end skip-after-seed, idempotent re-runs)
README adds a "First-time deployment" section walking through both
options. Bumps to v1.5.2.
523 lines
22 KiB
Python
523 lines
22 KiB
Python
"""
|
|
test_event_forwarder.py — unit tests for the SFM event forwarder.
|
|
|
|
Covers:
|
|
- is_event_binary() filename matching (positive + negative cases)
|
|
- ForwardState load/save round-trip + idempotency check
|
|
- find_pending_events() pairing + quiescence + grace-period logic
|
|
- _encode_multipart() byte-level shape (boundary + headers)
|
|
- forward_event_pair() end-to-end against a tiny stdlib HTTP server
|
|
that mimics the SFM /db/import/blastware_file endpoint
|
|
|
|
Stdlib only — runs with `python -m pytest test_event_forwarder.py`
|
|
on Python 3.8+ (the watcher's compat target).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import http.server
|
|
import json
|
|
import os
|
|
import socket
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
import event_forwarder as ef
|
|
|
|
|
|
# ── is_event_binary() ────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestIsEventBinary(unittest.TestCase):
|
|
|
|
def test_recognises_typical_blastware_filenames(self):
|
|
for name in [
|
|
"M529LK44.AB0",
|
|
"M529LKVQ.6S0",
|
|
"M529LKVQ.6S0W",
|
|
"S353L4H0.3M0W",
|
|
"P036L318.C80H",
|
|
"M529LIY6.N00",
|
|
]:
|
|
self.assertTrue(ef.is_event_binary(name), name)
|
|
|
|
def test_rejects_lowercase_extensions_we_explicitly_exclude(self):
|
|
for name in ["BE11529.MLG", "M529LK44.AB0.TXT", "agent.log",
|
|
"config.ini", "foo.bak", "bar.tmp",
|
|
"something.h5", "noise.json"]:
|
|
self.assertFalse(ef.is_event_binary(name), name)
|
|
|
|
def test_rejects_non_matching_filenames(self):
|
|
for name in ["", "no_extension",
|
|
"TooShort.AB0", # stem must be 8 chars
|
|
"TOOLONG12345.AB0", # stem must be 8 chars
|
|
"M529LK44.A", # ext too short
|
|
"M529LK44.ABCDE", # ext too long
|
|
"M52.AB0", # stem too short
|
|
"1234ABCD.AB0"]: # first char must be letter
|
|
self.assertFalse(ef.is_event_binary(name), name)
|
|
|
|
|
|
# ── ForwardState ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestForwardState(unittest.TestCase):
|
|
|
|
def test_round_trip_persists_marked_entries(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
path = os.path.join(tmp, "fwd.json")
|
|
s = ef.ForwardState(path)
|
|
self.assertFalse(s.is_forwarded("abc123"))
|
|
s.mark_forwarded("abc123", "M529LK44.AB0", 4400)
|
|
self.assertTrue(s.is_forwarded("abc123"))
|
|
|
|
# Re-load from disk
|
|
s2 = ef.ForwardState(path)
|
|
self.assertTrue(s2.is_forwarded("abc123"))
|
|
self.assertEqual(s2.count(), 1)
|
|
|
|
def test_corrupt_state_file_starts_fresh(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
path = os.path.join(tmp, "fwd.json")
|
|
with open(path, "w") as f:
|
|
f.write("not valid json {{{")
|
|
s = ef.ForwardState(path)
|
|
self.assertEqual(s.count(), 0)
|
|
|
|
def test_version_mismatch_starts_fresh(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
path = os.path.join(tmp, "fwd.json")
|
|
with open(path, "w") as f:
|
|
json.dump({"version": 999, "forwarded": {"x": {}}}, f)
|
|
s = ef.ForwardState(path)
|
|
self.assertEqual(s.count(), 0)
|
|
|
|
|
|
# ── find_pending_events() ────────────────────────────────────────────────────
|
|
|
|
|
|
class TestFindPendingEvents(unittest.TestCase):
|
|
|
|
def _make(self, dir_path: Path, name: str, age_seconds: float = 100,
|
|
content: bytes = b"x") -> Path:
|
|
"""Create a file with controlled mtime."""
|
|
p = dir_path / name
|
|
p.write_bytes(content)
|
|
# Set mtime to simulate age
|
|
target = time.time() - age_seconds
|
|
os.utime(p, (target, target))
|
|
return p
|
|
|
|
def test_returns_pair_when_both_files_present_and_quiescent(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_p = Path(tmp)
|
|
bin_p = self._make(tmp_p, "M529LK44.AB0", age_seconds=120, content=b"binary")
|
|
txt_p = self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"report")
|
|
|
|
state = ef.ForwardState(str(tmp_p / "fwd.json"))
|
|
pending = ef.find_pending_events(
|
|
str(tmp_p), state,
|
|
max_age_days=30,
|
|
quiescence_seconds=5,
|
|
missing_report_grace_seconds=60,
|
|
)
|
|
self.assertEqual(len(pending), 1)
|
|
self.assertEqual(os.path.basename(pending[0][0]), "M529LK44.AB0")
|
|
self.assertEqual(os.path.basename(pending[0][1]), "M529LK44.AB0.TXT")
|
|
|
|
def test_skips_if_already_forwarded(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_p = Path(tmp)
|
|
bin_p = self._make(tmp_p, "M529LK44.AB0", age_seconds=120, content=b"binary")
|
|
self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=100, content=b"report")
|
|
|
|
state = ef.ForwardState(str(tmp_p / "fwd.json"))
|
|
digest = ef.sha256_of_file(str(bin_p))
|
|
state.mark_forwarded(digest, "M529LK44.AB0", len(b"binary"))
|
|
|
|
pending = ef.find_pending_events(
|
|
str(tmp_p), state,
|
|
max_age_days=30,
|
|
quiescence_seconds=5,
|
|
missing_report_grace_seconds=60,
|
|
)
|
|
self.assertEqual(len(pending), 0)
|
|
|
|
def test_skips_if_too_fresh_to_be_quiescent(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_p = Path(tmp)
|
|
self._make(tmp_p, "M529LK44.AB0", age_seconds=1, content=b"binary")
|
|
self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=1, content=b"report")
|
|
|
|
state = ef.ForwardState(str(tmp_p / "fwd.json"))
|
|
pending = ef.find_pending_events(
|
|
str(tmp_p), state,
|
|
max_age_days=30,
|
|
quiescence_seconds=5,
|
|
missing_report_grace_seconds=60,
|
|
)
|
|
self.assertEqual(len(pending), 0)
|
|
|
|
def test_forwards_alone_after_grace_when_txt_missing(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_p = Path(tmp)
|
|
self._make(tmp_p, "M529LK44.AB0", age_seconds=200, content=b"binary")
|
|
# No .TXT created.
|
|
state = ef.ForwardState(str(tmp_p / "fwd.json"))
|
|
pending = ef.find_pending_events(
|
|
str(tmp_p), state,
|
|
max_age_days=30,
|
|
quiescence_seconds=5,
|
|
missing_report_grace_seconds=60,
|
|
)
|
|
self.assertEqual(len(pending), 1)
|
|
bin_path, txt_path = pending[0]
|
|
self.assertEqual(os.path.basename(bin_path), "M529LK44.AB0")
|
|
self.assertIsNone(txt_path)
|
|
|
|
def test_defers_when_txt_missing_and_within_grace(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_p = Path(tmp)
|
|
self._make(tmp_p, "M529LK44.AB0", age_seconds=15, content=b"binary")
|
|
state = ef.ForwardState(str(tmp_p / "fwd.json"))
|
|
pending = ef.find_pending_events(
|
|
str(tmp_p), state,
|
|
max_age_days=30,
|
|
quiescence_seconds=5,
|
|
missing_report_grace_seconds=60,
|
|
)
|
|
self.assertEqual(len(pending), 0)
|
|
|
|
def test_skips_old_files_beyond_max_age_days(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_p = Path(tmp)
|
|
# 10 days old, but max_age_days=1 → should be excluded
|
|
self._make(tmp_p, "M529LK44.AB0", age_seconds=10 * 86400,
|
|
content=b"binary")
|
|
self._make(tmp_p, "M529LK44.AB0.TXT", age_seconds=10 * 86400,
|
|
content=b"report")
|
|
state = ef.ForwardState(str(tmp_p / "fwd.json"))
|
|
pending = ef.find_pending_events(
|
|
str(tmp_p), state,
|
|
max_age_days=1,
|
|
quiescence_seconds=5,
|
|
missing_report_grace_seconds=60,
|
|
)
|
|
self.assertEqual(len(pending), 0)
|
|
|
|
def test_ignores_mlg_and_other_non_event_files(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_p = Path(tmp)
|
|
self._make(tmp_p, "BE11529.MLG", age_seconds=120, content=b"mlg")
|
|
self._make(tmp_p, "agent.log", age_seconds=120, content=b"log")
|
|
self._make(tmp_p, "config.ini", age_seconds=120, content=b"cfg")
|
|
state = ef.ForwardState(str(tmp_p / "fwd.json"))
|
|
pending = ef.find_pending_events(
|
|
str(tmp_p), state,
|
|
max_age_days=30,
|
|
quiescence_seconds=5,
|
|
missing_report_grace_seconds=60,
|
|
)
|
|
self.assertEqual(len(pending), 0)
|
|
|
|
def test_max_per_pass_caps_returned_count(self):
|
|
"""When max_per_pass is set, return at most that many pairs."""
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_p = Path(tmp)
|
|
# Create 5 distinct event binaries with paired .TXTs
|
|
for i, name in enumerate(
|
|
["M529LK01.AB0", "M529LK02.AB0", "M529LK03.AB0",
|
|
"M529LK04.AB0", "M529LK05.AB0"],
|
|
):
|
|
self._make(tmp_p, name, age_seconds=120 + i,
|
|
content=("bin-" + str(i)).encode())
|
|
self._make(tmp_p, name + ".TXT", age_seconds=110 + i,
|
|
content=b"report")
|
|
|
|
state = ef.ForwardState(str(tmp_p / "fwd.json"))
|
|
pending = ef.find_pending_events(
|
|
str(tmp_p), state,
|
|
max_age_days=30,
|
|
quiescence_seconds=5,
|
|
missing_report_grace_seconds=60,
|
|
max_per_pass=2,
|
|
)
|
|
self.assertEqual(len(pending), 2)
|
|
|
|
def test_max_per_pass_zero_means_unlimited(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_p = Path(tmp)
|
|
for i in range(4):
|
|
self._make(tmp_p, "M529LK0{}.AB0".format(i),
|
|
age_seconds=120 + i,
|
|
content=("bin-" + str(i)).encode())
|
|
state = ef.ForwardState(str(tmp_p / "fwd.json"))
|
|
pending = ef.find_pending_events(
|
|
str(tmp_p), state,
|
|
max_age_days=30,
|
|
quiescence_seconds=5,
|
|
missing_report_grace_seconds=60,
|
|
max_per_pass=0,
|
|
)
|
|
self.assertEqual(len(pending), 4)
|
|
|
|
def test_max_per_pass_returns_oldest_first(self):
|
|
"""Backfill should advance chronologically — oldest qualifying
|
|
files first. This way successive scans always make progress
|
|
instead of getting stuck re-considering the same N newest files."""
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_p = Path(tmp)
|
|
# ages: 200s (oldest), 150s, 100s, 50s (skipped — within grace)
|
|
ages = [200, 150, 100, 50]
|
|
for i, age in enumerate(ages):
|
|
self._make(tmp_p, "M529LK0{}.AB0".format(i),
|
|
age_seconds=age, content=("c" + str(i)).encode())
|
|
self._make(tmp_p, "M529LK0{}.AB0.TXT".format(i),
|
|
age_seconds=age - 10, content=b"r")
|
|
|
|
state = ef.ForwardState(str(tmp_p / "fwd.json"))
|
|
pending = ef.find_pending_events(
|
|
str(tmp_p), state,
|
|
max_age_days=30, quiescence_seconds=5,
|
|
missing_report_grace_seconds=60, max_per_pass=2,
|
|
)
|
|
# Oldest two should be M529LK00 (200s) and M529LK01 (150s)
|
|
names = [os.path.basename(p[0]) for p in pending]
|
|
self.assertEqual(names, ["M529LK00.AB0", "M529LK01.AB0"])
|
|
|
|
|
|
# ── Seed-state mode ──────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestSeedStateFromFolder(unittest.TestCase):
|
|
|
|
def _make(self, dir_path: Path, name: str, age_seconds: float = 100,
|
|
content: bytes = b"x") -> Path:
|
|
p = dir_path / name
|
|
p.write_bytes(content)
|
|
target = time.time() - age_seconds
|
|
os.utime(p, (target, target))
|
|
return p
|
|
|
|
def test_seeds_every_in_window_event_without_posting(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_p = Path(tmp)
|
|
for i in range(3):
|
|
self._make(tmp_p, "M529LK0{}.AB0".format(i),
|
|
age_seconds=120 + i, content=("e" + str(i)).encode())
|
|
# Plus a non-event file we should ignore
|
|
self._make(tmp_p, "BE11529.MLG", age_seconds=120, content=b"mlg")
|
|
|
|
state = ef.ForwardState(str(tmp_p / "seed.json"))
|
|
counts = ef.seed_state_from_folder(
|
|
str(tmp_p), state, max_age_days=30,
|
|
)
|
|
self.assertEqual(counts["scanned"], 3)
|
|
self.assertEqual(counts["seeded"], 3)
|
|
self.assertEqual(counts["already_known"], 0)
|
|
self.assertEqual(state.count(), 3)
|
|
|
|
def test_seed_skips_files_beyond_max_age_days(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_p = Path(tmp)
|
|
self._make(tmp_p, "M529LK01.AB0", age_seconds=120, content=b"new")
|
|
self._make(tmp_p, "M529LK02.AB0", age_seconds=10 * 86400,
|
|
content=b"in-window") # 10d < 30d cutoff
|
|
self._make(tmp_p, "M529LK03.AB0", age_seconds=400 * 86400,
|
|
content=b"way-old") # 400d > 30d cutoff
|
|
|
|
state = ef.ForwardState(str(tmp_p / "seed.json"))
|
|
counts = ef.seed_state_from_folder(
|
|
str(tmp_p), state, max_age_days=30,
|
|
)
|
|
self.assertEqual(counts["seeded"], 2)
|
|
self.assertEqual(counts["skipped_too_old"], 1)
|
|
|
|
def test_seeded_files_are_then_skipped_by_normal_scan(self):
|
|
"""End-to-end: seed once, then a normal scan should produce
|
|
zero pending events for the seeded files."""
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_p = Path(tmp)
|
|
self._make(tmp_p, "M529LK01.AB0", age_seconds=120, content=b"x")
|
|
self._make(tmp_p, "M529LK01.AB0.TXT", age_seconds=110, content=b"r")
|
|
self._make(tmp_p, "M529LK02.AB0", age_seconds=120, content=b"y")
|
|
self._make(tmp_p, "M529LK02.AB0.TXT", age_seconds=110, content=b"r")
|
|
|
|
state = ef.ForwardState(str(tmp_p / "seed.json"))
|
|
ef.seed_state_from_folder(str(tmp_p), state, max_age_days=30)
|
|
|
|
pending = ef.find_pending_events(
|
|
str(tmp_p), state,
|
|
max_age_days=30, quiescence_seconds=5,
|
|
missing_report_grace_seconds=60,
|
|
)
|
|
self.assertEqual(len(pending), 0,
|
|
"seed should have marked everything already-forwarded")
|
|
|
|
def test_seed_is_idempotent(self):
|
|
"""Re-running seed twice doesn't duplicate entries or POST anything."""
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_p = Path(tmp)
|
|
self._make(tmp_p, "M529LK01.AB0", age_seconds=120, content=b"x")
|
|
|
|
state = ef.ForwardState(str(tmp_p / "seed.json"))
|
|
counts1 = ef.seed_state_from_folder(str(tmp_p), state, max_age_days=30)
|
|
counts2 = ef.seed_state_from_folder(str(tmp_p), state, max_age_days=30)
|
|
self.assertEqual(counts1["seeded"], 1)
|
|
self.assertEqual(counts2["seeded"], 0)
|
|
self.assertEqual(counts2["already_known"], 1)
|
|
self.assertEqual(state.count(), 1)
|
|
|
|
|
|
# ── Multipart encoder ────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestMultipartEncoder(unittest.TestCase):
|
|
|
|
def test_encodes_two_parts_with_proper_boundary(self):
|
|
body, content_type = ef._encode_multipart([
|
|
("files", "a.bin", "application/octet-stream", b"\x01\x02"),
|
|
("files", "a.txt", "text/plain", b"hello"),
|
|
])
|
|
# Content-Type header carries the boundary
|
|
self.assertTrue(content_type.startswith("multipart/form-data; boundary="))
|
|
boundary = content_type.split("boundary=", 1)[1]
|
|
self.assertIn(boundary.encode("ascii"), body)
|
|
|
|
# Body shape
|
|
text = body.decode("latin-1")
|
|
self.assertIn(f'name="files"; filename="a.bin"', text)
|
|
self.assertIn(f'name="files"; filename="a.txt"', text)
|
|
self.assertIn("Content-Type: application/octet-stream", text)
|
|
self.assertIn("Content-Type: text/plain", text)
|
|
# Trailing close boundary present
|
|
self.assertTrue(text.rstrip("\r\n").endswith(f"--{boundary}--"))
|
|
|
|
|
|
# ── End-to-end forward_event_pair against a fake server ──────────────────────
|
|
|
|
|
|
class _FakeImportHandler(http.server.BaseHTTPRequestHandler):
|
|
"""Mimics seismo-relay's POST /db/import/blastware_file response."""
|
|
received = [] # class-level capture for test inspection
|
|
|
|
def do_POST(self):
|
|
length = int(self.headers.get("Content-Length", "0"))
|
|
body = self.rfile.read(length)
|
|
ctype = self.headers.get("Content-Type", "")
|
|
|
|
# Crude multipart split — enough to count parts and grab filenames.
|
|
parts = body.split(b"--" + ctype.split("boundary=")[-1].encode())
|
|
# Locate filename= occurrences — that's our part count
|
|
filenames = []
|
|
for p in parts:
|
|
for line in p.split(b"\r\n"):
|
|
if b'filename="' in line:
|
|
fn = line.split(b'filename="', 1)[1].split(b'"', 1)[0]
|
|
filenames.append(fn.decode("latin-1"))
|
|
|
|
self.__class__.received.append({
|
|
"path": self.path,
|
|
"ctype": ctype,
|
|
"filenames": filenames,
|
|
})
|
|
|
|
# Build a faux SFM response: success for the first .bin-style filename
|
|
results = []
|
|
binary_fn = next(
|
|
(fn for fn in filenames if not fn.lower().endswith(".txt")),
|
|
None,
|
|
)
|
|
if binary_fn:
|
|
results.append({
|
|
"filename": binary_fn,
|
|
"status": "ok",
|
|
"stored_filename": binary_fn,
|
|
"filesize": len(body),
|
|
"sha256": "00" * 32,
|
|
"report_attached": any(fn.lower().endswith(".txt") for fn in filenames),
|
|
"inserted": 1,
|
|
"skipped": 0,
|
|
})
|
|
|
|
payload = json.dumps({"count": len(results), "results": results}).encode()
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.send_header("Content-Length", str(len(payload)))
|
|
self.end_headers()
|
|
self.wfile.write(payload)
|
|
|
|
def log_message(self, *_a, **_kw): # silence the test runner
|
|
pass
|
|
|
|
|
|
def _start_fake_server() -> tuple[http.server.HTTPServer, str]:
|
|
"""Start an HTTPServer on a random local port; return (server, base_url)."""
|
|
server = http.server.HTTPServer(("127.0.0.1", 0), _FakeImportHandler)
|
|
threading.Thread(target=server.serve_forever, daemon=True).start()
|
|
host, port = server.server_address
|
|
return server, f"http://{host}:{port}"
|
|
|
|
|
|
class TestForwardEventPair(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
_FakeImportHandler.received = []
|
|
self.server, self.base_url = _start_fake_server()
|
|
|
|
def tearDown(self):
|
|
self.server.shutdown()
|
|
self.server.server_close()
|
|
|
|
def test_post_with_paired_report(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
tmp_p = Path(tmp)
|
|
bin_p = tmp_p / "M529LK44.AB0"
|
|
txt_p = tmp_p / "M529LK44.AB0.TXT"
|
|
bin_p.write_bytes(b"\x10\x20\x30 binary")
|
|
txt_p.write_bytes(b'"Serial Number : BE11529"\n')
|
|
|
|
result = ef.forward_event_pair(
|
|
self.base_url, str(bin_p), str(txt_p), timeout=5.0,
|
|
)
|
|
self.assertEqual(result["status"], "ok")
|
|
self.assertEqual(result["filename"], "M529LK44.AB0")
|
|
self.assertTrue(result["report_attached"])
|
|
|
|
self.assertEqual(len(_FakeImportHandler.received), 1)
|
|
req = _FakeImportHandler.received[0]
|
|
self.assertEqual(req["path"], "/db/import/blastware_file")
|
|
self.assertIn("M529LK44.AB0", req["filenames"])
|
|
self.assertIn("M529LK44.AB0.TXT", req["filenames"])
|
|
|
|
def test_post_without_report(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
bin_p = Path(tmp) / "M529LK44.AB0"
|
|
bin_p.write_bytes(b"binary only")
|
|
|
|
result = ef.forward_event_pair(
|
|
self.base_url, str(bin_p), None, timeout=5.0,
|
|
)
|
|
self.assertEqual(result["status"], "ok")
|
|
self.assertFalse(result["report_attached"])
|
|
req = _FakeImportHandler.received[0]
|
|
self.assertEqual(req["filenames"], ["M529LK44.AB0"])
|
|
|
|
def test_post_propagates_serial_hint_in_query(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
bin_p = Path(tmp) / "M529LK44.AB0"
|
|
bin_p.write_bytes(b"x")
|
|
ef.forward_event_pair(
|
|
self.base_url, str(bin_p), None,
|
|
serial_hint="BE11529", timeout=5.0,
|
|
)
|
|
req = _FakeImportHandler.received[0]
|
|
self.assertIn("serial=BE11529", req["path"])
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|