65b3af90ae
When a binary is forwarded WITHOUT its paired _ASCII.TXT (because
the TXT wasn't quiescent within the grace period — BW slow to
write, AV scanning, etc.), the old behaviour was to permanently
mark the binary as "done" in the state file, even though the TXT
might land seconds later. Result: that event lived in SFM forever
with broken-codec peak values and no project info.
Fix: state entries now carry a had_report flag. Forwards that ship
without a TXT record had_report=False. On subsequent scans, the watcher
treats had_report=False entries as re-pair candidates — they get
re-forwarded once the TXT appears, and the SFM server's upsert
path (in seismo-relay's insert_events IntegrityError handler)
refreshes the DB row with the report's authoritative values.
ForwardState.status(sha256) returns one of three states:
None — never forwarded. First-forward path.
True — forwarded successfully WITH report (or legacy entry
without the had_report field). Permanently done.
False — forwarded WITHOUT report. Re-pair if TXT now exists.
Backward compat: legacy state-file entries (no had_report key)
default to True so existing deployments don't unexpectedly
re-forward every entry on upgrade.
Tests cover:
- re-pair when TXT appears after a had_report=False forward
- had_report=True entries stay skipped permanently
- legacy entries (missing field) treated as fully forwarded
- state.status() returns None for unknown sha
- re-marking had_report=False then True promotes to fully-done
36 watcher tests pass (was 31, +5 new).
834 lines
31 KiB
Python
834 lines
31 KiB
Python
"""
|
||
event_forwarder.py — forward Blastware event files to an SFM server.

Watches the same Blastware ACH folder the heartbeat path watches.
For each event binary that still needs forwarding, pairs it with its
ASCII report when one is available — BW ACH's `<stem>_<ext>_ASCII.TXT`
first, the manual-export `<binary>.TXT` as a fallback — and POSTs both
to SFM's `/db/import/blastware_file` endpoint as one multipart request.

The receiving SFM server (seismo-relay v0.16+) detects paired binaries
and reports by filename, parses the .TXT into structured fields
(per-channel PPV / ZC Freq / Time of Peak / Peak Acceleration / Peak
Displacement / sensor self-check / monitor log), and persists every
field into the SFM database for sortable / filterable monthly-summary
review.

Design notes
────────────
- **stdlib only.** Matches the rest of the watcher (`urllib.request`).
  Multipart encoding is hand-rolled.
- **Idempotent across restarts.** Forwarded files are tracked by
  sha256 in a JSON state file (e.g. `.forwarded.json` next to
  config.ini; the path is supplied by the caller). Re-scanning the
  watch folder doesn't re-POST an event that was forwarded together
  with its report. The one deliberate exception: an event forwarded
  *without* its report is POSTed again once the report appears, so the
  SFM server's upsert can refresh the row with the report's values.
- **Default-off.** Callers must enable via config
  (`SFM_FORWARD_ENABLED=true` + `SFM_URL=...`). Existing 1.4.x
  deployments that auto-update to the new version stay non-forwarding
  until an operator flips the switch.
- **Quiescence guard.** Files modified within the last few seconds
  are skipped — Blastware ACH writes the .TXT after the binary, so
  we wait until both look stable before forwarding.
- **Best-effort report pairing.** When the .TXT hasn't appeared yet
  but the binary is older than the missing-report grace period
  (`missing_report_grace_seconds`, default
  `DEFAULT_MISSING_REPORT_GRACE_SECONDS`), the binary is forwarded
  alone (the SFM endpoint accepts that and just skips the rich fields —
  we'd rather get the binary indexed than block forever waiting for a
  TXT that never arrived).
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import hashlib
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import time
|
||
import urllib.error
|
||
import urllib.request
|
||
from datetime import datetime, timezone
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
|
||
log = logging.getLogger(__name__)

# Tuning defaults. The original notes say each can be overridden through
# config.ini SFM_* keys; that wiring lives outside this module.
#
# Seconds a file must sit unmodified before it is treated as fully written.
DEFAULT_QUIESCENCE_SECONDS = 5
# Seconds after which a binary is forwarded even if its .TXT never showed up.
DEFAULT_MISSING_REPORT_GRACE_SECONDS = 60
# Seconds allowed for each HTTP request to the SFM server.
DEFAULT_HTTP_TIMEOUT = 60.0
# Value written to, and required from, the state file's "version" key.
STATE_SCHEMA_VERSION = 1
|
||
|
||
|
||
# ── Filename matching ─────────────────────────────────────────────────────────
#
# Blastware event file names (per the seismo-relay docs) are built from:
#   one prefix letter + 3 serial-tail digits + 4 base36 timestamp chars,
#   then "." and a 3- or 4-character alphanumeric extension.
# e.g. M529LK44.AB0, S353L4H0.3M0W, P036L318.C80H, M529LIY6.N00.
# The prefix class accepts any letter (A-Z), and lower case is allowed
# because some filesystems lower-case names.
_EVENT_FILENAME_RE = re.compile(
    r"^"
    r"[A-Za-z]"             # prefix letter
    r"[0-9]{3}"             # serial tail
    r"[A-Za-z0-9]{4}"       # timestamp stem
    r"\.[A-Za-z0-9]{3,4}"   # extension
    r"$"
)

# Extensions rejected even when a name satisfies the regex above. They are
# compared against the lower-cased final suffix from os.path.splitext().
# ".h5" (two chars) and the compound ".sfm.json" can never actually reach
# that check — the regex demands a 3-4 char extension and splitext() only
# yields the last suffix — they are listed purely as defence in depth.
_NON_EVENT_EXTS = {
    # Owned by other code paths
    ".mlg",  # monitor logs (separate heartbeat path)
    ".txt",  # ASCII reports: paired with a binary, never forwarded alone
    # Generic housekeeping / scratch files
    ".log",
    ".ini",
    ".dat",
    ".bak",
    ".tmp",
    # SFM artefacts (not expected in a BW folder)
    ".pkl",
    ".h5",
    ".sfm.json",
    ".json",
}
|
||
|
||
|
||
def is_event_binary(path: str) -> bool:
    """Decide whether *path* names a Blastware event binary.

    Only the basename is inspected: it must match ``_EVENT_FILENAME_RE`` and
    its lower-cased final extension must not appear in ``_NON_EVENT_EXTS``.
    """
    basename = os.path.basename(path)
    if _EVENT_FILENAME_RE.match(basename) is None:
        return False
    suffix = os.path.splitext(basename)[1].lower()
    return suffix not in _NON_EVENT_EXTS
|
||
|
||
|
||
def ach_report_name(binary_name: str) -> str:
    """Map an event binary's name to the report name BW ACH writes for it.

    Blastware's Auto Call Home server emits per-event ASCII reports named
    ``<stem>_<ext>_ASCII.TXT``: the last ``.`` of the binary name becomes
    ``_`` and ``_ASCII.TXT`` is appended.

        ``M529LK44.AB0``  -> ``M529LK44_AB0_ASCII.TXT``
        ``N844L20G.630H`` -> ``N844L20G_630H_ASCII.TXT``
        ``H907L1R7.PG0H`` -> ``H907L1R7_PG0H_ASCII.TXT``

    A name containing no ``.`` at all (not expected for real BW events)
    simply gets ``_ASCII.TXT`` appended.
    """
    report_suffix = "_ASCII.TXT"
    if "." not in binary_name:
        return binary_name + report_suffix
    head, tail = binary_name.rsplit(".", 1)
    return head + "_" + tail + report_suffix
|
||
|
||
|
||
def legacy_report_name(binary_name: str) -> str:
    """Return the manual-export report name, ``<binary>.TXT``.

    That is the name produced when an operator saves an event report to
    text from BW's UI instead of letting ACH export it. It is kept as a
    fallback so codec-agent fixtures such as
    ``decode-re/5-8-26/event-c/M529LK44.AB0.TXT`` still pair correctly.
    """
    manual_suffix = ".TXT"
    return binary_name + manual_suffix
|
||
|
||
|
||
def report_path_for(binary_path: str) -> str:
    """Return ``<binary_path>.TXT``, the manual-export report path.

    Retained only for backward compatibility with callers still on the old
    ``<binary>.TXT`` convention; it delegates to :func:`legacy_report_name`.
    New BW ACH deployments should derive report names with
    :func:`ach_report_name` instead.
    """
    legacy_path = legacy_report_name(binary_path)
    return legacy_path
|
||
|
||
|
||
def is_histogram_event(filename: str) -> bool:
    """Return True when *filename*'s extension marks a Full Histogram event.

    Newer BW firmware uses 4-character extensions whose last character is
    the record type, ``H`` meaning histogram (e.g. ``N844L20G.630H``).
    Old-firmware 3-character extensions don't encode the type, so they are
    reported as False — the conservative choice, since it keeps the
    "no report ⚠" warning for what may be a waveform event.

    Used only to make forward logs clearer: a report-less histogram is
    logged as "histogram, no report expected", a report-less waveform as
    "no report ⚠". Forwarding decisions never consult it.
    """
    extension = os.path.splitext(os.path.basename(filename))[1]
    extension = extension.lstrip(".").upper()
    # Only the 4-char scheme carries the record type in its final character.
    return len(extension) == 4 and extension[-1] == "H"
|
||
|
||
|
||
# ── State file ────────────────────────────────────────────────────────────────
|
||
|
||
|
||
class ForwardState:
    """Idempotency record: which event files have already been forwarded?

    State file format (JSON)::

        {
            "version": 1,
            "forwarded": {
                "<sha256>": {
                    "filename": "M529LK44.AB0",
                    "size": 4400,
                    "forwarded_at": "2026-05-08T...Z",
                    "had_report": true
                },
                ...
            }
        }

    Keyed by sha256 (not filename) so that re-saved or re-uploaded
    identical content is recognised as already-forwarded even if the file
    moved or got renamed. ``filename`` is preserved for human inspection.

    ``had_report`` records whether the forward shipped the paired ASCII
    report (see :meth:`status`). Entries written before the field existed
    have no such key and are treated as complete.
    """

    def __init__(self, path: str):
        """Bind to the state file at *path* and load it if it exists."""
        self.path = path
        self._data: Dict[str, Any] = {"version": STATE_SCHEMA_VERSION, "forwarded": {}}
        self._load()

    def _load(self) -> None:
        """Replace the in-memory ``forwarded`` map with the file's, if usable.

        A missing file silently leaves the empty default in place. An
        unreadable or unparseable file, a non-object root, or a schema
        version mismatch is logged and likewise leaves the empty default
        (the next save then overwrites the file).
        """
        try:
            with open(self.path, "r", encoding="utf-8") as f:
                d = json.load(f)
            if not isinstance(d, dict):
                raise ValueError("state file root is not an object")
            if d.get("version") != STATE_SCHEMA_VERSION:
                log.warning(
                    "forward state version mismatch (got %r, want %d) — starting fresh",
                    d.get("version"), STATE_SCHEMA_VERSION,
                )
                return
            forwarded = d.get("forwarded")
            if isinstance(forwarded, dict):
                self._data["forwarded"] = forwarded
        except FileNotFoundError:
            pass
        except (OSError, ValueError) as exc:
            # ValueError also covers json.JSONDecodeError and UnicodeDecodeError.
            log.warning("failed to load forward state from %s: %s", self.path, exc)

    def _save(self) -> None:
        """Persist the whole state: write ``<path>.tmp``, fsync, then
        ``os.replace`` it over ``self.path``.

        Failures are logged rather than raised. On failure the temp file is
        removed (best effort) so a partial write doesn't linger beside the
        state file.
        """
        tmp = self.path + ".tmp"
        try:
            with open(tmp, "w", encoding="utf-8") as f:
                json.dump(self._data, f, indent=2, sort_keys=True)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp, self.path)
        except OSError as exc:
            log.warning("failed to save forward state to %s: %s", self.path, exc)
            try:
                os.remove(tmp)
            except OSError:
                pass  # best effort: tmp may never have been created

    def is_forwarded(self, sha256: str) -> bool:
        """True if *sha256* has any entry, regardless of ``had_report``."""
        return sha256 in self._data["forwarded"]

    def status(self, sha256: str) -> Optional[bool]:
        """Return forwarding status for *sha256*.

        Returns:
            None  — never forwarded. Eligible for a fresh forward.
            True  — forwarded successfully with its paired report (or a
                    legacy entry that pre-dates the ``had_report`` field,
                    or a malformed entry — assumed complete for safety).
                    NOT a candidate for re-forward.
            False — forwarded WITHOUT its paired ``_ASCII.TXT`` (BW's
                    TXT-write lagged past the grace period). Eligible for
                    re-forward IF the TXT now exists, so the SFM server's
                    upsert path can refresh the DB row with the report's
                    authoritative values.

        Legacy entries without a ``had_report`` key default to ``True`` so
        an upgrade doesn't unexpectedly re-forward every entry the operator
        has accumulated.
        """
        entry = self._data["forwarded"].get(sha256)
        if entry is None:
            return None
        if not isinstance(entry, dict):
            # Hand-edited / corrupted entry. Its key's presence still means
            # "recorded" (is_forwarded() agrees), so treat it like a legacy
            # complete entry instead of crashing the scan on .get().
            return True
        return bool(entry.get("had_report", True))

    def mark_forwarded(
        self,
        sha256: str,
        filename: str,
        size: int,
        had_report: bool = True,
    ) -> None:
        """Record a successful forward and persist the state file.

        Set ``had_report=False`` when the forward shipped the binary without
        its paired ASCII report. Such entries are re-checked on subsequent
        scans and re-forwarded once the TXT appears, so SFM's upsert
        refreshes the DB row with the device-authoritative peak/project
        values.

        Re-marking an existing sha256 overwrites its entry; marking with
        ``had_report=True`` is the promotion path used when a re-pair
        succeeds.
        """
        self._data["forwarded"][sha256] = {
            "filename": filename,
            "size": size,
            "forwarded_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            # Coerced so the state file always holds a JSON boolean.
            "had_report": bool(had_report),
        }
        self._save()

    def count(self) -> int:
        """Number of recorded entries (with or without a report)."""
        return len(self._data["forwarded"])
|
||
|
||
|
||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||
|
||
|
||
def sha256_of_file(path: str) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is streamed in 64 KiB chunks so large binaries are never held
    in memory at once. OSError from opening or reading propagates.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            block = stream.read(65536)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
|
||
|
||
|
||
def _is_quiescent(path: str, now_ts: float, quiescence_seconds: float) -> bool:
    """Report whether *path* has been left untouched for long enough.

    True when the file's mtime lies at least *quiescence_seconds* before
    *now_ts*. False otherwise — including when the file cannot be stat()ed
    (e.g. it vanished), so unknown files are treated as still in flight.
    """
    try:
        age = now_ts - os.path.getmtime(path)
    except OSError:
        return False
    return age >= quiescence_seconds
|
||
|
||
|
||
# ── Scan pass ─────────────────────────────────────────────────────────────────
|
||
|
||
|
||
def _find_report_name(
    binary_name: str,
    names: set[str],
    names_lc_to_actual: Dict[str, str],
) -> Optional[str]:
    """Return the directory-entry name of *binary_name*'s ASCII report, or None.

    Tries the BW ACH convention (``<stem>_<ext>_ASCII.TXT``) first and the
    manual-export convention (``<binary>.TXT``) second. Each candidate is
    matched exactly against *names*, then case-insensitively through
    *names_lc_to_actual* (lower-cased name -> actual name). ACH therefore
    wins when both conventions are present.
    """
    for cand in (ach_report_name(binary_name), legacy_report_name(binary_name)):
        if cand in names:
            return cand
        actual = names_lc_to_actual.get(cand.lower())
        if actual:
            return actual
    return None


def find_pending_events(
    watch_dir: str,
    state: ForwardState,
    *,
    max_age_days: int,
    quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
    missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
    max_per_pass: int = 0,
) -> List[Tuple[str, Optional[str]]]:
    """
    Walk `watch_dir` and return the (binary_path, txt_path_or_None) pairs
    that need forwarding this pass.

    Filtering rules:
      - Filename must match the BW event filename rules (is_event_binary).
      - Binary must be quiescent (mtime >= quiescence_seconds in the past).
      - Binary must not be older than `max_age_days` (matches the heartbeat
        path's MAX_EVENT_AGE_DAYS — keeps deep archives out of the
        forwarder).
      - Forward state, looked up by the binary's sha256:
          * forwarded with its report (or a legacy entry) → skipped;
          * forwarded WITHOUT its report → returned again only once a
            quiescent report is present, so SFM's upsert refreshes the row;
          * never forwarded → first-forward rules below.
      - Report pairing tries `<stem>_<ext>_ASCII.TXT` (BW ACH) and then
        `<binary>.TXT` (manual export), case-insensitively, against one
        directory listing. A report that exists but is still being written
        defers the event to a later pass. With no report at all, a
        never-forwarded binary is returned alone once it is older than
        `missing_report_grace_seconds`; younger ones are deferred so BW can
        finish writing the report.
      - When `max_per_pass > 0`, at most that many pairs are returned.
        Candidates are visited oldest-mtime first so backfill proceeds
        chronologically; use this to drip-feed a folder with thousands of
        qualifying events instead of hammering the SFM server with one
        giant burst.

    Returns an empty list when the folder is missing or cannot be listed
    (both logged).
    """
    if not os.path.isdir(watch_dir):
        log.warning("forward scan: watch dir not found: %s", watch_dir)
        return []

    now_ts = time.time()
    max_age_seconds = max(1, int(max_age_days)) * 86400.0

    pending: List[Tuple[str, Optional[str]]] = []
    skipped_inflight = 0
    skipped_already_forwarded = 0

    try:
        with os.scandir(watch_dir) as it:
            # DirEntry.is_file() may itself raise OSError, so it stays in
            # the guarded region.
            entries = [e for e in it if e.is_file()]
    except OSError as exc:
        log.warning("forward scan: scandir failed on %s: %s", watch_dir, exc)
        return []

    # Report pairing is resolved against this single listing. The
    # case-insensitive index is built once per pass (not once per binary).
    names = {e.name for e in entries}
    names_lc_to_actual = {n.lower(): n for n in names}

    def _mtime(entry: os.DirEntry) -> float:
        try:
            return entry.stat().st_mtime
        except OSError:
            return 0.0

    # Oldest first: when max_per_pass clamps the list we always advance,
    # rather than re-considering the same N newest files every scan.
    entries.sort(key=_mtime)

    for e in entries:
        if not is_event_binary(e.path):
            continue

        try:
            mtime = e.stat().st_mtime
        except OSError:
            continue
        age = now_ts - mtime

        # Out-of-window: too old or too fresh.
        if age > max_age_seconds:
            continue
        if not _is_quiescent(e.path, now_ts, quiescence_seconds):
            skipped_inflight += 1
            continue

        # NOTE: every in-window quiescent binary is hashed on every pass,
        # including ones already forwarded.
        try:
            digest = sha256_of_file(e.path)
        except OSError as exc:
            log.warning("forward scan: sha256 failed for %s: %s", e.path, exc)
            continue

        # True = forwarded with report (done); False = forwarded without
        # report (re-pair candidate); None = never forwarded.
        fwd_status = state.status(digest)
        if fwd_status is True:
            skipped_already_forwarded += 1
            continue

        txt_path: Optional[str] = None
        txt_name = _find_report_name(e.name, names, names_lc_to_actual)
        if txt_name is not None:
            candidate = os.path.join(watch_dir, txt_name)
            if not _is_quiescent(candidate, now_ts, quiescence_seconds):
                # The report exists but is still being written (or vanished
                # since the listing). Defer the event instead of shipping
                # the binary without it; a later pass will pair them.
                skipped_inflight += 1
                continue
            txt_path = candidate

        if txt_path is None:
            if fwd_status is False:
                # Already forwarded alone; re-sending the same binary alone
                # would add nothing.
                skipped_already_forwarded += 1
                continue
            if age < missing_report_grace_seconds:
                # First forward: give BW time to write the report.
                skipped_inflight += 1
                continue

        pending.append((e.path, txt_path))

        # Per-pass cap: once we have enough pending, stop scanning.
        if max_per_pass and len(pending) >= max_per_pass:
            break

    log.debug(
        "forward scan: %d pending skipped_inflight=%d already_forwarded=%d cap=%d",
        len(pending), skipped_inflight, skipped_already_forwarded, max_per_pass,
    )
    return pending
|
||
|
||
|
||
# ── Multipart upload ──────────────────────────────────────────────────────────
|
||
|
||
|
||
def _encode_multipart(
    parts: List[Tuple[str, str, str, bytes]],
) -> Tuple[bytes, str]:
    """Serialise file parts into a multipart/form-data request body.

    Each element of *parts* is ``(field_name, filename, content_type, data)``.
    A fresh random boundary is generated per call. Returns
    ``(body_bytes, content_type_header_value)``. Part headers are
    ASCII-encoded, so a non-ASCII field name, filename or content type
    raises UnicodeEncodeError.
    """
    boundary = "----Series3WatcherBoundary" + os.urandom(8).hex()
    delimiter = f"--{boundary}\r\n".encode("ascii")
    buf = bytearray()
    for field_name, filename, content_type, data in parts:
        part_header = (
            f'Content-Disposition: form-data; name="{field_name}"; '
            f'filename="{filename}"\r\n'
            f"Content-Type: {content_type}\r\n\r\n"
        )
        buf += delimiter
        buf += part_header.encode("ascii")
        buf += data
        buf += b"\r\n"
    buf += f"--{boundary}--\r\n".encode("ascii")
    return bytes(buf), f"multipart/form-data; boundary={boundary}"
|
||
|
||
|
||
def _import_endpoint(sfm_url: str) -> str:
    """Build the Blastware-import URL under base *sfm_url*.

    Any trailing slashes on *sfm_url* are dropped before the fixed
    ``/db/import/blastware_file`` path is appended.
    """
    base = sfm_url.rstrip("/")
    return f"{base}/db/import/blastware_file"
|
||
|
||
|
||
def forward_event_pair(
    sfm_url: str,
    binary_path: str,
    txt_path: Optional[str],
    *,
    serial_hint: Optional[str] = None,
    timeout: float = DEFAULT_HTTP_TIMEOUT,
) -> Dict[str, Any]:
    """POST a single event (binary + optional .TXT) to the SFM import endpoint.

    Args:
        sfm_url: Base SFM URL; ``/db/import/blastware_file`` is appended.
        binary_path: Event binary to upload.
        txt_path: Paired ASCII report to upload alongside it, or None.
        serial_hint: Optional ``serial`` query parameter (URL-encoded).
        timeout: Per-request timeout in seconds.

    Returns:
        On success, the server's per-file outcome for this binary (the
        matching row of the response's ``results`` list). Otherwise a dict
        with ``status="error"``, ``filename`` and a human-readable
        ``detail``. That covers local read failures, HTTP errors,
        connection/transport/protocol errors and unexpected or non-JSON
        responses; none of those are raised, so one bad event cannot abort
        a forwarding pass.
    """
    import http.client
    import urllib.parse

    binary_name = os.path.basename(binary_path)

    def _error(detail: str) -> Dict[str, Any]:
        return {"status": "error", "filename": binary_name, "detail": detail}

    try:
        with open(binary_path, "rb") as f:
            binary_bytes = f.read()
        parts = [("files", binary_name, "application/octet-stream", binary_bytes)]
        if txt_path is not None:
            with open(txt_path, "rb") as f:
                txt_bytes = f.read()
            parts.append(("files", os.path.basename(txt_path), "text/plain", txt_bytes))
    except OSError as exc:
        # e.g. a file locked by another process, or removed since the scan.
        return _error(f"read error: {exc}")

    body, content_type = _encode_multipart(parts)

    url = _import_endpoint(sfm_url)
    if serial_hint:
        sep = "&" if "?" in url else "?"
        # Encode so spaces, '&', '#' etc. in the hint can't corrupt the URL.
        url = f"{url}{sep}{urllib.parse.urlencode({'serial': serial_hint})}"

    req = urllib.request.Request(
        url, data=body, method="POST",
        headers={
            "Content-Type": content_type,
            "Content-Length": str(len(body)),
            "User-Agent": "series3-watcher/sfm-forwarder",
            "Accept": "application/json",
        },
    )

    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        # HTTPError subclasses URLError, so it must be handled first.
        try:
            body_excerpt = exc.read().decode("utf-8", errors="replace")[:300]
        except Exception:  # best-effort excerpt only
            body_excerpt = ""
        return _error(f"HTTP {exc.code}: {exc.reason} body={body_excerpt!r}")
    except urllib.error.URLError as exc:
        return _error(f"connection error: {exc.reason}")
    except (OSError, http.client.HTTPException) as exc:
        # Timeouts/resets are OSErrors; protocol failures such as
        # IncompleteRead are HTTPException, which is not an OSError.
        return _error(f"transport error: {exc}")

    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return _error(f"server returned non-JSON: {raw[:200]!r}")

    # Expected shape: {"count": N, "results": [{...}, ...]}. Anything else
    # falls through to "unexpected server response" instead of raising.
    results = payload.get("results") if isinstance(payload, dict) else None
    rows = [r for r in results if isinstance(r, dict)] if isinstance(results, list) else []
    mine = [r for r in rows if r.get("filename") == binary_name]
    for row in mine:
        if row.get("status") == "ok":
            return row
    if mine:
        # No "ok" row for this binary: propagate the server's first row.
        return mine[0]
    return _error(f"unexpected server response: {payload!r}")
|
||
|
||
|
||
# ── Top-level orchestration ───────────────────────────────────────────────────
|
||
|
||
|
||
def forward_pending(
    watch_dir: str,
    sfm_url: str,
    state: ForwardState,
    *,
    max_age_days: int,
    quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
    missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
    timeout: float = DEFAULT_HTTP_TIMEOUT,
    max_per_pass: int = 0,
    logger: Optional[Any] = None,
) -> Dict[str, int]:
    """
    Run one full pass: find pending events, POST each one, update state.

    `logger`, when given, is called with each human-readable message;
    otherwise messages go to this module's logger at INFO level.

    Returns a counts dict suitable for logging:

        {
            "scanned": <int>,     # pending events found (= attempted) this pass
            "forwarded": <int>,   # successfully POSTed this pass
            "errors": <int>,      # failed forwards; not recorded, so retried next pass
            "with_report": <int>, # of forwarded, how many shipped a paired TXT
        }
    """
    def _log(msg: str) -> None:
        if logger:
            logger(msg)
        else:
            log.info(msg)

    pending = find_pending_events(
        watch_dir, state,
        max_age_days=max_age_days,
        quiescence_seconds=quiescence_seconds,
        missing_report_grace_seconds=missing_report_grace_seconds,
        max_per_pass=max_per_pass,
    )

    counts = {"scanned": len(pending), "forwarded": 0, "errors": 0, "with_report": 0}

    for binary_path, txt_path in pending:
        binary_name = os.path.basename(binary_path)
        try:
            result = forward_event_pair(
                sfm_url, binary_path, txt_path,
                timeout=timeout,
            )
        except OSError as exc:
            # Reading the binary/TXT can fail (file locked, removed since the
            # scan). Count it against this event instead of aborting the
            # remainder of the pass.
            result = {"status": "error", "filename": binary_name,
                      "detail": f"read error: {exc}"}

        if result.get("status") != "ok":
            counts["errors"] += 1
            _log(
                f"[forward] ERR {binary_name}: "
                f"{result.get('detail', 'unknown error')}"
            )
            continue

        try:
            digest = sha256_of_file(binary_path)
            size = os.path.getsize(binary_path)
            # Record whether this forward shipped a paired TXT. Forwards
            # without one are flagged had_report=False so later scans check
            # whether the TXT has since appeared and re-forward (the SFM
            # server's upsert path refreshes the DB row with the report's
            # authoritative values).
            state.mark_forwarded(
                digest,
                binary_name,
                size,
                had_report=(txt_path is not None),
            )
        except OSError as exc:
            _log(f"[forward] post-success state save failed for "
                 f"{binary_name}: {exc}")
        counts["forwarded"] += 1
        if txt_path:
            counts["with_report"] += 1

        # Only flag "no report" loudly when it's actually unexpected:
        #   - TXT attached              → "+ <txt> attached"
        #   - histogram without TXT     → "(histogram, no report expected)"
        #   - non-histogram without TXT → "no report ⚠" (BW maybe didn't export)
        if txt_path:
            report_token = "+ {} attached".format(os.path.basename(txt_path))
        elif is_histogram_event(binary_path):
            report_token = "(histogram, no report expected)"
        else:
            report_token = "no report ⚠"

        _log(
            "[forward] OK {} ({}B, {}, inserted={}, skipped={})".format(
                binary_name,
                result.get("filesize", 0),
                report_token,
                result.get("inserted", 0),
                result.get("skipped", 0),
            )
        )

    return counts
|
||
|
||
|
||
# ── Seed-state mode (skip historical backfill on first deploy) ────────────────
|
||
|
||
|
||
def seed_state_from_folder(
    watch_dir: str,
    state: ForwardState,
    *,
    max_age_days: int = 365,
    logger: Optional[Any] = None,
) -> Dict[str, int]:
    """Walk `watch_dir` and mark every existing event binary as already
    forwarded — without POSTing anything.

    This is the right tool for a first deploy on a machine that already
    has tens or hundreds of thousands of historical events in the BW
    ACH folder. Run it ONCE before enabling SFM_FORWARD_ENABLED:

        python event_forwarder.py --seed-state \\
            --watch "C:\\Blastware 10\\Event\\autocall home" \\
            --state "C:\\...\\sfm_forwarded.json" \\
            [--max-age-days 365]

    The watcher then starts forwarding only events that appear AFTER
    the seed run. Files older than `max_age_days` are still skipped
    by the regular scan loop — we don't bother seeding them because
    they wouldn't be forwarded anyway.

    Seeded entries use the ``mark_forwarded`` default ``had_report=True``,
    so they are never re-paired. Digests already in the state (with or
    without a report) are left untouched and counted as ``already_known``.

    The state file is written at each 1000-entry checkpoint and once at
    the end, not after every entry.

    Returns a counts dict:
        {"scanned": int, "seeded": int, "already_known": int, "skipped_too_old": int}
    """
    def _log(msg: str) -> None:
        if logger:
            logger(msg)
        else:
            log.info(msg)

    counts = {"scanned": 0, "seeded": 0, "already_known": 0, "skipped_too_old": 0}

    if not os.path.isdir(watch_dir):
        _log(f"[seed] watch dir not found: {watch_dir}")
        return counts

    now_ts = time.time()
    max_age_seconds = max(1, int(max_age_days)) * 86400.0

    try:
        with os.scandir(watch_dir) as it:
            entries = [e for e in it if e.is_file()]
    except OSError as exc:
        _log(f"[seed] scandir failed on {watch_dir}: {exc}")
        return counts

    # mark_forwarded() rewrites and fsyncs the ENTIRE state file on every
    # call; over an archive of N events that is O(N^2) bytes written. Shadow
    # this instance's _save with a no-op while seeding, persist at progress
    # checkpoints, and once more at the end (also on interruption).
    persist = state._save
    state._save = lambda: None  # type: ignore[method-assign]
    try:
        for e in entries:
            if not is_event_binary(e.path):
                continue
            counts["scanned"] += 1
            try:
                st = e.stat()
            except OSError:
                continue
            if (now_ts - st.st_mtime) > max_age_seconds:
                counts["skipped_too_old"] += 1
                continue
            try:
                digest = sha256_of_file(e.path)
            except OSError as exc:
                _log(f"[seed] sha256 failed for {e.path}: {exc}")
                continue
            if state.is_forwarded(digest):
                counts["already_known"] += 1
                continue
            state.mark_forwarded(digest, e.name, st.st_size)
            counts["seeded"] += 1
            if counts["seeded"] % 1000 == 0:
                persist()
                _log(f"[seed] progress: {counts['seeded']} seeded so far...")
    finally:
        # Drop the instance-level override so the class's _save is visible again.
        del state._save
        if counts["seeded"]:
            state._save()

    _log(
        f"[seed] done. scanned={counts['scanned']} seeded={counts['seeded']} "
        f"already_known={counts['already_known']} "
        f"skipped_too_old={counts['skipped_too_old']}"
    )
    return counts
|
||
|
||
|
||
# ── CLI entry point ─────────────────────────────────────────────────────────
|
||
|
||
|
||
def _main() -> int:
    """Command-line interface for one-shot operations.

    Currently supports a single mode:

        python event_forwarder.py --seed-state \\
            --watch "<path/to/BW autocall folder>" \\
            --state "<path/to/sfm_forwarded.json>" \\
            [--max-age-days 365]

    which marks every existing in-window event binary as already
    forwarded (without POSTing) so the watcher only forwards events
    appearing AFTER the seed.

    Returns the process exit status (0 on completion). Usage errors —
    a missing --seed-state flag, or a --watch path that is not a
    directory — exit with status 2 via ``argparse``.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Series 3 Watcher — SFM event forwarder utilities",
    )
    parser.add_argument(
        "--seed-state", action="store_true",
        help="Mark every event binary in --watch as already-forwarded "
             "(without POSTing). Use this BEFORE enabling SFM_FORWARD "
             "on a machine with a large historical archive.",
    )
    parser.add_argument(
        "--watch", required=True,
        help="Path to the Blastware ACH folder.",
    )
    parser.add_argument(
        "--state", required=True,
        help="Path to the JSON state file. Will be created if missing.",
    )
    parser.add_argument(
        "--max-age-days", type=int, default=365,
        help="Only seed files newer than this many days (default 365).",
    )
    args = parser.parse_args()

    if not args.seed_state:
        parser.error("specify --seed-state (no other modes supported yet)")
    if not os.path.isdir(args.watch):
        # seed_state_from_folder() would merely log and seed nothing, and
        # this command would still exit 0 — an operator could then enable
        # forwarding believing the archive had been seeded.
        parser.error(f"--watch is not a directory: {args.watch}")

    print(f"[seed] watch_dir = {args.watch}")
    print(f"[seed] state     = {args.state}")
    print(f"[seed] max_age   = {args.max_age_days} days")

    state = ForwardState(args.state)
    print(f"[seed] state currently has {state.count()} entries")
    seed_state_from_folder(
        args.watch, state,
        max_age_days=args.max_age_days,
        logger=print,
    )
    print(f"[seed] state now has {state.count()} entries")
    return 0
|
||
|
||
|
||
if __name__ == "__main__":
    # Propagate _main()'s return value as the process exit status
    # (equivalent to sys.exit(_main())).
    raise SystemExit(_main())
|