"""
Series 4 Emitter — v0.1.0

Micromate (Series 4) heartbeat emitter for Seismo Fleet Manager (SFM).

Behavior:
- Scans C:\\THORDATA\\<Project>\\<UM####>\\*.MLG
- For each UM####, finds the newest .MLG by the timestamp in the filename
- Computes "age" from last_call -> now
- Classifies status as OK / LATE / STALE
- Prints a console heartbeat
- (Optional) Posts a JSON payload to the SFM backend

No roster. The SFM backend decides what to do with each unit.
"""
|
|
|
|
import os
|
|
import re
|
|
import time
|
|
import json
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, Any, Optional, Tuple
|
|
|
|
try:
|
|
# urllib is in stdlib; used instead of requests for portability
|
|
import urllib.request
|
|
import urllib.error
|
|
except ImportError:
|
|
urllib = None # type: ignore
|
|
|
|
# ---------------- Config ----------------

# Top-level THORDATA directory on the THOR PC / VM that gets scanned.
THORDATA_PATH = r"C:\THORDATA"

# Seconds to wait between two consecutive scans.
SCAN_INTERVAL = 60  # one minute

# Age thresholds, expressed in days.
LATE_DAYS = 2  # ages in [LATE_DAYS, STALE_DAYS) are reported as LATE
STALE_DAYS = 60  # ages >= STALE_DAYS are reported as STALE

# SFM backend endpoint; an empty string disables HTTP posting entirely.
SFM_ENDPOINT = ""  # e.g. "http://sfm-backend.local/api/telemetry/series4"
SFM_TIMEOUT = 5  # seconds, passed as the urlopen timeout

# Toggle for the [DEBUG] messages written to stderr.
DEBUG = True

# Matches names such as UM12345_YYYYMMDDHHMMSS.MLG (case-insensitive).
MLG_PATTERN = re.compile(r"^(UM\d+)_([0-9]{14})\.MLG$", re.IGNORECASE)
|
|
|
|
|
|
# ---------------- Helpers ----------------
|
|
|
|
|
|
def debug(msg: str) -> None:
    """Write *msg* to stderr with a ``[DEBUG]`` prefix when DEBUG is enabled."""
    if not DEBUG:
        return
    print(f"[DEBUG] {msg}", file=sys.stderr, flush=True)
|
|
|
|
|
|
def parse_mlg_filename(name: str) -> Optional[Tuple[str, datetime]]:
    """
    Split a Micromate MLG filename into its unit id and timestamp.

    Expected shape:
        UM12345_20251204193042.MLG

    Returns:
        (unit_id, timestamp) with unit_id upper-cased, or None when the name
        does not follow the pattern or the 14-digit stamp is not a valid
        date/time.
    """
    match = MLG_PATTERN.match(name)
    if match is None:
        return None

    raw_unit, stamp = match.groups()  # e.g. ("UM12345", "20251204193042")
    try:
        when = datetime.strptime(stamp, "%Y%m%d%H%M%S")
    except ValueError:
        # Fourteen digits that do not form a real calendar date/time.
        return None

    # Unit ids are reported in upper case regardless of the file's casing.
    return raw_unit.upper(), when
|
|
|
|
|
|
def scan_thordata(root: str) -> Dict[str, Dict[str, Any]]:
    """
    Scan the THORDATA folder for Micromate MLG files.

    Expected structure:
        C:\\THORDATA\\<Project>\\<UM####>\\*.MLG

    Unit ids are taken from the file names rather than the folder names, so
    the scan does not rely on unit folders being named correctly. When a
    unit id shows up more than once, the entry with the newest filename
    timestamp wins, along with the project it was found in.

    Args:
        root: Path of the THORDATA root directory to scan.

    Returns:
        unit_map: {
            "UM12345": {
                "unit_id": "UM12345",
                "project": "Clearwater - ECMS 57940",
                "last_call": datetime(...),
                "mlg_path": "C:\\\\THORDATA\\\\Clearwater...\\\\UM12345_....MLG"
            },
            ...
        }
        The map is empty when root is not a directory or cannot be listed.
        Folders that cannot be listed are skipped after a debug message.
    """
    unit_map: Dict[str, Dict[str, Any]] = {}

    if not os.path.isdir(root):
        debug(f"THORDATA_PATH does not exist or is not a directory: {root}")
        return unit_map

    for project_name in _listdir_or_empty(root, "THORDATA root"):
        project_path = os.path.join(root, project_name)
        if not os.path.isdir(project_path):
            continue

        # Each project contains UM#### subfolders.
        for unit_name in _listdir_or_empty(project_path, "project"):
            unit_path = os.path.join(project_path, unit_name)
            if not os.path.isdir(unit_path):
                continue

            for fname in _listdir_or_empty(unit_path, "unit folder"):
                # Cheap extension filter before the regex-based parse.
                if not fname.upper().endswith(".MLG"):
                    continue

                parsed = parse_mlg_filename(fname)
                if not parsed:
                    continue

                unit_id, ts = parsed
                current = unit_map.get(unit_id)
                # Keep only the most recent call per unit.
                if current is None or ts > current["last_call"]:
                    unit_map[unit_id] = {
                        "unit_id": unit_id,
                        "project": project_name,
                        "last_call": ts,
                        "mlg_path": os.path.join(unit_path, fname),
                    }

    return unit_map


