refactor: Rename emitter to agent, update related files and logging paths

This commit is contained in:
serversdwn
2026-03-03 17:10:47 -05:00
parent 0d5fa7677f
commit d404bf6542
6 changed files with 32 additions and 26 deletions

400
series3_agent.py Normal file
View File

@@ -0,0 +1,400 @@
"""
Series 3 Ingest Agent — v1.2.1
Environment:
- Python 3.8 (Windows 7 compatible)
- Runs on DL2 with Blastware 10 event path
Key Features:
- Config-driven paths, intervals, and logging
- Compact console heartbeat with status per unit
- Logging with retention auto-clean (days configurable)
- Safe .MLG header sniff for unit IDs (BE#### / BA####)
- Standardized SFM Telemetry JSON payload (source-agnostic)
- Periodic HTTP heartbeat POST to SFM backend
- NEW in v1.2.0:
- No local roster / CSV dependency
- Only scans .MLG files newer than MAX_EVENT_AGE_DAYS
"""
import os
import re
import time
import json
import configparser
import urllib.request
import urllib.error
from datetime import datetime, timezone, timedelta
from typing import Dict, Any, Optional, Tuple
from socket import gethostname
# ---------------- Config ----------------
def load_config(path: str) -> Dict[str, Any]:
    """Load INI config with tolerant inline comments and a required [agent] section.

    The file may omit the section header entirely; a synthetic ``[agent]``
    header is prepended so bare ``key = value`` files still parse.

    Returns a flat dict of typed settings. Every malformed value silently
    falls back to its documented default so a bad config can't stop the
    agent from starting.
    """
    cp = configparser.ConfigParser(inline_comment_prefixes=(";", "#"))
    cp.optionxform = str  # preserve key case
    with open(path, "r", encoding="utf-8") as f:
        txt = f.read()
    # Ensure we have a section header
    if not re.search(r"^\s*\[", txt, flags=re.M):
        txt = "[agent]\n" + txt
    cp.read_string(txt)
    sec = cp["agent"]

    def get_str(k: str, dflt: str) -> str:
        return sec.get(k, dflt).strip()

    def get_int(k: str, dflt: int) -> int:
        try:
            return int(sec.get(k, str(dflt)).strip())
        except Exception:
            return dflt

    def get_float(k: str, dflt: float) -> float:
        # BUGFIX: OK_HOURS / MISSING_HOURS were previously parsed with
        # get_int(), so a fractional setting like "12.5" failed int() and
        # silently reverted to the default. Parse as float instead
        # (integer values still parse fine, so this is backward-compatible).
        try:
            return float(sec.get(k, str(dflt)).strip())
        except Exception:
            return dflt

    def get_bool(k: str, dflt: bool) -> bool:
        v = sec.get(k, None)
        if v is None:
            return dflt
        return v.strip().lower() in ("1", "true", "on", "yes", "y")

    return {
        "WATCH_PATH": get_str("SERIES3_PATH", r"C:\Blastware 10\Event\autocall home"),
        "SCAN_INTERVAL": get_int("SCAN_INTERVAL_SECONDS", 300),
        "OK_HOURS": get_float("OK_HOURS", 12.0),
        "MISSING_HOURS": get_float("MISSING_HOURS", 24.0),
        "ENABLE_LOGGING": get_bool("ENABLE_LOGGING", True),
        "LOG_FILE": get_str("LOG_FILE", r"C:\SeismoEmitter\agent_logs\series3_agent.log"),
        "LOG_RETENTION_DAYS": get_int("LOG_RETENTION_DAYS", 30),
        "COLORIZE": get_bool("COLORIZE", False),  # Win7 default off
        "MLG_HEADER_BYTES": max(256, min(get_int("MLG_HEADER_BYTES", 2048), 65536)),
        "RECENT_WARN_DAYS": get_int("RECENT_WARN_DAYS", 30),
        "MAX_EVENT_AGE_DAYS": get_int("MAX_EVENT_AGE_DAYS", 365),
        # API heartbeat / SFM telemetry
        "API_ENABLED": get_bool("API_ENABLED", False),
        "API_URL": get_str("API_URL", ""),
        "API_INTERVAL_SECONDS": get_int("API_INTERVAL_SECONDS", 300),
        "SOURCE_ID": get_str("SOURCE_ID", gethostname()),
        "SOURCE_TYPE": get_str("SOURCE_TYPE", "series3_ingest_agent"),
    }
# --------------- ANSI helpers ---------------
def ansi(enabled: bool, code: str) -> str:
    """Return *code* when ANSI colour output is enabled, else an empty string."""
    return code if enabled else ""
