commit 1a5d864c13f96e3bb9769efcf1911a2045e4eb39 Author: serversdwn Date: Mon Dec 8 15:06:24 2025 -0500 Initial commit: Series-4 emitter v0.1.0 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6c24455 Binary files /dev/null and b/.gitignore differ diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..32df53c --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "workbench.colorCustomizations": { + "activityBar.background": "#372C13", + "titleBar.activeBackground": "#4D3E1B", + "titleBar.activeForeground": "#FCFBF8" + } +} \ No newline at end of file diff --git a/series4_emitter.py b/series4_emitter.py new file mode 100644 index 0000000..74f4a86 --- /dev/null +++ b/series4_emitter.py @@ -0,0 +1,386 @@ +""" +Series 4 Emitter — v0.1.0 + +Micromate (Series 4) heartbeat emitter for Seismo Fleet Manager (SFM). + +Behavior: +- Scans C:\THORDATA\\\*.MLG +- For each UM####, finds the newest .MLG by timestamp in the filename +- Computes "age" from last_call -> now +- Classifies status as OK / LATE / STALE +- Prints a console heartbeat +- (Optional) Posts JSON payload to SFM backend + +No roster. SFM backend decides what to do with each unit. +""" + +import os +import re +import time +import json +import sys +from datetime import datetime, timedelta +from typing import Dict, Any, Optional, Tuple + +try: + # urllib is in stdlib; used instead of requests for portability + import urllib.request + import urllib.error +except ImportError: + urllib = None # type: ignore + +# ---------------- Config ---------------- + +# Root THORDATA folder on the THOR PC / VM +THORDATA_PATH = r"C:\THORDATA" + +# Scan interval in seconds +SCAN_INTERVAL = 60 # 1 minute + +# Age thresholds (in days) +LATE_DAYS = 2 # between OK and STALE +STALE_DAYS = 60 # stale if last call >= STALE_DAYS + +# Optional SFM backend endpoint (leave empty to disable HTTP) +SFM_ENDPOINT = "" # e.g. "http://sfm-backend.local/api/telemetry/series4" +SFM_TIMEOUT = 5 # seconds + +# Enable/disable debug prints +DEBUG = False + +# Regex: UM12345_YYYYMMDDHHMMSS.MLG +MLG_PATTERN = re.compile(r"^(UM\d+)_([0-9]{14})\.MLG$", re.IGNORECASE) + + +# ---------------- Helpers ---------------- + + +def debug(msg: str) -> None: + if DEBUG: + print(f"[DEBUG] {msg}", file=sys.stderr, flush=True) + + +def parse_mlg_filename(name: str) -> Optional[Tuple[str, datetime]]: + """ + Parse a Micromate MLG filename of the form: + UM12345_20251204193042.MLG + + Returns: + (unit_id, timestamp) or None if pattern doesn't match. + """ + m = MLG_PATTERN.match(name) + if not m: + return None + unit_id_raw = m.group(1) # e.g. "UM12345" + ts_str = m.group(2) # "YYYYMMDDHHMMSS" + try: + ts = datetime.strptime(ts_str, "%Y%m%d%H%M%S") + except ValueError: + return None + # Normalize unit_id to uppercase for consistency + return unit_id_raw.upper(), ts + + +def scan_thordata(root: str) -> Dict[str, Dict[str, Any]]: + """ + Scan THORDATA folder for Micromate MLG files. + + Expected structure: + C:\THORDATA\\\*.MLG + + Returns: + unit_map: { + "UM12345": { + "unit_id": "UM12345", + "project": "Clearwater - ECMS 57940", + "last_call": datetime(...), + "mlg_path": "C:\\THORDATA\\Clearwater...\\UM12345_....MLG" + }, + ... + } + """ + unit_map: Dict[str, Dict[str, Any]] = {} + + if not os.path.isdir(root): + debug(f"THORDATA_PATH does not exist or is not a directory: {root}") + return unit_map + + try: + project_names = os.listdir(root) + except OSError as e: + debug(f"Failed to list THORDATA root '{root}': {e}") + return unit_map + + for project_name in project_names: + project_path = os.path.join(root, project_name) + if not os.path.isdir(project_path): + continue + + # Each project contains UM#### subfolders + try: + unit_dirs = os.listdir(project_path) + except OSError as e: + debug(f"Failed to list project '{project_path}': {e}") + continue + + for unit_name in unit_dirs: + unit_path = os.path.join(project_path, unit_name) + if not os.path.isdir(unit_path): + continue + + # We expect folder names like "UM12345" + # but we'll parse filenames anyway, so we don't rely on folder naming. + try: + files = os.listdir(unit_path) + except OSError as e: + debug(f"Failed to list unit folder '{unit_path}': {e}") + continue + + for fname in files: + if not fname.upper().endswith(".MLG"): + continue + + parsed = parse_mlg_filename(fname) + if not parsed: + continue + + unit_id, ts = parsed + full_path = os.path.join(unit_path, fname) + + current = unit_map.get(unit_id) + if current is None or ts > current["last_call"]: + unit_map[unit_id] = { + "unit_id": unit_id, + "project": project_name, + "last_call": ts, + "mlg_path": full_path, + } + + return unit_map + + +def determine_status(last_call: datetime, now: Optional[datetime] = None) -> Tuple[str, float]: + """ + Determine status (OK / LATE / STALE) based on age in days. + + Returns: + (status, age_days) + """ + if now is None: + now = datetime.now() + + age = now - last_call + # Protect against clocks being off; don't go negative. + if age.total_seconds() < 0: + age = timedelta(seconds=0) + + age_days = age.total_seconds() / 86400.0 + + if age_days < LATE_DAYS: + status = "OK" + elif age_days < STALE_DAYS: + status = "LATE" + else: + status = "STALE" + + return status, age_days + + +def format_age(td: timedelta) -> str: + """ + Format a timedelta into a human-readable age string. + Examples: + 1d 2h + 3h 15m + 42m + """ + total_seconds = int(td.total_seconds()) + if total_seconds < 0: + total_seconds = 0 + + days, rem = divmod(total_seconds, 86400) + hours, rem = divmod(rem, 3600) + minutes, _ = divmod(rem, 60) + + parts = [] + if days > 0: + parts.append(f"{days}d") + if hours > 0: + parts.append(f"{hours}h") + if days == 0 and minutes > 0: + parts.append(f"{minutes}m") # only show minutes if < 1d + + if not parts: + return "0m" + + return " ".join(parts) + + +def clear_console() -> None: + """Clear the console screen (Windows / *nix).""" + if os.name == "nt": + os.system("cls") + else: + os.system("clear") + + +def print_heartbeat(unit_map: Dict[str, Dict[str, Any]]) -> None: + """ + Print a console heartbeat table of all units. + + Example: + UM11719 OK Age: 1h 12m Last: 2025-12-04 19:30:42 Project: Clearwater - ECMS 57940 + """ + now = datetime.now() + + clear_console() + print("Series 4 Emitter — Micromate Heartbeat (v0.1.0)") + print(f"THORDATA root: {THORDATA_PATH}") + print(f"Now: {now.strftime('%Y-%m-%d %H:%M:%S')}") + print("-" * 80) + + if not unit_map: + print("No units found (no .MLG files detected).") + return + + # Sort by unit_id for stable output + for unit_id in sorted(unit_map.keys()): + entry = unit_map[unit_id] + last_call = entry["last_call"] + project = entry["project"] + + age_td = now - last_call + status, _age_days = determine_status(last_call, now) + age_str = format_age(age_td) + last_str = last_call.strftime("%Y-%m-%d %H:%M:%S") + + print( + f"{unit_id:<8} {status:<6} Age: {age_str:<8} " + f"Last: {last_str} Project: {project}" + ) + + print("-" * 80) + print(f"Total units: {len(unit_map)}") + print(f"Next scan in {SCAN_INTERVAL} seconds...") + sys.stdout.flush() + + +def build_sfm_payload(unit_map: Dict[str, Dict[str, Any]]) -> Dict[str, Any]: + """ + Build a JSON-serializable payload for SFM backend. + + Structure (example): + { + "source": "series4_emitter", + "generated_at": "2025-12-04T20:01:00", + "units": [ + { + "unit_id": "UM11719", + "type": "micromate", + "project_hint": "Clearwater - ECMS 57940", + "last_call": "2025-12-04T19:30:42", + "status": "OK", + "age_days": 0.04, + "age_hours": 0.9, + "mlg_path": "C:\\THORDATA\\Clearwater...\\UM11719_....MLG" + }, + ... + ] + } + """ + now = datetime.now() + payload_units = [] + + for unit_id, entry in unit_map.items(): + last_call: datetime = entry["last_call"] + project = entry["project"] + mlg_path = entry["mlg_path"] + + status, age_days = determine_status(last_call, now) + age_hours = age_days * 24.0 + + payload_units.append( + { + "unit_id": unit_id, + "type": "micromate", + "project_hint": project, + "last_call": last_call.isoformat(), + "status": status, + "age_days": age_days, + "age_hours": age_hours, + "mlg_path": mlg_path, + } + ) + + payload = { + "source": "series4_emitter", + "generated_at": now.isoformat(), + "units": payload_units, + } + return payload + + +def emit_sfm_payload(unit_map: Dict[str, Dict[str, Any]]) -> None: + """ + Send heartbeat payload to SFM backend, if SFM_ENDPOINT is configured. + + This is intentionally conservative: + - If SFM_ENDPOINT is empty -> do nothing + - If any error occurs -> print to stderr, but do not crash the emitter + """ + if not SFM_ENDPOINT: + return + + if urllib is None: + print( + "[WARN] urllib not available; cannot POST to SFM. " + "Install standard Python or disable SFM_ENDPOINT.", + file=sys.stderr, + ) + return + + payload = build_sfm_payload(unit_map) + data = json.dumps(payload).encode("utf-8") + + req = urllib.request.Request( + SFM_ENDPOINT, + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + + try: + with urllib.request.urlopen(req, timeout=SFM_TIMEOUT) as resp: + _ = resp.read() # we don't care about the body for now + debug(f"SFM POST OK: HTTP {resp.status}") + except urllib.error.URLError as e: + print(f"[WARN] Failed to POST to SFM: {e}", file=sys.stderr) + except Exception as e: + print(f"[WARN] Unexpected error during SFM POST: {e}", file=sys.stderr) + + +def main() -> None: + print("Starting Series 4 Emitter (Micromate) v0.1.0") + print(f"THORDATA_PATH = {THORDATA_PATH}") + print(f"SCAN_INTERVAL = {SCAN_INTERVAL} seconds") + print(f"LATE_DAYS = {LATE_DAYS}, STALE_DAYS = {STALE_DAYS}") + if not os.path.isdir(THORDATA_PATH): + print(f"[WARN] THORDATA_PATH does not exist: {THORDATA_PATH}", file=sys.stderr) + + try: + while True: + try: + unit_map = scan_thordata(THORDATA_PATH) + print_heartbeat(unit_map) + emit_sfm_payload(unit_map) + except Exception as e: + # Catch-all so a single error doesn't kill the loop + print(f"[ERROR] Exception in main loop: {e}", file=sys.stderr) + + time.sleep(SCAN_INTERVAL) + except KeyboardInterrupt: + print("\nSeries 4 Emitter stopped by user.") + + +if __name__ == "__main__": + main()