Initial commit: Series-4 emitter v0.1.0

This commit is contained in:
serversdwn
2025-12-08 15:06:24 -05:00
commit 1a5d864c13
3 changed files with 393 additions and 0 deletions

386
series4_emitter.py Normal file
View File

@@ -0,0 +1,386 @@
"""
Series 4 Emitter — v0.1.0
Micromate (Series 4) heartbeat emitter for Seismo Fleet Manager (SFM).
Behavior:
- Scans C:\THORDATA\<Project>\<UM####>\*.MLG
- For each UM####, finds the newest .MLG by timestamp in the filename
- Computes "age" from last_call -> now
- Classifies status as OK / LATE / STALE
- Prints a console heartbeat
- (Optional) Posts JSON payload to SFM backend
No roster. SFM backend decides what to do with each unit.
"""
import os
import re
import time
import json
import sys
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, Tuple
try:
# urllib is in stdlib; used instead of requests for portability
import urllib.request
import urllib.error
except ImportError:
urllib = None # type: ignore
# ---------------- Config ----------------
# Root THORDATA folder on the THOR PC / VM
THORDATA_PATH = r"C:\THORDATA"
# Scan interval in seconds
SCAN_INTERVAL = 60 # 1 minute
# Age thresholds (in days)
LATE_DAYS = 2 # between OK and STALE
STALE_DAYS = 60 # stale if last call >= STALE_DAYS
# Optional SFM backend endpoint (leave empty to disable HTTP)
SFM_ENDPOINT = "" # e.g. "http://sfm-backend.local/api/telemetry/series4"
SFM_TIMEOUT = 5 # seconds
# Enable/disable debug prints
DEBUG = False
# Regex: UM12345_YYYYMMDDHHMMSS.MLG
MLG_PATTERN = re.compile(r"^(UM\d+)_([0-9]{14})\.MLG$", re.IGNORECASE)
# ---------------- Helpers ----------------
def debug(msg: str) -> None:
if DEBUG:
print(f"[DEBUG] {msg}", file=sys.stderr, flush=True)
def parse_mlg_filename(name: str) -> Optional[Tuple[str, datetime]]:
"""
Parse a Micromate MLG filename of the form:
UM12345_20251204193042.MLG
Returns:
(unit_id, timestamp) or None if pattern doesn't match.
"""
m = MLG_PATTERN.match(name)
if not m:
return None
unit_id_raw = m.group(1) # e.g. "UM12345"
ts_str = m.group(2) # "YYYYMMDDHHMMSS"
try:
ts = datetime.strptime(ts_str, "%Y%m%d%H%M%S")
except ValueError:
return None
# Normalize unit_id to uppercase for consistency
return unit_id_raw.upper(), ts
def scan_thordata(root: str) -> Dict[str, Dict[str, Any]]:
"""
Scan THORDATA folder for Micromate MLG files.
Expected structure:
C:\THORDATA\<Project>\<UM####>\*.MLG
Returns:
unit_map: {
"UM12345": {
"unit_id": "UM12345",
"project": "Clearwater - ECMS 57940",
"last_call": datetime(...),
"mlg_path": "C:\\THORDATA\\Clearwater...\\UM12345_....MLG"
},
...
}
"""
unit_map: Dict[str, Dict[str, Any]] = {}
if not os.path.isdir(root):
debug(f"THORDATA_PATH does not exist or is not a directory: {root}")
return unit_map
try:
project_names = os.listdir(root)
except OSError as e:
debug(f"Failed to list THORDATA root '{root}': {e}")
return unit_map
for project_name in project_names:
project_path = os.path.join(root, project_name)
if not os.path.isdir(project_path):
continue
# Each project contains UM#### subfolders
try:
unit_dirs = os.listdir(project_path)
except OSError as e:
debug(f"Failed to list project '{project_path}': {e}")
continue
for unit_name in unit_dirs:
unit_path = os.path.join(project_path, unit_name)
if not os.path.isdir(unit_path):
continue
# We expect folder names like "UM12345"
# but we'll parse filenames anyway, so we don't rely on folder naming.
try:
files = os.listdir(unit_path)
except OSError as e:
debug(f"Failed to list unit folder '{unit_path}': {e}")
continue
for fname in files:
if not fname.upper().endswith(".MLG"):
continue
parsed = parse_mlg_filename(fname)
if not parsed:
continue
unit_id, ts = parsed
full_path = os.path.join(unit_path, fname)
current = unit_map.get(unit_id)
if current is None or ts > current["last_call"]:
unit_map[unit_id] = {
"unit_id": unit_id,
"project": project_name,
"last_call": ts,
"mlg_path": full_path,
}
return unit_map
def determine_status(last_call: datetime, now: Optional[datetime] = None) -> Tuple[str, float]:
"""
Determine status (OK / LATE / STALE) based on age in days.
Returns:
(status, age_days)
"""
if now is None:
now = datetime.now()
age = now - last_call
# Protect against clocks being off; don't go negative.
if age.total_seconds() < 0:
age = timedelta(seconds=0)
age_days = age.total_seconds() / 86400.0
if age_days < LATE_DAYS:
status = "OK"
elif age_days < STALE_DAYS:
status = "LATE"
else:
status = "STALE"
return status, age_days
def format_age(td: timedelta) -> str:
"""
Format a timedelta into a human-readable age string.
Examples:
1d 2h
3h 15m
42m
"""
total_seconds = int(td.total_seconds())
if total_seconds < 0:
total_seconds = 0
days, rem = divmod(total_seconds, 86400)
hours, rem = divmod(rem, 3600)
minutes, _ = divmod(rem, 60)
parts = []
if days > 0:
parts.append(f"{days}d")
if hours > 0:
parts.append(f"{hours}h")
if days == 0 and minutes > 0:
parts.append(f"{minutes}m") # only show minutes if < 1d
if not parts:
return "0m"
return " ".join(parts)
def clear_console() -> None:
"""Clear the console screen (Windows / *nix)."""
if os.name == "nt":
os.system("cls")
else:
os.system("clear")
def print_heartbeat(unit_map: Dict[str, Dict[str, Any]]) -> None:
"""
Print a console heartbeat table of all units.
Example:
UM11719 OK Age: 1h 12m Last: 2025-12-04 19:30:42 Project: Clearwater - ECMS 57940
"""
now = datetime.now()
clear_console()
print("Series 4 Emitter — Micromate Heartbeat (v0.1.0)")
print(f"THORDATA root: {THORDATA_PATH}")
print(f"Now: {now.strftime('%Y-%m-%d %H:%M:%S')}")
print("-" * 80)
if not unit_map:
print("No units found (no .MLG files detected).")
return
# Sort by unit_id for stable output
for unit_id in sorted(unit_map.keys()):
entry = unit_map[unit_id]
last_call = entry["last_call"]
project = entry["project"]
age_td = now - last_call
status, _age_days = determine_status(last_call, now)
age_str = format_age(age_td)
last_str = last_call.strftime("%Y-%m-%d %H:%M:%S")
print(
f"{unit_id:<8} {status:<6} Age: {age_str:<8} "
f"Last: {last_str} Project: {project}"
)
print("-" * 80)
print(f"Total units: {len(unit_map)}")
print(f"Next scan in {SCAN_INTERVAL} seconds...")
sys.stdout.flush()
def build_sfm_payload(unit_map: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
"""
Build a JSON-serializable payload for SFM backend.
Structure (example):
{
"source": "series4_emitter",
"generated_at": "2025-12-04T20:01:00",
"units": [
{
"unit_id": "UM11719",
"type": "micromate",
"project_hint": "Clearwater - ECMS 57940",
"last_call": "2025-12-04T19:30:42",
"status": "OK",
"age_days": 0.04,
"age_hours": 0.9,
"mlg_path": "C:\\THORDATA\\Clearwater...\\UM11719_....MLG"
},
...
]
}
"""
now = datetime.now()
payload_units = []
for unit_id, entry in unit_map.items():
last_call: datetime = entry["last_call"]
project = entry["project"]
mlg_path = entry["mlg_path"]
status, age_days = determine_status(last_call, now)
age_hours = age_days * 24.0
payload_units.append(
{
"unit_id": unit_id,
"type": "micromate",
"project_hint": project,
"last_call": last_call.isoformat(),
"status": status,
"age_days": age_days,
"age_hours": age_hours,
"mlg_path": mlg_path,
}
)
payload = {
"source": "series4_emitter",
"generated_at": now.isoformat(),
"units": payload_units,
}
return payload
def emit_sfm_payload(unit_map: Dict[str, Dict[str, Any]]) -> None:
"""
Send heartbeat payload to SFM backend, if SFM_ENDPOINT is configured.
This is intentionally conservative:
- If SFM_ENDPOINT is empty -> do nothing
- If any error occurs -> print to stderr, but do not crash the emitter
"""
if not SFM_ENDPOINT:
return
if urllib is None:
print(
"[WARN] urllib not available; cannot POST to SFM. "
"Install standard Python or disable SFM_ENDPOINT.",
file=sys.stderr,
)
return
payload = build_sfm_payload(unit_map)
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
SFM_ENDPOINT,
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=SFM_TIMEOUT) as resp:
_ = resp.read() # we don't care about the body for now
debug(f"SFM POST OK: HTTP {resp.status}")
except urllib.error.URLError as e:
print(f"[WARN] Failed to POST to SFM: {e}", file=sys.stderr)
except Exception as e:
print(f"[WARN] Unexpected error during SFM POST: {e}", file=sys.stderr)
def main() -> None:
print("Starting Series 4 Emitter (Micromate) v0.1.0")
print(f"THORDATA_PATH = {THORDATA_PATH}")
print(f"SCAN_INTERVAL = {SCAN_INTERVAL} seconds")
print(f"LATE_DAYS = {LATE_DAYS}, STALE_DAYS = {STALE_DAYS}")
if not os.path.isdir(THORDATA_PATH):
print(f"[WARN] THORDATA_PATH does not exist: {THORDATA_PATH}", file=sys.stderr)
try:
while True:
try:
unit_map = scan_thordata(THORDATA_PATH)
print_heartbeat(unit_map)
emit_sfm_payload(unit_map)
except Exception as e:
# Catch-all so a single error doesn't kill the loop
print(f"[ERROR] Exception in main loop: {e}", file=sys.stderr)
time.sleep(SCAN_INTERVAL)
except KeyboardInterrupt:
print("\nSeries 4 Emitter stopped by user.")
if __name__ == "__main__":
main()