Files
series3-watcher/series3_watcher.py
T
serversdown 770336e09f fix(forward): pair BW ACH ASCII reports using the _ASCII.TXT convention (v1.5.4)
Blastware's official Auto Call Home server writes per-event ASCII
reports as <stem>_<ext>_ASCII.TXT (e.g. N844L20G_630H_ASCII.TXT),
not <binary>.TXT (e.g. N844L20G.630H.TXT).  Versions v1.5.0–v1.5.3
only looked for the latter and silently shipped every binary alone,
so the SFM database lost the per-event Peak Acceleration / Peak
Displacement / ZC Freq / Time of Peak / Peak Vector Sum + time /
sensor self-check fields on every forwarded event.

Fix: pair-finding logic now tries the ACH-convention filename first
and falls back to <binary>.TXT for compatibility with operator-saved
manual exports and existing test fixtures.

  ach_report_name("M529LK44.AB0")    → "M529LK44_AB0_ASCII.TXT"
  legacy_report_name("M529LK44.AB0") → "M529LK44.AB0.TXT"

When both files exist (operator manually saved + ACH auto-exported),
ACH wins because that's the canonical name on modern BW deployments.
Both candidates are checked case-insensitively against the cached
directory listing — no extra stat() calls.

6 new unit tests cover the new pairing logic, helper-function
correctness, and the precedence rule.  Total now 31 tests, all green.

Field-deploy note: re-running v1.5.4 on a folder where v1.5.0–v1.5.3
already ran will NOT re-forward historical events — the
sfm_forwarded.json state file remembers them by sha256.  To re-forward
historical events to populate SFM with the now-correctly-paired
reports, delete the state file before starting v1.5.4.
2026-05-10 20:10:38 +00:00

580 lines
22 KiB
Python

