Feat: tray icon now shows API/watcher health rather than unit ages. Unit submenu removed; now handled by receiving software. Chore: remove old unneeded code from deprecated features (console colorization, missing/pending age limits).
472 lines
17 KiB
Python
472 lines
17 KiB
Python
"""
|
|
Series 3 Watcher — v1.4.2
|
|
|
|
Environment:
|
|
- Python 3.8 (Windows 7 compatible)
|
|
- Runs on DL2 with Blastware 10 event path
|
|
|
|
Key Features:
|
|
- Config-driven paths, intervals, and logging
|
|
- Compact console heartbeat with status per unit
|
|
- Logging with retention auto-clean (days configurable)
|
|
- Safe .MLG header sniff for unit IDs (BE#### / BA####)
|
|
- Standardized SFM Telemetry JSON payload (source-agnostic)
|
|
- Periodic HTTP heartbeat POST to SFM backend
|
|
- Tray-friendly: run_watcher(state, stop_event) for background thread use
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import json
|
|
import threading
|
|
import configparser
|
|
import urllib.request
|
|
import urllib.error
|
|
from datetime import datetime, timezone, timedelta
|
|
from typing import Dict, Any, Optional, Tuple
|
|
from socket import gethostname
|
|
|
|
|
|
# ---------------- Config ----------------
def load_config(path: str) -> Dict[str, Any]:
    """Load INI with tolerant inline comments and a required [agent] section.

    If the file has no section header at all, an implicit ``[agent]`` header
    is prepended so bare key=value files still parse.

    Returns a flat dict of typed settings; unparseable values fall back to
    their defaults. Raises whatever ``open``/``configparser`` raise on an
    unreadable file — the caller handles that.
    """
    cp = configparser.ConfigParser(inline_comment_prefixes=(";", "#"))
    cp.optionxform = str  # preserve key case
    with open(path, "r", encoding="utf-8") as f:
        txt = f.read()
    # Ensure we have a section header
    if not re.search(r"^\s*\[", txt, flags=re.M):
        txt = "[agent]\n" + txt
    cp.read_string(txt)
    sec = cp["agent"]

    def get_str(k: str, dflt: str) -> str:
        return sec.get(k, dflt).strip()

    def get_int(k: str, dflt: int) -> int:
        try:
            return int(sec.get(k, str(dflt)).strip())
        except Exception:
            return dflt

    def get_float(k: str, dflt: float) -> float:
        # Accepts fractional values (e.g. "12.5"); previously these went
        # through int() and silently fell back to the default.
        try:
            return float(sec.get(k, str(dflt)).strip())
        except Exception:
            return dflt

    def get_bool(k: str, dflt: bool) -> bool:
        v = sec.get(k, None)
        if v is None:
            return dflt
        return v.strip().lower() in ("1", "true", "on", "yes", "y")

    return {
        "WATCH_PATH": get_str("SERIES3_PATH", r"C:\Blastware 10\Event\autocall home"),
        "SCAN_INTERVAL": get_int("SCAN_INTERVAL_SECONDS", 300),
        # Fractional hours now supported (backward compatible with ints).
        "OK_HOURS": get_float("OK_HOURS", 12.0),
        "MISSING_HOURS": get_float("MISSING_HOURS", 24.0),
        "ENABLE_LOGGING": get_bool("ENABLE_LOGGING", True),
        "LOG_FILE": get_str("LOG_FILE", os.path.join(
            os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or "C:\\",
            "Series3Watcher", "agent_logs", "series3_watcher.log"
        )),
        "LOG_RETENTION_DAYS": get_int("LOG_RETENTION_DAYS", 30),
        # Clamp header sniff size to a sane window [256, 64 KiB].
        "MLG_HEADER_BYTES": max(256, min(get_int("MLG_HEADER_BYTES", 2048), 65536)),
        "RECENT_WARN_DAYS": get_int("RECENT_WARN_DAYS", 30),
        "MAX_EVENT_AGE_DAYS": get_int("MAX_EVENT_AGE_DAYS", 365),

        # API heartbeat / SFM telemetry
        "API_ENABLED": get_bool("API_ENABLED", False),
        "API_URL": get_str("API_URL", ""),
        "API_INTERVAL_SECONDS": get_int("API_INTERVAL_SECONDS", 300),
        "SOURCE_ID": get_str("SOURCE_ID", gethostname()),
        "SOURCE_TYPE": get_str("SOURCE_TYPE", "series3_watcher"),
    }
|
|
|
|
|
|
|
|
# --------------- Logging --------------------
def log_message(path: str, enabled: bool, msg: str) -> None:
    """Append a UTC-timestamped line to the log file at *path*.

    No-op when *enabled* is False. Creates the parent directory on demand.
    All I/O errors are swallowed deliberately: logging must never crash
    the watcher.
    """
    if not enabled:
        return
    try:
        d = os.path.dirname(path) or "."
        # exist_ok avoids the exists()/makedirs() check-then-act race.
        os.makedirs(d, exist_ok=True)
        with open(path, "a", encoding="utf-8") as f:
            f.write("{} {}\n".format(datetime.now(timezone.utc).isoformat(), msg))
    except Exception:
        # Logging must never crash the watcher
        pass
|
|
|
|
|
|
def clear_logs_if_needed(log_file: str, enabled: bool, retention_days: int) -> None:
    """Truncate the log file once per retention window.

    A sibling ``last_clean.txt`` stamp records (ISO-8601, UTC) when the log
    was last cleared. The log is truncated when the stamp is missing,
    unreadable, or older than *retention_days*. All failures are swallowed.
    """
    if not enabled or retention_days <= 0:
        return

    stamp_path = os.path.join(os.path.dirname(log_file) or ".", "last_clean.txt")
    now_utc = datetime.now(timezone.utc)

    last_clean = None
    try:
        if os.path.exists(stamp_path):
            with open(stamp_path, "r", encoding="utf-8") as fh:
                last_clean = datetime.fromisoformat(fh.read().strip())
    except Exception:
        # A corrupt or unreadable stamp counts as "never cleared".
        last_clean = None

    due = (last_clean is None) or (now_utc - last_clean > timedelta(days=retention_days))
    if not due:
        return

    try:
        if os.path.exists(log_file):
            open(log_file, "w", encoding="utf-8").close()  # truncate in place
        with open(stamp_path, "w", encoding="utf-8") as fh:
            fh.write(now_utc.isoformat())
        print("Log cleared on {}".format(now_utc.astimezone().strftime("%Y-%m-%d %H:%M:%S")))
        log_message(log_file, enabled, "Logs auto-cleared")
    except Exception:
        pass
|
|
|
|
|
|
# --------------- .MLG sniff ------------------
# Unit ID (BE/BA prefix + 4-5 digits) in the binary header, guarded so it is
# not preceded by another capital letter nor followed by a further digit.
UNIT_BYTES_RE = re.compile(rb"(?:^|[^A-Z])(BE|BA)\d{4,5}(?:[^0-9]|$)")


def sniff_unit_from_mlg(path: str, header_bytes: int) -> Optional[str]:
    """Return BE####/BA#### from header bytes, or None."""
    read_len = max(256, min(header_bytes, 65536))  # clamp to [256, 64 KiB]
    try:
        with open(path, "rb") as fh:
            head = fh.read(read_len)
    except Exception:
        return None
    match = UNIT_BYTES_RE.search(head)
    if match is None:
        return None
    # group(0) may include the guard byte on either side; strip non-alphanumerics.
    ident = re.sub(rb"[^A-Z0-9]", b"", match.group(0))
    try:
        return ident.decode("ascii").upper()
    except Exception:
        return None
