r"""
Thor Watcher — Series 4 Ingest Agent v0.2.0

Micromate (Series 4) ingest agent for Terra-View.

Behavior:
- Scans C:\THORDATA\<Project>\<Unit>\*.MLG
- For each UM####, finds the newest .MLG by timestamp in the filename
- Posts JSON heartbeat payload to Terra-View backend
- Tray-friendly: run_watcher(state, stop_event) for background thread use
"""

import os
import re
import sys
import time
import json
import threading
import urllib.request
import urllib.error
from collections import deque
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
from socket import gethostname

# ── Version ───────────────────────────────────────────────────────────────────

VERSION = "0.2.0"

# Timestamp layout used for every UTC field in the heartbeat payload.
_UTC_STAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


# ── Config ────────────────────────────────────────────────────────────────────

def load_config(config_path: str) -> Dict[str, Any]:
    """
    Load configuration from config.json.

    The file's values are merged over the built-in defaults, so every default
    key is always present in the result.

    Raises:
        OSError: the file cannot be opened.
        ValueError: the file is not valid JSON (json.JSONDecodeError is a
            ValueError subclass) or its root is not a JSON object.
    """
    defaults: Dict[str, Any] = {
        "thordata_path": r"C:\THORDATA",
        "scan_interval": 60,
        "api_url": "",
        "api_timeout": 5,
        "api_interval": 300,
        "source_id": "",
        "source_type": "series4_watcher",
        "local_timezone": "America/New_York",
        "enable_logging": True,
        "log_file": os.path.join(
            os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or "C:\\",
            "ThorWatcher", "agent_logs", "thor_watcher.log"
        ),
        "log_retention_days": 30,
        "update_source": "gitea",
        "update_url": "",
        "debug": False,
    }

    with open(config_path, "r", encoding="utf-8") as f:
        raw = json.load(f)

    # A list/number/string root cannot be merged; fail with a readable message
    # instead of the opaque TypeError the dict-unpacking would produce.
    if not isinstance(raw, dict):
        raise ValueError(
            "config root must be a JSON object, got {}".format(type(raw).__name__)
        )

    return {**defaults, **raw}


# ── Logging ───────────────────────────────────────────────────────────────────

def log_message(path: str, enabled: bool, msg: str) -> None:
    """
    Append one line "<UTC ISO timestamp> <msg>" to the log file at *path*.

    Does nothing when *enabled* is false. The parent directory is created on
    demand. Logging is best effort: any failure is swallowed so that a broken
    log location can never take the watcher down.
    """
    if not enabled:
        return
    try:
        # exist_ok avoids the check-then-create race of exists()+makedirs().
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        with open(path, "a", encoding="utf-8") as f:
            f.write("{} {}\n".format(datetime.now(timezone.utc).isoformat(), msg))
    except Exception:
        # Deliberate best effort — see docstring.
        pass


def _read_log_tail(log_file: str, n: int = 25) -> Optional[List[str]]:
    """
    Return the last *n* lines of the log file (without trailing newlines).

    Returns None when *log_file* is empty or the file cannot be read, and an
    empty list when *n* is zero or negative.
    """
    if not log_file:
        return None
    try:
        # The log is written as UTF-8 by log_message, so read it back the same
        # way; errors="replace" keeps a damaged byte from losing the whole tail.
        with open(log_file, "r", encoding="utf-8", errors="replace") as f:
            # A bounded deque keeps only the tail in memory while streaming.
            tail = deque(f, maxlen=max(n, 0))
    except Exception:
        return None
    return [line.rstrip("\n") for line in tail]


# ── MLG filename parsing ──────────────────────────────────────────────────────

# Matches: UM12345_20251204193042.MLG
_MLG_PATTERN = re.compile(r"^(UM\d+)_([0-9]{14})\.MLG$", re.IGNORECASE)


