Files
thor-watcher/series4_ingest.py
2026-03-20 17:34:51 -04:00

389 lines
14 KiB
Python

r"""
Thor Watcher — Series 4 Ingest Agent v0.2.0
Micromate (Series 4) ingest agent for Terra-View.
Behavior:
- Scans <thordata_path>\<Project>\<UM####>\*.MLG (default: C:\THORDATA)
- For each unit ID (parsed from the filename UM####_YYYYMMDDHHMMSS.MLG),
  keeps the .MLG whose filename timestamp is the newest
- Posts a JSON heartbeat payload to the Terra-View backend (api_url),
  at most once every api_interval seconds
- Tray-friendly: run_watcher(state, stop_event) for background-thread use
"""
import os
import re
import sys
import time
import json
import threading
import urllib.request
import urllib.error
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
from socket import gethostname
# ── Version ───────────────────────────────────────────────────────────────────
# Agent version string; sent as the "version" field of every heartbeat payload.
# Keep in sync with the version named in the module docstring.
VERSION = "0.2.0"
# ── Config ────────────────────────────────────────────────────────────────────
def load_config(config_path: str) -> Dict[str, Any]:
    """
    Load configuration from config.json and merge it over the defaults.

    Every default key is always present in the result. Keys from the file
    override the defaults, and unknown keys in the file are passed through.

    The file is decoded as "utf-8-sig", so a leading UTF-8 byte-order mark
    (which some Windows editors write) is tolerated. Plain UTF-8 files load
    exactly as before.

    Args:
        config_path: Path to the JSON config file.

    Returns:
        The merged configuration dict.

    Raises:
        OSError: if the file cannot be opened (e.g. FileNotFoundError).
        ValueError: if the file is not valid JSON (json.JSONDecodeError is a
            ValueError subclass) or its top level is not a JSON object.
    """
    defaults: Dict[str, Any] = {
        "thordata_path": r"C:\THORDATA",
        "scan_interval": 60,
        "api_url": "",
        "api_timeout": 5,
        "api_interval": 300,
        "source_id": "",
        "source_type": "series4_watcher",
        "local_timezone": "America/New_York",
        "enable_logging": True,
        "log_file": os.path.join(
            os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or "C:\\",
            "ThorWatcher", "agent_logs", "thor_watcher.log"
        ),
        "log_retention_days": 30,
        "update_source": "gitea",
        "update_url": "",
        "debug": False,
    }
    # utf-8-sig strips an optional BOM; json.load would otherwise reject it.
    with open(config_path, "r", encoding="utf-8-sig") as f:
        raw = json.load(f)
    if not isinstance(raw, dict):
        # `{**defaults, **raw}` would fail with an opaque TypeError otherwise.
        raise ValueError(
            "config file {} must contain a JSON object, got {}".format(
                config_path, type(raw).__name__
            )
        )
    return {**defaults, **raw}
# ── Logging ───────────────────────────────────────────────────────────────────
def log_message(path: str, enabled: bool, msg: str) -> None:
    """
    Append a UTC-timestamped line to the log file (best effort).

    Each line is "<ISO-8601 UTC timestamp> <msg>". The parent directory is
    created if it does not exist. Any failure (unwritable path, permission
    error, ...) is swallowed on purpose so logging can never take down the
    watcher.

    Args:
        path: Log file path. An empty path means nothing is written.
        enabled: When False, nothing is written.
        msg: Message text; a trailing newline is added.
    """
    if not enabled or not path:
        return
    try:
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        with open(path, "a", encoding="utf-8") as f:
            f.write("{} {}\n".format(datetime.now(timezone.utc).isoformat(), msg))
    except Exception:
        # Deliberate: logging is best-effort and must never raise.
        pass
def _read_log_tail(log_file: str, n: int = 25) -> Optional[List[str]]:
"""Return the last n lines of the log file as a list, or None."""
if not log_file:
return None
try:
with open(log_file, "r", errors="replace") as f:
lines = f.readlines()
return [line.rstrip("\n") for line in lines[-n:]]
except Exception:
return None
# ── MLG filename parsing ──────────────────────────────────────────────────────
# Matches: UM12345_20251204193042.MLG  (case-insensitive: a UM-prefixed unit
# ID, an underscore, then a 14-digit YYYYMMDDHHMMSS timestamp)
_MLG_PATTERN = re.compile(r"^(UM\d+)_([0-9]{14})\.MLG$", re.IGNORECASE)


