Merge: dev to main, refactor rename #1

Merged
serversdown merged 16 commits from dev into main 2026-03-03 17:12:58 -05:00
2 changed files with 245 additions and 100 deletions
Showing only changes of commit 47718e7cad - Show all commits

View File

@@ -1,4 +1,12 @@
[emitter] [emitter]
# --- API Heartbeat Settings ---
API_ENABLED = true
API_URL = http://10.0.0.40:8001/api/series3/heartbeat
API_INTERVAL_SECONDS = 300
SOURCE_ID = dl2-series3
SOURCE_TYPE = series3_emitter
# Paths # Paths
SERIES3_PATH = C:\Blastware 10\Event\autocall home SERIES3_PATH = C:\Blastware 10\Event\autocall home
ROSTER_FILE = C:\SeismoEmitter\series3_roster.csv ROSTER_FILE = C:\SeismoEmitter\series3_roster.csv

View File

@@ -1,5 +1,5 @@
""" """
Series 3 Emitter — v1.0.0 (Stable Baseline, SemVer Reset) Series 3 Emitter — v1.1.0
Environment: Environment:
- Python 3.8 (Windows 7 compatible) - Python 3.8 (Windows 7 compatible)
@@ -14,32 +14,32 @@ Key Features:
- Compact console heartbeat with status per unit - Compact console heartbeat with status per unit
- Logging with retention auto-clean (days configurable) - Logging with retention auto-clean (days configurable)
- Safe .MLG header sniff for unit IDs (BE#### / BA####) - Safe .MLG header sniff for unit IDs (BE#### / BA####)
- NEW in v1.1.0:
Changelog: - Standardized SFM Telemetry JSON payload (source-agnostic)
- Reset to semantic versioning (from legacy v5.9 beta) - Periodic HTTP heartbeat POST to SFM backend
- Fixed stray `note=note_suffix` bug in Unexpected Units block
- Removed duplicate imports and redundant roster load at startup
- Added startup config echo (paths + URL status)
""" """
import os import os
import re import re
import csv import csv
import time import time
import json
import configparser import configparser
import urllib.request import urllib.request
import urllib.error
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from typing import Dict, Any, Optional, Tuple, Set, List from typing import Dict, Any, Optional, Tuple, Set, List
from socket import gethostname
# ---------------- Config ---------------- # ---------------- Config ----------------
def load_config(path: str) -> Dict[str, Any]: def load_config(path: str) -> Dict[str, Any]:
"""Load INI with tolerant inline comments and a required [emitter] section.""" """Load INI with tolerant inline comments and a required [emitter] section."""
cp = configparser.ConfigParser(inline_comment_prefixes=(';', '#')) cp = configparser.ConfigParser(inline_comment_prefixes=(";", "#"))
cp.optionxform = str # preserve key case cp.optionxform = str # preserve key case
with open(path, "r", encoding="utf-8") as f: with open(path, "r", encoding="utf-8") as f:
txt = f.read() txt = f.read()
# Ensure we have a section header # Ensure we have a section header
if not re.search(r'^\s*\[', txt, flags=re.M): if not re.search(r"^\s*\[", txt, flags=re.M):
txt = "[emitter]\n" + txt txt = "[emitter]\n" + txt
cp.read_string(txt) cp.read_string(txt)
sec = cp["emitter"] sec = cp["emitter"]
@@ -57,7 +57,7 @@ def load_config(path: str) -> Dict[str, Any]:
v = sec.get(k, None) v = sec.get(k, None)
if v is None: if v is None:
return dflt return dflt
return v.strip().lower() in ("1","true","on","yes","y") return v.strip().lower() in ("1", "true", "on", "yes", "y")
return { return {
"WATCH_PATH": get_str("SERIES3_PATH", r"C:\Blastware 10\Event\autocall home"), "WATCH_PATH": get_str("SERIES3_PATH", r"C:\Blastware 10\Event\autocall home"),
@@ -73,12 +73,21 @@ def load_config(path: str) -> Dict[str, Any]:
"COLORIZE": get_bool("COLORIZE", False), # Win7 default off "COLORIZE": get_bool("COLORIZE", False), # Win7 default off
"MLG_HEADER_BYTES": max(256, min(get_int("MLG_HEADER_BYTES", 2048), 65536)), "MLG_HEADER_BYTES": max(256, min(get_int("MLG_HEADER_BYTES", 2048), 65536)),
"RECENT_WARN_DAYS": get_int("RECENT_WARN_DAYS", 30), "RECENT_WARN_DAYS": get_int("RECENT_WARN_DAYS", 30),
# API heartbeat / SFM telemetry
"API_ENABLED": get_bool("API_ENABLED", False),
"API_URL": get_str("API_URL", ""),
"API_INTERVAL_SECONDS": get_int("API_INTERVAL_SECONDS", 300),
"SOURCE_ID": get_str("SOURCE_ID", gethostname()),
"SOURCE_TYPE": get_str("SOURCE_TYPE", "series3_emitter"),
} }
# --------------- ANSI helpers --------------- # --------------- ANSI helpers ---------------
def ansi(enabled: bool, code: str) -> str: def ansi(enabled: bool, code: str) -> str:
return code if enabled else "" return code if enabled else ""
# --------------- Logging -------------------- # --------------- Logging --------------------
def log_message(path: str, enabled: bool, msg: str) -> None: def log_message(path: str, enabled: bool, msg: str) -> None:
if not enabled: if not enabled:
@@ -92,6 +101,7 @@ def log_message(path: str, enabled: bool, msg: str) -> None:
except Exception: except Exception:
pass pass
def clear_logs_if_needed(log_file: str, enabled: bool, retention_days: int) -> None: def clear_logs_if_needed(log_file: str, enabled: bool, retention_days: int) -> None:
if not enabled or retention_days <= 0: if not enabled or retention_days <= 0:
return return
@@ -115,15 +125,18 @@ def clear_logs_if_needed(log_file: str, enabled: bool, retention_days: int) -> N
except Exception: except Exception:
pass pass
# --------------- Roster --------------------- # --------------- Roster ---------------------
def normalize_id(uid: str) -> str: def normalize_id(uid: str) -> str:
if uid is None: if uid is None:
return "" return ""
return uid.replace(" ", "").strip().upper() return uid.replace(" ", "").strip().upper()
def load_roster(path: str) -> Tuple[Set[str], Set[str], Set[str], Dict[str, str]]: def load_roster(path: str) -> Tuple[Set[str], Set[str], Set[str], Dict[str, str]]:
"""CSV tolerant of commas in notes: device_id, active, notes... """
Returns: active, bench, ignored, notes_by_unit CSV tolerant of commas in notes: device_id, active, notes...
Returns: active, bench, ignored, notes_by_unit
""" """
active: Set[str] = set() active: Set[str] = set()
bench: Set[str] = set() bench: Set[str] = set()
@@ -132,15 +145,18 @@ def load_roster(path: str) -> Tuple[Set[str], Set[str], Set[str], Dict[str, str]
if not os.path.exists(path): if not os.path.exists(path):
print("[WARN] Roster not found:", path) print("[WARN] Roster not found:", path)
return active, notes_by_unit return active, bench, ignored, notes_by_unit
try: try:
with open(path, "r", encoding="utf-8-sig", newline="") as f: with open(path, "r", encoding="utf-8-sig", newline="") as f:
rdr = csv.reader(f) rdr = csv.reader(f)
try: try:
headers = next(rdr) headers = next(rdr)
except StopIteration: except StopIteration:
return active, notes_by_unit return active, bench, ignored, notes_by_unit
headers = [(h or "").strip().lower() for h in headers] headers = [(h or "").strip().lower() for h in headers]
def idx_of(name: str, fallbacks: List[str]) -> Optional[int]: def idx_of(name: str, fallbacks: List[str]) -> Optional[int]:
if name in headers: if name in headers:
return headers.index(name) return headers.index(name)
@@ -148,12 +164,15 @@ def load_roster(path: str) -> Tuple[Set[str], Set[str], Set[str], Dict[str, str]
if fb in headers: if fb in headers:
return headers.index(fb) return headers.index(fb)
return None return None
i_id = idx_of("device_id", ["unitid","id"])
i_id = idx_of("device_id", ["unitid", "id"])
i_ac = idx_of("active", []) i_ac = idx_of("active", [])
i_no = idx_of("notes", ["note","location"]) i_no = idx_of("notes", ["note", "location"])
if i_id is None or i_ac is None: if i_id is None or i_ac is None:
print("[WARN] Roster missing device_id/active columns") print("[WARN] Roster missing device_id/active columns")
return active, notes_by_unit return active, bench, ignored, notes_by_unit
for row in rdr: for row in rdr:
if len(row) <= max(i_id, i_ac): if len(row) <= max(i_id, i_ac):
continue continue
@@ -162,25 +181,28 @@ def load_roster(path: str) -> Tuple[Set[str], Set[str], Set[str], Dict[str, str]
if i_no is not None: if i_no is not None:
extra = row[i_no:] extra = row[i_no:]
note = ",".join([c or "" for c in extra]).strip().rstrip(",") note = ",".join([c or "" for c in extra]).strip().rstrip(",")
notes_by_unit[uid] = note
if not uid: if not uid:
continue continue
is_active = (row[i_ac] or "").strip().lower() in ("yes","y","true","1","on") notes_by_unit[uid] = note
flag = (row[i_ac] or "").strip().lower() flag = (row[i_ac] or "").strip().lower()
if flag in ("yes","y","true","1","on"): if flag in ("yes", "y", "true", "1", "on"):
active.add(uid) active.add(uid)
elif flag in ("no","n","off","0"): elif flag in ("no", "n", "off", "0"):
bench.add(uid) bench.add(uid)
elif flag in ("ignore","retired","old"): elif flag in ("ignore", "retired", "old"):
ignored.add(uid) ignored.add(uid)
except Exception as e: except Exception as e:
print("[WARN] Roster read error:", e) print("[WARN] Roster read error:", e)
return active, bench, ignored, notes_by_unit return active, bench, ignored, notes_by_unit
# --------------- .MLG sniff ------------------ # --------------- .MLG sniff ------------------
UNIT_BYTES_RE = re.compile(rb"(?:^|[^A-Z])(BE|BA)\d{4,5}(?:[^0-9]|$)") UNIT_BYTES_RE = re.compile(rb"(?:^|[^A-Z])(BE|BA)\d{4,5}(?:[^0-9]|$)")
def sniff_unit_from_mlg(path: str, header_bytes: int) -> Optional[str]: def sniff_unit_from_mlg(path: str, header_bytes: int) -> Optional[str]:
"""Return BE####/BA#### from header bytes, or None.""" """Return BE####/BA#### from header bytes, or None."""
try: try:
@@ -198,21 +220,30 @@ def sniff_unit_from_mlg(path: str, header_bytes: int) -> Optional[str]:
except Exception: except Exception:
return None return None
# --------------- Scan helpers --------------- # --------------- Scan helpers ---------------
def fmt_last(ts: float) -> str: def fmt_last(ts: float) -> str:
return datetime.fromtimestamp(ts, tz=timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S") return datetime.fromtimestamp(ts, tz=timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S")
def fmt_age(now_epoch: float, mtime: float) -> str: def fmt_age(now_epoch: float, mtime: float) -> str:
mins = int((now_epoch - mtime) // 60) mins = int((now_epoch - mtime) // 60)
if mins < 0: mins = 0 if mins < 0:
return "{}h {}m".format(mins//60, mins%60) mins = 0
return "{}h {}m".format(mins // 60, mins % 60)
def scan_latest(watch: str, header_bytes: int,
cache: Dict[str, Tuple[float, str]],
recent_cutoff: float = None,
logger=None):
"""Return newest .MLG per unit: {uid: {'mtime': float, 'fname': str}}""" def scan_latest(
watch: str,
header_bytes: int,
cache: Dict[str, Tuple[float, str]],
recent_cutoff: float = None,
logger=None,
) -> Dict[str, Dict[str, Any]]:
"""
Return newest .MLG per unit:
{uid: {'mtime': float, 'fname': str, 'path': str}}
"""
latest: Dict[str, Dict[str, Any]] = {} latest: Dict[str, Dict[str, Any]] = {}
if not os.path.exists(watch): if not os.path.exists(watch):
print("[WARN] Watch path not found:", watch) print("[WARN] Watch path not found:", watch)
@@ -227,26 +258,28 @@ def scan_latest(watch: str, header_bytes: int,
mtime = e.stat().st_mtime mtime = e.stat().st_mtime
except Exception: except Exception:
continue continue
cached = cache.get(fpath) cached = cache.get(fpath)
if cached is not None and cached[0] == mtime: if cached is not None and cached[0] == mtime:
uid = cached[1] uid = cached[1]
else: else:
uid = sniff_unit_from_mlg(fpath, header_bytes) uid = sniff_unit_from_mlg(fpath, header_bytes)
if not uid: if not uid:
if (recent_cutoff is not None) and (mtime >= recent_cutoff): if (recent_cutoff is not None) and (mtime >= recent_cutoff):
if logger: if logger:
logger(f"[unsniffable-recent] {fpath}") logger(f"[unsniffable-recent] {fpath}")
continue # skip file if no unit ID found in header continue # skip file if no unit ID found in header
cache[fpath] = (mtime, uid) cache[fpath] = (mtime, uid)
if (uid not in latest) or (mtime > latest[uid]["mtime"]): if (uid not in latest) or (mtime > latest[uid]["mtime"]):
latest[uid] = {"mtime": mtime, "fname": e.name} latest[uid] = {"mtime": mtime, "fname": e.name, "path": fpath}
except Exception as ex: except Exception as ex:
print("[WARN] Scan error:", ex) print("[WARN] Scan error:", ex)
return latest return latest
# --- Roster fetch (Dropbox/HTTPS) helper --- # --- Roster fetch (Dropbox/HTTPS) helper ---
def refresh_roster_from_url(url: str, dest: str, min_seconds: int, def refresh_roster_from_url(url: str, dest: str, min_seconds: int, state: dict, logger=None) -> None:
state: dict, logger=None):
now = time.time() now = time.time()
# throttle fetches; only pull if enough time elapsed # throttle fetches; only pull if enough time elapsed
@@ -261,9 +294,10 @@ def refresh_roster_from_url(url: str, dest: str, min_seconds: int,
f.write(data) f.write(data)
state["t"] = now state["t"] = now
if logger: if logger:
from datetime import datetime logger(
logger(f"[roster] refreshed from {url} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} " f"[roster] refreshed from {url} at "
f"-> {dest} ({len(data)} bytes)") f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} -> {dest} ({len(data)} bytes)"
)
except Exception as e: except Exception as e:
if logger: if logger:
logger(f"[roster-fetch-error] {e}") logger(f"[roster-fetch-error] {e}")
@@ -273,6 +307,71 @@ def refresh_roster_from_url(url: str, dest: str, min_seconds: int,
def cfg_get(cfg: dict, key: str, default=None): def cfg_get(cfg: dict, key: str, default=None):
return cfg.get(key, cfg.get(key.lower(), cfg.get(key.upper(), default))) return cfg.get(key, cfg.get(key.lower(), cfg.get(key.upper(), default)))
# --- API heartbeat / SFM telemetry helpers ---
def send_api_payload(payload: dict, api_url: str) -> None:
if not api_url:
return
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(api_url, data=data, headers={"Content-Type": "application/json"})
try:
with urllib.request.urlopen(req, timeout=5) as res:
print(f"[API] POST success: {res.status}")
except urllib.error.URLError as e:
print(f"[API] POST failed: {e}")
def build_sfm_payload(units_dict: Dict[str, Dict[str, Any]], cfg: Dict[str, Any]) -> dict:
"""
Build SFM Telemetry JSON v1 payload from latest-unit dict.
Schema is source-agnostic and future-proof.
"""
now_iso = datetime.now(timezone.utc).isoformat()
now_ts = time.time()
payload = {
"source_id": cfg.get("SOURCE_ID", gethostname()),
"source_type": cfg.get("SOURCE_TYPE", "series3_emitter"),
"timestamp": now_iso,
"units": [],
}
for unit_id, info in units_dict.items():
mtime = info.get("mtime")
if mtime is not None:
last_event_iso = datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat()
age_minutes = int(max(0, (now_ts - mtime) // 60))
else:
last_event_iso = None
age_minutes = None
file_path = info.get("path")
file_size = None
if file_path:
try:
file_size = os.path.getsize(file_path)
except Exception:
file_size = None
payload["units"].append(
{
"unit_id": unit_id,
"last_event_time": last_event_iso,
"age_minutes": age_minutes,
"observation_method": "mlg_scan",
"event_metadata": {
"file_name": info.get("fname"),
"file_path": file_path,
"file_size_bytes": file_size,
"event_number": None,
"event_type": None,
},
}
)
return payload
# --------------- Main loop ------------------ # --------------- Main loop ------------------
def main() -> None: def main() -> None:
here = os.path.dirname(__file__) or "." here = os.path.dirname(__file__) or "."
@@ -289,24 +388,25 @@ def main() -> None:
COLORIZE = bool(cfg["COLORIZE"]) COLORIZE = bool(cfg["COLORIZE"])
MLG_HEADER_BYTES = int(cfg["MLG_HEADER_BYTES"]) MLG_HEADER_BYTES = int(cfg["MLG_HEADER_BYTES"])
C_OK = ansi(COLORIZE, "\033[92m") C_OK = ansi(COLORIZE, "\033[92m")
C_PEN = ansi(COLORIZE, "\033[93m") C_PEN = ansi(COLORIZE, "\033[93m")
C_MIS = ansi(COLORIZE, "\033[91m") C_MIS = ansi(COLORIZE, "\033[91m")
C_UNX = ansi(COLORIZE, "\033[95m") C_UNX = ansi(COLORIZE, "\033[95m")
C_RST = ansi(COLORIZE, "\033[0m") C_RST = ansi(COLORIZE, "\033[0m")
# --- Dropbox roster refresh (pull CSV to local cache) --- # --- Dropbox roster refresh (pull CSV to local cache) ---
roster_state = {} roster_state: Dict[str, Any] = {}
url = str(cfg_get(cfg, "ROSTER_URL", "") or "")
# --- Dropbox roster refresh (pull CSV to local cache) ---
roster_state = {}
url = str(cfg_get(cfg, "ROSTER_URL", "") or "") url = str(cfg_get(cfg, "ROSTER_URL", "") or "")
# 🔎 Patch 3: startup config echo (helps debugging) print(
print(f"[CFG] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} ROSTER_URL={'set' if url else 'not set'}") f"[CFG] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} "
# (optional, also write it to the log file) f"ROSTER_URL={'set' if url else 'not set'}"
log_message(LOG_FILE, ENABLE_LOGGING, )
f"[cfg] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} ROSTER_URL={'set' if url else 'not set'}") log_message(
LOG_FILE,
ENABLE_LOGGING,
f"[cfg] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} ROSTER_URL={'set' if url else 'not set'}",
)
if url.lower().startswith("http"): if url.lower().startswith("http"):
refresh_roster_from_url( refresh_roster_from_url(
@@ -320,28 +420,25 @@ def main() -> None:
# cache for scanning # cache for scanning
sniff_cache: Dict[str, Tuple[float, str]] = {} sniff_cache: Dict[str, Tuple[float, str]] = {}
# Always load the (possibly refreshed) local roster # Always load the (possibly refreshed) local roster
try: try:
active, bench, ignored, notes_by_unit = load_roster(ROSTER_FILE) active, bench, ignored, notes_by_unit = load_roster(ROSTER_FILE)
except Exception as ex: except Exception as ex:
log_message(LOG_FILE, ENABLE_LOGGING, f"[WARN] roster load failed: {ex}") log_message(LOG_FILE, ENABLE_LOGGING, f"[WARN] roster load failed: {ex}")
active = set() active, bench, ignored, notes_by_unit = set(), set(), set(), {}
bench = set()
ignored = set()
notes_by_unit = {}
# track roster file modification time # track roster file modification time
try: try:
roster_mtime = os.path.getmtime(ROSTER_FILE) roster_mtime = os.path.getmtime(ROSTER_FILE)
except Exception: except Exception:
roster_mtime = None roster_mtime = None
last_api_ts: float = 0.0
while True: while True:
try: try:
now_local = datetime.now().isoformat() now_local = datetime.now().isoformat()
now_utc = datetime.now(timezone.utc).isoformat() now_utc = datetime.now(timezone.utc).isoformat()
print("-" * 110) print("-" * 110)
print("Heartbeat @ {} (Local) | {} (UTC)".format(now_local, now_utc)) print("Heartbeat @ {} (Local) | {} (UTC)".format(now_local, now_utc))
print("-" * 110) print("-" * 110)
@@ -363,29 +460,45 @@ def main() -> None:
m = None m = None
if m is not None and m != roster_mtime: if m is not None and m != roster_mtime:
roster_mtime = m roster_mtime = m
try: try:
new_active, new_bench, new_ignored, new_notes_by_unit = load_roster(ROSTER_FILE) new_active, new_bench, new_ignored, new_notes_by_unit = load_roster(ROSTER_FILE)
if new_active or new_bench or new_ignored: if new_active or new_bench or new_ignored:
active, bench, ignored, notes_by_unit = new_active, new_bench, new_ignored, new_notes_by_unit active, bench, ignored, notes_by_unit = (
print(f"[ROSTER] Reloaded: {len(active)} active unit(s) from {ROSTER_FILE}") new_active,
log_message(LOG_FILE, ENABLE_LOGGING, new_bench,
f"[roster] reloaded {len(active)} active units") new_ignored,
else: new_notes_by_unit,
print("[ROSTER] Reload skipped — no valid active units in new file") )
log_message(LOG_FILE, ENABLE_LOGGING, print(f"[ROSTER] Reloaded: {len(active)} active unit(s) from {ROSTER_FILE}")
"[roster] reload skipped — roster parse failed or empty") log_message(
except Exception as ex: LOG_FILE,
print(f"[ROSTER] Reload failed, keeping previous roster: {ex}") ENABLE_LOGGING,
log_message(LOG_FILE, ENABLE_LOGGING, f"[roster] reloaded {len(active)} active units",
f"[roster] reload failed, keeping previous roster: {ex}") )
else:
print("[ROSTER] Reload skipped — no valid active units in new file")
log_message(
LOG_FILE,
ENABLE_LOGGING,
"[roster] reload skipped — roster parse failed or empty",
)
except Exception as ex:
print(f"[ROSTER] Reload failed, keeping previous roster: {ex}")
log_message(
LOG_FILE,
ENABLE_LOGGING,
f"[roster] reload failed, keeping previous roster: {ex}",
)
clear_logs_if_needed(LOG_FILE, ENABLE_LOGGING, LOG_RETENTION_DAYS) clear_logs_if_needed(LOG_FILE, ENABLE_LOGGING, LOG_RETENTION_DAYS)
recent_cutoff = time.time() - (float(cfg.get("RECENT_WARN_DAYS", 30)) * 86400) recent_cutoff = time.time() - (float(cfg.get("RECENT_WARN_DAYS", 30)) * 86400)
logger = lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m) logger_fn = lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m)
latest = scan_latest(WATCH_PATH, MLG_HEADER_BYTES, sniff_cache, recent_cutoff, logger)
latest = scan_latest(WATCH_PATH, MLG_HEADER_BYTES, sniff_cache, recent_cutoff, logger_fn)
now_epoch = time.time() now_epoch = time.time()
# Active units
for uid in sorted(active): for uid in sorted(active):
info = latest.get(uid) info = latest.get(uid)
if info is not None: if info is not None:
@@ -396,16 +509,28 @@ def main() -> None:
status, col = "Pending", C_PEN status, col = "Pending", C_PEN
else: else:
status, col = "OK", C_OK status, col = "OK", C_OK
note = notes_by_unit.get(uid, "") note = notes_by_unit.get(uid, "")
note_suffix = f" [{note}]" if note else "" note_suffix = f" [{note}]" if note else ""
line = ("{col}{uid:<8} {status:<8} Age: {age:<7} Last: {last} (File: {fname}){note}{rst}" line = (
.format(col=col, uid=uid, status=status, "{col}{uid:<8} {status:<8} Age: {age:<7} Last: {last} (File: {fname}){note}{rst}".format(
age=fmt_age(now_epoch, info["mtime"]), col=col,
last=fmt_last(info["mtime"]), fname=info["fname"], note=note_suffix, rst=C_RST)) uid=uid,
status=status,
age=fmt_age(now_epoch, info["mtime"]),
last=fmt_last(info["mtime"]),
fname=info["fname"],
note=note_suffix,
rst=C_RST,
)
)
else: else:
note = notes_by_unit.get(uid, "") note = notes_by_unit.get(uid, "")
note_suffix = f" [{note}]" if note else "" note_suffix = f" [{note}]" if note else ""
line = "{col}{uid:<8} Missing Age: N/A Last: ---{note}{rst}".format(col=C_MIS, uid=uid, note=note_suffix, rst=C_RST) line = "{col}{uid:<8} Missing Age: N/A Last: ---{note}{rst}".format(
col=C_MIS, uid=uid, note=note_suffix, rst=C_RST
)
print(line) print(line)
log_message(LOG_FILE, ENABLE_LOGGING, line) log_message(LOG_FILE, ENABLE_LOGGING, line)
@@ -416,36 +541,46 @@ def main() -> None:
note = notes_by_unit.get(uid, "") note = notes_by_unit.get(uid, "")
note_suffix = f" [{note}]" if note else "" note_suffix = f" [{note}]" if note else ""
if info: if info:
line = (f"{uid:<8} Bench Last: {fmt_last(info['mtime'])} (File: {info['fname']}){note_suffix}") line = (
f"{uid:<8} Bench Last: {fmt_last(info['mtime'])} "
f"(File: {info['fname']}){note_suffix}"
)
else: else:
line = (f"{uid:<8} Bench Last: ---{note_suffix}") line = f"{uid:<8} Bench Last: ---{note_suffix}"
print(line) print(line)
log_message(LOG_FILE, ENABLE_LOGGING, "[bench] " + line) log_message(LOG_FILE, ENABLE_LOGGING, "[bench] " + line)
# Ignored Units (retired, broken, or do-not-care) # Unexpected units
# ignored_detected = [u for u in latest.keys() if u in ignored]
# if ignored_detected:
# print("\nIgnored Units:")
# for uid in sorted(ignored_detected):
# info = latest[uid]
# note = notes_by_unit.get(uid, "")
# note_suffix = f" [{note}]" if note else ""
# line = (f"{uid:<8} Ignored Last: {fmt_last(info['mtime'])} (File: {info['fname']}){note_suffix}")
# print(line)
# log_message(LOG_FILE, ENABLE_LOGGING, "[ignored] " + line)
unexpected = [ unexpected = [
u for u in latest.keys() u
for u in latest.keys()
if u not in active and u not in bench and u not in ignored and u not in notes_by_unit if u not in active and u not in bench and u not in ignored and u not in notes_by_unit
] ]
if unexpected: if unexpected:
print("\nUnexpected Units Detected:") print("\nUnexpected Units Detected:")
for uid in sorted(unexpected): for uid in sorted(unexpected):
info = latest[uid] info = latest[uid]
line = ("{col}{uid:<8} Age: - Last: {last} (File: {fname}){rst}" line = (
.format(col=C_UNX, uid=uid, last=fmt_last(info["mtime"]), fname=info["fname"], rst=C_RST)) "{col}{uid:<8} Age: - Last: {last} (File: {fname}){rst}".format(
col=C_UNX,
uid=uid,
last=fmt_last(info["mtime"]),
fname=info["fname"],
rst=C_RST,
)
)
print(line) print(line)
log_message(LOG_FILE, ENABLE_LOGGING, "[unexpected] " + line) log_message(LOG_FILE, ENABLE_LOGGING, "[unexpected] " + line)
# ---- API heartbeat to SFM ----
if cfg.get("API_ENABLED", False):
now_ts = time.time()
interval = int(cfg.get("API_INTERVAL_SECONDS", 300))
if now_ts - last_api_ts >= interval:
payload = build_sfm_payload(latest, cfg)
send_api_payload(payload, cfg.get("API_URL", ""))
last_api_ts = now_ts
except KeyboardInterrupt: except KeyboardInterrupt:
print("\nStopping...") print("\nStopping...")
break break
@@ -453,7 +588,9 @@ def main() -> None:
err = "[loop-error] {}".format(e) err = "[loop-error] {}".format(e)
print(err) print(err)
log_message(LOG_FILE, ENABLE_LOGGING, err) log_message(LOG_FILE, ENABLE_LOGGING, err)
time.sleep(SCAN_INTERVAL) time.sleep(SCAN_INTERVAL)
if __name__ == "__main__": if __name__ == "__main__":
main() main()