"""
Series 3 Watcher (release number: see VERSION below)
Environment:
- Python 3.8 (Windows 7 compatible)
- Runs on DL2 with Blastware 10 event path
Key Features:
- Config-driven paths, intervals, and logging
- Compact console heartbeat with status per unit
- Logging with retention auto-clean (days configurable)
- Safe .MLG header sniff for unit IDs (BE#### / BA####)
- Standardized SFM Telemetry JSON payload (source-agnostic)
- Periodic HTTP heartbeat POST to SFM backend
- Optional forwarding of Blastware event files to an SFM server (off by default)
"""
import os
import re
import sys
import time
import json
import threading
import configparser
import urllib.request
import urllib.error
from datetime import datetime, timezone, timedelta
from typing import Dict, Any, Optional, Tuple
from socket import gethostname
# ---------------- Config ----------------
def load_config(path: str) -> Dict[str, Any]:
    """Load the watcher's INI config into a flat settings dict.

    The file is parsed with ``;`` and ``#`` accepted as inline comment
    prefixes and with key case preserved.  If the file contains no section
    header at all, its contents are treated as the ``[agent]`` section;
    otherwise an ``[agent]`` section must be present.

    The file is decoded as ``utf-8-sig``: identical to UTF-8, except that a
    leading byte-order mark (as written by Windows Notepad) is removed.  A
    BOM left in place would sit in front of the first ``[agent]`` header,
    hide it from the header check below and make configparser reject the
    file.

    Args:
        path: Path to the ``config.ini`` file.

    Returns:
        Dict keyed by the watcher's internal setting names (``WATCH_PATH``,
        ``SCAN_INTERVAL``, ``SFM_URL``, ...).  Unparseable integers fall
        back to their defaults; booleans are true for
        1/true/on/yes/y (case-insensitive).

    Raises:
        OSError: if the file cannot be opened or read.
        configparser.Error: if the INI text cannot be parsed.
        KeyError: if the file has section headers but no ``[agent]``.
    """
    cp = configparser.ConfigParser(inline_comment_prefixes=(";", "#"))
    cp.optionxform = str  # preserve key case
    # utf-8-sig strips an optional BOM; files without one decode as utf-8.
    with open(path, "r", encoding="utf-8-sig") as f:
        txt = f.read()
    # Ensure we have a section header
    if not re.search(r"^\s*\[", txt, flags=re.M):
        txt = "[agent]\n" + txt
    cp.read_string(txt)
    sec = cp["agent"]

    def get_str(k: str, dflt: str) -> str:
        return sec.get(k, dflt).strip()

    def get_int(k: str, dflt: int) -> int:
        try:
            return int(sec.get(k, str(dflt)).strip())
        except Exception:
            # Malformed values fall back to the default rather than abort.
            return dflt

    def get_bool(k: str, dflt: bool) -> bool:
        v = sec.get(k, None)
        if v is None:
            return dflt
        return v.strip().lower() in ("1", "true", "on", "yes", "y")

    return {
        "WATCH_PATH": get_str("SERIES3_PATH", r"C:\Blastware 10\Event\autocall home"),
        "SCAN_INTERVAL": get_int("SCAN_INTERVAL_SECONDS", 300),
        "ENABLE_LOGGING": get_bool("ENABLE_LOGGING", True),
        "LOG_FILE": get_str("LOG_FILE", os.path.join(
            os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or "C:\\",
            "Series3Watcher", "agent_logs", "series3_watcher.log"
        )),
        "LOG_RETENTION_DAYS": get_int("LOG_RETENTION_DAYS", 30),
        "MLG_HEADER_BYTES": max(256, min(get_int("MLG_HEADER_BYTES", 2048), 65536)),
        "RECENT_WARN_DAYS": get_int("RECENT_WARN_DAYS", 30),
        "MAX_EVENT_AGE_DAYS": get_int("MAX_EVENT_AGE_DAYS", 365),
        # API heartbeat / SFM telemetry
        "API_ENABLED": get_bool("API_ENABLED", False),
        "API_URL": get_str("API_URL", ""),
        "API_INTERVAL_SECONDS": get_int("API_INTERVAL_SECONDS", 300),
        "SOURCE_ID": get_str("SOURCE_ID", gethostname()),
        "SOURCE_TYPE": get_str("SOURCE_TYPE", "series3_watcher"),
        # Auto-updater source
        "UPDATE_SOURCE": get_str("UPDATE_SOURCE", "gitea"),
        "UPDATE_URL": get_str("UPDATE_URL", ""),
        # SFM event forwarder — when enabled, forwards each Blastware
        # event binary (+ paired .TXT report when present) to an SFM
        # server's /db/import/blastware_file endpoint. Default-off so
        # existing 1.4.x deployments don't change behaviour on
        # auto-update; operators flip it on by setting SFM_URL +
        # SFM_FORWARD_ENABLED=true in config.ini.
        "SFM_FORWARD_ENABLED": get_bool("SFM_FORWARD_ENABLED", False),
        "SFM_URL": get_str("SFM_URL", ""),
        "SFM_FORWARD_INTERVAL_SECONDS": get_int("SFM_FORWARD_INTERVAL_SECONDS", 60),
        # Files modified within the last N seconds are skipped (BW may
        # still be writing them).
        "SFM_QUIESCENCE_SECONDS": get_int("SFM_QUIESCENCE_SECONDS", 5),
        # If a binary's .TXT report hasn't appeared after this many
        # seconds, forward the binary alone rather than blocking
        # forever.
        "SFM_MISSING_REPORT_GRACE_SECONDS": get_int(
            "SFM_MISSING_REPORT_GRACE_SECONDS", 60
        ),
        # Per-request HTTP timeout (seconds).
        "SFM_HTTP_TIMEOUT": get_int("SFM_HTTP_TIMEOUT", 60),
        # State file for forwarded-sha256 idempotency tracking.
        # Defaults next to the log file for easy operator access.
        "SFM_STATE_FILE": get_str("SFM_STATE_FILE", ""),
        # Per-pass cap — forward at most N events per scan tick.
        # 0 = unlimited. Default 500 as a safety against accidentally
        # backfilling tens of thousands of events in one burst on
        # first deploy in a folder that's been accumulating for years.
        # See README "First-time deployment" section.
        "SFM_MAX_FORWARDS_PER_PASS": get_int("SFM_MAX_FORWARDS_PER_PASS", 500),
    }
