2 Commits

10 changed files with 1956 additions and 29 deletions
+1
@@ -2,6 +2,7 @@
# Thor Watcher local files
# --------------------------
config.json
/example-data/
# -------------------------
# Python ignores
+16
@@ -5,6 +5,22 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.3.0] - 2026-05-19
### Added
- `event_forwarder.py` — forwards `.IDFH` (histogram) and `.IDFW` (waveform) event files plus their `TXT/<basename>.txt` sidecars to a seismo-relay SFM server's new `/db/import/idf_file` endpoint
- Sha256-keyed `thor_forwarded.json` state file for idempotency across restarts and re-scans (default path: `<log_dir>/thor_forwarded.json`)
- "SFM Forward" tab in Settings dialog: enable/URL/Test, forward interval, quiescence, missing-report grace, HTTP timeout, max forwards per pass, max event age, state file picker
- Forwarder status line in tray menu: `SFM OK | N fwd, M err | last 30s ago`
- Tray icon goes amber when the SFM forwarder is failing but the API heartbeat is still healthy
- Re-pair logic: events forwarded without their TXT are re-forwarded once the sidecar appears so the relay can refresh DB rows with device-authoritative PPV/ZCFreq/peak values
- `event_forwarder.py --seed-state` CLI for skipping historical backfill on a first deploy
- Version badge: `Thor Watcher vX.Y.Z` shown at the top of the tray menu and in the Settings dialog title bar — operators no longer have to crack open the .exe properties to tell which version is running
### Changed
- Bumped `VERSION` to `0.3.0`
- Settings dialog tab order: Connection / Paths / Scanning / Logging / **SFM Forward** / Updates
## [0.2.0] - 2026-03-20
### Added
+27 -6
@@ -1,8 +1,8 @@
# Thor Watcher
**Version:** 0.2.0
**Version:** 0.3.0
Micromate (Series 4) watcher agent for Terra-View fleet management. Runs as a Windows system tray application, scans THORDATA for Micromate unit activity, and sends heartbeat data to Terra-View.
Micromate (Series 4) watcher agent for Terra-View fleet management. Runs as a Windows system tray application, scans THORDATA for Micromate unit activity, sends heartbeat data to Terra-View, and (optionally) forwards `.IDFH`/`.IDFW` event files to a seismo-relay SFM server.
---
@@ -29,7 +29,7 @@ build.bat
```
Produces:
- `dist\thor-watcher-0.2.0.exe` — upload to Gitea release
- `dist\thor-watcher-0.3.0.exe` — upload to Gitea release
- `dist\thor-watcher.exe` — use with Inno Setup
Then run Inno Setup Compiler on `installer.iss` to produce `thor-watcher-setup.exe`.
@@ -62,6 +62,27 @@ Managed through the Settings dialog (right-click tray icon → Settings). A `con
| `log_retention_days` | integer | `30` | Days before log is auto-cleared |
| `update_source` | string | `gitea` | Auto-update source: `gitea`, `url`, or `disabled` |
| `update_url` | string | `""` | Base URL for `url` mode (e.g. Terra-View server) |
| `sfm_forward_enabled` | boolean | `false` | Forward `.IDFH`/`.IDFW` event files to a seismo-relay SFM server |
| `sfm_url` | string | `""` | Base URL of the seismo-relay SFM server (e.g. `http://10.0.0.44:8200`) |
| `sfm_forward_interval` | integer | `60` | Seconds between forwarder passes |
| `sfm_quiescence_seconds` | integer | `5` | Skip files modified within the last N seconds (avoid in-flight files) |
| `sfm_missing_report_grace_seconds` | integer | `60` | Forward a binary without its `.txt` sidecar if it hasn't appeared after N seconds |
| `sfm_http_timeout` | integer | `60` | HTTP timeout per forward POST |
| `sfm_state_file` | string | `""` | Path to the sha256-keyed state file. Blank → `<log_dir>\thor_forwarded.json` |
| `sfm_max_forwards_per_pass` | integer | `500` | Cap per pass to drip-feed large backfills |
| `sfm_max_event_age_days` | integer | `365` | Skip event files older than this many days |
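
The defaults in the table can be applied in one place before the forwarder starts. The following is a minimal sketch, not the watcher's actual loader: `resolve_sfm_settings` and the derived `active` key are hypothetical names introduced here, and it assumes the caller already knows `log_dir`.

```python
import os
from typing import Any, Dict

# Defaults copied from the configuration table above.
SFM_DEFAULTS: Dict[str, Any] = {
    "sfm_forward_enabled": False,
    "sfm_url": "",
    "sfm_forward_interval": 60,
    "sfm_quiescence_seconds": 5,
    "sfm_missing_report_grace_seconds": 60,
    "sfm_http_timeout": 60,
    "sfm_state_file": "",
    "sfm_max_forwards_per_pass": 500,
    "sfm_max_event_age_days": 365,
}


def resolve_sfm_settings(cfg: Dict[str, Any], log_dir: str) -> Dict[str, Any]:
    """Hypothetical helper: overlay user config on the defaults."""
    merged = dict(SFM_DEFAULTS)
    merged.update({k: v for k, v in cfg.items() if k in SFM_DEFAULTS})
    # A blank state file path falls back to <log_dir>/thor_forwarded.json.
    if not merged["sfm_state_file"]:
        merged["sfm_state_file"] = os.path.join(log_dir, "thor_forwarded.json")
    # Forwarding only runs when it is enabled AND a URL is set.
    merged["active"] = bool(
        merged["sfm_forward_enabled"] and str(merged["sfm_url"]).strip()
    )
    return merged
```

With an empty `config.json` this leaves forwarding inactive, which matches the default-off behaviour described in this README.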
---
## Event Forwarding
When `sfm_forward_enabled` is true and `sfm_url` is set, Thor Watcher walks the THORDATA tree each `sfm_forward_interval` seconds, finds `.IDFH` (histogram) and `.IDFW` (waveform) event binaries plus their `TXT/<basename>.txt` ASCII sidecars, and POSTs them to seismo-relay's `/db/import/idf_file` endpoint.
- **Idempotent.** Every forwarded file is recorded by sha256 in `thor_forwarded.json`. Re-scans never re-POST.
- **Default off.** Operators must explicitly enable from the Settings → SFM Forward tab.
- **Re-pair logic.** If a binary is forwarded before its TXT sidecar appears (after the grace period), it's flagged `had_report=false` and re-forwarded once the TXT arrives so the SFM database row can be refreshed with device-authoritative PPV/ZCFreq/peak values.
- **TXT export must be enabled in Thor.** Thor's TXT sidecars are not produced automatically — operators should enable TXT export so the relay can extract rich metadata. Forwards without a TXT are still useful (binary gets indexed; rich fields stay NULL).
- **Backfill seeding.** To skip a large historical archive on first deploy, run `python event_forwarder.py --seed-state --thordata C:\THORDATA --state <state file>` before flipping the switch.
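
The idempotency, grace-period, and re-pair rules above reduce to one small decision per event binary. This is an illustrative sketch only: `decide` is a hypothetical function, and it assumes the file has already passed the quiescence and maximum-age checks.

```python
from typing import Optional


def decide(
    status: Optional[bool],
    txt_present: bool,
    age_seconds: float,
    grace_seconds: float = 60.0,
) -> str:
    """Return "forward", "defer", or "skip" for one event binary.

    `status` mirrors the state file entry for the binary's sha256:
    None  = never forwarded,
    True  = forwarded together with its TXT sidecar,
    False = forwarded without a TXT (had_report=false).
    """
    if status is True:
        return "skip"  # idempotent: never re-POST a complete forward
    if status is False:
        # Re-pair: only worth re-sending once the sidecar has appeared.
        return "forward" if txt_present else "skip"
    if txt_present:
        return "forward"  # first forward, binary and TXT together
    # No TXT yet: wait out the grace period, then send the binary alone.
    return "forward" if age_seconds >= grace_seconds else "defer"
```

For example, a 10-second-old binary with no TXT is deferred, while the same binary at 120 seconds is forwarded alone and later re-forwarded if the TXT shows up.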
---
@@ -69,8 +90,8 @@ Managed through the Settings dialog (right-click tray icon → Settings). A `con
| Color | Meaning |
|-------|---------|
| Green | Running, API reporting OK |
| Amber | Running, API disabled or not configured |
| Green | Running, API reporting OK (and SFM forwarder healthy when enabled) |
| Amber | Running, API disabled OR SFM forwarder failing while API is healthy |
| Red | Running, API failing |
| Purple | Error — check logs |
| Grey | Starting up |
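
The updated colour rules (the 0.3.0 Green and Amber rows) can be read as a priority order. The sketch below is an assumption about how they combine, not the tray code itself: `tray_color` and the `sfm_ok` argument are hypothetical, with `sfm_ok=None` standing for "forwarder disabled".

```python
from typing import Optional


def tray_color(status: str, api_status: str, sfm_ok: Optional[bool] = None) -> str:
    """Map watcher state to a tray icon colour.

    status     : "running" | "error" | "starting"
    api_status : "ok" | "fail" | "disabled"
    sfm_ok     : True/False when the forwarder is enabled, None when it is off
    """
    if status == "starting":
        return "grey"
    if status == "error":
        return "purple"
    if api_status == "fail":
        return "red"  # a failing API outranks forwarder problems
    if api_status == "disabled":
        return "amber"
    if sfm_ok is False:
        return "amber"  # API healthy, forwarder failing
    return "green"
```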
@@ -100,7 +121,7 @@ Posted to `api_url` on each API interval:
{
"source_id": "THOR-PC",
"source_type": "series4_watcher",
"version": "0.2.0",
"version": "0.3.0",
"generated_at": "2026-03-20T14:30:00Z",
"log_tail": ["...last 25 log lines..."],
"units": [
+11 -1
@@ -14,5 +14,15 @@
"log_retention_days": 30,
"update_source": "gitea",
"update_url": ""
"update_url": "",
"sfm_forward_enabled": false,
"sfm_url": "",
"sfm_forward_interval": 60,
"sfm_quiescence_seconds": 5,
"sfm_missing_report_grace_seconds": 60,
"sfm_http_timeout": 60,
"sfm_state_file": "",
"sfm_max_forwards_per_pass": 500,
"sfm_max_event_age_days": 365
}
+62
@@ -0,0 +1,62 @@
# Thor File Structure Guide
This document explains how Thor lays out its file structure for units that call in via Thor Autocall home.
## Main Structure
Thor saves its data in a folder on the C: drive called 'THORDATA'. It creates a folder for each user-entered project. When a unit is added to a project, Thor creates a folder inside that project named after the unit's serial number. Raw events (.IDFH for histograms and .IDFW for waveforms) and .MLG monitor logs are saved in this unit folder. If a unit is not assigned to a project, its data is saved into a default project folder. There is also a daily log file, which is created in the folder 'Logs'.
Each unit's folder also contains exported formats, each saved in its own subfolder. In most cases these are CSV, HTML, PDF, TXT, and XML.
Here is the structure illustrated:
C:/THORDATA
├───Mon-Fayette Express Way - Sec 53A1
│   └───UM11402
│       ├───CSV
│       ├───HTML
│       ├───PDF
│       ├───TXT
│       └───XML
├───P.J. Dick - 5th and Halket
│   ├───UM11719
│   │   ├───CSV
│   │   ├───HTML
│   │   ├───PDF
│   │   ├───TXT
│   │   └───XML
│   └───UM12420
│       ├───CSV
│       ├───HTML
│       ├───PDF
│       ├───TXT
│       └───XML
Here is an expanded project folder with two events in it
└───Clearwater - ECMS 57940
    └───BE9439
        │   BE9439_20200713124250.MLG
        │   BE9439_20200713124251.IDFH
        │   BE9439_20200713131747.IDFW
        │   BE9439_20200713131747.IDFW.CDB
        ├───CSV
        │       BE9439_20200713124251.IDFH.csv
        │       BE9439_20200713131747.IDFW.csv
        ├───PDF
        │       BE9439_20200713124251.IDFH.pdf
        │       BE9439_20200713131747.IDFW.pdf
        ├───TXT
        │       BE9439_20200713124251.IDFH.txt
        │       BE9439_20200713131747.IDFW.txt
        └───XML
                BE9439_20200713124251_IDFH_XML.XML
                BE9439_20200713131747_IDFW_XML.XML
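
To make the layout concrete, here is a small sketch that walks a THORDATA-shaped tree, picks out the event binaries, and looks for each one's sidecar in the unit's `TXT` folder. It is an illustration built on the structure described in this guide, not Thor's own code: `list_events` and `EVENT_RE` are hypothetical names, and the demo builds a throwaway tree in a temporary directory rather than reading a real `C:/THORDATA`.

```python
import os
import re
import tempfile
from datetime import datetime
from typing import List, Optional, Tuple

# <SERIAL>_<YYYYMMDDHHMMSS>.IDFH or .IDFW, e.g. BE9439_20200713124251.IDFH
EVENT_RE = re.compile(r"^([A-Z]{2}\d+)_(\d{14})\.(IDFH|IDFW)$", re.IGNORECASE)

Event = Tuple[str, datetime, str, str, Optional[str]]


def list_events(root: str) -> List[Event]:
    """Return (serial, timestamp, kind, binary_path, txt_path_or_None) tuples."""
    events: List[Event] = []
    for project in sorted(os.listdir(root)):
        project_dir = os.path.join(root, project)
        if not os.path.isdir(project_dir):
            continue
        for unit in sorted(os.listdir(project_dir)):
            unit_dir = os.path.join(project_dir, unit)
            if not os.path.isdir(unit_dir):
                continue  # e.g. the daily files inside the Logs folder
            for name in sorted(os.listdir(unit_dir)):
                m = EVENT_RE.match(name)
                if not m:
                    continue  # skips .MLG, .IDFW.CDB, and the export folders
                try:
                    ts = datetime.strptime(m.group(2), "%Y%m%d%H%M%S")
                except ValueError:
                    continue
                txt = os.path.join(unit_dir, "TXT", name + ".txt")
                events.append((
                    m.group(1).upper(), ts, m.group(3).upper(),
                    os.path.join(unit_dir, name),
                    txt if os.path.isfile(txt) else None,
                ))
    return events


# Demo on a temporary tree shaped like the expanded example above.
with tempfile.TemporaryDirectory() as root:
    unit = os.path.join(root, "Clearwater - ECMS 57940", "BE9439")
    os.makedirs(os.path.join(unit, "TXT"))
    for fname in (
        "BE9439_20200713124250.MLG",
        "BE9439_20200713124251.IDFH",
        "BE9439_20200713131747.IDFW",
        "BE9439_20200713131747.IDFW.CDB",
    ):
        open(os.path.join(unit, fname), "wb").close()
    open(os.path.join(unit, "TXT", "BE9439_20200713131747.IDFW.txt"), "w").close()
    demo = [(s, k, txt is not None) for s, _ts, k, _p, txt in list_events(root)]

print(demo)
```

In the demo only the waveform has a TXT sidecar, so the histogram is reported with no paired report.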
+823
@@ -0,0 +1,823 @@
"""
event_forwarder.py — forward Thor (Micromate Series IV) IDFH/IDFW event
files to a seismo-relay SFM server.
Walks the same `THORDATA_PATH/<Project>/<Unit>/` tree the heartbeat path
scans. For each event binary that hasn't been forwarded yet, pairs it
with its `<unit>/TXT/<basename>.txt` ASCII report (when available) and
POSTs both to seismo-relay's `/db/import/idf_file` endpoint as one
multipart request.
This is a port of `series3-watcher/event_forwarder.py` adapted for the
Thor file layout. Key differences from the series3 forwarder:
- **Filenames are literal `<SERIAL>_<YYYYMMDDHHMMSS>.IDFH|.IDFW`** —
no base36 stem; the serial is right there in the prefix.
- **TXT sidecars live in a `TXT/` subfolder** next to the binaries
(Thor's exporter writes them there, not alongside the binary).
- **IDFH and IDFW are forwarded as independent events.** Each has
its own sha256-keyed state entry and its own POST. A single
timestamp can produce both a histogram (IDFH) and a waveform (IDFW),
and the seismo-relay endpoint dedupes on (serial, timestamp, kind),
so treating them as separate rows is the right model.
Design notes
────────────
- **stdlib only.** Matches the rest of the watcher (`urllib.request`).
Multipart encoding is hand-rolled.
- **Idempotent across restarts.** Forwarded files are tracked by
sha256 in a JSON state file (default: `<log_dir>/thor_forwarded.json`).
Re-scanning the watch tree doesn't re-POST anything.
- **Default-off.** Callers must enable via config
(`sfm_forward_enabled=true` + `sfm_url=...`). Existing 0.2.x
deployments that auto-update stay non-forwarding until an operator
flips the switch.
- **Quiescence guard.** Files modified within the last few seconds
are skipped — Thor writes the .txt after the binary, so we wait
until both look stable before forwarding.
- **Best-effort report pairing.** When the .txt hasn't appeared yet
but the binary is older than `missing_report_grace_seconds`, the
binary is forwarded alone (seismo-relay accepts that and just skips
the rich fields — we'd rather get the binary indexed than block
forever waiting for a TXT that may never arrive, e.g. operator
hasn't enabled the TXT export in Thor).
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
log = logging.getLogger(__name__)
# Default tuning. All overridable via config.json sfm_* keys.
DEFAULT_QUIESCENCE_SECONDS = 5
DEFAULT_MISSING_REPORT_GRACE_SECONDS = 60
DEFAULT_HTTP_TIMEOUT = 60.0
STATE_SCHEMA_VERSION = 1
# ── Filename matching ─────────────────────────────────────────────────────────
#
# Thor (Micromate Series IV) filename scheme:
# <SERIAL>_<YYYYMMDDHHMMSS>.<KIND>
# where SERIAL is the literal device serial (e.g. UM11719, BE9439),
# and KIND is IDFH (histogram) or IDFW (waveform).
#
# Examples:
# UM11719_20231219163444.IDFH
# UM11719_20231219162723.IDFW
# BE9439_20200713124251.IDFH
_EVENT_FILENAME_RE = re.compile(
    r"^([A-Z]{2}\d+)_(\d{14})\.(IDFH|IDFW)$",
    re.IGNORECASE,
)
# Filenames we explicitly skip even if they look event-shaped.
_NON_EVENT_EXTS = {
    ".mlg",       # monitor-log files (separate heartbeat path)
    ".txt",       # ASCII reports — paired in, not primary
    ".csv",       # operator-facing derivative
    ".html",      # operator-facing derivative
    ".pdf",       # operator-facing derivative
    ".xml",       # operator-facing derivative
    ".cdb",       # IDFW.CDB cache-database variant — skip
    ".log",
    ".ini",
    ".json",
    ".sfm.json",
    ".bak",
    ".tmp",
}
def is_event_binary(path: str) -> bool:
    """Return True if `path`'s basename looks like a Thor event binary."""
    name = os.path.basename(path)
    lname = name.lower()
    # Explicit reject for compound extensions like .IDFW.CDB
    if lname.endswith(".idfw.cdb") or lname.endswith(".idfh.cdb"):
        return False
    if not _EVENT_FILENAME_RE.match(name):
        return False
    ext = os.path.splitext(name)[1].lower()
    if ext in _NON_EVENT_EXTS:
        return False
    return True


def parse_event_filename(name: str) -> Optional[Tuple[str, datetime, str]]:
    """Parse `<SERIAL>_<YYYYMMDDHHMMSS>.<KIND>` -> (serial, timestamp, kind).

    `kind` is the upper-case extension without the dot — "IDFH" or "IDFW".
    Returns None if the filename doesn't match.
    """
    m = _EVENT_FILENAME_RE.match(name)
    if not m:
        return None
    serial = m.group(1).upper()
    try:
        ts = datetime.strptime(m.group(2), "%Y%m%d%H%M%S")
    except ValueError:
        return None
    kind = m.group(3).upper()
    return serial, ts, kind


def idf_report_name(binary_name: str) -> str:
    """Thor TXT-export convention: append `.txt` to the binary basename.

    UM11719_20231219163444.IDFH → UM11719_20231219163444.IDFH.txt
    """
    return binary_name + ".txt"


def idf_report_path(binary_path: str) -> str:
    """Compute the expected TXT sidecar path for a Thor event binary.

    Thor's TXT exporter writes sidecars into a `TXT/` subfolder of the
    unit directory (verified against captured example-data). The
    returned path is the *expected* location — caller is responsible
    for checking that it actually exists.
    """
    unit_dir = os.path.dirname(binary_path)
    name = os.path.basename(binary_path)
    return os.path.join(unit_dir, "TXT", idf_report_name(name))


def is_histogram_event(filename: str) -> bool:
    """True if this is an .IDFH (histogram) event. Used purely for
    log clarity — Thor doesn't always export a TXT for histograms, so
    a missing-report warning is suppressed for them."""
    name = os.path.basename(filename)
    return name.lower().endswith(".idfh")


def serial_from_filename(name: str) -> Optional[str]:
    """Extract the serial-number prefix from a Thor event filename."""
    parsed = parse_event_filename(name)
    if parsed is None:
        return None
    return parsed[0]
# ── State file ────────────────────────────────────────────────────────────────
class ForwardState:
    """Idempotency record: which event files have we already forwarded?

    State file format (JSON):
        {
          "version": 1,
          "forwarded": {
            "<sha256>": {
              "filename": "UM11719_20231219163444.IDFW",
              "size": 8800,
              "forwarded_at": "2026-05-19T...Z",
              "had_report": true
            },
            ...
          }
        }

    Keyed by sha256 (not filename) so identical content is recognised
    as already-forwarded even if the file moved or got renamed.
    """

    def __init__(self, path: str):
        self.path = path
        self._data: Dict[str, Any] = {"version": STATE_SCHEMA_VERSION, "forwarded": {}}
        self._load()

    def _load(self) -> None:
        try:
            with open(self.path, "r", encoding="utf-8") as f:
                d = json.load(f)
            if not isinstance(d, dict):
                raise ValueError("state file root is not an object")
            if d.get("version") != STATE_SCHEMA_VERSION:
                log.warning(
                    "forward state version mismatch (got %r, want %d) — starting fresh",
                    d.get("version"), STATE_SCHEMA_VERSION,
                )
                return
            forwarded = d.get("forwarded")
            if isinstance(forwarded, dict):
                self._data["forwarded"] = forwarded
        except FileNotFoundError:
            pass
        except (OSError, ValueError, json.JSONDecodeError) as exc:
            log.warning("failed to load forward state from %s: %s", self.path, exc)

    def _save(self) -> None:
        tmp = self.path + ".tmp"
        try:
            d = os.path.dirname(self.path)
            if d and not os.path.isdir(d):
                os.makedirs(d, exist_ok=True)
            with open(tmp, "w", encoding="utf-8") as f:
                json.dump(self._data, f, indent=2, sort_keys=True)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp, self.path)
        except OSError as exc:
            log.warning("failed to save forward state to %s: %s", self.path, exc)

    def is_forwarded(self, sha256: str) -> bool:
        return sha256 in self._data["forwarded"]

    def status(self, sha256: str) -> Optional[bool]:
        """Return forwarding status for *sha256*.

        Returns:
            None  — never forwarded. Eligible for a fresh forward.
            True  — forwarded successfully with its paired report.
                    NOT a candidate for re-forward.
            False — forwarded WITHOUT its paired `.txt`. Eligible for
                    re-forward IF the TXT now exists, so seismo-relay's
                    upsert refreshes the DB row with authoritative
                    device-side values.

        Legacy entries without a `had_report` key default to True so
        an upgrade doesn't unexpectedly re-forward every entry.
        """
        entry = self._data["forwarded"].get(sha256)
        if entry is None:
            return None
        return bool(entry.get("had_report", True))

    def mark_forwarded(
        self,
        sha256: str,
        filename: str,
        size: int,
        had_report: bool = True,
    ) -> None:
        """Record a successful forward.

        Set `had_report=False` when the forward shipped the binary
        without its paired ASCII report. Such entries are re-checked
        on subsequent scans and re-forwarded once the TXT appears.

        Idempotent: re-marking an existing sha256 with `had_report=True`
        is the explicit promotion path used when a re-pair succeeds.
        """
        self._data["forwarded"][sha256] = {
            "filename": filename,
            "size": size,
            "forwarded_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "had_report": had_report,
        }
        self._save()

    def count(self) -> int:
        return len(self._data["forwarded"])
# ── Helpers ───────────────────────────────────────────────────────────────────
def sha256_of_file(path: str) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            h.update(chunk)
    return h.hexdigest()


def _is_quiescent(path: str, now_ts: float, quiescence_seconds: float) -> bool:
    """Return True if the file's mtime is at least `quiescence_seconds`
    in the past — i.e. no longer being written."""
    try:
        mtime = os.path.getmtime(path)
    except OSError:
        return False
    return (now_ts - mtime) >= quiescence_seconds


def _iter_unit_dirs(thordata_root: str):
    """Yield (project_name, unit_name, unit_path) for every Thor unit
    folder beneath `thordata_root`.

    THORDATA layout: <root>/<Project>/<UnitSerial>/
    """
    if not os.path.isdir(thordata_root):
        return
    try:
        projects = os.listdir(thordata_root)
    except OSError:
        return
    for proj in projects:
        proj_path = os.path.join(thordata_root, proj)
        if not os.path.isdir(proj_path):
            continue
        try:
            units = os.listdir(proj_path)
        except OSError:
            continue
        for unit in units:
            unit_path = os.path.join(proj_path, unit)
            if not os.path.isdir(unit_path):
                continue
            yield proj, unit, unit_path


def _find_txt_in_unit(unit_path: str, binary_name: str,
                      now_ts: float, quiescence_seconds: float,
                      _txt_cache: Dict[str, Dict[str, str]]) -> Optional[str]:
    """Look up the matching `.txt` sidecar for `binary_name` inside
    `<unit_path>/TXT/`. Case-insensitive on Windows-shaped paths.

    Returns the absolute path if a quiescent sidecar exists, else None.
    `_txt_cache` is mutated to memoize the lower-case → actual-name
    map for each unit so repeated lookups in one scan don't restat.
    """
    expected = idf_report_name(binary_name)
    if unit_path not in _txt_cache:
        txt_dir = os.path.join(unit_path, "TXT")
        listing: Dict[str, str] = {}
        if os.path.isdir(txt_dir):
            try:
                for n in os.listdir(txt_dir):
                    listing[n.lower()] = n
            except OSError:
                pass
        _txt_cache[unit_path] = listing
    listing = _txt_cache[unit_path]
    actual = listing.get(expected.lower())
    if not actual:
        return None
    candidate = os.path.join(unit_path, "TXT", actual)
    if _is_quiescent(candidate, now_ts, quiescence_seconds):
        return candidate
    return None
# ── Scan pass ─────────────────────────────────────────────────────────────────
def find_pending_events(
    thordata_root: str,
    state: ForwardState,
    *,
    max_age_days: int,
    quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
    missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
    max_per_pass: int = 0,
) -> List[Tuple[str, Optional[str]]]:
    """
    Walk `thordata_root` and return the list of (binary_path, txt_path_or_None)
    pairs that need forwarding.

    Filtering rules:
      - Filename must match the Thor event filename regex.
      - File must be quiescent (mtime >= quiescence_seconds in the past).
      - File must not exceed `max_age_days`.
      - File's sha256 must NOT already be in the forwarded state
        (unless it was forwarded without its TXT and the TXT is now
        present — see ForwardState.status).
      - If a `<unit>/TXT/<basename>.txt` exists and is quiescent,
        we pair them. Otherwise, if the binary is older than
        missing_report_grace_seconds, we forward without the TXT.
        Younger binaries with a missing TXT are deferred.
      - When `max_per_pass > 0`, return at most that many pairs.
        Older files (lower mtime) are forwarded first so backfill
        proceeds chronologically.
    """
    if not os.path.isdir(thordata_root):
        log.warning("forward scan: thordata root not found: %s", thordata_root)
        return []
    now_ts = time.time()
    max_age_seconds = max(1, int(max_age_days)) * 86400.0
    # First pass: collect every event-binary entry with mtime for sorting.
    candidates: List[Tuple[float, str, str]] = []  # (mtime, unit_path, binary_path)
    for _proj, _unit, unit_path in _iter_unit_dirs(thordata_root):
        try:
            entries = list(os.scandir(unit_path))
        except OSError:
            continue
        for e in entries:
            if not e.is_file():
                continue
            if not is_event_binary(e.path):
                continue
            try:
                mtime = e.stat().st_mtime
            except OSError:
                continue
            if (now_ts - mtime) > max_age_seconds:
                continue
            candidates.append((mtime, unit_path, e.path))
    # Sort oldest-first so backfill is chronological.
    candidates.sort(key=lambda t: t[0])
    pending: List[Tuple[str, Optional[str]]] = []
    skipped_inflight = 0
    skipped_already_forwarded = 0
    txt_cache: Dict[str, Dict[str, str]] = {}
    for mtime, unit_path, binary_path in candidates:
        if not _is_quiescent(binary_path, now_ts, quiescence_seconds):
            skipped_inflight += 1
            continue
        try:
            digest = sha256_of_file(binary_path)
        except OSError as exc:
            log.warning("forward scan: sha256 failed for %s: %s", binary_path, exc)
            continue
        fwd_status = state.status(digest)
        if fwd_status is True:
            skipped_already_forwarded += 1
            continue
        binary_name = os.path.basename(binary_path)
        txt_path = _find_txt_in_unit(
            unit_path, binary_name, now_ts, quiescence_seconds, txt_cache,
        )
        if fwd_status is False:
            # Previously forwarded WITHOUT report. Re-forward only
            # if the TXT is now present so seismo-relay's upsert can
            # refresh the row with authoritative device values.
            if txt_path is None:
                skipped_already_forwarded += 1
                continue
        elif txt_path is None:
            # First-time forward and TXT not yet present. Wait for
            # the grace period before forwarding alone.
            if (now_ts - mtime) < missing_report_grace_seconds:
                skipped_inflight += 1
                continue
        pending.append((binary_path, txt_path))
        if max_per_pass and len(pending) >= max_per_pass:
            break
    log.debug(
        "forward scan: %d pending skipped_inflight=%d already_forwarded=%d cap=%d",
        len(pending), skipped_inflight, skipped_already_forwarded, max_per_pass,
    )
    return pending
# ── Multipart upload ──────────────────────────────────────────────────────────
def _encode_multipart(
    parts: List[Tuple[str, str, str, bytes]],
) -> Tuple[bytes, str]:
    """Encode a list of (field_name, filename, content_type, data) tuples
    as a multipart/form-data body. Returns (body_bytes, content_type
    header value)."""
    boundary = "----ThorWatcherBoundary" + os.urandom(8).hex()
    chunks: List[bytes] = []
    for field_name, filename, content_type, data in parts:
        chunks.append(("--" + boundary + "\r\n").encode("ascii"))
        chunks.append(
            (f'Content-Disposition: form-data; name="{field_name}"; '
             f'filename="{filename}"\r\n').encode("ascii")
        )
        chunks.append((f"Content-Type: {content_type}\r\n\r\n").encode("ascii"))
        chunks.append(data)
        chunks.append(b"\r\n")
    chunks.append(("--" + boundary + "--\r\n").encode("ascii"))
    body = b"".join(chunks)
    content_type_hdr = f"multipart/form-data; boundary={boundary}"
    return body, content_type_hdr


def _import_endpoint(sfm_url: str) -> str:
    """Compose the import endpoint URL from a base SFM URL."""
    return sfm_url.rstrip("/") + "/db/import/idf_file"
def forward_event_pair(
    sfm_url: str,
    binary_path: str,
    txt_path: Optional[str],
    *,
    serial_hint: Optional[str] = None,
    timeout: float = DEFAULT_HTTP_TIMEOUT,
) -> Dict[str, Any]:
    """POST a single event (binary + optional .txt) to the SFM import
    endpoint.

    Returns a dict mirroring the per-file outcome the server returned
    on success, or a dict with `status="error"` on local read,
    transport, or HTTP failure.
    """
    binary_name = os.path.basename(binary_path)
    # Read inside a try block: a file can vanish or be locked between
    # the scan and the POST, and that must not abort the whole pass.
    try:
        with open(binary_path, "rb") as f:
            binary_bytes = f.read()
        parts = [("files", binary_name, "application/octet-stream", binary_bytes)]
        if txt_path is not None:
            with open(txt_path, "rb") as f:
                txt_bytes = f.read()
            parts.append(("files", os.path.basename(txt_path), "text/plain", txt_bytes))
    except OSError as exc:
        return {
            "status": "error",
            "filename": binary_name,
            "detail": f"read error: {exc}",
        }
    body, content_type = _encode_multipart(parts)
    # Auto-derive serial from filename if caller didn't supply one.
    if not serial_hint:
        serial_hint = serial_from_filename(binary_name)
    url = _import_endpoint(sfm_url)
    if serial_hint:
        sep = "&" if "?" in url else "?"
        url = f"{url}{sep}serial={serial_hint}"
    req = urllib.request.Request(
        url, data=body, method="POST",
        headers={
            "Content-Type": content_type,
            "Content-Length": str(len(body)),
            "User-Agent": "thor-watcher/sfm-forwarder",
            "Accept": "application/json",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
        try:
            payload = json.loads(raw)
        except json.JSONDecodeError:
            return {
                "status": "error",
                "filename": binary_name,
                "detail": f"server returned non-JSON: {raw[:200]!r}",
            }
        # Prefer an "ok" entry for this file, then any entry for it.
        for entry in (payload.get("results") or []):
            if entry.get("filename") == binary_name and entry.get("status") == "ok":
                return entry
        for entry in (payload.get("results") or []):
            if entry.get("filename") == binary_name:
                return entry
        return {
            "status": "error",
            "filename": binary_name,
            "detail": f"unexpected server response: {payload!r}",
        }
    except urllib.error.HTTPError as exc:
        try:
            body_excerpt = exc.read().decode("utf-8", errors="replace")[:300]
        except Exception:
            body_excerpt = ""
        return {
            "status": "error",
            "filename": binary_name,
            "detail": f"HTTP {exc.code}: {exc.reason} body={body_excerpt!r}",
        }
    except urllib.error.URLError as exc:
        return {
            "status": "error",
            "filename": binary_name,
            "detail": f"connection error: {exc.reason}",
        }
    except (OSError, TimeoutError) as exc:
        return {
            "status": "error",
            "filename": binary_name,
            "detail": f"transport error: {exc}",
        }
# ── Top-level orchestration ───────────────────────────────────────────────────
def forward_pending(
    thordata_root: str,
    sfm_url: str,
    state: ForwardState,
    *,
    max_age_days: int,
    quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
    missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
    timeout: float = DEFAULT_HTTP_TIMEOUT,
    max_per_pass: int = 0,
    logger: Optional[Any] = None,
) -> Dict[str, int]:
    """
    Run one full pass: find pending events, POST each one, update state.

    Returns a counts dict suitable for logging:
        {
          "scanned":    <int>,  # event binaries selected for forward
          "forwarded":  <int>,  # successfully POSTed this pass
          "errors":     <int>,  # POST failures (will retry next pass)
          "with_report":<int>,  # of forwarded, how many had a paired TXT
        }
    """
    def _log(msg: str) -> None:
        if logger:
            logger(msg)
        else:
            log.info(msg)

    pending = find_pending_events(
        thordata_root, state,
        max_age_days=max_age_days,
        quiescence_seconds=quiescence_seconds,
        missing_report_grace_seconds=missing_report_grace_seconds,
        max_per_pass=max_per_pass,
    )
    counts = {"scanned": len(pending), "forwarded": 0, "errors": 0, "with_report": 0}
    for binary_path, txt_path in pending:
        result = forward_event_pair(
            sfm_url, binary_path, txt_path,
            timeout=timeout,
        )
        if result.get("status") == "ok":
            try:
                digest = sha256_of_file(binary_path)
                size = os.path.getsize(binary_path)
                state.mark_forwarded(
                    digest,
                    os.path.basename(binary_path),
                    size,
                    had_report=(txt_path is not None),
                )
            except OSError as exc:
                _log(f"[forward] post-success state save failed for "
                     f"{os.path.basename(binary_path)}: {exc}")
            counts["forwarded"] += 1
            if txt_path:
                counts["with_report"] += 1
            if txt_path:
                report_token = "+ {} attached".format(os.path.basename(txt_path))
            elif is_histogram_event(binary_path):
                report_token = "(histogram, no report expected)"
            else:
                report_token = "no report"
            _log(
                "[forward] OK {} ({}B, {}, inserted={}, skipped={})".format(
                    os.path.basename(binary_path),
                    result.get("filesize", 0),
                    report_token,
                    result.get("inserted", 0),
                    result.get("skipped", 0),
                )
            )
        else:
            counts["errors"] += 1
            _log(
                f"[forward] ERR {os.path.basename(binary_path)}: "
                f"{result.get('detail', 'unknown error')}"
            )
    return counts
# ── Seed-state mode (skip historical backfill on first deploy) ────────────────
def seed_state_from_folder(
    thordata_root: str,
    state: ForwardState,
    *,
    max_age_days: int = 365,
    logger: Optional[Any] = None,
) -> Dict[str, int]:
    """Walk `thordata_root` and mark every existing event binary as
    already forwarded — without POSTing anything.

    Run this ONCE before enabling sfm_forward_enabled on a machine
    with a large historical archive. The watcher then starts
    forwarding only events that appear AFTER the seed run.

    Returns a counts dict:
        {"scanned": int, "seeded": int, "already_known": int, "skipped_too_old": int}
    """
    def _log(msg: str) -> None:
        if logger:
            logger(msg)
        else:
            log.info(msg)

    counts = {"scanned": 0, "seeded": 0, "already_known": 0, "skipped_too_old": 0}
    if not os.path.isdir(thordata_root):
        _log(f"[seed] thordata root not found: {thordata_root}")
        return counts
    now_ts = time.time()
    max_age_seconds = max(1, int(max_age_days)) * 86400.0
    for _proj, _unit, unit_path in _iter_unit_dirs(thordata_root):
        try:
            entries = [e for e in os.scandir(unit_path) if e.is_file()]
        except OSError:
            continue
        for e in entries:
            if not is_event_binary(e.path):
                continue
            counts["scanned"] += 1
            try:
                mtime = e.stat().st_mtime
                size = e.stat().st_size
            except OSError:
                continue
            if (now_ts - mtime) > max_age_seconds:
                counts["skipped_too_old"] += 1
                continue
            try:
                digest = sha256_of_file(e.path)
            except OSError as exc:
                _log(f"[seed] sha256 failed for {e.path}: {exc}")
                continue
            if state.is_forwarded(digest):
                counts["already_known"] += 1
                continue
            state.mark_forwarded(digest, e.name, size)
            counts["seeded"] += 1
            if counts["seeded"] % 1000 == 0:
                _log(f"[seed] progress: {counts['seeded']} seeded so far...")
    _log(
        f"[seed] done. scanned={counts['scanned']} seeded={counts['seeded']} "
        f"already_known={counts['already_known']} "
        f"skipped_too_old={counts['skipped_too_old']}"
    )
    return counts
# ── CLI entry point ─────────────────────────────────────────────────────────
def _main() -> int:
    """Command-line interface for one-shot operations.

        python event_forwarder.py --seed-state \\
            --thordata "C:\\THORDATA" \\
            --state "<path/to/thor_forwarded.json>" \\
            [--max-age-days 365]
    """
    import argparse
    parser = argparse.ArgumentParser(
        description="Thor Watcher — SFM event forwarder utilities",
    )
    parser.add_argument(
        "--seed-state", action="store_true",
        help="Mark every event binary in --thordata as already-forwarded "
             "(without POSTing). Use this BEFORE enabling sfm_forward "
             "on a machine with a large historical archive.",
    )
    parser.add_argument(
        "--thordata", required=True,
        help="Path to the THORDATA root folder.",
    )
    parser.add_argument(
        "--state", required=True,
        help="Path to the JSON state file. Will be created if missing.",
    )
    parser.add_argument(
        "--max-age-days", type=int, default=365,
        help="Only seed files newer than this many days (default 365).",
    )
    args = parser.parse_args()
    if not args.seed_state:
        parser.error("specify --seed-state (no other modes supported yet)")
    print(f"[seed] thordata = {args.thordata}")
    print(f"[seed] state = {args.state}")
    print(f"[seed] max_age = {args.max_age_days} days")
    state = ForwardState(args.state)
    print(f"[seed] state currently has {state.count()} entries")
    seed_state_from_folder(
        args.thordata, state,
        max_age_days=args.max_age_days,
        logger=lambda m: print(m),
    )
    print(f"[seed] state now has {state.count()} entries")
    return 0


if __name__ == "__main__":
    import sys
    sys.exit(_main())
+112 -15
@@ -1,5 +1,5 @@
"""
Thor Watcher — Series 4 Ingest Agent v0.2.0
Thor Watcher — Series 4 Ingest Agent v0.3.0
Micromate (Series 4) ingest agent for Terra-View.
@@ -7,6 +7,8 @@ Behavior:
- Scans C:\THORDATA\<Project>\<UM####>\*.MLG
- For each UM####, finds the newest .MLG by timestamp in the filename
- Posts JSON heartbeat payload to Terra-View backend
- Forwards .IDFH/.IDFW event files (+ TXT sidecars) to a seismo-relay
SFM server when sfm_forward_enabled=true. See event_forwarder.py.
- Tray-friendly: run_watcher(state, stop_event) for background thread use
"""
@@ -22,10 +24,12 @@ from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
from socket import gethostname
import event_forwarder
# ── Version ───────────────────────────────────────────────────────────────────
-VERSION = "0.2.0"
+VERSION = "0.3.0"
# ── Config ────────────────────────────────────────────────────────────────────
@@ -54,6 +58,17 @@ def load_config(config_path: str) -> Dict[str, Any]:
"update_source": "gitea",
"update_url": "",
"debug": False,
# SFM event forwarding — default OFF, opt-in via Settings.
"sfm_forward_enabled": False,
"sfm_url": "", # e.g. "http://10.0.0.44:8200"
"sfm_forward_interval": 60, # seconds between forward passes
"sfm_quiescence_seconds": 5,
"sfm_missing_report_grace_seconds": 60,
"sfm_http_timeout": 60,
"sfm_state_file": "", # blank → <log_dir>/thor_forwarded.json
"sfm_max_forwards_per_pass": 500,
"sfm_max_event_age_days": 365,
}
with open(config_path, "r", encoding="utf-8") as f:
@@ -153,8 +168,16 @@ def scan_thordata(root: str) -> Dict[str, Dict[str, Any]]:
parsed = parse_mlg_filename(fname)
if not parsed:
continue
-unit_id, ts = parsed
+unit_id, _ = parsed # keep unit_id only
full_path = os.path.join(unit_path, fname)
try:
mtime = os.path.getmtime(full_path)
ts = datetime.fromtimestamp(mtime)
except Exception:
continue
current = unit_map.get(unit_id)
if current is None or ts > current["last_call"]:
unit_map[unit_id] = {
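The hunk above switches the "last call" timestamp from the one embedded in the file name to the file's modification time. A minimal standalone sketch of that selection rule follows. `newest_by_mtime` and its `(unit_id, path)` input shape are hypothetical and only illustrate the idea; they are not the signature of `scan_thordata`.

```python
import os
from datetime import datetime


def newest_by_mtime(entries):
    """Return {unit_id: (path, datetime)} keeping the most recently modified file.

    `entries` is an iterable of (unit_id, path) pairs; unreadable files are skipped.
    """
    newest = {}
    for unit_id, path in entries:
        try:
            ts = datetime.fromtimestamp(os.path.getmtime(path))
        except (OSError, OverflowError, ValueError):
            continue  # file vanished, is inaccessible, or has an unusable mtime
        current = newest.get(unit_id)
        if current is None or ts > current[1]:
            newest[unit_id] = (path, ts)
    return newest
```

Note that `datetime.fromtimestamp` without a `tz` argument yields naive local time, so values are only comparable with other naive local timestamps.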
@@ -248,14 +271,17 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
Main watcher loop. Runs in a background thread when launched from the tray.
state keys written each cycle:
-state["status"] — "running" | "error" | "starting"
-state["api_status"] — "ok" | "fail" | "disabled"
-state["units"] — list of unit dicts for tray display
-state["last_scan"] — datetime of last successful scan
-state["last_error"] — last error string or None
-state["log_dir"] — directory containing the log file
-state["cfg"] — loaded config dict
-state["update_available"] — set True when API response signals an update
+state["status"] — "running" | "error" | "starting"
+state["api_status"] — "ok" | "fail" | "disabled"
+state["units"] — list of unit dicts for tray display
+state["last_scan"] — datetime of last successful scan
+state["last_error"] — last error string or None
+state["log_dir"] — directory containing the log file
+state["cfg"] — loaded config dict
+state["update_available"] — set True when API response signals an update
+state["sfm_status"] — "ok" | "fail" | "disabled" | "ready"
+state["last_forward"] — datetime of last forwarder pass (or None)
+state["last_forward_counts"] — dict from event_forwarder.forward_pending
"""
# Resolve config path
if getattr(sys, "frozen", False):
@@ -291,16 +317,56 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
API_INTERVAL = int(cfg["api_interval"])
ENABLE_LOGGING = bool(cfg["enable_logging"])
# SFM forwarder config
SFM_FORWARD_ENABLED = bool(cfg.get("sfm_forward_enabled", False))
SFM_URL = str(cfg.get("sfm_url", "")).strip()
SFM_FORWARD_INTERVAL = int(cfg.get("sfm_forward_interval", 60))
SFM_QUIESCENCE = int(cfg.get("sfm_quiescence_seconds", 5))
SFM_GRACE = int(cfg.get("sfm_missing_report_grace_seconds", 60))
SFM_HTTP_TIMEOUT = int(cfg.get("sfm_http_timeout", 60))
SFM_MAX_PER_PASS = int(cfg.get("sfm_max_forwards_per_pass", 500))
SFM_MAX_AGE_DAYS = int(cfg.get("sfm_max_event_age_days", 365))
sfm_state_path = str(cfg.get("sfm_state_file", "")).strip() or \
os.path.join(state["log_dir"], "thor_forwarded.json")
log_message(log_file, ENABLE_LOGGING,
-"[cfg] THORDATA_PATH={} SCAN_INTERVAL={}s API_INTERVAL={}s API={}".format(
-THORDATA_PATH, SCAN_INTERVAL, API_INTERVAL, bool(API_URL)
+"[cfg] THORDATA_PATH={} SCAN_INTERVAL={}s API_INTERVAL={}s API={} SFM={}".format(
+THORDATA_PATH, SCAN_INTERVAL, API_INTERVAL, bool(API_URL),
+bool(SFM_FORWARD_ENABLED and SFM_URL),
)
)
-print("[CFG] THORDATA_PATH={} SCAN_INTERVAL={}s API={}".format(
-THORDATA_PATH, SCAN_INTERVAL, bool(API_URL)
+print("[CFG] THORDATA_PATH={} SCAN_INTERVAL={}s API={} SFM={}".format(
+THORDATA_PATH, SCAN_INTERVAL, bool(API_URL),
+bool(SFM_FORWARD_ENABLED and SFM_URL),
))
# Initialize SFM forwarder state (if enabled)
sfm_state_obj: Optional[event_forwarder.ForwardState] = None
if SFM_FORWARD_ENABLED and SFM_URL:
try:
sfm_state_obj = event_forwarder.ForwardState(sfm_state_path)
state["sfm_status"] = "ready"
log_message(log_file, ENABLE_LOGGING,
"[sfm] forwarder ready url={} state_file={} known={}".format(
SFM_URL, sfm_state_path, sfm_state_obj.count(),
)
)
print("[SFM] forwarder ready url={} known={}".format(
SFM_URL, sfm_state_obj.count(),
))
except Exception as exc:
state["sfm_status"] = "fail"
state["last_error"] = "SFM init failed: {}".format(exc)
log_message(log_file, ENABLE_LOGGING,
"[sfm] init failed: {}".format(exc))
else:
state["sfm_status"] = "disabled"
state["last_forward"] = None
state["last_forward_counts"] = None
last_api_ts = 0.0
last_forward_ts = 0.0
while not stop_event.is_set():
try:
@@ -362,6 +428,37 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
else:
state["api_status"] = "disabled"
# ── SFM event forwarding ───────────────────────────────────────────
if sfm_state_obj is not None:
now_ts = time.time()
if now_ts - last_forward_ts >= SFM_FORWARD_INTERVAL:
last_forward_ts = now_ts
try:
counts = event_forwarder.forward_pending(
THORDATA_PATH, SFM_URL, sfm_state_obj,
max_age_days=SFM_MAX_AGE_DAYS,
quiescence_seconds=SFM_QUIESCENCE,
missing_report_grace_seconds=SFM_GRACE,
timeout=SFM_HTTP_TIMEOUT,
max_per_pass=SFM_MAX_PER_PASS,
logger=lambda m: log_message(log_file, ENABLE_LOGGING, m),
)
state["last_forward"] = datetime.now()
state["last_forward_counts"] = counts
if counts["errors"] > 0:
state["sfm_status"] = "fail"
else:
state["sfm_status"] = "ok"
summary = ("[sfm] pass scanned={scanned} forwarded={forwarded} "
"errors={errors} with_report={with_report}").format(**counts)
print(summary)
log_message(log_file, ENABLE_LOGGING, summary)
except Exception as exc:
state["sfm_status"] = "fail"
msg = "[sfm] pass failed: {}".format(exc)
print(msg)
log_message(log_file, ENABLE_LOGGING, msg)
except Exception as e:
err = "[loop-error] {}".format(e)
print(err)
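The forwarding block added to the watcher loop combines two small ideas: run a pass only when the configured interval has elapsed, and derive the tray status from the pass summary. The sketch below isolates both. `IntervalGate` and `status_from_counts` are hypothetical names, and the gate uses `time.monotonic` where the diff uses `time.time()`; that is a deliberate substitution in this sketch to avoid wall-clock jumps, not what the change itself does.

```python
import time


class IntervalGate:
    """Allow an action at most once per `interval` seconds."""

    def __init__(self, interval, clock=time.monotonic):
        self.interval = interval
        self.clock = clock
        self.last = None  # None means "never run", so the first call is due

    def due(self):
        """Return True (and restart the timer) when the interval has elapsed."""
        now = self.clock()
        if self.last is None or now - self.last >= self.interval:
            self.last = now
            return True
        return False


def status_from_counts(counts):
    """Map a pass summary dict to "fail" when any error was counted, else "ok"."""
    return "fail" if counts.get("errors", 0) > 0 else "ok"
```

As in the diff, the timer restarts when a pass begins rather than when it ends, so a slow or failing pass does not trigger an immediate retry.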
+716
@@ -0,0 +1,716 @@
"""
test_event_forwarder.py — unit tests for Thor Watcher's SFM event forwarder.
Covers:
- is_event_binary() filename matching (positive + negative cases)
- parse_event_filename() / serial_from_filename()
- idf_report_path() — the TXT/ subfolder convention
- ForwardState load/save round-trip + idempotency check
- find_pending_events() against the THORDATA/<Project>/<Unit>/ tree,
plus quiescence + grace-period + re-pair logic
- _encode_multipart() byte-level shape (boundary + headers)
- forward_event_pair() end-to-end against a tiny stdlib HTTP server
that mimics seismo-relay's POST /db/import/idf_file endpoint
- seed_state_from_folder() walks the tree without POSTing
Stdlib only (unittest); run with `python -m unittest test_event_forwarder` or
`python -m pytest test_event_forwarder.py` on Python 3.8+ (the watcher's compat target).
"""
from __future__ import annotations
import http.server
import json
import os
import tempfile
import threading
import time
import unittest
from pathlib import Path
import event_forwarder as ef
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_thordata(root: Path, project: str, unit: str) -> Path:
"""Create a THORDATA/<project>/<unit>/ folder pair; return unit_dir."""
unit_dir = root / project / unit
unit_dir.mkdir(parents=True, exist_ok=True)
return unit_dir
def _touch_with_age(p: Path, age_seconds: float, content: bytes = b"x") -> Path:
"""Create a file with controlled mtime."""
p.write_bytes(content)
target = time.time() - age_seconds
os.utime(p, (target, target))
return p
def _make_event(unit_dir: Path, name: str, age_seconds: float = 100,
content: bytes = b"x") -> Path:
return _touch_with_age(unit_dir / name, age_seconds, content)
def _make_txt(unit_dir: Path, base_name: str, age_seconds: float = 100,
content: bytes = b"r") -> Path:
txt_dir = unit_dir / "TXT"
txt_dir.mkdir(exist_ok=True)
return _touch_with_age(txt_dir / ef.idf_report_name(base_name),
age_seconds, content)
# ── is_event_binary() ────────────────────────────────────────────────────────
class TestIsEventBinary(unittest.TestCase):
def test_recognises_typical_thor_filenames(self):
for name in [
"UM11719_20231219163444.IDFH",
"UM11719_20231219162723.IDFW",
"BE9439_20200713124251.IDFH",
"UM13981_20220808082418.IDFH",
# case-insensitive
"um11719_20231219163444.idfh",
]:
self.assertTrue(ef.is_event_binary(name), name)
def test_rejects_non_event_extensions(self):
for name in [
"UM11719_20231219163436.MLG", # monitor log
"UM11719_20231219163444.IDFH.txt", # report sidecar
"UM11719_20231219164135.IDFW.CDB", # cache database variant
"UM11719_20231219164135.IDFH.CDB",
"agent.log",
"config.json",
"foo.bak",
"bar.tmp",
"UM11719_20231219163444.csv",
"UM11719_20231219163444.pdf",
"UM11719_20231219163444.html",
"UM11719_20231219163444.xml",
]:
self.assertFalse(ef.is_event_binary(name), name)
def test_rejects_malformed_filenames(self):
for name in [
"",
"no_extension",
"UM_20231219163444.IDFH", # missing serial digits
"1234_20231219163444.IDFH", # serial must start with letters
"UM11719_2023121916.IDFH", # short timestamp
"UM11719_20231219163444.IDFX", # wrong kind
"UM11719-20231219163444.IDFH", # wrong separator
]:
self.assertFalse(ef.is_event_binary(name), name)
def test_parse_event_filename(self):
from datetime import datetime
parsed = ef.parse_event_filename("UM11719_20231219163444.IDFW")
self.assertIsNotNone(parsed)
serial, ts, kind = parsed
self.assertEqual(serial, "UM11719")
self.assertEqual(ts, datetime(2023, 12, 19, 16, 34, 44))
self.assertEqual(kind, "IDFW")
def test_serial_from_filename(self):
self.assertEqual(ef.serial_from_filename("UM11719_20231219163444.IDFH"),
"UM11719")
self.assertEqual(ef.serial_from_filename("BE9439_20200713124251.IDFH"),
"BE9439")
self.assertIsNone(ef.serial_from_filename("not_an_event.bin"))
def test_idf_report_path_uses_txt_subfolder(self):
binary = "/foo/THORDATA/Project A/UM11719/UM11719_20231219163444.IDFW"
self.assertEqual(
ef.idf_report_path(binary),
os.path.join("/foo/THORDATA/Project A/UM11719",
"TXT", "UM11719_20231219163444.IDFW.txt"),
)
def test_is_histogram_event(self):
self.assertTrue(ef.is_histogram_event("UM11719_20231219163444.IDFH"))
self.assertTrue(ef.is_histogram_event("um11719_20231219163444.idfh"))
self.assertFalse(ef.is_histogram_event("UM11719_20231219162723.IDFW"))
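The accept/reject cases in the tests above pin down the file name shape fairly tightly: letters, then digits, an underscore, a 14-digit timestamp, and an `.IDFH` or `.IDFW` extension, matched case-insensitively. One way to satisfy those cases is a single anchored regular expression. The sketch below is a guess at an equivalent, not the module's actual pattern; `parse_event_name` and `report_path` are hypothetical names.

```python
import os
import re
from datetime import datetime

# Letters + digits serial, underscore, 14-digit timestamp, IDFH or IDFW.
_EVENT_RE = re.compile(r"([A-Za-z]+\d+)_(\d{14})\.(IDF[HW])", re.IGNORECASE)


def parse_event_name(name):
    """Return (serial, timestamp, kind) or None when the name is not an event."""
    match = _EVENT_RE.fullmatch(name)  # fullmatch rejects trailing ".txt" / ".CDB"
    if match is None:
        return None
    serial, stamp, kind = match.groups()
    try:
        ts = datetime.strptime(stamp, "%Y%m%d%H%M%S")
    except ValueError:
        return None  # 14 digits that do not form a real date
    return serial, ts, kind.upper()


def report_path(binary_path):
    """Sidecar location: <unit dir>/TXT/<binary file name>.txt"""
    folder, base = os.path.split(binary_path)
    return os.path.join(folder, "TXT", base + ".txt")
```

Using `fullmatch` rather than `search` is what keeps `.IDFH.txt` and `.IDFW.CDB` from being mistaken for event binaries.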
# ── ForwardState ─────────────────────────────────────────────────────────────
class TestForwardState(unittest.TestCase):
def test_round_trip_persists_marked_entries(self):
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "fwd.json")
s = ef.ForwardState(path)
self.assertFalse(s.is_forwarded("abc123"))
s.mark_forwarded("abc123", "UM11719_20231219163444.IDFW", 8800)
self.assertTrue(s.is_forwarded("abc123"))
# Re-load from disk
s2 = ef.ForwardState(path)
self.assertTrue(s2.is_forwarded("abc123"))
self.assertEqual(s2.count(), 1)
def test_corrupt_state_file_starts_fresh(self):
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "fwd.json")
with open(path, "w") as f:
f.write("not valid json {{{")
s = ef.ForwardState(path)
self.assertEqual(s.count(), 0)
def test_version_mismatch_starts_fresh(self):
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "fwd.json")
with open(path, "w") as f:
json.dump({"version": 999, "forwarded": {"x": {}}}, f)
s = ef.ForwardState(path)
self.assertEqual(s.count(), 0)
def test_legacy_entries_default_to_had_report_true(self):
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "fwd.json")
with open(path, "w") as f:
json.dump({
"version": 1,
"forwarded": {
"abc123": {
"filename": "UM11719_20231219163444.IDFW",
"size": 123,
"forwarded_at": "2025-01-01T00:00:00Z",
# No had_report field — legacy entry
}
}
}, f)
state = ef.ForwardState(path)
self.assertIs(state.status("abc123"), True)
def test_status_returns_none_for_unknown_sha(self):
with tempfile.TemporaryDirectory() as tmp:
state = ef.ForwardState(str(Path(tmp) / "fwd.json"))
self.assertIs(state.status("never-seen"), None)
def test_mark_with_had_report_false_then_promote(self):
with tempfile.TemporaryDirectory() as tmp:
state = ef.ForwardState(str(Path(tmp) / "fwd.json"))
state.mark_forwarded("xyz", "UM11719_20231219163444.IDFW", 100,
had_report=False)
self.assertIs(state.status("xyz"), False)
state.mark_forwarded("xyz", "UM11719_20231219163444.IDFW", 100,
had_report=True)
self.assertIs(state.status("xyz"), True)
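Taken together, the `ForwardState` tests describe a small contract: a JSON file with a `version` and a `forwarded` map keyed by sha256, a fresh start on corrupt or mismatched files, and a three-valued `status()` where legacy entries without `had_report` count as paired. The sketch below is one possible shape that meets that contract. `MiniForwardState` is hypothetical, and the write-to-temp-then-`os.replace` save is an assumption added for crash safety; the tests do not show how the real class persists.

```python
import json
import os
from datetime import datetime, timezone


class MiniForwardState:
    """Tiny sha256-keyed record of forwarded files, persisted as JSON."""

    VERSION = 1

    def __init__(self, path):
        self.path = path
        self.entries = {}
        try:
            with open(path, "r", encoding="utf-8") as fh:
                data = json.load(fh)
        except (OSError, ValueError):
            return  # missing or corrupt file: start fresh
        if isinstance(data, dict) and data.get("version") == self.VERSION:
            forwarded = data.get("forwarded")
            if isinstance(forwarded, dict):
                self.entries = dict(forwarded)

    def status(self, sha):
        """None if unknown, else whether the report was sent with the binary."""
        entry = self.entries.get(sha)
        if entry is None:
            return None
        return bool(entry.get("had_report", True))  # legacy entries count as paired

    def mark_forwarded(self, sha, filename, size, had_report=True):
        """Insert or overwrite the entry for `sha`, then persist."""
        self.entries[sha] = {
            "filename": filename,
            "size": size,
            "forwarded_at": datetime.now(timezone.utc).isoformat(),
            "had_report": had_report,
        }
        self._save()

    def _save(self):
        tmp = self.path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as fh:
            json.dump({"version": self.VERSION, "forwarded": self.entries}, fh)
        os.replace(tmp, self.path)  # swap in one step so readers never see a partial file
```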
# ── find_pending_events() ────────────────────────────────────────────────────
class TestFindPendingEvents(unittest.TestCase):
def test_returns_pair_when_both_files_present_and_quiescent(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=120, content=b"binary")
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=100, content=b"report")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 1)
self.assertEqual(os.path.basename(pending[0][0]),
"UM11719_20231219163444.IDFW")
self.assertEqual(os.path.basename(pending[0][1]),
"UM11719_20231219163444.IDFW.txt")
def test_idfh_and_idfw_are_separate_events(self):
"""A single timestamp produces both .IDFH and .IDFW — they
forward as two independent events with their own state entries."""
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
_make_event(unit_dir, "UM11719_20231219163444.IDFH",
age_seconds=120, content=b"histogram")
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=120, content=b"waveform")
_make_txt(unit_dir, "UM11719_20231219163444.IDFH",
age_seconds=100, content=b"hreport")
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=100, content=b"wreport")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 2)
names = sorted(os.path.basename(p[0]) for p in pending)
self.assertEqual(names, [
"UM11719_20231219163444.IDFH",
"UM11719_20231219163444.IDFW",
])
def test_pairing_when_txt_is_in_unit_root_does_not_match(self):
"""Sidecars MUST live in the TXT/ subfolder. A stray .txt
next to the binary is not the canonical location and should
not be picked up as a sidecar."""
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=200, content=b"bin")
# .txt is in the unit dir, not unit/TXT/
_touch_with_age(unit_dir / "UM11719_20231219163444.IDFW.txt",
age_seconds=100, content=b"misplaced")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
# Forward proceeds (grace period elapsed), but WITHOUT pairing
self.assertEqual(len(pending), 1)
self.assertIsNone(pending[0][1])
def test_skips_if_already_forwarded(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
bin_p = _make_event(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=120, content=b"binary")
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=100, content=b"report")
state = ef.ForwardState(str(root / "fwd.json"))
digest = ef.sha256_of_file(str(bin_p))
state.mark_forwarded(digest, "UM11719_20231219163444.IDFW", len(b"binary"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_skips_if_too_fresh_to_be_quiescent(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=1, content=b"binary")
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=1, content=b"report")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_forwards_alone_after_grace_when_txt_missing(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
_make_event(unit_dir, "UM11719_20231219163444.IDFH",
age_seconds=200, content=b"binary")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 1)
bin_path, txt_path = pending[0]
self.assertEqual(os.path.basename(bin_path),
"UM11719_20231219163444.IDFH")
self.assertIsNone(txt_path)
def test_re_pair_after_late_arriving_txt(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
bin_p = _make_event(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=200, content=b"binary")
state = ef.ForwardState(str(root / "fwd.json"))
digest = ef.sha256_of_file(str(bin_p))
state.mark_forwarded(digest, "UM11719_20231219163444.IDFW",
len(b"binary"), had_report=False)
# First scan: TXT not present → still skipped.
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(pending, [])
# TXT finally appears.
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=100, content=b"report")
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 1)
self.assertEqual(os.path.basename(pending[0][1]),
"UM11719_20231219163444.IDFW.txt")
def test_defers_when_txt_missing_and_within_grace(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=15, content=b"binary")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_skips_old_files_beyond_max_age_days(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=10 * 86400, content=b"binary")
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=10 * 86400, content=b"report")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=1,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_ignores_mlg_and_other_non_event_files(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
_make_event(unit_dir, "UM11719_20231219163436.MLG",
age_seconds=120, content=b"mlg")
_make_event(unit_dir, "UM11719_20231219164135.IDFW.CDB",
age_seconds=120, content=b"cache")
_touch_with_age(unit_dir / "agent.log", age_seconds=120, content=b"log")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_walks_multiple_projects_and_units(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_a = _make_thordata(root, "Project A", "UM11719")
unit_b = _make_thordata(root, "Project B", "BE9439")
_make_event(unit_a, "UM11719_20231219163444.IDFW", age_seconds=200, content=b"a")
_make_event(unit_b, "BE9439_20200713131747.IDFW", age_seconds=200, content=b"b")
_make_txt(unit_a, "UM11719_20231219163444.IDFW", age_seconds=100, content=b"ar")
_make_txt(unit_b, "BE9439_20200713131747.IDFW", age_seconds=100, content=b"br")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=10000, # BE event is from 2020
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 2)
names = sorted(os.path.basename(p[0]) for p in pending)
self.assertEqual(names, [
"BE9439_20200713131747.IDFW",
"UM11719_20231219163444.IDFW",
])
def test_max_per_pass_caps_returned_count(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit = _make_thordata(root, "Project A", "UM11719")
for i in range(5):
name = "UM11719_2023121916344{}.IDFW".format(i)
_make_event(unit, name, age_seconds=120 + i, content=("bin-" + str(i)).encode())
_make_txt(unit, name, age_seconds=110 + i, content=b"report")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
max_per_pass=2,
)
self.assertEqual(len(pending), 2)
def test_max_per_pass_returns_oldest_first(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit = _make_thordata(root, "Project A", "UM11719")
ages = [200, 150, 100, 50]
for i, age in enumerate(ages):
name = "UM11719_2023121916344{}.IDFW".format(i)
_make_event(unit, name, age_seconds=age, content=("c" + str(i)).encode())
_make_txt(unit, name, age_seconds=max(1, age - 10), content=b"r")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
max_per_pass=2,
)
names = [os.path.basename(p[0]) for p in pending]
# Oldest two should be index 0 (200s) and 1 (150s)
self.assertEqual(names, [
"UM11719_20231219163440.IDFW",
"UM11719_20231219163441.IDFW",
])
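The `find_pending_events` tests encode a per-file decision (skip, defer, forward alone, forward with report) plus an oldest-first cap. Reduced to pure functions, that logic might look like the sketch below. `classify_event` and `select_batch` are hypothetical names, the max-age filter is left out for brevity, and the rule that a very fresh TXT also defers the event is an assumption: the tests only cover the case where both files are fresh.

```python
def classify_event(bin_age, txt_age, status, quiescence=5, grace=60):
    """Decide what to do with one event binary.

    bin_age / txt_age are seconds since last modification (txt_age is None
    when no sidecar exists in TXT/); status is None (never sent), False
    (sent without its report) or True (sent with its report).
    """
    if status is True:
        return "skip"                # already complete
    if bin_age < quiescence:
        return "defer"               # may still be mid-write
    if txt_age is not None:
        if txt_age < quiescence:
            return "defer"           # assumption: the sidecar must settle too
        return "forward_paired"      # first send, or re-pair after a late TXT
    if status is False:
        return "skip"                # binary already sent; nothing new to add
    return "forward_alone" if bin_age >= grace else "defer"


def select_batch(candidates, max_per_pass):
    """Oldest first, capped. `candidates` is a list of (name, bin_age) tuples."""
    ordered = sorted(candidates, key=lambda c: c[1], reverse=True)
    return [name for name, _age in ordered[:max_per_pass]]
```

Sorting oldest first means a large backlog drains in arrival order across passes instead of starving early events behind newer ones.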
# ── Seed-state mode ──────────────────────────────────────────────────────────
class TestSeedStateFromFolder(unittest.TestCase):
def test_seeds_every_in_window_event_without_posting(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit = _make_thordata(root, "Project A", "UM11719")
for i in range(3):
_make_event(unit, "UM11719_2023121916344{}.IDFW".format(i),
age_seconds=120 + i, content=("e" + str(i)).encode())
# Ignored
_make_event(unit, "UM11719_20231219163436.MLG", age_seconds=120, content=b"mlg")
state = ef.ForwardState(str(root / "seed.json"))
counts = ef.seed_state_from_folder(str(root), state, max_age_days=30)
self.assertEqual(counts["scanned"], 3)
self.assertEqual(counts["seeded"], 3)
self.assertEqual(counts["already_known"], 0)
self.assertEqual(state.count(), 3)
def test_seeded_files_are_then_skipped_by_normal_scan(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit = _make_thordata(root, "Project A", "UM11719")
_make_event(unit, "UM11719_20231219163444.IDFW", age_seconds=120, content=b"x")
_make_txt(unit, "UM11719_20231219163444.IDFW", age_seconds=110, content=b"r")
_make_event(unit, "UM11719_20231219163444.IDFH", age_seconds=120, content=b"y")
_make_txt(unit, "UM11719_20231219163444.IDFH", age_seconds=110, content=b"r")
state = ef.ForwardState(str(root / "seed.json"))
ef.seed_state_from_folder(str(root), state, max_age_days=30)
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_seed_is_idempotent(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit = _make_thordata(root, "Project A", "UM11719")
_make_event(unit, "UM11719_20231219163444.IDFW", age_seconds=120, content=b"x")
state = ef.ForwardState(str(root / "seed.json"))
counts1 = ef.seed_state_from_folder(str(root), state, max_age_days=30)
counts2 = ef.seed_state_from_folder(str(root), state, max_age_days=30)
self.assertEqual(counts1["seeded"], 1)
self.assertEqual(counts2["seeded"], 0)
self.assertEqual(counts2["already_known"], 1)
self.assertEqual(state.count(), 1)
# ── Multipart encoder ────────────────────────────────────────────────────────
class TestMultipartEncoder(unittest.TestCase):
def test_encodes_two_parts_with_proper_boundary(self):
body, content_type = ef._encode_multipart([
("files", "a.bin", "application/octet-stream", b"\x01\x02"),
("files", "a.txt", "text/plain", b"hello"),
])
self.assertTrue(content_type.startswith("multipart/form-data; boundary="))
boundary = content_type.split("boundary=", 1)[1]
self.assertIn(boundary.encode("ascii"), body)
text = body.decode("latin-1")
self.assertIn('name="files"; filename="a.bin"', text)
self.assertIn('name="files"; filename="a.txt"', text)
self.assertIn("Content-Type: application/octet-stream", text)
self.assertIn("Content-Type: text/plain", text)
self.assertTrue(text.rstrip("\r\n").endswith(f"--{boundary}--"))
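For context on what the byte-level test above is checking, here is a stdlib-only multipart/form-data encoder with the same input and output shape as the call in the test. It is a sketch written for this review (`encode_multipart_sketch` is a hypothetical name), not a copy of `_encode_multipart`, and it does not escape quotes or non-ASCII characters in file names.

```python
import uuid


def encode_multipart_sketch(parts):
    """Build a multipart/form-data body.

    parts: iterable of (field_name, filename, content_type, payload_bytes).
    Returns (body_bytes, content_type_header_value).
    """
    boundary = uuid.uuid4().hex  # random token unlikely to occur in payloads
    chunks = []
    for field, filename, ctype, payload in parts:
        header = (
            "--{b}\r\n"
            'Content-Disposition: form-data; name="{n}"; filename="{f}"\r\n'
            "Content-Type: {c}\r\n\r\n"
        ).format(b=boundary, n=field, f=filename, c=ctype)
        chunks.append(header.encode("utf-8"))
        chunks.append(payload)   # raw bytes, no transfer encoding
        chunks.append(b"\r\n")
    chunks.append("--{}--\r\n".format(boundary).encode("ascii"))  # closing delimiter
    return b"".join(chunks), "multipart/form-data; boundary={}".format(boundary)
```

Repeating the same field name (`files`) for both parts is how a binary and its report travel in one request.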
# ── End-to-end forward_event_pair against a fake server ──────────────────────
class _FakeImportHandler(http.server.BaseHTTPRequestHandler):
"""Mimics seismo-relay's POST /db/import/idf_file response."""
received = [] # class-level capture for test inspection
def do_POST(self):
length = int(self.headers.get("Content-Length", "0"))
body = self.rfile.read(length)
ctype = self.headers.get("Content-Type", "")
parts = body.split(b"--" + ctype.split("boundary=")[-1].encode())
filenames = []
for p in parts:
for line in p.split(b"\r\n"):
if b'filename="' in line:
fn = line.split(b'filename="', 1)[1].split(b'"', 1)[0]
filenames.append(fn.decode("latin-1"))
self.__class__.received.append({
"path": self.path,
"ctype": ctype,
"filenames": filenames,
})
results = []
binary_fn = next(
(fn for fn in filenames if not fn.lower().endswith(".txt")),
None,
)
if binary_fn:
results.append({
"filename": binary_fn,
"status": "ok",
"stored_filename": binary_fn,
"filesize": len(body),
"sha256": "00" * 32,
"report_attached": any(fn.lower().endswith(".txt") for fn in filenames),
"inserted": 1,
"skipped": 0,
})
payload = json.dumps({"count": len(results), "results": results}).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(payload)))
self.end_headers()
self.wfile.write(payload)
def log_message(self, *_a, **_kw): # silence the test runner
pass
def _start_fake_server():
server = http.server.HTTPServer(("127.0.0.1", 0), _FakeImportHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
host, port = server.server_address
return server, f"http://{host}:{port}"
class TestForwardEventPair(unittest.TestCase):
def setUp(self):
_FakeImportHandler.received = []
self.server, self.base_url = _start_fake_server()
def tearDown(self):
self.server.shutdown()
self.server.server_close()
def test_post_with_paired_report(self):
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
bin_p = tmp_p / "UM11719_20231219163444.IDFW"
txt_p = tmp_p / "UM11719_20231219163444.IDFW.txt"
bin_p.write_bytes(b"\x10\x20\x30 binary")
txt_p.write_bytes(b'"SerialNumber : UM11719"\n')
result = ef.forward_event_pair(
self.base_url, str(bin_p), str(txt_p), timeout=5.0,
)
self.assertEqual(result["status"], "ok")
self.assertEqual(result["filename"], "UM11719_20231219163444.IDFW")
self.assertTrue(result["report_attached"])
self.assertEqual(len(_FakeImportHandler.received), 1)
req = _FakeImportHandler.received[0]
# Path includes the serial-hint auto-extracted from the filename
self.assertTrue(req["path"].startswith("/db/import/idf_file"))
self.assertIn("serial=UM11719", req["path"])
self.assertIn("UM11719_20231219163444.IDFW", req["filenames"])
self.assertIn("UM11719_20231219163444.IDFW.txt", req["filenames"])
def test_post_without_report(self):
with tempfile.TemporaryDirectory() as tmp:
bin_p = Path(tmp) / "UM11719_20231219163444.IDFH"
bin_p.write_bytes(b"binary only")
result = ef.forward_event_pair(
self.base_url, str(bin_p), None, timeout=5.0,
)
self.assertEqual(result["status"], "ok")
self.assertFalse(result["report_attached"])
req = _FakeImportHandler.received[0]
self.assertEqual(req["filenames"], ["UM11719_20231219163444.IDFH"])
def test_explicit_serial_hint_overrides_auto(self):
with tempfile.TemporaryDirectory() as tmp:
bin_p = Path(tmp) / "UM11719_20231219163444.IDFW"
bin_p.write_bytes(b"x")
ef.forward_event_pair(
self.base_url, str(bin_p), None,
serial_hint="OVERRIDE99", timeout=5.0,
)
req = _FakeImportHandler.received[0]
self.assertIn("serial=OVERRIDE99", req["path"])
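The three tests above assert on the request path: it starts with `/db/import/idf_file` and carries a `serial=` query parameter, taken from the file name unless a hint is passed. A small helper that would produce such a URL is sketched below. `build_import_url` is a hypothetical name, and how the real `forward_event_pair` assembles its URL is not shown in this diff.

```python
from urllib.parse import urlencode


def build_import_url(base_url, serial=None):
    """Join the base URL and endpoint, adding ?serial=... when a hint is known."""
    url = base_url.rstrip("/") + "/db/import/idf_file"
    if serial:
        url += "?" + urlencode({"serial": serial})  # percent-encodes unsafe characters
    return url
```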
# ── forward_pending() smoke test ─────────────────────────────────────────────
class TestForwardPending(unittest.TestCase):
"""End-to-end: tree → find → POST → state-update → no re-POST."""
def setUp(self):
_FakeImportHandler.received = []
self.server, self.base_url = _start_fake_server()
def tearDown(self):
self.server.shutdown()
self.server.server_close()
def test_pass_then_re_pass_is_idempotent(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit = _make_thordata(root, "Project A", "UM11719")
_make_event(unit, "UM11719_20231219163444.IDFW",
age_seconds=200, content=b"binary")
_make_txt(unit, "UM11719_20231219163444.IDFW",
age_seconds=100, content=b"report")
_make_event(unit, "UM11719_20231219163444.IDFH",
age_seconds=200, content=b"histogram")
_make_txt(unit, "UM11719_20231219163444.IDFH",
age_seconds=100, content=b"hreport")
state = ef.ForwardState(str(root / "fwd.json"))
counts = ef.forward_pending(
str(root), self.base_url, state,
max_age_days=30, quiescence_seconds=5,
missing_report_grace_seconds=60, timeout=5.0,
)
self.assertEqual(counts["scanned"], 2)
self.assertEqual(counts["forwarded"], 2)
self.assertEqual(counts["errors"], 0)
self.assertEqual(counts["with_report"], 2)
self.assertEqual(state.count(), 2)
self.assertEqual(len(_FakeImportHandler.received), 2)
# Re-pass: nothing pending; no new POSTs.
counts2 = ef.forward_pending(
str(root), self.base_url, state,
max_age_days=30, quiescence_seconds=5,
missing_report_grace_seconds=60, timeout=5.0,
)
self.assertEqual(counts2["scanned"], 0)
self.assertEqual(counts2["forwarded"], 0)
self.assertEqual(len(_FakeImportHandler.received), 2)
if __name__ == "__main__":
unittest.main()
+163 -5
@@ -1,5 +1,5 @@
"""
-Thor Watcher — Settings Dialog v0.2.0
+Thor Watcher — Settings Dialog v0.3.0
Provides a Tkinter settings dialog that doubles as a first-run wizard.
@@ -14,6 +14,8 @@ import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from socket import gethostname
import series4_ingest as watcher
# ── Defaults (mirror config.example.json) ────────────────────────────────────
@@ -34,6 +36,17 @@ DEFAULTS = {
"log_retention_days": 30,
"update_source": "gitea",
"update_url": "",
# SFM forwarder defaults — mirror series4_ingest.load_config
"sfm_forward_enabled": False,
"sfm_url": "",
"sfm_forward_interval": 60,
"sfm_quiescence_seconds": 5,
"sfm_missing_report_grace_seconds": 60,
"sfm_http_timeout": 60,
"sfm_state_file": "",
"sfm_max_forwards_per_pass": 500,
"sfm_max_event_age_days": 365,
}
@@ -145,7 +158,8 @@ class SettingsDialog:
self.saved = False
self.root = parent
-title = "Thor Watcher — Setup" if wizard else "Thor Watcher — Settings"
+kind = "Setup" if wizard else "Settings"
+title = "Thor Watcher v{} — {}".format(watcher.VERSION, kind)
self.root.title(title)
self.root.resizable(False, False)
self.root.update_idletasks()
@@ -192,6 +206,20 @@ class SettingsDialog:
        self.var_update_source = tk.StringVar(value=src)
        self.var_update_url = tk.StringVar(value=str(v.get("update_url", "")))

        # SFM Forwarder
        sfm_en = v.get("sfm_forward_enabled", False)
        self.var_sfm_enabled = tk.BooleanVar(
            value=bool(sfm_en) if isinstance(sfm_en, bool) else str(sfm_en).lower() in ("true", "1", "yes")
        )
        self.var_sfm_url = tk.StringVar(value=str(v.get("sfm_url", "")))
        self.var_sfm_forward_interval = tk.StringVar(value=str(v.get("sfm_forward_interval", 60)))
        self.var_sfm_quiescence = tk.StringVar(value=str(v.get("sfm_quiescence_seconds", 5)))
        self.var_sfm_grace = tk.StringVar(value=str(v.get("sfm_missing_report_grace_seconds", 60)))
        self.var_sfm_http_timeout = tk.StringVar(value=str(v.get("sfm_http_timeout", 60)))
        self.var_sfm_max_per_pass = tk.StringVar(value=str(v.get("sfm_max_forwards_per_pass", 500)))
        self.var_sfm_max_age_days = tk.StringVar(value=str(v.get("sfm_max_event_age_days", 365)))
        self.var_sfm_state_file = tk.StringVar(value=str(v.get("sfm_state_file", "")))
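The enable flag is coerced defensively, presumably because a hand-edited config.json could store it as a string rather than a JSON boolean. A minimal standalone sketch of that rule follows; `coerce_bool` is a hypothetical helper name used only for illustration and does not appear in this diff.

```python
def coerce_bool(value):
    """Interpret a config value as a boolean, accepting common string spellings."""
    if isinstance(value, bool):
        return value
    # Anything else is stringified and compared case-insensitively,
    # so "True", "1" and "yes" enable the feature; everything else disables it.
    return str(value).lower() in ("true", "1", "yes")
```

Note that under this rule the integer `1` counts as enabled (its string form is `"1"`), while `None` and `"false"` do not.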
    # ── UI construction ───────────────────────────────────────────────────────

    def _build_ui(self):
@@ -216,6 +244,7 @@ class SettingsDialog:
        self._build_tab_paths(nb)
        self._build_tab_scanning(nb)
        self._build_tab_logging(nb)
        self._build_tab_forwarding(nb)
        self._build_tab_updates(nb)

        btn_frame = tk.Frame(outer)
@@ -347,6 +376,114 @@ class SettingsDialog:
        _add_label_check(f, 0, "Enable Logging", self.var_enable_logging)
        _add_label_spinbox(f, 1, "Log Retention (days)", self.var_log_retention_days, 1, 365)

    def _build_tab_forwarding(self, nb):
        f = self._tab_frame(nb, "SFM Forward")

        # Row 0: enable checkbox
        _add_label_check(f, 0, "Enable SFM Forwarding", self.var_sfm_enabled)

        # Row 1: SFM URL + Test button
        tk.Label(f, text="SFM URL", anchor="w").grid(
            row=1, column=0, sticky="w", padx=(8, 4), pady=4
        )
        url_frame = tk.Frame(f)
        url_frame.grid(row=1, column=1, sticky="ew", padx=(0, 8), pady=4)
        url_frame.columnconfigure(0, weight=1)

        sfm_entry = ttk.Entry(url_frame, textvariable=self.var_sfm_url, width=32)
        sfm_entry.grid(row=0, column=0, sticky="ew")

        # Grey placeholder shown while the field is empty
        _hint = "http://10.0.0.44:8200"
        if not self.var_sfm_url.get():
            sfm_entry.config(foreground="grey")
            sfm_entry.insert(0, _hint)

        def _on_focus_in(e, ent=sfm_entry, h=_hint):
            if ent.get() == h:
                ent.delete(0, tk.END)
                ent.config(foreground="black")

        def _on_focus_out(e, ent=sfm_entry, h=_hint):
            if not ent.get():
                ent.config(foreground="grey")
                # The entry is bound to var_sfm_url, so clearing the variable
                # here would also erase the hint. Test and Save already treat
                # the hint text as an empty URL.
                ent.insert(0, h)

        sfm_entry.bind("<FocusIn>", _on_focus_in)
        sfm_entry.bind("<FocusOut>", _on_focus_out)

        self._sfm_test_btn = ttk.Button(url_frame, text="Test", width=6,
                                        command=self._test_sfm_connection)
        self._sfm_test_btn.grid(row=0, column=1, padx=(4, 0))
        self._sfm_test_status = tk.Label(url_frame, text="", anchor="w", width=20)
        self._sfm_test_status.grid(row=0, column=2, padx=(6, 0))

        # Rows 2-7: timing/limits spinboxes
        _add_label_spinbox(f, 2, "Forward Interval (sec)", self.var_sfm_forward_interval, 30, 3600)
        _add_label_spinbox(f, 3, "Quiescence (sec)", self.var_sfm_quiescence, 1, 60)
        _add_label_spinbox(f, 4, "Missing-Report Grace (sec)", self.var_sfm_grace, 0, 3600)
        _add_label_spinbox(f, 5, "HTTP Timeout (sec)", self.var_sfm_http_timeout, 5, 300)
        _add_label_spinbox(f, 6, "Max Forwards Per Pass", self.var_sfm_max_per_pass, 1, 5000)
        _add_label_spinbox(f, 7, "Max Event Age (days)", self.var_sfm_max_age_days, 1, 3650)

        # Row 8: state file browse
        def browse_state():
            p = filedialog.asksaveasfilename(
                title="Select SFM State File",
                defaultextension=".json",
                filetypes=[("JSON files", "*.json"), ("All files", "*.*")],
                initialfile=os.path.basename(self.var_sfm_state_file.get() or "thor_forwarded.json"),
                initialdir=os.path.dirname(self.var_sfm_state_file.get() or "C:\\"),
            )
            if p:
                self.var_sfm_state_file.set(p.replace("/", "\\"))

        _add_label_browse_entry(f, 8, "State File", self.var_sfm_state_file, browse_state)

        # Row 9: help text
        help_text = (
            "Forwards .IDFH (histogram) and .IDFW (waveform) event files plus their\n"
            "TXT/<basename>.txt sidecars to a seismo-relay SFM server.\n"
            "Idempotent: each file is tracked by sha256, so re-scans never re-POST.\n"
            "If the TXT sidecar appears AFTER the binary was forwarded alone, the\n"
            "next pass will re-forward so the relay can refresh the DB row with\n"
            "device-authoritative PPV/ZCFreq/peak values.\n"
            "State file blank → defaults to <log_dir>\\thor_forwarded.json."
        )
        tk.Label(
            f, text=help_text, justify="left", fg="#555555", wraplength=420,
        ).grid(row=9, column=0, columnspan=2, sticky="w", padx=(8, 8), pady=(8, 4))

    def _test_sfm_connection(self):
        import urllib.request
        import urllib.error

        self._sfm_test_status.config(text="Testing...", foreground="grey")
        self._sfm_test_btn.config(state="disabled")
        self.root.update_idletasks()

        raw = self.var_sfm_url.get().strip()
        if not raw or raw == "http://10.0.0.44:8200":
            self._sfm_test_status.config(text="Enter a URL first", foreground="orange")
            self._sfm_test_btn.config(state="normal")
            return

        url = raw.rstrip("/") + "/health"
        try:
            with urllib.request.urlopen(urllib.request.Request(url), timeout=5) as resp:
                if resp.status == 200:
                    self._sfm_test_status.config(text="Connected!", foreground="green")
                else:
                    self._sfm_test_status.config(text="HTTP {}".format(resp.status), foreground="orange")
        except urllib.error.URLError as e:
            reason = str(e.reason) if hasattr(e, "reason") else str(e)
            self._sfm_test_status.config(text="Failed: {}".format(reason[:30]), foreground="red")
        except Exception as e:
            self._sfm_test_status.config(text="Error: {}".format(str(e)[:30]), foreground="red")
        finally:
            self._sfm_test_btn.config(state="normal")
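
The Test button mixes the HTTP probe with widget updates, which makes the probe hard to exercise without a display. One way to separate them, sketched here under assumptions: `probe_health` and its `opener` parameter are hypothetical names, and the function returns the same label text and colour the dialog would show instead of touching any widget.

```python
import urllib.error
import urllib.request


def probe_health(base_url, timeout=5, opener=urllib.request.urlopen):
    """Return (label_text, colour) for a GET against <base_url>/health.

    `opener` is injectable so the logic can be tested without a network.
    """
    url = base_url.rstrip("/") + "/health"
    try:
        with opener(urllib.request.Request(url), timeout=timeout) as resp:
            if resp.status == 200:
                return ("Connected!", "green")
            return ("HTTP {}".format(resp.status), "orange")
    except urllib.error.URLError as e:
        # HTTPError is a URLError subclass, so non-2xx replies land here too.
        reason = str(getattr(e, "reason", e))
        return ("Failed: {}".format(reason[:30]), "red")
    except Exception as e:
        return ("Error: {}".format(str(e)[:30]), "red")
```

The dialog method could then reduce to calling `probe_health(raw)` and passing the result to `self._sfm_test_status.config(...)`.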

    def _build_tab_updates(self, nb):
        f = self._tab_frame(nb, "Updates")
@@ -421,9 +558,15 @@ class SettingsDialog:
    def _on_save(self):
        checks = [
            (self.var_api_interval, "API Interval", 30, 3600),
            (self.var_scan_interval, "Scan Interval", 10, 3600),
            (self.var_log_retention_days, "Log Retention Days", 1, 365),
            (self.var_api_interval, "API Interval", 30, 3600),
            (self.var_scan_interval, "Scan Interval", 10, 3600),
            (self.var_log_retention_days, "Log Retention Days", 1, 365),
            (self.var_sfm_forward_interval, "Forward Interval", 30, 3600),
            (self.var_sfm_quiescence, "Quiescence", 1, 60),
            (self.var_sfm_grace, "Missing-Report Grace", 0, 3600),
            (self.var_sfm_http_timeout, "HTTP Timeout", 5, 300),
            (self.var_sfm_max_per_pass, "Max Forwards Per Pass", 1, 5000),
            (self.var_sfm_max_age_days, "Max Event Age (days)", 1, 3650),
        ]
        int_values = {}
        for var, name, mn, mx in checks:
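
The body of this validation loop falls outside the hunk. The following is only a sketch of the kind of range check the `checks` table implies, assuming each entry carries raw text rather than a Tk variable; `validate_int_fields` and its error wording are hypothetical, not taken from the source.

```python
def validate_int_fields(checks):
    """Validate (raw_text, name, minimum, maximum) tuples.

    Returns (values, errors): values maps each valid field name to its int,
    errors lists a message for every field that failed.
    """
    values, errors = {}, []
    for raw, name, mn, mx in checks:
        try:
            n = int(str(raw).strip())
        except ValueError:
            errors.append("{} must be a whole number".format(name))
            continue
        if not mn <= n <= mx:
            errors.append("{} must be between {} and {}".format(name, mn, mx))
            continue
        values[name] = n
    return values, errors
```

Keying the result by display name matches how the save path later reads `int_values["Forward Interval"]` and similar entries.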
@@ -442,6 +585,11 @@ class SettingsDialog:
        else:
            api_url = api_url.rstrip("/") + "/api/series4/heartbeat"

        sfm_url = self.var_sfm_url.get().strip()
        if sfm_url == "http://10.0.0.44:8200":
            sfm_url = ""
        sfm_url = sfm_url.rstrip("/")  # event_forwarder adds the endpoint path
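
The three normalization steps above (trim, discard the placeholder, drop the trailing slash) can be expressed as one pure function. This is a sketch, not code from the PR: `normalize_sfm_url` and `PLACEHOLDER_URL` are hypothetical names, with the placeholder value copied from the hint string used in the dialog.

```python
PLACEHOLDER_URL = "http://10.0.0.44:8200"  # hint text shown in the empty entry


def normalize_sfm_url(raw, placeholder=PLACEHOLDER_URL):
    """Return the base URL to persist, or "" when nothing real was entered."""
    url = raw.strip()
    if url == placeholder:
        # The grey hint is still in the field, so treat it as blank.
        return ""
    # The forwarder appends its own endpoint path, so store no trailing slash.
    return url.rstrip("/")
```

A single shared constant would also keep the hint string from drifting between the tab builder, the Test handler, and the save path.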

        values = {
            "thordata_path": self.var_thordata_path.get().strip(),
            "scan_interval": int_values["Scan Interval"],
@@ -456,6 +604,16 @@ class SettingsDialog:
            "log_retention_days": int_values["Log Retention Days"],
            "update_source": self.var_update_source.get().strip() or "gitea",
            "update_url": self.var_update_url.get().strip(),

            "sfm_forward_enabled": self.var_sfm_enabled.get(),
            "sfm_url": sfm_url,
            "sfm_forward_interval": int_values["Forward Interval"],
            "sfm_quiescence_seconds": int_values["Quiescence"],
            "sfm_missing_report_grace_seconds": int_values["Missing-Report Grace"],
            "sfm_http_timeout": int_values["HTTP Timeout"],
            "sfm_max_forwards_per_pass": int_values["Max Forwards Per Pass"],
            "sfm_max_event_age_days": int_values["Max Event Age (days)"],
            "sfm_state_file": self.var_sfm_state_file.get().strip(),
        }

        try:
+25 -2
@@ -1,5 +1,5 @@
"""
Thor Watcher System Tray Launcher v0.2.0
Thor Watcher System Tray Launcher v0.3.0
Requires: pystray, Pillow, tkinter (stdlib)
Run with: pythonw thor_tray.py (no console window)
@@ -420,7 +420,25 @@ class WatcherTray:
        else:
            api_str = "API off"

        return "Running — {} | {} unit(s) | scan {}".format(api_str, unit_count, age_str)
        base_line = "Running — {} | {} unit(s) | scan {}".format(api_str, unit_count, age_str)

        sfm_status = self.state.get("sfm_status", "disabled")
        if sfm_status in ("ok", "fail", "ready"):
            counts = self.state.get("last_forward_counts") or {}
            fwd = counts.get("forwarded", 0)
            errs = counts.get("errors", 0)
            last_fwd = self.state.get("last_forward")
            if last_fwd is not None:
                fwd_age = int((datetime.now() - last_fwd).total_seconds())
                fwd_age_str = "{}s ago".format(fwd_age) if fwd_age < 60 else "{}m ago".format(fwd_age // 60)
            else:
                fwd_age_str = "pending"
            sfm_line = "SFM {} | {} fwd, {} err | last {}".format(
                sfm_status.upper(), fwd, errs, fwd_age_str,
            )
            return base_line + "\n" + sfm_line
        return base_line
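
The forwarder line reads `datetime.now()` directly, so its output depends on the wall clock. A sketch of the same formatting with the clock passed in, which makes the `SFM OK | N fwd, M err | last 30s ago` shape easy to check; `format_age` and `sfm_status_line` are hypothetical helper names, not functions in this PR.

```python
from datetime import datetime, timedelta


def format_age(last, now):
    """Render the time since `last` as 'Ns ago' or 'Nm ago'; 'pending' if never."""
    if last is None:
        return "pending"
    secs = int((now - last).total_seconds())
    return "{}s ago".format(secs) if secs < 60 else "{}m ago".format(secs // 60)


def sfm_status_line(status, counts, last, now):
    """Build the second tray status line from the forwarder's state."""
    counts = counts or {}  # tolerate a missing or None counts dict
    return "SFM {} | {} fwd, {} err | last {}".format(
        status.upper(),
        counts.get("forwarded", 0),
        counts.get("errors", 0),
        format_age(last, now),
    )


now = datetime(2026, 5, 19, 12, 0, 0)
line = sfm_status_line("ok", {"forwarded": 3, "errors": 1},
                       now - timedelta(seconds=30), now)
```

Minutes are truncated rather than rounded, so 125 seconds renders as `2m ago`.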

    def _tray_status(self):
        status = self.state.get("status", "starting")
@@ -431,12 +449,17 @@ class WatcherTray:
        api_status = self.state.get("api_status", "disabled")
        if api_status == "fail":
            return "missing"  # red — API failing
        sfm_status = self.state.get("sfm_status", "disabled")
        if api_status == "ok" and sfm_status == "fail":
            return "pending"  # amber — heartbeat OK but forwarder is failing
        if api_status == "disabled":
            return "pending"  # amber — running but not reporting
        return "ok"  # green — running and API good
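
The ordering of these checks determines which failure wins when both channels are unhealthy. A sketch of just the branch visible in this hunk, as a pure function; `tray_icon_state` is a hypothetical name, and the earlier checks on `status` that sit above the hunk are deliberately left out.

```python
def tray_icon_state(api_status, sfm_status):
    """Map API and forwarder health to the tray icon state name."""
    if api_status == "fail":
        return "missing"   # red: the heartbeat failure takes precedence
    if api_status == "ok" and sfm_status == "fail":
        return "pending"   # amber: heartbeat healthy, forwarder failing
    if api_status == "disabled":
        return "pending"   # amber: running but not reporting
    return "ok"            # green


# Every combination of the statuses that appear in this diff.
table = {
    (api, sfm): tray_icon_state(api, sfm)
    for api in ("ok", "fail", "disabled")
    for sfm in ("ok", "fail", "ready", "disabled")
}
```

One consequence worth noting: with the API disabled, a failing forwarder is indistinguishable from a healthy one, since both show amber.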

    def _build_menu(self):
        return pystray.Menu(
            pystray.MenuItem("Thor Watcher v{}".format(_CURRENT_VERSION), None, enabled=False),
            pystray.Menu.SEPARATOR,
            pystray.MenuItem(lambda item: self._status_text(), None, enabled=False),
            pystray.Menu.SEPARATOR,
            pystray.MenuItem("Settings...", self._open_settings),