def _listdir_or_empty(path: str, label: str) -> list:
    """Return os.listdir(path); on OSError emit a debug line and return []."""
    try:
        return os.listdir(path)
    except OSError as e:
        debug(f"Failed to list {label} '{path}': {e}")
        return []
|
|
|
|
|
|
def determine_status(
    last_call: datetime,
    now: Optional[datetime] = None,
    *,
    late_days: Optional[float] = None,
    stale_days: Optional[float] = None,
) -> Tuple[str, float]:
    """
    Determine status (OK / LATE / STALE) based on age in days.

    Args:
        last_call: Timestamp of the unit's most recent call.
        now: Reference time; defaults to datetime.now().
        late_days: Age in days at which a unit stops being OK. Defaults to
            the module-level LATE_DAYS.
        stale_days: Age in days at which a unit becomes STALE. Defaults to
            the module-level STALE_DAYS.

    Returns:
        (status, age_days) where status is "OK" for age < late_days,
        "LATE" for late_days <= age < stale_days, and "STALE" otherwise.
        age_days is a float and is never negative.
    """
    if now is None:
        now = datetime.now()
    if late_days is None:
        late_days = LATE_DAYS
    if stale_days is None:
        stale_days = STALE_DAYS

    # Protect against clocks being off: a last_call in the future counts
    # as age zero instead of a negative age.
    age_seconds = max((now - last_call).total_seconds(), 0.0)
    age_days = age_seconds / 86400.0

    if age_days < late_days:
        status = "OK"
    elif age_days < stale_days:
        status = "LATE"
    else:
        status = "STALE"

    return status, age_days