# --------------- Logging --------------------
def log_message(path: str, enabled: bool, msg: str) -> None:
    """Append ``msg`` to the log file at ``path``, prefixed with a UTC timestamp.

    Each line is ``<ISO-8601 UTC time> <msg>``, written as UTF-8.  The parent
    directory is created when missing.  Does nothing when ``enabled`` is
    false.  Every error is swallowed: logging must never crash the watcher.
    """
    if not enabled:
        return
    try:
        log_dir = os.path.dirname(path) or "."
        # exist_ok avoids the exists()-then-makedirs() race, in which a
        # concurrent creation would raise FileExistsError and drop this line.
        os.makedirs(log_dir, exist_ok=True)
        with open(path, "a", encoding="utf-8") as f:
            f.write("{} {}\n".format(datetime.now(timezone.utc).isoformat(), msg))
    except Exception:
        # Logging must never crash the watcher
        pass
def clear_logs_if_needed(log_file: str, enabled: bool, retention_days: int) -> None:
    """Truncate ``log_file`` once every ``retention_days`` days.

    The time of the last truncation is kept as an ISO-8601 timestamp in
    ``last_clean.txt`` next to the log file.  When that stamp is missing,
    unreadable, or older than ``retention_days``, the log is emptied and a
    fresh stamp is written.  Does nothing when logging is disabled or
    ``retention_days <= 0``.  File errors are swallowed so housekeeping
    never stops the watcher.
    """
    if not enabled or retention_days <= 0:
        return
    stamp_file = os.path.join(os.path.dirname(log_file) or ".", "last_clean.txt")
    now = datetime.now(timezone.utc)
    last = None
    try:
        if os.path.exists(stamp_file):
            with open(stamp_file, "r", encoding="utf-8") as f:
                last = datetime.fromisoformat(f.read().strip())
    except Exception:
        last = None
    # A stamp without a UTC offset (e.g. hand-edited) parses as a naive
    # datetime; subtracting it from the aware ``now`` would raise TypeError
    # here, outside any handler, and abort every scan cycle.  Interpret such
    # stamps as UTC, which is the zone this function writes them in.
    if last is not None and last.tzinfo is None:
        last = last.replace(tzinfo=timezone.utc)
    if (last is None) or (now - last > timedelta(days=retention_days)):
        try:
            if os.path.exists(log_file):
                open(log_file, "w", encoding="utf-8").close()
            with open(stamp_file, "w", encoding="utf-8") as f:
                f.write(now.isoformat())
            print("Log cleared on {}".format(now.astimezone().strftime("%Y-%m-%d %H:%M:%S")))
            log_message(log_file, enabled, "Logs auto-cleared")
        except Exception:
            pass
# --------------- .MLG sniff ------------------
UNIT_BYTES_RE = re.compile(rb"(?:^|[^A-Z])(BE|BA)\d{4,5}(?:[^0-9]|$)")
# Same search as UNIT_BYTES_RE, but the "not preceded by A-Z" and "not
# followed by a digit" checks are zero-width lookarounds.  group(0) is
# therefore exactly the unit ID, with no neighbouring context byte that
# could be a digit or letter and leak into the result.
_UNIT_ID_RE = re.compile(rb"(?<![A-Z])(?:BE|BA)\d{4,5}(?![0-9])")


def sniff_unit_from_mlg(path: str, header_bytes: int) -> Optional[str]:
    """Return the first BE####/BA#### unit ID found in the file's header.

    Reads the first ``header_bytes`` bytes (clamped to 256..65536) of
    ``path`` and searches them for ``BE`` or ``BA`` followed by 4 or 5
    digits.  The ID must not be preceded by an uppercase ASCII letter or
    followed by another digit.

    Returns:
        The unit ID as an ASCII string (e.g. ``"BE12345"``), or None if no
        ID is found or the file cannot be read.
    """
    try:
        with open(path, "rb") as f:
            chunk = f.read(max(256, min(header_bytes, 65536)))
        m = _UNIT_ID_RE.search(chunk)
        if m is None:
            return None
        # The match consists only of ASCII letters/digits by construction.
        return m.group(0).decode("ascii")
    except Exception:
        return None
# --------------- Scan helpers ---------------
def fmt_last(ts: float) -> str:
    """Render epoch seconds as local wall-clock time, 'YYYY-MM-DD HH:MM:SS'."""
    as_utc = datetime.fromtimestamp(ts, tz=timezone.utc)
    local_time = as_utc.astimezone()
    return local_time.strftime("%Y-%m-%d %H:%M:%S")
