""" Series 4 Ingest Agent — v0.1.3 Micromate (Series 4) ingest agent for Seismo Fleet Manager (SFM). Behavior: - Scans C:\THORDATA\\\*.MLG - For each UM####, finds the newest .MLG by timestamp in the filename - Computes "age" from last_call -> now - Classifies status as OK / LATE / STALE - Prints a console heartbeat - (Optional) Posts JSON payload to SFM backend No roster. SFM backend decides what to do with each unit. """ import argparse import logging from logging.handlers import RotatingFileHandler import os import re import time import json import sys from datetime import datetime, timedelta, timezone from typing import Dict, Any, Optional, Tuple from zoneinfo import ZoneInfo try: # urllib is in stdlib; used instead of requests for portability import urllib.request import urllib.error except ImportError: urllib = None # type: ignore __version__ = "0.1.3" # ---------------- Config ---------------- DEFAULT_CONFIG_PATH = r"C:\ProgramData\ThorIngest\config.json" DEFAULT_LOG_DIR = r"C:\ProgramData\ThorIngest\logs" def load_config(config_path: Optional[str] = None) -> Dict[str, Any]: """ Load configuration from a JSON file. Falls back to defaults if file doesn't exist or has errors. """ defaults = { "thordata_path": r"C:\THORDATA", "scan_interval": 60, "late_days": 2, "stale_days": 60, "sfm_endpoint": "", "sfm_timeout": 5, "debug": True } # Resolve config path order: # 1) explicit config_path (if provided) # 2) ProgramData default # 3) local directory config.json (script dir) script_dir = os.path.dirname(os.path.abspath(__file__)) candidates = [] if config_path: candidates.append(config_path) candidates.append(DEFAULT_CONFIG_PATH) candidates.append(os.path.join(script_dir, "config.json")) full_config_path = None for path in candidates: if os.path.exists(path): full_config_path = path break if full_config_path is None: print( f"[WARN] Config file not found in: {', '.join(candidates)}, using defaults", file=sys.stderr ) return defaults try: with open(full_config_path, 'r') as f: config = json.load(f) # Merge with defaults to ensure all keys exist return {**defaults, **config} except json.JSONDecodeError as e: print(f"[WARN] Invalid JSON in config file: {e}, using defaults", file=sys.stderr) return defaults except Exception as e: print(f"[WARN] Error loading config file: {e}, using defaults", file=sys.stderr) return defaults THORDATA_PATH = r"C:\THORDATA" SCAN_INTERVAL = 60 LATE_DAYS = 2 STALE_DAYS = 60 SFM_ENDPOINT = "" SFM_TIMEOUT = 5 DEBUG = True logger = logging.getLogger("thor_ingest") # Regex: UM12345_YYYYMMDDHHMMSS.MLG MLG_PATTERN = re.compile(r"^(UM\d+)_([0-9]{14})\.MLG$", re.IGNORECASE) # ---------------- Helpers ---------------- def debug(msg: str) -> None: if DEBUG: logger.debug(msg) def parse_mlg_filename(name: str) -> Optional[Tuple[str, datetime]]: """ Parse a Micromate MLG filename of the form: UM12345_20251204193042.MLG Returns: (unit_id, timestamp) or None if pattern doesn't match. """ m = MLG_PATTERN.match(name) if not m: return None unit_id_raw = m.group(1) # e.g. "UM12345" ts_str = m.group(2) # "YYYYMMDDHHMMSS" try: ts = datetime.strptime(ts_str, "%Y%m%d%H%M%S") except ValueError: return None # Normalize unit_id to uppercase for consistency return unit_id_raw.upper(), ts def scan_thordata(root: str) -> Dict[str, Dict[str, Any]]: """ Scan THORDATA folder for Micromate MLG files. Expected structure: C:\THORDATA\\\*.MLG Returns: unit_map: { "UM12345": { "unit_id": "UM12345", "project": "Clearwater - ECMS 57940", "last_call": datetime(...), "mlg_path": "C:\\THORDATA\\Clearwater...\\UM12345_....MLG" }, ... } """ unit_map: Dict[str, Dict[str, Any]] = {} if not os.path.isdir(root): debug(f"THORDATA_PATH does not exist or is not a directory: {root}") return unit_map try: project_names = os.listdir(root) except OSError as e: debug(f"Failed to list THORDATA root '{root}': {e}") return unit_map for project_name in project_names: project_path = os.path.join(root, project_name) if not os.path.isdir(project_path): continue # Each project contains UM#### subfolders try: unit_dirs = os.listdir(project_path) except OSError as e: debug(f"Failed to list project '{project_path}': {e}") continue for unit_name in unit_dirs: unit_path = os.path.join(project_path, unit_name) if not os.path.isdir(unit_path): continue # We expect folder names like "UM12345" # but we'll parse filenames anyway, so we don't rely on folder naming. try: files = os.listdir(unit_path) except OSError as e: debug(f"Failed to list unit folder '{unit_path}': {e}") continue for fname in files: if not fname.upper().endswith(".MLG"): continue parsed = parse_mlg_filename(fname) if not parsed: continue unit_id, ts = parsed full_path = os.path.join(unit_path, fname) current = unit_map.get(unit_id) if current is None or ts > current["last_call"]: unit_map[unit_id] = { "unit_id": unit_id, "project": project_name, "last_call": ts, "mlg_path": full_path, } return unit_map def determine_status(last_call: datetime, now: Optional[datetime] = None) -> Tuple[str, float]: """ Determine status (OK / LATE / STALE) based on age in days. Returns: (status, age_days) """ if now is None: now = datetime.now() age = now - last_call # Protect against clocks being off; don't go negative. if age.total_seconds() < 0: age = timedelta(seconds=0) age_days = age.total_seconds() / 86400.0 if age_days < LATE_DAYS: status = "OK" elif age_days < STALE_DAYS: status = "LATE" else: status = "STALE" return status, age_days def format_age(td: timedelta) -> str: """ Format a timedelta into a human-readable age string. Examples: 1d 2h 3h 15m 42m """ total_seconds = int(td.total_seconds()) if total_seconds < 0: total_seconds = 0 days, rem = divmod(total_seconds, 86400) hours, rem = divmod(rem, 3600) minutes, _ = divmod(rem, 60) parts = [] if days > 0: parts.append(f"{days}d") if hours > 0: parts.append(f"{hours}h") if days == 0 and minutes > 0: parts.append(f"{minutes}m") # only show minutes if < 1d if not parts: return "0m" return " ".join(parts) def clear_console() -> None: """Clear the console screen (Windows / *nix).""" if not sys.stdout.isatty(): return if os.name == "nt": os.system("cls") else: os.system("clear") def print_heartbeat(unit_map: Dict[str, Dict[str, Any]]) -> None: """ Print a console heartbeat table of all units. Example: UM11719 OK Age: 1h 12m Last: 2025-12-04 19:30:42 Project: Clearwater - ECMS 57940 """ now = datetime.now() clear_console() logger.info("Series 4 Ingest Agent — Micromate Heartbeat (v%s)", __version__) logger.info("THORDATA root: %s", THORDATA_PATH) logger.info("Now: %s", now.strftime('%Y-%m-%d %H:%M:%S')) logger.info("-" * 80) if not unit_map: logger.info("No units found (no .MLG files detected).") return # Sort by unit_id for stable output for unit_id in sorted(unit_map.keys()): entry = unit_map[unit_id] last_call = entry["last_call"] project = entry["project"] age_td = now - last_call status, _age_days = determine_status(last_call, now) age_str = format_age(age_td) last_str = last_call.strftime("%Y-%m-%d %H:%M:%S") logger.info( "%-8s %-6s Age: %-8s Last: %s Project: %s", unit_id, status, age_str, last_str, project, ) logger.info("-" * 80) logger.info("Total units: %s", len(unit_map)) logger.info("Next scan in %s seconds...", SCAN_INTERVAL) def build_sfm_payload(unit_map: Dict[str, Dict[str, Any]]) -> Dict[str, Any]: """ Build a JSON-serializable payload for SFM backend. All timestamps are converted to UTC for transmission (standard practice). Terra-View stores UTC and converts to local time for display. Structure (example): { "source": "series4_ingest", "generated_at": "2025-12-04T20:01:00Z", "units": [ { "unit_id": "UM11719", "type": "micromate", "project_hint": "Clearwater - ECMS 57940", "last_call": "2025-12-05T00:30:42Z", "status": "OK", "age_days": 0.04, "age_hours": 0.9, "mlg_path": "C:\\THORDATA\\Clearwater...\\UM11719_....MLG" }, ... ] } """ now_local = datetime.now() now_utc = datetime.now(timezone.utc) local_tz = ZoneInfo("America/New_York") payload_units = [] for unit_id, entry in unit_map.items(): last_call: datetime = entry["last_call"] project = entry["project"] mlg_path = entry["mlg_path"] # Use local time for status calculation (age comparison) status, age_days = determine_status(last_call, now_local) age_hours = age_days * 24.0 # Convert last_call from local time to UTC for transmission last_call_utc = last_call.replace(tzinfo=local_tz).astimezone(timezone.utc) payload_units.append( { "unit_id": unit_id, "type": "micromate", "project_hint": project, "last_call": last_call_utc.strftime("%Y-%m-%dT%H:%M:%SZ"), "status": status, "age_days": age_days, "age_hours": age_hours, "mlg_path": mlg_path, } ) payload = { "source": "series4_ingest", "generated_at": now_utc.strftime("%Y-%m-%dT%H:%M:%SZ"), "units": payload_units, } return payload def emit_sfm_payload(unit_map: Dict[str, Dict[str, Any]]) -> None: """ Send heartbeat payload to SFM backend, if SFM_ENDPOINT is configured. This is intentionally conservative: - If SFM_ENDPOINT is empty -> do nothing - If any error occurs -> print to stderr, but do not crash the agent """ if not SFM_ENDPOINT: return if urllib is None: logger.warning( "urllib not available; cannot POST to SFM. " "Install standard Python or disable SFM_ENDPOINT." ) return payload = build_sfm_payload(unit_map) data = json.dumps(payload).encode("utf-8") req = urllib.request.Request( SFM_ENDPOINT, data=data, headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen(req, timeout=SFM_TIMEOUT) as resp: _ = resp.read() # we don't care about the body for now debug(f"SFM POST OK: HTTP {resp.status}") except urllib.error.URLError as e: logger.warning("Failed to POST to SFM: %s", e) except Exception as e: logger.warning("Unexpected error during SFM POST: %s", e) def setup_logging(log_dir: str) -> None: os.makedirs(log_dir, exist_ok=True) logger.setLevel(logging.DEBUG if DEBUG else logging.INFO) # File handler with rotation file_handler = RotatingFileHandler( os.path.join(log_dir, "thor_ingest.log"), maxBytes=5 * 1024 * 1024, backupCount=5, encoding="utf-8", ) file_handler.setLevel(logging.DEBUG if DEBUG else logging.INFO) file_format = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s") file_handler.setFormatter(file_format) # Console handler (clean output) console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(logging.DEBUG if DEBUG else logging.INFO) console_format = logging.Formatter("%(message)s") console_handler.setFormatter(console_format) # Avoid duplicate handlers if main is re-entered if not logger.handlers: logger.addHandler(file_handler) logger.addHandler(console_handler) def main() -> None: parser = argparse.ArgumentParser(description="Series 4 Ingest Agent (Micromate)") parser.add_argument("--config", help="Path to config.json") parser.add_argument("--log-dir", help="Directory for log output") parser.add_argument("--once", action="store_true", help="Run one scan and exit") parser.add_argument("--version", action="store_true", help="Print version and exit") args = parser.parse_args() if args.version: print(__version__) return config = load_config(args.config) global THORDATA_PATH, SCAN_INTERVAL, LATE_DAYS, STALE_DAYS, SFM_ENDPOINT, SFM_TIMEOUT, DEBUG THORDATA_PATH = config["thordata_path"] SCAN_INTERVAL = config["scan_interval"] LATE_DAYS = config["late_days"] STALE_DAYS = config["stale_days"] SFM_ENDPOINT = config["sfm_endpoint"] SFM_TIMEOUT = config["sfm_timeout"] DEBUG = config["debug"] log_dir = args.log_dir or DEFAULT_LOG_DIR setup_logging(log_dir) logger.info("Starting Series 4 Ingest Agent (Micromate) v%s", __version__) logger.info("THORDATA_PATH = %s", THORDATA_PATH) logger.info("SCAN_INTERVAL = %s seconds", SCAN_INTERVAL) logger.info("LATE_DAYS = %s, STALE_DAYS = %s", LATE_DAYS, STALE_DAYS) if not os.path.isdir(THORDATA_PATH): logger.warning("THORDATA_PATH does not exist: %s", THORDATA_PATH) loop_counter = 0 try: while True: loop_counter += 1 logger.info("\n[LOOP] Iteration %s starting...", loop_counter) try: unit_map = scan_thordata(THORDATA_PATH) debug(f"scan_thordata found {len(unit_map)} units") print_heartbeat(unit_map) emit_sfm_payload(unit_map) logger.info("[LOOP] Iteration complete, entering sleep...") except Exception as e: # Catch-all so a single error doesn't kill the loop logger.error("Exception in main loop: %s", e) if args.once: break # Sleep in 1-second chunks to avoid VM time drift weirdness for i in range(SCAN_INTERVAL): time.sleep(1) logger.info("[LOOP] Woke up for next scan") except KeyboardInterrupt: logger.info("\nSeries 4 Ingest Agent stopped by user.") if __name__ == "__main__": main()