|
|
|
|
|
|
def format_age(td: timedelta) -> str:
    """
    Render a timedelta as a compact, human-readable age.

    Days and hours appear when non-zero; minutes appear only for ages
    under one day. Negative durations are treated as zero, and an age
    with no component to show is rendered as "0m".

    Examples:
        1d 2h
        3h 15m
        42m
    """
    seconds = max(int(td.total_seconds()), 0)

    days, remainder = divmod(seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes = remainder // 60

    # (should this piece be shown?, rendered piece)
    candidates = (
        (days > 0, f"{days}d"),
        (hours > 0, f"{hours}h"),
        (days == 0 and minutes > 0, f"{minutes}m"),  # minutes only if < 1d
    )
    shown = [text for keep, text in candidates if keep]

    return " ".join(shown) if shown else "0m"
|
|
|
|
|
|
def clear_console() -> None:
    """Wipe the terminal: runs ``cls`` when os.name is "nt", else ``clear``."""
    command = "cls" if os.name == "nt" else "clear"
    os.system(command)
|
|
|
|
|
|
def print_heartbeat(unit_map: Dict[str, Dict[str, Any]]) -> None:
    """
    Clear the console and print one heartbeat line per unit.

    A header (title, THORDATA root, current time) is always printed. With
    an empty unit_map a "no units" notice follows and the function returns;
    otherwise units are listed in unit_id order, followed by a footer with
    the unit count and the scan interval.

    Example:
        UM11719 OK Age: 1h 12m Last: 2025-12-04 19:30:42 Project: Clearwater - ECMS 57940
    """
    now = datetime.now()
    rule = "-" * 80

    clear_console()
    print("Series 4 Emitter — Micromate Heartbeat (v0.1.0)")
    print(f"THORDATA root: {THORDATA_PATH}")
    print(f"Now: {now.strftime('%Y-%m-%d %H:%M:%S')}")
    print(rule)

    if not unit_map:
        print("No units found (no .MLG files detected).")
        return

    # Alphabetical unit order keeps the table stable between scans.
    for unit_id in sorted(unit_map):
        info = unit_map[unit_id]
        last_call = info["last_call"]
        project = info["project"]

        status, _ = determine_status(last_call, now)
        age_text = format_age(now - last_call)
        stamp = last_call.strftime("%Y-%m-%d %H:%M:%S")

        print(
            f"{unit_id:<8} {status:<6} Age: {age_text:<8} "
            f"Last: {stamp} Project: {project}"
        )

    print(rule)
    print(f"Total units: {len(unit_map)}")
    print(f"Next scan in {SCAN_INTERVAL} seconds...")
    sys.stdout.flush()
|
|
|
|
|
|
def build_sfm_payload(unit_map: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """
    Assemble the JSON-serializable heartbeat payload for the SFM backend.

    Every unit's status and age are computed against a single "now" taken
    at the start of the call; the same instant is reported as generated_at.

    Shape (example):
        {
            "source": "series4_emitter",
            "generated_at": "2025-12-04T20:01:00",
            "units": [
                {
                    "unit_id": "UM11719",
                    "type": "micromate",
                    "project_hint": "Clearwater - ECMS 57940",
                    "last_call": "2025-12-04T19:30:42",
                    "status": "OK",
                    "age_days": 0.04,
                    "age_hours": 0.9,
                    "mlg_path": "C:\\\\THORDATA\\\\Clearwater...\\\\UM11719_....MLG"
                },
                ...
            ]
        }
    """
    now = datetime.now()

    def describe(unit_id: str, entry: Dict[str, Any]) -> Dict[str, Any]:
        # One "units" element; key order matches the documented shape.
        last_call: datetime = entry["last_call"]
        project = entry["project"]
        mlg_path = entry["mlg_path"]
        status, age_days = determine_status(last_call, now)
        return {
            "unit_id": unit_id,
            "type": "micromate",
            "project_hint": project,
            "last_call": last_call.isoformat(),
            "status": status,
            "age_days": age_days,
            "age_hours": age_days * 24.0,
            "mlg_path": mlg_path,
        }

    return {
        "source": "series4_emitter",
        "generated_at": now.isoformat(),
        "units": [describe(uid, entry) for uid, entry in unit_map.items()],
    }
|
|
|
|
|
|
def emit_sfm_payload(unit_map: Dict[str, Dict[str, Any]]) -> None:
    """
    Send heartbeat payload to SFM backend, if SFM_ENDPOINT is configured.

    This is intentionally conservative:
    - If SFM_ENDPOINT is empty -> do nothing
    - If urllib could not be imported -> warn on stderr and do nothing
    - If any error occurs -> print to stderr, but do not crash the emitter

    Args:
        unit_map: Unit map as produced by scan_thordata().
    """
    if not SFM_ENDPOINT:
        return

    if urllib is None:
        print(
            "[WARN] urllib not available; cannot POST to SFM. "
            "Install standard Python or disable SFM_ENDPOINT.",
            file=sys.stderr,
        )
        return

    try:
        # Payload building, serialization and Request construction are kept
        # inside the try as well: a malformed SFM_ENDPOINT makes Request()
        # raise ValueError, and that must not escape this function either.
        payload = build_sfm_payload(unit_map)
        data = json.dumps(payload).encode("utf-8")

        req = urllib.request.Request(
            SFM_ENDPOINT,
            data=data,
            headers={"Content-Type": "application/json"},
            method="POST",
        )

        with urllib.request.urlopen(req, timeout=SFM_TIMEOUT) as resp:
            resp.read()  # we don't care about the body for now
            debug(f"SFM POST OK: HTTP {resp.status}")
    except urllib.error.URLError as e:
        # Covers connection failures and HTTP error responses (HTTPError
        # is a URLError subclass).
        print(f"[WARN] Failed to POST to SFM: {e}", file=sys.stderr)
    except Exception as e:
        print(f"[WARN] Unexpected error during SFM POST: {e}", file=sys.stderr)
|
|
|
|
|
|
def main() -> None:
    """
    Run the emitter until interrupted.

    Prints the startup banner, warns on stderr if THORDATA_PATH is not a
    directory, then repeats scan -> print heartbeat -> emit to SFM every
    SCAN_INTERVAL seconds. An exception inside one cycle is reported on
    stderr and the loop carries on; Ctrl+C ends the loop with a message.
    """
    banner = (
        "Starting Series 4 Emitter (Micromate) v0.1.0",
        f"THORDATA_PATH = {THORDATA_PATH}",
        f"SCAN_INTERVAL = {SCAN_INTERVAL} seconds",
        f"LATE_DAYS = {LATE_DAYS}, STALE_DAYS = {STALE_DAYS}",
    )
    for line in banner:
        print(line)

    root_present = os.path.isdir(THORDATA_PATH)
    if not root_present:
        print(f"[WARN] THORDATA_PATH does not exist: {THORDATA_PATH}", file=sys.stderr)

    try:
        while True:
            try:
                units = scan_thordata(THORDATA_PATH)
                print_heartbeat(units)
                emit_sfm_payload(units)
            except Exception as e:
                # One failed cycle must not take the whole emitter down.
                print(f"[ERROR] Exception in main loop: {e}", file=sys.stderr)

            time.sleep(SCAN_INTERVAL)
    except KeyboardInterrupt:
        print("\nSeries 4 Emitter stopped by user.")


if __name__ == "__main__":
    main()
|