def fmt_age(now_epoch: float, mtime: float) -> str:
    """Format how long ago ``mtime`` was, relative to ``now_epoch``, as 'Hh Mm'.

    Partial minutes are dropped; an ``mtime`` in the future shows as '0h 0m'.
    """
    elapsed_minutes = max(0, int((now_epoch - mtime) // 60))
    hours, minutes = divmod(elapsed_minutes, 60)
    return "{}h {}m".format(hours, minutes)
def scan_latest(
    watch: str,
    header_bytes: int,
    cache: Dict[str, Tuple[float, str]],
    recent_cutoff: float,
    max_age_days: int,
    logger=None,
) -> Dict[str, Dict[str, Any]]:
    """Return the newest .MLG file per unit found directly in ``watch``.

    Result shape: ``{uid: {'mtime': float, 'fname': str, 'path': str}}``.

    Only regular files whose name ends in ``.mlg`` (case-insensitive) and
    whose mtime is within ``max_age_days`` (floored at 1) are considered; a
    future mtime counts as age 0.  The unit ID comes from ``cache`` when the
    cached mtime matches.  Otherwise it is sniffed from the file header via
    ``sniff_unit_from_mlg`` and stored in ``cache`` (mutated in place).
    Files with no recognisable ID are skipped.  If such a file's mtime is
    at or after ``recent_cutoff``, it is reported through ``logger``.

    A missing watch path or an error while listing the directory is printed
    and, when ``logger`` is given, also logged.  This way the reason for an
    empty result reaches the log file and not just stdout.  An entry that
    cannot be type-checked or stat'ed is skipped on its own.  A listing
    error returns whatever was collected before it.

    Args:
        watch: Directory to scan (non-recursive).
        header_bytes: Bytes of each file header to sniff for the unit ID.
        cache: path -> (mtime, uid) memo, shared across calls.
        recent_cutoff: Epoch threshold for logging unsniffable files, or None.
        max_age_days: Ignore files older than this many days.
        logger: Optional callable taking one message string.
    """
    latest: Dict[str, Dict[str, Any]] = {}
    if not os.path.exists(watch):
        print("[WARN] Watch path not found:", watch)
        if logger:
            logger("[warn] watch path not found: {}".format(watch))
        return latest
    now_ts = time.time()
    max_age_days = max(1, int(max_age_days))  # sanity floor
    max_age_seconds = max_age_days * 86400.0
    try:
        with os.scandir(watch) as it:
            for e in it:
                # Cheap name filter first (no syscall).
                if not e.name.lower().endswith(".mlg"):
                    continue
                try:
                    if not e.is_file():
                        continue
                    mtime = e.stat().st_mtime
                except OSError:
                    # Skip just this entry instead of aborting the whole scan.
                    continue
                fpath = e.path
                # Skip very old events (beyond retention window); a future
                # mtime counts as age 0.
                age_seconds = max(0.0, now_ts - mtime)
                if age_seconds > max_age_seconds:
                    continue  # too old, ignore this file
                cached = cache.get(fpath)
                if cached is not None and cached[0] == mtime:
                    uid = cached[1]
                else:
                    uid = sniff_unit_from_mlg(fpath, header_bytes)
                    if not uid:
                        # If unsniffable but very recent, log for later inspection
                        if (recent_cutoff is not None) and (mtime >= recent_cutoff):
                            if logger:
                                logger("[unsniffable-recent] {}".format(fpath))
                        continue  # skip file if no unit ID found in header
                    cache[fpath] = (mtime, uid)
                if (uid not in latest) or (mtime > latest[uid]["mtime"]):
                    latest[uid] = {"mtime": mtime, "fname": e.name, "path": fpath}
    except Exception as ex:
        print("[WARN] Scan error:", ex)
        if logger:
            logger("[warn] scan error: {}".format(ex))
    return latest
# --- API heartbeat / SFM telemetry helpers ---
# Watcher release number; run_watcher attaches it to every API heartbeat
# payload under the "version" key.
VERSION = "1.5.4"
def _read_log_tail(log_file: str, n: int = 25) -> Optional[list]:
    """Return the last ``n`` lines of ``log_file`` with newlines stripped.

    Returns None when ``log_file`` is empty/None or cannot be read.  A
    non-positive ``n`` yields an empty list.

    The file is decoded as UTF-8, the encoding log_message writes with,
    rather than the platform default (a legacy code page on Windows).
    Undecodable bytes are replaced instead of raising.  Lines are streamed
    through a bounded deque, so only ``n`` lines are held in memory however
    large the log grows between retention clears.
    """
    from collections import deque

    if not log_file:
        return None
    try:
        with open(log_file, "r", encoding="utf-8", errors="replace") as f:
            tail = deque(f, maxlen=max(0, n))
    except Exception:
        return None
    return [line.rstrip("\n") for line in tail]
def send_api_payload(payload: dict, api_url: str) -> Optional[dict]:
    """POST ``payload`` as JSON to ``api_url`` and return the decoded reply.

    Returns the response body parsed as a JSON object (dict).  Returns None
    when ``api_url`` is empty, the request fails, or the body is not a JSON
    object.  Network failures are printed, never raised.

    urllib wraps connection errors in ``URLError``, but some failures escape
    unwrapped.  Examples are a ``socket.timeout`` while waiting for the
    response headers and ``http.client.RemoteDisconnected`` when the server
    drops the connection.  Catching ``OSError`` (URLError's base class) and
    ``http.client.HTTPException`` keeps those from propagating into the
    caller's scan loop.
    """
    import http.client

    if not api_url:
        return None
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(api_url, data=data, headers={"Content-Type": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=5) as res:
            print("[API] POST success: {}".format(res.status))
            try:
                parsed = json.loads(res.read().decode("utf-8"))
            except Exception:
                return None
    except (OSError, http.client.HTTPException) as e:
        print("[API] POST failed: {}".format(e))
        return None
    # Callers use .get() on the result, so anything other than a JSON object
    # (list, bare string, null) is reported as "no usable response".
    return parsed if isinstance(parsed, dict) else None
def build_sfm_payload(units_dict: Dict[str, Dict[str, Any]], cfg: Dict[str, Any]) -> dict:
    """
    Assemble an SFM Telemetry JSON v1 payload from a latest-unit mapping.

    ``units_dict`` maps unit IDs to dicts that may carry ``mtime`` (epoch
    seconds), ``fname`` and ``path``.  Each unit entry records:
    - the last event time as a UTC ISO-8601 string;
    - the age in whole minutes, clamped at 0;
    - basic file metadata.
    Both time fields are None when ``mtime`` is absent.  The file size is
    read from disk and is None when ``path`` is empty or unreadable.
    Source identity comes from ``cfg`` (SOURCE_ID defaults to the host name).
    """
    stamp_iso = datetime.now(timezone.utc).isoformat()
    reference_ts = time.time()

    def _unit_entry(unit_id: str, meta: Dict[str, Any]) -> Dict[str, Any]:
        event_ts = meta.get("mtime")
        if event_ts is None:
            event_iso = None
            minutes_old = None
        else:
            event_iso = datetime.fromtimestamp(event_ts, tz=timezone.utc).isoformat()
            minutes_old = int(max(0, (reference_ts - event_ts) // 60))
        event_path = meta.get("path")
        size_bytes = None
        if event_path:
            try:
                size_bytes = os.path.getsize(event_path)
            except Exception:
                size_bytes = None
        return {
            "unit_id": unit_id,
            "last_event_time": event_iso,
            "age_minutes": minutes_old,
            "observation_method": "mlg_scan",
            "event_metadata": {
                "file_name": meta.get("fname"),
                "file_path": event_path,
                "file_size_bytes": size_bytes,
                "event_number": None,
                "event_type": None,
            },
        }

    source_id = cfg.get("SOURCE_ID", gethostname())
    source_type = cfg.get("SOURCE_TYPE", "series3_watcher")
    return {
        "source_id": source_id,
        "source_type": source_type,
        "timestamp": stamp_iso,
        "units": [_unit_entry(uid, meta) for uid, meta in units_dict.items()],
    }
# --------------- Watcher loop (tray-friendly) ----------------
def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
    """
    Main watcher loop. Runs in a background thread when launched from the tray.

    Loads config.ini, then repeats until ``stop_event`` is set:
    1. scan for the newest .MLG per unit;
    2. optionally POST a heartbeat to the SFM API;
    3. optionally forward event files to SFM;
    4. sleep SCAN_INTERVAL seconds, waking early on stop.
    config.ini is read from <LOCALAPPDATA or APPDATA>/Series3Watcher when
    running as a frozen executable, otherwise from this script's directory.
    Returns immediately, with state["status"] == "error", if the config
    cannot be loaded.

    state dict is written on every scan cycle:
      state["status"]     — "running" | "error" | "starting"
      state["api_status"] — "ok" | "fail" | "disabled"
      state["units"]      — list of dicts: {uid, age_hours, last, fname}
      state["last_scan"]  — datetime of last successful scan (or None)
      state["last_api"]   — datetime of last successful API POST (or None)
      state["last_error"] — last error string (or None)
      state["log_dir"]    — directory containing the log file
      state["cfg"]        — loaded config dict
    Also written when the corresponding feature runs:
      state["sfm_status"] — "ok" | "errors" | "fail" | "disabled"
      state["last_forward"], state["last_forward_counts"] — after a forward pass
      state["update_available"] — set True when the API reply says so
    """
    if getattr(sys, "frozen", False):
        # Frozen (packaged) build: config lives in the per-user app-data
        # directory, falling back to the executable's directory.
        here = os.path.dirname(os.path.abspath(sys.executable))
        _appdata = os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or here
        config_dir = os.path.join(_appdata, "Series3Watcher")
    else:
        # Running from source: config.ini sits next to this script.
        here = os.path.dirname(os.path.abspath(__file__)) or "."
        config_dir = here
    config_path = os.path.join(config_dir, "config.ini")
    # Initialise the shared state before anything can fail.
    state["status"] = "starting"
    state["units"] = []
    state["last_scan"] = None
    state["last_error"] = None
    state["log_dir"] = None
    state["cfg"] = {}
    try:
        cfg = load_config(config_path)
    except Exception as e:
        # Without a config there is nothing to watch: report and stop.
        state["status"] = "error"
        state["last_error"] = "Config load failed: {}".format(e)
        return
    state["cfg"] = cfg
    state["log_dir"] = os.path.dirname(cfg["LOG_FILE"]) or here
    WATCH_PATH = cfg["WATCH_PATH"]
    SCAN_INTERVAL = int(cfg["SCAN_INTERVAL"])
    ENABLE_LOGGING = bool(cfg["ENABLE_LOGGING"])
    LOG_FILE = cfg["LOG_FILE"]
    LOG_RETENTION_DAYS = int(cfg["LOG_RETENTION_DAYS"])
    MLG_HEADER_BYTES = int(cfg["MLG_HEADER_BYTES"])
    RECENT_WARN_DAYS = int(cfg["RECENT_WARN_DAYS"])
    MAX_EVENT_AGE_DAYS = int(cfg["MAX_EVENT_AGE_DAYS"])
    print(
        "[CFG] WATCH_PATH={} SCAN_INTERVAL={}s MAX_EVENT_AGE_DAYS={} API_ENABLED={}".format(
            WATCH_PATH, SCAN_INTERVAL, MAX_EVENT_AGE_DAYS, bool(cfg.get("API_ENABLED", False))
        )
    )
    log_message(
        LOG_FILE,
        ENABLE_LOGGING,
        "[cfg] WATCH_PATH={} SCAN_INTERVAL={} MAX_EVENT_AGE_DAYS={} API_ENABLED={}".format(
            WATCH_PATH, SCAN_INTERVAL, MAX_EVENT_AGE_DAYS, bool(cfg.get("API_ENABLED", False))
        ),
    )
    # path -> (mtime, unit_id); kept across cycles so scan_latest does not
    # re-read the header of an unchanged .MLG file.
    sniff_cache: Dict[str, Tuple[float, str]] = {}
    # Epoch time of the last heartbeat attempt / last completed forward
    # pass; 0.0 makes the first cycle run both immediately.
    last_api_ts: float = 0.0
    last_forward_ts: float = 0.0
    # ---- SFM event-forwarder setup ----
    # Default-off; only initialised when both flag and URL are set.
    sfm_state = None
    if cfg.get("SFM_FORWARD_ENABLED") and cfg.get("SFM_URL"):
        try:
            # Importing inside the try means a missing or broken
            # event_forwarder module only disables forwarding; the watcher
            # keeps running.
            from event_forwarder import ForwardState
            # Default state file: sfm_forwarded.json in the log directory.
            state_file = cfg.get("SFM_STATE_FILE") or os.path.join(
                os.path.dirname(LOG_FILE) or here, "sfm_forwarded.json"
            )
            sfm_state = ForwardState(state_file)
            print(
                "[CFG] SFM_FORWARD_ENABLED=true SFM_URL={} state={} ({} already-forwarded)".format(
                    cfg.get("SFM_URL"), state_file, sfm_state.count(),
                )
            )
            log_message(
                LOG_FILE, ENABLE_LOGGING,
                "[cfg] sfm forwarder enabled url={} state={} already_forwarded={}".format(
                    cfg.get("SFM_URL"), state_file, sfm_state.count(),
                ),
            )
        except Exception as e:
            print("[WARN] SFM forwarder init failed: {}".format(e))
            log_message(LOG_FILE, ENABLE_LOGGING,
                        "[warn] sfm forwarder init failed: {}".format(e))
            sfm_state = None
    else:
        # NOTE(review): this branch is also taken when SFM_FORWARD_ENABLED is
        # true but SFM_URL is empty, in which case the message is misleading.
        print("[CFG] SFM_FORWARD_ENABLED=false (event forwarding disabled)")
    while not stop_event.is_set():
        # Any exception escaping a cycle is caught at the bottom, recorded in
        # state and the log, and the loop continues after the normal sleep.
        try:
            now_local = datetime.now().isoformat()
            now_utc = datetime.now(timezone.utc).isoformat()
            print("-" * 110)
            print("Heartbeat @ {} (Local) | {} (UTC)".format(now_local, now_utc))
            print("-" * 110)
            clear_logs_if_needed(LOG_FILE, ENABLE_LOGGING, LOG_RETENTION_DAYS)
            # scan_latest logs unsniffable .MLG files modified after this time.
            recent_cutoff = time.time() - (float(RECENT_WARN_DAYS) * 86400)
            logger_fn = lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m)
            latest = scan_latest(
                WATCH_PATH,
                MLG_HEADER_BYTES,
                sniff_cache,
                recent_cutoff,
                MAX_EVENT_AGE_DAYS,
                logger_fn,
            )
            now_epoch = time.time()
            # Log detected units to console and log file (info only, no status judgement)
            unit_list = []
            if latest:
                print("\nDetected Units (within last {} days):".format(MAX_EVENT_AGE_DAYS))
                for uid in sorted(latest.keys()):
                    info = latest[uid]
                    age_hours = (now_epoch - info["mtime"]) / 3600.0
                    unit_list.append({
                        "uid": uid,
                        "age_hours": age_hours,
                        "last": fmt_last(info["mtime"]),
                        "fname": info["fname"],
                    })
                    line = (
                        "{uid:<8} Age: {age:<7} Last: {last} (File: {fname})".format(
                            uid=uid,
                            age=fmt_age(now_epoch, info["mtime"]),
                            last=fmt_last(info["mtime"]),
                            fname=info["fname"],
                        )
                    )
                    print(line)
                    log_message(LOG_FILE, ENABLE_LOGGING, line)
            else:
                print("\nNo recent .MLG activity found within last {} days.".format(MAX_EVENT_AGE_DAYS))
                log_message(
                    LOG_FILE,
                    ENABLE_LOGGING,
                    "[info] no recent MLG activity within {} days".format(MAX_EVENT_AGE_DAYS),
                )
            # Update shared state for tray — status reflects watcher health, not unit ages
            state["status"] = "running"
            state["units"] = unit_list
            state["last_scan"] = datetime.now()
            state["last_error"] = None
            # ---- API heartbeat to SFM ----
            if cfg.get("API_ENABLED", False):
                now_ts = time.time()
                interval = int(cfg.get("API_INTERVAL_SECONDS", 300))
                # Checked once per scan cycle, so heartbeats go out no more
                # often than every SCAN_INTERVAL seconds, even if
                # API_INTERVAL_SECONDS is smaller.
                if now_ts - last_api_ts >= interval:
                    hb_payload = build_sfm_payload(latest, cfg)
                    hb_payload["version"] = VERSION
                    # Always "running" here: status was set earlier this cycle.
                    hb_payload["watcher_status"] = state.get("status", "unknown")
                    hb_payload["log_tail"] = _read_log_tail(cfg.get("LOG_FILE", ""), 25)
                    response = send_api_payload(hb_payload, cfg.get("API_URL", ""))
                    # Stamped even when the POST failed (None), so an
                    # unreachable API is retried on the normal cadence.
                    last_api_ts = now_ts
                    if response is not None:
                        state["api_status"] = "ok"
                        state["last_api"] = datetime.now()
                        # Only ever set to True here; this function never clears it.
                        if response.get("update_available"):
                            state["update_available"] = True
                    else:
                        state["api_status"] = "fail"
            else:
                state["api_status"] = "disabled"
            # ---- SFM event forwarder ----
            # Same scan loop as the heartbeat, but on its own cadence
            # (SFM_FORWARD_INTERVAL_SECONDS). Default-off — sfm_state
            # is None unless config explicitly enabled it AND supplied
            # an SFM_URL.
            # NOTE(review): an exception escaping the heartbeat section above
            # jumps to the outer handler and skips this block for the cycle.
            if sfm_state is not None:
                now_ts = time.time()
                fwd_interval = int(cfg.get("SFM_FORWARD_INTERVAL_SECONDS", 60))
                if now_ts - last_forward_ts >= fwd_interval:
                    try:
                        from event_forwarder import forward_pending
                        counts = forward_pending(
                            WATCH_PATH,
                            cfg.get("SFM_URL", ""),
                            sfm_state,
                            max_age_days=MAX_EVENT_AGE_DAYS,
                            quiescence_seconds=int(cfg.get("SFM_QUIESCENCE_SECONDS", 5)),
                            missing_report_grace_seconds=int(
                                cfg.get("SFM_MISSING_REPORT_GRACE_SECONDS", 60)
                            ),
                            timeout=int(cfg.get("SFM_HTTP_TIMEOUT", 60)),
                            max_per_pass=int(cfg.get("SFM_MAX_FORWARDS_PER_PASS", 500)),
                            logger=lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m),
                        )
                        # The timer is reset only once forward_pending returns;
                        # if it raises, the pass is retried next scan cycle.
                        last_forward_ts = now_ts
                        if counts["scanned"] > 0:
                            summary = (
                                "[forward] scanned={} forwarded={} "
                                "with_report={} errors={}".format(
                                    counts["scanned"], counts["forwarded"],
                                    counts["with_report"], counts["errors"],
                                )
                            )
                            print(summary)
                            log_message(LOG_FILE, ENABLE_LOGGING, summary)
                        state["sfm_status"] = "ok" if counts["errors"] == 0 else "errors"
                        state["last_forward"] = datetime.now()
                        state["last_forward_counts"] = counts
                    except Exception as e:
                        # Contained here so forwarder failures do not flip
                        # the overall watcher status to "error".
                        err = "[forward-error] {}".format(e)
                        print(err)
                        log_message(LOG_FILE, ENABLE_LOGGING, err)
                        state["sfm_status"] = "fail"
            else:
                state["sfm_status"] = "disabled"
        except Exception as e:
            err = "[loop-error] {}".format(e)
            print(err)
            log_message(LOG_FILE, ENABLE_LOGGING, err)
            state["status"] = "error"
            state["last_error"] = str(e)
        # Interruptible sleep: wake immediately if stop_event fires
        stop_event.wait(timeout=SCAN_INTERVAL)
# --------------- Main (standalone) ------------------
def main() -> None:
    """Standalone entry point: run the watcher loop in the foreground.

    Ctrl+C interrupts the loop; the stop event is then set and the function
    returns.
    """
    shared_state = {}
    stopper = threading.Event()
    try:
        run_watcher(shared_state, stopper)
    except KeyboardInterrupt:
        print("\nStopping...")
        stopper.set()


if __name__ == "__main__":
    main()