""" Series 3 Watcher — v1.4.2 Environment: - Python 3.8 (Windows 7 compatible) - Runs on DL2 with Blastware 10 event path Key Features: - Config-driven paths, intervals, and logging - Compact console heartbeat with status per unit - Logging with retention auto-clean (days configurable) - Safe .MLG header sniff for unit IDs (BE#### / BA####) - Standardized SFM Telemetry JSON payload (source-agnostic) - Periodic HTTP heartbeat POST to SFM backend - Tray-friendly: run_watcher(state, stop_event) for background thread use """ import os import re import sys import time import json import threading import configparser import urllib.request import urllib.error from datetime import datetime, timezone, timedelta from typing import Dict, Any, Optional, Tuple from socket import gethostname # ---------------- Config ---------------- def load_config(path: str) -> Dict[str, Any]: """Load INI with tolerant inline comments and a required [agent] section.""" cp = configparser.ConfigParser(inline_comment_prefixes=(";", "#")) cp.optionxform = str # preserve key case with open(path, "r", encoding="utf-8") as f: txt = f.read() # Ensure we have a section header if not re.search(r"^\s*\[", txt, flags=re.M): txt = "[agent]\n" + txt cp.read_string(txt) sec = cp["agent"] def get_str(k: str, dflt: str) -> str: return sec.get(k, dflt).strip() def get_int(k: str, dflt: int) -> int: try: return int(sec.get(k, str(dflt)).strip()) except Exception: return dflt def get_bool(k: str, dflt: bool) -> bool: v = sec.get(k, None) if v is None: return dflt return v.strip().lower() in ("1", "true", "on", "yes", "y") return { "WATCH_PATH": get_str("SERIES3_PATH", r"C:\Blastware 10\Event\autocall home"), "SCAN_INTERVAL": get_int("SCAN_INTERVAL_SECONDS", 300), "OK_HOURS": float(get_int("OK_HOURS", 12)), "MISSING_HOURS": float(get_int("MISSING_HOURS", 24)), "ENABLE_LOGGING": get_bool("ENABLE_LOGGING", True), "LOG_FILE": get_str("LOG_FILE", os.path.join( os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or "C:\\", "Series3Watcher", "agent_logs", "series3_watcher.log" )), "LOG_RETENTION_DAYS": get_int("LOG_RETENTION_DAYS", 30), "COLORIZE": get_bool("COLORIZE", False), # Win7 default off "MLG_HEADER_BYTES": max(256, min(get_int("MLG_HEADER_BYTES", 2048), 65536)), "RECENT_WARN_DAYS": get_int("RECENT_WARN_DAYS", 30), "MAX_EVENT_AGE_DAYS": get_int("MAX_EVENT_AGE_DAYS", 365), # API heartbeat / SFM telemetry "API_ENABLED": get_bool("API_ENABLED", False), "API_URL": get_str("API_URL", ""), "API_INTERVAL_SECONDS": get_int("API_INTERVAL_SECONDS", 300), "SOURCE_ID": get_str("SOURCE_ID", gethostname()), "SOURCE_TYPE": get_str("SOURCE_TYPE", "series3_watcher"), } # --------------- ANSI helpers --------------- def ansi(enabled: bool, code: str) -> str: return code if enabled else "" # --------------- Logging -------------------- def log_message(path: str, enabled: bool, msg: str) -> None: if not enabled: return try: d = os.path.dirname(path) or "." if not os.path.exists(d): os.makedirs(d) with open(path, "a", encoding="utf-8") as f: f.write("{} {}\n".format(datetime.now(timezone.utc).isoformat(), msg)) except Exception: # Logging must never crash the watcher pass def clear_logs_if_needed(log_file: str, enabled: bool, retention_days: int) -> None: if not enabled or retention_days <= 0: return stamp_file = os.path.join(os.path.dirname(log_file) or ".", "last_clean.txt") now = datetime.now(timezone.utc) last = None try: if os.path.exists(stamp_file): with open(stamp_file, "r", encoding="utf-8") as f: last = datetime.fromisoformat(f.read().strip()) except Exception: last = None if (last is None) or (now - last > timedelta(days=retention_days)): try: if os.path.exists(log_file): open(log_file, "w", encoding="utf-8").close() with open(stamp_file, "w", encoding="utf-8") as f: f.write(now.isoformat()) print("Log cleared on {}".format(now.astimezone().strftime("%Y-%m-%d %H:%M:%S"))) log_message(log_file, enabled, "Logs auto-cleared") except Exception: pass # --------------- .MLG sniff ------------------ UNIT_BYTES_RE = re.compile(rb"(?:^|[^A-Z])(BE|BA)\d{4,5}(?:[^0-9]|$)") def sniff_unit_from_mlg(path: str, header_bytes: int) -> Optional[str]: """Return BE####/BA#### from header bytes, or None.""" try: with open(path, "rb") as f: chunk = f.read(max(256, min(header_bytes, 65536))) m = UNIT_BYTES_RE.search(chunk) if not m: return None raw = m.group(0) cleaned = re.sub(rb"[^A-Z0-9]", b"", raw) try: return cleaned.decode("ascii").upper() except Exception: return None except Exception: return None # --------------- Scan helpers --------------- def fmt_last(ts: float) -> str: return datetime.fromtimestamp(ts, tz=timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S") def fmt_age(now_epoch: float, mtime: float) -> str: mins = int((now_epoch - mtime) // 60) if mins < 0: mins = 0 return "{}h {}m".format(mins // 60, mins % 60) def scan_latest( watch: str, header_bytes: int, cache: Dict[str, Tuple[float, str]], recent_cutoff: float, max_age_days: int, logger=None, ) -> Dict[str, Dict[str, Any]]: """ Return newest .MLG per unit, only for files newer than max_age_days: {uid: {'mtime': float, 'fname': str, 'path': str}} """ latest: Dict[str, Dict[str, Any]] = {} if not os.path.exists(watch): print("[WARN] Watch path not found:", watch) return latest now_ts = time.time() max_age_days = max(1, int(max_age_days)) # sanity floor max_age_seconds = max_age_days * 86400.0 try: with os.scandir(watch) as it: for e in it: if (not e.is_file()) or (not e.name.lower().endswith(".mlg")): continue fpath = e.path try: mtime = e.stat().st_mtime except Exception: continue # Skip very old events (beyond retention window) age_seconds = now_ts - mtime if age_seconds < 0: age_seconds = 0 if age_seconds > max_age_seconds: continue # too old, ignore this file cached = cache.get(fpath) if cached is not None and cached[0] == mtime: uid = cached[1] else: uid = sniff_unit_from_mlg(fpath, header_bytes) if not uid: # If unsniffable but very recent, log for later inspection if (recent_cutoff is not None) and (mtime >= recent_cutoff): if logger: logger("[unsniffable-recent] {}".format(fpath)) continue # skip file if no unit ID found in header cache[fpath] = (mtime, uid) if (uid not in latest) or (mtime > latest[uid]["mtime"]): latest[uid] = {"mtime": mtime, "fname": e.name, "path": fpath} except Exception as ex: print("[WARN] Scan error:", ex) return latest # --- API heartbeat / SFM telemetry helpers --- VERSION = "1.4.2" def _read_log_tail(log_file: str, n: int = 25) -> Optional[list]: """Return the last n lines of the log file as a list of strings, or None on failure.""" if not log_file: return None try: with open(log_file, "r", errors="replace") as f: lines = f.readlines() return [l.rstrip("\n") for l in lines[-n:]] except Exception: return None def send_api_payload(payload: dict, api_url: str) -> Optional[dict]: """POST payload to API. Returns parsed JSON response dict, or None on failure.""" if not api_url: return None data = json.dumps(payload).encode("utf-8") req = urllib.request.Request(api_url, data=data, headers={"Content-Type": "application/json"}) try: with urllib.request.urlopen(req, timeout=5) as res: print("[API] POST success: {}".format(res.status)) try: return json.loads(res.read().decode("utf-8")) except Exception: return None except urllib.error.URLError as e: print("[API] POST failed: {}".format(e)) return None def build_sfm_payload(units_dict: Dict[str, Dict[str, Any]], cfg: Dict[str, Any]) -> dict: """ Build SFM Telemetry JSON v1 payload from latest-unit dict. Schema is source-agnostic and future-proof. """ now_iso = datetime.now(timezone.utc).isoformat() now_ts = time.time() payload = { "source_id": cfg.get("SOURCE_ID", gethostname()), "source_type": cfg.get("SOURCE_TYPE", "series3_watcher"), "timestamp": now_iso, "units": [], } for unit_id, info in units_dict.items(): mtime = info.get("mtime") if mtime is not None: last_event_iso = datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat() age_minutes = int(max(0, (now_ts - mtime) // 60)) else: last_event_iso = None age_minutes = None file_path = info.get("path") file_size = None if file_path: try: file_size = os.path.getsize(file_path) except Exception: file_size = None payload["units"].append( { "unit_id": unit_id, "last_event_time": last_event_iso, "age_minutes": age_minutes, "observation_method": "mlg_scan", "event_metadata": { "file_name": info.get("fname"), "file_path": file_path, "file_size_bytes": file_size, "event_number": None, "event_type": None, }, } ) return payload # --------------- Watcher loop (tray-friendly) ---------------- def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None: """ Main watcher loop. Runs in a background thread when launched from the tray. state dict is written on every scan cycle: state["status"] — "ok" | "pending" | "missing" | "error" | "starting" state["units"] — list of dicts: {uid, status, age_hours, last, fname} state["last_scan"] — datetime of last successful scan (or None) state["last_error"] — last error string (or None) state["log_dir"] — directory containing the log file state["cfg"] — loaded config dict """ if getattr(sys, "frozen", False): here = os.path.dirname(os.path.abspath(sys.executable)) _appdata = os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or here config_dir = os.path.join(_appdata, "Series3Watcher") else: here = os.path.dirname(os.path.abspath(__file__)) or "." config_dir = here config_path = os.path.join(config_dir, "config.ini") state["status"] = "starting" state["units"] = [] state["last_scan"] = None state["last_error"] = None state["log_dir"] = None state["cfg"] = {} try: cfg = load_config(config_path) except Exception as e: state["status"] = "error" state["last_error"] = "Config load failed: {}".format(e) return state["cfg"] = cfg state["log_dir"] = os.path.dirname(cfg["LOG_FILE"]) or here WATCH_PATH = cfg["WATCH_PATH"] SCAN_INTERVAL = int(cfg["SCAN_INTERVAL"]) OK_HOURS = float(cfg["OK_HOURS"]) MISSING_HOURS = float(cfg["MISSING_HOURS"]) ENABLE_LOGGING = bool(cfg["ENABLE_LOGGING"]) LOG_FILE = cfg["LOG_FILE"] LOG_RETENTION_DAYS = int(cfg["LOG_RETENTION_DAYS"]) COLORIZE = bool(cfg["COLORIZE"]) MLG_HEADER_BYTES = int(cfg["MLG_HEADER_BYTES"]) RECENT_WARN_DAYS = int(cfg["RECENT_WARN_DAYS"]) MAX_EVENT_AGE_DAYS = int(cfg["MAX_EVENT_AGE_DAYS"]) C_OK = ansi(COLORIZE, "\033[92m") C_PEN = ansi(COLORIZE, "\033[93m") C_MIS = ansi(COLORIZE, "\033[91m") C_RST = ansi(COLORIZE, "\033[0m") print( "[CFG] WATCH_PATH={} SCAN_INTERVAL={}s MAX_EVENT_AGE_DAYS={} API_ENABLED={}".format( WATCH_PATH, SCAN_INTERVAL, MAX_EVENT_AGE_DAYS, bool(cfg.get("API_ENABLED", False)) ) ) log_message( LOG_FILE, ENABLE_LOGGING, "[cfg] WATCH_PATH={} SCAN_INTERVAL={} MAX_EVENT_AGE_DAYS={} API_ENABLED={}".format( WATCH_PATH, SCAN_INTERVAL, MAX_EVENT_AGE_DAYS, bool(cfg.get("API_ENABLED", False)) ), ) sniff_cache: Dict[str, Tuple[float, str]] = {} last_api_ts: float = 0.0 while not stop_event.is_set(): try: now_local = datetime.now().isoformat() now_utc = datetime.now(timezone.utc).isoformat() print("-" * 110) print("Heartbeat @ {} (Local) | {} (UTC)".format(now_local, now_utc)) print("-" * 110) clear_logs_if_needed(LOG_FILE, ENABLE_LOGGING, LOG_RETENTION_DAYS) recent_cutoff = time.time() - (float(RECENT_WARN_DAYS) * 86400) logger_fn = lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m) latest = scan_latest( WATCH_PATH, MLG_HEADER_BYTES, sniff_cache, recent_cutoff, MAX_EVENT_AGE_DAYS, logger_fn, ) now_epoch = time.time() # Log detected units to console and log file (info only, no status judgement) unit_list = [] if latest: print("\nDetected Units (within last {} days):".format(MAX_EVENT_AGE_DAYS)) for uid in sorted(latest.keys()): info = latest[uid] age_hours = (now_epoch - info["mtime"]) / 3600.0 unit_list.append({ "uid": uid, "age_hours": age_hours, "last": fmt_last(info["mtime"]), "fname": info["fname"], }) line = ( "{uid:<8} Age: {age:<7} Last: {last} (File: {fname})".format( uid=uid, age=fmt_age(now_epoch, info["mtime"]), last=fmt_last(info["mtime"]), fname=info["fname"], ) ) print(line) log_message(LOG_FILE, ENABLE_LOGGING, line) else: print("\nNo recent .MLG activity found within last {} days.".format(MAX_EVENT_AGE_DAYS)) log_message( LOG_FILE, ENABLE_LOGGING, "[info] no recent MLG activity within {} days".format(MAX_EVENT_AGE_DAYS), ) # Update shared state for tray — status reflects watcher health, not unit ages state["status"] = "running" state["units"] = unit_list state["last_scan"] = datetime.now() state["last_error"] = None # ---- API heartbeat to SFM ---- if cfg.get("API_ENABLED", False): now_ts = time.time() interval = int(cfg.get("API_INTERVAL_SECONDS", 300)) if now_ts - last_api_ts >= interval: hb_payload = build_sfm_payload(latest, cfg) hb_payload["version"] = VERSION hb_payload["watcher_status"] = state.get("status", "unknown") hb_payload["log_tail"] = _read_log_tail(cfg.get("LOG_FILE", ""), 25) response = send_api_payload(hb_payload, cfg.get("API_URL", "")) last_api_ts = now_ts if response is not None: state["api_status"] = "ok" state["last_api"] = datetime.now() if response.get("update_available"): state["update_available"] = True else: state["api_status"] = "fail" else: state["api_status"] = "disabled" except Exception as e: err = "[loop-error] {}".format(e) print(err) log_message(LOG_FILE, ENABLE_LOGGING, err) state["status"] = "error" state["last_error"] = str(e) # Interruptible sleep: wake immediately if stop_event fires stop_event.wait(timeout=SCAN_INTERVAL) # --------------- Main (standalone) ------------------ def main() -> None: state = {} stop_event = threading.Event() try: run_watcher(state, stop_event) except KeyboardInterrupt: print("\nStopping...") stop_event.set() if __name__ == "__main__": main()