def parse_mlg_filename(name: str) -> Optional[Tuple[str, datetime]]:
    """
    Parse UM####_YYYYMMDDHHMMSS.MLG -> (unit_id, timestamp) or None.

    The unit id is upper-cased. The timestamp is returned as a naive datetime
    exactly as encoded in the filename. None is returned when the name does
    not match the pattern or the 14 digits are not a valid date/time.
    """
    # fullmatch (rather than match) so "$" cannot accept a trailing newline.
    m = _MLG_PATTERN.fullmatch(name)
    if m is None:
        return None
    try:
        ts = datetime.strptime(m.group(2), "%Y%m%d%H%M%S")
    except ValueError:
        return None
    return m.group(1).upper(), ts


# ── THORDATA scanner ──────────────────────────────────────────────────────────

def _safe_listdir(path: str) -> List[str]:
    """Return the entries of *path*, or an empty list if it cannot be listed."""
    try:
        return os.listdir(path)
    except OSError:
        return []


def scan_thordata(root: str) -> Dict[str, Dict[str, Any]]:
    """
    Scan the THORDATA folder structure:

        <root>/<Project>/<Unit>/*.MLG

    For every unit id found in an MLG filename, only the entry with the newest
    filename timestamp is kept (across all projects and unit folders).

    Returns:
        {
            "UM12345": {
                "unit_id", "project", "last_call" (naive datetime parsed from
                the filename), "mlg_path"
            },
            ...
        }
        An empty dict when *root* is not a directory. Folders that cannot be
        listed are skipped.
    """
    unit_map: Dict[str, Dict[str, Any]] = {}

    if not os.path.isdir(root):
        return unit_map

    for project_name in _safe_listdir(root):
        project_path = os.path.join(root, project_name)
        if not os.path.isdir(project_path):
            continue

        for unit_name in _safe_listdir(project_path):
            unit_path = os.path.join(project_path, unit_name)
            if not os.path.isdir(unit_path):
                continue

            for fname in _safe_listdir(unit_path):
                # The pattern itself enforces the .MLG suffix (case-insensitive).
                parsed = parse_mlg_filename(fname)
                if parsed is None:
                    continue

                unit_id, ts = parsed
                current = unit_map.get(unit_id)
                if current is None or ts > current["last_call"]:
                    unit_map[unit_id] = {
                        "unit_id": unit_id,
                        "project": project_name,
                        "last_call": ts,
                        "mlg_path": os.path.join(unit_path, fname),
                    }

    return unit_map


# ── API payload ───────────────────────────────────────────────────────────────

