17 Commits

Author SHA1 Message Date
c133932b29 Merge pull request 'Merge: dev to main, refactor rename' (#1) from dev into main
Reviewed-on: serversdown/series3-agent#1
2026-03-03 17:12:58 -05:00
serversdwn
d404bf6542 refactor: Rename emitter to agent, update related files and logging paths 2026-03-03 17:10:47 -05:00
serversdwn
0d5fa7677f chore: Config-template.ini added to repo. .gitignore updated. 2026-03-03 16:13:01 -05:00
serversdwn
44476248c3 chore: config.ini now added to git ignore. See config template for schema 2026-03-03 16:09:39 -05:00
serversdwn
fa56b93c8f Merge branch 'main' into dev 2025-12-04 17:25:36 -05:00
serversdwn
58ba506f54 docs updated 2025-12-04 17:24:20 -05:00
serversdwn
62a4ca2b1c Update README header formatting 2025-12-04 17:22:30 -05:00
serversdwn
f29943f8e4 Add version information to README 2025-12-04 17:16:59 -05:00
serversdwn
35e3292f01 Merge pull request #4 from serversdwn/dev
Roster deprecated, v1.2
2025-12-04 17:16:23 -05:00
serversdwn
73204ee92e Roster deprecated 2025-12-04 16:22:31 -05:00
serversdwn
47718e7cad Merge pull request #3 from serversdwn/dev
Update to 1.1.0
2025-12-02 01:33:29 -05:00
serversdwn
9074277ff3 config update 2025-12-02 01:31:37 -05:00
serversdwn
551fdae106 v1.1 w/ api funtion added 2025-12-01 16:30:08 -05:00
serversdwn
a03d4a1f05 Add API_URL support + POST reporting logic 2025-11-20 18:24:57 -05:00
serversdwn
142998251c Merge pull request #2 from serversdwn/dev
Merge pull request #1 from serversdwn/main
2025-11-20 18:03:59 -05:00
serversdwn
de3b46a09e Merge pull request #1 from serversdwn/main
Update ROSTER_URL in config.ini
2025-11-20 17:09:12 -05:00
serversdwn
08c54f992c Update ROSTER_URL in config.ini
example config
2025-11-18 02:57:14 -05:00
7 changed files with 501 additions and 500 deletions

6
.gitignore vendored
View File

@@ -1,3 +1,5 @@
config.ini
# ------------------------- # -------------------------
# Python ignores # Python ignores
# ------------------------- # -------------------------
@@ -22,8 +24,8 @@ dist/
# ------------------------- # -------------------------
# Logs + runtime artifacts # Logs + runtime artifacts
# ------------------------- # -------------------------
emitter_logs/* agent_logs/*
!emitter_logs/.gitkeep # keep the folder but ignore its contents !agent_logs/.gitkeep # keep the folder but ignore its contents
*.log *.log

View File

@@ -1,36 +1,83 @@
# Changelog # Changelog
All notable changes to **Series3 Emitter** will be documented in this file. All notable changes to **Series3 Agent** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
--- ---
## [Unreleased] ## [1.2.1] - 2026-03-03
### Changed
- Changed the name of the program to "series3-agent", this was done to align with the s4/thor agent and because it represents the program's functionality better.
- All instances of "emitter" changed to agent.
- config.ini added to .gitignore, replaced with a template example file.
- README.md updated to reflect changes.
--- ---
## [1.0.0] 2025-09-02 ## [1.2.0] - 2025-12-04
### Changed
- Removed roster CSV dependency and all Dropbox refresh/hot-reload logic; heartbeat now only enumerates `.MLG` files.
- Added `MAX_EVENT_AGE_DAYS` filter to ignore stale events and log when no recent activity exists.
- Simplified heartbeat output/logging to show detected units only; logging hardened to never crash the agent.
---
## [1.1.1] - 2025-12-02
### Added
- Example `config.ini` now ships with API heartbeat settings enabled (`API_ENABLED`, `API_URL`, `API_INTERVAL_SECONDS`, `SOURCE_ID`, `SOURCE_TYPE`).
---
## [1.1.0] - 2025-12-01
### Added
- Standardized SFM telemetry payload builder and periodic HTTP heartbeat POST via `urllib`.
- Config support for API heartbeat (`API_ENABLED`, `API_URL`, `API_INTERVAL_SECONDS`, `SOURCE_ID`, `SOURCE_TYPE`); payload includes file path/size metadata.
### Changed
- Refactored scanner to retain file paths and header sniff cache; reformatted logging/ANSI handling.
---
## [1.0.1] - 2025-11-20
### Added
- `API_URL` config key and `report_to_server` per-unit POST hook (adds `requests` dependency).
### Changed
- Example `config.ini` roster URL updated; merged into `main`.
---
## [1.0.0] - 2025-11-17
### Added ### Added
- **Automatic roster refresh** from Dropbox at a configurable interval (`ROSTER_REFRESH_MIN_SECONDS`). - **Automatic roster refresh** from Dropbox at a configurable interval (`ROSTER_REFRESH_MIN_SECONDS`).
- **Hot-reload** of roster file without restarting the script. - **Hot-reload** of roster file without restarting the script.
- **Failsafe reload:** if the new roster is missing or invalid, the previous good roster is retained. - **Failsafe reload:** if the new roster is missing or invalid, the previous good roster is retained.
- **Atomic roster downloads** (temp file replace) to avoid partial/corrupted CSVs. - **Atomic roster downloads** (temp file in-place replace) to avoid partial/corrupted CSVs.
- **Startup config echo** printing WATCH_PATH, ROSTER_FILE, and ROSTER_URL visibility. - **Startup config echo** printing WATCH_PATH, ROSTER_FILE, and ROSTER_URL visibility.
- **Active / Bench / Ignored** unit categories for clearer fleet status mapping. - **Active / Bench / Ignored** unit categories for clearer fleet status mapping.
### Fixed ### Fixed
- Removed stray `note=note_suffix` bug in the Unexpected Units section. - Removed stray `note=note_suffix` bug in the Unexpected Units section.
- Removed duplicate `import time`. - Removed duplicate `import time`.
- Removed duplicate roster load during startup (roster now loads once). - Removed duplicate roster load during startup (roster now loads once).
- Cleaned indentation for Python 3.8 compatibility. - Cleaned indentation for Python 3.8 compatibility.
### Changed ### Changed
- Reset versioning from legacy `v5.9 beta` **v1.0.0** (clean semver baseline). - Reset versioning from legacy `v5.9 beta` to **v1.0.0** (clean semver baseline).
- Main script normalized as `series3_emitter.py`. - Main script normalized as `series3_emitter.py` (later renamed to `series3_agent.py` in v1.2.1).
--- ---
[Unreleased]: https://example.com/compare/v1.0.0...HEAD [Unreleased]: https://example.com/compare/v1.2.0...HEAD
[1.2.0]: https://example.com/releases/v1.2.0
[1.1.1]: https://example.com/releases/v1.1.1
[1.1.0]: https://example.com/releases/v1.1.0
[1.0.1]: https://example.com/releases/v1.0.1
[1.0.0]: https://example.com/releases/v1.0.0 [1.0.0]: https://example.com/releases/v1.0.0

View File

@@ -1,3 +1,5 @@
# Series3 Ingest Agent v1.2
A lightweight Python script that monitors Instantel **Series 3 (Minimate)** call-in activity on a Blastware server. A lightweight Python script that monitors Instantel **Series 3 (Minimate)** call-in activity on a Blastware server.
It scans the event folder, reads `.MLG` headers to identify unit IDs, and prints a live status table showing: It scans the event folder, reads `.MLG` headers to identify unit IDs, and prints a live status table showing:
@@ -5,9 +7,8 @@ It scans the event folder, reads `.MLG` headers to identify unit IDs, and prints
- Last event received - Last event received
- Age since last call-in - Age since last call-in
- OK / Pending / Missing states - OK / Pending / Missing states
- Bench and ignored units - Detected units (no roster required)
- Unexpected units - Optional API heartbeat to Seismograph Fleet Manager backend
- Notes from the roster file
This script is part of the larger **Seismograph Fleet Manager** project. This script is part of the larger **Seismograph Fleet Manager** project.
@@ -17,7 +18,6 @@ This script is part of the larger **Seismograph Fleet Manager** project.
- Python 3.8 (Windows 7 compatible) - Python 3.8 (Windows 7 compatible)
- Blastware 10 event folder available locally - Blastware 10 event folder available locally
- `series3_roster.csv` in the configured path
- `config.ini` in the same directory as the script - `config.ini` in the same directory as the script
Install dependencies with: Install dependencies with:
@@ -28,18 +28,17 @@ Install dependencies with:
## Usage ## Usage
Run the emitter from the folder containing the script: Run the agent from the folder containing the script:
`python series3_emitter.py` `python series3_agent.py`
The script will: The script will:
1. Load the roster file 1. Scan the Blastware event folder for `.MLG` files (within a max age window).
2. Scan the Blastware event folder for `.MLG` files 2. Sniff each file header for the unit ID.
3. Sniff each file header for the unit ID 3. Print a status line for each detected unit (OK / Pending / Missing).
4. Print a status line for each active unit 4. Optionally POST a heartbeat payload on an interval when `API_ENABLED=true`.
5. Refresh the roster automatically if `ROSTER_URL` is set 5. Write logs into the `agent_logs/` folder and auto-clean old logs.
6. Write logs into the `emitter_logs/` folder
--- ---
@@ -49,17 +48,24 @@ All settings are stored in `config.ini`.
Key fields: Key fields:
- `SERIES3_PATH` folder containing `.MLG` files - `SERIES3_PATH` folder containing `.MLG` files
- `ROSTER_FILE` path to the local roster CSV - `SCAN_INTERVAL_SECONDS` — how often to scan
- `ROSTER_URL` optional URL for automatic roster downloads - `OK_HOURS` / `MISSING_HOURS` — thresholds for status
- `SCAN_INTERVAL_SECONDS` how often to scan - `MLG_HEADER_BYTES` how many bytes to sniff from each `.MLG` header
- `OK_HOURS` / `MISSING_HOURS` thresholds for status - `RECENT_WARN_DAYS` — log unsniffable files newer than this window
- `MAX_EVENT_AGE_DAYS` — ignore events older than this many days
- `API_ENABLED` — enable/disable heartbeat POST
- `API_URL` — heartbeat endpoint
- `API_INTERVAL_SECONDS` — heartbeat frequency
- `SOURCE_ID` / `SOURCE_TYPE` — identifiers included in the API payload
- `LOG_RETENTION_DAYS` — auto-delete logs older than this many days
- `COLORIZE` — ANSI color output (off by default for Win7)
--- ---
## Logs ## Logs
Logs are stored under `emitter_logs/`. Logs are stored under `agent_logs/`.
Git ignores all log files but keeps the folder itself. Git ignores all log files but keeps the folder itself.
--- ---
@@ -68,7 +74,7 @@ Git ignores all log files but keeps the folder itself.
This repo follows **Semantic Versioning (SemVer)**. This repo follows **Semantic Versioning (SemVer)**.
Current release: **v1.0.0** stable baseline emitter. Current release: **v1.2.1** — renamed to series3 ingest agent.
See `CHANGELOG.md` for details. See `CHANGELOG.md` for details.
--- ---
@@ -76,4 +82,3 @@ See `CHANGELOG.md` for details.
## License ## License
Private / internal project. Private / internal project.
```

View File

@@ -1,10 +1,10 @@
# Series 3 Emitter — v1_0(py38-safe) for DL2 # Series 3 Ingest Agent — v1_0(py38-safe) for DL2
**Target**: Windows 7 + Python 3.8.10 **Target**: Windows 7 + Python 3.8.10
**Baseline**: v5_4 (no logic changes) **Baseline**: v5_4 (no logic changes)
## Files ## Files
- series3_emitter_v1_0_py38.py — main script (py38-safe) - series3_agent_v1_0_py38.py — main script (py38-safe)
- config.ini — your config (already included) - config.ini — your config (already included)
- series3_roster.csv — your roster (already included, this auto updates from a URL to a dropbox file) - series3_roster.csv — your roster (already included, this auto updates from a URL to a dropbox file)
- requirements.txt — none beyond stdlib - requirements.txt — none beyond stdlib
@@ -15,7 +15,7 @@
3) Open CMD: 3) Open CMD:
```cmd ```cmd
cd C:\SeismoEmitter cd C:\SeismoEmitter
python series3_emitter_v1_0_py38.py python series3_agent_v1_0_py38.py
``` ```
(If the console shows escape codes on Win7, set `COLORIZE = False` in `config.ini`.) (If the console shows escape codes on Win7, set `COLORIZE = False` in `config.ini`.)
@@ -23,4 +23,4 @@
- Heartbeat prints Local/UTC timestamps - Heartbeat prints Local/UTC timestamps
- One line per active roster unit with OK/Pending/Missing, Age, Last, File - One line per active roster unit with OK/Pending/Missing, Age, Last, File
- Unexpected units block shows .MLG not in roster - Unexpected units block shows .MLG not in roster
- emitter.log rotates per LOG_RETENTION_DAYS - agent.log rotates per LOG_RETENTION_DAYS

View File

@@ -1,9 +1,15 @@
[emitter] [agent]
# --- API Heartbeat Settings ---
API_ENABLED = true
API_URL =
API_INTERVAL_SECONDS = 300
SOURCE_ID = #computer that is running agent.
SOURCE_TYPE = series3_agent
# Paths # Paths
SERIES3_PATH = C:\Blastware 10\Event\autocall home SERIES3_PATH = C:\Blastware 10\Event\autocall home
ROSTER_FILE = C:\SeismoEmitter\series3_roster.csv MAX_EVENT_AGE_DAYS = 365
ROSTER_URL = https://www.dropbox.com/scl/fi/gadrpjj2nif3f6q5k60zy/series3_roster.csv?rlkey=fkycemzg4s86pmlxpih4f3pkx&st=hvx0mgln&dl=1
ROSTER_REFRESH_MIN_SECONDS = 0
# Scanning # Scanning
@@ -13,10 +19,10 @@ MISSING_HOURS = 24
# Logging # Logging
ENABLE_LOGGING = True ENABLE_LOGGING = True
LOG_FILE = C:\SeismoEmitter\emitter_logs\series3_emitter.log LOG_FILE = C:\SeismoEmitter\agent_logs\series3_agent.log
LOG_RETENTION_DAYS = 30 LOG_RETENTION_DAYS = 30
# Console colors # Console colors - (Doesn't work on windows 7)
COLORIZE = FALSE COLORIZE = FALSE
# .MLG parsing # .MLG parsing

400
series3_agent.py Normal file
View File

@@ -0,0 +1,400 @@
"""
Series 3 Ingest Agent — v1.2.1
Environment:
- Python 3.8 (Windows 7 compatible)
- Runs on DL2 with Blastware 10 event path
Key Features:
- Config-driven paths, intervals, and logging
- Compact console heartbeat with status per unit
- Logging with retention auto-clean (days configurable)
- Safe .MLG header sniff for unit IDs (BE#### / BA####)
- Standardized SFM Telemetry JSON payload (source-agnostic)
- Periodic HTTP heartbeat POST to SFM backend
- NEW in v1.2.0:
- No local roster / CSV dependency
- Only scans .MLG files newer than MAX_EVENT_AGE_DAYS
"""
import os
import re
import time
import json
import configparser
import urllib.request
import urllib.error
from datetime import datetime, timezone, timedelta
from typing import Dict, Any, Optional, Tuple
from socket import gethostname
# ---------------- Config ----------------
def load_config(path: str) -> Dict[str, Any]:
"""Load INI with tolerant inline comments and a required [agent] section."""
cp = configparser.ConfigParser(inline_comment_prefixes=(";", "#"))
cp.optionxform = str # preserve key case
with open(path, "r", encoding="utf-8") as f:
txt = f.read()
# Ensure we have a section header
if not re.search(r"^\s*\[", txt, flags=re.M):
txt = "[agent]\n" + txt
cp.read_string(txt)
sec = cp["agent"]
def get_str(k: str, dflt: str) -> str:
return sec.get(k, dflt).strip()
def get_int(k: str, dflt: int) -> int:
try:
return int(sec.get(k, str(dflt)).strip())
except Exception:
return dflt
def get_bool(k: str, dflt: bool) -> bool:
v = sec.get(k, None)
if v is None:
return dflt
return v.strip().lower() in ("1", "true", "on", "yes", "y")
return {
"WATCH_PATH": get_str("SERIES3_PATH", r"C:\Blastware 10\Event\autocall home"),
"SCAN_INTERVAL": get_int("SCAN_INTERVAL_SECONDS", 300),
"OK_HOURS": float(get_int("OK_HOURS", 12)),
"MISSING_HOURS": float(get_int("MISSING_HOURS", 24)),
"ENABLE_LOGGING": get_bool("ENABLE_LOGGING", True),
"LOG_FILE": get_str("LOG_FILE", r"C:\SeismoEmitter\agent_logs\series3_agent.log"),
"LOG_RETENTION_DAYS": get_int("LOG_RETENTION_DAYS", 30),
"COLORIZE": get_bool("COLORIZE", False), # Win7 default off
"MLG_HEADER_BYTES": max(256, min(get_int("MLG_HEADER_BYTES", 2048), 65536)),
"RECENT_WARN_DAYS": get_int("RECENT_WARN_DAYS", 30),
"MAX_EVENT_AGE_DAYS": get_int("MAX_EVENT_AGE_DAYS", 365),
# API heartbeat / SFM telemetry
"API_ENABLED": get_bool("API_ENABLED", False),
"API_URL": get_str("API_URL", ""),
"API_INTERVAL_SECONDS": get_int("API_INTERVAL_SECONDS", 300),
"SOURCE_ID": get_str("SOURCE_ID", gethostname()),
"SOURCE_TYPE": get_str("SOURCE_TYPE", "series3_ingest_agent"),
}
# --------------- ANSI helpers ---------------
def ansi(enabled: bool, code: str) -> str:
return code if enabled else ""
# --------------- Logging --------------------
def log_message(path: str, enabled: bool, msg: str) -> None:
if not enabled:
return
try:
d = os.path.dirname(path) or "."
if not os.path.exists(d):
os.makedirs(d)
with open(path, "a", encoding="utf-8") as f:
f.write("{} {}\n".format(datetime.now(timezone.utc).isoformat(), msg))
except Exception:
# Logging must never crash the agent
pass
def clear_logs_if_needed(log_file: str, enabled: bool, retention_days: int) -> None:
if not enabled or retention_days <= 0:
return
stamp_file = os.path.join(os.path.dirname(log_file) or ".", "last_clean.txt")
now = datetime.now(timezone.utc)
last = None
try:
if os.path.exists(stamp_file):
with open(stamp_file, "r", encoding="utf-8") as f:
last = datetime.fromisoformat(f.read().strip())
except Exception:
last = None
if (last is None) or (now - last > timedelta(days=retention_days)):
try:
if os.path.exists(log_file):
open(log_file, "w", encoding="utf-8").close()
with open(stamp_file, "w", encoding="utf-8") as f:
f.write(now.isoformat())
print("Log cleared on {}".format(now.astimezone().strftime("%Y-%m-%d %H:%M:%S")))
log_message(log_file, enabled, "Logs auto-cleared")
except Exception:
pass
# --------------- .MLG sniff ------------------
UNIT_BYTES_RE = re.compile(rb"(?:^|[^A-Z])(BE|BA)\d{4,5}(?:[^0-9]|$)")
def sniff_unit_from_mlg(path: str, header_bytes: int) -> Optional[str]:
"""Return BE####/BA#### from header bytes, or None."""
try:
with open(path, "rb") as f:
chunk = f.read(max(256, min(header_bytes, 65536)))
m = UNIT_BYTES_RE.search(chunk)
if not m:
return None
raw = m.group(0)
cleaned = re.sub(rb"[^A-Z0-9]", b"", raw)
try:
return cleaned.decode("ascii").upper()
except Exception:
return None
except Exception:
return None
# --------------- Scan helpers ---------------
def fmt_last(ts: float) -> str:
return datetime.fromtimestamp(ts, tz=timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S")
def fmt_age(now_epoch: float, mtime: float) -> str:
mins = int((now_epoch - mtime) // 60)
if mins < 0:
mins = 0
return "{}h {}m".format(mins // 60, mins % 60)
def scan_latest(
watch: str,
header_bytes: int,
cache: Dict[str, Tuple[float, str]],
recent_cutoff: float,
max_age_days: int,
logger=None,
) -> Dict[str, Dict[str, Any]]:
"""
Return newest .MLG per unit, only for files newer than max_age_days:
{uid: {'mtime': float, 'fname': str, 'path': str}}
"""
latest: Dict[str, Dict[str, Any]] = {}
if not os.path.exists(watch):
print("[WARN] Watch path not found:", watch)
return latest
now_ts = time.time()
max_age_days = max(1, int(max_age_days)) # sanity floor
max_age_seconds = max_age_days * 86400.0
try:
with os.scandir(watch) as it:
for e in it:
if (not e.is_file()) or (not e.name.lower().endswith(".mlg")):
continue
fpath = e.path
try:
mtime = e.stat().st_mtime
except Exception:
continue
# Skip very old events (beyond retention window)
age_seconds = now_ts - mtime
if age_seconds < 0:
age_seconds = 0
if age_seconds > max_age_seconds:
continue # too old, ignore this file
cached = cache.get(fpath)
if cached is not None and cached[0] == mtime:
uid = cached[1]
else:
uid = sniff_unit_from_mlg(fpath, header_bytes)
if not uid:
# If unsniffable but very recent, log for later inspection
if (recent_cutoff is not None) and (mtime >= recent_cutoff):
if logger:
logger(f"[unsniffable-recent] {fpath}")
continue # skip file if no unit ID found in header
cache[fpath] = (mtime, uid)
if (uid not in latest) or (mtime > latest[uid]["mtime"]):
latest[uid] = {"mtime": mtime, "fname": e.name, "path": fpath}
except Exception as ex:
print("[WARN] Scan error:", ex)
return latest
# --- API heartbeat / SFM telemetry helpers ---
def send_api_payload(payload: dict, api_url: str) -> None:
if not api_url:
return
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(api_url, data=data, headers={"Content-Type": "application/json"})
try:
with urllib.request.urlopen(req, timeout=5) as res:
print(f"[API] POST success: {res.status}")
except urllib.error.URLError as e:
print(f"[API] POST failed: {e}")
def build_sfm_payload(units_dict: Dict[str, Dict[str, Any]], cfg: Dict[str, Any]) -> dict:
"""
Build SFM Telemetry JSON v1 payload from latest-unit dict.
Schema is source-agnostic and future-proof.
"""
now_iso = datetime.now(timezone.utc).isoformat()
now_ts = time.time()
payload = {
"source_id": cfg.get("SOURCE_ID", gethostname()),
"source_type": cfg.get("SOURCE_TYPE", "series3_ingest_agent"),
"timestamp": now_iso,
"units": [],
}
for unit_id, info in units_dict.items():
mtime = info.get("mtime")
if mtime is not None:
last_event_iso = datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat()
age_minutes = int(max(0, (now_ts - mtime) // 60))
else:
last_event_iso = None
age_minutes = None
file_path = info.get("path")
file_size = None
if file_path:
try:
file_size = os.path.getsize(file_path)
except Exception:
file_size = None
payload["units"].append(
{
"unit_id": unit_id,
"last_event_time": last_event_iso,
"age_minutes": age_minutes,
"observation_method": "mlg_scan",
"event_metadata": {
"file_name": info.get("fname"),
"file_path": file_path,
"file_size_bytes": file_size,
"event_number": None,
"event_type": None,
},
}
)
return payload
# --------------- Main loop ------------------
def main() -> None:
here = os.path.dirname(__file__) or "."
cfg = load_config(os.path.join(here, "config.ini"))
WATCH_PATH = cfg["WATCH_PATH"]
SCAN_INTERVAL = int(cfg["SCAN_INTERVAL"])
OK_HOURS = float(cfg["OK_HOURS"])
MISSING_HOURS = float(cfg["MISSING_HOURS"])
ENABLE_LOGGING = bool(cfg["ENABLE_LOGGING"])
LOG_FILE = cfg["LOG_FILE"]
LOG_RETENTION_DAYS = int(cfg["LOG_RETENTION_DAYS"])
COLORIZE = bool(cfg["COLORIZE"])
MLG_HEADER_BYTES = int(cfg["MLG_HEADER_BYTES"])
RECENT_WARN_DAYS = int(cfg["RECENT_WARN_DAYS"])
MAX_EVENT_AGE_DAYS = int(cfg["MAX_EVENT_AGE_DAYS"])
C_OK = ansi(COLORIZE, "\033[92m")
C_PEN = ansi(COLORIZE, "\033[93m")
C_MIS = ansi(COLORIZE, "\033[91m")
C_RST = ansi(COLORIZE, "\033[0m")
print(
"[CFG] WATCH_PATH={} SCAN_INTERVAL={}s MAX_EVENT_AGE_DAYS={} API_ENABLED={}".format(
WATCH_PATH, SCAN_INTERVAL, MAX_EVENT_AGE_DAYS, bool(cfg.get("API_ENABLED", False))
)
)
log_message(
LOG_FILE,
ENABLE_LOGGING,
"[cfg] WATCH_PATH={} SCAN_INTERVAL={} MAX_EVENT_AGE_DAYS={} API_ENABLED={}".format(
WATCH_PATH, SCAN_INTERVAL, MAX_EVENT_AGE_DAYS, bool(cfg.get("API_ENABLED", False))
),
)
# cache for scanning
sniff_cache: Dict[str, Tuple[float, str]] = {}
last_api_ts: float = 0.0
while True:
try:
now_local = datetime.now().isoformat()
now_utc = datetime.now(timezone.utc).isoformat()
print("-" * 110)
print("Heartbeat @ {} (Local) | {} (UTC)".format(now_local, now_utc))
print("-" * 110)
clear_logs_if_needed(LOG_FILE, ENABLE_LOGGING, LOG_RETENTION_DAYS)
recent_cutoff = time.time() - (float(RECENT_WARN_DAYS) * 86400)
logger_fn = lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m)
latest = scan_latest(
WATCH_PATH,
MLG_HEADER_BYTES,
sniff_cache,
recent_cutoff,
MAX_EVENT_AGE_DAYS,
logger_fn,
)
now_epoch = time.time()
# Detected units summary (no roster dependency)
if latest:
print("\nDetected Units (within last {} days):".format(MAX_EVENT_AGE_DAYS))
for uid in sorted(latest.keys()):
info = latest[uid]
age_hours = (now_epoch - info["mtime"]) / 3600.0
if age_hours > MISSING_HOURS:
status, col = "Missing", C_MIS
elif age_hours > OK_HOURS:
status, col = "Pending", C_PEN
else:
status, col = "OK", C_OK
line = (
"{col}{uid:<8} {status:<8} Age: {age:<7} Last: {last} (File: {fname}){rst}".format(
col=col,
uid=uid,
status=status,
age=fmt_age(now_epoch, info["mtime"]),
last=fmt_last(info["mtime"]),
fname=info["fname"],
rst=C_RST,
)
)
print(line)
log_message(LOG_FILE, ENABLE_LOGGING, line)
else:
print("\nNo recent .MLG activity found within last {} days.".format(MAX_EVENT_AGE_DAYS))
log_message(
LOG_FILE,
ENABLE_LOGGING,
"[info] no recent MLG activity within {} days".format(MAX_EVENT_AGE_DAYS),
)
# ---- API heartbeat to SFM ----
if cfg.get("API_ENABLED", False):
now_ts = time.time()
interval = int(cfg.get("API_INTERVAL_SECONDS", 300))
if now_ts - last_api_ts >= interval:
payload = build_sfm_payload(latest, cfg)
send_api_payload(payload, cfg.get("API_URL", ""))
last_api_ts = now_ts
except KeyboardInterrupt:
print("\nStopping...")
break
except Exception as e:
err = "[loop-error] {}".format(e)
print(err)
log_message(LOG_FILE, ENABLE_LOGGING, err)
time.sleep(SCAN_INTERVAL)
if __name__ == "__main__":
main()

View File

@@ -1,459 +0,0 @@
"""
Series 3 Emitter — v1.0.0 (Stable Baseline, SemVer Reset)
Environment:
- Python 3.8 (Windows 7 compatible)
- Runs on DL2 with Blastware 10 event path
Key Features:
- Atomic roster downloads from Dropbox (no partial files)
- Automatic roster refresh from Dropbox at configurable interval
- Automatic hot-reload into memory when roster CSV changes
- Failsafe reload: keeps previous roster if new file is invalid or empty
- Config-driven paths, intervals, and logging
- Compact console heartbeat with status per unit
- Logging with retention auto-clean (days configurable)
- Safe .MLG header sniff for unit IDs (BE#### / BA####)
Changelog:
- Reset to semantic versioning (from legacy v5.9 beta)
- Fixed stray `note=note_suffix` bug in Unexpected Units block
- Removed duplicate imports and redundant roster load at startup
- Added startup config echo (paths + URL status)
"""
import os
import re
import csv
import time
import configparser
import urllib.request
from datetime import datetime, timezone, timedelta
from typing import Dict, Any, Optional, Tuple, Set, List
# ---------------- Config ----------------
def load_config(path: str) -> Dict[str, Any]:
"""Load INI with tolerant inline comments and a required [emitter] section."""
cp = configparser.ConfigParser(inline_comment_prefixes=(';', '#'))
cp.optionxform = str # preserve key case
with open(path, "r", encoding="utf-8") as f:
txt = f.read()
# Ensure we have a section header
if not re.search(r'^\s*\[', txt, flags=re.M):
txt = "[emitter]\n" + txt
cp.read_string(txt)
sec = cp["emitter"]
def get_str(k: str, dflt: str) -> str:
return sec.get(k, dflt).strip()
def get_int(k: str, dflt: int) -> int:
try:
return int(sec.get(k, str(dflt)).strip())
except Exception:
return dflt
def get_bool(k: str, dflt: bool) -> bool:
v = sec.get(k, None)
if v is None:
return dflt
return v.strip().lower() in ("1","true","on","yes","y")
return {
"WATCH_PATH": get_str("SERIES3_PATH", r"C:\Blastware 10\Event\autocall home"),
"ROSTER_FILE": get_str("ROSTER_FILE", r"C:\SeismoEmitter\series3_roster.csv"),
"ROSTER_URL": get_str("ROSTER_URL", ""),
"ROSTER_REFRESH_MIN_SECONDS": get_int("ROSTER_REFRESH_MIN_SECONDS", 300),
"SCAN_INTERVAL": get_int("SCAN_INTERVAL_SECONDS", 300),
"OK_HOURS": float(get_int("OK_HOURS", 12)),
"MISSING_HOURS": float(get_int("MISSING_HOURS", 24)),
"ENABLE_LOGGING": get_bool("ENABLE_LOGGING", True),
"LOG_FILE": get_str("LOG_FILE", r"C:\SeismoEmitter\emitter_logs\series3_emitter.log"),
"LOG_RETENTION_DAYS": get_int("LOG_RETENTION_DAYS", 30),
"COLORIZE": get_bool("COLORIZE", False), # Win7 default off
"MLG_HEADER_BYTES": max(256, min(get_int("MLG_HEADER_BYTES", 2048), 65536)),
"RECENT_WARN_DAYS": get_int("RECENT_WARN_DAYS", 30),
}
# --------------- ANSI helpers ---------------
def ansi(enabled: bool, code: str) -> str:
return code if enabled else ""
# --------------- Logging --------------------
def log_message(path: str, enabled: bool, msg: str) -> None:
if not enabled:
return
try:
d = os.path.dirname(path) or "."
if not os.path.exists(d):
os.makedirs(d)
with open(path, "a", encoding="utf-8") as f:
f.write("{} {}\n".format(datetime.now(timezone.utc).isoformat(), msg))
except Exception:
pass
def clear_logs_if_needed(log_file: str, enabled: bool, retention_days: int) -> None:
if not enabled or retention_days <= 0:
return
stamp_file = os.path.join(os.path.dirname(log_file) or ".", "last_clean.txt")
now = datetime.now(timezone.utc)
last = None
try:
if os.path.exists(stamp_file):
with open(stamp_file, "r", encoding="utf-8") as f:
last = datetime.fromisoformat(f.read().strip())
except Exception:
last = None
if (last is None) or (now - last > timedelta(days=retention_days)):
try:
if os.path.exists(log_file):
open(log_file, "w", encoding="utf-8").close()
with open(stamp_file, "w", encoding="utf-8") as f:
f.write(now.isoformat())
print("Log cleared on {}".format(now.astimezone().strftime("%Y-%m-%d %H:%M:%S")))
log_message(log_file, enabled, "Logs auto-cleared")
except Exception:
pass
# --------------- Roster ---------------------
def normalize_id(uid: str) -> str:
if uid is None:
return ""
return uid.replace(" ", "").strip().upper()
def load_roster(path: str) -> Tuple[Set[str], Set[str], Set[str], Dict[str, str]]:
"""CSV tolerant of commas in notes: device_id, active, notes...
Returns: active, bench, ignored, notes_by_unit
"""
active: Set[str] = set()
bench: Set[str] = set()
ignored: Set[str] = set()
notes_by_unit: Dict[str, str] = {}
if not os.path.exists(path):
print("[WARN] Roster not found:", path)
return active, notes_by_unit
try:
with open(path, "r", encoding="utf-8-sig", newline="") as f:
rdr = csv.reader(f)
try:
headers = next(rdr)
except StopIteration:
return active, notes_by_unit
headers = [(h or "").strip().lower() for h in headers]
def idx_of(name: str, fallbacks: List[str]) -> Optional[int]:
if name in headers:
return headers.index(name)
for fb in fallbacks:
if fb in headers:
return headers.index(fb)
return None
i_id = idx_of("device_id", ["unitid","id"])
i_ac = idx_of("active", [])
i_no = idx_of("notes", ["note","location"])
if i_id is None or i_ac is None:
print("[WARN] Roster missing device_id/active columns")
return active, notes_by_unit
for row in rdr:
if len(row) <= max(i_id, i_ac):
continue
uid = normalize_id(row[i_id])
note = ""
if i_no is not None:
extra = row[i_no:]
note = ",".join([c or "" for c in extra]).strip().rstrip(",")
notes_by_unit[uid] = note
if not uid:
continue
is_active = (row[i_ac] or "").strip().lower() in ("yes","y","true","1","on")
flag = (row[i_ac] or "").strip().lower()
if flag in ("yes","y","true","1","on"):
active.add(uid)
elif flag in ("no","n","off","0"):
bench.add(uid)
elif flag in ("ignore","retired","old"):
ignored.add(uid)
except Exception as e:
print("[WARN] Roster read error:", e)
return active, bench, ignored, notes_by_unit
# --------------- .MLG sniff ------------------
UNIT_BYTES_RE = re.compile(rb"(?:^|[^A-Z])(BE|BA)\d{4,5}(?:[^0-9]|$)")
def sniff_unit_from_mlg(path: str, header_bytes: int) -> Optional[str]:
"""Return BE####/BA#### from header bytes, or None."""
try:
with open(path, "rb") as f:
chunk = f.read(max(256, min(header_bytes, 65536)))
m = UNIT_BYTES_RE.search(chunk)
if not m:
return None
raw = m.group(0)
cleaned = re.sub(rb"[^A-Z0-9]", b"", raw)
try:
return cleaned.decode("ascii").upper()
except Exception:
return None
except Exception:
return None
# --------------- Scan helpers ---------------
def fmt_last(ts: float) -> str:
return datetime.fromtimestamp(ts, tz=timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S")
def fmt_age(now_epoch: float, mtime: float) -> str:
mins = int((now_epoch - mtime) // 60)
if mins < 0: mins = 0
return "{}h {}m".format(mins//60, mins%60)
def scan_latest(watch: str, header_bytes: int,
cache: Dict[str, Tuple[float, str]],
recent_cutoff: float = None,
logger=None):
"""Return newest .MLG per unit: {uid: {'mtime': float, 'fname': str}}"""
latest: Dict[str, Dict[str, Any]] = {}
if not os.path.exists(watch):
print("[WARN] Watch path not found:", watch)
return latest
try:
with os.scandir(watch) as it:
for e in it:
if (not e.is_file()) or (not e.name.lower().endswith(".mlg")):
continue
fpath = e.path
try:
mtime = e.stat().st_mtime
except Exception:
continue
cached = cache.get(fpath)
if cached is not None and cached[0] == mtime:
uid = cached[1]
else:
uid = sniff_unit_from_mlg(fpath, header_bytes)
if not uid:
if (recent_cutoff is not None) and (mtime >= recent_cutoff):
if logger:
logger(f"[unsniffable-recent] {fpath}")
continue # skip file if no unit ID found in header
cache[fpath] = (mtime, uid)
if (uid not in latest) or (mtime > latest[uid]["mtime"]):
latest[uid] = {"mtime": mtime, "fname": e.name}
except Exception as ex:
print("[WARN] Scan error:", ex)
return latest
# --- Roster fetch (Dropbox/HTTPS) helper ---
def refresh_roster_from_url(url: str, dest: str, min_seconds: int,
state: dict, logger=None):
now = time.time()
# throttle fetches; only pull if enough time elapsed
if now - state.get("t", 0) < max(0, int(min_seconds or 0)):
return
try:
with urllib.request.urlopen(url, timeout=15) as r:
data = r.read()
if data and data.strip():
with open(dest, "wb") as f:
f.write(data)
state["t"] = now
if logger:
from datetime import datetime
logger(f"[roster] refreshed from {url} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} "
f"-> {dest} ({len(data)} bytes)")
except Exception as e:
if logger:
logger(f"[roster-fetch-error] {e}")
# --- config helper: case-insensitive key lookup ---
def cfg_get(cfg: dict, key: str, default=None):
return cfg.get(key, cfg.get(key.lower(), cfg.get(key.upper(), default)))
# --------------- Main loop ------------------
def main() -> None:
here = os.path.dirname(__file__) or "."
cfg = load_config(os.path.join(here, "config.ini"))
WATCH_PATH = cfg["WATCH_PATH"]
ROSTER_FILE = cfg["ROSTER_FILE"]
SCAN_INTERVAL = int(cfg["SCAN_INTERVAL"])
OK_HOURS = float(cfg["OK_HOURS"])
MISSING_HOURS = float(cfg["MISSING_HOURS"])
ENABLE_LOGGING = bool(cfg["ENABLE_LOGGING"])
LOG_FILE = cfg["LOG_FILE"]
LOG_RETENTION_DAYS = int(cfg["LOG_RETENTION_DAYS"])
COLORIZE = bool(cfg["COLORIZE"])
MLG_HEADER_BYTES = int(cfg["MLG_HEADER_BYTES"])
C_OK = ansi(COLORIZE, "\033[92m")
C_PEN = ansi(COLORIZE, "\033[93m")
C_MIS = ansi(COLORIZE, "\033[91m")
C_UNX = ansi(COLORIZE, "\033[95m")
C_RST = ansi(COLORIZE, "\033[0m")
# --- Dropbox roster refresh (pull CSV to local cache) ---
roster_state = {}
url = str(cfg_get(cfg, "ROSTER_URL", "") or "")
# --- Dropbox roster refresh (pull CSV to local cache) ---
roster_state = {}
url = str(cfg_get(cfg, "ROSTER_URL", "") or "")
# 🔎 Patch 3: startup config echo (helps debugging)
print(f"[CFG] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} ROSTER_URL={'set' if url else 'not set'}")
# (optional, also write it to the log file)
log_message(LOG_FILE, ENABLE_LOGGING,
f"[cfg] WATCH_PATH={WATCH_PATH} ROSTER_FILE={ROSTER_FILE} ROSTER_URL={'set' if url else 'not set'}")
if url.lower().startswith("http"):
refresh_roster_from_url(
url,
ROSTER_FILE,
int(cfg_get(cfg, "ROSTER_REFRESH_MIN_SECONDS", 300)),
roster_state,
lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m),
)
# cache for scanning
sniff_cache: Dict[str, Tuple[float, str]] = {}
# Always load the (possibly refreshed) local roster
try:
active, bench, ignored, notes_by_unit = load_roster(ROSTER_FILE)
except Exception as ex:
log_message(LOG_FILE, ENABLE_LOGGING, f"[WARN] roster load failed: {ex}")
active = set()
bench = set()
ignored = set()
notes_by_unit = {}
# track roster file modification time
try:
roster_mtime = os.path.getmtime(ROSTER_FILE)
except Exception:
roster_mtime = None
while True:
try:
now_local = datetime.now().isoformat()
now_utc = datetime.now(timezone.utc).isoformat()
print("-" * 110)
print("Heartbeat @ {} (Local) | {} (UTC)".format(now_local, now_utc))
print("-" * 110)
# Periodically refresh roster file from Dropbox
if url.lower().startswith("http"):
refresh_roster_from_url(
url,
ROSTER_FILE,
int(cfg_get(cfg, "ROSTER_REFRESH_MIN_SECONDS", 300)),
roster_state,
lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m),
)
# Reload roster into memory if the file changed
try:
m = os.path.getmtime(ROSTER_FILE)
except Exception:
m = None
if m is not None and m != roster_mtime:
roster_mtime = m
try:
new_active, new_bench, new_ignored, new_notes_by_unit = load_roster(ROSTER_FILE)
if new_active or new_bench or new_ignored:
active, bench, ignored, notes_by_unit = new_active, new_bench, new_ignored, new_notes_by_unit
print(f"[ROSTER] Reloaded: {len(active)} active unit(s) from {ROSTER_FILE}")
log_message(LOG_FILE, ENABLE_LOGGING,
f"[roster] reloaded {len(active)} active units")
else:
print("[ROSTER] Reload skipped — no valid active units in new file")
log_message(LOG_FILE, ENABLE_LOGGING,
"[roster] reload skipped — roster parse failed or empty")
except Exception as ex:
print(f"[ROSTER] Reload failed, keeping previous roster: {ex}")
log_message(LOG_FILE, ENABLE_LOGGING,
f"[roster] reload failed, keeping previous roster: {ex}")
clear_logs_if_needed(LOG_FILE, ENABLE_LOGGING, LOG_RETENTION_DAYS)
recent_cutoff = time.time() - (float(cfg.get("RECENT_WARN_DAYS", 30)) * 86400)
logger = lambda m: log_message(LOG_FILE, ENABLE_LOGGING, m)
latest = scan_latest(WATCH_PATH, MLG_HEADER_BYTES, sniff_cache, recent_cutoff, logger)
now_epoch = time.time()
for uid in sorted(active):
info = latest.get(uid)
if info is not None:
age_hours = (now_epoch - info["mtime"]) / 3600.0
if age_hours > MISSING_HOURS:
status, col = "Missing", C_MIS
elif age_hours > OK_HOURS:
status, col = "Pending", C_PEN
else:
status, col = "OK", C_OK
note = notes_by_unit.get(uid, "")
note_suffix = f" [{note}]" if note else ""
line = ("{col}{uid:<8} {status:<8} Age: {age:<7} Last: {last} (File: {fname}){note}{rst}"
.format(col=col, uid=uid, status=status,
age=fmt_age(now_epoch, info["mtime"]),
last=fmt_last(info["mtime"]), fname=info["fname"], note=note_suffix, rst=C_RST))
else:
note = notes_by_unit.get(uid, "")
note_suffix = f" [{note}]" if note else ""
line = "{col}{uid:<8} Missing Age: N/A Last: ---{note}{rst}".format(col=C_MIS, uid=uid, note=note_suffix, rst=C_RST)
print(line)
log_message(LOG_FILE, ENABLE_LOGGING, line)
# Bench Units (rostered but not active in field)
print("\nBench Units (rostered, not active):")
for uid in sorted(bench):
info = latest.get(uid)
note = notes_by_unit.get(uid, "")
note_suffix = f" [{note}]" if note else ""
if info:
line = (f"{uid:<8} Bench Last: {fmt_last(info['mtime'])} (File: {info['fname']}){note_suffix}")
else:
line = (f"{uid:<8} Bench Last: ---{note_suffix}")
print(line)
log_message(LOG_FILE, ENABLE_LOGGING, "[bench] " + line)
# Ignored Units (retired, broken, or do-not-care)
# ignored_detected = [u for u in latest.keys() if u in ignored]
# if ignored_detected:
# print("\nIgnored Units:")
# for uid in sorted(ignored_detected):
# info = latest[uid]
# note = notes_by_unit.get(uid, "")
# note_suffix = f" [{note}]" if note else ""
# line = (f"{uid:<8} Ignored Last: {fmt_last(info['mtime'])} (File: {info['fname']}){note_suffix}")
# print(line)
# log_message(LOG_FILE, ENABLE_LOGGING, "[ignored] " + line)
unexpected = [
u for u in latest.keys()
if u not in active and u not in bench and u not in ignored and u not in notes_by_unit
]
if unexpected:
print("\nUnexpected Units Detected:")
for uid in sorted(unexpected):
info = latest[uid]
line = ("{col}{uid:<8} Age: - Last: {last} (File: {fname}){rst}"
.format(col=C_UNX, uid=uid, last=fmt_last(info["mtime"]), fname=info["fname"], rst=C_RST))
print(line)
log_message(LOG_FILE, ENABLE_LOGGING, "[unexpected] " + line)
except KeyboardInterrupt:
print("\nStopping...")
break
except Exception as e:
err = "[loop-error] {}".format(e)
print(err)
log_message(LOG_FILE, ENABLE_LOGGING, err)
time.sleep(SCAN_INTERVAL)
if __name__ == "__main__":
main()