diff --git a/.gitignore b/.gitignore index 9e47c48..46a811b 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ # Thor Watcher local files # -------------------------- config.json +/example-data/ # ------------------------- # Python ignores diff --git a/CHANGELOG.md b/CHANGELOG.md index 29104ae..ced8640 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,22 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.3.0] - 2026-05-19 + +### Added +- `event_forwarder.py` — forwards `.IDFH` (histogram) and `.IDFW` (waveform) event files plus their `TXT/.txt` sidecars to a seismo-relay SFM server's new `/db/import/idf_file` endpoint +- Sha256-keyed `thor_forwarded.json` state file for idempotency across restarts and re-scans (default path: `/thor_forwarded.json`) +- "SFM Forward" tab in Settings dialog: enable/URL/Test, forward interval, quiescence, missing-report grace, HTTP timeout, max forwards per pass, max event age, state file picker +- Forwarder status line in tray menu: `SFM OK | N fwd, M err | last 30s ago` +- Tray icon goes amber when the SFM forwarder is failing but the API heartbeat is still healthy +- Re-pair logic: events forwarded without their TXT are re-forwarded once the sidecar appears so the relay can refresh DB rows with device-authoritative PPV/ZCFreq/peak values +- `event_forwarder.py --seed-state` CLI for skipping historical backfill on a first deploy +- Version badge: `Thor Watcher vX.Y.Z` shown at the top of the tray menu and in the Settings dialog title bar — operators no longer have to crack open the .exe properties to tell which version is running + +### Changed +- Bumped `VERSION` to `0.3.0` +- Settings dialog tab order: Connection / Paths / Scanning / Logging / **SFM Forward** / Updates + ## [0.2.0] - 2026-03-20 ### Added diff --git a/README.md b/README.md index f1e501f..8a558ee 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # Thor Watcher -**Version:** 0.2.0 +**Version:** 0.3.0 -Micromate (Series 4) watcher agent for Terra-View fleet management. Runs as a Windows system tray application, scans THORDATA for Micromate unit activity, and sends heartbeat data to Terra-View. +Micromate (Series 4) watcher agent for Terra-View fleet management. Runs as a Windows system tray application, scans THORDATA for Micromate unit activity, sends heartbeat data to Terra-View, and (optionally) forwards `.IDFH`/`.IDFW` event files to a seismo-relay SFM server. --- @@ -29,7 +29,7 @@ build.bat ``` Produces: -- `dist\thor-watcher-0.2.0.exe` — upload to Gitea release +- `dist\thor-watcher-0.3.0.exe` — upload to Gitea release - `dist\thor-watcher.exe` — use with Inno Setup Then run Inno Setup Compiler on `installer.iss` to produce `thor-watcher-setup.exe`. @@ -62,6 +62,27 @@ Managed through the Settings dialog (right-click tray icon → Settings). A `con | `log_retention_days` | integer | `30` | Days before log is auto-cleared | | `update_source` | string | `gitea` | Auto-update source: `gitea`, `url`, or `disabled` | | `update_url` | string | `""` | Base URL for `url` mode (e.g. Terra-View server) | +| `sfm_forward_enabled` | boolean | `false` | Forward `.IDFH`/`.IDFW` event files to a seismo-relay SFM server | +| `sfm_url` | string | `""` | Base URL of the seismo-relay SFM server (e.g. `http://10.0.0.44:8200`) | +| `sfm_forward_interval` | integer | `60` | Seconds between forwarder passes | +| `sfm_quiescence_seconds` | integer | `5` | Skip files modified within the last N seconds (avoid in-flight files) | +| `sfm_missing_report_grace_seconds` | integer | `60` | Forward a binary without its `.txt` sidecar if it hasn't appeared after N seconds | +| `sfm_http_timeout` | integer | `60` | HTTP timeout per forward POST | +| `sfm_state_file` | string | `""` | Path to the sha256-keyed state file. Blank → `\thor_forwarded.json` | +| `sfm_max_forwards_per_pass` | integer | `500` | Cap per pass to drip-feed large backfills | +| `sfm_max_event_age_days` | integer | `365` | Skip event files older than this many days | + +--- + +## Event Forwarding + +When `sfm_forward_enabled` is true and `sfm_url` is set, Thor Watcher walks the THORDATA tree each `sfm_forward_interval` seconds, finds `.IDFH` (histogram) and `.IDFW` (waveform) event binaries plus their `TXT/.txt` ASCII sidecars, and POSTs them to seismo-relay's `/db/import/idf_file` endpoint. + +- **Idempotent.** Every forwarded file is recorded by sha256 in `thor_forwarded.json`. Re-scans never re-POST. +- **Default off.** Operators must explicitly enable from the Settings → SFM Forward tab. +- **Re-pair logic.** If a binary is forwarded before its TXT sidecar appears (after the grace period), it's flagged `had_report=false` and re-forwarded once the TXT arrives so the SFM database row can be refreshed with device-authoritative PPV/ZCFreq/peak values. +- **TXT export must be enabled in Thor.** Thor's TXT sidecars are not produced automatically — operators should enable TXT export so the relay can extract rich metadata. Forwards without a TXT are still useful (binary gets indexed; rich fields stay NULL). +- **Backfill seeding.** To skip a large historical archive on first deploy, run `python event_forwarder.py --seed-state --thordata C:\THORDATA --state ` before flipping the switch. --- @@ -69,8 +90,8 @@ Managed through the Settings dialog (right-click tray icon → Settings). A `con | Color | Meaning | |-------|---------| -| Green | Running, API reporting OK | -| Amber | Running, API disabled or not configured | +| Green | Running, API reporting OK (and SFM forwarder healthy when enabled) | +| Amber | Running, API disabled OR SFM forwarder failing while API is healthy | | Red | Running, API failing | | Purple | Error — check logs | | Grey | Starting up | @@ -100,7 +121,7 @@ Posted to `api_url` on each API interval: { "source_id": "THOR-PC", "source_type": "series4_watcher", - "version": "0.2.0", + "version": "0.3.0", "generated_at": "2026-03-20T14:30:00Z", "log_tail": ["...last 25 log lines..."], "units": [ diff --git a/config.example.json b/config.example.json index f2bb763..8444b85 100644 --- a/config.example.json +++ b/config.example.json @@ -14,5 +14,15 @@ "log_retention_days": 30, "update_source": "gitea", - "update_url": "" + "update_url": "", + + "sfm_forward_enabled": false, + "sfm_url": "", + "sfm_forward_interval": 60, + "sfm_quiescence_seconds": 5, + "sfm_missing_report_grace_seconds": 60, + "sfm_http_timeout": 60, + "sfm_state_file": "", + "sfm_max_forwards_per_pass": 500, + "sfm_max_event_age_days": 365 } diff --git a/doc/file-path.md b/doc/file-path.md new file mode 100644 index 0000000..83a46e5 --- /dev/null +++ b/doc/file-path.md @@ -0,0 +1,62 @@ +# Thor File Stucture Guide + +This document is to explain how Thor formatis the file structure for units that call in via thor Autocall home. + +## Main Stucture + +Thor saves its data in a folder located in C:/ called 'THORDATA'. it then Creates folders for each user entered project. When a unit is added to that project, it creates a folder in project with that unit's serial number. Raw events (.IDFH for histogram and .IDFW for waveforms) plus .MLG monitor logs are then saved in this folder. if a unit is not assigned a project, it saves into a default project folder. If it matters, there is also a daily log file that gets created in the folder 'Logs' + +In each unit's folder, there are various formats saved in their own individual folders too. Most cases have CSV, HTML, PDF, TXT, and XML + +Here is the structure illustrated: + +C:/THORDATA +├───Mon-Fayette Express Way - Sec 53A1 +│ └───UM11402 +│ ├───CSV +│ ├───HTML +│ ├───PDF +│ ├───TXT +│ └───XML +├───P.J. Dick - 5th and Halket +│ ├───UM11719 +│ │ ├───CSV +│ │ ├───HTML +│ │ ├───PDF +│ │ ├───TXT +│ │ └───XML +│ UM12420 +│ ├───CSV +│ ├───HTML +│ ├───PDF +│ ├───TXT +│ └───XML + + + +Here is an expanded project folder with two events in it + + +└───Clearwater - ECMS 57940 + └───BE9439 + │ BE9439_20200713124250.MLG + │ BE9439_20200713124251.IDFH + │ BE9439_20200713131747.IDFW + │ BE9439_20200713131747.IDFW.CDB + │ + ├───CSV + │ BE9439_20200713124251.IDFH.csv + │ BE9439_20200713131747.IDFW.csv + │ + ├───PDF + │ BE9439_20200713124251.IDFH.pdf + │ BE9439_20200713131747.IDFW.pdf + │ + ├───TXT + │ BE9439_20200713124251.IDFH.txt + │ BE9439_20200713131747.IDFW.txt + │ + └───XML + BE9439_20200713124251_IDFH_XML.XML + BE9439_20200713131747_IDFW_XML.XML + diff --git a/event_forwarder.py b/event_forwarder.py new file mode 100644 index 0000000..a78abdf --- /dev/null +++ b/event_forwarder.py @@ -0,0 +1,823 @@ +""" +event_forwarder.py — forward Thor (Micromate Series IV) IDFH/IDFW event +files to a seismo-relay SFM server. + +Walks the same `THORDATA_PATH///` tree the heartbeat path +scans. For each event binary that hasn't been forwarded yet, pairs it +with its `/TXT/.txt` ASCII report (when available) and +POSTs both to seismo-relay's `/db/import/idf_file` endpoint as one +multipart request. + +This is a port of `series3-watcher/event_forwarder.py` adapted for the +Thor file layout. Key differences from the series3 forwarder: + +- **Filenames are literal `_.IDFH|.IDFW`** — + no base36 stem; the serial is right there in the prefix. +- **TXT sidecars live in a `TXT/` subfolder** next to the binaries + (Thor's exporter writes them there, not alongside the binary). +- **IDFH and IDFW are forwarded as independent events.** Each has + its own sha256-keyed state entry and its own POST. A single + timestamp can produce both a histogram (IDFH) and a waveform (IDFW), + and the seismo-relay endpoint dedupes on (serial, timestamp, kind), + so treating them as separate rows is the right model. + +Design notes +──────────── +- **stdlib only.** Matches the rest of the watcher (`urllib.request`). + Multipart encoding is hand-rolled. +- **Idempotent across restarts.** Forwarded files are tracked by + sha256 in a JSON state file (default: `/thor_forwarded.json`). + Re-scanning the watch tree doesn't re-POST anything. +- **Default-off.** Callers must enable via config + (`sfm_forward_enabled=true` + `sfm_url=...`). Existing 0.2.x + deployments that auto-update stay non-forwarding until an operator + flips the switch. +- **Quiescence guard.** Files modified within the last few seconds + are skipped — Thor writes the .txt after the binary, so we wait + until both look stable before forwarding. +- **Best-effort report pairing.** When the .txt hasn't appeared yet + but the binary is older than `missing_report_grace_seconds`, the + binary is forwarded alone (seismo-relay accepts that and just skips + the rich fields — we'd rather get the binary indexed than block + forever waiting for a TXT that may never arrive, e.g. operator + hasn't enabled the TXT export in Thor). +""" + +from __future__ import annotations + +import hashlib +import json +import logging +import os +import re +import time +import urllib.error +import urllib.request +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Tuple + + +log = logging.getLogger(__name__) + +# Default tuning. All overridable via config.json sfm_* keys. +DEFAULT_QUIESCENCE_SECONDS = 5 +DEFAULT_MISSING_REPORT_GRACE_SECONDS = 60 +DEFAULT_HTTP_TIMEOUT = 60.0 +STATE_SCHEMA_VERSION = 1 + + +# ── Filename matching ───────────────────────────────────────────────────────── +# +# Thor (Micromate Series IV) filename scheme: +# _. +# where SERIAL is the literal device serial (e.g. UM11719, BE9439), +# and KIND is IDFH (histogram) or IDFW (waveform). +# +# Examples: +# UM11719_20231219163444.IDFH +# UM11719_20231219162723.IDFW +# BE9439_20200713124251.IDFH +_EVENT_FILENAME_RE = re.compile( + r"^([A-Z]{2}\d+)_(\d{14})\.(IDFH|IDFW)$", + re.IGNORECASE, +) + +# Filenames we explicitly skip even if they look event-shaped. +_NON_EVENT_EXTS = { + ".mlg", # monitor-log files (separate heartbeat path) + ".txt", # ASCII reports — paired in, not primary + ".csv", # operator-facing derivative + ".html", # operator-facing derivative + ".pdf", # operator-facing derivative + ".xml", # operator-facing derivative + ".cdb", # IDFW.CDB cache-database variant — skip + ".log", + ".ini", + ".json", + ".sfm.json", + ".bak", + ".tmp", +} + + +def is_event_binary(path: str) -> bool: + """Return True if `path`'s basename looks like a Thor event binary.""" + name = os.path.basename(path) + lname = name.lower() + # Explicit reject for compound extensions like .IDFW.CDB + if lname.endswith(".idfw.cdb") or lname.endswith(".idfh.cdb"): + return False + if not _EVENT_FILENAME_RE.match(name): + return False + ext = os.path.splitext(name)[1].lower() + if ext in _NON_EVENT_EXTS: + return False + return True + + +def parse_event_filename(name: str) -> Optional[Tuple[str, datetime, str]]: + """Parse `_.` -> (serial, timestamp, kind). + + `kind` is the upper-case extension without the dot — "IDFH" or "IDFW". + Returns None if the filename doesn't match. + """ + m = _EVENT_FILENAME_RE.match(name) + if not m: + return None + serial = m.group(1).upper() + try: + ts = datetime.strptime(m.group(2), "%Y%m%d%H%M%S") + except ValueError: + return None + kind = m.group(3).upper() + return serial, ts, kind + + +def idf_report_name(binary_name: str) -> str: + """Thor TXT-export convention: append `.txt` to the binary basename. + + UM11719_20231219163444.IDFH → UM11719_20231219163444.IDFH.txt + """ + return binary_name + ".txt" + + +def idf_report_path(binary_path: str) -> str: + """Compute the expected TXT sidecar path for a Thor event binary. + + Thor's TXT exporter writes sidecars into a `TXT/` subfolder of the + unit directory (verified against captured example-data). The + returned path is the *expected* location — caller is responsible + for checking that it actually exists. + """ + unit_dir = os.path.dirname(binary_path) + name = os.path.basename(binary_path) + return os.path.join(unit_dir, "TXT", idf_report_name(name)) + + +def is_histogram_event(filename: str) -> bool: + """True if this is an .IDFH (histogram) event. Used purely for + log clarity — Thor doesn't always export a TXT for histograms, so + a missing-report warning is suppressed for them.""" + name = os.path.basename(filename) + return name.lower().endswith(".idfh") + + +def serial_from_filename(name: str) -> Optional[str]: + """Extract the serial-number prefix from a Thor event filename.""" + parsed = parse_event_filename(name) + if parsed is None: + return None + return parsed[0] + + +# ── State file ──────────────────────────────────────────────────────────────── + + +class ForwardState: + """Idempotency record: which event files have we already forwarded? + + State file format (JSON): + + { + "version": 1, + "forwarded": { + "": { + "filename": "UM11719_20231219163444.IDFW", + "size": 8800, + "forwarded_at": "2026-05-19T...Z", + "had_report": true + }, + ... + } + } + + Keyed by sha256 (not filename) so identical content is recognised + as already-forwarded even if the file moved or got renamed. + """ + + def __init__(self, path: str): + self.path = path + self._data: Dict[str, Any] = {"version": STATE_SCHEMA_VERSION, "forwarded": {}} + self._load() + + def _load(self) -> None: + try: + with open(self.path, "r", encoding="utf-8") as f: + d = json.load(f) + if not isinstance(d, dict): + raise ValueError("state file root is not an object") + if d.get("version") != STATE_SCHEMA_VERSION: + log.warning( + "forward state version mismatch (got %r, want %d) — starting fresh", + d.get("version"), STATE_SCHEMA_VERSION, + ) + return + forwarded = d.get("forwarded") + if isinstance(forwarded, dict): + self._data["forwarded"] = forwarded + except FileNotFoundError: + pass + except (OSError, ValueError, json.JSONDecodeError) as exc: + log.warning("failed to load forward state from %s: %s", self.path, exc) + + def _save(self) -> None: + tmp = self.path + ".tmp" + try: + d = os.path.dirname(self.path) + if d and not os.path.isdir(d): + os.makedirs(d, exist_ok=True) + with open(tmp, "w", encoding="utf-8") as f: + json.dump(self._data, f, indent=2, sort_keys=True) + f.flush() + os.fsync(f.fileno()) + os.replace(tmp, self.path) + except OSError as exc: + log.warning("failed to save forward state to %s: %s", self.path, exc) + + def is_forwarded(self, sha256: str) -> bool: + return sha256 in self._data["forwarded"] + + def status(self, sha256: str) -> Optional[bool]: + """Return forwarding status for *sha256*. + + Returns: + None — never forwarded. Eligible for a fresh forward. + True — forwarded successfully with its paired report. + NOT a candidate for re-forward. + False — forwarded WITHOUT its paired `.txt`. Eligible for + re-forward IF the TXT now exists, so seismo-relay's + upsert refreshes the DB row with authoritative + device-side values. + + Legacy entries without a `had_report` key default to True so + an upgrade doesn't unexpectedly re-forward every entry. + """ + entry = self._data["forwarded"].get(sha256) + if entry is None: + return None + return bool(entry.get("had_report", True)) + + def mark_forwarded( + self, + sha256: str, + filename: str, + size: int, + had_report: bool = True, + ) -> None: + """Record a successful forward. + + Set `had_report=False` when the forward shipped the binary + without its paired ASCII report. Such entries are re-checked + on subsequent scans and re-forwarded once the TXT appears. + + Idempotent: re-marking an existing sha256 with `had_report=True` + is the explicit promotion path used when a re-pair succeeds. + """ + self._data["forwarded"][sha256] = { + "filename": filename, + "size": size, + "forwarded_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "had_report": had_report, + } + self._save() + + def count(self) -> int: + return len(self._data["forwarded"]) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + + +def sha256_of_file(path: str) -> str: + h = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(65536), b""): + h.update(chunk) + return h.hexdigest() + + +def _is_quiescent(path: str, now_ts: float, quiescence_seconds: float) -> bool: + """Return True if the file's mtime is at least `quiescence_seconds` + in the past — i.e. no longer being written.""" + try: + mtime = os.path.getmtime(path) + except OSError: + return False + return (now_ts - mtime) >= quiescence_seconds + + +def _iter_unit_dirs(thordata_root: str): + """Yield (project_name, unit_name, unit_path) for every Thor unit + folder beneath `thordata_root`. + + THORDATA layout: /// + """ + if not os.path.isdir(thordata_root): + return + try: + projects = os.listdir(thordata_root) + except OSError: + return + for proj in projects: + proj_path = os.path.join(thordata_root, proj) + if not os.path.isdir(proj_path): + continue + try: + units = os.listdir(proj_path) + except OSError: + continue + for unit in units: + unit_path = os.path.join(proj_path, unit) + if not os.path.isdir(unit_path): + continue + yield proj, unit, unit_path + + +def _find_txt_in_unit(unit_path: str, binary_name: str, + now_ts: float, quiescence_seconds: float, + _txt_cache: Dict[str, Dict[str, str]]) -> Optional[str]: + """Look up the matching `.txt` sidecar for `binary_name` inside + `/TXT/`. Case-insensitive on Windows-shaped paths. + + Returns the absolute path if a quiescent sidecar exists, else None. + `_txt_cache` is mutated to memoize the lower-case → actual-name + map for each unit so repeated lookups in one scan don't restat. + """ + expected = idf_report_name(binary_name) + + if unit_path not in _txt_cache: + txt_dir = os.path.join(unit_path, "TXT") + listing: Dict[str, str] = {} + if os.path.isdir(txt_dir): + try: + for n in os.listdir(txt_dir): + listing[n.lower()] = n + except OSError: + pass + _txt_cache[unit_path] = listing + + listing = _txt_cache[unit_path] + actual = listing.get(expected.lower()) + if not actual: + return None + candidate = os.path.join(unit_path, "TXT", actual) + if _is_quiescent(candidate, now_ts, quiescence_seconds): + return candidate + return None + + +# ── Scan pass ───────────────────────────────────────────────────────────────── + + +def find_pending_events( + thordata_root: str, + state: ForwardState, + *, + max_age_days: int, + quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS, + missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS, + max_per_pass: int = 0, +) -> List[Tuple[str, Optional[str]]]: + """ + Walk `thordata_root` and return the list of (binary_path, txt_path_or_None) + pairs that need forwarding. + + Filtering rules: + - Filename must match the Thor event filename regex. + - File must be quiescent (mtime >= quiescence_seconds in the past). + - File must not exceed `max_age_days`. + - File's sha256 must NOT already be in the forwarded state + (unless it was forwarded without its TXT and the TXT is now + present — see ForwardState.status). + - If a `/TXT/.txt` exists and is quiescent, + we pair them. Otherwise, if the binary is older than + missing_report_grace_seconds, we forward without the TXT. + Younger binaries with a missing TXT are deferred. + - When `max_per_pass > 0`, return at most that many pairs. + Older files (lower mtime) are forwarded first so backfill + proceeds chronologically. + """ + if not os.path.isdir(thordata_root): + log.warning("forward scan: thordata root not found: %s", thordata_root) + return [] + + now_ts = time.time() + max_age_seconds = max(1, int(max_age_days)) * 86400.0 + + # First pass: collect every event-binary entry with mtime for sorting. + candidates: List[Tuple[float, str, str]] = [] # (mtime, unit_path, binary_path) + for _proj, _unit, unit_path in _iter_unit_dirs(thordata_root): + try: + entries = list(os.scandir(unit_path)) + except OSError: + continue + for e in entries: + if not e.is_file(): + continue + if not is_event_binary(e.path): + continue + try: + mtime = e.stat().st_mtime + except OSError: + continue + if (now_ts - mtime) > max_age_seconds: + continue + candidates.append((mtime, unit_path, e.path)) + + # Sort oldest-first so backfill is chronological. + candidates.sort(key=lambda t: t[0]) + + pending: List[Tuple[str, Optional[str]]] = [] + skipped_inflight = 0 + skipped_already_forwarded = 0 + txt_cache: Dict[str, Dict[str, str]] = {} + + for mtime, unit_path, binary_path in candidates: + if not _is_quiescent(binary_path, now_ts, quiescence_seconds): + skipped_inflight += 1 + continue + + try: + digest = sha256_of_file(binary_path) + except OSError as exc: + log.warning("forward scan: sha256 failed for %s: %s", binary_path, exc) + continue + fwd_status = state.status(digest) + if fwd_status is True: + skipped_already_forwarded += 1 + continue + + binary_name = os.path.basename(binary_path) + txt_path = _find_txt_in_unit( + unit_path, binary_name, now_ts, quiescence_seconds, txt_cache, + ) + + if fwd_status is False: + # Previously forwarded WITHOUT report. Re-forward only + # if the TXT is now present so seismo-relay's upsert can + # refresh the row with authoritative device values. + if txt_path is None: + skipped_already_forwarded += 1 + continue + elif txt_path is None: + # First-time forward and TXT not yet present. Wait for + # the grace period before forwarding alone. + if (now_ts - mtime) < missing_report_grace_seconds: + skipped_inflight += 1 + continue + + pending.append((binary_path, txt_path)) + + if max_per_pass and len(pending) >= max_per_pass: + break + + log.debug( + "forward scan: %d pending skipped_inflight=%d already_forwarded=%d cap=%d", + len(pending), skipped_inflight, skipped_already_forwarded, max_per_pass, + ) + return pending + + +# ── Multipart upload ────────────────────────────────────────────────────────── + + +def _encode_multipart( + parts: List[Tuple[str, str, str, bytes]], +) -> Tuple[bytes, str]: + """Encode a list of (field_name, filename, content_type, data) tuples + as a multipart/form-data body. Returns (body_bytes, content_type + header value).""" + boundary = "----ThorWatcherBoundary" + os.urandom(8).hex() + chunks: List[bytes] = [] + for field_name, filename, content_type, data in parts: + chunks.append(("--" + boundary + "\r\n").encode("ascii")) + chunks.append( + (f'Content-Disposition: form-data; name="{field_name}"; ' + f'filename="{filename}"\r\n').encode("ascii") + ) + chunks.append((f"Content-Type: {content_type}\r\n\r\n").encode("ascii")) + chunks.append(data) + chunks.append(b"\r\n") + chunks.append(("--" + boundary + "--\r\n").encode("ascii")) + body = b"".join(chunks) + content_type_hdr = f"multipart/form-data; boundary={boundary}" + return body, content_type_hdr + + +def _import_endpoint(sfm_url: str) -> str: + """Compose the import endpoint URL from a base SFM URL.""" + return sfm_url.rstrip("/") + "/db/import/idf_file" + + +def forward_event_pair( + sfm_url: str, + binary_path: str, + txt_path: Optional[str], + *, + serial_hint: Optional[str] = None, + timeout: float = DEFAULT_HTTP_TIMEOUT, +) -> Dict[str, Any]: + """POST a single event (binary + optional .txt) to the SFM import + endpoint. + + Returns a dict mirroring the per-file outcome the server returned + on success, or a dict with `status="error"` on transport/HTTP + failure. + """ + binary_name = os.path.basename(binary_path) + with open(binary_path, "rb") as f: + binary_bytes = f.read() + + parts = [("files", binary_name, "application/octet-stream", binary_bytes)] + if txt_path is not None: + with open(txt_path, "rb") as f: + txt_bytes = f.read() + parts.append(("files", os.path.basename(txt_path), "text/plain", txt_bytes)) + + body, content_type = _encode_multipart(parts) + + # Auto-derive serial from filename if caller didn't supply one. + if not serial_hint: + serial_hint = serial_from_filename(binary_name) + + url = _import_endpoint(sfm_url) + if serial_hint: + sep = "&" if "?" in url else "?" + url = f"{url}{sep}serial={serial_hint}" + + req = urllib.request.Request( + url, data=body, method="POST", + headers={ + "Content-Type": content_type, + "Content-Length": str(len(body)), + "User-Agent": "thor-watcher/sfm-forwarder", + "Accept": "application/json", + }, + ) + + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: + raw = resp.read().decode("utf-8", errors="replace") + try: + payload = json.loads(raw) + except json.JSONDecodeError: + return { + "status": "error", + "filename": binary_name, + "detail": f"server returned non-JSON: {raw[:200]!r}", + } + for entry in (payload.get("results") or []): + if entry.get("filename") == binary_name and entry.get("status") == "ok": + return entry + for entry in (payload.get("results") or []): + if entry.get("filename") == binary_name: + return entry + return { + "status": "error", + "filename": binary_name, + "detail": f"unexpected server response: {payload!r}", + } + except urllib.error.HTTPError as exc: + try: + body_excerpt = exc.read().decode("utf-8", errors="replace")[:300] + except Exception: + body_excerpt = "" + return { + "status": "error", + "filename": binary_name, + "detail": f"HTTP {exc.code}: {exc.reason} body={body_excerpt!r}", + } + except urllib.error.URLError as exc: + return { + "status": "error", + "filename": binary_name, + "detail": f"connection error: {exc.reason}", + } + except (OSError, TimeoutError) as exc: + return { + "status": "error", + "filename": binary_name, + "detail": f"transport error: {exc}", + } + + +# ── Top-level orchestration ─────────────────────────────────────────────────── + + +def forward_pending( + thordata_root: str, + sfm_url: str, + state: ForwardState, + *, + max_age_days: int, + quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS, + missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS, + timeout: float = DEFAULT_HTTP_TIMEOUT, + max_per_pass: int = 0, + logger: Optional[Any] = None, +) -> Dict[str, int]: + """ + Run one full pass: find pending events, POST each one, update state. + + Returns a counts dict suitable for logging: + + { + "scanned": , # event binaries selected for forward + "forwarded": , # successfully POSTed this pass + "errors": , # POST failures (will retry next pass) + "with_report":, # of forwarded, how many had a paired TXT + } + """ + def _log(msg: str) -> None: + if logger: + logger(msg) + else: + log.info(msg) + + pending = find_pending_events( + thordata_root, state, + max_age_days=max_age_days, + quiescence_seconds=quiescence_seconds, + missing_report_grace_seconds=missing_report_grace_seconds, + max_per_pass=max_per_pass, + ) + + counts = {"scanned": len(pending), "forwarded": 0, "errors": 0, "with_report": 0} + + for binary_path, txt_path in pending: + result = forward_event_pair( + sfm_url, binary_path, txt_path, + timeout=timeout, + ) + if result.get("status") == "ok": + try: + digest = sha256_of_file(binary_path) + size = os.path.getsize(binary_path) + state.mark_forwarded( + digest, + os.path.basename(binary_path), + size, + had_report=(txt_path is not None), + ) + except OSError as exc: + _log(f"[forward] post-success state save failed for " + f"{os.path.basename(binary_path)}: {exc}") + counts["forwarded"] += 1 + if txt_path: + counts["with_report"] += 1 + + if txt_path: + report_token = "+ {} attached".format(os.path.basename(txt_path)) + elif is_histogram_event(binary_path): + report_token = "(histogram, no report expected)" + else: + report_token = "no report" + + _log( + "[forward] OK {} ({}B, {}, inserted={}, skipped={})".format( + os.path.basename(binary_path), + result.get("filesize", 0), + report_token, + result.get("inserted", 0), + result.get("skipped", 0), + ) + ) + else: + counts["errors"] += 1 + _log( + f"[forward] ERR {os.path.basename(binary_path)}: " + f"{result.get('detail', 'unknown error')}" + ) + + return counts + + +# ── Seed-state mode (skip historical backfill on first deploy) ──────────────── + + +def seed_state_from_folder( + thordata_root: str, + state: ForwardState, + *, + max_age_days: int = 365, + logger: Optional[Any] = None, +) -> Dict[str, int]: + """Walk `thordata_root` and mark every existing event binary as + already forwarded — without POSTing anything. + + Run this ONCE before enabling sfm_forward_enabled on a machine + with a large historical archive. The watcher then starts + forwarding only events that appear AFTER the seed run. + + Returns a counts dict: + {"scanned": int, "seeded": int, "already_known": int, "skipped_too_old": int} + """ + def _log(msg: str) -> None: + if logger: + logger(msg) + else: + log.info(msg) + + counts = {"scanned": 0, "seeded": 0, "already_known": 0, "skipped_too_old": 0} + + if not os.path.isdir(thordata_root): + _log(f"[seed] thordata root not found: {thordata_root}") + return counts + + now_ts = time.time() + max_age_seconds = max(1, int(max_age_days)) * 86400.0 + + for _proj, _unit, unit_path in _iter_unit_dirs(thordata_root): + try: + entries = [e for e in os.scandir(unit_path) if e.is_file()] + except OSError: + continue + for e in entries: + if not is_event_binary(e.path): + continue + counts["scanned"] += 1 + try: + mtime = e.stat().st_mtime + size = e.stat().st_size + except OSError: + continue + if (now_ts - mtime) > max_age_seconds: + counts["skipped_too_old"] += 1 + continue + try: + digest = sha256_of_file(e.path) + except OSError as exc: + _log(f"[seed] sha256 failed for {e.path}: {exc}") + continue + if state.is_forwarded(digest): + counts["already_known"] += 1 + continue + state.mark_forwarded(digest, e.name, size) + counts["seeded"] += 1 + if counts["seeded"] % 1000 == 0: + _log(f"[seed] progress: {counts['seeded']} seeded so far...") + + _log( + f"[seed] done. scanned={counts['scanned']} seeded={counts['seeded']} " + f"already_known={counts['already_known']} " + f"skipped_too_old={counts['skipped_too_old']}" + ) + return counts + + +# ── CLI entry point ───────────────────────────────────────────────────────── + + +def _main() -> int: + """Command-line interface for one-shot operations. + + python event_forwarder.py --seed-state \\ + --thordata "C:\\THORDATA" \\ + --state "" \\ + [--max-age-days 365] + """ + import argparse + parser = argparse.ArgumentParser( + description="Thor Watcher — SFM event forwarder utilities", + ) + parser.add_argument( + "--seed-state", action="store_true", + help="Mark every event binary in --thordata as already-forwarded " + "(without POSTing). Use this BEFORE enabling sfm_forward " + "on a machine with a large historical archive.", + ) + parser.add_argument( + "--thordata", required=True, + help="Path to the THORDATA root folder.", + ) + parser.add_argument( + "--state", required=True, + help="Path to the JSON state file. Will be created if missing.", + ) + parser.add_argument( + "--max-age-days", type=int, default=365, + help="Only seed files newer than this many days (default 365).", + ) + args = parser.parse_args() + + if not args.seed_state: + parser.error("specify --seed-state (no other modes supported yet)") + + print(f"[seed] thordata = {args.thordata}") + print(f"[seed] state = {args.state}") + print(f"[seed] max_age = {args.max_age_days} days") + + state = ForwardState(args.state) + print(f"[seed] state currently has {state.count()} entries") + seed_state_from_folder( + args.thordata, state, + max_age_days=args.max_age_days, + logger=lambda m: print(m), + ) + print(f"[seed] state now has {state.count()} entries") + return 0 + + +if __name__ == "__main__": + import sys + sys.exit(_main()) diff --git a/series4_ingest.py b/series4_ingest.py index bc48c3b..fcacc40 100644 --- a/series4_ingest.py +++ b/series4_ingest.py @@ -1,5 +1,5 @@ """ -Thor Watcher — Series 4 Ingest Agent v0.2.0 +Thor Watcher — Series 4 Ingest Agent v0.3.0 Micromate (Series 4) ingest agent for Terra-View. @@ -7,6 +7,8 @@ Behavior: - Scans C:\THORDATA\\\*.MLG - For each UM####, finds the newest .MLG by timestamp in the filename - Posts JSON heartbeat payload to Terra-View backend +- Forwards .IDFH/.IDFW event files (+ TXT sidecars) to a seismo-relay + SFM server when sfm_forward_enabled=true. See event_forwarder.py. - Tray-friendly: run_watcher(state, stop_event) for background thread use """ @@ -22,10 +24,12 @@ from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Tuple from socket import gethostname +import event_forwarder + # ── Version ─────────────────────────────────────────────────────────────────── -VERSION = "0.2.0" +VERSION = "0.3.0" # ── Config ──────────────────────────────────────────────────────────────────── @@ -54,6 +58,17 @@ def load_config(config_path: str) -> Dict[str, Any]: "update_source": "gitea", "update_url": "", "debug": False, + + # SFM event forwarding — default OFF, opt-in via Settings. + "sfm_forward_enabled": False, + "sfm_url": "", # e.g. "http://10.0.0.44:8200" + "sfm_forward_interval": 60, # seconds between forward passes + "sfm_quiescence_seconds": 5, + "sfm_missing_report_grace_seconds": 60, + "sfm_http_timeout": 60, + "sfm_state_file": "", # blank → /thor_forwarded.json + "sfm_max_forwards_per_pass": 500, + "sfm_max_event_age_days": 365, } with open(config_path, "r", encoding="utf-8") as f: @@ -248,14 +263,17 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None: Main watcher loop. Runs in a background thread when launched from the tray. state keys written each cycle: - state["status"] — "running" | "error" | "starting" - state["api_status"] — "ok" | "fail" | "disabled" - state["units"] — list of unit dicts for tray display - state["last_scan"] — datetime of last successful scan - state["last_error"] — last error string or None - state["log_dir"] — directory containing the log file - state["cfg"] — loaded config dict - state["update_available"] — set True when API response signals an update + state["status"] — "running" | "error" | "starting" + state["api_status"] — "ok" | "fail" | "disabled" + state["units"] — list of unit dicts for tray display + state["last_scan"] — datetime of last successful scan + state["last_error"] — last error string or None + state["log_dir"] — directory containing the log file + state["cfg"] — loaded config dict + state["update_available"] — set True when API response signals an update + state["sfm_status"] — "ok" | "fail" | "disabled" | "ready" + state["last_forward"] — datetime of last forwarder pass (or None) + state["last_forward_counts"] — dict from event_forwarder.forward_pending """ # Resolve config path if getattr(sys, "frozen", False): @@ -291,16 +309,56 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None: API_INTERVAL = int(cfg["api_interval"]) ENABLE_LOGGING = bool(cfg["enable_logging"]) + # SFM forwarder config + SFM_FORWARD_ENABLED = bool(cfg.get("sfm_forward_enabled", False)) + SFM_URL = str(cfg.get("sfm_url", "")).strip() + SFM_FORWARD_INTERVAL = int(cfg.get("sfm_forward_interval", 60)) + SFM_QUIESCENCE = int(cfg.get("sfm_quiescence_seconds", 5)) + SFM_GRACE = int(cfg.get("sfm_missing_report_grace_seconds", 60)) + SFM_HTTP_TIMEOUT = int(cfg.get("sfm_http_timeout", 60)) + SFM_MAX_PER_PASS = int(cfg.get("sfm_max_forwards_per_pass", 500)) + SFM_MAX_AGE_DAYS = int(cfg.get("sfm_max_event_age_days", 365)) + sfm_state_path = str(cfg.get("sfm_state_file", "")).strip() or \ + os.path.join(state["log_dir"], "thor_forwarded.json") + log_message(log_file, ENABLE_LOGGING, - "[cfg] THORDATA_PATH={} SCAN_INTERVAL={}s API_INTERVAL={}s API={}".format( - THORDATA_PATH, SCAN_INTERVAL, API_INTERVAL, bool(API_URL) + "[cfg] THORDATA_PATH={} SCAN_INTERVAL={}s API_INTERVAL={}s API={} SFM={}".format( + THORDATA_PATH, SCAN_INTERVAL, API_INTERVAL, bool(API_URL), + bool(SFM_FORWARD_ENABLED and SFM_URL), ) ) - print("[CFG] THORDATA_PATH={} SCAN_INTERVAL={}s API={}".format( - THORDATA_PATH, SCAN_INTERVAL, bool(API_URL) + print("[CFG] THORDATA_PATH={} SCAN_INTERVAL={}s API={} SFM={}".format( + THORDATA_PATH, SCAN_INTERVAL, bool(API_URL), + bool(SFM_FORWARD_ENABLED and SFM_URL), )) + # Initialize SFM forwarder state (if enabled) + sfm_state_obj: Optional[event_forwarder.ForwardState] = None + if SFM_FORWARD_ENABLED and SFM_URL: + try: + sfm_state_obj = event_forwarder.ForwardState(sfm_state_path) + state["sfm_status"] = "ready" + log_message(log_file, ENABLE_LOGGING, + "[sfm] forwarder ready url={} state_file={} known={}".format( + SFM_URL, sfm_state_path, sfm_state_obj.count(), + ) + ) + print("[SFM] forwarder ready url={} known={}".format( + SFM_URL, sfm_state_obj.count(), + )) + except Exception as exc: + state["sfm_status"] = "fail" + state["last_error"] = "SFM init failed: {}".format(exc) + log_message(log_file, ENABLE_LOGGING, + "[sfm] init failed: {}".format(exc)) + else: + state["sfm_status"] = "disabled" + + state["last_forward"] = None + state["last_forward_counts"] = None + last_api_ts = 0.0 + last_forward_ts = 0.0 while not stop_event.is_set(): try: @@ -362,6 +420,37 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None: else: state["api_status"] = "disabled" + # ── SFM event forwarding ─────────────────────────────────────────── + if sfm_state_obj is not None: + now_ts = time.time() + if now_ts - last_forward_ts >= SFM_FORWARD_INTERVAL: + last_forward_ts = now_ts + try: + counts = event_forwarder.forward_pending( + THORDATA_PATH, SFM_URL, sfm_state_obj, + max_age_days=SFM_MAX_AGE_DAYS, + quiescence_seconds=SFM_QUIESCENCE, + missing_report_grace_seconds=SFM_GRACE, + timeout=SFM_HTTP_TIMEOUT, + max_per_pass=SFM_MAX_PER_PASS, + logger=lambda m: log_message(log_file, ENABLE_LOGGING, m), + ) + state["last_forward"] = datetime.now() + state["last_forward_counts"] = counts + if counts["errors"] > 0: + state["sfm_status"] = "fail" + else: + state["sfm_status"] = "ok" + summary = ("[sfm] pass scanned={scanned} forwarded={forwarded} " + "errors={errors} with_report={with_report}").format(**counts) + print(summary) + log_message(log_file, ENABLE_LOGGING, summary) + except Exception as exc: + state["sfm_status"] = "fail" + msg = "[sfm] pass failed: {}".format(exc) + print(msg) + log_message(log_file, ENABLE_LOGGING, msg) + except Exception as e: err = "[loop-error] {}".format(e) print(err) diff --git a/test_event_forwarder.py b/test_event_forwarder.py new file mode 100644 index 0000000..c9626b4 --- /dev/null +++ b/test_event_forwarder.py @@ -0,0 +1,716 @@ +""" +test_event_forwarder.py — unit tests for Thor Watcher's SFM event forwarder. + +Covers: + - is_event_binary() filename matching (positive + negative cases) + - parse_event_filename() / serial_from_filename() + - idf_report_path() — the TXT/ subfolder convention + - ForwardState load/save round-trip + idempotency check + - find_pending_events() against the THORDATA/// tree, + plus quiescence + grace-period + re-pair logic + - _encode_multipart() byte-level shape (boundary + headers) + - forward_event_pair() end-to-end against a tiny stdlib HTTP server + that mimics seismo-relay's POST /db/import/idf_file endpoint + - seed_state_from_folder() walks the tree without POSTing + +Stdlib only — runs with `python -m pytest test_event_forwarder.py` +on Python 3.8+ (the watcher's compat target). +""" + +from __future__ import annotations + +import http.server +import json +import os +import tempfile +import threading +import time +import unittest +from pathlib import Path + +import event_forwarder as ef + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + + +def _make_thordata(root: Path, project: str, unit: str) -> Path: + """Create a THORDATA/// folder pair; return unit_dir.""" + unit_dir = root / project / unit + unit_dir.mkdir(parents=True, exist_ok=True) + return unit_dir + + +def _touch_with_age(p: Path, age_seconds: float, content: bytes = b"x") -> Path: + """Create a file with controlled mtime.""" + p.write_bytes(content) + target = time.time() - age_seconds + os.utime(p, (target, target)) + return p + + +def _make_event(unit_dir: Path, name: str, age_seconds: float = 100, + content: bytes = b"x") -> Path: + return _touch_with_age(unit_dir / name, age_seconds, content) + + +def _make_txt(unit_dir: Path, base_name: str, age_seconds: float = 100, + content: bytes = b"r") -> Path: + txt_dir = unit_dir / "TXT" + txt_dir.mkdir(exist_ok=True) + return _touch_with_age(txt_dir / ef.idf_report_name(base_name), + age_seconds, content) + + +# ── is_event_binary() ──────────────────────────────────────────────────────── + + +class TestIsEventBinary(unittest.TestCase): + + def test_recognises_typical_thor_filenames(self): + for name in [ + "UM11719_20231219163444.IDFH", + "UM11719_20231219162723.IDFW", + "BE9439_20200713124251.IDFH", + "UM13981_20220808082418.IDFH", + # case-insensitive + "um11719_20231219163444.idfh", + ]: + self.assertTrue(ef.is_event_binary(name), name) + + def test_rejects_non_event_extensions(self): + for name in [ + "UM11719_20231219163436.MLG", # monitor log + "UM11719_20231219163444.IDFH.txt", # report sidecar + "UM11719_20231219164135.IDFW.CDB", # cache database variant + "UM11719_20231219164135.IDFH.CDB", + "agent.log", + "config.json", + "foo.bak", + "bar.tmp", + "UM11719_20231219163444.csv", + "UM11719_20231219163444.pdf", + "UM11719_20231219163444.html", + "UM11719_20231219163444.xml", + ]: + self.assertFalse(ef.is_event_binary(name), name) + + def test_rejects_malformed_filenames(self): + for name in [ + "", + "no_extension", + "UM_20231219163444.IDFH", # missing serial digits + "1234_20231219163444.IDFH", # serial must start with letters + "UM11719_2023121916.IDFH", # short timestamp + "UM11719_20231219163444.IDFX", # wrong kind + "UM11719-20231219163444.IDFH", # wrong separator + ]: + self.assertFalse(ef.is_event_binary(name), name) + + def test_parse_event_filename(self): + from datetime import datetime + parsed = ef.parse_event_filename("UM11719_20231219163444.IDFW") + self.assertIsNotNone(parsed) + serial, ts, kind = parsed + self.assertEqual(serial, "UM11719") + self.assertEqual(ts, datetime(2023, 12, 19, 16, 34, 44)) + self.assertEqual(kind, "IDFW") + + def test_serial_from_filename(self): + self.assertEqual(ef.serial_from_filename("UM11719_20231219163444.IDFH"), + "UM11719") + self.assertEqual(ef.serial_from_filename("BE9439_20200713124251.IDFH"), + "BE9439") + self.assertIsNone(ef.serial_from_filename("not_an_event.bin")) + + def test_idf_report_path_uses_txt_subfolder(self): + binary = "/foo/THORDATA/Project A/UM11719/UM11719_20231219163444.IDFW" + self.assertEqual( + ef.idf_report_path(binary), + os.path.join("/foo/THORDATA/Project A/UM11719", + "TXT", "UM11719_20231219163444.IDFW.txt"), + ) + + def test_is_histogram_event(self): + self.assertTrue(ef.is_histogram_event("UM11719_20231219163444.IDFH")) + self.assertTrue(ef.is_histogram_event("um11719_20231219163444.idfh")) + self.assertFalse(ef.is_histogram_event("UM11719_20231219162723.IDFW")) + + +# ── ForwardState ───────────────────────────────────────────────────────────── + + +class TestForwardState(unittest.TestCase): + + def test_round_trip_persists_marked_entries(self): + with tempfile.TemporaryDirectory() as tmp: + path = os.path.join(tmp, "fwd.json") + s = ef.ForwardState(path) + self.assertFalse(s.is_forwarded("abc123")) + s.mark_forwarded("abc123", "UM11719_20231219163444.IDFW", 8800) + self.assertTrue(s.is_forwarded("abc123")) + + # Re-load from disk + s2 = ef.ForwardState(path) + self.assertTrue(s2.is_forwarded("abc123")) + self.assertEqual(s2.count(), 1) + + def test_corrupt_state_file_starts_fresh(self): + with tempfile.TemporaryDirectory() as tmp: + path = os.path.join(tmp, "fwd.json") + with open(path, "w") as f: + f.write("not valid json {{{") + s = ef.ForwardState(path) + self.assertEqual(s.count(), 0) + + def test_version_mismatch_starts_fresh(self): + with tempfile.TemporaryDirectory() as tmp: + path = os.path.join(tmp, "fwd.json") + with open(path, "w") as f: + json.dump({"version": 999, "forwarded": {"x": {}}}, f) + s = ef.ForwardState(path) + self.assertEqual(s.count(), 0) + + def test_legacy_entries_default_to_had_report_true(self): + with tempfile.TemporaryDirectory() as tmp: + path = os.path.join(tmp, "fwd.json") + with open(path, "w") as f: + json.dump({ + "version": 1, + "forwarded": { + "abc123": { + "filename": "UM11719_20231219163444.IDFW", + "size": 123, + "forwarded_at": "2025-01-01T00:00:00Z", + # No had_report field — legacy entry + } + } + }, f) + state = ef.ForwardState(path) + self.assertIs(state.status("abc123"), True) + + def test_status_returns_none_for_unknown_sha(self): + with tempfile.TemporaryDirectory() as tmp: + state = ef.ForwardState(str(Path(tmp) / "fwd.json")) + self.assertIs(state.status("never-seen"), None) + + def test_mark_with_had_report_false_then_promote(self): + with tempfile.TemporaryDirectory() as tmp: + state = ef.ForwardState(str(Path(tmp) / "fwd.json")) + state.mark_forwarded("xyz", "UM11719_20231219163444.IDFW", 100, + had_report=False) + self.assertIs(state.status("xyz"), False) + state.mark_forwarded("xyz", "UM11719_20231219163444.IDFW", 100, + had_report=True) + self.assertIs(state.status("xyz"), True) + + +# ── find_pending_events() ──────────────────────────────────────────────────── + + +class TestFindPendingEvents(unittest.TestCase): + + def test_returns_pair_when_both_files_present_and_quiescent(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit_dir = _make_thordata(root, "Project A", "UM11719") + _make_event(unit_dir, "UM11719_20231219163444.IDFW", + age_seconds=120, content=b"binary") + _make_txt(unit_dir, "UM11719_20231219163444.IDFW", + age_seconds=100, content=b"report") + + state = ef.ForwardState(str(root / "fwd.json")) + pending = ef.find_pending_events( + str(root), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 1) + self.assertEqual(os.path.basename(pending[0][0]), + "UM11719_20231219163444.IDFW") + self.assertEqual(os.path.basename(pending[0][1]), + "UM11719_20231219163444.IDFW.txt") + + def test_idfh_and_idfw_are_separate_events(self): + """A single timestamp produces both .IDFH and .IDFW — they + forward as two independent events with their own state entries.""" + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit_dir = _make_thordata(root, "Project A", "UM11719") + _make_event(unit_dir, "UM11719_20231219163444.IDFH", + age_seconds=120, content=b"histogram") + _make_event(unit_dir, "UM11719_20231219163444.IDFW", + age_seconds=120, content=b"waveform") + _make_txt(unit_dir, "UM11719_20231219163444.IDFH", + age_seconds=100, content=b"hreport") + _make_txt(unit_dir, "UM11719_20231219163444.IDFW", + age_seconds=100, content=b"wreport") + + state = ef.ForwardState(str(root / "fwd.json")) + pending = ef.find_pending_events( + str(root), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 2) + names = sorted(os.path.basename(p[0]) for p in pending) + self.assertEqual(names, [ + "UM11719_20231219163444.IDFH", + "UM11719_20231219163444.IDFW", + ]) + + def test_pairing_when_txt_is_in_unit_root_does_not_match(self): + """Sidecars MUST live in the TXT/ subfolder. A stray .txt + next to the binary is not the canonical location and should + not be picked up as a sidecar.""" + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit_dir = _make_thordata(root, "Project A", "UM11719") + _make_event(unit_dir, "UM11719_20231219163444.IDFW", + age_seconds=200, content=b"bin") + # .txt is in the unit dir, not unit/TXT/ + _touch_with_age(unit_dir / "UM11719_20231219163444.IDFW.txt", + age_seconds=100, content=b"misplaced") + + state = ef.ForwardState(str(root / "fwd.json")) + pending = ef.find_pending_events( + str(root), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + # Forward proceeds (grace period elapsed), but WITHOUT pairing + self.assertEqual(len(pending), 1) + self.assertIsNone(pending[0][1]) + + def test_skips_if_already_forwarded(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit_dir = _make_thordata(root, "Project A", "UM11719") + bin_p = _make_event(unit_dir, "UM11719_20231219163444.IDFW", + age_seconds=120, content=b"binary") + _make_txt(unit_dir, "UM11719_20231219163444.IDFW", + age_seconds=100, content=b"report") + + state = ef.ForwardState(str(root / "fwd.json")) + digest = ef.sha256_of_file(str(bin_p)) + state.mark_forwarded(digest, "UM11719_20231219163444.IDFW", len(b"binary")) + + pending = ef.find_pending_events( + str(root), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0) + + def test_skips_if_too_fresh_to_be_quiescent(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit_dir = _make_thordata(root, "Project A", "UM11719") + _make_event(unit_dir, "UM11719_20231219163444.IDFW", + age_seconds=1, content=b"binary") + _make_txt(unit_dir, "UM11719_20231219163444.IDFW", + age_seconds=1, content=b"report") + + state = ef.ForwardState(str(root / "fwd.json")) + pending = ef.find_pending_events( + str(root), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0) + + def test_forwards_alone_after_grace_when_txt_missing(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit_dir = _make_thordata(root, "Project A", "UM11719") + _make_event(unit_dir, "UM11719_20231219163444.IDFH", + age_seconds=200, content=b"binary") + state = ef.ForwardState(str(root / "fwd.json")) + pending = ef.find_pending_events( + str(root), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 1) + bin_path, txt_path = pending[0] + self.assertEqual(os.path.basename(bin_path), + "UM11719_20231219163444.IDFH") + self.assertIsNone(txt_path) + + def test_re_pair_after_late_arriving_txt(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit_dir = _make_thordata(root, "Project A", "UM11719") + bin_p = _make_event(unit_dir, "UM11719_20231219163444.IDFW", + age_seconds=200, content=b"binary") + state = ef.ForwardState(str(root / "fwd.json")) + digest = ef.sha256_of_file(str(bin_p)) + state.mark_forwarded(digest, "UM11719_20231219163444.IDFW", + len(b"binary"), had_report=False) + + # First scan: TXT not present → still skipped. + pending = ef.find_pending_events( + str(root), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(pending, []) + + # TXT finally appears. + _make_txt(unit_dir, "UM11719_20231219163444.IDFW", + age_seconds=100, content=b"report") + pending = ef.find_pending_events( + str(root), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 1) + self.assertEqual(os.path.basename(pending[0][1]), + "UM11719_20231219163444.IDFW.txt") + + def test_defers_when_txt_missing_and_within_grace(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit_dir = _make_thordata(root, "Project A", "UM11719") + _make_event(unit_dir, "UM11719_20231219163444.IDFW", + age_seconds=15, content=b"binary") + state = ef.ForwardState(str(root / "fwd.json")) + pending = ef.find_pending_events( + str(root), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0) + + def test_skips_old_files_beyond_max_age_days(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit_dir = _make_thordata(root, "Project A", "UM11719") + _make_event(unit_dir, "UM11719_20231219163444.IDFW", + age_seconds=10 * 86400, content=b"binary") + _make_txt(unit_dir, "UM11719_20231219163444.IDFW", + age_seconds=10 * 86400, content=b"report") + state = ef.ForwardState(str(root / "fwd.json")) + pending = ef.find_pending_events( + str(root), state, max_age_days=1, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0) + + def test_ignores_mlg_and_other_non_event_files(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit_dir = _make_thordata(root, "Project A", "UM11719") + _make_event(unit_dir, "UM11719_20231219163436.MLG", + age_seconds=120, content=b"mlg") + _make_event(unit_dir, "UM11719_20231219164135.IDFW.CDB", + age_seconds=120, content=b"cache") + _touch_with_age(unit_dir / "agent.log", age_seconds=120, content=b"log") + state = ef.ForwardState(str(root / "fwd.json")) + pending = ef.find_pending_events( + str(root), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0) + + def test_walks_multiple_projects_and_units(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit_a = _make_thordata(root, "Project A", "UM11719") + unit_b = _make_thordata(root, "Project B", "BE9439") + _make_event(unit_a, "UM11719_20231219163444.IDFW", age_seconds=200, content=b"a") + _make_event(unit_b, "BE9439_20200713131747.IDFW", age_seconds=200, content=b"b") + _make_txt(unit_a, "UM11719_20231219163444.IDFW", age_seconds=100, content=b"ar") + _make_txt(unit_b, "BE9439_20200713131747.IDFW", age_seconds=100, content=b"br") + + state = ef.ForwardState(str(root / "fwd.json")) + pending = ef.find_pending_events( + str(root), state, max_age_days=10000, # BE event is from 2020 + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 2) + names = sorted(os.path.basename(p[0]) for p in pending) + self.assertEqual(names, [ + "BE9439_20200713131747.IDFW", + "UM11719_20231219163444.IDFW", + ]) + + def test_max_per_pass_caps_returned_count(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit = _make_thordata(root, "Project A", "UM11719") + for i in range(5): + name = "UM11719_2023121916344{}.IDFW".format(i) + _make_event(unit, name, age_seconds=120 + i, content=("bin-" + str(i)).encode()) + _make_txt(unit, name, age_seconds=110 + i, content=b"report") + state = ef.ForwardState(str(root / "fwd.json")) + pending = ef.find_pending_events( + str(root), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + max_per_pass=2, + ) + self.assertEqual(len(pending), 2) + + def test_max_per_pass_returns_oldest_first(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit = _make_thordata(root, "Project A", "UM11719") + ages = [200, 150, 100, 50] + for i, age in enumerate(ages): + name = "UM11719_2023121916344{}.IDFW".format(i) + _make_event(unit, name, age_seconds=age, content=("c" + str(i)).encode()) + _make_txt(unit, name, age_seconds=max(1, age - 10), content=b"r") + state = ef.ForwardState(str(root / "fwd.json")) + pending = ef.find_pending_events( + str(root), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + max_per_pass=2, + ) + names = [os.path.basename(p[0]) for p in pending] + # Oldest two should be index 0 (200s) and 1 (150s) + self.assertEqual(names, [ + "UM11719_20231219163440.IDFW", + "UM11719_20231219163441.IDFW", + ]) + + +# ── Seed-state mode ────────────────────────────────────────────────────────── + + +class TestSeedStateFromFolder(unittest.TestCase): + + def test_seeds_every_in_window_event_without_posting(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit = _make_thordata(root, "Project A", "UM11719") + for i in range(3): + _make_event(unit, "UM11719_2023121916344{}.IDFW".format(i), + age_seconds=120 + i, content=("e" + str(i)).encode()) + # Ignored + _make_event(unit, "UM11719_20231219163436.MLG", age_seconds=120, content=b"mlg") + + state = ef.ForwardState(str(root / "seed.json")) + counts = ef.seed_state_from_folder(str(root), state, max_age_days=30) + self.assertEqual(counts["scanned"], 3) + self.assertEqual(counts["seeded"], 3) + self.assertEqual(counts["already_known"], 0) + self.assertEqual(state.count(), 3) + + def test_seeded_files_are_then_skipped_by_normal_scan(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit = _make_thordata(root, "Project A", "UM11719") + _make_event(unit, "UM11719_20231219163444.IDFW", age_seconds=120, content=b"x") + _make_txt(unit, "UM11719_20231219163444.IDFW", age_seconds=110, content=b"r") + _make_event(unit, "UM11719_20231219163444.IDFH", age_seconds=120, content=b"y") + _make_txt(unit, "UM11719_20231219163444.IDFH", age_seconds=110, content=b"r") + + state = ef.ForwardState(str(root / "seed.json")) + ef.seed_state_from_folder(str(root), state, max_age_days=30) + + pending = ef.find_pending_events( + str(root), state, max_age_days=30, + quiescence_seconds=5, missing_report_grace_seconds=60, + ) + self.assertEqual(len(pending), 0) + + def test_seed_is_idempotent(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit = _make_thordata(root, "Project A", "UM11719") + _make_event(unit, "UM11719_20231219163444.IDFW", age_seconds=120, content=b"x") + + state = ef.ForwardState(str(root / "seed.json")) + counts1 = ef.seed_state_from_folder(str(root), state, max_age_days=30) + counts2 = ef.seed_state_from_folder(str(root), state, max_age_days=30) + self.assertEqual(counts1["seeded"], 1) + self.assertEqual(counts2["seeded"], 0) + self.assertEqual(counts2["already_known"], 1) + self.assertEqual(state.count(), 1) + + +# ── Multipart encoder ──────────────────────────────────────────────────────── + + +class TestMultipartEncoder(unittest.TestCase): + + def test_encodes_two_parts_with_proper_boundary(self): + body, content_type = ef._encode_multipart([ + ("files", "a.bin", "application/octet-stream", b"\x01\x02"), + ("files", "a.txt", "text/plain", b"hello"), + ]) + self.assertTrue(content_type.startswith("multipart/form-data; boundary=")) + boundary = content_type.split("boundary=", 1)[1] + self.assertIn(boundary.encode("ascii"), body) + + text = body.decode("latin-1") + self.assertIn('name="files"; filename="a.bin"', text) + self.assertIn('name="files"; filename="a.txt"', text) + self.assertIn("Content-Type: application/octet-stream", text) + self.assertIn("Content-Type: text/plain", text) + self.assertTrue(text.rstrip("\r\n").endswith(f"--{boundary}--")) + + +# ── End-to-end forward_event_pair against a fake server ────────────────────── + + +class _FakeImportHandler(http.server.BaseHTTPRequestHandler): + """Mimics seismo-relay's POST /db/import/idf_file response.""" + received = [] # class-level capture for test inspection + + def do_POST(self): + length = int(self.headers.get("Content-Length", "0")) + body = self.rfile.read(length) + ctype = self.headers.get("Content-Type", "") + + parts = body.split(b"--" + ctype.split("boundary=")[-1].encode()) + filenames = [] + for p in parts: + for line in p.split(b"\r\n"): + if b'filename="' in line: + fn = line.split(b'filename="', 1)[1].split(b'"', 1)[0] + filenames.append(fn.decode("latin-1")) + + self.__class__.received.append({ + "path": self.path, + "ctype": ctype, + "filenames": filenames, + }) + + results = [] + binary_fn = next( + (fn for fn in filenames if not fn.lower().endswith(".txt")), + None, + ) + if binary_fn: + results.append({ + "filename": binary_fn, + "status": "ok", + "stored_filename": binary_fn, + "filesize": len(body), + "sha256": "00" * 32, + "report_attached": any(fn.lower().endswith(".txt") for fn in filenames), + "inserted": 1, + "skipped": 0, + }) + + payload = json.dumps({"count": len(results), "results": results}).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(payload))) + self.end_headers() + self.wfile.write(payload) + + def log_message(self, *_a, **_kw): # silence the test runner + pass + + +def _start_fake_server(): + server = http.server.HTTPServer(("127.0.0.1", 0), _FakeImportHandler) + threading.Thread(target=server.serve_forever, daemon=True).start() + host, port = server.server_address + return server, f"http://{host}:{port}" + + +class TestForwardEventPair(unittest.TestCase): + + def setUp(self): + _FakeImportHandler.received = [] + self.server, self.base_url = _start_fake_server() + + def tearDown(self): + self.server.shutdown() + self.server.server_close() + + def test_post_with_paired_report(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_p = Path(tmp) + bin_p = tmp_p / "UM11719_20231219163444.IDFW" + txt_p = tmp_p / "UM11719_20231219163444.IDFW.txt" + bin_p.write_bytes(b"\x10\x20\x30 binary") + txt_p.write_bytes(b'"SerialNumber : UM11719"\n') + + result = ef.forward_event_pair( + self.base_url, str(bin_p), str(txt_p), timeout=5.0, + ) + self.assertEqual(result["status"], "ok") + self.assertEqual(result["filename"], "UM11719_20231219163444.IDFW") + self.assertTrue(result["report_attached"]) + + self.assertEqual(len(_FakeImportHandler.received), 1) + req = _FakeImportHandler.received[0] + # Path includes the serial-hint auto-extracted from the filename + self.assertTrue(req["path"].startswith("/db/import/idf_file")) + self.assertIn("serial=UM11719", req["path"]) + self.assertIn("UM11719_20231219163444.IDFW", req["filenames"]) + self.assertIn("UM11719_20231219163444.IDFW.txt", req["filenames"]) + + def test_post_without_report(self): + with tempfile.TemporaryDirectory() as tmp: + bin_p = Path(tmp) / "UM11719_20231219163444.IDFH" + bin_p.write_bytes(b"binary only") + + result = ef.forward_event_pair( + self.base_url, str(bin_p), None, timeout=5.0, + ) + self.assertEqual(result["status"], "ok") + self.assertFalse(result["report_attached"]) + req = _FakeImportHandler.received[0] + self.assertEqual(req["filenames"], ["UM11719_20231219163444.IDFH"]) + + def test_explicit_serial_hint_overrides_auto(self): + with tempfile.TemporaryDirectory() as tmp: + bin_p = Path(tmp) / "UM11719_20231219163444.IDFW" + bin_p.write_bytes(b"x") + ef.forward_event_pair( + self.base_url, str(bin_p), None, + serial_hint="OVERRIDE99", timeout=5.0, + ) + req = _FakeImportHandler.received[0] + self.assertIn("serial=OVERRIDE99", req["path"]) + + +# ── forward_pending() smoke test ───────────────────────────────────────────── + + +class TestForwardPending(unittest.TestCase): + """End-to-end: tree → find → POST → state-update → no re-POST.""" + + def setUp(self): + _FakeImportHandler.received = [] + self.server, self.base_url = _start_fake_server() + + def tearDown(self): + self.server.shutdown() + self.server.server_close() + + def test_pass_then_re_pass_is_idempotent(self): + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + unit = _make_thordata(root, "Project A", "UM11719") + _make_event(unit, "UM11719_20231219163444.IDFW", + age_seconds=200, content=b"binary") + _make_txt(unit, "UM11719_20231219163444.IDFW", + age_seconds=100, content=b"report") + _make_event(unit, "UM11719_20231219163444.IDFH", + age_seconds=200, content=b"histogram") + _make_txt(unit, "UM11719_20231219163444.IDFH", + age_seconds=100, content=b"hreport") + + state = ef.ForwardState(str(root / "fwd.json")) + counts = ef.forward_pending( + str(root), self.base_url, state, + max_age_days=30, quiescence_seconds=5, + missing_report_grace_seconds=60, timeout=5.0, + ) + self.assertEqual(counts["scanned"], 2) + self.assertEqual(counts["forwarded"], 2) + self.assertEqual(counts["errors"], 0) + self.assertEqual(counts["with_report"], 2) + self.assertEqual(state.count(), 2) + self.assertEqual(len(_FakeImportHandler.received), 2) + + # Re-pass: nothing pending; no new POSTs. + counts2 = ef.forward_pending( + str(root), self.base_url, state, + max_age_days=30, quiescence_seconds=5, + missing_report_grace_seconds=60, timeout=5.0, + ) + self.assertEqual(counts2["scanned"], 0) + self.assertEqual(counts2["forwarded"], 0) + self.assertEqual(len(_FakeImportHandler.received), 2) + + +if __name__ == "__main__": + unittest.main() diff --git a/thor_settings_dialog.py b/thor_settings_dialog.py index 8bf756f..34e0554 100644 --- a/thor_settings_dialog.py +++ b/thor_settings_dialog.py @@ -1,5 +1,5 @@ """ -Thor Watcher — Settings Dialog v0.2.0 +Thor Watcher — Settings Dialog v0.3.0 Provides a Tkinter settings dialog that doubles as a first-run wizard. @@ -14,6 +14,8 @@ import tkinter as tk from tkinter import ttk, filedialog, messagebox from socket import gethostname +import series4_ingest as watcher + # ── Defaults (mirror config.example.json) ──────────────────────────────────── @@ -34,6 +36,17 @@ DEFAULTS = { "log_retention_days": 30, "update_source": "gitea", "update_url": "", + + # SFM forwarder defaults — mirror series4_ingest.load_config + "sfm_forward_enabled": False, + "sfm_url": "", + "sfm_forward_interval": 60, + "sfm_quiescence_seconds": 5, + "sfm_missing_report_grace_seconds": 60, + "sfm_http_timeout": 60, + "sfm_state_file": "", + "sfm_max_forwards_per_pass": 500, + "sfm_max_event_age_days": 365, } @@ -145,7 +158,8 @@ class SettingsDialog: self.saved = False self.root = parent - title = "Thor Watcher — Setup" if wizard else "Thor Watcher — Settings" + kind = "Setup" if wizard else "Settings" + title = "Thor Watcher v{} — {}".format(watcher.VERSION, kind) self.root.title(title) self.root.resizable(False, False) self.root.update_idletasks() @@ -192,6 +206,20 @@ class SettingsDialog: self.var_update_source = tk.StringVar(value=src) self.var_update_url = tk.StringVar(value=str(v.get("update_url", ""))) + # SFM Forwarder + sfm_en = v.get("sfm_forward_enabled", False) + self.var_sfm_enabled = tk.BooleanVar( + value=bool(sfm_en) if isinstance(sfm_en, bool) else str(sfm_en).lower() in ("true", "1", "yes") + ) + self.var_sfm_url = tk.StringVar(value=str(v.get("sfm_url", ""))) + self.var_sfm_forward_interval = tk.StringVar(value=str(v.get("sfm_forward_interval", 60))) + self.var_sfm_quiescence = tk.StringVar(value=str(v.get("sfm_quiescence_seconds", 5))) + self.var_sfm_grace = tk.StringVar(value=str(v.get("sfm_missing_report_grace_seconds", 60))) + self.var_sfm_http_timeout = tk.StringVar(value=str(v.get("sfm_http_timeout", 60))) + self.var_sfm_max_per_pass = tk.StringVar(value=str(v.get("sfm_max_forwards_per_pass", 500))) + self.var_sfm_max_age_days = tk.StringVar(value=str(v.get("sfm_max_event_age_days", 365))) + self.var_sfm_state_file = tk.StringVar(value=str(v.get("sfm_state_file", ""))) + # ── UI construction ─────────────────────────────────────────────────────── def _build_ui(self): @@ -216,6 +244,7 @@ class SettingsDialog: self._build_tab_paths(nb) self._build_tab_scanning(nb) self._build_tab_logging(nb) + self._build_tab_forwarding(nb) self._build_tab_updates(nb) btn_frame = tk.Frame(outer) @@ -347,6 +376,114 @@ class SettingsDialog: _add_label_check(f, 0, "Enable Logging", self.var_enable_logging) _add_label_spinbox(f, 1, "Log Retention (days)", self.var_log_retention_days, 1, 365) + def _build_tab_forwarding(self, nb): + f = self._tab_frame(nb, "SFM Forward") + + # Row 0: enable checkbox + _add_label_check(f, 0, "Enable SFM Forwarding", self.var_sfm_enabled) + + # Row 1: SFM URL + Test button + tk.Label(f, text="SFM URL", anchor="w").grid( + row=1, column=0, sticky="w", padx=(8, 4), pady=4 + ) + url_frame = tk.Frame(f) + url_frame.grid(row=1, column=1, sticky="ew", padx=(0, 8), pady=4) + url_frame.columnconfigure(0, weight=1) + + sfm_entry = ttk.Entry(url_frame, textvariable=self.var_sfm_url, width=32) + sfm_entry.grid(row=0, column=0, sticky="ew") + + _hint = "http://10.0.0.44:8200" + if not self.var_sfm_url.get(): + sfm_entry.config(foreground="grey") + sfm_entry.insert(0, _hint) + + def _on_focus_in(e, ent=sfm_entry, h=_hint): + if ent.get() == h: + ent.delete(0, tk.END) + ent.config(foreground="black") + + def _on_focus_out(e, ent=sfm_entry, h=_hint, v=self.var_sfm_url): + if not ent.get(): + ent.config(foreground="grey") + ent.insert(0, h) + v.set("") + + sfm_entry.bind("", _on_focus_in) + sfm_entry.bind("", _on_focus_out) + + self._sfm_test_btn = ttk.Button(url_frame, text="Test", width=6, + command=self._test_sfm_connection) + self._sfm_test_btn.grid(row=0, column=1, padx=(4, 0)) + + self._sfm_test_status = tk.Label(url_frame, text="", anchor="w", width=20) + self._sfm_test_status.grid(row=0, column=2, padx=(6, 0)) + + # Rows 2-7: timing/limits spinboxes + _add_label_spinbox(f, 2, "Forward Interval (sec)", self.var_sfm_forward_interval, 30, 3600) + _add_label_spinbox(f, 3, "Quiescence (sec)", self.var_sfm_quiescence, 1, 60) + _add_label_spinbox(f, 4, "Missing-Report Grace (sec)", self.var_sfm_grace, 0, 3600) + _add_label_spinbox(f, 5, "HTTP Timeout (sec)", self.var_sfm_http_timeout, 5, 300) + _add_label_spinbox(f, 6, "Max Forwards Per Pass", self.var_sfm_max_per_pass, 1, 5000) + _add_label_spinbox(f, 7, "Max Event Age (days)", self.var_sfm_max_age_days, 1, 3650) + + # Row 8: state file browse + def browse_state(): + p = filedialog.asksaveasfilename( + title="Select SFM State File", + defaultextension=".json", + filetypes=[("JSON files", "*.json"), ("All files", "*.*")], + initialfile=os.path.basename(self.var_sfm_state_file.get() or "thor_forwarded.json"), + initialdir=os.path.dirname(self.var_sfm_state_file.get() or "C:\\"), + ) + if p: + self.var_sfm_state_file.set(p.replace("/", "\\")) + + _add_label_browse_entry(f, 8, "State File", self.var_sfm_state_file, browse_state) + + # Row 9: help text + help_text = ( + "Forwards .IDFH (histogram) and .IDFW (waveform) event files plus their\n" + "TXT/.txt sidecars to a seismo-relay SFM server.\n" + "Idempotent: each file is tracked by sha256, so re-scans never re-POST.\n" + "If the TXT sidecar appears AFTER the binary was forwarded alone, the\n" + "next pass will re-forward so the relay can refresh the DB row with\n" + "device-authoritative PPV/ZCFreq/peak values.\n" + "State file blank → defaults to \\thor_forwarded.json." + ) + tk.Label( + f, text=help_text, justify="left", fg="#555555", wraplength=420, + ).grid(row=9, column=0, columnspan=2, sticky="w", padx=(8, 8), pady=(8, 4)) + + def _test_sfm_connection(self): + import urllib.request + import urllib.error + + self._sfm_test_status.config(text="Testing...", foreground="grey") + self._sfm_test_btn.config(state="disabled") + self.root.update_idletasks() + + raw = self.var_sfm_url.get().strip() + if not raw or raw == "http://10.0.0.44:8200": + self._sfm_test_status.config(text="Enter a URL first", foreground="orange") + self._sfm_test_btn.config(state="normal") + return + + url = raw.rstrip("/") + "/health" + try: + with urllib.request.urlopen(urllib.request.Request(url), timeout=5) as resp: + if resp.status == 200: + self._sfm_test_status.config(text="Connected!", foreground="green") + else: + self._sfm_test_status.config(text="HTTP {}".format(resp.status), foreground="orange") + except urllib.error.URLError as e: + reason = str(e.reason) if hasattr(e, "reason") else str(e) + self._sfm_test_status.config(text="Failed: {}".format(reason[:30]), foreground="red") + except Exception as e: + self._sfm_test_status.config(text="Error: {}".format(str(e)[:30]), foreground="red") + finally: + self._sfm_test_btn.config(state="normal") + def _build_tab_updates(self, nb): f = self._tab_frame(nb, "Updates") @@ -421,9 +558,15 @@ class SettingsDialog: def _on_save(self): checks = [ - (self.var_api_interval, "API Interval", 30, 3600), - (self.var_scan_interval, "Scan Interval", 10, 3600), - (self.var_log_retention_days, "Log Retention Days", 1, 365), + (self.var_api_interval, "API Interval", 30, 3600), + (self.var_scan_interval, "Scan Interval", 10, 3600), + (self.var_log_retention_days, "Log Retention Days", 1, 365), + (self.var_sfm_forward_interval, "Forward Interval", 30, 3600), + (self.var_sfm_quiescence, "Quiescence", 1, 60), + (self.var_sfm_grace, "Missing-Report Grace", 0, 3600), + (self.var_sfm_http_timeout, "HTTP Timeout", 5, 300), + (self.var_sfm_max_per_pass, "Max Forwards Per Pass", 1, 5000), + (self.var_sfm_max_age_days, "Max Event Age (days)", 1, 3650), ] int_values = {} for var, name, mn, mx in checks: @@ -442,6 +585,11 @@ class SettingsDialog: else: api_url = api_url.rstrip("/") + "/api/series4/heartbeat" + sfm_url = self.var_sfm_url.get().strip() + if sfm_url == "http://10.0.0.44:8200": + sfm_url = "" + sfm_url = sfm_url.rstrip("/") # event_forwarder adds the endpoint path + values = { "thordata_path": self.var_thordata_path.get().strip(), "scan_interval": int_values["Scan Interval"], @@ -456,6 +604,16 @@ class SettingsDialog: "log_retention_days": int_values["Log Retention Days"], "update_source": self.var_update_source.get().strip() or "gitea", "update_url": self.var_update_url.get().strip(), + + "sfm_forward_enabled": self.var_sfm_enabled.get(), + "sfm_url": sfm_url, + "sfm_forward_interval": int_values["Forward Interval"], + "sfm_quiescence_seconds": int_values["Quiescence"], + "sfm_missing_report_grace_seconds": int_values["Missing-Report Grace"], + "sfm_http_timeout": int_values["HTTP Timeout"], + "sfm_max_forwards_per_pass": int_values["Max Forwards Per Pass"], + "sfm_max_event_age_days": int_values["Max Event Age (days)"], + "sfm_state_file": self.var_sfm_state_file.get().strip(), } try: diff --git a/thor_tray.py b/thor_tray.py index d7d98da..f2212db 100644 --- a/thor_tray.py +++ b/thor_tray.py @@ -1,5 +1,5 @@ """ -Thor Watcher — System Tray Launcher v0.2.0 +Thor Watcher — System Tray Launcher v0.3.0 Requires: pystray, Pillow, tkinter (stdlib) Run with: pythonw thor_tray.py (no console window) @@ -420,7 +420,25 @@ class WatcherTray: else: api_str = "API off" - return "Running — {} | {} unit(s) | scan {}".format(api_str, unit_count, age_str) + base_line = "Running — {} | {} unit(s) | scan {}".format(api_str, unit_count, age_str) + + sfm_status = self.state.get("sfm_status", "disabled") + if sfm_status in ("ok", "fail", "ready"): + counts = self.state.get("last_forward_counts") or {} + fwd = counts.get("forwarded", 0) + errs = counts.get("errors", 0) + last_fwd = self.state.get("last_forward") + if last_fwd is not None: + fwd_age = int((datetime.now() - last_fwd).total_seconds()) + fwd_age_str = "{}s ago".format(fwd_age) if fwd_age < 60 else "{}m ago".format(fwd_age // 60) + else: + fwd_age_str = "pending" + sfm_line = "SFM {} | {} fwd, {} err | last {}".format( + sfm_status.upper(), fwd, errs, fwd_age_str, + ) + return base_line + "\n" + sfm_line + + return base_line def _tray_status(self): status = self.state.get("status", "starting") @@ -431,12 +449,17 @@ class WatcherTray: api_status = self.state.get("api_status", "disabled") if api_status == "fail": return "missing" # red — API failing + sfm_status = self.state.get("sfm_status", "disabled") + if api_status == "ok" and sfm_status == "fail": + return "pending" # amber — heartbeat OK but forwarder is failing if api_status == "disabled": return "pending" # amber — running but not reporting return "ok" # green — running and API good def _build_menu(self): return pystray.Menu( + pystray.MenuItem("Thor Watcher v{}".format(_CURRENT_VERSION), None, enabled=False), + pystray.Menu.SEPARATOR, pystray.MenuItem(lambda item: self._status_text(), None, enabled=False), pystray.Menu.SEPARATOR, pystray.MenuItem("Settings...", self._open_settings),