Files
series3-watcher/series3_emitter.py
2025-11-20 18:24:57 -05:00

478 lines
19 KiB
Python

"""
Series 3 Emitter — v1.0.0 (Stable Baseline, SemVer Reset)
Environment:
- Python 3.8 (Windows 7 compatible)
- Runs on DL2 with Blastware 10 event path
Key Features:
- Atomic roster downloads from Dropbox (no partial files)
- Automatic roster refresh from Dropbox at configurable interval
- Automatic hot-reload into memory when roster CSV changes
- Failsafe reload: keeps previous roster if new file is invalid or empty
- Config-driven paths, intervals, and logging
- Compact console heartbeat with status per unit
- Logging with retention auto-clean (days configurable)
- Safe .MLG header sniff for unit IDs (BE#### / BA####)
Changelog:
- Reset to semantic versioning (from legacy v5.9 beta)
- Fixed stray `note=note_suffix` bug in Unexpected Units block
- Removed duplicate imports and redundant roster load at startup
- Added startup config echo (paths + URL status)
"""
import os
import re
import csv
import time
import configparser
import urllib.request
import requests
from datetime import datetime, timezone, timedelta
from typing import Dict, Any, Optional, Tuple, Set, List
# ---------------- Config ----------------
def load_config(path: str) -> Dict[str, Any]:
"""Load INI with tolerant inline comments and a required [emitter] section."""
cp = configparser.ConfigParser(inline_comment_prefixes=(';', '#'))
cp.optionxform = str # preserve key case
with open(path, "r", encoding="utf-8") as f:
txt = f.read()
# Ensure we have a section header
if not re.search(r'^\s*\[', txt, flags=re.M):
txt = "[emitter]\n" + txt
cp.read_string(txt)
sec = cp["emitter"]
def get_str(k: str, dflt: str) -> str:
return sec.get(k, dflt).strip()
def get_int(k: str, dflt: int) -> int:
try:
return int(sec.get(k, str(dflt)).strip())
except Exception:
return dflt
def get_bool(k: str, dflt: bool) -> bool:
v = sec.get(k, None)
if v is None:
return dflt
return v.strip().lower() in ("1","true","on","yes","y")
return {
"WATCH_PATH": get_str("SERIES3_PATH", r"C:\Blastware 10\Event\autocall home"),
"ROSTER_FILE": get_str("ROSTER_FILE", r"C:\SeismoEmitter\series3_roster.csv"),
"ROSTER_URL": get_str("ROSTER_URL", ""),
"ROSTER_REFRESH_MIN_SECONDS": get_int("ROSTER_REFRESH_MIN_SECONDS", 300),
"SCAN_INTERVAL": get_int("SCAN_INTERVAL_SECONDS", 300),
"OK_HOURS": float(get_int("OK_HOURS", 12)),
"MISSING_HOURS": float(get_int("MISSING_HOURS", 24)),
"ENABLE_LOGGING": get_bool("ENABLE_LOGGING", True),
"LOG_FILE": get_str("LOG_FILE", r"C:\SeismoEmitter\emitter_logs\series3_emitter.log"),
"LOG_RETENTION_DAYS": get_int("LOG_RETENTION_DAYS", 30),
"COLORIZE": get_bool("COLORIZE", False), # Win7 default off
"MLG_HEADER_BYTES": max(256, min(get_int("MLG_HEADER_BYTES", 2048), 65536)),
"RECENT_WARN_DAYS": get_int("RECENT_WARN_DAYS", 30),
}
# --------------- ANSI helpers ---------------
def ansi(enabled: bool, code: str) -> str:
return code if enabled else ""
# --------------- Logging --------------------
def log_message(path: str, enabled: bool, msg: str) -> None:
if not enabled:
return
try:
d = os.path.dirname(path) or "."
if not os.path.exists(d):
os.makedirs(d)
with open(path, "a", encoding="utf-8") as f:
f.write("{} {}\n".format(datetime.now(timezone.utc).isoformat(), msg))
except Exception:
pass
def clear_logs_if_needed(log_file: str, enabled: bool, retention_days: int) -> None:
if not enabled or retention_days <= 0:
return
stamp_file = os.path.join(os.path.dirname(log_file) or ".", "last_clean.txt")
now = datetime.now(timezone.utc)
last = None
try:
if os.path.exists(stamp_file):
with open(stamp_file, "r", encoding="utf-8") as f:
last = datetime.fromisoformat(f.read().strip())
except Exception:
last = None
if (last is None) or (now - last > timedelta(days=retention_days)):
try:
if os.path.exists(log_file):
open(log_file, "w", encoding="utf-8").close()
with open(stamp_file, "w", encoding="utf-8") as f:
f.write(now.isoformat())
print("Log cleared on {}".format(now.astimezone().strftime("%Y-%m-%d %H:%M:%S")))
log_message(log_file, enabled, "Logs auto-cleared")
except Exception:
pass
# --------------- Roster ---------------------
def normalize_id(uid: str) -> str:
if uid is None:
return ""
return uid.replace(" ", "").strip().upper()
def load_roster(path: str) -> Tuple[Set[str], Set[str], Set[str], Dict[str, str]]:
"""CSV tolerant of commas in notes: device_id, active, notes...
Returns: active, bench, ignored, notes_by_unit
"""
active: Set[str] = set()
bench: Set[str] = set()
ignored: Set[str] = set()
notes_by_unit: Dict[str, str] = {}
if not os.path.exists(path):
print("[WARN] Roster not found:", path)
return active, notes_by_unit
try:
with open(path, "r", encoding="utf-8-sig", newline="") as f:
rdr = csv.reader(f)
try:
headers = next(rdr)
except StopIteration:
return active, notes_by_unit
headers = [(h or "").strip().lower() for h in headers]
def idx_of(name: str, fallbacks: List[str]) -> Optional[int]:
if name in headers:
return headers.index(name)
for fb in fallbacks:
if fb in headers:
return headers.index(fb)
return None
i_id = idx_of("device_id", ["unitid","id"])
i_ac = idx_of("active", [])
i_no = idx_of("notes", ["note","location"])
if i_id is None or i_ac is None:
print("[WARN] Roster missing device_id/active columns")
return active, notes_by_unit
for row in rdr:
if len(row) <= max(i_id, i_ac):
continue
uid = normalize_id(row[i_id])
note = ""
if i_no is not None:
extra = row[i_no:]
note = ",".join([c or "" for c in extra]).strip().rstrip(",")
notes_by_unit[uid] = note
if not uid:
continue
is_active = (row[i_ac] or "").strip().lower() in ("yes","y","true","1","on")
flag = (row[i_ac] or "").strip().lower()
if flag in ("yes","y","true","1","on"):
active.add(uid)
elif flag in ("no","n","off","0"):
bench.add(uid)
elif flag in ("ignore","retired","old"):
ignored.add(uid)
except Exception as e:
print("[WARN] Roster read error:", e)
return active, bench, ignored, notes_by_unit
# --------------- .MLG sniff ------------------
UNIT_BYTES_RE = re.compile(rb"(?:^|[^A-Z])(BE|BA)\d{4,5}(?:[^0-9]|$)")
def sniff_unit_from_mlg(path: str, header_bytes: int) -> Optional[str]:
"""Return BE####/BA#### from header bytes, or None."""
try:
with open(path, "rb") as f:
chunk = f.read(max(256, min(header_bytes, 65536)))
m = UNIT_BYTES_RE.search(chunk)
if not m:
return None
raw = m.group(0)
cleaned = re.sub(rb"[^A-Z0-9]", b"", raw)
try:
return cleaned.decode("ascii").upper()
except Exception:
return None
except Exception:
return None
# --------------- Scan helpers ---------------
def fmt_last(ts: float) -> str:
return datetime.fromtimestamp(ts, tz=timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S")
def fmt_age(now_epoch: float, mtime: float) -> str:
mins = int((now_epoch - mtime) // 60)
if mins < 0: mins = 0
return "{}h {}m".format(mins//60, mins%60)
def scan_latest(watch: str, header_bytes: int,
cache: Dict[str, Tuple[float, str]],
recent_cutoff: float = None,
logger=None):
"""Return newest .MLG per unit: {uid: {'mtime': float, 'fname': str}}"""
latest: Dict[str, Dict[str, Any]] = {}
if not os.path.exists(watch):
print("[WARN] Watch path not found:", watch)
return latest
try:
with os.scandir(watch) as it:
for e in it:
if (not e.is_file()) or (not e.name.lower().endswith(".mlg")):
continue
fpath = e.path
try:
mtime = e.stat().st_mtime
except Exception:
continue
cached = cache.get(fpath)
if cached is not None and cached[0] == mtime:
uid = cached[1]
else:
uid = sniff_unit_from_mlg(fpath, header_bytes)
if not uid:
if (recent_cutoff is not None) and (mtime >= recent_cutoff):
if logger:
logger(f"[unsniffable-recent] {fpath}")
continue # skip file if no unit ID found in header
cache[fpath] = (mtime, uid)
if (uid not in latest) or (mtime > latest[uid]["mtime"]):
latest[uid] = {"mtime": mtime, "fname": e.name}
except Exception as ex:
print("[WARN] Scan error:", ex)
return latest
# --- Roster fetch (Dropbox/HTTPS) helper ---
def refresh_roster_from_url(url: str, dest: str, min_seconds: int,
state: dict, logger=None):
now = time.time()
# throttle fetches; only pull if enough time elapsed
if now - state.get("t", 0) < max(0, int(min_seconds or 0)):
return
try:
with urllib.request.urlopen(url, timeout=15) as r:
data = r.read()
if data and data.strip():
with open(dest, "wb") as f:
f.write(data)
state["t"] = now
if logger:
from datetime import datetime
logger(f"[roster] refreshed from {url} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} "
f"-> {dest} ({len(data)} bytes)")
except Exception as e:
if logger:
logger(f"[roster-fetch-error] {e}")
# --- config helper: case-insensitive key lookup ---
def cfg_get(cfg: dict, key: str, default=None):
return cfg.get(key, cfg.get(key.lower(), cfg.get(key.upper(), default)))
#---Report to server ---
def report_to_server(server_url: str, uid: str, info: dict, status: str):
payload = {
"unit": uid,
"unit_type": "series3",
"timestamp": fmt_last(info["mtime"]),
"file": info["fname"],
"status": status
}
try:
requests.post(server_url, json=payload, timeout=5)
except Exception as e:
print(f"[WARN] report_to_server failed for {uid}: {e}")
# --------------- Main loop ------------------
def main() -> None:
here = os.path.dirname(__file__) or "."
cfg = load_config(os.path.join(here, "config.ini"))
WATCH_PATH = cfg["WATCH_PATH"]
ROSTER_FILE = cfg["ROSTER_FILE"]
SCAN_INTERVAL = int(cfg["SCAN_INTERVAL"])
OK_HOURS = float(cfg["OK_HOURS"])
MISSING_HOURS = float(cfg["MISSING_HOURS"])
ENABLE_LOGGING = bool(cfg["ENABLE_LOGGING"])
LOG_FILE = cfg["LOG_FILE"]
LOG_RETENTION_DAYS = int(cfg["LOG_RETENTION_DAYS"])
COLORIZE = bool(cfg["COLORIZE"])
MLG_HEADER_BYTES = int(cfg["MLG_HEADER_BYTES"])
C_OK = ansi(COLORIZE, "\033[92m")
C_PEN = ansi(COLORIZE, "\033[93m")
C_MIS = ansi(COLORIZE, "\033[91m")
C_UNX = ansi(COLORIZE, "\033[95m")
C_RST = ansi(COLORIZE, "\033[0m")
# --- Dropbox roster refresh (pull CSV to local cache) ---
roster_state = {}
url = str(cfg_get(cfg, "ROSTER_URL", "") or "")
# --- Dropbox roster refresh (pull CSV to local cache) ---
roster_state = {}
url = str(cfg_get(cfg, "ROSTER_URL", "") or "")
# 🔎 Patch 3: startup config echo (helps debugging)
print(f"[CFG] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} ROSTER_URL={'set' if url else 'not set'}")
# (optional, also write it to the log file)
log_message(LOG_FILE, ENABLE_LOGGING,
f"[cfg] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} ROSTER_URL={'set' if url else 'not set'}")
if url.lower().startswith("http"):
refresh_roster_from_url(
url,
ROSTER_FILE,
int(cfg_get(cfg, "ROSTER_REFRESH_MIN_SECONDS", 300)),
roster_state,
lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m),
)
# cache for scanning
sniff_cache: Dict[str, Tuple[float, str]] = {}
# Always load the (possibly refreshed) local roster
try:
active, bench, ignored, notes_by_unit = load_roster(ROSTER_FILE)
except Exception as ex:
log_message(LOG_FILE, ENABLE_LOGGING, f"[WARN] roster load failed: {ex}")
active = set()
bench = set()
ignored = set()
notes_by_unit = {}
# track roster file modification time
try:
roster_mtime = os.path.getmtime(ROSTER_FILE)
except Exception:
roster_mtime = None
while True:
try:
now_local = datetime.now().isoformat()
now_utc = datetime.now(timezone.utc).isoformat()
print("-" * 110)
print("Heartbeat @ {} (Local) | {} (UTC)".format(now_local, now_utc))
print("-" * 110)
# Periodically refresh roster file from Dropbox
if url.lower().startswith("http"):
refresh_roster_from_url(
url,
ROSTER_FILE,
int(cfg_get(cfg, "ROSTER_REFRESH_MIN_SECONDS", 300)),
roster_state,
lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m),
)
# Reload roster into memory if the file changed
try:
m = os.path.getmtime(ROSTER_FILE)
except Exception:
m = None
if m is not None and m != roster_mtime:
roster_mtime = m
try:
new_active, new_bench, new_ignored, new_notes_by_unit = load_roster(ROSTER_FILE)
if new_active or new_bench or new_ignored:
active, bench, ignored, notes_by_unit = new_active, new_bench, new_ignored, new_notes_by_unit
print(f"[ROSTER] Reloaded: {len(active)} active unit(s) from {ROSTER_FILE}")
log_message(LOG_FILE, ENABLE_LOGGING,
f"[roster] reloaded {len(active)} active units")
else:
print("[ROSTER] Reload skipped — no valid active units in new file")
log_message(LOG_FILE, ENABLE_LOGGING,
"[roster] reload skipped — roster parse failed or empty")
except Exception as ex:
print(f"[ROSTER] Reload failed, keeping previous roster: {ex}")
log_message(LOG_FILE, ENABLE_LOGGING,
f"[roster] reload failed, keeping previous roster: {ex}")
clear_logs_if_needed(LOG_FILE, ENABLE_LOGGING, LOG_RETENTION_DAYS)
recent_cutoff = time.time() - (float(cfg.get("RECENT_WARN_DAYS", 30)) * 86400)
logger = lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m)
latest = scan_latest(WATCH_PATH, MLG_HEADER_BYTES, sniff_cache, recent_cutoff, logger)
now_epoch = time.time()
for uid in sorted(active):
info = latest.get(uid)
if info is not None:
age_hours = (now_epoch - info["mtime"]) / 3600.0
if age_hours > MISSING_HOURS:
status, col = "Missing", C_MIS
elif age_hours > OK_HOURS:
status, col = "Pending", C_PEN
else:
status, col = "OK", C_OK
note = notes_by_unit.get(uid, "")
note_suffix = f" [{note}]" if note else ""
line = ("{col}{uid:<8} {status:<8} Age: {age:<7} Last: {last} (File: {fname}){note}{rst}"
.format(col=col, uid=uid, status=status,
age=fmt_age(now_epoch, info["mtime"]),
last=fmt_last(info["mtime"]), fname=info["fname"], note=note_suffix, rst=C_RST))
else:
note = notes_by_unit.get(uid, "")
note_suffix = f" [{note}]" if note else ""
line = "{col}{uid:<8} Missing Age: N/A Last: ---{note}{rst}".format(col=C_MIS, uid=uid, note=note_suffix, rst=C_RST)
print(line)
log_message(LOG_FILE, ENABLE_LOGGING, line)
if info is not None:
report_to_server(cfg["API_URL"], uid, info, status)
# Bench Units (rostered but not active in field)
print("\nBench Units (rostered, not active):")
for uid in sorted(bench):
info = latest.get(uid)
note = notes_by_unit.get(uid, "")
note_suffix = f" [{note}]" if note else ""
if info:
line = (f"{uid:<8} Bench Last: {fmt_last(info['mtime'])} (File: {info['fname']}){note_suffix}")
else:
line = (f"{uid:<8} Bench Last: ---{note_suffix}")
print(line)
log_message(LOG_FILE, ENABLE_LOGGING, "[bench] " + line)
# Ignored Units (retired, broken, or do-not-care)
# ignored_detected = [u for u in latest.keys() if u in ignored]
# if ignored_detected:
# print("\nIgnored Units:")
# for uid in sorted(ignored_detected):
# info = latest[uid]
# note = notes_by_unit.get(uid, "")
# note_suffix = f" [{note}]" if note else ""
# line = (f"{uid:<8} Ignored Last: {fmt_last(info['mtime'])} (File: {info['fname']}){note_suffix}")
# print(line)
# log_message(LOG_FILE, ENABLE_LOGGING, "[ignored] " + line)
unexpected = [
u for u in latest.keys()
if u not in active and u not in bench and u not in ignored and u not in notes_by_unit
]
if unexpected:
print("\nUnexpected Units Detected:")
for uid in sorted(unexpected):
info = latest[uid]
line = ("{col}{uid:<8} Age: - Last: {last} (File: {fname}){rst}"
.format(col=C_UNX, uid=uid, last=fmt_last(info["mtime"]), fname=info["fname"], rst=C_RST))
print(line)
log_message(LOG_FILE, ENABLE_LOGGING, "[unexpected] " + line)
except KeyboardInterrupt:
print("\nStopping...")
break
except Exception as e:
err = "[loop-error] {}".format(e)
print(err)
log_message(LOG_FILE, ENABLE_LOGGING, err)
time.sleep(SCAN_INTERVAL)
if __name__ == "__main__":
main()