def parse_mlg_filename(name: str) -> Optional[Tuple[str, datetime]]:
    """
    Split an MLG filename into its unit ID and embedded timestamp.

    Args:
        name: Bare filename such as "UM12345_20251204193042.MLG".

    Returns:
        (unit_id, timestamp), with unit_id upper-cased and timestamp a naive
        datetime. Returns None when the name does not fit the pattern or the
        14 digits do not form a valid date/time.
    """
    match = _MLG_PATTERN.match(name)
    if match is None:
        return None
    raw_unit, raw_stamp = match.groups()
    try:
        stamp = datetime.strptime(raw_stamp, "%Y%m%d%H%M%S")
    except ValueError:
        stamp = None
    return None if stamp is None else (raw_unit.upper(), stamp)
# ── THORDATA scanner ──────────────────────────────────────────────────────────
def _safe_listdir(path: str) -> List[str]:
    """Return os.listdir(path), or an empty list if the directory can't be read."""
    try:
        return os.listdir(path)
    except OSError:
        return []


def _child_dirs(parent: str):
    """Yield (name, full_path) for each immediate subdirectory of parent."""
    for child in _safe_listdir(parent):
        child_path = os.path.join(parent, child)
        if os.path.isdir(child_path):
            yield child, child_path


def scan_thordata(root: str) -> Dict[str, Dict[str, Any]]:
    """
    Find the newest MLG file for every Micromate unit under a THORDATA tree.

    Expected layout: <root>/<Project>/<UM####>/*.MLG. The unit ID and the
    timestamp both come from the MLG filename (via parse_mlg_filename), not
    from the folder names. When one unit ID appears in several places, the
    file with the latest timestamp wins; on an exact tie the first one seen
    is kept. Unreadable directories are skipped silently.

    Returns:
        { "UM12345": { "unit_id", "project", "last_call" (naive datetime
        parsed from the filename), "mlg_path" }, ... }. Empty if root is
        not a directory.
    """
    newest: Dict[str, Dict[str, Any]] = {}
    if not os.path.isdir(root):
        return newest
    for project, project_dir in _child_dirs(root):
        for _unit_folder, unit_dir in _child_dirs(project_dir):
            for filename in _safe_listdir(unit_dir):
                # Cheap suffix check first; the regex parse is the real filter.
                if not filename.upper().endswith(".MLG"):
                    continue
                parsed = parse_mlg_filename(filename)
                if parsed is None:
                    continue
                unit_id, stamp = parsed
                best = newest.get(unit_id)
                if best is not None and stamp <= best["last_call"]:
                    continue
                newest[unit_id] = {
                    "unit_id": unit_id,
                    "project": project,
                    "last_call": stamp,
                    "mlg_path": os.path.join(unit_dir, filename),
                }
    return newest