# --------------- Logging --------------------
def log_message(path: str, enabled: bool, msg: str) -> None:
    """Append a UTC-timestamped line to the log file; no-op when disabled.

    Creates the parent directory on first use. Every filesystem failure is
    swallowed on purpose — logging must never take the agent down.
    """
    if not enabled:
        return
    try:
        parent = os.path.dirname(path) or "."
        if not os.path.exists(parent):
            os.makedirs(parent)
        stamp = datetime.now(timezone.utc).isoformat()
        with open(path, "a", encoding="utf-8") as fh:
            fh.write("{} {}\n".format(stamp, msg))
    except Exception:
        # Deliberate best-effort: a broken log path should not crash the loop.
        pass
def clear_logs_if_needed(log_file: str, enabled: bool, retention_days: int) -> None:
    """Truncate the log file once per *retention_days*, tracked via a stamp file.

    A sibling ``last_clean.txt`` records the UTC time of the last clean.
    When the stamp is missing, unreadable, or older than the retention
    window, the log is truncated and the stamp refreshed. Failures are
    swallowed so a bad filesystem can't stop the agent.
    """
    if not enabled or retention_days <= 0:
        return
    stamp_path = os.path.join(os.path.dirname(log_file) or ".", "last_clean.txt")
    now_utc = datetime.now(timezone.utc)
    previous = None
    try:
        if os.path.exists(stamp_path):
            with open(stamp_path, "r", encoding="utf-8") as fh:
                previous = datetime.fromisoformat(fh.read().strip())
    except Exception:
        previous = None
    due = previous is None or (now_utc - previous > timedelta(days=retention_days))
    if not due:
        return
    try:
        if os.path.exists(log_file):
            # Truncate in place rather than delete, so open handles stay valid.
            open(log_file, "w", encoding="utf-8").close()
        with open(stamp_path, "w", encoding="utf-8") as fh:
            fh.write(now_utc.isoformat())
        print("Log cleared on {}".format(now_utc.astimezone().strftime("%Y-%m-%d %H:%M:%S")))
        log_message(log_file, enabled, "Logs auto-cleared")
    except Exception:
        pass
# --------------- .MLG sniff ------------------
# Matches a unit ID like BE1234 / BA12345 embedded in arbitrary header bytes.
# The ID itself is in capture group 1 so neighbouring delimiter bytes can
# never leak into the returned unit name.
UNIT_BYTES_RE = re.compile(rb"(?:^|[^A-Z])((?:BE|BA)\d{4,5})(?:[^0-9]|$)")


def sniff_unit_from_mlg(path: str, header_bytes: int) -> Optional[str]:
    """Return BE####/BA#### sniffed from the file's header bytes, or None.

    Reads at most 64 KiB (and at least 256 bytes) from the start of the
    file. Any I/O or decode error is treated as "no unit found" so the
    scan loop never crashes on an unreadable or truncated file.
    """
    try:
        with open(path, "rb") as f:
            chunk = f.read(max(256, min(header_bytes, 65536)))
        m = UNIT_BYTES_RE.search(chunk)
        if not m:
            return None
        # BUGFIX: the old code took m.group(0) (which includes the delimiter
        # bytes around the ID) and stripped only non-alphanumerics, so an
        # alphabetic neighbour survived — e.g. b"#BE1234Z" yielded "BE1234Z".
        # Returning the captured group yields exactly the unit ID.
        try:
            return m.group(1).decode("ascii")
        except Exception:
            return None
    except Exception:
        return None
# --------------- Scan helpers ---------------
def fmt_last(ts: float) -> str:
    """Format an epoch timestamp as a local-time 'YYYY-MM-DD HH:MM:SS' string."""
    local_dt = datetime.fromtimestamp(ts, tz=timezone.utc).astimezone()
    return local_dt.strftime("%Y-%m-%d %H:%M:%S")
