824 lines
29 KiB
Python
824 lines
29 KiB
Python
"""
|
|
event_forwarder.py — forward Thor (Micromate Series IV) IDFH/IDFW event
files to a seismo-relay SFM server.

Walks the same `THORDATA_PATH/<Project>/<Unit>/` tree the heartbeat path
scans. For each event binary that hasn't been forwarded yet, pairs it
with its `<unit>/TXT/<basename>.txt` ASCII report (when available) and
POSTs both to seismo-relay's `/db/import/idf_file` endpoint as one
multipart request.

This is a port of `series3-watcher/event_forwarder.py` adapted for the
Thor file layout. Key differences from the series3 forwarder:

- **Filenames are literal `<SERIAL>_<YYYYMMDDHHMMSS>.IDFH|.IDFW`** —
  no base36 stem; the serial is right there in the prefix.
- **TXT sidecars live in a `TXT/` subfolder** next to the binaries
  (Thor's exporter writes them there, not alongside the binary).
- **IDFH and IDFW are forwarded as independent events.** Each has
  its own sha256-keyed state entry and its own POST. A single
  timestamp can produce both a histogram (IDFH) and a waveform (IDFW),
  and the seismo-relay endpoint dedupes on (serial, timestamp, kind),
  so treating them as separate rows is the right model.

Design notes
────────────
- **stdlib only.** Matches the rest of the watcher (`urllib.request`).
  Multipart encoding is hand-rolled.
- **Idempotent across restarts.** Forwarded files are tracked by
  sha256 in a JSON state file (default: `<log_dir>/thor_forwarded.json`).
  Re-scanning the watch tree doesn't re-POST anything.
- **Default-off.** Callers must enable via config
  (`sfm_forward_enabled=true` + `sfm_url=...`). Existing 0.2.x
  deployments that auto-update stay non-forwarding until an operator
  flips the switch.
- **Quiescence guard.** Files modified within the last few seconds
  are skipped — Thor writes the .txt after the binary, so we wait
  until both look stable before forwarding.
- **Best-effort report pairing.** When the .txt hasn't appeared yet
  but the binary is older than `missing_report_grace_seconds`, the
  binary is forwarded alone (seismo-relay accepts that and just skips
  the rich fields — we'd rather get the binary indexed than block
  forever waiting for a TXT that may never arrive, e.g. operator
  hasn't enabled the TXT export in Thor).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
import json
import logging
import os
import re
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Fallback tuning values.  Every one of them can be overridden through the
# sfm_* keys in config.json.

# Minimum age (seconds since last modification) before a file counts as stable.
DEFAULT_QUIESCENCE_SECONDS = 5

# Age (seconds) after which a binary is forwarded even though its TXT
# report has not shown up.
DEFAULT_MISSING_REPORT_GRACE_SECONDS = 60

# Timeout (seconds) applied to the HTTP upload.
DEFAULT_HTTP_TIMEOUT = 60.0

# Version stamp written into, and expected from, the JSON state file.
STATE_SCHEMA_VERSION = 1
|
|
|
|
|
|
# ── Filename matching ─────────────────────────────────────────────────────────
#
# Thor (Micromate Series IV) filename scheme:
#   <SERIAL>_<YYYYMMDDHHMMSS>.<KIND>
# where SERIAL is the literal device serial (e.g. UM11719, BE9439),
# and KIND is IDFH (histogram) or IDFW (waveform).
#
# Examples:
#   UM11719_20231219163444.IDFH
#   UM11719_20231219162723.IDFW
#   BE9439_20200713124251.IDFH
#
# Groups: 1 = serial, 2 = 14-digit timestamp, 3 = kind.
#
# The pattern ends in \Z rather than $.  In Python, $ also matches just
# before a trailing newline, so "<name>.IDFH\n" would otherwise be
# accepted as a valid event filename.
_EVENT_FILENAME_RE = re.compile(
    r"^([A-Z]{2}\d+)_(\d{14})\.(IDFH|IDFW)\Z",
    re.IGNORECASE,
)

# Filenames we explicitly skip even if they look event-shaped.
# Entries are lower-case and are compared against a lower-cased
# os.path.splitext() suffix.
_NON_EVENT_EXTS = {
    ".mlg",       # monitor-log files (separate heartbeat path)
    ".txt",       # ASCII reports — paired in, not primary
    ".csv",       # operator-facing derivative
    ".html",      # operator-facing derivative
    ".pdf",       # operator-facing derivative
    ".xml",       # operator-facing derivative
    ".cdb",       # IDFW.CDB cache-database variant — skip
    ".log",
    ".ini",
    ".json",
    # NOTE: os.path.splitext() only returns the final suffix (".json"), so
    # this compound entry never matches on its own; kept for documentation.
    ".sfm.json",
    ".bak",
    ".tmp",
}
|
|
|
|
|
|
def is_event_binary(path: str) -> bool:
    """Report whether the basename of `path` names a Thor event binary.

    Only the final path component is inspected; the file itself is never
    opened or stat'ed.  A name qualifies when it matches the module's
    event-filename pattern, is not a compound `.IDFW.CDB` / `.IDFH.CDB`
    cache file, and its extension is not in the explicit skip list.
    """
    filename = os.path.basename(path)

    # Compound cache-database names are rejected before anything else.
    if filename.lower().endswith((".idfw.cdb", ".idfh.cdb")):
        return False

    if _EVENT_FILENAME_RE.match(filename) is None:
        return False

    _stem, suffix = os.path.splitext(filename)
    return suffix.lower() not in _NON_EVENT_EXTS
|
|
|
|
|
|
def parse_event_filename(name: str) -> Optional[Tuple[str, datetime, str]]:
    """Split `<SERIAL>_<YYYYMMDDHHMMSS>.<KIND>` into its three components.

    Returns:
        `(serial, timestamp, kind)` where `serial` and `kind` are
        upper-cased (`kind` is "IDFH" or "IDFW", without the dot) and
        `timestamp` is a naive `datetime` parsed from the 14 digits.
        `None` when the name does not match the pattern or the digits do
        not form a valid calendar date/time.
    """
    match = _EVENT_FILENAME_RE.match(name)
    if match is None:
        return None

    raw_serial, raw_stamp, raw_kind = match.groups()
    try:
        stamp = datetime.strptime(raw_stamp, "%Y%m%d%H%M%S")
    except ValueError:
        # Fourteen digits that are not a real date, e.g. month 13.
        return None

    return raw_serial.upper(), stamp, raw_kind.upper()
|
|
|
|
|
|
def idf_report_name(binary_name: str) -> str:
    """Return the TXT report filename that belongs to `binary_name`.

    Thor's TXT export keeps the full binary name, extension included,
    and appends `.txt` to it:

        UM11719_20231219163444.IDFH  →  UM11719_20231219163444.IDFH.txt
    """
    report_suffix = ".txt"
    return binary_name + report_suffix
|
|
|
|
|
|
def idf_report_path(binary_path: str) -> str:
    """Build the path where the TXT sidecar of `binary_path` should live.

    Sidecars are expected in a `TXT/` subfolder of the directory that
    holds the binary.  The result is only the *expected* location: no
    filesystem access happens here, so the caller must check whether the
    file really exists.
    """
    unit_dir, binary_name = os.path.split(binary_path)
    return os.path.join(unit_dir, "TXT", idf_report_name(binary_name))
|
|
|
|
|
|
def is_histogram_event(filename: str) -> bool:
    """Tell whether `filename` (a bare name or a full path) is an .IDFH
    histogram event.

    Only used to word log lines: Thor doesn't always export a TXT for
    histograms, so callers avoid reporting a missing report for them.
    The check is case-insensitive and looks at the basename only.
    """
    return os.path.basename(filename).lower().endswith(".idfh")
|
|
|
|
|
|
def serial_from_filename(name: str) -> Optional[str]:
    """Return the upper-cased serial prefix of a Thor event filename.

    Delegates to `parse_event_filename`; yields `None` whenever that
    function cannot parse `name`.
    """
    parsed = parse_event_filename(name)
    return parsed[0] if parsed is not None else None
|
|
|
|
|
|
# ── State file ────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class ForwardState:
    """Idempotency record: which event files have we already forwarded?

    State file format (JSON):

        {
          "version": 1,
          "forwarded": {
            "<sha256>": {
              "filename": "UM11719_20231219163444.IDFW",
              "size": 8800,
              "forwarded_at": "2026-05-19T...Z",
              "had_report": true
            },
            ...
          }
        }

    Keyed by sha256 (not filename) so identical content is recognised
    as already-forwarded even if the file moved or got renamed.

    Loading and saving are best-effort: I/O and parse failures are logged
    as warnings and never raised to the caller.
    """

    def __init__(self, path: str):
        """Bind the state to `path` and load whatever is stored there."""
        self.path = path
        self._data: Dict[str, Any] = {"version": STATE_SCHEMA_VERSION, "forwarded": {}}
        self._load()

    def _load(self) -> None:
        """Populate the in-memory state from `self.path`.

        A missing file is silently treated as an empty state.  An
        unreadable file, invalid JSON, a non-object root, or a schema
        version mismatch leaves the state empty and logs a warning.
        """
        try:
            with open(self.path, "r", encoding="utf-8") as f:
                loaded = json.load(f)
            if not isinstance(loaded, dict):
                raise ValueError("state file root is not an object")
        except FileNotFoundError:
            return
        except (OSError, ValueError) as exc:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            log.warning("failed to load forward state from %s: %s", self.path, exc)
            return

        if loaded.get("version") != STATE_SCHEMA_VERSION:
            log.warning(
                "forward state version mismatch (got %r, want %d) — starting fresh",
                loaded.get("version"), STATE_SCHEMA_VERSION,
            )
            return

        forwarded = loaded.get("forwarded")
        if isinstance(forwarded, dict):
            self._data["forwarded"] = forwarded

    def _save(self) -> None:
        """Write the state to disk atomically (temp file + `os.replace`).

        Failures are logged, not raised.  On failure the temp file is
        removed so a half-written `<path>.tmp` is not left behind.
        """
        tmp = self.path + ".tmp"
        try:
            parent = os.path.dirname(self.path)
            if parent and not os.path.isdir(parent):
                os.makedirs(parent, exist_ok=True)
            with open(tmp, "w", encoding="utf-8") as f:
                json.dump(self._data, f, indent=2, sort_keys=True)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp, self.path)
        except OSError as exc:
            log.warning("failed to save forward state to %s: %s", self.path, exc)
            try:
                os.remove(tmp)
            except OSError:
                # Temp file was never created or is already gone.
                pass

    def is_forwarded(self, sha256: str) -> bool:
        """Return True if `sha256` has any entry in the state."""
        return sha256 in self._data["forwarded"]

    def status(self, sha256: str) -> Optional[bool]:
        """Return forwarding status for *sha256*.

        Returns:
            None  — never forwarded.  Eligible for a fresh forward.
            True  — forwarded successfully with its paired report.
                    NOT a candidate for re-forward.
            False — forwarded WITHOUT its paired `.txt`.  Eligible for
                    re-forward IF the TXT now exists, so seismo-relay's
                    upsert refreshes the DB row with authoritative
                    device-side values.

        Legacy entries without a `had_report` key default to True so
        an upgrade doesn't unexpectedly re-forward every entry.  An entry
        that is not a JSON object (e.g. a hand-edited state file) is
        treated the same way instead of raising.
        """
        entry = self._data["forwarded"].get(sha256)
        if entry is None:
            return None
        if not isinstance(entry, dict):
            # The digest is present, so the file counts as forwarded.
            return True
        return bool(entry.get("had_report", True))

    def mark_forwarded(
        self,
        sha256: str,
        filename: str,
        size: int,
        had_report: bool = True,
    ) -> None:
        """Record a successful forward and persist the state.

        Set `had_report=False` when the forward shipped the binary
        without its paired ASCII report.  Such entries are re-checked
        on subsequent scans and re-forwarded once the TXT appears.

        Idempotent: re-marking an existing sha256 with `had_report=True`
        is the explicit promotion path used when a re-pair succeeds.
        """
        self._data["forwarded"][sha256] = {
            "filename": filename,
            "size": size,
            # UTC timestamp in ISO-8601 with a trailing "Z".
            "forwarded_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "had_report": had_report,
        }
        self._save()

    def count(self) -> int:
        """Return the number of recorded digests."""
        return len(self._data["forwarded"])
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def sha256_of_file(path: str) -> str:
    """Return the hex SHA-256 digest of the file at `path`.

    The file is read in 64 KiB blocks.  Any `OSError` from opening or
    reading the file propagates to the caller.
    """
    block_size = 65536
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            block = stream.read(block_size)
            if block == b"":
                break
            digest.update(block)
    return digest.hexdigest()
|
|
|
|
|
|
def _is_quiescent(path: str, now_ts: float, quiescence_seconds: float) -> bool:
    """Decide whether `path` has been left alone long enough.

    The file counts as quiescent when its modification time lies at least
    `quiescence_seconds` before `now_ts`.  A file whose mtime cannot be
    read (missing, permission error, ...) is reported as not quiescent.
    """
    try:
        age = now_ts - os.path.getmtime(path)
    except OSError:
        return False
    return age >= quiescence_seconds
|
|
|
|
|
|
def _iter_unit_dirs(thordata_root: str):
    """Generate `(project_name, unit_name, unit_path)` for each unit
    directory two levels below `thordata_root`.

    THORDATA layout: <root>/<Project>/<UnitSerial>/

    Non-directory entries are skipped at both levels.  A root that is not
    a directory or cannot be listed yields nothing; a project folder that
    cannot be listed is skipped.
    """
    def _names_in(folder: str) -> List[str]:
        # An unreadable folder behaves like an empty one.
        try:
            return os.listdir(folder)
        except OSError:
            return []

    if not os.path.isdir(thordata_root):
        return

    for project_name in _names_in(thordata_root):
        project_dir = os.path.join(thordata_root, project_name)
        if not os.path.isdir(project_dir):
            continue
        for unit_name in _names_in(project_dir):
            unit_dir = os.path.join(project_dir, unit_name)
            if os.path.isdir(unit_dir):
                yield project_name, unit_name, unit_dir
|
|
|
|
|
|
def _find_txt_in_unit(unit_path: str, binary_name: str,
                      now_ts: float, quiescence_seconds: float,
                      _txt_cache: Dict[str, Dict[str, str]]) -> Optional[str]:
    """Locate the `.txt` sidecar of `binary_name` under `<unit_path>/TXT/`.

    The name comparison ignores case.  `_txt_cache` maps a unit path to a
    `{lower-cased name: actual name}` dict of its TXT folder; it is filled
    on first use per unit (and mutated in place) so one scan lists each
    TXT folder at most once.  A missing or unreadable TXT folder is cached
    as an empty listing.

    Returns:
        The sidecar path when it exists and is quiescent, otherwise None.
    """
    wanted = idf_report_name(binary_name).lower()

    if unit_path not in _txt_cache:
        txt_dir = os.path.join(unit_path, "TXT")
        names: List[str] = []
        if os.path.isdir(txt_dir):
            try:
                names = os.listdir(txt_dir)
            except OSError:
                names = []
        _txt_cache[unit_path] = {entry.lower(): entry for entry in names}

    actual_name = _txt_cache[unit_path].get(wanted)
    if not actual_name:
        return None

    sidecar = os.path.join(unit_path, "TXT", actual_name)
    # A sidecar still being written is treated as not there yet.
    return sidecar if _is_quiescent(sidecar, now_ts, quiescence_seconds) else None
|
|
|
|
|
|
# ── Scan pass ─────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def find_pending_events(
    thordata_root: str,
    state: ForwardState,
    *,
    max_age_days: int,
    quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
    missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
    max_per_pass: int = 0,
) -> List[Tuple[str, Optional[str]]]:
    """Scan `thordata_root` and list the events that still need forwarding.

    Each result is a `(binary_path, txt_path_or_None)` pair.  Results are
    ordered oldest mtime first so a backfill proceeds chronologically.

    Selection rules:
      - The filename must be accepted by `is_event_binary`.
      - The file's mtime must be no older than `max_age_days` (values
        below 1 are treated as 1 day).
      - The file must be quiescent (mtime at least `quiescence_seconds`
        in the past).
      - Its sha256 must not be recorded as forwarded-with-report.  A
        digest recorded as forwarded *without* report is selected again
        only when a quiescent TXT is now available.
      - A never-forwarded binary with no TXT is deferred until it is at
        least `missing_report_grace_seconds` old, then selected alone.
      - When `max_per_pass` is non-zero, the scan stops once that many
        pairs have been collected.

    Returns an empty list (and logs a warning) when `thordata_root` is
    not a directory.
    """
    if not os.path.isdir(thordata_root):
        log.warning("forward scan: thordata root not found: %s", thordata_root)
        return []

    now_ts = time.time()
    age_limit = max(1, int(max_age_days)) * 86400.0

    # Pass 1 — gather (mtime, unit_dir, path) for every recent event binary.
    recent: List[Tuple[float, str, str]] = []
    for _project, _unit_name, unit_dir in _iter_unit_dirs(thordata_root):
        try:
            dir_entries = list(os.scandir(unit_dir))
        except OSError:
            continue
        for entry in dir_entries:
            if not (entry.is_file() and is_event_binary(entry.path)):
                continue
            try:
                modified = entry.stat().st_mtime
            except OSError:
                continue
            if (now_ts - modified) <= age_limit:
                recent.append((modified, unit_dir, entry.path))

    # Oldest first, so the per-pass cap drains the backlog in time order.
    ordered = sorted(recent, key=lambda item: item[0])

    # Pass 2 — apply quiescence, dedupe and report-pairing rules.
    pending: List[Tuple[str, Optional[str]]] = []
    inflight_skips = 0
    forwarded_skips = 0
    reports_by_unit: Dict[str, Dict[str, str]] = {}

    for modified, unit_dir, binary_path in ordered:
        if not _is_quiescent(binary_path, now_ts, quiescence_seconds):
            inflight_skips += 1
            continue

        try:
            digest = sha256_of_file(binary_path)
        except OSError as exc:
            log.warning("forward scan: sha256 failed for %s: %s", binary_path, exc)
            continue

        prior = state.status(digest)
        if prior is True:
            forwarded_skips += 1
            continue

        report_path = _find_txt_in_unit(
            unit_dir,
            os.path.basename(binary_path),
            now_ts,
            quiescence_seconds,
            reports_by_unit,
        )

        if report_path is None:
            if prior is False:
                # Already sent once without a report and there is still
                # none: nothing new to tell the server.
                forwarded_skips += 1
                continue
            if (now_ts - modified) < missing_report_grace_seconds:
                # Fresh binary; give the TXT export time to appear.
                inflight_skips += 1
                continue

        pending.append((binary_path, report_path))
        if max_per_pass and len(pending) >= max_per_pass:
            break

    log.debug(
        "forward scan: %d pending skipped_inflight=%d already_forwarded=%d cap=%d",
        len(pending), inflight_skips, forwarded_skips, max_per_pass,
    )
    return pending
|
|
|
|
|
|
# ── Multipart upload ──────────────────────────────────────────────────────────
|
|
|
|
|
|
def _encode_multipart(
    parts: List[Tuple[str, str, str, bytes]],
) -> Tuple[bytes, str]:
    """Serialise file parts into a multipart/form-data request body.

    Args:
        parts: `(field_name, filename, content_type, data)` tuples, one
            per file part, emitted in the given order.

    Returns:
        `(body_bytes, content_type_header_value)`.  The boundary carries
        a random suffix, so the output differs from call to call.

    Part headers are encoded as ASCII; a non-ASCII field name, filename
    or content type raises `UnicodeEncodeError`.
    """
    boundary = "----ThorWatcherBoundary" + os.urandom(8).hex()
    delimiter = ("--" + boundary).encode("ascii")

    pieces: List[bytes] = []
    for field_name, filename, content_type, data in parts:
        header = (
            f'Content-Disposition: form-data; name="{field_name}"; '
            f'filename="{filename}"\r\n'
            f"Content-Type: {content_type}\r\n\r\n"
        )
        pieces.extend((delimiter, b"\r\n", header.encode("ascii"), data, b"\r\n"))

    # Closing delimiter.
    pieces.extend((delimiter, b"--\r\n"))
    return b"".join(pieces), f"multipart/form-data; boundary={boundary}"
|
|
|
|
|
|
def _import_endpoint(sfm_url: str) -> str:
    """Return the IDF import endpoint URL for the SFM base URL `sfm_url`.

    Trailing slashes on the base URL are dropped before the endpoint path
    is appended.
    """
    base = sfm_url.rstrip("/")
    return f"{base}/db/import/idf_file"
|
|
|
|
|
|
def forward_event_pair(
    sfm_url: str,
    binary_path: str,
    txt_path: Optional[str],
    *,
    serial_hint: Optional[str] = None,
    timeout: float = DEFAULT_HTTP_TIMEOUT,
) -> Dict[str, Any]:
    """POST a single event (binary + optional .txt) to the SFM import
    endpoint.

    Args:
        sfm_url: Base URL of the SFM server.
        binary_path: Path of the event binary to upload.
        txt_path: Path of the paired TXT report, or None to upload the
            binary alone.
        serial_hint: Serial sent as the `serial` query parameter.  When
            empty, it is derived from the binary's filename.
        timeout: Timeout in seconds for the HTTP request.

    Returns:
        The server's per-file result entry for the binary (an entry with
        `status == "ok"` is preferred when several match), or a dict with
        `status="error"`, `filename` and `detail` on a transport/HTTP
        failure or an unusable response.

    Raises:
        OSError: if the binary or the TXT file cannot be read.  Local
            read failures are not converted into an error dict.
    """
    binary_name = os.path.basename(binary_path)

    def _error(detail: str) -> Dict[str, Any]:
        return {"status": "error", "filename": binary_name, "detail": detail}

    with open(binary_path, "rb") as f:
        binary_bytes = f.read()

    parts = [("files", binary_name, "application/octet-stream", binary_bytes)]
    if txt_path is not None:
        with open(txt_path, "rb") as f:
            txt_bytes = f.read()
        parts.append(("files", os.path.basename(txt_path), "text/plain", txt_bytes))

    body, content_type = _encode_multipart(parts)

    # Auto-derive serial from filename if caller didn't supply one.
    if not serial_hint:
        serial_hint = serial_from_filename(binary_name)

    url = _import_endpoint(sfm_url)
    if serial_hint:
        sep = "&" if "?" in url else "?"
        # Percent-encode so a caller-supplied hint containing "&", "#",
        # spaces, etc. cannot corrupt the query string.
        url = f"{url}{sep}serial={urllib.parse.quote(str(serial_hint), safe='')}"

    req = urllib.request.Request(
        url, data=body, method="POST",
        headers={
            "Content-Type": content_type,
            "Content-Length": str(len(body)),
            "User-Agent": "thor-watcher/sfm-forwarder",
            "Accept": "application/json",
        },
    )

    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        try:
            body_excerpt = exc.read().decode("utf-8", errors="replace")[:300]
        except Exception:
            # The error body is only diagnostic; never let it mask the
            # HTTP failure itself.
            body_excerpt = ""
        return _error(f"HTTP {exc.code}: {exc.reason} body={body_excerpt!r}")
    except urllib.error.URLError as exc:
        return _error(f"connection error: {exc.reason}")
    except (OSError, TimeoutError) as exc:
        return _error(f"transport error: {exc}")

    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return _error(f"server returned non-JSON: {raw[:200]!r}")

    # Valid JSON is not necessarily the expected shape: tolerate a
    # non-object payload, a non-list "results", and non-object entries.
    results = payload.get("results") if isinstance(payload, dict) else None
    if not isinstance(results, list):
        results = []
    matching = [
        entry for entry in results
        if isinstance(entry, dict) and entry.get("filename") == binary_name
    ]

    for entry in matching:
        if entry.get("status") == "ok":
            return entry
    if matching:
        return matching[0]
    return _error(f"unexpected server response: {payload!r}")
|
|
|
|
|
|
# ── Top-level orchestration ───────────────────────────────────────────────────
|
|
|
|
|
|
def forward_pending(
    thordata_root: str,
    sfm_url: str,
    state: ForwardState,
    *,
    max_age_days: int,
    quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
    missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
    timeout: float = DEFAULT_HTTP_TIMEOUT,
    max_per_pass: int = 0,
    logger: Optional[Any] = None,
) -> Dict[str, int]:
    """
    Run one full pass: find pending events, POST each one, update state.

    Args:
        thordata_root: Root of the THORDATA tree to scan.
        sfm_url: Base URL of the SFM server.
        state: Forward-state record consulted and updated by the pass.
        max_age_days, quiescence_seconds, missing_report_grace_seconds,
        max_per_pass: Passed through to `find_pending_events`.
        timeout: HTTP timeout in seconds for each upload.
        logger: Optional callable taking one message string.  When not
            given, messages go to the module logger at INFO level.

    Returns a counts dict suitable for logging:

        {
          "scanned":    <int>,   # event binaries selected for forward
          "forwarded":  <int>,   # successfully POSTed this pass
          "errors":     <int>,   # POST failures (will retry next pass)
          "with_report":<int>,   # of forwarded, how many had a paired TXT
        }

    A file that cannot be read when its turn comes (e.g. moved or deleted
    after the scan) is counted as an error and the pass continues with
    the remaining files.
    """
    def _log(msg: str) -> None:
        if logger:
            logger(msg)
        else:
            log.info(msg)

    pending = find_pending_events(
        thordata_root, state,
        max_age_days=max_age_days,
        quiescence_seconds=quiescence_seconds,
        missing_report_grace_seconds=missing_report_grace_seconds,
        max_per_pass=max_per_pass,
    )

    counts = {"scanned": len(pending), "forwarded": 0, "errors": 0, "with_report": 0}

    for binary_path, txt_path in pending:
        binary_name = os.path.basename(binary_path)

        try:
            result = forward_event_pair(
                sfm_url, binary_path, txt_path,
                timeout=timeout,
            )
        except OSError as exc:
            # The binary or its TXT vanished / became unreadable between
            # the scan and the upload.  Don't let one file abort the pass.
            result = {
                "status": "error",
                "filename": binary_name,
                "detail": f"could not read event files: {exc}",
            }

        if result.get("status") != "ok":
            counts["errors"] += 1
            _log(
                f"[forward] ERR {binary_name}: "
                f"{result.get('detail', 'unknown error')}"
            )
            continue

        try:
            digest = sha256_of_file(binary_path)
            size = os.path.getsize(binary_path)
            state.mark_forwarded(
                digest,
                binary_name,
                size,
                had_report=(txt_path is not None),
            )
        except OSError as exc:
            # The upload succeeded; without a state entry the file is
            # simply picked up again by a later scan.
            _log(f"[forward] post-success state save failed for "
                 f"{binary_name}: {exc}")

        counts["forwarded"] += 1
        if txt_path:
            counts["with_report"] += 1

        if txt_path:
            report_token = "+ {} attached".format(os.path.basename(txt_path))
        elif is_histogram_event(binary_path):
            report_token = "(histogram, no report expected)"
        else:
            report_token = "no report"

        _log(
            "[forward] OK {} ({}B, {}, inserted={}, skipped={})".format(
                binary_name,
                result.get("filesize", 0),
                report_token,
                result.get("inserted", 0),
                result.get("skipped", 0),
            )
        )

    return counts
|
|
|
|
|
|
# ── Seed-state mode (skip historical backfill on first deploy) ────────────────
|
|
|
|
|
|
def seed_state_from_folder(
    thordata_root: str,
    state: ForwardState,
    *,
    max_age_days: int = 365,
    logger: Optional[Any] = None,
) -> Dict[str, int]:
    """Record every event binary already present under `thordata_root` as
    forwarded, without POSTing anything.

    Intended to be run ONCE before turning on sfm_forward_enabled on a
    machine that holds a large historical archive, so that the watcher
    afterwards forwards only events that show up AFTER the seed run.

    Files whose mtime is older than `max_age_days` (values below 1 are
    treated as 1) are counted but not seeded.  New entries are recorded
    through `state.mark_forwarded` with its default `had_report=True`.

    Args:
        thordata_root: Root of the THORDATA tree to walk.
        state: Forward-state record to fill.
        max_age_days: Age cut-off in days.
        logger: Optional callable taking one message string; defaults to
            the module logger at INFO level.

    Returns a counts dict:
        {"scanned": int, "seeded": int, "already_known": int, "skipped_too_old": int}
    """
    emit = logger if logger else log.info

    counts = {"scanned": 0, "seeded": 0, "already_known": 0, "skipped_too_old": 0}

    if not os.path.isdir(thordata_root):
        emit(f"[seed] thordata root not found: {thordata_root}")
        return counts

    now_ts = time.time()
    age_limit = max(1, int(max_age_days)) * 86400.0

    for _project, _unit_name, unit_dir in _iter_unit_dirs(thordata_root):
        try:
            files = [entry for entry in os.scandir(unit_dir) if entry.is_file()]
        except OSError:
            continue

        for entry in files:
            if not is_event_binary(entry.path):
                continue
            counts["scanned"] += 1

            try:
                info = entry.stat()
            except OSError:
                continue

            if (now_ts - info.st_mtime) > age_limit:
                counts["skipped_too_old"] += 1
                continue

            try:
                digest = sha256_of_file(entry.path)
            except OSError as exc:
                emit(f"[seed] sha256 failed for {entry.path}: {exc}")
                continue

            if state.is_forwarded(digest):
                counts["already_known"] += 1
                continue

            state.mark_forwarded(digest, entry.name, info.st_size)
            counts["seeded"] += 1
            # Periodic heartbeat for very large archives.
            if counts["seeded"] % 1000 == 0:
                emit(f"[seed] progress: {counts['seeded']} seeded so far...")

    emit(
        f"[seed] done. scanned={counts['scanned']} seeded={counts['seeded']} "
        f"already_known={counts['already_known']} "
        f"skipped_too_old={counts['skipped_too_old']}"
    )
    return counts
|
|
|
|
|
|
# ── CLI entry point ─────────────────────────────────────────────────────────
|
|
|
|
|
|
def _main() -> int:
    """Entry point for the one-shot command-line operations.

    Usage:

        python event_forwarder.py --seed-state \\
            --thordata "C:\\THORDATA" \\
            --state "<path/to/thor_forwarded.json>" \\
            [--max-age-days 365]

    `--seed-state` is currently the only supported mode; running without
    it ends with an argparse usage error.  Returns 0 after a seed run.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Thor Watcher — SFM event forwarder utilities",
    )
    parser.add_argument(
        "--seed-state",
        action="store_true",
        help="Mark every event binary in --thordata as already-forwarded "
             "(without POSTing). Use this BEFORE enabling sfm_forward "
             "on a machine with a large historical archive.",
    )
    parser.add_argument(
        "--thordata",
        required=True,
        help="Path to the THORDATA root folder.",
    )
    parser.add_argument(
        "--state",
        required=True,
        help="Path to the JSON state file. Will be created if missing.",
    )
    parser.add_argument(
        "--max-age-days",
        type=int,
        default=365,
        help="Only seed files newer than this many days (default 365).",
    )
    options = parser.parse_args()

    if not options.seed_state:
        parser.error("specify --seed-state (no other modes supported yet)")

    print(f"[seed] thordata = {options.thordata}")
    print(f"[seed] state = {options.state}")
    print(f"[seed] max_age = {options.max_age_days} days")

    state = ForwardState(options.state)
    print(f"[seed] state currently has {state.count()} entries")
    seed_state_from_folder(
        options.thordata,
        state,
        max_age_days=options.max_age_days,
        logger=print,
    )
    print(f"[seed] state now has {state.count()} entries")
    return 0
|
|
|
|
|
|
# Script entry: run the CLI and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(_main())
|