Feat: full installation wizard/executable builder batch file created (to match series3-watcher).

Fix: timezone issue — timestamps are now sent in UTC.
This commit is contained in:
serversdwn
2026-03-20 17:34:51 -04:00
parent de6fd1e437
commit 81fca88475
7 changed files with 1424 additions and 331 deletions

View File

@@ -1,145 +1,130 @@
"""
Series 4 Ingest Agent v0.1.2
Thor Watcher — Series 4 Ingest Agent v0.2.0
Micromate (Series 4) ingest agent for Seismo Fleet Manager (SFM).
Micromate (Series 4) ingest agent for Terra-View.
Behavior:
- Scans C:\THORDATA\<Project>\<UM####>\*.MLG
- For each UM####, finds the newest .MLG by timestamp in the filename
- Computes "age" from last_call -> now
- Classifies status as OK / LATE / STALE
- Prints a console heartbeat
- (Optional) Posts JSON payload to SFM backend
No roster. SFM backend decides what to do with each unit.
- Posts JSON heartbeat payload to Terra-View backend
- Tray-friendly: run_watcher(state, stop_event) for background thread use
"""
import os
import re
import sys
import time
import json
import sys
from datetime import datetime, timedelta, timezone
from typing import Dict, Any, Optional, Tuple
from zoneinfo import ZoneInfo
import threading
import urllib.request
import urllib.error
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
from socket import gethostname
try:
# urllib is in stdlib; used instead of requests for portability
import urllib.request
import urllib.error
except ImportError:
urllib = None # type: ignore
# ---------------- Config ----------------
# ── Version ───────────────────────────────────────────────────────────────────
def load_config(config_path: str = "config.json") -> Dict[str, Any]:
VERSION = "0.2.0"
# ── Config ────────────────────────────────────────────────────────────────────
def load_config(config_path: str) -> Dict[str, Any]:
"""
Load configuration from a JSON file.
Falls back to defaults if file doesn't exist or has errors.
Load configuration from config.json.
Merges with defaults so any missing key is always present.
Raises on file-not-found or malformed JSON (caller handles).
"""
defaults = {
"thordata_path": r"C:\THORDATA",
"scan_interval": 60,
"late_days": 2,
"stale_days": 60,
"sfm_endpoint": "",
"sfm_timeout": 5,
"debug": True
defaults: Dict[str, Any] = {
"thordata_path": r"C:\THORDATA",
"scan_interval": 60,
"api_url": "",
"api_timeout": 5,
"api_interval": 300,
"source_id": "",
"source_type": "series4_watcher",
"local_timezone": "America/New_York",
"enable_logging": True,
"log_file": os.path.join(
os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or "C:\\",
"ThorWatcher", "agent_logs", "thor_watcher.log"
),
"log_retention_days": 30,
"update_source": "gitea",
"update_url": "",
"debug": False,
}
# Try to find config file relative to script location
script_dir = os.path.dirname(os.path.abspath(__file__))
full_config_path = os.path.join(script_dir, config_path)
with open(config_path, "r", encoding="utf-8") as f:
raw = json.load(f)
if not os.path.exists(full_config_path):
print(f"[WARN] Config file not found at {full_config_path}, using defaults", file=sys.stderr)
return defaults
return {**defaults, **raw}
# ── Logging ───────────────────────────────────────────────────────────────────
def log_message(path: str, enabled: bool, msg: str) -> None:
if not enabled:
return
try:
with open(full_config_path, 'r') as f:
config = json.load(f)
# Merge with defaults to ensure all keys exist
return {**defaults, **config}
except json.JSONDecodeError as e:
print(f"[WARN] Invalid JSON in config file: {e}, using defaults", file=sys.stderr)
return defaults
except Exception as e:
print(f"[WARN] Error loading config file: {e}, using defaults", file=sys.stderr)
return defaults
# Load configuration
config = load_config()
THORDATA_PATH = config["thordata_path"]
SCAN_INTERVAL = config["scan_interval"]
LATE_DAYS = config["late_days"]
STALE_DAYS = config["stale_days"]
SFM_ENDPOINT = config["sfm_endpoint"]
SFM_TIMEOUT = config["sfm_timeout"]
DEBUG = config["debug"]
# Regex: UM12345_YYYYMMDDHHMMSS.MLG
MLG_PATTERN = re.compile(r"^(UM\d+)_([0-9]{14})\.MLG$", re.IGNORECASE)
d = os.path.dirname(path) or "."
if not os.path.exists(d):
os.makedirs(d)
with open(path, "a", encoding="utf-8") as f:
f.write("{} {}\n".format(datetime.now(timezone.utc).isoformat(), msg))
except Exception:
pass
# ---------------- Helpers ----------------
def _read_log_tail(log_file: str, n: int = 25) -> Optional[List[str]]:
"""Return the last n lines of the log file as a list, or None."""
if not log_file:
return None
try:
with open(log_file, "r", errors="replace") as f:
lines = f.readlines()
return [line.rstrip("\n") for line in lines[-n:]]
except Exception:
return None
def debug(msg: str) -> None:
if DEBUG:
print(f"[DEBUG] {msg}", file=sys.stderr, flush=True)
# ── MLG filename parsing ──────────────────────────────────────────────────────
# Matches: UM12345_20251204193042.MLG
_MLG_PATTERN = re.compile(r"^(UM\d+)_([0-9]{14})\.MLG$", re.IGNORECASE)
def parse_mlg_filename(name: str) -> Optional[Tuple[str, datetime]]:
"""
Parse a Micromate MLG filename of the form:
UM12345_20251204193042.MLG
Returns:
(unit_id, timestamp) or None if pattern doesn't match.
"""
m = MLG_PATTERN.match(name)
"""Parse UM####_YYYYMMDDHHMMSS.MLG -> (unit_id, timestamp) or None."""
m = _MLG_PATTERN.match(name)
if not m:
return None
unit_id_raw = m.group(1) # e.g. "UM12345"
ts_str = m.group(2) # "YYYYMMDDHHMMSS"
unit_id = m.group(1).upper()
try:
ts = datetime.strptime(ts_str, "%Y%m%d%H%M%S")
ts = datetime.strptime(m.group(2), "%Y%m%d%H%M%S")
except ValueError:
return None
# Normalize unit_id to uppercase for consistency
return unit_id_raw.upper(), ts
return unit_id, ts
# ── THORDATA scanner ──────────────────────────────────────────────────────────
def scan_thordata(root: str) -> Dict[str, Dict[str, Any]]:
"""
Scan THORDATA folder for Micromate MLG files.
Expected structure:
C:\THORDATA\<Project>\<UM####>\*.MLG
Scan THORDATA folder structure: <root>/<Project>/<UM####>/*.MLG
Returns:
unit_map: {
"UM12345": {
"unit_id": "UM12345",
"project": "Clearwater - ECMS 57940",
"last_call": datetime(...),
"mlg_path": "C:\\THORDATA\\Clearwater...\\UM12345_....MLG"
},
...
}
{ "UM12345": { "unit_id", "project", "last_call" (datetime naive local), "mlg_path" }, ... }
"""
unit_map: Dict[str, Dict[str, Any]] = {}
if not os.path.isdir(root):
debug(f"THORDATA_PATH does not exist or is not a directory: {root}")
return unit_map
try:
project_names = os.listdir(root)
except OSError as e:
debug(f"Failed to list THORDATA root '{root}': {e}")
except OSError:
return unit_map
for project_name in project_names:
@@ -147,11 +132,9 @@ def scan_thordata(root: str) -> Dict[str, Dict[str, Any]]:
if not os.path.isdir(project_path):
continue
# Each project contains UM#### subfolders
try:
unit_dirs = os.listdir(project_path)
except OSError as e:
debug(f"Failed to list project '{project_path}': {e}")
except OSError:
continue
for unit_name in unit_dirs:
@@ -159,280 +142,246 @@ def scan_thordata(root: str) -> Dict[str, Dict[str, Any]]:
if not os.path.isdir(unit_path):
continue
# We expect folder names like "UM12345"
# but we'll parse filenames anyway, so we don't rely on folder naming.
try:
files = os.listdir(unit_path)
except OSError as e:
debug(f"Failed to list unit folder '{unit_path}': {e}")
except OSError:
continue
for fname in files:
if not fname.upper().endswith(".MLG"):
continue
parsed = parse_mlg_filename(fname)
if not parsed:
continue
unit_id, ts = parsed
full_path = os.path.join(unit_path, fname)
current = unit_map.get(unit_id)
if current is None or ts > current["last_call"]:
unit_map[unit_id] = {
"unit_id": unit_id,
"project": project_name,
"unit_id": unit_id,
"project": project_name,
"last_call": ts,
"mlg_path": full_path,
"mlg_path": full_path,
}
return unit_map
def determine_status(last_call: datetime, now: Optional[datetime] = None) -> Tuple[str, float]:
    """
    Classify a unit's call-in health from the age of its last call.

    Args:
        last_call: Timestamp of the unit's most recent call
            (naive local time parsed from the MLG filename).
        now: Reference time for the age calculation; defaults to datetime.now().

    Returns:
        (status, age_days) where status is "OK" (age < LATE_DAYS),
        "LATE" (age < STALE_DAYS) or "STALE" (older), and age_days is a float.
    """
    if now is None:
        now = datetime.now()
    age = now - last_call
    # Protect against clocks being off; don't go negative.
    if age.total_seconds() < 0:
        age = timedelta(seconds=0)
    age_days = age.total_seconds() / 86400.0
    # Thresholds come from module-level config: LATE_DAYS / STALE_DAYS (in days).
    if age_days < LATE_DAYS:
        status = "OK"
    elif age_days < STALE_DAYS:
        status = "LATE"
    else:
        status = "STALE"
    return status, age_days
def format_age(td: timedelta) -> str:
    """Render a timedelta as a compact human-readable age string.

    Examples: "1d 2h", "3h 15m", "42m". Minutes are shown only when the
    age is under one day; negative deltas clamp to "0m".
    """
    secs = max(0, int(td.total_seconds()))
    days = secs // 86400
    hours = (secs % 86400) // 3600
    minutes = (secs % 3600) // 60
    pieces = []
    if days:
        pieces.append(f"{days}d")
    if hours:
        pieces.append(f"{hours}h")
    if not days and minutes:
        # only show minutes if < 1d
        pieces.append(f"{minutes}m")
    return " ".join(pieces) if pieces else "0m"
def clear_console() -> None:
    """Clear the terminal, choosing the command by platform (Windows / *nix)."""
    command = "cls" if os.name == "nt" else "clear"
    os.system(command)
def print_heartbeat(unit_map: Dict[str, Dict[str, Any]]) -> None:
    """Render the console heartbeat table for every known unit.

    Clears the screen, prints a header with the THORDATA root and current
    time, then one line per unit (sorted by unit_id for stable output),
    followed by a footer with the unit count and next-scan countdown.

    Args:
        unit_map: Mapping of unit_id -> entry dicts as built by scan_thordata.
    """
    now = datetime.now()
    clear_console()
    print("Series 4 Ingest Agent — Micromate Heartbeat (v0.1.2)")
    print(f"THORDATA root: {THORDATA_PATH}")
    print(f"Now: {now.strftime('%Y-%m-%d %H:%M:%S')}")
    print("-" * 80)
    if not unit_map:
        print("No units found (no .MLG files detected).")
        return
    for unit_id in sorted(unit_map):
        entry = unit_map[unit_id]
        last_call = entry["last_call"]
        status, _ = determine_status(last_call, now)
        age_str = format_age(now - last_call)
        print(
            f"{unit_id:<8} {status:<6} Age: {age_str:<8} "
            f"Last: {last_call.strftime('%Y-%m-%d %H:%M:%S')} Project: {entry['project']}"
        )
    print("-" * 80)
    print(f"Total units: {len(unit_map)}")
    print(f"Next scan in {SCAN_INTERVAL} seconds...")
    sys.stdout.flush()
def build_sfm_payload(unit_map: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
"""
Build a JSON-serializable payload for SFM backend.
All timestamps are converted to UTC for transmission (standard practice).
Terra-View stores UTC and converts to local time for display.
Structure (example):
{
"source": "series4_ingest",
"generated_at": "2025-12-04T20:01:00Z",
"units": [
{
"unit_id": "UM11719",
"type": "micromate",
"project_hint": "Clearwater - ECMS 57940",
"last_call": "2025-12-05T00:30:42Z",
"status": "OK",
"age_days": 0.04,
"age_hours": 0.9,
"mlg_path": "C:\\THORDATA\\Clearwater...\\UM11719_....MLG"
},
...
]
}
"""
now_local = datetime.now()
def build_api_payload(unit_map: Dict[str, Dict[str, Any]], cfg: Dict[str, Any]) -> dict:
"""Build the Terra-View JSON heartbeat payload."""
now_utc = datetime.now(timezone.utc)
local_tz = ZoneInfo("America/New_York")
payload_units = []
now_local = datetime.now()
source_id = (cfg.get("source_id") or "").strip() or gethostname()
# Resolve local timezone for MLG timestamp conversion
try:
from zoneinfo import ZoneInfo
local_tz = ZoneInfo(cfg.get("local_timezone") or "America/New_York")
except Exception:
local_tz = None
units = []
for unit_id, entry in unit_map.items():
last_call: datetime = entry["last_call"]
project = entry["project"]
mlg_path = entry["mlg_path"]
age_seconds = max(0.0, (now_local - last_call).total_seconds())
age_minutes = int(age_seconds // 60)
# Use local time for status calculation (age comparison)
status, age_days = determine_status(last_call, now_local)
age_hours = age_days * 24.0
# MLG timestamps are local naive — convert to UTC for transmission
try:
if local_tz is not None:
last_call_utc = last_call.replace(tzinfo=local_tz).astimezone(timezone.utc)
last_call_str = last_call_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
else:
# Fallback: send as-is with Z and accept the inaccuracy
last_call_str = last_call.strftime("%Y-%m-%dT%H:%M:%SZ")
except Exception:
last_call_str = last_call.strftime("%Y-%m-%dT%H:%M:%SZ")
# Convert last_call from local time to UTC for transmission
last_call_utc = last_call.replace(tzinfo=local_tz).astimezone(timezone.utc)
units.append({
"unit_id": unit_id,
"last_call": last_call_str,
"age_minutes": age_minutes,
"mlg_path": entry["mlg_path"],
"project_hint": entry["project"],
})
payload_units.append(
{
"unit_id": unit_id,
"type": "micromate",
"project_hint": project,
"last_call": last_call_utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
"status": status,
"age_days": age_days,
"age_hours": age_hours,
"mlg_path": mlg_path,
}
)
payload = {
"source": "series4_ingest",
return {
"source_id": source_id,
"source_type": cfg.get("source_type", "series4_watcher"),
"version": VERSION,
"generated_at": now_utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
"units": payload_units,
"units": units,
}
return payload
def emit_sfm_payload(unit_map: Dict[str, Dict[str, Any]]) -> None:
"""
Send heartbeat payload to SFM backend, if SFM_ENDPOINT is configured.
This is intentionally conservative:
- If SFM_ENDPOINT is empty -> do nothing
- If any error occurs -> print to stderr, but do not crash the agent
"""
if not SFM_ENDPOINT:
return
if urllib is None:
print(
"[WARN] urllib not available; cannot POST to SFM. "
"Install standard Python or disable SFM_ENDPOINT.",
file=sys.stderr,
)
return
payload = build_sfm_payload(unit_map)
def send_api_payload(payload: dict, api_url: str, timeout: int) -> Optional[dict]:
"""POST payload to Terra-View. Returns parsed JSON response or None on failure."""
if not api_url:
return None
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
SFM_ENDPOINT,
data=data,
api_url, data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
print("[API] POST success: {}".format(resp.status))
try:
return json.loads(resp.read().decode("utf-8"))
except Exception:
return {}
except urllib.error.URLError as e:
print("[API] POST failed: {}".format(e))
return None
except Exception as e:
print("[API] Unexpected error: {}".format(e))
return None
# ── Watcher loop (tray-friendly) ──────────────────────────────────────────────
def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
"""
Main watcher loop. Runs in a background thread when launched from the tray.
state keys written each cycle:
state["status"] — "running" | "error" | "starting"
state["api_status"] — "ok" | "fail" | "disabled"
state["units"] — list of unit dicts for tray display
state["last_scan"] — datetime of last successful scan
state["last_error"] — last error string or None
state["log_dir"] — directory containing the log file
state["cfg"] — loaded config dict
state["update_available"] — set True when API response signals an update
"""
# Resolve config path
if getattr(sys, "frozen", False):
_appdata = os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or ""
config_dir = os.path.join(_appdata, "ThorWatcher")
else:
config_dir = os.path.dirname(os.path.abspath(__file__)) or "."
config_path = os.path.join(config_dir, "config.json")
state["status"] = "starting"
state["units"] = []
state["last_scan"] = None
state["last_error"] = None
state["log_dir"] = None
state["cfg"] = {}
state["update_available"] = False
try:
with urllib.request.urlopen(req, timeout=SFM_TIMEOUT) as resp:
_ = resp.read() # we don't care about the body for now
debug(f"SFM POST OK: HTTP {resp.status}")
except urllib.error.URLError as e:
print(f"[WARN] Failed to POST to SFM: {e}", file=sys.stderr)
cfg = load_config(config_path)
except Exception as e:
print(f"[WARN] Unexpected error during SFM POST: {e}", file=sys.stderr)
state["status"] = "error"
state["last_error"] = "Config load failed: {}".format(e)
return
state["cfg"] = cfg
log_file = cfg["log_file"]
state["log_dir"] = os.path.dirname(log_file) or config_dir
THORDATA_PATH = cfg["thordata_path"]
SCAN_INTERVAL = int(cfg["scan_interval"])
API_URL = cfg["api_url"]
API_TIMEOUT = int(cfg["api_timeout"])
API_INTERVAL = int(cfg["api_interval"])
ENABLE_LOGGING = bool(cfg["enable_logging"])
log_message(log_file, ENABLE_LOGGING,
"[cfg] THORDATA_PATH={} SCAN_INTERVAL={}s API_INTERVAL={}s API={}".format(
THORDATA_PATH, SCAN_INTERVAL, API_INTERVAL, bool(API_URL)
)
)
print("[CFG] THORDATA_PATH={} SCAN_INTERVAL={}s API={}".format(
THORDATA_PATH, SCAN_INTERVAL, bool(API_URL)
))
last_api_ts = 0.0
while not stop_event.is_set():
try:
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print("-" * 80)
print("Heartbeat @ {}".format(now_str))
print("-" * 80)
unit_map = scan_thordata(THORDATA_PATH)
now_local = datetime.now()
unit_list = []
for uid in sorted(unit_map.keys()):
entry = unit_map[uid]
last_call = entry["last_call"]
age_seconds = max(0.0, (now_local - last_call).total_seconds())
age_minutes = int(age_seconds // 60)
unit_list.append({
"uid": uid,
"age_minutes": age_minutes,
"last_call": last_call.strftime("%Y-%m-%d %H:%M:%S"),
"mlg_path": entry["mlg_path"],
"project": entry["project"],
})
line = "{uid:<8} Age: {h}h {m}m Last: {last} Project: {proj}".format(
uid=uid,
h=age_minutes // 60,
m=age_minutes % 60,
last=last_call.strftime("%Y-%m-%d %H:%M:%S"),
proj=entry["project"],
)
print(line)
log_message(log_file, ENABLE_LOGGING, line)
if not unit_list:
msg = "[info] No Micromate units found in THORDATA"
print(msg)
log_message(log_file, ENABLE_LOGGING, msg)
state["status"] = "running"
state["units"] = unit_list
state["last_scan"] = datetime.now()
state["last_error"] = None
# ── API heartbeat ──────────────────────────────────────────────────
if API_URL:
now_ts = time.time()
if now_ts - last_api_ts >= API_INTERVAL:
payload = build_api_payload(unit_map, cfg)
payload["log_tail"] = _read_log_tail(log_file, 25)
response = send_api_payload(payload, API_URL, API_TIMEOUT)
last_api_ts = now_ts
if response is not None:
state["api_status"] = "ok"
if response.get("update_available"):
state["update_available"] = True
else:
state["api_status"] = "fail"
else:
state["api_status"] = "disabled"
except Exception as e:
err = "[loop-error] {}".format(e)
print(err)
log_message(log_file, ENABLE_LOGGING, err)
state["status"] = "error"
state["last_error"] = str(e)
stop_event.wait(timeout=SCAN_INTERVAL)
# ── Standalone entry point ────────────────────────────────────────────────────
def main() -> None:
print("Starting Series 4 Ingest Agent (Micromate) v0.1.2")
print(f"THORDATA_PATH = {THORDATA_PATH}")
print(f"SCAN_INTERVAL = {SCAN_INTERVAL} seconds")
print(f"LATE_DAYS = {LATE_DAYS}, STALE_DAYS = {STALE_DAYS}")
if not os.path.isdir(THORDATA_PATH):
print(f"[WARN] THORDATA_PATH does not exist: {THORDATA_PATH}", file=sys.stderr)
loop_counter = 0
state: Dict[str, Any] = {}
stop_event = threading.Event()
try:
while True:
loop_counter += 1
print(f"\n[LOOP] Iteration {loop_counter} starting...", flush=True)
try:
unit_map = scan_thordata(THORDATA_PATH)
debug(f"scan_thordata found {len(unit_map)} units")
print_heartbeat(unit_map)
emit_sfm_payload(unit_map)
print("[LOOP] Iteration complete, entering sleep...", flush=True)
except Exception as e:
# Catch-all so a single error doesn't kill the loop
print(f"[ERROR] Exception in main loop: {e}", file=sys.stderr)
sys.stderr.flush()
# Sleep in 1-second chunks to avoid VM time drift weirdness
for i in range(SCAN_INTERVAL):
time.sleep(1)
print("[LOOP] Woke up for next scan", flush=True)
run_watcher(state, stop_event)
except KeyboardInterrupt:
print("\nSeries 4 Ingest Agent stopped by user.")
print("\nStopping...")
stop_event.set()
if __name__ == "__main__":