|
|
|
|
|
|
# --------------- Scan helpers ---------------
def fmt_last(ts: float) -> str:
    """Format a POSIX timestamp as local wall-clock 'YYYY-MM-DD HH:MM:SS'."""
    local_dt = datetime.fromtimestamp(ts, tz=timezone.utc).astimezone()
    return local_dt.strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
def fmt_age(now_epoch: float, mtime: float) -> str:
    """Return the elapsed time from *mtime* to *now_epoch* as 'Hh Mm' (clamped at 0)."""
    total_minutes = max(0, int((now_epoch - mtime) // 60))
    hours, minutes = divmod(total_minutes, 60)
    return "{}h {}m".format(hours, minutes)
|
|
|
|
|
|
def scan_latest(
    watch: str,
    header_bytes: int,
    cache: Dict[str, Tuple[float, str]],
    recent_cutoff: float,
    max_age_days: int,
    logger=None,
) -> Dict[str, Dict[str, Any]]:
    """
    Return newest .MLG per unit, only for files newer than max_age_days:
        {uid: {'mtime': float, 'fname': str, 'path': str}}

    *cache* maps file path -> (mtime, unit_id) so unchanged files are not
    re-sniffed. Unsniffable-but-recent files are reported via *logger*.
    """
    newest: Dict[str, Dict[str, Any]] = {}
    if not os.path.exists(watch):
        print("[WARN] Watch path not found:", watch)
        return newest

    now_ts = time.time()
    window_seconds = max(1, int(max_age_days)) * 86400.0  # sanity floor of 1 day

    try:
        with os.scandir(watch) as entries:
            for entry in entries:
                if not entry.is_file():
                    continue
                if not entry.name.lower().endswith(".mlg"):
                    continue
                fpath = entry.path
                try:
                    mtime = entry.stat().st_mtime
                except Exception:
                    continue

                # Skip very old events (beyond retention window); future
                # mtimes clamp to age 0 and are kept.
                if max(0.0, now_ts - mtime) > window_seconds:
                    continue

                hit = cache.get(fpath)
                if hit is not None and hit[0] == mtime:
                    uid = hit[1]
                else:
                    uid = sniff_unit_from_mlg(fpath, header_bytes)
                    if not uid:
                        # Unsniffable but very recent: record for later inspection.
                        if (recent_cutoff is not None) and (mtime >= recent_cutoff) and logger:
                            logger("[unsniffable-recent] {}".format(fpath))
                        continue
                    cache[fpath] = (mtime, uid)

                prev = newest.get(uid)
                if prev is None or mtime > prev["mtime"]:
                    newest[uid] = {"mtime": mtime, "fname": entry.name, "path": fpath}
    except Exception as ex:
        print("[WARN] Scan error:", ex)
    return newest
|
|
|
|
|
|
# --- API heartbeat / SFM telemetry helpers ---
|
|
VERSION = "1.4.2"
|
|
|
|
|
|
def _read_log_tail(log_file: str, n: int = 25) -> Optional[list]:
|
|
"""Return the last n lines of the log file as a list of strings, or None on failure."""
|
|
if not log_file:
|
|
return None
|
|
try:
|
|
with open(log_file, "r", errors="replace") as f:
|
|
lines = f.readlines()
|
|
return [l.rstrip("\n") for l in lines[-n:]]
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def send_api_payload(payload: dict, api_url: str) -> Optional[dict]:
    """POST payload to API. Returns parsed JSON response dict, or None on failure.

    Failures covered: empty URL, malformed URL (ValueError from Request/
    urlopen — previously uncaught and would escape to the caller), network
    and HTTP errors (URLError/HTTPError and other OSErrors), and response
    bodies that are not valid JSON. This function never raises.
    """
    if not api_url:
        return None
    data = json.dumps(payload).encode("utf-8")
    try:
        # Request construction itself raises ValueError on a malformed URL,
        # so it must be inside the try block.
        req = urllib.request.Request(api_url, data=data, headers={"Content-Type": "application/json"})
        with urllib.request.urlopen(req, timeout=5) as res:
            print("[API] POST success: {}".format(res.status))
            try:
                return json.loads(res.read().decode("utf-8"))
            except Exception:
                return None
    except (ValueError, OSError) as e:
        # OSError covers URLError/HTTPError and low-level socket failures.
        print("[API] POST failed: {}".format(e))
        return None
|
|
|
|
|
|
def build_sfm_payload(units_dict: Dict[str, Dict[str, Any]], cfg: Dict[str, Any]) -> dict:
    """
    Build SFM Telemetry JSON v1 payload from latest-unit dict.
    Schema is source-agnostic and future-proof.
    """
    now_ts = time.time()

    payload = {
        "source_id": cfg.get("SOURCE_ID", gethostname()),
        "source_type": cfg.get("SOURCE_TYPE", "series3_watcher"),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "units": [],
    }

    for unit_id, info in units_dict.items():
        mtime = info.get("mtime")
        last_event_iso = None
        age_minutes = None
        if mtime is not None:
            last_event_iso = datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat()
            age_minutes = int(max(0, (now_ts - mtime) // 60))  # clamp future mtimes

        file_path = info.get("path")
        try:
            file_size = os.path.getsize(file_path) if file_path else None
        except Exception:
            # File may have been rotated/deleted since the scan; size is optional.
            file_size = None

        payload["units"].append({
            "unit_id": unit_id,
            "last_event_time": last_event_iso,
            "age_minutes": age_minutes,
            "observation_method": "mlg_scan",
            "event_metadata": {
                "file_name": info.get("fname"),
                "file_path": file_path,
                "file_size_bytes": file_size,
                # Not extractable from a header sniff; reserved for richer sources.
                "event_number": None,
                "event_type": None,
            },
        })

    return payload
|
|
|
|
|
|
# --------------- Watcher loop (tray-friendly) ----------------
def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
    """
    Main watcher loop. Runs in a background thread when launched from the tray.

    state dict is written on every scan cycle:
        state["status"]      — "running" | "error" | "starting"
        state["api_status"]  — "ok" | "fail" | "disabled"
        state["units"]       — list of dicts: {uid, age_hours, last, fname}
        state["last_scan"]   — datetime of last successful scan (or None)
        state["last_api"]    — datetime of last successful API POST (or None)
        state["last_error"]  — last error string (or None)
        state["log_dir"]     — directory containing the log file
        state["cfg"]         — loaded config dict

    Returns immediately if the config cannot be loaded (state["status"] is
    set to "error"); otherwise loops until *stop_event* is set.
    """
    # Resolve where config.ini lives: frozen (packaged) builds read it from
    # %LOCALAPPDATA%\Series3Watcher, source checkouts read it next to this file.
    if getattr(sys, "frozen", False):
        here = os.path.dirname(os.path.abspath(sys.executable))
        _appdata = os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or here
        config_dir = os.path.join(_appdata, "Series3Watcher")
    else:
        here = os.path.dirname(os.path.abspath(__file__)) or "."
        config_dir = here
    config_path = os.path.join(config_dir, "config.ini")

    # Seed shared state so the tray shows sane values before the first scan.
    state["status"] = "starting"
    state["units"] = []
    state["last_scan"] = None
    state["last_error"] = None
    state["log_dir"] = None
    state["cfg"] = {}

    try:
        cfg = load_config(config_path)
    except Exception as e:
        # Without config we cannot run at all; surface the error and bail out.
        state["status"] = "error"
        state["last_error"] = "Config load failed: {}".format(e)
        return

    state["cfg"] = cfg
    state["log_dir"] = os.path.dirname(cfg["LOG_FILE"]) or here

    # Snapshot config values into locals for the loop.
    WATCH_PATH = cfg["WATCH_PATH"]
    SCAN_INTERVAL = int(cfg["SCAN_INTERVAL"])
    ENABLE_LOGGING = bool(cfg["ENABLE_LOGGING"])
    LOG_FILE = cfg["LOG_FILE"]
    LOG_RETENTION_DAYS = int(cfg["LOG_RETENTION_DAYS"])
    MLG_HEADER_BYTES = int(cfg["MLG_HEADER_BYTES"])
    RECENT_WARN_DAYS = int(cfg["RECENT_WARN_DAYS"])
    MAX_EVENT_AGE_DAYS = int(cfg["MAX_EVENT_AGE_DAYS"])

    # Echo the effective config to console and log once at startup.
    print(
        "[CFG] WATCH_PATH={} SCAN_INTERVAL={}s MAX_EVENT_AGE_DAYS={} API_ENABLED={}".format(
            WATCH_PATH, SCAN_INTERVAL, MAX_EVENT_AGE_DAYS, bool(cfg.get("API_ENABLED", False))
        )
    )
    log_message(
        LOG_FILE,
        ENABLE_LOGGING,
        "[cfg] WATCH_PATH={} SCAN_INTERVAL={} MAX_EVENT_AGE_DAYS={} API_ENABLED={}".format(
            WATCH_PATH, SCAN_INTERVAL, MAX_EVENT_AGE_DAYS, bool(cfg.get("API_ENABLED", False))
        ),
    )

    # fpath -> (mtime, uid); lets scan_latest skip re-sniffing unchanged files.
    sniff_cache: Dict[str, Tuple[float, str]] = {}
    # Epoch of the last API POST attempt (0.0 forces a POST on the first cycle).
    last_api_ts: float = 0.0

    while not stop_event.is_set():
        try:
            now_local = datetime.now().isoformat()
            now_utc = datetime.now(timezone.utc).isoformat()
            print("-" * 110)
            print("Heartbeat @ {} (Local) | {} (UTC)".format(now_local, now_utc))
            print("-" * 110)

            clear_logs_if_needed(LOG_FILE, ENABLE_LOGGING, LOG_RETENTION_DAYS)
            # Files newer than this cutoff that fail the unit-ID sniff get logged.
            recent_cutoff = time.time() - (float(RECENT_WARN_DAYS) * 86400)
            logger_fn = lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m)

            latest = scan_latest(
                WATCH_PATH,
                MLG_HEADER_BYTES,
                sniff_cache,
                recent_cutoff,
                MAX_EVENT_AGE_DAYS,
                logger_fn,
            )
            now_epoch = time.time()

            # Log detected units to console and log file (info only, no status judgement)
            unit_list = []
            if latest:
                print("\nDetected Units (within last {} days):".format(MAX_EVENT_AGE_DAYS))
                for uid in sorted(latest.keys()):
                    info = latest[uid]
                    age_hours = (now_epoch - info["mtime"]) / 3600.0
                    unit_list.append({
                        "uid": uid,
                        "age_hours": age_hours,
                        "last": fmt_last(info["mtime"]),
                        "fname": info["fname"],
                    })
                    line = (
                        "{uid:<8} Age: {age:<7} Last: {last} (File: {fname})".format(
                            uid=uid,
                            age=fmt_age(now_epoch, info["mtime"]),
                            last=fmt_last(info["mtime"]),
                            fname=info["fname"],
                        )
                    )
                    print(line)
                    log_message(LOG_FILE, ENABLE_LOGGING, line)
            else:
                print("\nNo recent .MLG activity found within last {} days.".format(MAX_EVENT_AGE_DAYS))
                log_message(
                    LOG_FILE,
                    ENABLE_LOGGING,
                    "[info] no recent MLG activity within {} days".format(MAX_EVENT_AGE_DAYS),
                )

            # Update shared state for tray — status reflects watcher health, not unit ages
            state["status"] = "running"
            state["units"] = unit_list
            state["last_scan"] = datetime.now()
            state["last_error"] = None

            # ---- API heartbeat to SFM ----
            if cfg.get("API_ENABLED", False):
                now_ts = time.time()
                interval = int(cfg.get("API_INTERVAL_SECONDS", 300))
                # API posts are throttled independently of the scan interval.
                if now_ts - last_api_ts >= interval:
                    hb_payload = build_sfm_payload(latest, cfg)
                    hb_payload["version"] = VERSION
                    hb_payload["watcher_status"] = state.get("status", "unknown")
                    hb_payload["log_tail"] = _read_log_tail(cfg.get("LOG_FILE", ""), 25)
                    response = send_api_payload(hb_payload, cfg.get("API_URL", ""))
                    # Failed attempts also reset the timer (no hammering on outages).
                    last_api_ts = now_ts
                    if response is not None:
                        state["api_status"] = "ok"
                        state["last_api"] = datetime.now()
                        if response.get("update_available"):
                            state["update_available"] = True
                    else:
                        state["api_status"] = "fail"
            else:
                state["api_status"] = "disabled"

        except Exception as e:
            # Keep the loop alive on any per-cycle failure; report it via state.
            err = "[loop-error] {}".format(e)
            print(err)
            log_message(LOG_FILE, ENABLE_LOGGING, err)
            state["status"] = "error"
            state["last_error"] = str(e)

        # Interruptible sleep: wake immediately if stop_event fires
        stop_event.wait(timeout=SCAN_INTERVAL)
|
|
|
|
|
|
# --------------- Main (standalone) ------------------
def main() -> None:
    """Standalone entry point: run the watcher in the foreground until Ctrl+C."""
    shared_state = {}
    stop = threading.Event()
    try:
        run_watcher(shared_state, stop)
    except KeyboardInterrupt:
        print("\nStopping...")
        stop.set()


if __name__ == "__main__":
    main()
|