Merge pull request #4 from serversdwn/dev

Roster deprecated, v1.2
This commit is contained in:
serversdwn
2025-12-04 17:16:23 -05:00
committed by GitHub

View File

@@ -1,36 +1,34 @@
""" """
Series 3 Emitter — v1.1.0 Series 3 Emitter — v1.2.0
Environment: Environment:
- Python 3.8 (Windows 7 compatible) - Python 3.8 (Windows 7 compatible)
- Runs on DL2 with Blastware 10 event path - Runs on DL2 with Blastware 10 event path
Key Features: Key Features:
- Atomic roster downloads from Dropbox (no partial files)
- Automatic roster refresh from Dropbox at configurable interval
- Automatic hot-reload into memory when roster CSV changes
- Failsafe reload: keeps previous roster if new file is invalid or empty
- Config-driven paths, intervals, and logging - Config-driven paths, intervals, and logging
- Compact console heartbeat with status per unit - Compact console heartbeat with status per unit
- Logging with retention auto-clean (days configurable) - Logging with retention auto-clean (days configurable)
- Safe .MLG header sniff for unit IDs (BE#### / BA####) - Safe .MLG header sniff for unit IDs (BE#### / BA####)
- NEW in v1.1.0: - Standardized SFM Telemetry JSON payload (source-agnostic)
- Standardized SFM Telemetry JSON payload (source-agnostic) - Periodic HTTP heartbeat POST to SFM backend
- Periodic HTTP heartbeat POST to SFM backend - NEW in v1.2.0:
- No local roster / CSV dependency
- Only scans .MLG files newer than MAX_EVENT_AGE_DAYS
""" """
import os import os
import re import re
import csv
import time import time
import json import json
import configparser import configparser
import urllib.request import urllib.request
import urllib.error import urllib.error
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from typing import Dict, Any, Optional, Tuple, Set, List from typing import Dict, Any, Optional, Tuple
from socket import gethostname from socket import gethostname
# ---------------- Config ---------------- # ---------------- Config ----------------
def load_config(path: str) -> Dict[str, Any]: def load_config(path: str) -> Dict[str, Any]:
"""Load INI with tolerant inline comments and a required [emitter] section.""" """Load INI with tolerant inline comments and a required [emitter] section."""
@@ -61,9 +59,6 @@ def load_config(path: str) -> Dict[str, Any]:
return { return {
"WATCH_PATH": get_str("SERIES3_PATH", r"C:\Blastware 10\Event\autocall home"), "WATCH_PATH": get_str("SERIES3_PATH", r"C:\Blastware 10\Event\autocall home"),
"ROSTER_FILE": get_str("ROSTER_FILE", r"C:\SeismoEmitter\series3_roster.csv"),
"ROSTER_URL": get_str("ROSTER_URL", ""),
"ROSTER_REFRESH_MIN_SECONDS": get_int("ROSTER_REFRESH_MIN_SECONDS", 300),
"SCAN_INTERVAL": get_int("SCAN_INTERVAL_SECONDS", 300), "SCAN_INTERVAL": get_int("SCAN_INTERVAL_SECONDS", 300),
"OK_HOURS": float(get_int("OK_HOURS", 12)), "OK_HOURS": float(get_int("OK_HOURS", 12)),
"MISSING_HOURS": float(get_int("MISSING_HOURS", 24)), "MISSING_HOURS": float(get_int("MISSING_HOURS", 24)),
@@ -73,6 +68,7 @@ def load_config(path: str) -> Dict[str, Any]:
"COLORIZE": get_bool("COLORIZE", False), # Win7 default off "COLORIZE": get_bool("COLORIZE", False), # Win7 default off
"MLG_HEADER_BYTES": max(256, min(get_int("MLG_HEADER_BYTES", 2048), 65536)), "MLG_HEADER_BYTES": max(256, min(get_int("MLG_HEADER_BYTES", 2048), 65536)),
"RECENT_WARN_DAYS": get_int("RECENT_WARN_DAYS", 30), "RECENT_WARN_DAYS": get_int("RECENT_WARN_DAYS", 30),
"MAX_EVENT_AGE_DAYS": get_int("MAX_EVENT_AGE_DAYS", 365),
# API heartbeat / SFM telemetry # API heartbeat / SFM telemetry
"API_ENABLED": get_bool("API_ENABLED", False), "API_ENABLED": get_bool("API_ENABLED", False),
@@ -99,6 +95,7 @@ def log_message(path: str, enabled: bool, msg: str) -> None:
with open(path, "a", encoding="utf-8") as f: with open(path, "a", encoding="utf-8") as f:
f.write("{} {}\n".format(datetime.now(timezone.utc).isoformat(), msg)) f.write("{} {}\n".format(datetime.now(timezone.utc).isoformat(), msg))
except Exception: except Exception:
# Logging must never crash the emitter
pass pass
@@ -126,79 +123,6 @@ def clear_logs_if_needed(log_file: str, enabled: bool, retention_days: int) -> N
pass pass
# --------------- Roster ---------------------
def normalize_id(uid: str) -> str:
if uid is None:
return ""
return uid.replace(" ", "").strip().upper()
def load_roster(path: str) -> Tuple[Set[str], Set[str], Set[str], Dict[str, str]]:
"""
CSV tolerant of commas in notes: device_id, active, notes...
Returns: active, bench, ignored, notes_by_unit
"""
active: Set[str] = set()
bench: Set[str] = set()
ignored: Set[str] = set()
notes_by_unit: Dict[str, str] = {}
if not os.path.exists(path):
print("[WARN] Roster not found:", path)
return active, bench, ignored, notes_by_unit
try:
with open(path, "r", encoding="utf-8-sig", newline="") as f:
rdr = csv.reader(f)
try:
headers = next(rdr)
except StopIteration:
return active, bench, ignored, notes_by_unit
headers = [(h or "").strip().lower() for h in headers]
def idx_of(name: str, fallbacks: List[str]) -> Optional[int]:
if name in headers:
return headers.index(name)
for fb in fallbacks:
if fb in headers:
return headers.index(fb)
return None
i_id = idx_of("device_id", ["unitid", "id"])
i_ac = idx_of("active", [])
i_no = idx_of("notes", ["note", "location"])
if i_id is None or i_ac is None:
print("[WARN] Roster missing device_id/active columns")
return active, bench, ignored, notes_by_unit
for row in rdr:
if len(row) <= max(i_id, i_ac):
continue
uid = normalize_id(row[i_id])
note = ""
if i_no is not None:
extra = row[i_no:]
note = ",".join([c or "" for c in extra]).strip().rstrip(",")
if not uid:
continue
notes_by_unit[uid] = note
flag = (row[i_ac] or "").strip().lower()
if flag in ("yes", "y", "true", "1", "on"):
active.add(uid)
elif flag in ("no", "n", "off", "0"):
bench.add(uid)
elif flag in ("ignore", "retired", "old"):
ignored.add(uid)
except Exception as e:
print("[WARN] Roster read error:", e)
return active, bench, ignored, notes_by_unit
# --------------- .MLG sniff ------------------ # --------------- .MLG sniff ------------------
UNIT_BYTES_RE = re.compile(rb"(?:^|[^A-Z])(BE|BA)\d{4,5}(?:[^0-9]|$)") UNIT_BYTES_RE = re.compile(rb"(?:^|[^A-Z])(BE|BA)\d{4,5}(?:[^0-9]|$)")
@@ -237,17 +161,23 @@ def scan_latest(
watch: str, watch: str,
header_bytes: int, header_bytes: int,
cache: Dict[str, Tuple[float, str]], cache: Dict[str, Tuple[float, str]],
recent_cutoff: float = None, recent_cutoff: float,
max_age_days: int,
logger=None, logger=None,
) -> Dict[str, Dict[str, Any]]: ) -> Dict[str, Dict[str, Any]]:
""" """
Return newest .MLG per unit: Return newest .MLG per unit, only for files newer than max_age_days:
{uid: {'mtime': float, 'fname': str, 'path': str}} {uid: {'mtime': float, 'fname': str, 'path': str}}
""" """
latest: Dict[str, Dict[str, Any]] = {} latest: Dict[str, Dict[str, Any]] = {}
if not os.path.exists(watch): if not os.path.exists(watch):
print("[WARN] Watch path not found:", watch) print("[WARN] Watch path not found:", watch)
return latest return latest
now_ts = time.time()
max_age_days = max(1, int(max_age_days)) # sanity floor
max_age_seconds = max_age_days * 86400.0
try: try:
with os.scandir(watch) as it: with os.scandir(watch) as it:
for e in it: for e in it:
@@ -259,12 +189,20 @@ def scan_latest(
except Exception: except Exception:
continue continue
# Skip very old events (beyond retention window)
age_seconds = now_ts - mtime
if age_seconds < 0:
age_seconds = 0
if age_seconds > max_age_seconds:
continue # too old, ignore this file
cached = cache.get(fpath) cached = cache.get(fpath)
if cached is not None and cached[0] == mtime: if cached is not None and cached[0] == mtime:
uid = cached[1] uid = cached[1]
else: else:
uid = sniff_unit_from_mlg(fpath, header_bytes) uid = sniff_unit_from_mlg(fpath, header_bytes)
if not uid: if not uid:
# If unsniffable but very recent, log for later inspection
if (recent_cutoff is not None) and (mtime >= recent_cutoff): if (recent_cutoff is not None) and (mtime >= recent_cutoff):
if logger: if logger:
logger(f"[unsniffable-recent] {fpath}") logger(f"[unsniffable-recent] {fpath}")
@@ -278,36 +216,6 @@ def scan_latest(
return latest return latest
# --- Roster fetch (Dropbox/HTTPS) helper ---
def refresh_roster_from_url(url: str, dest: str, min_seconds: int, state: dict, logger=None) -> None:
now = time.time()
# throttle fetches; only pull if enough time elapsed
if now - state.get("t", 0) < max(0, int(min_seconds or 0)):
return
try:
with urllib.request.urlopen(url, timeout=15) as r:
data = r.read()
if data and data.strip():
with open(dest, "wb") as f:
f.write(data)
state["t"] = now
if logger:
logger(
f"[roster] refreshed from {url} at "
f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} -> {dest} ({len(data)} bytes)"
)
except Exception as e:
if logger:
logger(f"[roster-fetch-error] {e}")
# --- config helper: case-insensitive key lookup ---
def cfg_get(cfg: dict, key: str, default=None):
return cfg.get(key, cfg.get(key.lower(), cfg.get(key.upper(), default)))
# --- API heartbeat / SFM telemetry helpers --- # --- API heartbeat / SFM telemetry helpers ---
def send_api_payload(payload: dict, api_url: str) -> None: def send_api_payload(payload: dict, api_url: str) -> None:
if not api_url: if not api_url:
@@ -378,7 +286,6 @@ def main() -> None:
cfg = load_config(os.path.join(here, "config.ini")) cfg = load_config(os.path.join(here, "config.ini"))
WATCH_PATH = cfg["WATCH_PATH"] WATCH_PATH = cfg["WATCH_PATH"]
ROSTER_FILE = cfg["ROSTER_FILE"]
SCAN_INTERVAL = int(cfg["SCAN_INTERVAL"]) SCAN_INTERVAL = int(cfg["SCAN_INTERVAL"])
OK_HOURS = float(cfg["OK_HOURS"]) OK_HOURS = float(cfg["OK_HOURS"])
MISSING_HOURS = float(cfg["MISSING_HOURS"]) MISSING_HOURS = float(cfg["MISSING_HOURS"])
@@ -387,52 +294,30 @@ def main() -> None:
LOG_RETENTION_DAYS = int(cfg["LOG_RETENTION_DAYS"]) LOG_RETENTION_DAYS = int(cfg["LOG_RETENTION_DAYS"])
COLORIZE = bool(cfg["COLORIZE"]) COLORIZE = bool(cfg["COLORIZE"])
MLG_HEADER_BYTES = int(cfg["MLG_HEADER_BYTES"]) MLG_HEADER_BYTES = int(cfg["MLG_HEADER_BYTES"])
RECENT_WARN_DAYS = int(cfg["RECENT_WARN_DAYS"])
MAX_EVENT_AGE_DAYS = int(cfg["MAX_EVENT_AGE_DAYS"])
C_OK = ansi(COLORIZE, "\033[92m") C_OK = ansi(COLORIZE, "\033[92m")
C_PEN = ansi(COLORIZE, "\033[93m") C_PEN = ansi(COLORIZE, "\033[93m")
C_MIS = ansi(COLORIZE, "\033[91m") C_MIS = ansi(COLORIZE, "\033[91m")
C_UNX = ansi(COLORIZE, "\033[95m")
C_RST = ansi(COLORIZE, "\033[0m") C_RST = ansi(COLORIZE, "\033[0m")
# --- Dropbox roster refresh (pull CSV to local cache) ---
roster_state: Dict[str, Any] = {}
url = str(cfg_get(cfg, "ROSTER_URL", "") or "")
print( print(
f"[CFG] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} " "[CFG] WATCH_PATH={} SCAN_INTERVAL={}s MAX_EVENT_AGE_DAYS={} API_ENABLED={}".format(
f"ROSTER_URL={'set' if url else 'not set'}" WATCH_PATH, SCAN_INTERVAL, MAX_EVENT_AGE_DAYS, bool(cfg.get("API_ENABLED", False))
)
) )
log_message( log_message(
LOG_FILE, LOG_FILE,
ENABLE_LOGGING, ENABLE_LOGGING,
f"[cfg] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} ROSTER_URL={'set' if url else 'not set'}", "[cfg] WATCH_PATH={} SCAN_INTERVAL={} MAX_EVENT_AGE_DAYS={} API_ENABLED={}".format(
) WATCH_PATH, SCAN_INTERVAL, MAX_EVENT_AGE_DAYS, bool(cfg.get("API_ENABLED", False))
),
if url.lower().startswith("http"):
refresh_roster_from_url(
url,
ROSTER_FILE,
int(cfg_get(cfg, "ROSTER_REFRESH_MIN_SECONDS", 300)),
roster_state,
lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m),
) )
# cache for scanning # cache for scanning
sniff_cache: Dict[str, Tuple[float, str]] = {} sniff_cache: Dict[str, Tuple[float, str]] = {}
# Always load the (possibly refreshed) local roster
try:
active, bench, ignored, notes_by_unit = load_roster(ROSTER_FILE)
except Exception as ex:
log_message(LOG_FILE, ENABLE_LOGGING, f"[WARN] roster load failed: {ex}")
active, bench, ignored, notes_by_unit = set(), set(), set(), {}
# track roster file modification time
try:
roster_mtime = os.path.getmtime(ROSTER_FILE)
except Exception:
roster_mtime = None
last_api_ts: float = 0.0 last_api_ts: float = 0.0
while True: while True:
@@ -443,65 +328,25 @@ def main() -> None:
print("Heartbeat @ {} (Local) | {} (UTC)".format(now_local, now_utc)) print("Heartbeat @ {} (Local) | {} (UTC)".format(now_local, now_utc))
print("-" * 110) print("-" * 110)
# Periodically refresh roster file from Dropbox
if url.lower().startswith("http"):
refresh_roster_from_url(
url,
ROSTER_FILE,
int(cfg_get(cfg, "ROSTER_REFRESH_MIN_SECONDS", 300)),
roster_state,
lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m),
)
# Reload roster into memory if the file changed
try:
m = os.path.getmtime(ROSTER_FILE)
except Exception:
m = None
if m is not None and m != roster_mtime:
roster_mtime = m
try:
new_active, new_bench, new_ignored, new_notes_by_unit = load_roster(ROSTER_FILE)
if new_active or new_bench or new_ignored:
active, bench, ignored, notes_by_unit = (
new_active,
new_bench,
new_ignored,
new_notes_by_unit,
)
print(f"[ROSTER] Reloaded: {len(active)} active unit(s) from {ROSTER_FILE}")
log_message(
LOG_FILE,
ENABLE_LOGGING,
f"[roster] reloaded {len(active)} active units",
)
else:
print("[ROSTER] Reload skipped — no valid active units in new file")
log_message(
LOG_FILE,
ENABLE_LOGGING,
"[roster] reload skipped — roster parse failed or empty",
)
except Exception as ex:
print(f"[ROSTER] Reload failed, keeping previous roster: {ex}")
log_message(
LOG_FILE,
ENABLE_LOGGING,
f"[roster] reload failed, keeping previous roster: {ex}",
)
clear_logs_if_needed(LOG_FILE, ENABLE_LOGGING, LOG_RETENTION_DAYS) clear_logs_if_needed(LOG_FILE, ENABLE_LOGGING, LOG_RETENTION_DAYS)
recent_cutoff = time.time() - (float(cfg.get("RECENT_WARN_DAYS", 30)) * 86400) recent_cutoff = time.time() - (float(RECENT_WARN_DAYS) * 86400)
logger_fn = lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m) logger_fn = lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m)
latest = scan_latest(WATCH_PATH, MLG_HEADER_BYTES, sniff_cache, recent_cutoff, logger_fn) latest = scan_latest(
WATCH_PATH,
MLG_HEADER_BYTES,
sniff_cache,
recent_cutoff,
MAX_EVENT_AGE_DAYS,
logger_fn,
)
now_epoch = time.time() now_epoch = time.time()
# Active units # Detected units summary (no roster dependency)
for uid in sorted(active): if latest:
info = latest.get(uid) print("\nDetected Units (within last {} days):".format(MAX_EVENT_AGE_DAYS))
if info is not None: for uid in sorted(latest.keys()):
info = latest[uid]
age_hours = (now_epoch - info["mtime"]) / 3600.0 age_hours = (now_epoch - info["mtime"]) / 3600.0
if age_hours > MISSING_HOURS: if age_hours > MISSING_HOURS:
status, col = "Missing", C_MIS status, col = "Missing", C_MIS
@@ -510,67 +355,26 @@ def main() -> None:
else: else:
status, col = "OK", C_OK status, col = "OK", C_OK
note = notes_by_unit.get(uid, "")
note_suffix = f" [{note}]" if note else ""
line = ( line = (
"{col}{uid:<8} {status:<8} Age: {age:<7} Last: {last} (File: {fname}){note}{rst}".format( "{col}{uid:<8} {status:<8} Age: {age:<7} Last: {last} (File: {fname}){rst}".format(
col=col, col=col,
uid=uid, uid=uid,
status=status, status=status,
age=fmt_age(now_epoch, info["mtime"]), age=fmt_age(now_epoch, info["mtime"]),
last=fmt_last(info["mtime"]), last=fmt_last(info["mtime"]),
fname=info["fname"], fname=info["fname"],
note=note_suffix,
rst=C_RST, rst=C_RST,
) )
) )
else:
note = notes_by_unit.get(uid, "")
note_suffix = f" [{note}]" if note else ""
line = "{col}{uid:<8} Missing Age: N/A Last: ---{note}{rst}".format(
col=C_MIS, uid=uid, note=note_suffix, rst=C_RST
)
print(line) print(line)
log_message(LOG_FILE, ENABLE_LOGGING, line) log_message(LOG_FILE, ENABLE_LOGGING, line)
# Bench Units (rostered but not active in field)
print("\nBench Units (rostered, not active):")
for uid in sorted(bench):
info = latest.get(uid)
note = notes_by_unit.get(uid, "")
note_suffix = f" [{note}]" if note else ""
if info:
line = (
f"{uid:<8} Bench Last: {fmt_last(info['mtime'])} "
f"(File: {info['fname']}){note_suffix}"
)
else: else:
line = f"{uid:<8} Bench Last: ---{note_suffix}" print("\nNo recent .MLG activity found within last {} days.".format(MAX_EVENT_AGE_DAYS))
print(line) log_message(
log_message(LOG_FILE, ENABLE_LOGGING, "[bench] " + line) LOG_FILE,
ENABLE_LOGGING,
# Unexpected units "[info] no recent MLG activity within {} days".format(MAX_EVENT_AGE_DAYS),
unexpected = [
u
for u in latest.keys()
if u not in active and u not in bench and u not in ignored and u not in notes_by_unit
]
if unexpected:
print("\nUnexpected Units Detected:")
for uid in sorted(unexpected):
info = latest[uid]
line = (
"{col}{uid:<8} Age: - Last: {last} (File: {fname}){rst}".format(
col=C_UNX,
uid=uid,
last=fmt_last(info["mtime"]),
fname=info["fname"],
rst=C_RST,
) )
)
print(line)
log_message(LOG_FILE, ENABLE_LOGGING, "[unexpected] " + line)
# ---- API heartbeat to SFM ---- # ---- API heartbeat to SFM ----
if cfg.get("API_ENABLED", False): if cfg.get("API_ENABLED", False):