def fmt_age(now_epoch: float, mtime: float) -> str:
    """Return the age between *mtime* and *now_epoch* as 'Hh Mm', never negative."""
    total_minutes = max(0, int((now_epoch - mtime) // 60))
    hours, minutes = divmod(total_minutes, 60)
    return "{}h {}m".format(hours, minutes)
def scan_latest(
    watch: str,
    header_bytes: int,
    cache: Dict[str, Tuple[float, str]],
    recent_cutoff: float,
    max_age_days: int,
    logger=None,
) -> Dict[str, Dict[str, Any]]:
    """Return the newest .MLG file per unit, limited to files newer than *max_age_days*.

    Result shape: {uid: {'mtime': float, 'fname': str, 'path': str}}.
    *cache* maps file path -> (mtime, uid) so a header is only re-sniffed
    when a file's mtime changes. Recent files whose unit ID cannot be
    sniffed are reported through *logger* for later inspection.
    """
    newest: Dict[str, Dict[str, Any]] = {}
    if not os.path.exists(watch):
        print("[WARN] Watch path not found:", watch)
        return newest
    scan_started = time.time()
    # Sanity floor of one day, then convert the window to seconds.
    window_seconds = max(1, int(max_age_days)) * 86400.0
    try:
        with os.scandir(watch) as entries:
            for entry in entries:
                if not entry.is_file() or not entry.name.lower().endswith(".mlg"):
                    continue
                fpath = entry.path
                try:
                    mtime = entry.stat().st_mtime
                except Exception:
                    continue
                # Ignore anything older than the retention window
                # (future mtimes are clamped to age zero).
                if max(0.0, scan_started - mtime) > window_seconds:
                    continue
                hit = cache.get(fpath)
                if hit is not None and hit[0] == mtime:
                    uid = hit[1]
                else:
                    uid = sniff_unit_from_mlg(fpath, header_bytes)
                    if not uid:
                        # Unsniffable-but-recent files are worth flagging.
                        if recent_cutoff is not None and mtime >= recent_cutoff and logger:
                            logger(f"[unsniffable-recent] {fpath}")
                        continue
                    cache[fpath] = (mtime, uid)
                best = newest.get(uid)
                if best is None or mtime > best["mtime"]:
                    newest[uid] = {"mtime": mtime, "fname": entry.name, "path": fpath}
    except Exception as ex:
        print("[WARN] Scan error:", ex)
    return newest
# --- API heartbeat / SFM telemetry helpers ---
def send_api_payload(payload: dict, api_url: str) -> None:
    """POST *payload* as JSON to *api_url*; a blank URL disables sending.

    Connection problems are printed, never raised — a dead backend must
    not interrupt the scan loop.
    """
    if not api_url:
        return
    body = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(
        api_url, data=body, headers={"Content-Type": "application/json"}
    )
    try:
        with urllib.request.urlopen(request, timeout=5) as res:
            print(f"[API] POST success: {res.status}")
    except urllib.error.URLError as e:
        print(f"[API] POST failed: {e}")
def build_sfm_payload(units_dict: Dict[str, Dict[str, Any]], cfg: Dict[str, Any]) -> dict:
    """Assemble the SFM Telemetry JSON v1 payload from the latest-unit map.

    The schema is source-agnostic: each unit entry carries its last event
    time (ISO 8601 UTC), its age in whole minutes, and file-level metadata
    where known. Fields that cannot be determined are emitted as None.
    """
    now_ts = time.time()
    payload = {
        "source_id": cfg.get("SOURCE_ID", gethostname()),
        "source_type": cfg.get("SOURCE_TYPE", "series3_ingest_agent"),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "units": [],
    }
    for unit_id, info in units_dict.items():
        mtime = info.get("mtime")
        last_event_iso = None
        age_minutes = None
        if mtime is not None:
            last_event_iso = datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat()
            age_minutes = int(max(0, (now_ts - mtime) // 60))
        file_path = info.get("path")
        file_size = None
        if file_path:
            try:
                file_size = os.path.getsize(file_path)
            except Exception:
                file_size = None
        unit_entry = {
            "unit_id": unit_id,
            "last_event_time": last_event_iso,
            "age_minutes": age_minutes,
            "observation_method": "mlg_scan",
            "event_metadata": {
                "file_name": info.get("fname"),
                "file_path": file_path,
                "file_size_bytes": file_size,
                # Not extractable from a plain mtime scan; reserved for
                # richer sources.
                "event_number": None,
                "event_type": None,
            },
        }
        payload["units"].append(unit_entry)
    return payload
# --------------- Main loop ------------------
def main() -> None:
    """Entry point: load config.ini (located next to this script) and run the
    scan/report/heartbeat loop until interrupted with Ctrl-C."""
    here = os.path.dirname(__file__) or "."
    cfg = load_config(os.path.join(here, "config.ini"))
    # Unpack config once into locals; the loop below never re-reads the file.
    WATCH_PATH = cfg["WATCH_PATH"]
    SCAN_INTERVAL = int(cfg["SCAN_INTERVAL"])
    OK_HOURS = float(cfg["OK_HOURS"])  # age <= OK_HOURS -> "OK"
    MISSING_HOURS = float(cfg["MISSING_HOURS"])  # age > MISSING_HOURS -> "Missing"
    ENABLE_LOGGING = bool(cfg["ENABLE_LOGGING"])
    LOG_FILE = cfg["LOG_FILE"]
    LOG_RETENTION_DAYS = int(cfg["LOG_RETENTION_DAYS"])
    COLORIZE = bool(cfg["COLORIZE"])
    MLG_HEADER_BYTES = int(cfg["MLG_HEADER_BYTES"])
    RECENT_WARN_DAYS = int(cfg["RECENT_WARN_DAYS"])
    MAX_EVENT_AGE_DAYS = int(cfg["MAX_EVENT_AGE_DAYS"])
    # ANSI colour codes; ansi() collapses them to "" when COLORIZE is off.
    C_OK = ansi(COLORIZE, "\033[92m")
    C_PEN = ansi(COLORIZE, "\033[93m")
    C_MIS = ansi(COLORIZE, "\033[91m")
    C_RST = ansi(COLORIZE, "\033[0m")
    # Echo the effective configuration to console and log once at startup.
    print(
        "[CFG] WATCH_PATH={} SCAN_INTERVAL={}s MAX_EVENT_AGE_DAYS={} API_ENABLED={}".format(
            WATCH_PATH, SCAN_INTERVAL, MAX_EVENT_AGE_DAYS, bool(cfg.get("API_ENABLED", False))
        )
    )
    log_message(
        LOG_FILE,
        ENABLE_LOGGING,
        "[cfg] WATCH_PATH={} SCAN_INTERVAL={} MAX_EVENT_AGE_DAYS={} API_ENABLED={}".format(
            WATCH_PATH, SCAN_INTERVAL, MAX_EVENT_AGE_DAYS, bool(cfg.get("API_ENABLED", False))
        ),
    )
    # cache for scanning: path -> (mtime, uid); avoids re-sniffing unchanged files
    sniff_cache: Dict[str, Tuple[float, str]] = {}
    last_api_ts: float = 0.0  # epoch of the last API POST attempt
    while True:
        try:
            now_local = datetime.now().isoformat()
            now_utc = datetime.now(timezone.utc).isoformat()
            print("-" * 110)
            print("Heartbeat @ {} (Local) | {} (UTC)".format(now_local, now_utc))
            print("-" * 110)
            # Retention housekeeping runs every cycle; it is a no-op until due.
            clear_logs_if_needed(LOG_FILE, ENABLE_LOGGING, LOG_RETENTION_DAYS)
            # Files newer than this cutoff get a log entry when unsniffable.
            recent_cutoff = time.time() - (float(RECENT_WARN_DAYS) * 86400)
            logger_fn = lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m)
            latest = scan_latest(
                WATCH_PATH,
                MLG_HEADER_BYTES,
                sniff_cache,
                recent_cutoff,
                MAX_EVENT_AGE_DAYS,
                logger_fn,
            )
            now_epoch = time.time()
            # Detected units summary (no roster dependency)
            if latest:
                print("\nDetected Units (within last {} days):".format(MAX_EVENT_AGE_DAYS))
                for uid in sorted(latest.keys()):
                    info = latest[uid]
                    age_hours = (now_epoch - info["mtime"]) / 3600.0
                    # Classify freshness: OK <= OK_HOURS < Pending <= MISSING_HOURS < Missing.
                    if age_hours > MISSING_HOURS:
                        status, col = "Missing", C_MIS
                    elif age_hours > OK_HOURS:
                        status, col = "Pending", C_PEN
                    else:
                        status, col = "OK", C_OK
                    line = (
                        "{col}{uid:<8} {status:<8} Age: {age:<7} Last: {last} (File: {fname}){rst}".format(
                            col=col,
                            uid=uid,
                            status=status,
                            age=fmt_age(now_epoch, info["mtime"]),
                            last=fmt_last(info["mtime"]),
                            fname=info["fname"],
                            rst=C_RST,
                        )
                    )
                    print(line)
                    log_message(LOG_FILE, ENABLE_LOGGING, line)
            else:
                print("\nNo recent .MLG activity found within last {} days.".format(MAX_EVENT_AGE_DAYS))
                log_message(
                    LOG_FILE,
                    ENABLE_LOGGING,
                    "[info] no recent MLG activity within {} days".format(MAX_EVENT_AGE_DAYS),
                )
            # ---- API heartbeat to SFM ----
            # POST at most once per API_INTERVAL_SECONDS, even though the
            # scan loop may cycle more often than that.
            if cfg.get("API_ENABLED", False):
                now_ts = time.time()
                interval = int(cfg.get("API_INTERVAL_SECONDS", 300))
                if now_ts - last_api_ts >= interval:
                    payload = build_sfm_payload(latest, cfg)
                    send_api_payload(payload, cfg.get("API_URL", ""))
                    last_api_ts = now_ts
        except KeyboardInterrupt:
            print("\nStopping...")
            break
        except Exception as e:
            # Never let one bad cycle kill the agent: log it and keep looping.
            err = "[loop-error] {}".format(e)
            print(err)
            log_message(LOG_FILE, ENABLE_LOGGING, err)
        time.sleep(SCAN_INTERVAL)


if __name__ == "__main__":
    main()