def _age_minutes(now_local: datetime, last_call: datetime) -> int:
    """Whole minutes from *last_call* to *now_local*, never negative."""
    return int(max(0.0, (now_local - last_call).total_seconds()) // 60)


def build_api_payload(unit_map: Dict[str, Dict[str, Any]], cfg: Dict[str, Any]) -> dict:
    """
    Build the Terra-View JSON heartbeat payload.

    Args:
        unit_map: result of scan_thordata().
        cfg: config dict; reads "source_id", "source_type" and
            "local_timezone".

    Returns:
        A dict with "source_id" (falls back to the hostname when blank),
        "source_type", "version", "generated_at" (UTC) and "units", one entry
        per unit with "unit_id", "last_call", "age_minutes", "mlg_path" and
        "project_hint".
    """
    now_utc = datetime.now(timezone.utc)
    now_local = datetime.now()
    source_id = (cfg.get("source_id") or "").strip() or gethostname()

    # Resolve local timezone for MLG timestamp conversion. zoneinfo (or its
    # tz database) may be unavailable; then timestamps are sent unconverted.
    try:
        from zoneinfo import ZoneInfo
        local_tz = ZoneInfo(cfg.get("local_timezone") or "America/New_York")
    except Exception:
        local_tz = None

    units = []
    for unit_id, entry in unit_map.items():
        last_call: datetime = entry["last_call"]

        # NOTE(review): age is measured against the machine's local clock,
        # while last_call below is converted with the configured timezone —
        # the two agree only when those match; confirm that is intended.
        age_minutes = _age_minutes(now_local, last_call)

        # MLG timestamps are local naive — convert to UTC for transmission
        try:
            if local_tz is not None:
                last_call_utc = last_call.replace(tzinfo=local_tz).astimezone(timezone.utc)
                last_call_str = last_call_utc.strftime(_UTC_STAMP_FORMAT)
            else:
                # Fallback: send as-is with Z and accept the inaccuracy
                last_call_str = last_call.strftime(_UTC_STAMP_FORMAT)
        except Exception:
            last_call_str = last_call.strftime(_UTC_STAMP_FORMAT)

        units.append({
            "unit_id": unit_id,
            "last_call": last_call_str,
            "age_minutes": age_minutes,
            "mlg_path": entry["mlg_path"],
            "project_hint": entry["project"],
        })

    return {
        "source_id": source_id,
        "source_type": cfg.get("source_type", "series4_watcher"),
        "version": VERSION,
        "generated_at": now_utc.strftime(_UTC_STAMP_FORMAT),
        "units": units,
    }


def send_api_payload(payload: dict, api_url: str, timeout: int) -> Optional[dict]:
    """
    POST payload to Terra-View as JSON.

    Returns:
        The parsed JSON object from the response body; an empty dict when the
        request succeeded but the body is not a JSON object; None when
        *api_url* is empty or the request failed.
    """
    if not api_url:
        return None

    data = json.dumps(payload).encode("utf-8")
    # Supplying a body makes urllib issue a POST.
    req = urllib.request.Request(
        api_url,
        data=data,
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            print("[API] POST success: {}".format(resp.status))
            try:
                parsed = json.loads(resp.read().decode("utf-8"))
            except Exception:
                return {}
            # Callers use .get() on the result; a list/scalar body would make
            # that raise, so treat anything but an object as "no details".
            return parsed if isinstance(parsed, dict) else {}
    except urllib.error.URLError as e:
        print("[API] POST failed: {}".format(e))
        return None
    except Exception as e:
        print("[API] Unexpected error: {}".format(e))
        return None


# ── Watcher loop (tray-friendly) ──────────────────────────────────────────────

def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
    """
    Main watcher loop. Runs in a background thread when launched from the tray.

    Loads config.json (from the per-user ThorWatcher folder when frozen,
    otherwise from this file's directory), then scans and reports until
    *stop_event* is set. Returns early, with state["status"] == "error", when
    the config cannot be loaded or holds unusable values.

    state keys written each cycle:
        state["status"]           — "running" | "error" | "starting"
        state["api_status"]       — "ok" | "fail" | "disabled"
        state["units"]            — list of unit dicts for tray display
        state["last_scan"]        — datetime of last successful scan
        state["last_error"]       — last error string or None
        state["log_dir"]          — directory containing the log file
        state["cfg"]              — loaded config dict
        state["update_available"] — set True when API response signals an update
    """
    # Resolve config path
    if getattr(sys, "frozen", False):
        _appdata = os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or ""
        config_dir = os.path.join(_appdata, "ThorWatcher")
    else:
        config_dir = os.path.dirname(os.path.abspath(__file__)) or "."
    config_path = os.path.join(config_dir, "config.json")

    state["status"] = "starting"
    # Initialised up front so readers never hit a missing key before the
    # first cycle has run.
    state["api_status"] = "disabled"
    state["units"] = []
    state["last_scan"] = None
    state["last_error"] = None
    state["log_dir"] = None
    state["cfg"] = {}
    state["update_available"] = False

    try:
        cfg = load_config(config_path)
    except Exception as e:
        state["status"] = "error"
        state["last_error"] = "Config load failed: {}".format(e)
        return

    state["cfg"] = cfg

    # Convert settings inside a guard: a bad value (e.g. "abc" or null) must
    # surface as an error state, not as an unhandled exception in the thread.
    try:
        log_file = cfg["log_file"]
        state["log_dir"] = os.path.dirname(log_file) or config_dir
        thordata_path = cfg["thordata_path"]
        # At least one second between scans so a zero/negative setting
        # cannot turn the loop into a busy spin.
        scan_interval = max(1, int(cfg["scan_interval"]))
        api_url = cfg["api_url"]
        api_timeout = int(cfg["api_timeout"])
        api_interval = int(cfg["api_interval"])
        enable_logging = bool(cfg["enable_logging"])
    except (TypeError, ValueError) as e:
        state["status"] = "error"
        state["last_error"] = "Config invalid: {}".format(e)
        return

    log_message(
        log_file, enable_logging,
        "[cfg] THORDATA_PATH={} SCAN_INTERVAL={}s API_INTERVAL={}s API={}".format(
            thordata_path, scan_interval, api_interval, bool(api_url)
        )
    )
    print("[CFG] THORDATA_PATH={} SCAN_INTERVAL={}s API={}".format(
        thordata_path, scan_interval, bool(api_url)
    ))

    # None means "never sent", so the first cycle always posts. Pacing uses
    # the monotonic clock so wall-clock adjustments cannot stall or burst it.
    last_api_ts: Optional[float] = None

    while not stop_event.is_set():
        try:
            now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print("-" * 80)
            print("Heartbeat @ {}".format(now_str))
            print("-" * 80)

            unit_map = scan_thordata(thordata_path)
            now_local = datetime.now()

            unit_list = []
            for uid in sorted(unit_map):
                entry = unit_map[uid]
                last_call = entry["last_call"]
                age_minutes = _age_minutes(now_local, last_call)
                last_str = last_call.strftime("%Y-%m-%d %H:%M:%S")

                unit_list.append({
                    "uid": uid,
                    "age_minutes": age_minutes,
                    "last_call": last_str,
                    "mlg_path": entry["mlg_path"],
                    "project": entry["project"],
                })

                line = "{uid:<8} Age: {h}h {m}m Last: {last} Project: {proj}".format(
                    uid=uid,
                    h=age_minutes // 60,
                    m=age_minutes % 60,
                    last=last_str,
                    proj=entry["project"],
                )
                print(line)
                log_message(log_file, enable_logging, line)

            if not unit_list:
                msg = "[info] No Micromate units found in THORDATA"
                print(msg)
                log_message(log_file, enable_logging, msg)

            state["status"] = "running"
            state["units"] = unit_list
            state["last_scan"] = datetime.now()
            state["last_error"] = None

            # ── API heartbeat ──────────────────────────────────────────────────
            if api_url:
                now_ts = time.monotonic()
                if last_api_ts is None or now_ts - last_api_ts >= api_interval:
                    payload = build_api_payload(unit_map, cfg)
                    payload["log_tail"] = _read_log_tail(log_file, 25)
                    response = send_api_payload(payload, api_url, api_timeout)
                    last_api_ts = now_ts
                    if response is not None:
                        state["api_status"] = "ok"
                        if response.get("update_available"):
                            state["update_available"] = True
                    else:
                        state["api_status"] = "fail"
            else:
                state["api_status"] = "disabled"

        except Exception as e:
            # Top-level boundary of the worker: record the failure and keep
            # the loop alive for the next cycle.
            err = "[loop-error] {}".format(e)
            print(err)
            log_message(log_file, enable_logging, err)
            state["status"] = "error"
            state["last_error"] = str(e)

        # Sleeps for the scan interval but wakes immediately on stop.
        stop_event.wait(timeout=scan_interval)


# ── Standalone entry point ────────────────────────────────────────────────────

def main() -> None:
    """Run the watcher in the foreground until interrupted with Ctrl+C."""
    state: Dict[str, Any] = {}
    stop_event = threading.Event()
    try:
        run_watcher(state, stop_event)
    except KeyboardInterrupt:
        print("\nStopping...")
        stop_event.set()
        return

    # run_watcher only returns on its own when startup failed; without this
    # a bad/missing config would make the standalone agent exit silently.
    if state.get("status") == "error":
        print("[error] {}".format(state.get("last_error")), file=sys.stderr)


if __name__ == "__main__":
    main()