diff --git a/series3_emitter.py b/series3_emitter.py index 50b06c3..e8218e2 100644 --- a/series3_emitter.py +++ b/series3_emitter.py @@ -1,36 +1,34 @@ """ -Series 3 Emitter — v1.1.0 +Series 3 Emitter — v1.2.0 Environment: - Python 3.8 (Windows 7 compatible) - Runs on DL2 with Blastware 10 event path Key Features: -- Atomic roster downloads from Dropbox (no partial files) -- Automatic roster refresh from Dropbox at configurable interval -- Automatic hot-reload into memory when roster CSV changes -- Failsafe reload: keeps previous roster if new file is invalid or empty - Config-driven paths, intervals, and logging - Compact console heartbeat with status per unit - Logging with retention auto-clean (days configurable) - Safe .MLG header sniff for unit IDs (BE#### / BA####) -- NEW in v1.1.0: - - Standardized SFM Telemetry JSON payload (source-agnostic) - - Periodic HTTP heartbeat POST to SFM backend +- Standardized SFM Telemetry JSON payload (source-agnostic) +- Periodic HTTP heartbeat POST to SFM backend +- NEW in v1.2.0: + - No local roster / CSV dependency + - Only scans .MLG files newer than MAX_EVENT_AGE_DAYS """ import os import re -import csv import time import json import configparser import urllib.request import urllib.error from datetime import datetime, timezone, timedelta -from typing import Dict, Any, Optional, Tuple, Set, List +from typing import Dict, Any, Optional, Tuple from socket import gethostname + # ---------------- Config ---------------- def load_config(path: str) -> Dict[str, Any]: """Load INI with tolerant inline comments and a required [emitter] section.""" @@ -61,9 +59,6 @@ def load_config(path: str) -> Dict[str, Any]: return { "WATCH_PATH": get_str("SERIES3_PATH", r"C:\Blastware 10\Event\autocall home"), - "ROSTER_FILE": get_str("ROSTER_FILE", r"C:\SeismoEmitter\series3_roster.csv"), - "ROSTER_URL": get_str("ROSTER_URL", ""), - "ROSTER_REFRESH_MIN_SECONDS": get_int("ROSTER_REFRESH_MIN_SECONDS", 300), "SCAN_INTERVAL": get_int("SCAN_INTERVAL_SECONDS", 300), "OK_HOURS": float(get_int("OK_HOURS", 12)), "MISSING_HOURS": float(get_int("MISSING_HOURS", 24)), @@ -73,6 +68,7 @@ def load_config(path: str) -> Dict[str, Any]: "COLORIZE": get_bool("COLORIZE", False), # Win7 default off "MLG_HEADER_BYTES": max(256, min(get_int("MLG_HEADER_BYTES", 2048), 65536)), "RECENT_WARN_DAYS": get_int("RECENT_WARN_DAYS", 30), + "MAX_EVENT_AGE_DAYS": get_int("MAX_EVENT_AGE_DAYS", 365), # API heartbeat / SFM telemetry "API_ENABLED": get_bool("API_ENABLED", False), @@ -99,6 +95,7 @@ def log_message(path: str, enabled: bool, msg: str) -> None: with open(path, "a", encoding="utf-8") as f: f.write("{} {}\n".format(datetime.now(timezone.utc).isoformat(), msg)) except Exception: + # Logging must never crash the emitter pass @@ -126,79 +123,6 @@ def clear_logs_if_needed(log_file: str, enabled: bool, retention_days: int) -> N pass -# --------------- Roster --------------------- -def normalize_id(uid: str) -> str: - if uid is None: - return "" - return uid.replace(" ", "").strip().upper() - - -def load_roster(path: str) -> Tuple[Set[str], Set[str], Set[str], Dict[str, str]]: - """ - CSV tolerant of commas in notes: device_id, active, notes... - Returns: active, bench, ignored, notes_by_unit - """ - active: Set[str] = set() - bench: Set[str] = set() - ignored: Set[str] = set() - notes_by_unit: Dict[str, str] = {} - - if not os.path.exists(path): - print("[WARN] Roster not found:", path) - return active, bench, ignored, notes_by_unit - - try: - with open(path, "r", encoding="utf-8-sig", newline="") as f: - rdr = csv.reader(f) - try: - headers = next(rdr) - except StopIteration: - return active, bench, ignored, notes_by_unit - - headers = [(h or "").strip().lower() for h in headers] - - def idx_of(name: str, fallbacks: List[str]) -> Optional[int]: - if name in headers: - return headers.index(name) - for fb in fallbacks: - if fb in headers: - return headers.index(fb) - return None - - i_id = idx_of("device_id", ["unitid", "id"]) - i_ac = idx_of("active", []) - i_no = idx_of("notes", ["note", "location"]) - - if i_id is None or i_ac is None: - print("[WARN] Roster missing device_id/active columns") - return active, bench, ignored, notes_by_unit - - for row in rdr: - if len(row) <= max(i_id, i_ac): - continue - uid = normalize_id(row[i_id]) - note = "" - if i_no is not None: - extra = row[i_no:] - note = ",".join([c or "" for c in extra]).strip().rstrip(",") - if not uid: - continue - notes_by_unit[uid] = note - - flag = (row[i_ac] or "").strip().lower() - if flag in ("yes", "y", "true", "1", "on"): - active.add(uid) - elif flag in ("no", "n", "off", "0"): - bench.add(uid) - elif flag in ("ignore", "retired", "old"): - ignored.add(uid) - - except Exception as e: - print("[WARN] Roster read error:", e) - - return active, bench, ignored, notes_by_unit - - # --------------- .MLG sniff ------------------ UNIT_BYTES_RE = re.compile(rb"(?:^|[^A-Z])(BE|BA)\d{4,5}(?:[^0-9]|$)") @@ -237,17 +161,23 @@ def scan_latest( watch: str, header_bytes: int, cache: Dict[str, Tuple[float, str]], - recent_cutoff: float = None, + recent_cutoff: float, + max_age_days: int, logger=None, ) -> Dict[str, Dict[str, Any]]: """ - Return newest .MLG per unit: + Return newest .MLG per unit, only for files newer than max_age_days: {uid: {'mtime': float, 'fname': str, 'path': str}} """ latest: Dict[str, Dict[str, Any]] = {} if not os.path.exists(watch): print("[WARN] Watch path not found:", watch) return latest + + now_ts = time.time() + max_age_days = max(1, int(max_age_days)) # sanity floor + max_age_seconds = max_age_days * 86400.0 + try: with os.scandir(watch) as it: for e in it: @@ -259,12 +189,20 @@ def scan_latest( except Exception: continue + # Skip very old events (beyond retention window) + age_seconds = now_ts - mtime + if age_seconds < 0: + age_seconds = 0 + if age_seconds > max_age_seconds: + continue # too old, ignore this file + cached = cache.get(fpath) if cached is not None and cached[0] == mtime: uid = cached[1] else: uid = sniff_unit_from_mlg(fpath, header_bytes) if not uid: + # If unsniffable but very recent, log for later inspection if (recent_cutoff is not None) and (mtime >= recent_cutoff): if logger: logger(f"[unsniffable-recent] {fpath}") @@ -278,36 +216,6 @@ def scan_latest( return latest -# --- Roster fetch (Dropbox/HTTPS) helper --- -def refresh_roster_from_url(url: str, dest: str, min_seconds: int, state: dict, logger=None) -> None: - now = time.time() - - # throttle fetches; only pull if enough time elapsed - if now - state.get("t", 0) < max(0, int(min_seconds or 0)): - return - - try: - with urllib.request.urlopen(url, timeout=15) as r: - data = r.read() - if data and data.strip(): - with open(dest, "wb") as f: - f.write(data) - state["t"] = now - if logger: - logger( - f"[roster] refreshed from {url} at " - f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} -> {dest} ({len(data)} bytes)" - ) - except Exception as e: - if logger: - logger(f"[roster-fetch-error] {e}") - - -# --- config helper: case-insensitive key lookup --- -def cfg_get(cfg: dict, key: str, default=None): - return cfg.get(key, cfg.get(key.lower(), cfg.get(key.upper(), default))) - - # --- API heartbeat / SFM telemetry helpers --- def send_api_payload(payload: dict, api_url: str) -> None: if not api_url: @@ -378,7 +286,6 @@ def main() -> None: cfg = load_config(os.path.join(here, "config.ini")) WATCH_PATH = cfg["WATCH_PATH"] - ROSTER_FILE = cfg["ROSTER_FILE"] SCAN_INTERVAL = int(cfg["SCAN_INTERVAL"]) OK_HOURS = float(cfg["OK_HOURS"]) MISSING_HOURS = float(cfg["MISSING_HOURS"]) @@ -387,52 +294,30 @@ def main() -> None: LOG_RETENTION_DAYS = int(cfg["LOG_RETENTION_DAYS"]) COLORIZE = bool(cfg["COLORIZE"]) MLG_HEADER_BYTES = int(cfg["MLG_HEADER_BYTES"]) + RECENT_WARN_DAYS = int(cfg["RECENT_WARN_DAYS"]) + MAX_EVENT_AGE_DAYS = int(cfg["MAX_EVENT_AGE_DAYS"]) C_OK = ansi(COLORIZE, "\033[92m") C_PEN = ansi(COLORIZE, "\033[93m") C_MIS = ansi(COLORIZE, "\033[91m") - C_UNX = ansi(COLORIZE, "\033[95m") C_RST = ansi(COLORIZE, "\033[0m") - # --- Dropbox roster refresh (pull CSV to local cache) --- - roster_state: Dict[str, Any] = {} - url = str(cfg_get(cfg, "ROSTER_URL", "") or "") - print( - f"[CFG] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} " - f"ROSTER_URL={'set' if url else 'not set'}" + "[CFG] WATCH_PATH={} SCAN_INTERVAL={}s MAX_EVENT_AGE_DAYS={} API_ENABLED={}".format( + WATCH_PATH, SCAN_INTERVAL, MAX_EVENT_AGE_DAYS, bool(cfg.get("API_ENABLED", False)) + ) ) log_message( LOG_FILE, ENABLE_LOGGING, - f"[cfg] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} ROSTER_URL={'set' if url else 'not set'}", + "[cfg] WATCH_PATH={} SCAN_INTERVAL={} MAX_EVENT_AGE_DAYS={} API_ENABLED={}".format( + WATCH_PATH, SCAN_INTERVAL, MAX_EVENT_AGE_DAYS, bool(cfg.get("API_ENABLED", False)) + ), ) - if url.lower().startswith("http"): - refresh_roster_from_url( - url, - ROSTER_FILE, - int(cfg_get(cfg, "ROSTER_REFRESH_MIN_SECONDS", 300)), - roster_state, - lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m), - ) - # cache for scanning sniff_cache: Dict[str, Tuple[float, str]] = {} - # Always load the (possibly refreshed) local roster - try: - active, bench, ignored, notes_by_unit = load_roster(ROSTER_FILE) - except Exception as ex: - log_message(LOG_FILE, ENABLE_LOGGING, f"[WARN] roster load failed: {ex}") - active, bench, ignored, notes_by_unit = set(), set(), set(), {} - - # track roster file modification time - try: - roster_mtime = os.path.getmtime(ROSTER_FILE) - except Exception: - roster_mtime = None - last_api_ts: float = 0.0 while True: @@ -443,65 +328,25 @@ def main() -> None: print("Heartbeat @ {} (Local) | {} (UTC)".format(now_local, now_utc)) print("-" * 110) - # Periodically refresh roster file from Dropbox - if url.lower().startswith("http"): - refresh_roster_from_url( - url, - ROSTER_FILE, - int(cfg_get(cfg, "ROSTER_REFRESH_MIN_SECONDS", 300)), - roster_state, - lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m), - ) - - # Reload roster into memory if the file changed - try: - m = os.path.getmtime(ROSTER_FILE) - except Exception: - m = None - - if m is not None and m != roster_mtime: - roster_mtime = m - try: - new_active, new_bench, new_ignored, new_notes_by_unit = load_roster(ROSTER_FILE) - if new_active or new_bench or new_ignored: - active, bench, ignored, notes_by_unit = ( - new_active, - new_bench, - new_ignored, - new_notes_by_unit, - ) - print(f"[ROSTER] Reloaded: {len(active)} active unit(s) from {ROSTER_FILE}") - log_message( - LOG_FILE, - ENABLE_LOGGING, - f"[roster] reloaded {len(active)} active units", - ) - else: - print("[ROSTER] Reload skipped — no valid active units in new file") - log_message( - LOG_FILE, - ENABLE_LOGGING, - "[roster] reload skipped — roster parse failed or empty", - ) - except Exception as ex: - print(f"[ROSTER] Reload failed, keeping previous roster: {ex}") - log_message( - LOG_FILE, - ENABLE_LOGGING, - f"[roster] reload failed, keeping previous roster: {ex}", - ) - clear_logs_if_needed(LOG_FILE, ENABLE_LOGGING, LOG_RETENTION_DAYS) - recent_cutoff = time.time() - (float(cfg.get("RECENT_WARN_DAYS", 30)) * 86400) + recent_cutoff = time.time() - (float(RECENT_WARN_DAYS) * 86400) logger_fn = lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m) - latest = scan_latest(WATCH_PATH, MLG_HEADER_BYTES, sniff_cache, recent_cutoff, logger_fn) + latest = scan_latest( + WATCH_PATH, + MLG_HEADER_BYTES, + sniff_cache, + recent_cutoff, + MAX_EVENT_AGE_DAYS, + logger_fn, + ) now_epoch = time.time() - # Active units - for uid in sorted(active): - info = latest.get(uid) - if info is not None: + # Detected units summary (no roster dependency) + if latest: + print("\nDetected Units (within last {} days):".format(MAX_EVENT_AGE_DAYS)) + for uid in sorted(latest.keys()): + info = latest[uid] age_hours = (now_epoch - info["mtime"]) / 3600.0 if age_hours > MISSING_HOURS: status, col = "Missing", C_MIS @@ -510,67 +355,26 @@ def main() -> None: else: status, col = "OK", C_OK - note = notes_by_unit.get(uid, "") - note_suffix = f" [{note}]" if note else "" line = ( - "{col}{uid:<8} {status:<8} Age: {age:<7} Last: {last} (File: {fname}){note}{rst}".format( + "{col}{uid:<8} {status:<8} Age: {age:<7} Last: {last} (File: {fname}){rst}".format( col=col, uid=uid, status=status, age=fmt_age(now_epoch, info["mtime"]), last=fmt_last(info["mtime"]), fname=info["fname"], - note=note_suffix, - rst=C_RST, - ) - ) - else: - note = notes_by_unit.get(uid, "") - note_suffix = f" [{note}]" if note else "" - line = "{col}{uid:<8} Missing Age: N/A Last: ---{note}{rst}".format( - col=C_MIS, uid=uid, note=note_suffix, rst=C_RST - ) - - print(line) - log_message(LOG_FILE, ENABLE_LOGGING, line) - - # Bench Units (rostered but not active in field) - print("\nBench Units (rostered, not active):") - for uid in sorted(bench): - info = latest.get(uid) - note = notes_by_unit.get(uid, "") - note_suffix = f" [{note}]" if note else "" - if info: - line = ( - f"{uid:<8} Bench Last: {fmt_last(info['mtime'])} " - f"(File: {info['fname']}){note_suffix}" - ) - else: - line = f"{uid:<8} Bench Last: ---{note_suffix}" - print(line) - log_message(LOG_FILE, ENABLE_LOGGING, "[bench] " + line) - - # Unexpected units - unexpected = [ - u - for u in latest.keys() - if u not in active and u not in bench and u not in ignored and u not in notes_by_unit - ] - if unexpected: - print("\nUnexpected Units Detected:") - for uid in sorted(unexpected): - info = latest[uid] - line = ( - "{col}{uid:<8} Age: - Last: {last} (File: {fname}){rst}".format( - col=C_UNX, - uid=uid, - last=fmt_last(info["mtime"]), - fname=info["fname"], rst=C_RST, ) ) print(line) - log_message(LOG_FILE, ENABLE_LOGGING, "[unexpected] " + line) + log_message(LOG_FILE, ENABLE_LOGGING, line) + else: + print("\nNo recent .MLG activity found within last {} days.".format(MAX_EVENT_AGE_DAYS)) + log_message( + LOG_FILE, + ENABLE_LOGGING, + "[info] no recent MLG activity within {} days".format(MAX_EVENT_AGE_DAYS), + ) # ---- API heartbeat to SFM ---- if cfg.get("API_ENABLED", False):