series3-watcher/event_forwarder.py
serversdown 65b3af90ae feat(forward): re-pair late-arriving TXTs on subsequent scans
When a binary is forwarded WITHOUT its paired _ASCII.TXT (because
the TXT wasn't quiescent within the grace period — BW slow to
write, AV scanning, etc.), the old behaviour was to permanently
mark the binary as "done" in the state file, even though the TXT
might land seconds later.  Result: that event lived in SFM forever
with broken-codec peak values and no project info.

Fix: state entries now carry a had_report flag.  Forwards without
a TXT set had_report=False.  On subsequent scans, the watcher
treats had_report=False entries as re-pair candidates — they get
re-forwarded once the TXT appears, and the SFM server's upsert
path (in seismo-relay's insert_events IntegrityError handler)
refreshes the DB row with the report's authoritative values.

Three status states in ForwardState.status(sha256):
  None  — never forwarded.  First-forward path.
  True  — forwarded successfully WITH report (or legacy entry
          without the had_report field).  Permanently done.
  False — forwarded WITHOUT report.  Re-pair if TXT now exists.

Backward compat: legacy state-file entries (no had_report key)
default to True so existing deployments don't unexpectedly
re-forward every entry on upgrade.
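The three-state decision above can be restated as a small pure function. This is a hypothetical sketch: `entry_status` and `should_forward` are illustrative names, not functions in event_forwarder.py. It assumes the report lookup has already been reduced to a boolean `txt_ready`, meaning "a paired TXT exists and is quiescent".

```python
from typing import Any, Dict, Optional


def entry_status(entry: Optional[Dict[str, Any]]) -> Optional[bool]:
    """Hypothetical mirror of ForwardState.status() for one state entry.

    None  -> never forwarded
    True  -> forwarded with report, or a legacy entry without had_report
    False -> forwarded without report (re-pair candidate)
    """
    if entry is None:
        return None
    # Legacy entries lack the key and default to "fully forwarded".
    return bool(entry.get("had_report", True))


def should_forward(
    status: Optional[bool],
    txt_ready: bool,
    binary_age_s: float,
    grace_s: float,
) -> bool:
    """Hypothetical decision rule: should this binary be (re)forwarded now?"""
    if status is True:
        return False  # permanently done
    if status is False:
        return txt_ready  # re-pair only once the TXT is usable
    # First forward: pair if possible, else send alone after the grace period.
    return txt_ready or binary_age_s >= grace_s
```

With a 60-second grace period, a binary whose TXT is late is sent alone after a minute and then re-sent once, paired, when the TXT lands.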

Tests cover:
  - re-pair when TXT appears after a had_report=False forward
  - had_report=True entries stay skipped permanently
  - legacy entries (missing field) treated as fully forwarded
  - state.status() returns None for unknown sha
  - re-marking had_report=False then True promotes to fully-done

36 watcher tests pass (was 31, +5 new).
2026-05-11 16:22:53 +00:00

834 lines
31 KiB
Python

"""
event_forwarder.py — forward Blastware event files to an SFM server.

Watches the same Blastware ACH folder the heartbeat path watches.
For each event binary that hasn't been forwarded yet, pairs it with
its ASCII report when one is available (BW ACH's
`<stem>_<ext>_ASCII.TXT`, falling back to the manual-export
`<binary>.TXT`) and POSTs both to SFM's `/db/import/blastware_file`
endpoint as one multipart request.

The receiving SFM server (seismo-relay v0.16+) detects paired binaries
and reports by filename, parses the .TXT into structured fields
(per-channel PPV / ZC Freq / Time of Peak / Peak Acceleration / Peak
Displacement / sensor self-check / monitor log), and persists every
field into the SFM database for sortable / filterable monthly-summary
review.

Design notes
────────────
- **stdlib only.** Matches the rest of the watcher (`urllib.request`).
  Multipart encoding is hand-rolled.
- **Idempotent across restarts.** Forwarded files are tracked by
  sha256 in a JSON state file (`.forwarded.json` next to config.ini).
  Re-scanning the watch folder doesn't re-POST anything, except in the
  report re-pair case described under "Best-effort report pairing".
- **Default-off.** Callers must enable via config
  (`SFM_FORWARD_ENABLED=true` + `SFM_URL=...`). Existing 1.4.x
  deployments that auto-update to the new version stay non-forwarding
  until an operator flips the switch.
- **Quiescence guard.** Files modified within the last few seconds
  are skipped — Blastware ACH writes the .TXT after the binary, so
  we wait until both look stable before forwarding.
- **Best-effort report pairing.** When the .TXT hasn't appeared (or is
  still being written) but the binary is older than
  `missing_report_grace_seconds`, the binary is forwarded alone (the
  SFM endpoint accepts that and just skips the rich fields — we'd
  rather get the binary indexed than block forever waiting for a TXT
  that never arrived). Such forwards are recorded with
  `had_report=False`; if the TXT appears on a later scan, the binary
  is re-forwarded with it so SFM's upsert path refreshes the DB row.
"""
from __future__ import annotations

import hashlib
import json
import logging
import os
import re
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

log = logging.getLogger(__name__)

# Default tuning. All overridable via config.ini SFM_* keys.
DEFAULT_QUIESCENCE_SECONDS = 5  # don't touch a file modified in the last N seconds
DEFAULT_MISSING_REPORT_GRACE_SECONDS = 60  # forward without .TXT if it hasn't shown up after N seconds
DEFAULT_HTTP_TIMEOUT = 60.0  # per-request timeout
STATE_SCHEMA_VERSION = 1


# ── Filename matching ─────────────────────────────────────────────────────────
#
# Blastware's filename scheme (confirmed in seismo-relay docs):
#   prefix_letter (BZ) + 3-digit serial-tail + 4-char base36 timestamp stem
#   + "." + 3-or-4-char extension.
# Examples: M529LK44.AB0, S353L4H0.3M0W, P036L318.C80H, M529LIY6.N00.
#
# We accept lowercase too because some filesystems lower-case names.
_EVENT_FILENAME_RE = re.compile(
    r"^[A-Za-z][0-9]{3}[A-Za-z0-9]{4}\.[A-Za-z0-9]{3,4}$"
)

# Filenames we explicitly skip even if they happen to match the regex.
_NON_EVENT_EXTS = {
    ".mlg",  # monitor-log files (separate heartbeat path)
    ".txt",  # ASCII reports — handled via pairing, not as primary files
    ".log",
    ".ini",
    ".dat",
    ".bak",
    ".tmp",
    ".pkl",  # SFM A5 pickles (shouldn't appear in a BW folder, but defence)
    ".h5",
    ".sfm.json",
    ".json",
}


def is_event_binary(path: str) -> bool:
    """Return True if `path`'s basename looks like a Blastware event binary."""
    name = os.path.basename(path)
    if not _EVENT_FILENAME_RE.match(name):
        return False
    ext = os.path.splitext(name)[1].lower()
    if ext in _NON_EVENT_EXTS:
        return False
    return True


def ach_report_name(binary_name: str) -> str:
    """BW ACH report-naming convention.

    Blastware's official Auto Call Home server writes per-event ASCII
    reports as ``<stem>_<ext>_ASCII.TXT`` — the ``.`` between stem and
    ext is replaced with ``_`` and ``_ASCII.TXT`` is appended.

    Examples:
        ``M529LK44.AB0``  → ``M529LK44_AB0_ASCII.TXT``
        ``N844L20G.630H`` → ``N844L20G_630H_ASCII.TXT``
        ``H907L1R7.PG0H`` → ``H907L1R7_PG0H_ASCII.TXT``

    For a filename without a dot (defensive — shouldn't happen for real
    BW events) we still append ``_ASCII.TXT``.
    """
    stem, dot, ext = binary_name.rpartition(".")
    if not dot:
        return binary_name + "_ASCII.TXT"
    return stem + "_" + ext + "_ASCII.TXT"


def legacy_report_name(binary_name: str) -> str:
    """Manual-export convention: ``<binary>.TXT`` (e.g. when an operator
    saves an event report to text directly from BW's UI rather than
    letting ACH auto-export it). Kept as a fallback so the codec-agent
    test fixtures (``decode-re/5-8-26/event-c/M529LK44.AB0.TXT``) still
    pair correctly."""
    return binary_name + ".TXT"


def report_path_for(binary_path: str) -> str:
    """Legacy entry point — returns the manual-export path. Prefer
    :func:`ach_report_name` for new BW deployments. Retained for
    backward compatibility with any caller still on the old convention."""
    return legacy_report_name(binary_path)


def is_histogram_event(filename: str) -> bool:
    """True if the filename's extension marks the file as a Full Histogram
    event (BW filename scheme: 4-char extensions of the form ``AB0T`` where
    ``T = H``). Old-firmware events use 3-char extensions where
    waveform-vs-histogram is not encoded in the name; we can't tell those
    apart and return False (the conservative answer — we don't want to
    suppress "no report" warnings on potentially-waveform old-firmware
    events).

    Used purely for log clarity — when a forward goes through without a
    paired TXT, the log distinguishes "histogram, no report expected"
    (acceptable: BW may not have written one even though it normally
    does for ACH-routed histograms) from "no report ⚠" on a waveform
    (more suspicious: BW almost always writes the TXT for waveform events).
    Forwarding logic itself doesn't depend on this check.
    """
    name = os.path.basename(filename)
    ext = os.path.splitext(name)[1].lstrip(".").upper()
    return len(ext) == 4 and ext.endswith("H")


# ── State file ────────────────────────────────────────────────────────────────
class ForwardState:
    """Idempotency record: which event files have we already forwarded?

    State file format (JSON):

        {
          "version": 1,
          "forwarded": {
            "<sha256>": {
              "filename": "M529LK44.AB0",
              "size": 4400,
              "forwarded_at": "2026-05-08T...Z",
              "had_report": true
            },
            ...
          }
        }

    Keyed by sha256 (not filename) so that re-saved or re-uploaded
    identical content is recognised as already-forwarded even if the
    file moved or got renamed. Filename is preserved for human
    inspection. Entries written before the re-pair feature have no
    ``had_report`` key; :meth:`status` treats them as complete.
    """

    def __init__(self, path: str):
        self.path = path
        self._data: Dict[str, Any] = {"version": STATE_SCHEMA_VERSION, "forwarded": {}}
        self._load()

    def _load(self) -> None:
        try:
            with open(self.path, "r", encoding="utf-8") as f:
                d = json.load(f)
            if not isinstance(d, dict):
                raise ValueError("state file root is not an object")
            if d.get("version") != STATE_SCHEMA_VERSION:
                log.warning(
                    "forward state version mismatch (got %r, want %d) — starting fresh",
                    d.get("version"), STATE_SCHEMA_VERSION,
                )
                return
            forwarded = d.get("forwarded")
            if isinstance(forwarded, dict):
                self._data["forwarded"] = forwarded
        except FileNotFoundError:
            pass
        except (OSError, ValueError, json.JSONDecodeError) as exc:
            log.warning("failed to load forward state from %s: %s", self.path, exc)

    def _save(self) -> None:
        # Write to a temp file, fsync, then atomically replace so a crash
        # mid-write never leaves a truncated state file behind.
        tmp = self.path + ".tmp"
        try:
            with open(tmp, "w", encoding="utf-8") as f:
                json.dump(self._data, f, indent=2, sort_keys=True)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp, self.path)
        except OSError as exc:
            log.warning("failed to save forward state to %s: %s", self.path, exc)

    def is_forwarded(self, sha256: str) -> bool:
        """True if *sha256* has any state entry, with or without a report."""
        return sha256 in self._data["forwarded"]

    def status(self, sha256: str) -> Optional[bool]:
        """Return forwarding status for *sha256*.

        Returns:
            None  — never forwarded. Eligible for a fresh forward.
            True  — forwarded successfully with its paired report
                    (or in a legacy entry that pre-dates the
                    had_report field — assumed complete for safety).
                    NOT a candidate for re-forward.
            False — forwarded WITHOUT its paired ``_ASCII.TXT``
                    (BW's TXT-write lagged past the grace period).
                    Eligible for re-forward IF the TXT now exists,
                    so the SFM server's upsert path can refresh the
                    DB row with the report's authoritative values.

        Legacy state-file entries without a ``had_report`` key default
        to ``True`` so an upgrade doesn't unexpectedly re-forward
        every entry the operator has accumulated.
        """
        entry = self._data["forwarded"].get(sha256)
        if entry is None:
            return None
        return bool(entry.get("had_report", True))

    def mark_forwarded(
        self,
        sha256: str,
        filename: str,
        size: int,
        had_report: bool = True,
    ) -> None:
        """Record a successful forward.

        Set ``had_report=False`` when the forward shipped the binary
        without its paired ASCII report. Such entries are re-checked
        on subsequent scans and re-forwarded once the TXT appears, so
        SFM's upsert refreshes the DB row with the device-authoritative
        peak/project values.

        Idempotent: re-marking an existing sha256 with ``had_report=True``
        is the explicit promotion path used when a re-pair succeeds.
        """
        self._data["forwarded"][sha256] = {
            "filename": filename,
            "size": size,
            "forwarded_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "had_report": had_report,
        }
        self._save()

    def count(self) -> int:
        return len(self._data["forwarded"])


# ── Helpers ───────────────────────────────────────────────────────────────────
def sha256_of_file(path: str) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            h.update(chunk)
    return h.hexdigest()


def _is_quiescent(path: str, now_ts: float, quiescence_seconds: float) -> bool:
    """Return True if the file's mtime is at least `quiescence_seconds`
    in the past — i.e. no longer being written."""
    try:
        mtime = os.path.getmtime(path)
    except OSError:
        return False
    return (now_ts - mtime) >= quiescence_seconds


# ── Scan pass ─────────────────────────────────────────────────────────────────
def find_pending_events(
    watch_dir: str,
    state: ForwardState,
    *,
    max_age_days: int,
    quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
    missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
    max_per_pass: int = 0,
) -> List[Tuple[str, Optional[str]]]:
    """
    Walk `watch_dir` and return the list of (binary_path, txt_path_or_None)
    pairs that need forwarding.

    Filtering rules:
      - Filename must match the BW event filename regex.
      - File must be quiescent (mtime >= quiescence_seconds in the past).
      - File must not exceed `max_age_days` (matches the heartbeat
        path's MAX_EVENT_AGE_DAYS — keeps deep archives out of the
        forwarder).
      - File's sha256 must not be recorded as fully forwarded
        (``state.status() is True``). Entries forwarded without a report
        (``state.status() is False``) are returned again only once their
        report is present and quiescent.
      - The report is looked up as ``<stem>_<ext>_ASCII.TXT`` (BW ACH)
        first, then ``<binary>.TXT`` (manual export), case-insensitively.
        If one exists and is quiescent, we pair them. Otherwise, on a
        first forward, a binary older than missing_report_grace_seconds
        is forwarded without the TXT; younger binaries are deferred to
        let BW finish writing the report.
      - When `max_per_pass > 0`, return at most that many pairs.
        Older files (lower mtime) are forwarded first so backfill
        proceeds chronologically. Use this to drip-feed a folder
        with thousands of qualifying events instead of hammering
        the SFM server with one giant burst.
    """
    if not os.path.isdir(watch_dir):
        log.warning("forward scan: watch dir not found: %s", watch_dir)
        return []

    now_ts = time.time()
    max_age_seconds = max(1, int(max_age_days)) * 86400.0
    pending: List[Tuple[str, Optional[str]]] = []
    skipped_inflight = 0
    skipped_already_forwarded = 0

    try:
        with os.scandir(watch_dir) as it:
            entries = list(it)
    except OSError as exc:
        log.warning("forward scan: scandir failed on %s: %s", watch_dir, exc)
        return []

    # Cache the directory listing so TXT-partner lookups need no extra
    # stat() calls. The lower-case index for the case-insensitive
    # fallback is built once per pass, not once per binary.
    names = {e.name for e in entries if e.is_file()}
    names_lc_to_actual = {n.lower(): n for n in names}

    # Sort by mtime ASCENDING so chronological backfill happens oldest-first.
    # When max_per_pass clamps the list, we always advance — we don't get
    # stuck re-considering the same N newest files every scan.
    def _mtime(entry: os.DirEntry) -> float:
        try:
            return entry.stat().st_mtime
        except OSError:
            return 0.0

    entries = sorted(
        (e for e in entries if e.is_file()),
        key=_mtime,
    )

    for e in entries:
        if not is_event_binary(e.path):
            continue
        try:
            mtime = e.stat().st_mtime
        except OSError:
            continue

        # Out-of-window: too old (age cap) or too fresh (still being written).
        if (now_ts - mtime) > max_age_seconds:
            continue
        if not _is_quiescent(e.path, now_ts, quiescence_seconds):
            skipped_inflight += 1
            continue

        # Idempotency: skip if we already forwarded this content
        # successfully. Three cases via state.status(digest):
        #   True  — forwarded WITH report → permanently done, skip.
        #   False — forwarded WITHOUT report → re-pair candidate.
        #           Forward again only if a paired TXT is now present
        #           so SFM's upsert refreshes the DB row.
        #   None  — never forwarded → normal first-forward path.
        try:
            digest = sha256_of_file(e.path)
        except OSError as exc:
            log.warning("forward scan: sha256 failed for %s: %s", e.path, exc)
            continue
        fwd_status = state.status(digest)
        if fwd_status is True:
            skipped_already_forwarded += 1
            continue

        # TXT pairing — try BW ACH convention first
        # (<stem>_<ext>_ASCII.TXT) and fall back to the manual-export
        # convention (<binary>.TXT). Each candidate is checked exact-case,
        # then case-insensitively, against the cached directory listing.
        # ACH wins when both exist — that's the format BW's official ACH
        # server writes.
        txt_name: Optional[str] = None
        for cand in (ach_report_name(e.name), legacy_report_name(e.name)):
            actual = cand if cand in names else names_lc_to_actual.get(cand.lower())
            if actual:
                txt_name = actual
                break

        txt_path: Optional[str] = None
        if txt_name:
            candidate = os.path.join(watch_dir, txt_name)
            if _is_quiescent(candidate, now_ts, quiescence_seconds):
                txt_path = candidate
            # else: TXT is mid-write; treat it as not yet paired. The
            # checks below either defer the binary or, once the grace
            # period has passed on a first forward, send it alone.

        if fwd_status is False:
            # Previously forwarded WITHOUT report. We're here looking
            # for a re-pair opportunity. If the TXT is now present
            # and quiescent, include in pending for re-forward (the
            # SFM server's upsert will refresh the DB row with the
            # report's authoritative values). Otherwise skip — no
            # point re-forwarding the same binary alone again.
            if txt_path is None:
                skipped_already_forwarded += 1
                continue
        elif txt_path is None:
            # First-time forward and no usable TXT yet. Wait for the
            # grace period before forwarding alone.
            if (now_ts - mtime) < missing_report_grace_seconds:
                skipped_inflight += 1
                continue

        # Size and digest are not cached here; forward_pending re-derives
        # them after a successful POST.
        pending.append((e.path, txt_path))

        # Per-pass cap: once we have enough pending, stop scanning.
        if max_per_pass and len(pending) >= max_per_pass:
            break

    log.debug(
        "forward scan: %d pending skipped_inflight=%d already_forwarded=%d cap=%d",
        len(pending), skipped_inflight, skipped_already_forwarded, max_per_pass,
    )
    return pending


# ── Multipart upload ──────────────────────────────────────────────────────────
def _encode_multipart(
    parts: List[Tuple[str, str, str, bytes]],
) -> Tuple[bytes, str]:
    """Encode a list of (field_name, filename, content_type, data) tuples
    as a multipart/form-data body. Returns (body_bytes, content_type
    header value)."""
    boundary = "----Series3WatcherBoundary" + os.urandom(8).hex()
    chunks: List[bytes] = []
    for field_name, filename, content_type, data in parts:
        chunks.append(("--" + boundary + "\r\n").encode("ascii"))
        chunks.append(
            (f'Content-Disposition: form-data; name="{field_name}"; '
             f'filename="{filename}"\r\n').encode("ascii")
        )
        chunks.append((f"Content-Type: {content_type}\r\n\r\n").encode("ascii"))
        chunks.append(data)
        chunks.append(b"\r\n")
    chunks.append(("--" + boundary + "--\r\n").encode("ascii"))
    body = b"".join(chunks)
    content_type_hdr = f"multipart/form-data; boundary={boundary}"
    return body, content_type_hdr


def _import_endpoint(sfm_url: str) -> str:
    """Compose the import endpoint URL from a base SFM URL."""
    return sfm_url.rstrip("/") + "/db/import/blastware_file"


def forward_event_pair(
    sfm_url: str,
    binary_path: str,
    txt_path: Optional[str],
    *,
    serial_hint: Optional[str] = None,
    timeout: float = DEFAULT_HTTP_TIMEOUT,
) -> Dict[str, Any]:
    """POST a single event (binary + optional .TXT) to the SFM import
    endpoint.

    Returns a dict mirroring the per-file outcome the server returned
    (see /db/import/blastware_file response.results[0]) on success, or
    a dict with `status="error"` on a local read, transport, or HTTP
    failure.
    """
    binary_name = os.path.basename(binary_path)

    # Read both files up front. A file that vanished or became unreadable
    # since the scan is reported as an error instead of aborting the pass.
    try:
        with open(binary_path, "rb") as f:
            binary_bytes = f.read()
        parts = [("files", binary_name, "application/octet-stream", binary_bytes)]
        if txt_path is not None:
            with open(txt_path, "rb") as f:
                txt_bytes = f.read()
            parts.append(("files", os.path.basename(txt_path), "text/plain", txt_bytes))
    except OSError as exc:
        return {
            "status": "error",
            "filename": binary_name,
            "detail": f"read error: {exc}",
        }

    body, content_type = _encode_multipart(parts)
    url = _import_endpoint(sfm_url)
    if serial_hint:
        # URL-encode the hint so spaces or reserved characters can't
        # corrupt the query string.
        sep = "&" if "?" in url else "?"
        url = f"{url}{sep}{urllib.parse.urlencode({'serial': serial_hint})}"

    req = urllib.request.Request(
        url, data=body, method="POST",
        headers={
            "Content-Type": content_type,
            "Content-Length": str(len(body)),
            "User-Agent": "series3-watcher/sfm-forwarder",
            "Accept": "application/json",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
        try:
            payload = json.loads(raw)
        except json.JSONDecodeError:
            return {
                "status": "error",
                "filename": binary_name,
                "detail": f"server returned non-JSON: {raw[:200]!r}",
            }
        # Server returns {"count":N, "results":[{...}]}. Pull our row out,
        # tolerating a malformed payload instead of raising.
        results = payload.get("results") if isinstance(payload, dict) else None
        rows = [r for r in results if isinstance(r, dict)] if isinstance(results, list) else []
        for entry in rows:
            if entry.get("filename") == binary_name and entry.get("status") == "ok":
                return entry
        # No matching ok row → propagate the first error we find
        for entry in rows:
            if entry.get("filename") == binary_name:
                return entry
        return {
            "status": "error",
            "filename": binary_name,
            "detail": f"unexpected server response: {payload!r}",
        }
    except urllib.error.HTTPError as exc:
        try:
            body_excerpt = exc.read().decode("utf-8", errors="replace")[:300]
        except Exception:
            body_excerpt = ""
        return {
            "status": "error",
            "filename": binary_name,
            "detail": f"HTTP {exc.code}: {exc.reason} body={body_excerpt!r}",
        }
    except urllib.error.URLError as exc:
        return {
            "status": "error",
            "filename": binary_name,
            "detail": f"connection error: {exc.reason}",
        }
    except (OSError, TimeoutError) as exc:
        return {
            "status": "error",
            "filename": binary_name,
            "detail": f"transport error: {exc}",
        }


# ── Top-level orchestration ───────────────────────────────────────────────────
def forward_pending(
    watch_dir: str,
    sfm_url: str,
    state: ForwardState,
    *,
    max_age_days: int,
    quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
    missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
    timeout: float = DEFAULT_HTTP_TIMEOUT,
    max_per_pass: int = 0,
    logger: Optional[Any] = None,
) -> Dict[str, int]:
    """
    Run one full pass: find pending events, POST each one, update state.

    Returns a counts dict suitable for logging:
        {
            "scanned":     <int>,  # pending pairs returned by this pass's scan
            "forwarded":   <int>,  # successfully POSTed this pass
            "errors":      <int>,  # failed forwards (will retry next pass)
            "with_report": <int>,  # of forwarded, how many had a paired TXT
        }
    """
    def _log(msg: str) -> None:
        if logger:
            logger(msg)
        else:
            log.info(msg)

    pending = find_pending_events(
        watch_dir, state,
        max_age_days=max_age_days,
        quiescence_seconds=quiescence_seconds,
        missing_report_grace_seconds=missing_report_grace_seconds,
        max_per_pass=max_per_pass,
    )
    counts = {"scanned": len(pending), "forwarded": 0, "errors": 0, "with_report": 0}

    for binary_path, txt_path in pending:
        result = forward_event_pair(
            sfm_url, binary_path, txt_path,
            timeout=timeout,
        )
        if result.get("status") == "ok":
            try:
                digest = sha256_of_file(binary_path)
                size = os.path.getsize(binary_path)
                # Record whether this forward shipped a paired TXT.
                # Forwards without a TXT are flagged had_report=False so
                # subsequent scans re-check whether the TXT has since
                # appeared and trigger a re-forward (the SFM server's
                # upsert path refreshes the DB row with the report's
                # authoritative values).
                state.mark_forwarded(
                    digest,
                    os.path.basename(binary_path),
                    size,
                    had_report=(txt_path is not None),
                )
            except OSError as exc:
                _log(f"[forward] post-success state update failed for "
                     f"{os.path.basename(binary_path)}: {exc}")
            counts["forwarded"] += 1
            if txt_path:
                counts["with_report"] += 1

            # Differentiate three cases in the log so "no report" is only
            # noisy when something's actually unexpected:
            #   - waveform + TXT            → "+ <txt> attached"
            #   - waveform without TXT      → "no report ⚠" (BW maybe didn't auto-export)
            #   - histogram (any flavour)   → "(histogram, no report expected)"
            if txt_path:
                report_token = "+ {} attached".format(os.path.basename(txt_path))
            elif is_histogram_event(binary_path):
                report_token = "(histogram, no report expected)"
            else:
                report_token = "no report ⚠"
            _log(
                "[forward] OK {} ({}B, {}, inserted={}, skipped={})".format(
                    os.path.basename(binary_path),
                    result.get("filesize", 0),
                    report_token,
                    result.get("inserted", 0),
                    result.get("skipped", 0),
                )
            )
        else:
            counts["errors"] += 1
            _log(
                f"[forward] ERR {os.path.basename(binary_path)}: "
                f"{result.get('detail', 'unknown error')}"
            )
    return counts


# ── Seed-state mode (skip historical backfill on first deploy) ────────────────
def seed_state_from_folder(
    watch_dir: str,
    state: ForwardState,
    *,
    max_age_days: int = 365,
    logger: Optional[Any] = None,
) -> Dict[str, int]:
    r"""Walk `watch_dir` and mark every existing event binary as already
    forwarded — without POSTing anything.

    This is the right tool for a first deploy on a machine that already
    has tens or hundreds of thousands of historical events in the BW
    ACH folder. Run it ONCE before enabling SFM_FORWARD_ENABLED:

        python event_forwarder.py --seed-state \
            --watch "C:\Blastware 10\Event\autocall home" \
            --state "C:\...\sfm_forwarded.json" \
            [--max-age-days 365]

    The watcher then starts forwarding only events that appear AFTER
    the seed run. Seeded entries are recorded with ``had_report=True``,
    so they are never treated as re-pair candidates. Files older than
    `max_age_days` are still skipped by the regular scan loop — we don't
    bother seeding them because they wouldn't be forwarded anyway.

    Returns a counts dict:
        {"scanned": int, "seeded": int, "already_known": int, "skipped_too_old": int}
    """
    def _log(msg: str) -> None:
        if logger:
            logger(msg)
        else:
            log.info(msg)

    counts = {"scanned": 0, "seeded": 0, "already_known": 0, "skipped_too_old": 0}
    if not os.path.isdir(watch_dir):
        _log(f"[seed] watch dir not found: {watch_dir}")
        return counts

    now_ts = time.time()
    max_age_seconds = max(1, int(max_age_days)) * 86400.0
    try:
        with os.scandir(watch_dir) as it:
            entries = [e for e in it if e.is_file()]
    except OSError as exc:
        _log(f"[seed] scandir failed on {watch_dir}: {exc}")
        return counts

    for e in entries:
        if not is_event_binary(e.path):
            continue
        counts["scanned"] += 1
        try:
            st = e.stat()
        except OSError:
            continue
        if (now_ts - st.st_mtime) > max_age_seconds:
            counts["skipped_too_old"] += 1
            continue
        try:
            digest = sha256_of_file(e.path)
        except OSError as exc:
            _log(f"[seed] sha256 failed for {e.path}: {exc}")
            continue
        if state.is_forwarded(digest):
            counts["already_known"] += 1
            continue
        state.mark_forwarded(digest, e.name, st.st_size)
        counts["seeded"] += 1
        if counts["seeded"] % 1000 == 0:
            _log(f"[seed] progress: {counts['seeded']} seeded so far...")

    _log(
        f"[seed] done. scanned={counts['scanned']} seeded={counts['seeded']} "
        f"already_known={counts['already_known']} "
        f"skipped_too_old={counts['skipped_too_old']}"
    )
    return counts


# ── CLI entry point ─────────────────────────────────────────────────────────
def _main() -> int:
    r"""Command-line interface for one-shot operations.

    Currently supports a single mode:

        python event_forwarder.py --seed-state \
            --watch "<path/to/BW autocall folder>" \
            --state "<path/to/sfm_forwarded.json>" \
            [--max-age-days 365]

    which marks every existing in-window event binary as already
    forwarded (without POSTing) so the watcher only forwards events
    appearing AFTER the seed.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Series 3 Watcher — SFM event forwarder utilities",
    )
    parser.add_argument(
        "--seed-state", action="store_true",
        help="Mark every event binary in --watch as already-forwarded "
             "(without POSTing). Use this BEFORE enabling SFM_FORWARD_ENABLED "
             "on a machine with a large historical archive.",
    )
    parser.add_argument(
        "--watch", required=True,
        help="Path to the Blastware ACH folder.",
    )
    parser.add_argument(
        "--state", required=True,
        help="Path to the JSON state file. Will be created if missing.",
    )
    parser.add_argument(
        "--max-age-days", type=int, default=365,
        help="Only seed files newer than this many days (default 365).",
    )
    args = parser.parse_args()
    if not args.seed_state:
        parser.error("specify --seed-state (no other modes supported yet)")

    print(f"[seed] watch_dir = {args.watch}")
    print(f"[seed] state     = {args.state}")
    print(f"[seed] max_age   = {args.max_age_days} days")
    state = ForwardState(args.state)
    print(f"[seed] state currently has {state.count()} entries")
    seed_state_from_folder(
        args.watch, state,
        max_age_days=args.max_age_days,
        logger=print,
    )
    print(f"[seed] state now has {state.count()} entries")
    return 0


if __name__ == "__main__":
    import sys

    sys.exit(_main())