From 551fdae106b86da005826c746b5dab9cfa824ef3 Mon Sep 17 00:00:00 2001 From: serversdwn Date: Mon, 1 Dec 2025 16:30:08 -0500 Subject: [PATCH] v1.1 w/ api function added --- series3_emitter.py | 349 ++++++++++++++++++++++++++++++--------------- 1 file changed, 234 insertions(+), 115 deletions(-) diff --git a/series3_emitter.py b/series3_emitter.py index 9ab04dc..50b06c3 100644 --- a/series3_emitter.py +++ b/series3_emitter.py @@ -1,5 +1,5 @@ """ -Series 3 Emitter — v1.0.0 (Stable Baseline, SemVer Reset) +Series 3 Emitter — v1.1.0 Environment: - Python 3.8 (Windows 7 compatible) @@ -14,33 +14,32 @@ Key Features: - Compact console heartbeat with status per unit - Logging with retention auto-clean (days configurable) - Safe .MLG header sniff for unit IDs (BE#### / BA####) - -Changelog: -- Reset to semantic versioning (from legacy v5.9 beta) -- Fixed stray `note=note_suffix` bug in Unexpected Units block -- Removed duplicate imports and redundant roster load at startup -- Added startup config echo (paths + URL status) +- NEW in v1.1.0: + - Standardized SFM Telemetry JSON payload (source-agnostic) + - Periodic HTTP heartbeat POST to SFM backend """ import os import re import csv import time +import json import configparser import urllib.request -import requests +import urllib.error from datetime import datetime, timezone, timedelta from typing import Dict, Any, Optional, Tuple, Set, List +from socket import gethostname # ---------------- Config ---------------- def load_config(path: str) -> Dict[str, Any]: """Load INI with tolerant inline comments and a required [emitter] section.""" - cp = configparser.ConfigParser(inline_comment_prefixes=(';', '#')) + cp = configparser.ConfigParser(inline_comment_prefixes=(";", "#")) cp.optionxform = str # preserve key case with open(path, "r", encoding="utf-8") as f: txt = f.read() # Ensure we have a section header - if not re.search(r'^\s*\[', txt, flags=re.M): + if not re.search(r"^\s*\[", txt, flags=re.M): txt = "[emitter]\n" + txt 
cp.read_string(txt) sec = cp["emitter"] @@ -58,7 +57,7 @@ def load_config(path: str) -> Dict[str, Any]: v = sec.get(k, None) if v is None: return dflt - return v.strip().lower() in ("1","true","on","yes","y") + return v.strip().lower() in ("1", "true", "on", "yes", "y") return { "WATCH_PATH": get_str("SERIES3_PATH", r"C:\Blastware 10\Event\autocall home"), @@ -74,12 +73,21 @@ def load_config(path: str) -> Dict[str, Any]: "COLORIZE": get_bool("COLORIZE", False), # Win7 default off "MLG_HEADER_BYTES": max(256, min(get_int("MLG_HEADER_BYTES", 2048), 65536)), "RECENT_WARN_DAYS": get_int("RECENT_WARN_DAYS", 30), + + # API heartbeat / SFM telemetry + "API_ENABLED": get_bool("API_ENABLED", False), + "API_URL": get_str("API_URL", ""), + "API_INTERVAL_SECONDS": get_int("API_INTERVAL_SECONDS", 300), + "SOURCE_ID": get_str("SOURCE_ID", gethostname()), + "SOURCE_TYPE": get_str("SOURCE_TYPE", "series3_emitter"), } + # --------------- ANSI helpers --------------- def ansi(enabled: bool, code: str) -> str: return code if enabled else "" + # --------------- Logging -------------------- def log_message(path: str, enabled: bool, msg: str) -> None: if not enabled: @@ -93,6 +101,7 @@ def log_message(path: str, enabled: bool, msg: str) -> None: except Exception: pass + def clear_logs_if_needed(log_file: str, enabled: bool, retention_days: int) -> None: if not enabled or retention_days <= 0: return @@ -116,15 +125,18 @@ def clear_logs_if_needed(log_file: str, enabled: bool, retention_days: int) -> N except Exception: pass + # --------------- Roster --------------------- def normalize_id(uid: str) -> str: if uid is None: return "" return uid.replace(" ", "").strip().upper() + def load_roster(path: str) -> Tuple[Set[str], Set[str], Set[str], Dict[str, str]]: - """CSV tolerant of commas in notes: device_id, active, notes... - Returns: active, bench, ignored, notes_by_unit + """ + CSV tolerant of commas in notes: device_id, active, notes... 
+ Returns: active, bench, ignored, notes_by_unit """ active: Set[str] = set() bench: Set[str] = set() @@ -133,15 +145,18 @@ def load_roster(path: str) -> Tuple[Set[str], Set[str], Set[str], Dict[str, str] if not os.path.exists(path): print("[WARN] Roster not found:", path) - return active, notes_by_unit + return active, bench, ignored, notes_by_unit + try: with open(path, "r", encoding="utf-8-sig", newline="") as f: rdr = csv.reader(f) try: headers = next(rdr) except StopIteration: - return active, notes_by_unit + return active, bench, ignored, notes_by_unit + headers = [(h or "").strip().lower() for h in headers] + def idx_of(name: str, fallbacks: List[str]) -> Optional[int]: if name in headers: return headers.index(name) @@ -149,12 +164,15 @@ def load_roster(path: str) -> Tuple[Set[str], Set[str], Set[str], Dict[str, str] if fb in headers: return headers.index(fb) return None - i_id = idx_of("device_id", ["unitid","id"]) + + i_id = idx_of("device_id", ["unitid", "id"]) i_ac = idx_of("active", []) - i_no = idx_of("notes", ["note","location"]) + i_no = idx_of("notes", ["note", "location"]) + if i_id is None or i_ac is None: print("[WARN] Roster missing device_id/active columns") - return active, notes_by_unit + return active, bench, ignored, notes_by_unit + for row in rdr: if len(row) <= max(i_id, i_ac): continue @@ -163,25 +181,28 @@ def load_roster(path: str) -> Tuple[Set[str], Set[str], Set[str], Dict[str, str] if i_no is not None: extra = row[i_no:] note = ",".join([c or "" for c in extra]).strip().rstrip(",") - notes_by_unit[uid] = note if not uid: continue - is_active = (row[i_ac] or "").strip().lower() in ("yes","y","true","1","on") + notes_by_unit[uid] = note + flag = (row[i_ac] or "").strip().lower() - if flag in ("yes","y","true","1","on"): + if flag in ("yes", "y", "true", "1", "on"): active.add(uid) - elif flag in ("no","n","off","0"): + elif flag in ("no", "n", "off", "0"): bench.add(uid) - elif flag in ("ignore","retired","old"): + elif flag in 
("ignore", "retired", "old"): ignored.add(uid) except Exception as e: print("[WARN] Roster read error:", e) + return active, bench, ignored, notes_by_unit + # --------------- .MLG sniff ------------------ UNIT_BYTES_RE = re.compile(rb"(?:^|[^A-Z])(BE|BA)\d{4,5}(?:[^0-9]|$)") + def sniff_unit_from_mlg(path: str, header_bytes: int) -> Optional[str]: """Return BE####/BA#### from header bytes, or None.""" try: @@ -199,21 +220,30 @@ def sniff_unit_from_mlg(path: str, header_bytes: int) -> Optional[str]: except Exception: return None + # --------------- Scan helpers --------------- def fmt_last(ts: float) -> str: return datetime.fromtimestamp(ts, tz=timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S") + def fmt_age(now_epoch: float, mtime: float) -> str: mins = int((now_epoch - mtime) // 60) - if mins < 0: mins = 0 - return "{}h {}m".format(mins//60, mins%60) + if mins < 0: + mins = 0 + return "{}h {}m".format(mins // 60, mins % 60) -def scan_latest(watch: str, header_bytes: int, - cache: Dict[str, Tuple[float, str]], - recent_cutoff: float = None, - logger=None): - """Return newest .MLG per unit: {uid: {'mtime': float, 'fname': str}}""" +def scan_latest( + watch: str, + header_bytes: int, + cache: Dict[str, Tuple[float, str]], + recent_cutoff: float = None, + logger=None, +) -> Dict[str, Dict[str, Any]]: + """ + Return newest .MLG per unit: + {uid: {'mtime': float, 'fname': str, 'path': str}} + """ latest: Dict[str, Dict[str, Any]] = {} if not os.path.exists(watch): print("[WARN] Watch path not found:", watch) @@ -228,26 +258,28 @@ def scan_latest(watch: str, header_bytes: int, mtime = e.stat().st_mtime except Exception: continue + cached = cache.get(fpath) if cached is not None and cached[0] == mtime: uid = cached[1] else: - uid = sniff_unit_from_mlg(fpath, header_bytes) - if not uid: + uid = sniff_unit_from_mlg(fpath, header_bytes) + if not uid: if (recent_cutoff is not None) and (mtime >= recent_cutoff): if logger: logger(f"[unsniffable-recent] {fpath}") - 
continue # skip file if no unit ID found in header - cache[fpath] = (mtime, uid) + continue # skip file if no unit ID found in header + cache[fpath] = (mtime, uid) + if (uid not in latest) or (mtime > latest[uid]["mtime"]): - latest[uid] = {"mtime": mtime, "fname": e.name} + latest[uid] = {"mtime": mtime, "fname": e.name, "path": fpath} except Exception as ex: print("[WARN] Scan error:", ex) return latest + # --- Roster fetch (Dropbox/HTTPS) helper --- -def refresh_roster_from_url(url: str, dest: str, min_seconds: int, - state: dict, logger=None): +def refresh_roster_from_url(url: str, dest: str, min_seconds: int, state: dict, logger=None) -> None: now = time.time() # throttle fetches; only pull if enough time elapsed @@ -262,9 +294,10 @@ def refresh_roster_from_url(url: str, dest: str, min_seconds: int, f.write(data) state["t"] = now if logger: - from datetime import datetime - logger(f"[roster] refreshed from {url} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} " - f"-> {dest} ({len(data)} bytes)") + logger( + f"[roster] refreshed from {url} at " + f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} -> {dest} ({len(data)} bytes)" + ) except Exception as e: if logger: logger(f"[roster-fetch-error] {e}") @@ -274,19 +307,69 @@ def refresh_roster_from_url(url: str, dest: str, min_seconds: int, def cfg_get(cfg: dict, key: str, default=None): return cfg.get(key, cfg.get(key.lower(), cfg.get(key.upper(), default))) -#---Report to server --- -def report_to_server(server_url: str, uid: str, info: dict, status: str): - payload = { - "unit": uid, - "unit_type": "series3", - "timestamp": fmt_last(info["mtime"]), - "file": info["fname"], - "status": status - } + +# --- API heartbeat / SFM telemetry helpers --- +def send_api_payload(payload: dict, api_url: str) -> None: + if not api_url: + return + data = json.dumps(payload).encode("utf-8") + req = urllib.request.Request(api_url, data=data, headers={"Content-Type": "application/json"}) try: - requests.post(server_url, 
json=payload, timeout=5) - except Exception as e: - print(f"[WARN] report_to_server failed for {uid}: {e}") + with urllib.request.urlopen(req, timeout=5) as res: + print(f"[API] POST success: {res.status}") + except urllib.error.URLError as e: + print(f"[API] POST failed: {e}") + + +def build_sfm_payload(units_dict: Dict[str, Dict[str, Any]], cfg: Dict[str, Any]) -> dict: + """ + Build SFM Telemetry JSON v1 payload from latest-unit dict. + Schema is source-agnostic and future-proof. + """ + now_iso = datetime.now(timezone.utc).isoformat() + now_ts = time.time() + + payload = { + "source_id": cfg.get("SOURCE_ID", gethostname()), + "source_type": cfg.get("SOURCE_TYPE", "series3_emitter"), + "timestamp": now_iso, + "units": [], + } + + for unit_id, info in units_dict.items(): + mtime = info.get("mtime") + if mtime is not None: + last_event_iso = datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat() + age_minutes = int(max(0, (now_ts - mtime) // 60)) + else: + last_event_iso = None + age_minutes = None + + file_path = info.get("path") + file_size = None + if file_path: + try: + file_size = os.path.getsize(file_path) + except Exception: + file_size = None + + payload["units"].append( + { + "unit_id": unit_id, + "last_event_time": last_event_iso, + "age_minutes": age_minutes, + "observation_method": "mlg_scan", + "event_metadata": { + "file_name": info.get("fname"), + "file_path": file_path, + "file_size_bytes": file_size, + "event_number": None, + "event_type": None, + }, + } + ) + + return payload # --------------- Main loop ------------------ @@ -305,25 +388,26 @@ def main() -> None: COLORIZE = bool(cfg["COLORIZE"]) MLG_HEADER_BYTES = int(cfg["MLG_HEADER_BYTES"]) - C_OK = ansi(COLORIZE, "\033[92m") + C_OK = ansi(COLORIZE, "\033[92m") C_PEN = ansi(COLORIZE, "\033[93m") C_MIS = ansi(COLORIZE, "\033[91m") C_UNX = ansi(COLORIZE, "\033[95m") C_RST = ansi(COLORIZE, "\033[0m") - - # --- Dropbox roster refresh (pull CSV to local cache) --- - roster_state = {} - url = 
str(cfg_get(cfg, "ROSTER_URL", "") or "") - # --- Dropbox roster refresh (pull CSV to local cache) --- - roster_state = {} + + # --- Dropbox roster refresh (pull CSV to local cache) --- + roster_state: Dict[str, Any] = {} url = str(cfg_get(cfg, "ROSTER_URL", "") or "") - # 🔎 Patch 3: startup config echo (helps debugging) - print(f"[CFG] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} ROSTER_URL={'set' if url else 'not set'}") - # (optional, also write it to the log file) - log_message(LOG_FILE, ENABLE_LOGGING, - f"[cfg] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} ROSTER_URL={'set' if url else 'not set'}") - + print( + f"[CFG] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} " + f"ROSTER_URL={'set' if url else 'not set'}" + ) + log_message( + LOG_FILE, + ENABLE_LOGGING, + f"[cfg] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} ROSTER_URL={'set' if url else 'not set'}", + ) + if url.lower().startswith("http"): refresh_roster_from_url( url, @@ -336,28 +420,25 @@ def main() -> None: # cache for scanning sniff_cache: Dict[str, Tuple[float, str]] = {} - # Always load the (possibly refreshed) local roster + # Always load the (possibly refreshed) local roster try: active, bench, ignored, notes_by_unit = load_roster(ROSTER_FILE) except Exception as ex: log_message(LOG_FILE, ENABLE_LOGGING, f"[WARN] roster load failed: {ex}") - active = set() - bench = set() - ignored = set() - notes_by_unit = {} + active, bench, ignored, notes_by_unit = set(), set(), set(), {} - # track roster file modification time + # track roster file modification time try: roster_mtime = os.path.getmtime(ROSTER_FILE) except Exception: roster_mtime = None - + last_api_ts: float = 0.0 while True: try: now_local = datetime.now().isoformat() - now_utc = datetime.now(timezone.utc).isoformat() + now_utc = datetime.now(timezone.utc).isoformat() print("-" * 110) print("Heartbeat @ {} (Local) | {} (UTC)".format(now_local, now_utc)) print("-" * 110) @@ -379,29 +460,45 @@ def main() -> None: m = None if 
m is not None and m != roster_mtime: - roster_mtime = m - try: - new_active, new_bench, new_ignored, new_notes_by_unit = load_roster(ROSTER_FILE) - if new_active or new_bench or new_ignored: - active, bench, ignored, notes_by_unit = new_active, new_bench, new_ignored, new_notes_by_unit - print(f"[ROSTER] Reloaded: {len(active)} active unit(s) from {ROSTER_FILE}") - log_message(LOG_FILE, ENABLE_LOGGING, - f"[roster] reloaded {len(active)} active units") - else: - print("[ROSTER] Reload skipped — no valid active units in new file") - log_message(LOG_FILE, ENABLE_LOGGING, - "[roster] reload skipped — roster parse failed or empty") - except Exception as ex: - print(f"[ROSTER] Reload failed, keeping previous roster: {ex}") - log_message(LOG_FILE, ENABLE_LOGGING, - f"[roster] reload failed, keeping previous roster: {ex}") + roster_mtime = m + try: + new_active, new_bench, new_ignored, new_notes_by_unit = load_roster(ROSTER_FILE) + if new_active or new_bench or new_ignored: + active, bench, ignored, notes_by_unit = ( + new_active, + new_bench, + new_ignored, + new_notes_by_unit, + ) + print(f"[ROSTER] Reloaded: {len(active)} active unit(s) from {ROSTER_FILE}") + log_message( + LOG_FILE, + ENABLE_LOGGING, + f"[roster] reloaded {len(active)} active units", + ) + else: + print("[ROSTER] Reload skipped — no valid active units in new file") + log_message( + LOG_FILE, + ENABLE_LOGGING, + "[roster] reload skipped — roster parse failed or empty", + ) + except Exception as ex: + print(f"[ROSTER] Reload failed, keeping previous roster: {ex}") + log_message( + LOG_FILE, + ENABLE_LOGGING, + f"[roster] reload failed, keeping previous roster: {ex}", + ) clear_logs_if_needed(LOG_FILE, ENABLE_LOGGING, LOG_RETENTION_DAYS) recent_cutoff = time.time() - (float(cfg.get("RECENT_WARN_DAYS", 30)) * 86400) - logger = lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m) - latest = scan_latest(WATCH_PATH, MLG_HEADER_BYTES, sniff_cache, recent_cutoff, logger) + logger_fn = lambda m: 
log_message(LOG_FILE, ENABLE_LOGGING, m) + + latest = scan_latest(WATCH_PATH, MLG_HEADER_BYTES, sniff_cache, recent_cutoff, logger_fn) now_epoch = time.time() + # Active units for uid in sorted(active): info = latest.get(uid) if info is not None: @@ -412,20 +509,30 @@ def main() -> None: status, col = "Pending", C_PEN else: status, col = "OK", C_OK + note = notes_by_unit.get(uid, "") note_suffix = f" [{note}]" if note else "" - line = ("{col}{uid:<8} {status:<8} Age: {age:<7} Last: {last} (File: {fname}){note}{rst}" - .format(col=col, uid=uid, status=status, - age=fmt_age(now_epoch, info["mtime"]), - last=fmt_last(info["mtime"]), fname=info["fname"], note=note_suffix, rst=C_RST)) + line = ( + "{col}{uid:<8} {status:<8} Age: {age:<7} Last: {last} (File: {fname}){note}{rst}".format( + col=col, + uid=uid, + status=status, + age=fmt_age(now_epoch, info["mtime"]), + last=fmt_last(info["mtime"]), + fname=info["fname"], + note=note_suffix, + rst=C_RST, + ) + ) else: note = notes_by_unit.get(uid, "") note_suffix = f" [{note}]" if note else "" - line = "{col}{uid:<8} Missing Age: N/A Last: ---{note}{rst}".format(col=C_MIS, uid=uid, note=note_suffix, rst=C_RST) + line = "{col}{uid:<8} Missing Age: N/A Last: ---{note}{rst}".format( + col=C_MIS, uid=uid, note=note_suffix, rst=C_RST + ) + print(line) log_message(LOG_FILE, ENABLE_LOGGING, line) - if info is not None: - report_to_server(cfg["API_URL"], uid, info, status) # Bench Units (rostered but not active in field) print("\nBench Units (rostered, not active):") @@ -434,36 +541,46 @@ def main() -> None: note = notes_by_unit.get(uid, "") note_suffix = f" [{note}]" if note else "" if info: - line = (f"{uid:<8} Bench Last: {fmt_last(info['mtime'])} (File: {info['fname']}){note_suffix}") + line = ( + f"{uid:<8} Bench Last: {fmt_last(info['mtime'])} " + f"(File: {info['fname']}){note_suffix}" + ) else: - line = (f"{uid:<8} Bench Last: ---{note_suffix}") + line = f"{uid:<8} Bench Last: ---{note_suffix}" print(line) 
log_message(LOG_FILE, ENABLE_LOGGING, "[bench] " + line) - # Ignored Units (retired, broken, or do-not-care) -# ignored_detected = [u for u in latest.keys() if u in ignored] -# if ignored_detected: -# print("\nIgnored Units:") -# for uid in sorted(ignored_detected): -# info = latest[uid] -# note = notes_by_unit.get(uid, "") -# note_suffix = f" [{note}]" if note else "" -# line = (f"{uid:<8} Ignored Last: {fmt_last(info['mtime'])} (File: {info['fname']}){note_suffix}") -# print(line) -# log_message(LOG_FILE, ENABLE_LOGGING, "[ignored] " + line) + # Unexpected units unexpected = [ - u for u in latest.keys() + u + for u in latest.keys() if u not in active and u not in bench and u not in ignored and u not in notes_by_unit ] if unexpected: print("\nUnexpected Units Detected:") for uid in sorted(unexpected): info = latest[uid] - line = ("{col}{uid:<8} Age: - Last: {last} (File: {fname}){rst}" - .format(col=C_UNX, uid=uid, last=fmt_last(info["mtime"]), fname=info["fname"], rst=C_RST)) + line = ( + "{col}{uid:<8} Age: - Last: {last} (File: {fname}){rst}".format( + col=C_UNX, + uid=uid, + last=fmt_last(info["mtime"]), + fname=info["fname"], + rst=C_RST, + ) + ) print(line) log_message(LOG_FILE, ENABLE_LOGGING, "[unexpected] " + line) + # ---- API heartbeat to SFM ---- + if cfg.get("API_ENABLED", False): + now_ts = time.time() + interval = int(cfg.get("API_INTERVAL_SECONDS", 300)) + if now_ts - last_api_ts >= interval: + payload = build_sfm_payload(latest, cfg) + send_api_payload(payload, cfg.get("API_URL", "")) + last_api_ts = now_ts + except KeyboardInterrupt: print("\nStopping...") break @@ -471,7 +588,9 @@ def main() -> None: err = "[loop-error] {}".format(e) print(err) log_message(LOG_FILE, ENABLE_LOGGING, err) + time.sleep(SCAN_INTERVAL) - + + if __name__ == "__main__": main()