feat(forward): SFM event forwarder (v1.5.0)
When SFM_FORWARD_ENABLED=true and SFM_URL is set, every Blastware
event binary in the ACH watch folder is forwarded to an SFM server's
/db/import/blastware_file endpoint as a multipart POST. The paired
<binary>.TXT ASCII report (which Blastware's ACH writes alongside
each event) is shipped in the same request, letting the SFM server
index the full per-channel stats — PPV, ZC Freq, Time of Peak, Peak
Acceleration / Displacement, Peak Vector Sum + time, sensor
self-check Pass/Fail per channel, and monitor-log timestamps —
without depending on the still-undecoded BW waveform body codec.
New module event_forwarder.py:
- is_event_binary() filename matcher (BW's <P><serial3><stem>.<ext>
scheme; rejects .MLG, .TXT, .log, .ini, .h5, etc.)
- ForwardState (.json file keyed by sha256 — idempotent across
restarts and auto-updates)
- find_pending_events() with quiescence + grace-period guards
- Hand-rolled multipart encoder (stdlib-only)
- forward_event_pair() / forward_pending() — POST loop with
structured per-event outcomes
Wired into series3_watcher.run_watcher() on its own cadence
(SFM_FORWARD_INTERVAL_SECONDS, default 60s) so it doesn't slow the
existing 5-min heartbeat scan.
Default-off: existing 1.4.x deployments keep their old behaviour
after auto-updating until an operator sets SFM_URL +
SFM_FORWARD_ENABLED=true and restarts.
17 unit tests in test_event_forwarder.py cover filename matching,
state idempotency, scan logic (quiescence, grace, max age,
already-forwarded, .TXT pairing), multipart byte shape, and an
end-to-end POST against a tiny stdlib http.server fake.
Bumps version 1.4.4 → 1.5.0 (minor — additive feature, no API break).
Requires SFM server v0.16+ for the paired-.TXT import endpoint.
This commit is contained in:
@@ -0,0 +1,504 @@
|
||||
"""
|
||||
event_forwarder.py — forward Blastware event files to an SFM server.
|
||||
|
||||
Watches the same Blastware ACH folder the heartbeat path watches.
|
||||
For each event binary that hasn't been forwarded yet, pairs it with
|
||||
its `<binary>.TXT` report (when available) and POSTs both to SFM's
|
||||
`/db/import/blastware_file` endpoint as one multipart request.
|
||||
|
||||
The receiving SFM server (seismo-relay v0.16+) detects paired binaries
|
||||
and reports by filename, parses the .TXT into structured fields
|
||||
(per-channel PPV / ZC Freq / Time of Peak / Peak Acceleration / Peak
|
||||
Displacement / sensor self-check / monitor log), and persists every
|
||||
field into the SFM database for sortable / filterable monthly-summary
|
||||
review.
|
||||
|
||||
Design notes
|
||||
────────────
|
||||
- **stdlib only.** Matches the rest of the watcher (`urllib.request`).
|
||||
Multipart encoding is hand-rolled.
|
||||
- **Idempotent across restarts.** Forwarded files are tracked by
|
||||
sha256 in a JSON state file (`.forwarded.json` next to config.ini).
|
||||
Re-scanning the watch folder doesn't re-POST anything.
|
||||
- **Default-off.** Callers must enable via config
|
||||
(`SFM_FORWARD_ENABLED=true` + `SFM_URL=...`). Existing 1.4.x
|
||||
deployments that auto-update to the new version stay non-forwarding
|
||||
until an operator flips the switch.
|
||||
- **Quiescence guard.** Files modified within the last few seconds
|
||||
are skipped — Blastware ACH writes the .TXT after the binary, so
|
||||
we wait until both look stable before forwarding.
|
||||
- **Best-effort report pairing.** When the .TXT hasn't appeared yet
|
||||
but the binary is older than `MISSING_REPORT_GRACE_SECONDS`, the
|
||||
binary is forwarded alone (the SFM endpoint accepts that and just
|
||||
skips the rich fields — we'd rather get the binary indexed than
|
||||
block forever waiting for a TXT that never arrived).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)

# Default tuning. All overridable via config.ini SFM_* keys.

# Don't touch a file modified in the last N seconds.
DEFAULT_QUIESCENCE_SECONDS = 5

# Forward without the .TXT if it hasn't shown up after N seconds.
DEFAULT_MISSING_REPORT_GRACE_SECONDS = 60

# Per-request HTTP timeout, in seconds.
DEFAULT_HTTP_TIMEOUT = 60.0

# Version tag written into (and checked against) the JSON state file.
STATE_SCHEMA_VERSION = 1
|
||||
|
||||
|
||||
# ── Filename matching ─────────────────────────────────────────────────────────
|
||||
#
|
||||
# Blastware's filename scheme (confirmed in seismo-relay docs):
|
||||
# prefix_letter (B–Z) + 3-digit serial-tail + 4-char base36 timestamp stem
|
||||
# + "." + 3-or-4-char extension.
|
||||
# Examples: M529LK44.AB0, S353L4H0.3M0W, P036L318.C80H, M529LIY6.N00.
|
||||
#
|
||||
# We accept lowercase too because some filesystems lower-case names.
|
||||
# One letter, three digits, four alphanumerics, a dot, then a 3- or
# 4-character alphanumeric extension.  Anchored with \Z rather than $:
# `$` also matches just before a trailing newline, which would let a name
# such as "M529LK44.TXT\n" through the pattern and then past the
# extension skip-list below.
_EVENT_FILENAME_RE = re.compile(
    r"^[A-Za-z][0-9]{3}[A-Za-z0-9]{4}\.[A-Za-z0-9]{3,4}\Z"
)

# Extensions (lower-case, with the leading dot) that are skipped even when
# the filename matches the regex.  Compared against
# `os.path.splitext(name)[1].lower()`.
_NON_EVENT_EXTS = {
    ".mlg",       # monitor-log files (separate heartbeat path)
    ".txt",       # ASCII reports — handled via pairing, not as primary files
    ".log",
    ".ini",
    ".dat",
    ".bak",
    ".tmp",
    ".pkl",       # SFM A5 pickles (shouldn't appear in a BW folder, but defence)
    ".h5",
    ".sfm.json",  # NOTE: splitext() only ever yields ".json" for these names
    ".json",
}


def is_event_binary(path: str) -> bool:
    """Return True if `path`'s basename looks like a Blastware event binary.

    Only the basename is inspected; the file is not opened or stat()ed.
    The whole basename must match `_EVENT_FILENAME_RE` and its extension
    (case-insensitively) must not be listed in `_NON_EVENT_EXTS`.
    """
    name = os.path.basename(path)
    if _EVENT_FILENAME_RE.fullmatch(name) is None:
        return False
    extension = os.path.splitext(name)[1].lower()
    return extension not in _NON_EVENT_EXTS
|
||||
|
||||
|
||||
def report_path_for(binary_path: str) -> str:
    """Build the path of the ASCII report that accompanies an event binary.

    The report name is the full binary path with an upper-case ".TXT"
    suffix appended (the binary's own extension is kept).
    """
    report_suffix = ".TXT"
    return binary_path + report_suffix
|
||||
|
||||
|
||||
# ── State file ────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class ForwardState:
    """Idempotency record: which event files have we already forwarded?

    State file format (JSON):

        {
          "version": 1,
          "forwarded": {
            "<sha256>": {
              "filename": "M529LK44.AB0",
              "size": 4400,
              "forwarded_at": "2026-05-08T...Z"
            },
            ...
          }
        }

    Keyed by sha256 (not filename) so that re-saved or re-uploaded
    identical content is recognised as already-forwarded even if the
    file moved or got renamed.  Filename is preserved for human
    inspection.

    Load and save failures are logged and swallowed: a missing or
    unreadable state file yields an empty in-memory state, and a failed
    save leaves the in-memory state updated but the on-disk file as it was.
    """

    def __init__(self, path: str):
        """Create the state object and load `path` if it exists."""
        self.path = path
        self._data: Dict[str, Any] = {"version": STATE_SCHEMA_VERSION, "forwarded": {}}
        self._load()

    def _load(self) -> None:
        """Populate the in-memory state from `self.path`, best-effort.

        A missing file is silent.  An unreadable / malformed file, or one
        whose "version" differs from STATE_SCHEMA_VERSION, is logged and the
        state stays empty.
        """
        try:
            with open(self.path, "r", encoding="utf-8") as f:
                d = json.load(f)
            if not isinstance(d, dict):
                raise ValueError("state file root is not an object")
            if d.get("version") != STATE_SCHEMA_VERSION:
                log.warning(
                    "forward state version mismatch (got %r, want %d) — starting fresh",
                    d.get("version"), STATE_SCHEMA_VERSION,
                )
                return
            forwarded = d.get("forwarded")
            if isinstance(forwarded, dict):
                self._data["forwarded"] = forwarded
        except FileNotFoundError:
            pass
        except (OSError, ValueError) as exc:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError
            # subclasses, so they are covered here.
            log.warning("failed to load forward state from %s: %s", self.path, exc)

    def _save(self) -> None:
        """Write the state to disk via a temp file + os.replace.

        Failures are logged, not raised.  On failure the temp file is
        removed so a partially written `<path>.tmp` is not left behind.
        """
        tmp = self.path + ".tmp"
        try:
            with open(tmp, "w", encoding="utf-8") as f:
                json.dump(self._data, f, indent=2, sort_keys=True)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp, self.path)
        except OSError as exc:
            log.warning("failed to save forward state to %s: %s", self.path, exc)
            try:
                os.remove(tmp)
            except OSError:
                # Nothing to clean up (or we can't) — the warning above
                # already reports the real failure.
                pass

    def is_forwarded(self, sha256: str) -> bool:
        """Return True if content with this sha256 hex digest is recorded."""
        return sha256 in self._data["forwarded"]

    def mark_forwarded(self, sha256: str, filename: str, size: int) -> None:
        """Record `sha256` as forwarded (with a UTC timestamp) and save."""
        self._data["forwarded"][sha256] = {
            "filename": filename,
            "size": size,
            "forwarded_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        }
        self._save()

    def count(self) -> int:
        """Return the number of recorded digests."""
        return len(self._data["forwarded"])
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def sha256_of_file(path: str) -> str:
    """Return the hex SHA-256 digest of the file at `path`.

    The file is read in 64 KiB blocks so it is never held in memory whole.
    Any OSError from opening or reading propagates to the caller.
    """
    hasher = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            block = stream.read(65536)
            if not block:
                break
            hasher.update(block)
    return hasher.hexdigest()
|
||||
|
||||
|
||||
def _is_quiescent(path: str, now_ts: float, quiescence_seconds: float) -> bool:
|
||||
"""Return True if the file's mtime is at least `quiescence_seconds`
|
||||
in the past — i.e. no longer being written."""
|
||||
try:
|
||||
mtime = os.path.getmtime(path)
|
||||
except OSError:
|
||||
return False
|
||||
return (now_ts - mtime) >= quiescence_seconds
|
||||
|
||||
|
||||
# ── Scan pass ─────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def find_pending_events(
    watch_dir: str,
    state: ForwardState,
    *,
    max_age_days: int,
    quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
    missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
) -> List[Tuple[str, Optional[str]]]:
    """
    Scan `watch_dir` (top level only) and return the list of
    (binary_path, txt_path_or_None) pairs that need forwarding.

    Filtering rules:
      - Filename must match the BW event filename regex.
      - File must be quiescent (mtime >= quiescence_seconds in the past).
      - File must not exceed `max_age_days` (values below 1 are treated
        as 1 day).
      - File's sha256 must NOT already be in the forwarded state.
      - If a `<binary>.TXT` exists (suffix matched case-insensitively) and
        is quiescent, we pair them.  Otherwise, if the binary is older than
        missing_report_grace_seconds, we forward without the TXT.
        Younger binaries with a missing TXT are deferred — let BW
        finish writing the report.

    Returns an empty list (after logging a warning) when `watch_dir` is
    missing or cannot be listed.  Files that cannot be stat()ed or hashed
    are skipped for this pass.
    """
    if not os.path.isdir(watch_dir):
        log.warning("forward scan: watch dir not found: %s", watch_dir)
        return []

    now_ts = time.time()
    max_age_seconds = max(1, int(max_age_days)) * 86400.0

    pending: List[Tuple[str, Optional[str]]] = []
    skipped_inflight = 0
    skipped_already_forwarded = 0

    try:
        with os.scandir(watch_dir) as it:
            entries = list(it)
    except OSError as exc:
        log.warning("forward scan: scandir failed on %s: %s", watch_dir, exc)
        return []

    # Index the regular files once so TXT partner lookup is O(1) per event:
    # an exact-name set plus a lower-cased map for the case-insensitive
    # fallback.
    files = [e for e in entries if e.is_file()]
    names = {e.name for e in files}
    names_by_lower: Dict[str, str] = {}
    for n in names:
        names_by_lower.setdefault(n.lower(), n)

    for e in files:
        if not is_event_binary(e.path):
            continue

        try:
            mtime = e.stat().st_mtime
        except OSError:
            continue

        # Out-of-window: too old or too fresh
        if (now_ts - mtime) > max_age_seconds:
            continue
        # The quiescence check deliberately takes a fresh mtime via
        # _is_quiescent instead of reusing the DirEntry stat above.
        if not _is_quiescent(e.path, now_ts, quiescence_seconds):
            skipped_inflight += 1
            continue

        # Idempotency: skip if we already forwarded this content
        try:
            digest = sha256_of_file(e.path)
        except OSError as exc:
            log.warning("forward scan: sha256 failed for %s: %s", e.path, exc)
            continue
        if state.is_forwarded(digest):
            skipped_already_forwarded += 1
            continue

        # TXT pairing: prefer the exact "<name>.TXT", then fall back to a
        # case-insensitive match.
        txt_name: Optional[str] = e.name + ".TXT"
        if txt_name not in names:
            txt_name = names_by_lower.get((e.name + ".TXT").lower())

        txt_path: Optional[str] = None
        if txt_name:
            candidate = os.path.join(watch_dir, txt_name)
            if _is_quiescent(candidate, now_ts, quiescence_seconds):
                txt_path = candidate
            # else: TXT is mid-write; treat as not-yet-paired.

        if txt_path is None:
            # No TXT (or not yet quiescent).  Wait for the grace
            # period before forwarding alone.
            if (now_ts - mtime) < missing_report_grace_seconds:
                skipped_inflight += 1
                continue

        pending.append((e.path, txt_path))

    log.debug(
        "forward scan: %d pending  skipped_inflight=%d  already_forwarded=%d",
        len(pending), skipped_inflight, skipped_already_forwarded,
    )
    return pending
|
||||
|
||||
|
||||
# ── Multipart upload ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _encode_multipart(
|
||||
parts: List[Tuple[str, str, str, bytes]],
|
||||
) -> Tuple[bytes, str]:
|
||||
"""Encode a list of (field_name, filename, content_type, data) tuples
|
||||
as a multipart/form-data body. Returns (body_bytes, content_type
|
||||
header value)."""
|
||||
boundary = "----Series3WatcherBoundary" + os.urandom(8).hex()
|
||||
chunks: List[bytes] = []
|
||||
for field_name, filename, content_type, data in parts:
|
||||
chunks.append(("--" + boundary + "\r\n").encode("ascii"))
|
||||
chunks.append(
|
||||
(f'Content-Disposition: form-data; name="{field_name}"; '
|
||||
f'filename="{filename}"\r\n').encode("ascii")
|
||||
)
|
||||
chunks.append((f"Content-Type: {content_type}\r\n\r\n").encode("ascii"))
|
||||
chunks.append(data)
|
||||
chunks.append(b"\r\n")
|
||||
chunks.append(("--" + boundary + "--\r\n").encode("ascii"))
|
||||
body = b"".join(chunks)
|
||||
content_type_hdr = f"multipart/form-data; boundary={boundary}"
|
||||
return body, content_type_hdr
|
||||
|
||||
|
||||
def _import_endpoint(sfm_url: str) -> str:
|
||||
"""Compose the import endpoint URL from a base SFM URL."""
|
||||
return sfm_url.rstrip("/") + "/db/import/blastware_file"
|
||||
|
||||
|
||||
def forward_event_pair(
    sfm_url: str,
    binary_path: str,
    txt_path: Optional[str],
    *,
    serial_hint: Optional[str] = None,
    timeout: float = DEFAULT_HTTP_TIMEOUT,
) -> Dict[str, Any]:
    """POST a single event (binary + optional .TXT) to the SFM import
    endpoint.

    Args:
        sfm_url: Base URL of the SFM server; the import path is appended.
        binary_path: Path of the event binary to upload.
        txt_path: Path of the paired report, or None to send the binary alone.
        serial_hint: Optional value sent as the `serial` query parameter
            (URL-encoded).
        timeout: Per-request timeout in seconds, passed to `urlopen`.

    Returns:
        The server's result row for this binary (preferring a row with
        `status == "ok"`), or a dict
        `{"status": "error", "filename": ..., "detail": ...}` when the files
        cannot be read, the request fails, or the response is not usable.
        This function does not raise for those conditions.
    """
    # Local imports: these names are only needed here, so the file's
    # top-level import block does not have to change.
    import http.client
    from urllib.parse import quote

    binary_name = os.path.basename(binary_path)

    def _error(detail: str) -> Dict[str, Any]:
        return {"status": "error", "filename": binary_name, "detail": detail}

    # The files may have been moved or deleted since the scan; report that
    # as a per-event error instead of raising.
    try:
        with open(binary_path, "rb") as f:
            binary_bytes = f.read()
        parts = [("files", binary_name, "application/octet-stream", binary_bytes)]
        if txt_path is not None:
            with open(txt_path, "rb") as f:
                txt_bytes = f.read()
            parts.append(("files", os.path.basename(txt_path), "text/plain", txt_bytes))
    except OSError as exc:
        return _error(f"read error: {exc}")

    body, content_type = _encode_multipart(parts)

    url = _import_endpoint(sfm_url)
    if serial_hint:
        sep = "&" if "?" in url else "?"
        # Percent-encode so characters such as "&", "#" or spaces in the
        # hint cannot alter the query string.
        url = f"{url}{sep}serial={quote(str(serial_hint), safe='')}"

    req = urllib.request.Request(
        url, data=body, method="POST",
        headers={
            "Content-Type": content_type,
            "Content-Length": str(len(body)),
            "User-Agent": "series3-watcher/sfm-forwarder",
            "Accept": "application/json",
        },
    )

    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        # HTTPError must be handled before URLError (it is a subclass).
        try:
            body_excerpt = exc.read().decode("utf-8", errors="replace")[:300]
        except Exception:
            # Best-effort: the error body is only extra diagnostic detail.
            body_excerpt = ""
        return _error(f"HTTP {exc.code}: {exc.reason}  body={body_excerpt!r}")
    except urllib.error.URLError as exc:
        return _error(f"connection error: {exc.reason}")
    except (OSError, http.client.HTTPException) as exc:
        # OSError covers socket timeouts and resets while reading the
        # response; HTTPException covers protocol-level failures such as a
        # truncated body.
        return _error(f"transport error: {exc}")

    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return _error(f"server returned non-JSON: {raw[:200]!r}")

    # Server returns {"count":N, "results":[{...}]}.  Pull our row out,
    # tolerating a payload or rows of an unexpected shape.
    results = payload.get("results") if isinstance(payload, dict) else None
    if not isinstance(results, list):
        results = []
    ours = [
        row for row in results
        if isinstance(row, dict) and row.get("filename") == binary_name
    ]
    for row in ours:
        if row.get("status") == "ok":
            return row
    # No matching ok row → propagate the first row for this file, if any.
    if ours:
        return ours[0]
    return _error(f"unexpected server response: {payload!r}")
|
||||
|
||||
|
||||
# ── Top-level orchestration ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
def forward_pending(
    watch_dir: str,
    sfm_url: str,
    state: ForwardState,
    *,
    max_age_days: int,
    quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
    missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
    timeout: float = DEFAULT_HTTP_TIMEOUT,
    logger: Optional[Any] = None,
) -> Dict[str, int]:
    """
    Run one full pass: find pending events, POST each one, update state.

    `logger`, when given, is called with each human-readable message;
    otherwise messages go to this module's logger at INFO level.

    Returns a counts dict suitable for logging:

        {
          "scanned":    <int>,   # pending events found by this pass's scan
          "forwarded":  <int>,   # successfully POSTed this pass
          "errors":     <int>,   # failures this pass (not marked forwarded)
          "with_report":<int>,   # of forwarded, how many had a paired TXT
        }

    A failure on one event (including the file vanishing between scan and
    upload) is counted and logged; the remaining events are still attempted.
    """
    def _log(msg: str) -> None:
        if logger:
            logger(msg)
        else:
            log.info(msg)

    pending = find_pending_events(
        watch_dir, state,
        max_age_days=max_age_days,
        quiescence_seconds=quiescence_seconds,
        missing_report_grace_seconds=missing_report_grace_seconds,
    )

    counts = {"scanned": len(pending), "forwarded": 0, "errors": 0, "with_report": 0}

    for binary_path, txt_path in pending:
        name = os.path.basename(binary_path)
        try:
            result = forward_event_pair(
                sfm_url, binary_path, txt_path,
                timeout=timeout,
            )
        except OSError as exc:
            # E.g. the file was removed after the scan.  Treat as a
            # per-event error so one bad file doesn't abort the pass.
            result = {"status": "error", "filename": name, "detail": f"read error: {exc}"}

        if result.get("status") == "ok":
            try:
                digest = sha256_of_file(binary_path)
                size = os.path.getsize(binary_path)
                state.mark_forwarded(digest, name, size)
            except OSError as exc:
                # The upload itself succeeded; only the bookkeeping failed.
                _log(f"[forward] post-success state save failed for "
                     f"{name}: {exc}")
            counts["forwarded"] += 1
            if txt_path:
                counts["with_report"] += 1
            _log(
                f"[forward] OK  {name} "
                f"({result.get('filesize', 0)}B, "
                f"{'with' if txt_path else 'no'} report, "
                f"inserted={result.get('inserted', 0)}, "
                f"skipped={result.get('skipped', 0)})"
            )
        else:
            counts["errors"] += 1
            _log(
                f"[forward] ERR {name}: "
                f"{result.get('detail', 'unknown error')}"
            )

    return counts
|
||||
Reference in New Issue
Block a user