# ── API payload ───────────────────────────────────────────────────────────────
def build_api_payload(unit_map: Dict[str, Dict[str, Any]], cfg: Dict[str, Any]) -> dict:
    """
    Build the Terra-View JSON heartbeat payload.

    MLG timestamps are naive local times, and each one is sent as UTC
    ("...Z"). Conversion uses the IANA zone named by cfg["local_timezone"]
    when zoneinfo can load it. If it cannot (e.g. zoneinfo unavailable, an
    unknown zone name, or no timezone database on the machine), the
    timestamp is interpreted in the system's local timezone instead of
    being sent unconverted with a "Z" suffix, which would be off by the UTC
    offset. The raw local time is sent only if that conversion also fails.

    Args:
        unit_map: Mapping as returned by scan_thordata().
        cfg: Loaded configuration. Reads source_id, local_timezone and
            source_type.

    Returns:
        Dict with source_id (falls back to the hostname), source_type,
        version, generated_at (UTC) and a "units" list with unit_id,
        last_call, age_minutes, mlg_path and project_hint per unit.
    """
    utc_fmt = "%Y-%m-%dT%H:%M:%SZ"
    now_utc = datetime.now(timezone.utc)
    now_local = datetime.now()
    # str() guards against a non-string source_id (e.g. a number) in config.
    source_id = str(cfg.get("source_id") or "").strip() or gethostname()

    # Resolve local timezone for MLG timestamp conversion
    try:
        from zoneinfo import ZoneInfo
        local_tz = ZoneInfo(cfg.get("local_timezone") or "America/New_York")
    except Exception:
        local_tz = None

    units = []
    for unit_id, entry in unit_map.items():
        last_call: datetime = entry["last_call"]
        # Age is measured against the machine's local clock (datetime.now()).
        age_seconds = max(0.0, (now_local - last_call).total_seconds())
        age_minutes = int(age_seconds // 60)

        try:
            if local_tz is not None:
                last_call_utc = last_call.replace(tzinfo=local_tz).astimezone(timezone.utc)
            else:
                # astimezone() on a naive datetime assumes the system local zone.
                last_call_utc = last_call.astimezone(timezone.utc)
            last_call_str = last_call_utc.strftime(utc_fmt)
        except Exception:
            # Last resort (e.g. platform localtime failure for an out-of-range
            # date): send the local time unchanged and accept the inaccuracy.
            last_call_str = last_call.strftime(utc_fmt)

        units.append({
            "unit_id": unit_id,
            "last_call": last_call_str,
            "age_minutes": age_minutes,
            "mlg_path": entry["mlg_path"],
            "project_hint": entry["project"],
        })
    return {
        "source_id": source_id,
        "source_type": cfg.get("source_type", "series4_watcher"),
        "version": VERSION,
        "generated_at": now_utc.strftime(utc_fmt),
        "units": units,
    }
def send_api_payload(payload: dict, api_url: str, timeout: int) -> Optional[dict]:
    """
    POST payload as JSON to Terra-View.

    Args:
        payload: JSON-serialisable heartbeat dict.
        api_url: Endpoint URL. An empty value disables sending.
        timeout: Socket timeout in seconds for the request.

    Returns:
        None if api_url is empty or the request failed for any reason: a
        malformed URL, an unserialisable payload, or a network/HTTP error
        (HTTPError is a URLError subclass).
        Otherwise, on success, the parsed JSON body if it is a JSON object,
        or {} when the body is unreadable, not JSON, or not an object. A
        non-None result is therefore always a dict, so callers can safely
        use .get() on it.
    """
    if not api_url:
        return None
    try:
        # Request construction is inside the try: a malformed URL raises
        # ValueError here and json.dumps raises TypeError for bad payloads.
        data = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            api_url, data=data,
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            print("[API] POST success: {}".format(resp.status))
            try:
                parsed = json.loads(resp.read().decode("utf-8"))
            except Exception:
                # Status received; an unreadable/non-JSON body still counts
                # as a successful POST.
                return {}
    except urllib.error.URLError as e:
        print("[API] POST failed: {}".format(e))
        return None
    except Exception as e:
        print("[API] Unexpected error: {}".format(e))
        return None
    return parsed if isinstance(parsed, dict) else {}
# ── Watcher loop (tray-friendly) ──────────────────────────────────────────────
def _resolve_config_dir() -> str:
    """
    Return the directory expected to hold config.json.

    Frozen builds (sys.frozen truthy) use <LOCALAPPDATA or APPDATA>/ThorWatcher.
    Running from source uses the directory containing this file.
    """
    if getattr(sys, "frozen", False):
        appdata = os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or ""
        return os.path.join(appdata, "ThorWatcher")
    return os.path.dirname(os.path.abspath(__file__)) or "."


def _build_unit_list(
    unit_map: Dict[str, Dict[str, Any]], now_local: datetime
) -> Tuple[List[Dict[str, Any]], List[str]]:
    """Build tray rows (sorted by unit ID) and the matching console/log lines."""
    rows: List[Dict[str, Any]] = []
    lines: List[str] = []
    for uid in sorted(unit_map):
        entry = unit_map[uid]
        last_call = entry["last_call"]
        age_seconds = max(0.0, (now_local - last_call).total_seconds())
        age_minutes = int(age_seconds // 60)
        last_str = last_call.strftime("%Y-%m-%d %H:%M:%S")
        rows.append({
            "uid": uid,
            "age_minutes": age_minutes,
            "last_call": last_str,
            "mlg_path": entry["mlg_path"],
            "project": entry["project"],
        })
        lines.append("{uid:<8} Age: {h}h {m}m Last: {last} Project: {proj}".format(
            uid=uid,
            h=age_minutes // 60,
            m=age_minutes % 60,
            last=last_str,
            proj=entry["project"],
        ))
    return rows, lines


def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
    """
    Main watcher loop. Runs in a background thread when launched from the tray.

    Loads config.json. Then, until stop_event is set, it scans THORDATA every
    scan_interval seconds (minimum 1 s), publishes the results into `state`,
    and, when api_url is set, POSTs a heartbeat at most once every
    api_interval seconds (the first POST happens on the first cycle).

    Returns early with state["status"] == "error" if the config cannot be
    loaded or a config value cannot be converted (e.g. a non-numeric
    interval). An exception inside a cycle sets status "error"; the next
    successful cycle sets it back to "running".

    state keys written:
        state["status"]           — "starting" | "running" | "error"
        state["api_status"]       — None until first set, then
                                    "ok" | "fail" | "disabled"
        state["units"]            — list of unit dicts for tray display
        state["last_scan"]        — datetime of last successful scan
        state["last_error"]       — last error string or None
        state["log_dir"]          — directory containing the log file
        state["cfg"]              — loaded config dict
        state["update_available"] — set True when API response signals an update
    """
    config_dir = _resolve_config_dir()
    config_path = os.path.join(config_dir, "config.json")

    state["status"] = "starting"
    state["api_status"] = None
    state["units"] = []
    state["last_scan"] = None
    state["last_error"] = None
    state["log_dir"] = None
    state["cfg"] = {}
    state["update_available"] = False

    try:
        cfg = load_config(config_path)
    except Exception as e:
        state["status"] = "error"
        state["last_error"] = "Config load failed: {}".format(e)
        return
    state["cfg"] = cfg

    # Convert config values up front. A bad value (string, null, ...) must
    # surface as an error state, not kill the thread with status "starting".
    try:
        log_file = cfg["log_file"]
        log_dir = os.path.dirname(log_file) or config_dir
        thordata_path = cfg["thordata_path"]
        # Clamp: a zero/negative timeout makes stop_event.wait() return at
        # once, turning the loop into a busy spin that floods the log.
        scan_interval = max(1, int(cfg["scan_interval"]))
        api_url = cfg["api_url"]
        api_timeout = int(cfg["api_timeout"])
        api_interval = int(cfg["api_interval"])
        enable_logging = bool(cfg["enable_logging"])
    except (KeyError, TypeError, ValueError) as e:
        state["status"] = "error"
        state["last_error"] = "Config value invalid: {}".format(e)
        return
    state["log_dir"] = log_dir

    log_message(log_file, enable_logging,
        "[cfg] THORDATA_PATH={} SCAN_INTERVAL={}s API_INTERVAL={}s API={}".format(
            thordata_path, scan_interval, api_interval, bool(api_url)
        )
    )
    print("[CFG] THORDATA_PATH={} SCAN_INTERVAL={}s API={}".format(
        thordata_path, scan_interval, bool(api_url)
    ))

    # Monotonic clock: wall-clock jumps (NTP, manual changes) can't stall or
    # burst heartbeats. None means "nothing sent yet, send on this cycle".
    last_api_ts: Optional[float] = None
    while not stop_event.is_set():
        try:
            now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print("-" * 80)
            print("Heartbeat @ {}".format(now_str))
            print("-" * 80)

            unit_map = scan_thordata(thordata_path)
            unit_list, lines = _build_unit_list(unit_map, datetime.now())
            for line in lines:
                print(line)
                log_message(log_file, enable_logging, line)
            if not unit_list:
                msg = "[info] No Micromate units found in THORDATA"
                print(msg)
                log_message(log_file, enable_logging, msg)

            state["status"] = "running"
            state["units"] = unit_list
            state["last_scan"] = datetime.now()
            state["last_error"] = None

            # ── API heartbeat ──────────────────────────────────────────────
            if api_url:
                now_ts = time.monotonic()
                if last_api_ts is None or now_ts - last_api_ts >= api_interval:
                    payload = build_api_payload(unit_map, cfg)
                    payload["log_tail"] = _read_log_tail(log_file, 25)
                    response = send_api_payload(payload, api_url, api_timeout)
                    last_api_ts = now_ts
                    if response is not None:
                        state["api_status"] = "ok"
                        # isinstance guard: only a JSON object can carry flags.
                        if isinstance(response, dict) and response.get("update_available"):
                            state["update_available"] = True
                    else:
                        state["api_status"] = "fail"
            else:
                state["api_status"] = "disabled"
        except Exception as e:
            err = "[loop-error] {}".format(e)
            print(err)
            log_message(log_file, enable_logging, err)
            state["status"] = "error"
            state["last_error"] = str(e)

        stop_event.wait(timeout=scan_interval)
# ── Standalone entry point ────────────────────────────────────────────────────
def main() -> None:
    """
    Run the watcher in the foreground until interrupted with Ctrl+C.

    run_watcher() returns on its own only when it gives up during startup
    (e.g. config.json cannot be loaded). It records the reason in
    state["last_error"] with status "error" instead of raising, so the reason
    is printed here rather than letting the script exit silently.
    """
    state: Dict[str, Any] = {}
    stop_event = threading.Event()
    try:
        run_watcher(state, stop_event)
    except KeyboardInterrupt:
        print("\nStopping...")
        stop_event.set()
        return
    if state.get("status") == "error":
        print("[ERROR] {}".format(state.get("last_error")), file=sys.stderr)


if __name__ == "__main__":
    main()