389 lines
14 KiB
Python
389 lines
14 KiB
Python
"""
|
|
Thor Watcher — Series 4 Ingest Agent v0.2.0

Micromate (Series 4) ingest agent for Terra-View.

Behavior:
  - Scans C:\THORDATA\<Project>\<UM####>\*.MLG
  - For each UM####, finds the newest .MLG by timestamp in the filename
  - Posts JSON heartbeat payload to Terra-View backend
  - Tray-friendly: run_watcher(state, stop_event) for background thread use
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import json
|
|
import threading
|
|
import urllib.request
|
|
import urllib.error
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
from socket import gethostname
|
|
|
|
|
|
# ── Version ───────────────────────────────────────────────────────────────────
|
|
|
|
# Agent version string; sent as the "version" field of the heartbeat payload.
VERSION = "0.2.0"
|
|
|
|
|
|
# ── Config ────────────────────────────────────────────────────────────────────
|
|
|
|
def load_config(config_path: str) -> Dict[str, Any]:
    """
    Read config.json and layer it over the built-in defaults.

    Every default key is guaranteed to be present in the result; any key
    found in the file takes precedence over its default.

    Nothing is caught here: a missing file or malformed JSON propagates to
    the caller, which is expected to handle it.
    """
    # Per-user application-data folder that hosts the default log location.
    appdata_root = os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or "C:\\"

    builtin: Dict[str, Any] = dict(
        thordata_path=r"C:\THORDATA",
        scan_interval=60,
        api_url="",
        api_timeout=5,
        api_interval=300,
        source_id="",
        source_type="series4_watcher",
        local_timezone="America/New_York",
        enable_logging=True,
        log_file=os.path.join(
            appdata_root, "ThorWatcher", "agent_logs", "thor_watcher.log"
        ),
        log_retention_days=30,
        update_source="gitea",
        update_url="",
        debug=False,
    )

    with open(config_path, "r", encoding="utf-8") as handle:
        overrides = json.load(handle)

    # File values win over defaults; unknown keys from the file are kept.
    return {**builtin, **overrides}
|
|
|
|
|
|
# ── Logging ───────────────────────────────────────────────────────────────────
|
|
|
|
def log_message(path: str, enabled: bool, msg: str) -> None:
    """
    Append one UTC-timestamped line to the log file (best effort).

    Args:
        path: Log file path; its parent directory is created if missing.
        enabled: When False the call is a no-op.
        msg: Text written after the ISO-8601 UTC timestamp.

    Never raises: any failure to create the directory or write the file is
    deliberately swallowed so that logging cannot take the watcher down.
    """
    if not enabled:
        return
    try:
        # exist_ok avoids the check-then-create race of exists() + makedirs(),
        # which would otherwise drop the message if the directory appeared
        # between the two calls.
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        with open(path, "a", encoding="utf-8") as f:
            f.write("{} {}\n".format(datetime.now(timezone.utc).isoformat(), msg))
    except Exception:
        # Intentional best-effort: there is nowhere better to report a
        # logging failure from here.
        pass
|
|
|
|
|
|
def _read_log_tail(log_file: str, n: int = 25) -> Optional[List[str]]:
|
|
"""Return the last n lines of the log file as a list, or None."""
|
|
if not log_file:
|
|
return None
|
|
try:
|
|
with open(log_file, "r", errors="replace") as f:
|
|
lines = f.readlines()
|
|
return [line.rstrip("\n") for line in lines[-n:]]
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
# ── MLG filename parsing ──────────────────────────────────────────────────────
|
|
|
|
# Filename shape: <unit id>_<14-digit timestamp>.MLG, e.g. UM12345_20251204193042.MLG
_MLG_PATTERN = re.compile(r"^(UM\d+)_([0-9]{14})\.MLG$", re.IGNORECASE)


def parse_mlg_filename(name: str) -> Optional[Tuple[str, datetime]]:
    """
    Split an MLG filename into its unit id and embedded timestamp.

    Returns (unit_id, timestamp) where unit_id is upper-cased and timestamp
    is the naive datetime encoded as YYYYMMDDHHMMSS in the name. Returns
    None when the name does not have the expected shape or the 14 digits do
    not form a valid date/time.
    """
    match = _MLG_PATTERN.match(name)
    if match is None:
        return None

    raw_unit, raw_stamp = match.groups()
    try:
        stamp = datetime.strptime(raw_stamp, "%Y%m%d%H%M%S")
    except ValueError:
        # Fourteen digits that are not a real calendar date/time.
        return None
    return raw_unit.upper(), stamp
|
|
|
|
|
|
# ── THORDATA scanner ──────────────────────────────────────────────────────────
|
|
|
|
def scan_thordata(root: str) -> Dict[str, Dict[str, Any]]:
    """
    Walk <root>/<Project>/<UM####>/*.MLG and keep the newest file per unit.

    "Newest" is decided by the timestamp embedded in the filename, not by
    filesystem metadata. Directories that are missing or cannot be listed
    are skipped silently.

    Returns:
        A dict keyed by unit id, e.g.
        { "UM12345": { "unit_id", "project", "last_call", "mlg_path" }, ... }
        where "last_call" is the naive datetime parsed from the filename and
        "project" is the name of the project folder the file was found in.
    """

    def _children(folder: str) -> List[str]:
        # Entry names of a directory; empty when it is not a directory or
        # cannot be listed.
        if not os.path.isdir(folder):
            return []
        try:
            return os.listdir(folder)
        except OSError:
            return []

    newest: Dict[str, Dict[str, Any]] = {}

    for project in _children(root):
        project_dir = os.path.join(root, project)

        for unit_folder in _children(project_dir):
            unit_dir = os.path.join(project_dir, unit_folder)

            for filename in _children(unit_dir):
                if not filename.upper().endswith(".MLG"):
                    continue
                parsed = parse_mlg_filename(filename)
                if not parsed:
                    continue

                unit_id, stamp = parsed
                known = newest.get(unit_id)
                # Keep the existing record unless this file is strictly newer.
                if known is not None and not stamp > known["last_call"]:
                    continue

                newest[unit_id] = {
                    "unit_id": unit_id,
                    "project": project,
                    "last_call": stamp,
                    "mlg_path": os.path.join(unit_dir, filename),
                }

    return newest
|
|
|
|
|
|
# ── API payload ───────────────────────────────────────────────────────────────
|
|
|
|
def build_api_payload(unit_map: Dict[str, Dict[str, Any]], cfg: Dict[str, Any]) -> dict:
    """
    Build the Terra-View JSON heartbeat payload.

    Args:
        unit_map: Per-unit records as produced by scan_thordata(); each
            entry must provide "last_call" (naive datetime), "mlg_path" and
            "project".
        cfg: Configuration dict. Reads "source_id" (falls back to the host
            name when blank), "source_type" and "local_timezone".

    Returns:
        A JSON-serialisable dict with "source_id", "source_type",
        "version", "generated_at" (UTC, ...Z) and a "units" list whose
        items carry "unit_id", "last_call" (UTC, ...Z), "age_minutes",
        "mlg_path" and "project_hint".
    """
    iso_z = "%Y-%m-%dT%H:%M:%SZ"
    now_utc = datetime.now(timezone.utc)
    now_local = datetime.now()

    source_id = (cfg.get("source_id") or "").strip() or gethostname()

    # Resolve the configured timezone used to interpret the naive MLG
    # timestamps. This can fail when zoneinfo or its tz database is not
    # available; None then selects the system-local fallback below.
    try:
        from zoneinfo import ZoneInfo
        local_tz = ZoneInfo(cfg.get("local_timezone") or "America/New_York")
    except Exception:
        local_tz = None

    units = []
    for unit_id, entry in unit_map.items():
        last_call: datetime = entry["last_call"]
        # Age is clamped at zero so a timestamp slightly in the future does
        # not produce a negative value.
        age_seconds = max(0.0, (now_local - last_call).total_seconds())
        age_minutes = int(age_seconds // 60)

        # MLG timestamps are naive local times; convert to UTC for transmission.
        try:
            if local_tz is not None:
                last_call_utc = last_call.replace(tzinfo=local_tz).astimezone(timezone.utc)
            else:
                # No tz database: astimezone() on a naive datetime assumes
                # the system's local timezone, which is far closer to the
                # truth than labelling local time as UTC.
                last_call_utc = last_call.astimezone(timezone.utc)
            last_call_str = last_call_utc.strftime(iso_z)
        except Exception:
            # Last resort: send the value unconverted and accept the inaccuracy.
            last_call_str = last_call.strftime(iso_z)

        units.append({
            "unit_id": unit_id,
            "last_call": last_call_str,
            "age_minutes": age_minutes,
            "mlg_path": entry["mlg_path"],
            "project_hint": entry["project"],
        })

    return {
        "source_id": source_id,
        "source_type": cfg.get("source_type", "series4_watcher"),
        "version": VERSION,
        "generated_at": now_utc.strftime(iso_z),
        "units": units,
    }
|
|
|
|
|
|
def send_api_payload(payload: dict, api_url: str, timeout: int) -> Optional[dict]:
    """
    POST the payload as JSON to Terra-View.

    Args:
        payload: JSON-serialisable heartbeat payload.
        api_url: Target URL; an empty value disables sending.
        timeout: Timeout in seconds passed to urlopen().

    Returns:
        The response body parsed as a JSON object on success. A successful
        response whose body is empty, not valid JSON, or not a JSON object
        yields {} so that callers can always treat a non-None result as a
        dict. Returns None when api_url is empty or the request failed.
    """
    if not api_url:
        return None

    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(
        api_url,
        data=data,
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            print("[API] POST success: {}".format(resp.status))
            try:
                body = json.loads(resp.read().decode("utf-8"))
            except Exception:
                # The request itself succeeded; an unparsable body is not a failure.
                return {}
            # A body of null / list / string would break the declared dict
            # contract (null would even read as "failure"), so normalise it.
            return body if isinstance(body, dict) else {}
    except urllib.error.URLError as e:
        # Also covers HTTPError (non-2xx responses), a URLError subclass.
        print("[API] POST failed: {}".format(e))
        return None
    except Exception as e:
        print("[API] Unexpected error: {}".format(e))
        return None
|
|
|
|
|
|
# ── Watcher loop (tray-friendly) ──────────────────────────────────────────────
|
|
|
|
def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
    """
    Main watcher loop. Runs in a background thread when launched from the tray.

    Loads config.json (from the per-user ThorWatcher folder when frozen,
    otherwise from this file's directory), then repeatedly scans THORDATA,
    publishes the result into ``state`` and, at most once per
    ``api_interval`` seconds, posts a heartbeat to the API. Returns when
    ``stop_event`` is set, or immediately (with status "error") when the
    configuration cannot be loaded or contains unusable values.

    state keys written:
        state["status"]           — "running" | "error" | "starting"
        state["api_status"]       — "ok" | "fail" | "disabled"
                                    ("disabled" until the first API attempt)
        state["units"]            — list of unit dicts for tray display
        state["last_scan"]        — datetime of last successful scan
        state["last_error"]       — last error string or None
        state["log_dir"]          — directory containing the log file
        state["cfg"]              — loaded config dict
        state["update_available"] — set True when API response signals an update
    """
    # Resolve config path
    if getattr(sys, "frozen", False):
        _appdata = os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or ""
        config_dir = os.path.join(_appdata, "ThorWatcher")
    else:
        config_dir = os.path.dirname(os.path.abspath(__file__)) or "."
    config_path = os.path.join(config_dir, "config.json")

    state["status"] = "starting"
    # Initialised up front so readers never hit a missing key.
    state["api_status"] = "disabled"
    state["units"] = []
    state["last_scan"] = None
    state["last_error"] = None
    state["log_dir"] = None
    state["cfg"] = {}
    state["update_available"] = False

    try:
        cfg = load_config(config_path)
    except Exception as e:
        state["status"] = "error"
        state["last_error"] = "Config load failed: {}".format(e)
        return

    state["cfg"] = cfg

    # A wrong-typed value (e.g. "scan_interval": "fast" or "log_file": null)
    # must surface as a reported error instead of killing the thread with
    # the status stuck at "starting".
    try:
        log_file = cfg["log_file"]
        state["log_dir"] = os.path.dirname(log_file) or config_dir

        thordata_path = cfg["thordata_path"]
        # At least one second between scans: a zero or negative interval
        # would make the wait below return immediately and spin the loop.
        scan_interval = max(1, int(cfg["scan_interval"]))
        api_url = cfg["api_url"]
        api_timeout = int(cfg["api_timeout"])
        api_interval = int(cfg["api_interval"])
        enable_logging = bool(cfg["enable_logging"])
    except (TypeError, ValueError) as e:
        state["status"] = "error"
        state["last_error"] = "Config invalid: {}".format(e)
        return

    log_message(
        log_file, enable_logging,
        "[cfg] THORDATA_PATH={} SCAN_INTERVAL={}s API_INTERVAL={}s API={}".format(
            thordata_path, scan_interval, api_interval, bool(api_url)
        ),
    )
    print("[CFG] THORDATA_PATH={} SCAN_INTERVAL={}s API={}".format(
        thordata_path, scan_interval, bool(api_url)
    ))

    # 0.0 guarantees that the first cycle sends a heartbeat.
    last_api_ts = 0.0

    while not stop_event.is_set():
        try:
            now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print("-" * 80)
            print("Heartbeat @ {}".format(now_str))
            print("-" * 80)

            unit_map = scan_thordata(thordata_path)
            now_local = datetime.now()

            unit_list = []
            for uid in sorted(unit_map):
                entry = unit_map[uid]
                last_call = entry["last_call"]
                last_call_text = last_call.strftime("%Y-%m-%d %H:%M:%S")
                age_seconds = max(0.0, (now_local - last_call).total_seconds())
                age_minutes = int(age_seconds // 60)

                unit_list.append({
                    "uid": uid,
                    "age_minutes": age_minutes,
                    "last_call": last_call_text,
                    "mlg_path": entry["mlg_path"],
                    "project": entry["project"],
                })

                line = "{uid:<8} Age: {h}h {m}m Last: {last} Project: {proj}".format(
                    uid=uid,
                    h=age_minutes // 60,
                    m=age_minutes % 60,
                    last=last_call_text,
                    proj=entry["project"],
                )
                print(line)
                log_message(log_file, enable_logging, line)

            if not unit_list:
                msg = "[info] No Micromate units found in THORDATA"
                print(msg)
                log_message(log_file, enable_logging, msg)

            state["status"] = "running"
            state["units"] = unit_list
            state["last_scan"] = datetime.now()
            state["last_error"] = None

            # ── API heartbeat ──────────────────────────────────────────────
            if api_url:
                now_ts = time.time()
                if now_ts - last_api_ts >= api_interval:
                    payload = build_api_payload(unit_map, cfg)
                    payload["log_tail"] = _read_log_tail(log_file, 25)
                    response = send_api_payload(payload, api_url, api_timeout)
                    # Recorded even on failure, so a failing API is retried
                    # once per api_interval rather than on every scan.
                    last_api_ts = now_ts
                    if response is not None:
                        state["api_status"] = "ok"
                        if isinstance(response, dict) and response.get("update_available"):
                            state["update_available"] = True
                    else:
                        state["api_status"] = "fail"
            else:
                state["api_status"] = "disabled"

        except Exception as e:
            # Keep the loop alive: report the failure and retry next cycle.
            err = "[loop-error] {}".format(e)
            print(err)
            log_message(log_file, enable_logging, err)
            state["status"] = "error"
            state["last_error"] = str(e)

        # Sleeps for the scan interval but wakes immediately on stop.
        stop_event.wait(timeout=scan_interval)
|
|
|
|
|
|
# ── Standalone entry point ────────────────────────────────────────────────────
|
|
|
|
def main() -> None:
    """
    Run the watcher in the foreground until it stops or Ctrl+C is pressed.

    When run_watcher() returns with an error status (for example because
    config.json could not be loaded), the error is printed to stderr so a
    standalone run does not exit silently.
    """
    state: Dict[str, Any] = {}
    stop_event = threading.Event()
    try:
        run_watcher(state, stop_event)
    except KeyboardInterrupt:
        print("\nStopping...")
        stop_event.set()
        return

    if state.get("status") == "error":
        print("[ERROR] {}".format(state.get("last_error")), file=sys.stderr)


if __name__ == "__main__":
    main()
|