diff --git a/.gitignore b/.gitignore index 175dd94..6e3e0a3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +config.ini + # ------------------------- # Python ignores # ------------------------- @@ -22,8 +24,8 @@ dist/ # ------------------------- # Logs + runtime artifacts # ------------------------- -emitter_logs/* -!emitter_logs/.gitkeep # keep the folder but ignore its contents +agent_logs/* +!agent_logs/.gitkeep # keep the folder but ignore its contents *.log diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d7981a..b93df6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,36 +1,83 @@ # Changelog -All notable changes to **Series3 Emitter** will be documented in this file. +All notable changes to **Series3 Agent** will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). --- -## [Unreleased] +## [1.2.1] - 2026-03-03 + +### Changed +- Changed the name of the program to "series3-agent", this was done to align with the s4/thor agent and because it represents the program's functionality better. +- All instances of "emitter" changed to agent. +- config.ini added to .gitignore, replaced with a template example file. +- README.md updated to reflect changes. --- -## [1.0.0] – 2025-09-02 +## [1.2.0] - 2025-12-04 + +### Changed +- Removed roster CSV dependency and all Dropbox refresh/hot-reload logic; heartbeat now only enumerates `.MLG` files. +- Added `MAX_EVENT_AGE_DAYS` filter to ignore stale events and log when no recent activity exists. +- Simplified heartbeat output/logging to show detected units only; logging hardened to never crash the agent. + +--- + +## [1.1.1] - 2025-12-02 + +### Added +- Example `config.ini` now ships with API heartbeat settings enabled (`API_ENABLED`, `API_URL`, `API_INTERVAL_SECONDS`, `SOURCE_ID`, `SOURCE_TYPE`). + +--- + +## [1.1.0] - 2025-12-01 + +### Added +- Standardized SFM telemetry payload builder and periodic HTTP heartbeat POST via `urllib`. +- Config support for API heartbeat (`API_ENABLED`, `API_URL`, `API_INTERVAL_SECONDS`, `SOURCE_ID`, `SOURCE_TYPE`); payload includes file path/size metadata. + +### Changed +- Refactored scanner to retain file paths and header sniff cache; reformatted logging/ANSI handling. + +--- + +## [1.0.1] - 2025-11-20 + +### Added +- `API_URL` config key and `report_to_server` per-unit POST hook (adds `requests` dependency). + +### Changed +- Example `config.ini` roster URL updated; merged into `main`. + +--- + +## [1.0.0] - 2025-11-17 ### Added - **Automatic roster refresh** from Dropbox at a configurable interval (`ROSTER_REFRESH_MIN_SECONDS`). - **Hot-reload** of roster file without restarting the script. - **Failsafe reload:** if the new roster is missing or invalid, the previous good roster is retained. -- **Atomic roster downloads** (temp file → replace) to avoid partial/corrupted CSVs. +- **Atomic roster downloads** (temp file in-place replace) to avoid partial/corrupted CSVs. - **Startup config echo** printing WATCH_PATH, ROSTER_FILE, and ROSTER_URL visibility. - **Active / Bench / Ignored** unit categories for clearer fleet status mapping. ### Fixed -- Removed stray `note=note_suffix` bug in the “Unexpected Units” section. +- Removed stray `note=note_suffix` bug in the Unexpected Units section. - Removed duplicate `import time`. - Removed duplicate roster load during startup (roster now loads once). - Cleaned indentation for Python 3.8 compatibility. ### Changed -- Reset versioning from legacy `v5.9 beta` → **v1.0.0** (clean semver baseline). -- Main script normalized as `series3_emitter.py`. +- Reset versioning from legacy `v5.9 beta` to **v1.0.0** (clean semver baseline). +- Main script normalized as `series3_emitter.py` (later renamed to `series3_agent.py` in v1.2.1). --- -[Unreleased]: https://example.com/compare/v1.0.0...HEAD -[1.0.0]: https://example.com/releases/v1.0.0 \ No newline at end of file +[Unreleased]: https://example.com/compare/v1.2.0...HEAD +[1.2.0]: https://example.com/releases/v1.2.0 +[1.1.1]: https://example.com/releases/v1.1.1 +[1.1.0]: https://example.com/releases/v1.1.0 +[1.0.1]: https://example.com/releases/v1.0.1 +[1.0.0]: https://example.com/releases/v1.0.0 diff --git a/README.md b/README.md index 290f1f5..69b6a90 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ +# Series3 Ingest Agent v1.2 + A lightweight Python script that monitors Instantel **Series 3 (Minimate)** call-in activity on a Blastware server. It scans the event folder, reads `.MLG` headers to identify unit IDs, and prints a live status table showing: @@ -5,9 +7,8 @@ It scans the event folder, reads `.MLG` headers to identify unit IDs, and prints - Last event received - Age since last call-in - OK / Pending / Missing states -- Bench and ignored units -- Unexpected units -- Notes from the roster file +- Detected units (no roster required) +- Optional API heartbeat to Seismograph Fleet Manager backend This script is part of the larger **Seismograph Fleet Manager** project. @@ -17,7 +18,6 @@ This script is part of the larger **Seismograph Fleet Manager** project. - Python 3.8 (Windows 7 compatible) - Blastware 10 event folder available locally -- `series3_roster.csv` in the configured path - `config.ini` in the same directory as the script Install dependencies with: @@ -28,18 +28,17 @@ Install dependencies with: ## Usage -Run the emitter from the folder containing the script: +Run the agent from the folder containing the script: -`python series3_emitter.py` +`python series3_agent.py` The script will: -1. Load the roster file -2. Scan the Blastware event folder for `.MLG` files -3. Sniff each file header for the unit ID -4. Print a status line for each active unit -5. Refresh the roster automatically if `ROSTER_URL` is set -6. Write logs into the `emitter_logs/` folder +1. Scan the Blastware event folder for `.MLG` files (within a max age window). +2. Sniff each file header for the unit ID. +3. Print a status line for each detected unit (OK / Pending / Missing). +4. Optionally POST a heartbeat payload on an interval when `API_ENABLED=true`. +5. Write logs into the `agent_logs/` folder and auto-clean old logs. --- @@ -49,17 +48,24 @@ All settings are stored in `config.ini`. Key fields: -- `SERIES3_PATH` – folder containing `.MLG` files -- `ROSTER_FILE` – path to the local roster CSV -- `ROSTER_URL` – optional URL for automatic roster downloads -- `SCAN_INTERVAL_SECONDS` – how often to scan -- `OK_HOURS` / `MISSING_HOURS` – thresholds for status +- `SERIES3_PATH` — folder containing `.MLG` files +- `SCAN_INTERVAL_SECONDS` — how often to scan +- `OK_HOURS` / `MISSING_HOURS` — thresholds for status +- `MLG_HEADER_BYTES` — how many bytes to sniff from each `.MLG` header +- `RECENT_WARN_DAYS` — log unsniffable files newer than this window +- `MAX_EVENT_AGE_DAYS` — ignore events older than this many days +- `API_ENABLED` — enable/disable heartbeat POST +- `API_URL` — heartbeat endpoint +- `API_INTERVAL_SECONDS` — heartbeat frequency +- `SOURCE_ID` / `SOURCE_TYPE` — identifiers included in the API payload +- `LOG_RETENTION_DAYS` — auto-delete logs older than this many days +- `COLORIZE` — ANSI color output (off by default for Win7) --- ## Logs -Logs are stored under `emitter_logs/`. +Logs are stored under `agent_logs/`. Git ignores all log files but keeps the folder itself. --- @@ -68,7 +74,7 @@ Git ignores all log files but keeps the folder itself. This repo follows **Semantic Versioning (SemVer)**. -Current release: **v1.0.0** – stable baseline emitter. +Current release: **v1.2.1** — renamed to series3 ingest agent. See `CHANGELOG.md` for details. --- @@ -76,4 +82,3 @@ See `CHANGELOG.md` for details. ## License Private / internal project. -``` \ No newline at end of file diff --git a/README_DL2.md b/README_DL2.md index 9096a87..43418e4 100644 --- a/README_DL2.md +++ b/README_DL2.md @@ -1,10 +1,10 @@ -# Series 3 Emitter — v1_0(py38-safe) for DL2 +# Series 3 Ingest Agent — v1_0(py38-safe) for DL2 **Target**: Windows 7 + Python 3.8.10 **Baseline**: v5_4 (no logic changes) ## Files -- series3_emitter_v1_0_py38.py — main script (py38-safe) +- series3_agent_v1_0_py38.py — main script (py38-safe) - config.ini — your config (already included) - series3_roster.csv — your roster (already included, this auto updates from a URL to a dropbox file) - requirements.txt — none beyond stdlib @@ -15,7 +15,7 @@ 3) Open CMD: ```cmd cd C:\SeismoEmitter - python series3_emitter_v1_0_py38.py + python series3_agent_v1_0_py38.py ``` (If the console shows escape codes on Win7, set `COLORIZE = False` in `config.ini`.) @@ -23,4 +23,4 @@ - Heartbeat prints Local/UTC timestamps - One line per active roster unit with OK/Pending/Missing, Age, Last, File - Unexpected units block shows .MLG not in roster -- emitter.log rotates per LOG_RETENTION_DAYS +- agent.log rotates per LOG_RETENTION_DAYS diff --git a/config.ini b/config-template.ini similarity index 56% rename from config.ini rename to config-template.ini index 5b4c3dc..849a4cd 100644 --- a/config.ini +++ b/config-template.ini @@ -1,9 +1,15 @@ -[emitter] +[agent] + +# --- API Heartbeat Settings --- +API_ENABLED = true +API_URL = +API_INTERVAL_SECONDS = 300 +SOURCE_ID = #computer that is running agent. +SOURCE_TYPE = series3_agent + # Paths SERIES3_PATH = C:\Blastware 10\Event\autocall home -ROSTER_FILE = C:\SeismoEmitter\series3_roster.csv -ROSTER_URL = https://www.dropbox.com/scl/fi/gadrpjj2nif3f6q5k60zy/series3_roster.csv?rlkey=fkycemzg4s86pmlxpih4f3pkx&st=hvx0mgln&dl=1 -ROSTER_REFRESH_MIN_SECONDS = 0 +MAX_EVENT_AGE_DAYS = 365 # Scanning @@ -13,10 +19,10 @@ MISSING_HOURS = 24 # Logging ENABLE_LOGGING = True -LOG_FILE = C:\SeismoEmitter\emitter_logs\series3_emitter.log +LOG_FILE = C:\SeismoEmitter\agent_logs\series3_agent.log LOG_RETENTION_DAYS = 30 -# Console colors +# Console colors - (Doesn't work on windows 7) COLORIZE = FALSE # .MLG parsing diff --git a/series3_agent.py b/series3_agent.py new file mode 100644 index 0000000..080544e --- /dev/null +++ b/series3_agent.py @@ -0,0 +1,400 @@ +""" +Series 3 Ingest Agent — v1.2.1 + +Environment: +- Python 3.8 (Windows 7 compatible) +- Runs on DL2 with Blastware 10 event path + +Key Features: +- Config-driven paths, intervals, and logging +- Compact console heartbeat with status per unit +- Logging with retention auto-clean (days configurable) +- Safe .MLG header sniff for unit IDs (BE#### / BA####) +- Standardized SFM Telemetry JSON payload (source-agnostic) +- Periodic HTTP heartbeat POST to SFM backend +- NEW in v1.2.0: + - No local roster / CSV dependency + - Only scans .MLG files newer than MAX_EVENT_AGE_DAYS +""" + +import os +import re +import time +import json +import configparser +import urllib.request +import urllib.error +from datetime import datetime, timezone, timedelta +from typing import Dict, Any, Optional, Tuple +from socket import gethostname + + +# ---------------- Config ---------------- +def load_config(path: str) -> Dict[str, Any]: + """Load INI with tolerant inline comments and a required [agent] section.""" + cp = configparser.ConfigParser(inline_comment_prefixes=(";", "#")) + cp.optionxform = str # preserve key case + with open(path, "r", encoding="utf-8") as f: + txt = f.read() + # Ensure we have a section header + if not re.search(r"^\s*\[", txt, flags=re.M): + txt = "[agent]\n" + txt + cp.read_string(txt) + sec = cp["agent"] + + def get_str(k: str, dflt: str) -> str: + return sec.get(k, dflt).strip() + + def get_int(k: str, dflt: int) -> int: + try: + return int(sec.get(k, str(dflt)).strip()) + except Exception: + return dflt + + def get_bool(k: str, dflt: bool) -> bool: + v = sec.get(k, None) + if v is None: + return dflt + return v.strip().lower() in ("1", "true", "on", "yes", "y") + + return { + "WATCH_PATH": get_str("SERIES3_PATH", r"C:\Blastware 10\Event\autocall home"), + "SCAN_INTERVAL": get_int("SCAN_INTERVAL_SECONDS", 300), + "OK_HOURS": float(get_int("OK_HOURS", 12)), + "MISSING_HOURS": float(get_int("MISSING_HOURS", 24)), + "ENABLE_LOGGING": get_bool("ENABLE_LOGGING", True), + "LOG_FILE": get_str("LOG_FILE", r"C:\SeismoEmitter\agent_logs\series3_agent.log"), + "LOG_RETENTION_DAYS": get_int("LOG_RETENTION_DAYS", 30), + "COLORIZE": get_bool("COLORIZE", False), # Win7 default off + "MLG_HEADER_BYTES": max(256, min(get_int("MLG_HEADER_BYTES", 2048), 65536)), + "RECENT_WARN_DAYS": get_int("RECENT_WARN_DAYS", 30), + "MAX_EVENT_AGE_DAYS": get_int("MAX_EVENT_AGE_DAYS", 365), + + # API heartbeat / SFM telemetry + "API_ENABLED": get_bool("API_ENABLED", False), + "API_URL": get_str("API_URL", ""), + "API_INTERVAL_SECONDS": get_int("API_INTERVAL_SECONDS", 300), + "SOURCE_ID": get_str("SOURCE_ID", gethostname()), + "SOURCE_TYPE": get_str("SOURCE_TYPE", "series3_ingest_agent"), + } + + +# --------------- ANSI helpers --------------- +def ansi(enabled: bool, code: str) -> str: + return code if enabled else "" + + +# --------------- Logging -------------------- +def log_message(path: str, enabled: bool, msg: str) -> None: + if not enabled: + return + try: + d = os.path.dirname(path) or "." + if not os.path.exists(d): + os.makedirs(d) + with open(path, "a", encoding="utf-8") as f: + f.write("{} {}\n".format(datetime.now(timezone.utc).isoformat(), msg)) + except Exception: + # Logging must never crash the agent + pass + + +def clear_logs_if_needed(log_file: str, enabled: bool, retention_days: int) -> None: + if not enabled or retention_days <= 0: + return + stamp_file = os.path.join(os.path.dirname(log_file) or ".", "last_clean.txt") + now = datetime.now(timezone.utc) + last = None + try: + if os.path.exists(stamp_file): + with open(stamp_file, "r", encoding="utf-8") as f: + last = datetime.fromisoformat(f.read().strip()) + except Exception: + last = None + if (last is None) or (now - last > timedelta(days=retention_days)): + try: + if os.path.exists(log_file): + open(log_file, "w", encoding="utf-8").close() + with open(stamp_file, "w", encoding="utf-8") as f: + f.write(now.isoformat()) + print("Log cleared on {}".format(now.astimezone().strftime("%Y-%m-%d %H:%M:%S"))) + log_message(log_file, enabled, "Logs auto-cleared") + except Exception: + pass + + +# --------------- .MLG sniff ------------------ +UNIT_BYTES_RE = re.compile(rb"(?:^|[^A-Z])(BE|BA)\d{4,5}(?:[^0-9]|$)") + + +def sniff_unit_from_mlg(path: str, header_bytes: int) -> Optional[str]: + """Return BE####/BA#### from header bytes, or None.""" + try: + with open(path, "rb") as f: + chunk = f.read(max(256, min(header_bytes, 65536))) + m = UNIT_BYTES_RE.search(chunk) + if not m: + return None + raw = m.group(0) + cleaned = re.sub(rb"[^A-Z0-9]", b"", raw) + try: + return cleaned.decode("ascii").upper() + except Exception: + return None + except Exception: + return None + + +# --------------- Scan helpers --------------- +def fmt_last(ts: float) -> str: + return datetime.fromtimestamp(ts, tz=timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S") + + +def fmt_age(now_epoch: float, mtime: float) -> str: + mins = int((now_epoch - mtime) // 60) + if mins < 0: + mins = 0 + return "{}h {}m".format(mins // 60, mins % 60) + + +def scan_latest( + watch: str, + header_bytes: int, + cache: Dict[str, Tuple[float, str]], + recent_cutoff: float, + max_age_days: int, + logger=None, +) -> Dict[str, Dict[str, Any]]: + """ + Return newest .MLG per unit, only for files newer than max_age_days: + {uid: {'mtime': float, 'fname': str, 'path': str}} + """ + latest: Dict[str, Dict[str, Any]] = {} + if not os.path.exists(watch): + print("[WARN] Watch path not found:", watch) + return latest + + now_ts = time.time() + max_age_days = max(1, int(max_age_days)) # sanity floor + max_age_seconds = max_age_days * 86400.0 + + try: + with os.scandir(watch) as it: + for e in it: + if (not e.is_file()) or (not e.name.lower().endswith(".mlg")): + continue + fpath = e.path + try: + mtime = e.stat().st_mtime + except Exception: + continue + + # Skip very old events (beyond retention window) + age_seconds = now_ts - mtime + if age_seconds < 0: + age_seconds = 0 + if age_seconds > max_age_seconds: + continue # too old, ignore this file + + cached = cache.get(fpath) + if cached is not None and cached[0] == mtime: + uid = cached[1] + else: + uid = sniff_unit_from_mlg(fpath, header_bytes) + if not uid: + # If unsniffable but very recent, log for later inspection + if (recent_cutoff is not None) and (mtime >= recent_cutoff): + if logger: + logger(f"[unsniffable-recent] {fpath}") + continue # skip file if no unit ID found in header + cache[fpath] = (mtime, uid) + + if (uid not in latest) or (mtime > latest[uid]["mtime"]): + latest[uid] = {"mtime": mtime, "fname": e.name, "path": fpath} + except Exception as ex: + print("[WARN] Scan error:", ex) + return latest + + +# --- API heartbeat / SFM telemetry helpers --- +def send_api_payload(payload: dict, api_url: str) -> None: + if not api_url: + return + data = json.dumps(payload).encode("utf-8") + req = urllib.request.Request(api_url, data=data, headers={"Content-Type": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=5) as res: + print(f"[API] POST success: {res.status}") + except urllib.error.URLError as e: + print(f"[API] POST failed: {e}") + + +def build_sfm_payload(units_dict: Dict[str, Dict[str, Any]], cfg: Dict[str, Any]) -> dict: + """ + Build SFM Telemetry JSON v1 payload from latest-unit dict. + Schema is source-agnostic and future-proof. + """ + now_iso = datetime.now(timezone.utc).isoformat() + now_ts = time.time() + + payload = { + "source_id": cfg.get("SOURCE_ID", gethostname()), + "source_type": cfg.get("SOURCE_TYPE", "series3_ingest_agent"), + "timestamp": now_iso, + "units": [], + } + + for unit_id, info in units_dict.items(): + mtime = info.get("mtime") + if mtime is not None: + last_event_iso = datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat() + age_minutes = int(max(0, (now_ts - mtime) // 60)) + else: + last_event_iso = None + age_minutes = None + + file_path = info.get("path") + file_size = None + if file_path: + try: + file_size = os.path.getsize(file_path) + except Exception: + file_size = None + + payload["units"].append( + { + "unit_id": unit_id, + "last_event_time": last_event_iso, + "age_minutes": age_minutes, + "observation_method": "mlg_scan", + "event_metadata": { + "file_name": info.get("fname"), + "file_path": file_path, + "file_size_bytes": file_size, + "event_number": None, + "event_type": None, + }, + } + ) + + return payload + + +# --------------- Main loop ------------------ +def main() -> None: + here = os.path.dirname(__file__) or "." + cfg = load_config(os.path.join(here, "config.ini")) + + WATCH_PATH = cfg["WATCH_PATH"] + SCAN_INTERVAL = int(cfg["SCAN_INTERVAL"]) + OK_HOURS = float(cfg["OK_HOURS"]) + MISSING_HOURS = float(cfg["MISSING_HOURS"]) + ENABLE_LOGGING = bool(cfg["ENABLE_LOGGING"]) + LOG_FILE = cfg["LOG_FILE"] + LOG_RETENTION_DAYS = int(cfg["LOG_RETENTION_DAYS"]) + COLORIZE = bool(cfg["COLORIZE"]) + MLG_HEADER_BYTES = int(cfg["MLG_HEADER_BYTES"]) + RECENT_WARN_DAYS = int(cfg["RECENT_WARN_DAYS"]) + MAX_EVENT_AGE_DAYS = int(cfg["MAX_EVENT_AGE_DAYS"]) + + C_OK = ansi(COLORIZE, "\033[92m") + C_PEN = ansi(COLORIZE, "\033[93m") + C_MIS = ansi(COLORIZE, "\033[91m") + C_RST = ansi(COLORIZE, "\033[0m") + + print( + "[CFG] WATCH_PATH={} SCAN_INTERVAL={}s MAX_EVENT_AGE_DAYS={} API_ENABLED={}".format( + WATCH_PATH, SCAN_INTERVAL, MAX_EVENT_AGE_DAYS, bool(cfg.get("API_ENABLED", False)) + ) + ) + log_message( + LOG_FILE, + ENABLE_LOGGING, + "[cfg] WATCH_PATH={} SCAN_INTERVAL={} MAX_EVENT_AGE_DAYS={} API_ENABLED={}".format( + WATCH_PATH, SCAN_INTERVAL, MAX_EVENT_AGE_DAYS, bool(cfg.get("API_ENABLED", False)) + ), + ) + + # cache for scanning + sniff_cache: Dict[str, Tuple[float, str]] = {} + + last_api_ts: float = 0.0 + + while True: + try: + now_local = datetime.now().isoformat() + now_utc = datetime.now(timezone.utc).isoformat() + print("-" * 110) + print("Heartbeat @ {} (Local) | {} (UTC)".format(now_local, now_utc)) + print("-" * 110) + + clear_logs_if_needed(LOG_FILE, ENABLE_LOGGING, LOG_RETENTION_DAYS) + recent_cutoff = time.time() - (float(RECENT_WARN_DAYS) * 86400) + logger_fn = lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m) + + latest = scan_latest( + WATCH_PATH, + MLG_HEADER_BYTES, + sniff_cache, + recent_cutoff, + MAX_EVENT_AGE_DAYS, + logger_fn, + ) + now_epoch = time.time() + + # Detected units summary (no roster dependency) + if latest: + print("\nDetected Units (within last {} days):".format(MAX_EVENT_AGE_DAYS)) + for uid in sorted(latest.keys()): + info = latest[uid] + age_hours = (now_epoch - info["mtime"]) / 3600.0 + if age_hours > MISSING_HOURS: + status, col = "Missing", C_MIS + elif age_hours > OK_HOURS: + status, col = "Pending", C_PEN + else: + status, col = "OK", C_OK + + line = ( + "{col}{uid:<8} {status:<8} Age: {age:<7} Last: {last} (File: {fname}){rst}".format( + col=col, + uid=uid, + status=status, + age=fmt_age(now_epoch, info["mtime"]), + last=fmt_last(info["mtime"]), + fname=info["fname"], + rst=C_RST, + ) + ) + print(line) + log_message(LOG_FILE, ENABLE_LOGGING, line) + else: + print("\nNo recent .MLG activity found within last {} days.".format(MAX_EVENT_AGE_DAYS)) + log_message( + LOG_FILE, + ENABLE_LOGGING, + "[info] no recent MLG activity within {} days".format(MAX_EVENT_AGE_DAYS), + ) + + # ---- API heartbeat to SFM ---- + if cfg.get("API_ENABLED", False): + now_ts = time.time() + interval = int(cfg.get("API_INTERVAL_SECONDS", 300)) + if now_ts - last_api_ts >= interval: + payload = build_sfm_payload(latest, cfg) + send_api_payload(payload, cfg.get("API_URL", "")) + last_api_ts = now_ts + + except KeyboardInterrupt: + print("\nStopping...") + break + except Exception as e: + err = "[loop-error] {}".format(e) + print(err) + log_message(LOG_FILE, ENABLE_LOGGING, err) + + time.sleep(SCAN_INTERVAL) + + +if __name__ == "__main__": + main() diff --git a/series3_emitter.py b/series3_emitter.py deleted file mode 100644 index deb6b6c..0000000 --- a/series3_emitter.py +++ /dev/null @@ -1,459 +0,0 @@ -""" -Series 3 Emitter — v1.0.0 (Stable Baseline, SemVer Reset) - -Environment: -- Python 3.8 (Windows 7 compatible) -- Runs on DL2 with Blastware 10 event path - -Key Features: -- Atomic roster downloads from Dropbox (no partial files) -- Automatic roster refresh from Dropbox at configurable interval -- Automatic hot-reload into memory when roster CSV changes -- Failsafe reload: keeps previous roster if new file is invalid or empty -- Config-driven paths, intervals, and logging -- Compact console heartbeat with status per unit -- Logging with retention auto-clean (days configurable) -- Safe .MLG header sniff for unit IDs (BE#### / BA####) - -Changelog: -- Reset to semantic versioning (from legacy v5.9 beta) -- Fixed stray `note=note_suffix` bug in Unexpected Units block -- Removed duplicate imports and redundant roster load at startup -- Added startup config echo (paths + URL status) -""" - -import os -import re -import csv -import time -import configparser -import urllib.request -from datetime import datetime, timezone, timedelta -from typing import Dict, Any, Optional, Tuple, Set, List - -# ---------------- Config ---------------- -def load_config(path: str) -> Dict[str, Any]: - """Load INI with tolerant inline comments and a required [emitter] section.""" - cp = configparser.ConfigParser(inline_comment_prefixes=(';', '#')) - cp.optionxform = str # preserve key case - with open(path, "r", encoding="utf-8") as f: - txt = f.read() - # Ensure we have a section header - if not re.search(r'^\s*\[', txt, flags=re.M): - txt = "[emitter]\n" + txt - cp.read_string(txt) - sec = cp["emitter"] - - def get_str(k: str, dflt: str) -> str: - return sec.get(k, dflt).strip() - - def get_int(k: str, dflt: int) -> int: - try: - return int(sec.get(k, str(dflt)).strip()) - except Exception: - return dflt - - def get_bool(k: str, dflt: bool) -> bool: - v = sec.get(k, None) - if v is None: - return dflt - return v.strip().lower() in ("1","true","on","yes","y") - - return { - "WATCH_PATH": get_str("SERIES3_PATH", r"C:\Blastware 10\Event\autocall home"), - "ROSTER_FILE": get_str("ROSTER_FILE", r"C:\SeismoEmitter\series3_roster.csv"), - "ROSTER_URL": get_str("ROSTER_URL", ""), - "ROSTER_REFRESH_MIN_SECONDS": get_int("ROSTER_REFRESH_MIN_SECONDS", 300), - "SCAN_INTERVAL": get_int("SCAN_INTERVAL_SECONDS", 300), - "OK_HOURS": float(get_int("OK_HOURS", 12)), - "MISSING_HOURS": float(get_int("MISSING_HOURS", 24)), - "ENABLE_LOGGING": get_bool("ENABLE_LOGGING", True), - "LOG_FILE": get_str("LOG_FILE", r"C:\SeismoEmitter\emitter_logs\series3_emitter.log"), - "LOG_RETENTION_DAYS": get_int("LOG_RETENTION_DAYS", 30), - "COLORIZE": get_bool("COLORIZE", False), # Win7 default off - "MLG_HEADER_BYTES": max(256, min(get_int("MLG_HEADER_BYTES", 2048), 65536)), - "RECENT_WARN_DAYS": get_int("RECENT_WARN_DAYS", 30), - } - -# --------------- ANSI helpers --------------- -def ansi(enabled: bool, code: str) -> str: - return code if enabled else "" - -# --------------- Logging -------------------- -def log_message(path: str, enabled: bool, msg: str) -> None: - if not enabled: - return - try: - d = os.path.dirname(path) or "." - if not os.path.exists(d): - os.makedirs(d) - with open(path, "a", encoding="utf-8") as f: - f.write("{} {}\n".format(datetime.now(timezone.utc).isoformat(), msg)) - except Exception: - pass - -def clear_logs_if_needed(log_file: str, enabled: bool, retention_days: int) -> None: - if not enabled or retention_days <= 0: - return - stamp_file = os.path.join(os.path.dirname(log_file) or ".", "last_clean.txt") - now = datetime.now(timezone.utc) - last = None - try: - if os.path.exists(stamp_file): - with open(stamp_file, "r", encoding="utf-8") as f: - last = datetime.fromisoformat(f.read().strip()) - except Exception: - last = None - if (last is None) or (now - last > timedelta(days=retention_days)): - try: - if os.path.exists(log_file): - open(log_file, "w", encoding="utf-8").close() - with open(stamp_file, "w", encoding="utf-8") as f: - f.write(now.isoformat()) - print("Log cleared on {}".format(now.astimezone().strftime("%Y-%m-%d %H:%M:%S"))) - log_message(log_file, enabled, "Logs auto-cleared") - except Exception: - pass - -# --------------- Roster --------------------- -def normalize_id(uid: str) -> str: - if uid is None: - return "" - return uid.replace(" ", "").strip().upper() - -def load_roster(path: str) -> Tuple[Set[str], Set[str], Set[str], Dict[str, str]]: - """CSV tolerant of commas in notes: device_id, active, notes... - Returns: active, bench, ignored, notes_by_unit - """ - active: Set[str] = set() - bench: Set[str] = set() - ignored: Set[str] = set() - notes_by_unit: Dict[str, str] = {} - - if not os.path.exists(path): - print("[WARN] Roster not found:", path) - return active, notes_by_unit - try: - with open(path, "r", encoding="utf-8-sig", newline="") as f: - rdr = csv.reader(f) - try: - headers = next(rdr) - except StopIteration: - return active, notes_by_unit - headers = [(h or "").strip().lower() for h in headers] - def idx_of(name: str, fallbacks: List[str]) -> Optional[int]: - if name in headers: - return headers.index(name) - for fb in fallbacks: - if fb in headers: - return headers.index(fb) - return None - i_id = idx_of("device_id", ["unitid","id"]) - i_ac = idx_of("active", []) - i_no = idx_of("notes", ["note","location"]) - if i_id is None or i_ac is None: - print("[WARN] Roster missing device_id/active columns") - return active, notes_by_unit - for row in rdr: - if len(row) <= max(i_id, i_ac): - continue - uid = normalize_id(row[i_id]) - note = "" - if i_no is not None: - extra = row[i_no:] - note = ",".join([c or "" for c in extra]).strip().rstrip(",") - notes_by_unit[uid] = note - if not uid: - continue - is_active = (row[i_ac] or "").strip().lower() in ("yes","y","true","1","on") - flag = (row[i_ac] or "").strip().lower() - if flag in ("yes","y","true","1","on"): - active.add(uid) - elif flag in ("no","n","off","0"): - bench.add(uid) - elif flag in ("ignore","retired","old"): - ignored.add(uid) - - except Exception as e: - print("[WARN] Roster read error:", e) - return active, bench, ignored, notes_by_unit - -# --------------- .MLG sniff ------------------ -UNIT_BYTES_RE = re.compile(rb"(?:^|[^A-Z])(BE|BA)\d{4,5}(?:[^0-9]|$)") - -def sniff_unit_from_mlg(path: str, header_bytes: int) -> Optional[str]: - """Return BE####/BA#### from header bytes, or None.""" - try: - with open(path, "rb") as f: - chunk = f.read(max(256, min(header_bytes, 65536))) - m = UNIT_BYTES_RE.search(chunk) - if not m: - return None - raw = m.group(0) - cleaned = re.sub(rb"[^A-Z0-9]", b"", raw) - try: - return cleaned.decode("ascii").upper() - except Exception: - return None - except Exception: - return None - -# --------------- Scan helpers --------------- -def fmt_last(ts: float) -> str: - return datetime.fromtimestamp(ts, tz=timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S") - -def fmt_age(now_epoch: float, mtime: float) -> str: - mins = int((now_epoch - mtime) // 60) - if mins < 0: mins = 0 - return "{}h {}m".format(mins//60, mins%60) - -def scan_latest(watch: str, header_bytes: int, - cache: Dict[str, Tuple[float, str]], - recent_cutoff: float = None, - logger=None): - - """Return newest .MLG per unit: {uid: {'mtime': float, 'fname': str}}""" - latest: Dict[str, Dict[str, Any]] = {} - if not os.path.exists(watch): - print("[WARN] Watch path not found:", watch) - return latest - try: - with os.scandir(watch) as it: - for e in it: - if (not e.is_file()) or (not e.name.lower().endswith(".mlg")): - continue - fpath = e.path - try: - mtime = e.stat().st_mtime - except Exception: - continue - cached = cache.get(fpath) - if cached is not None and cached[0] == mtime: - uid = cached[1] - else: - uid = sniff_unit_from_mlg(fpath, header_bytes) - if not uid: - if (recent_cutoff is not None) and (mtime >= recent_cutoff): - if logger: - logger(f"[unsniffable-recent] {fpath}") - continue # skip file if no unit ID found in header - cache[fpath] = (mtime, uid) - if (uid not in latest) or (mtime > latest[uid]["mtime"]): - latest[uid] = {"mtime": mtime, "fname": e.name} - except Exception as ex: - print("[WARN] Scan error:", ex) - return latest - -# --- Roster fetch (Dropbox/HTTPS) helper --- -def refresh_roster_from_url(url: str, dest: str, min_seconds: int, - state: dict, logger=None): - now = time.time() - - # throttle fetches; only pull if enough time elapsed - if now - state.get("t", 0) < max(0, int(min_seconds or 0)): - return - - try: - with urllib.request.urlopen(url, timeout=15) as r: - data = r.read() - if data and data.strip(): - with open(dest, "wb") as f: - f.write(data) - state["t"] = now - if logger: - from datetime import datetime - logger(f"[roster] refreshed from {url} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} " - f"-> {dest} ({len(data)} bytes)") - except Exception as e: - if logger: - logger(f"[roster-fetch-error] {e}") - - -# --- config helper: case-insensitive key lookup --- -def cfg_get(cfg: dict, key: str, default=None): - return cfg.get(key, cfg.get(key.lower(), cfg.get(key.upper(), default))) - -# --------------- Main loop ------------------ -def main() -> None: - here = os.path.dirname(__file__) or "." - cfg = load_config(os.path.join(here, "config.ini")) - - WATCH_PATH = cfg["WATCH_PATH"] - ROSTER_FILE = cfg["ROSTER_FILE"] - SCAN_INTERVAL = int(cfg["SCAN_INTERVAL"]) - OK_HOURS = float(cfg["OK_HOURS"]) - MISSING_HOURS = float(cfg["MISSING_HOURS"]) - ENABLE_LOGGING = bool(cfg["ENABLE_LOGGING"]) - LOG_FILE = cfg["LOG_FILE"] - LOG_RETENTION_DAYS = int(cfg["LOG_RETENTION_DAYS"]) - COLORIZE = bool(cfg["COLORIZE"]) - MLG_HEADER_BYTES = int(cfg["MLG_HEADER_BYTES"]) - - C_OK = ansi(COLORIZE, "\033[92m") - C_PEN = ansi(COLORIZE, "\033[93m") - C_MIS = ansi(COLORIZE, "\033[91m") - C_UNX = ansi(COLORIZE, "\033[95m") - C_RST = ansi(COLORIZE, "\033[0m") - - # --- Dropbox roster refresh (pull CSV to local cache) --- - roster_state = {} - url = str(cfg_get(cfg, "ROSTER_URL", "") or "") - # --- Dropbox roster refresh (pull CSV to local cache) --- - roster_state = {} - url = str(cfg_get(cfg, "ROSTER_URL", "") or "") - - # 🔎 Patch 3: startup config echo (helps debugging) - print(f"[CFG] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} ROSTER_URL={'set' if url else 'not set'}") - # (optional, also write it to the log file) - log_message(LOG_FILE, ENABLE_LOGGING, - f"[cfg] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} ROSTER_URL={'set' if url else 'not set'}") - - if url.lower().startswith("http"): - refresh_roster_from_url( - url, - ROSTER_FILE, - int(cfg_get(cfg, "ROSTER_REFRESH_MIN_SECONDS", 300)), - roster_state, - lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m), - ) - - # cache for scanning - sniff_cache: Dict[str, Tuple[float, str]] = {} - - # Always load the (possibly refreshed) local roster - try: - active, bench, ignored, notes_by_unit = load_roster(ROSTER_FILE) - except Exception as ex: - log_message(LOG_FILE, ENABLE_LOGGING, f"[WARN] roster load failed: {ex}") - active = set() - bench = set() - ignored = set() - notes_by_unit = {} - - # track roster file modification time - try: - roster_mtime = os.path.getmtime(ROSTER_FILE) - except Exception: - roster_mtime = None - - - - while True: - try: - now_local = datetime.now().isoformat() - now_utc = datetime.now(timezone.utc).isoformat() - print("-" * 110) - print("Heartbeat @ {} (Local) | {} (UTC)".format(now_local, now_utc)) - print("-" * 110) - - # Periodically refresh roster file from Dropbox - if url.lower().startswith("http"): - refresh_roster_from_url( - url, - ROSTER_FILE, - int(cfg_get(cfg, "ROSTER_REFRESH_MIN_SECONDS", 300)), - roster_state, - lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m), - ) - - # Reload roster into memory if the file changed - try: - m = os.path.getmtime(ROSTER_FILE) - except Exception: - m = None - - if m is not None and m != roster_mtime: - roster_mtime = m - try: - new_active, new_bench, new_ignored, new_notes_by_unit = load_roster(ROSTER_FILE) - if new_active or new_bench or new_ignored: - active, bench, ignored, notes_by_unit = new_active, new_bench, new_ignored, new_notes_by_unit - print(f"[ROSTER] Reloaded: {len(active)} active unit(s) from {ROSTER_FILE}") - log_message(LOG_FILE, ENABLE_LOGGING, - f"[roster] reloaded {len(active)} active units") - else: - print("[ROSTER] Reload skipped — no valid active units in new file") - log_message(LOG_FILE, ENABLE_LOGGING, - "[roster] reload skipped — roster parse failed or empty") - except Exception as ex: - print(f"[ROSTER] Reload failed, keeping previous roster: {ex}") - log_message(LOG_FILE, ENABLE_LOGGING, - f"[roster] reload failed, keeping previous roster: {ex}") - - clear_logs_if_needed(LOG_FILE, ENABLE_LOGGING, LOG_RETENTION_DAYS) - recent_cutoff = time.time() - (float(cfg.get("RECENT_WARN_DAYS", 30)) * 86400) - logger = lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m) - latest = scan_latest(WATCH_PATH, MLG_HEADER_BYTES, sniff_cache, recent_cutoff, logger) - now_epoch = time.time() - - for uid in sorted(active): - info = latest.get(uid) - if info is not None: - age_hours = (now_epoch - info["mtime"]) / 3600.0 - if age_hours > MISSING_HOURS: - status, col = "Missing", C_MIS - elif age_hours > OK_HOURS: - status, col = "Pending", C_PEN - else: - status, col = "OK", C_OK - note = notes_by_unit.get(uid, "") - note_suffix = f" [{note}]" if note else "" - line = ("{col}{uid:<8} {status:<8} Age: {age:<7} Last: {last} (File: {fname}){note}{rst}" - .format(col=col, uid=uid, status=status, - age=fmt_age(now_epoch, info["mtime"]), - last=fmt_last(info["mtime"]), fname=info["fname"], note=note_suffix, rst=C_RST)) - else: - note = notes_by_unit.get(uid, "") - note_suffix = f" [{note}]" if note else "" - line = "{col}{uid:<8} Missing Age: N/A Last: ---{note}{rst}".format(col=C_MIS, uid=uid, note=note_suffix, rst=C_RST) - print(line) - log_message(LOG_FILE, ENABLE_LOGGING, line) - - # Bench Units (rostered but not active in field) - print("\nBench Units (rostered, not active):") - for uid in sorted(bench): - info = latest.get(uid) - note = notes_by_unit.get(uid, "") - note_suffix = f" [{note}]" if note else "" - if info: - line = (f"{uid:<8} Bench Last: {fmt_last(info['mtime'])} (File: {info['fname']}){note_suffix}") - else: - line = (f"{uid:<8} Bench Last: ---{note_suffix}") - print(line) - log_message(LOG_FILE, ENABLE_LOGGING, "[bench] " + line) - - # Ignored Units (retired, broken, or do-not-care) -# ignored_detected = [u for u in latest.keys() if u in ignored] -# if ignored_detected: -# print("\nIgnored Units:") -# for uid in sorted(ignored_detected): -# info = latest[uid] -# note = notes_by_unit.get(uid, "") -# note_suffix = f" [{note}]" if note else "" -# line = (f"{uid:<8} Ignored Last: {fmt_last(info['mtime'])} (File: {info['fname']}){note_suffix}") -# print(line) -# log_message(LOG_FILE, ENABLE_LOGGING, "[ignored] " + line) - unexpected = [ - u for u in latest.keys() - if u not in active and u not in bench and u not in ignored and u not in notes_by_unit - ] - if unexpected: - print("\nUnexpected Units Detected:") - for uid in sorted(unexpected): - info = latest[uid] - line = ("{col}{uid:<8} Age: - Last: {last} (File: {fname}){rst}" - .format(col=C_UNX, uid=uid, last=fmt_last(info["mtime"]), fname=info["fname"], rst=C_RST)) - print(line) - log_message(LOG_FILE, ENABLE_LOGGING, "[unexpected] " + line) - - except KeyboardInterrupt: - print("\nStopping...") - break - except Exception as e: - err = "[loop-error] {}".format(e) - print(err) - log_message(LOG_FILE, ENABLE_LOGGING, err) - time.sleep(SCAN_INTERVAL) - -if __name__ == "__main__": - main()