Compare commits
3 Commits
dev
...
76387ad17a
| Author | SHA1 | Date | |
|---|---|---|---|
| 76387ad17a | |||
| 4312efc15c | |||
| 0e2fe7c1e3 |
@@ -2,6 +2,7 @@
|
|||||||
# Thor Watcher local files
|
# Thor Watcher local files
|
||||||
# --------------------------
|
# --------------------------
|
||||||
config.json
|
config.json
|
||||||
|
/example-data/
|
||||||
|
|
||||||
# -------------------------
|
# -------------------------
|
||||||
# Python ignores
|
# Python ignores
|
||||||
|
|||||||
@@ -5,6 +5,22 @@ All notable changes to this project will be documented in this file.
|
|||||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
|
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
|
||||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||||
|
|
||||||
|
## [0.3.0] - 2026-05-19
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- `event_forwarder.py` — forwards `.IDFH` (histogram) and `.IDFW` (waveform) event files plus their `TXT/<basename>.txt` sidecars to a seismo-relay SFM server's new `/db/import/idf_file` endpoint
|
||||||
|
- Sha256-keyed `thor_forwarded.json` state file for idempotency across restarts and re-scans (default path: `<log_dir>/thor_forwarded.json`)
|
||||||
|
- "SFM Forward" tab in Settings dialog: enable/URL/Test, forward interval, quiescence, missing-report grace, HTTP timeout, max forwards per pass, max event age, state file picker
|
||||||
|
- Forwarder status line in tray menu: `SFM OK | N fwd, M err | last 30s ago`
|
||||||
|
- Tray icon goes amber when the SFM forwarder is failing but the API heartbeat is still healthy
|
||||||
|
- Re-pair logic: events forwarded without their TXT are re-forwarded once the sidecar appears so the relay can refresh DB rows with device-authoritative PPV/ZCFreq/peak values
|
||||||
|
- `event_forwarder.py --seed-state` CLI for skipping historical backfill on a first deploy
|
||||||
|
- Version badge: `Thor Watcher vX.Y.Z` shown at the top of the tray menu and in the Settings dialog title bar — operators no longer have to crack open the .exe properties to tell which version is running
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
- Bumped `VERSION` to `0.3.0`
|
||||||
|
- Settings dialog tab order: Connection / Paths / Scanning / Logging / **SFM Forward** / Updates
|
||||||
|
|
||||||
## [0.2.0] - 2026-03-20
|
## [0.2.0] - 2026-03-20
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
# Thor Watcher
|
# Thor Watcher
|
||||||
|
|
||||||
**Version:** 0.2.0
|
**Version:** 0.3.0
|
||||||
|
|
||||||
Micromate (Series 4) watcher agent for Terra-View fleet management. Runs as a Windows system tray application, scans THORDATA for Micromate unit activity, and sends heartbeat data to Terra-View.
|
Micromate (Series 4) watcher agent for Terra-View fleet management. Runs as a Windows system tray application, scans THORDATA for Micromate unit activity, sends heartbeat data to Terra-View, and (optionally) forwards `.IDFH`/`.IDFW` event files to a seismo-relay SFM server.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -29,7 +29,7 @@ build.bat
|
|||||||
```
|
```
|
||||||
|
|
||||||
Produces:
|
Produces:
|
||||||
- `dist\thor-watcher-0.2.0.exe` — upload to Gitea release
|
- `dist\thor-watcher-0.3.0.exe` — upload to Gitea release
|
||||||
- `dist\thor-watcher.exe` — use with Inno Setup
|
- `dist\thor-watcher.exe` — use with Inno Setup
|
||||||
|
|
||||||
Then run Inno Setup Compiler on `installer.iss` to produce `thor-watcher-setup.exe`.
|
Then run Inno Setup Compiler on `installer.iss` to produce `thor-watcher-setup.exe`.
|
||||||
@@ -62,6 +62,27 @@ Managed through the Settings dialog (right-click tray icon → Settings). A `con
|
|||||||
| `log_retention_days` | integer | `30` | Days before log is auto-cleared |
|
| `log_retention_days` | integer | `30` | Days before log is auto-cleared |
|
||||||
| `update_source` | string | `gitea` | Auto-update source: `gitea`, `url`, or `disabled` |
|
| `update_source` | string | `gitea` | Auto-update source: `gitea`, `url`, or `disabled` |
|
||||||
| `update_url` | string | `""` | Base URL for `url` mode (e.g. Terra-View server) |
|
| `update_url` | string | `""` | Base URL for `url` mode (e.g. Terra-View server) |
|
||||||
|
| `sfm_forward_enabled` | boolean | `false` | Forward `.IDFH`/`.IDFW` event files to a seismo-relay SFM server |
|
||||||
|
| `sfm_url` | string | `""` | Base URL of the seismo-relay SFM server (e.g. `http://10.0.0.44:8200`) |
|
||||||
|
| `sfm_forward_interval` | integer | `60` | Seconds between forwarder passes |
|
||||||
|
| `sfm_quiescence_seconds` | integer | `5` | Skip files modified within the last N seconds (avoid in-flight files) |
|
||||||
|
| `sfm_missing_report_grace_seconds` | integer | `60` | Forward a binary without its `.txt` sidecar if it hasn't appeared after N seconds |
|
||||||
|
| `sfm_http_timeout` | integer | `60` | HTTP timeout per forward POST |
|
||||||
|
| `sfm_state_file` | string | `""` | Path to the sha256-keyed state file. Blank → `<log_dir>\thor_forwarded.json` |
|
||||||
|
| `sfm_max_forwards_per_pass` | integer | `500` | Cap per pass to drip-feed large backfills |
|
||||||
|
| `sfm_max_event_age_days` | integer | `365` | Skip event files older than this many days |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Event Forwarding
|
||||||
|
|
||||||
|
When `sfm_forward_enabled` is true and `sfm_url` is set, Thor Watcher walks the THORDATA tree each `sfm_forward_interval` seconds, finds `.IDFH` (histogram) and `.IDFW` (waveform) event binaries plus their `TXT/<basename>.txt` ASCII sidecars, and POSTs them to seismo-relay's `/db/import/idf_file` endpoint.
|
||||||
|
|
||||||
|
- **Idempotent.** Every forwarded file is recorded by sha256 in `thor_forwarded.json`. Re-scans never re-POST.
|
||||||
|
- **Default off.** Operators must explicitly enable from the Settings → SFM Forward tab.
|
||||||
|
- **Re-pair logic.** If a binary is forwarded before its TXT sidecar appears (after the grace period), it's flagged `had_report=false` and re-forwarded once the TXT arrives so the SFM database row can be refreshed with device-authoritative PPV/ZCFreq/peak values.
|
||||||
|
- **TXT export must be enabled in Thor.** Thor's TXT sidecars are not produced automatically — operators should enable TXT export so the relay can extract rich metadata. Forwards without a TXT are still useful (binary gets indexed; rich fields stay NULL).
|
||||||
|
- **Backfill seeding.** To skip a large historical archive on first deploy, run `python event_forwarder.py --seed-state --thordata C:\THORDATA --state <state file>` before flipping the switch.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -69,8 +90,8 @@ Managed through the Settings dialog (right-click tray icon → Settings). A `con
|
|||||||
|
|
||||||
| Color | Meaning |
|
| Color | Meaning |
|
||||||
|-------|---------|
|
|-------|---------|
|
||||||
| Green | Running, API reporting OK |
|
| Green | Running, API reporting OK (and SFM forwarder healthy when enabled) |
|
||||||
| Amber | Running, API disabled or not configured |
|
| Amber | Running, API disabled OR SFM forwarder failing while API is healthy |
|
||||||
| Red | Running, API failing |
|
| Red | Running, API failing |
|
||||||
| Purple | Error — check logs |
|
| Purple | Error — check logs |
|
||||||
| Grey | Starting up |
|
| Grey | Starting up |
|
||||||
@@ -100,7 +121,7 @@ Posted to `api_url` on each API interval:
|
|||||||
{
|
{
|
||||||
"source_id": "THOR-PC",
|
"source_id": "THOR-PC",
|
||||||
"source_type": "series4_watcher",
|
"source_type": "series4_watcher",
|
||||||
"version": "0.2.0",
|
"version": "0.3.0",
|
||||||
"generated_at": "2026-03-20T14:30:00Z",
|
"generated_at": "2026-03-20T14:30:00Z",
|
||||||
"log_tail": ["...last 25 log lines..."],
|
"log_tail": ["...last 25 log lines..."],
|
||||||
"units": [
|
"units": [
|
||||||
|
|||||||
+11
-1
@@ -14,5 +14,15 @@
|
|||||||
"log_retention_days": 30,
|
"log_retention_days": 30,
|
||||||
|
|
||||||
"update_source": "gitea",
|
"update_source": "gitea",
|
||||||
"update_url": ""
|
"update_url": "",
|
||||||
|
|
||||||
|
"sfm_forward_enabled": false,
|
||||||
|
"sfm_url": "",
|
||||||
|
"sfm_forward_interval": 60,
|
||||||
|
"sfm_quiescence_seconds": 5,
|
||||||
|
"sfm_missing_report_grace_seconds": 60,
|
||||||
|
"sfm_http_timeout": 60,
|
||||||
|
"sfm_state_file": "",
|
||||||
|
"sfm_max_forwards_per_pass": 500,
|
||||||
|
"sfm_max_event_age_days": 365
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,62 @@
|
|||||||
|
# Thor File Stucture Guide
|
||||||
|
|
||||||
|
This document is to explain how Thor formatis the file structure for units that call in via thor Autocall home.
|
||||||
|
|
||||||
|
## Main Stucture
|
||||||
|
|
||||||
|
Thor saves its data in a folder located in C:/ called 'THORDATA'. it then Creates folders for each user entered project. When a unit is added to that project, it creates a folder in project with that unit's serial number. Raw events (.IDFH for histogram and .IDFW for waveforms) plus .MLG monitor logs are then saved in this folder. if a unit is not assigned a project, it saves into a default project folder. If it matters, there is also a daily log file that gets created in the folder 'Logs'
|
||||||
|
|
||||||
|
In each unit's folder, there are various formats saved in their own individual folders too. Most cases have CSV, HTML, PDF, TXT, and XML
|
||||||
|
|
||||||
|
Here is the structure illustrated:
|
||||||
|
|
||||||
|
C:/THORDATA
|
||||||
|
├───Mon-Fayette Express Way - Sec 53A1
|
||||||
|
│ └───UM11402
|
||||||
|
│ ├───CSV
|
||||||
|
│ ├───HTML
|
||||||
|
│ ├───PDF
|
||||||
|
│ ├───TXT
|
||||||
|
│ └───XML
|
||||||
|
├───P.J. Dick - 5th and Halket
|
||||||
|
│ ├───UM11719
|
||||||
|
│ │ ├───CSV
|
||||||
|
│ │ ├───HTML
|
||||||
|
│ │ ├───PDF
|
||||||
|
│ │ ├───TXT
|
||||||
|
│ │ └───XML
|
||||||
|
│ UM12420
|
||||||
|
│ ├───CSV
|
||||||
|
│ ├───HTML
|
||||||
|
│ ├───PDF
|
||||||
|
│ ├───TXT
|
||||||
|
│ └───XML
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
Here is an expanded project folder with two events in it
|
||||||
|
|
||||||
|
|
||||||
|
└───Clearwater - ECMS 57940
|
||||||
|
└───BE9439
|
||||||
|
│ BE9439_20200713124250.MLG
|
||||||
|
│ BE9439_20200713124251.IDFH
|
||||||
|
│ BE9439_20200713131747.IDFW
|
||||||
|
│ BE9439_20200713131747.IDFW.CDB
|
||||||
|
│
|
||||||
|
├───CSV
|
||||||
|
│ BE9439_20200713124251.IDFH.csv
|
||||||
|
│ BE9439_20200713131747.IDFW.csv
|
||||||
|
│
|
||||||
|
├───PDF
|
||||||
|
│ BE9439_20200713124251.IDFH.pdf
|
||||||
|
│ BE9439_20200713131747.IDFW.pdf
|
||||||
|
│
|
||||||
|
├───TXT
|
||||||
|
│ BE9439_20200713124251.IDFH.txt
|
||||||
|
│ BE9439_20200713131747.IDFW.txt
|
||||||
|
│
|
||||||
|
└───XML
|
||||||
|
BE9439_20200713124251_IDFH_XML.XML
|
||||||
|
BE9439_20200713131747_IDFW_XML.XML
|
||||||
|
|
||||||
@@ -0,0 +1,823 @@
|
|||||||
|
"""
|
||||||
|
event_forwarder.py — forward Thor (Micromate Series IV) IDFH/IDFW event
|
||||||
|
files to a seismo-relay SFM server.
|
||||||
|
|
||||||
|
Walks the same `THORDATA_PATH/<Project>/<Unit>/` tree the heartbeat path
|
||||||
|
scans. For each event binary that hasn't been forwarded yet, pairs it
|
||||||
|
with its `<unit>/TXT/<basename>.txt` ASCII report (when available) and
|
||||||
|
POSTs both to seismo-relay's `/db/import/idf_file` endpoint as one
|
||||||
|
multipart request.
|
||||||
|
|
||||||
|
This is a port of `series3-watcher/event_forwarder.py` adapted for the
|
||||||
|
Thor file layout. Key differences from the series3 forwarder:
|
||||||
|
|
||||||
|
- **Filenames are literal `<SERIAL>_<YYYYMMDDHHMMSS>.IDFH|.IDFW`** —
|
||||||
|
no base36 stem; the serial is right there in the prefix.
|
||||||
|
- **TXT sidecars live in a `TXT/` subfolder** next to the binaries
|
||||||
|
(Thor's exporter writes them there, not alongside the binary).
|
||||||
|
- **IDFH and IDFW are forwarded as independent events.** Each has
|
||||||
|
its own sha256-keyed state entry and its own POST. A single
|
||||||
|
timestamp can produce both a histogram (IDFH) and a waveform (IDFW),
|
||||||
|
and the seismo-relay endpoint dedupes on (serial, timestamp, kind),
|
||||||
|
so treating them as separate rows is the right model.
|
||||||
|
|
||||||
|
Design notes
|
||||||
|
────────────
|
||||||
|
- **stdlib only.** Matches the rest of the watcher (`urllib.request`).
|
||||||
|
Multipart encoding is hand-rolled.
|
||||||
|
- **Idempotent across restarts.** Forwarded files are tracked by
|
||||||
|
sha256 in a JSON state file (default: `<log_dir>/thor_forwarded.json`).
|
||||||
|
Re-scanning the watch tree doesn't re-POST anything.
|
||||||
|
- **Default-off.** Callers must enable via config
|
||||||
|
(`sfm_forward_enabled=true` + `sfm_url=...`). Existing 0.2.x
|
||||||
|
deployments that auto-update stay non-forwarding until an operator
|
||||||
|
flips the switch.
|
||||||
|
- **Quiescence guard.** Files modified within the last few seconds
|
||||||
|
are skipped — Thor writes the .txt after the binary, so we wait
|
||||||
|
until both look stable before forwarding.
|
||||||
|
- **Best-effort report pairing.** When the .txt hasn't appeared yet
|
||||||
|
but the binary is older than `missing_report_grace_seconds`, the
|
||||||
|
binary is forwarded alone (seismo-relay accepts that and just skips
|
||||||
|
the rich fields — we'd rather get the binary indexed than block
|
||||||
|
forever waiting for a TXT that may never arrive, e.g. operator
|
||||||
|
hasn't enabled the TXT export in Thor).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
import urllib.error
|
||||||
|
import urllib.request
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Default tuning. All overridable via config.json sfm_* keys.
|
||||||
|
DEFAULT_QUIESCENCE_SECONDS = 5
|
||||||
|
DEFAULT_MISSING_REPORT_GRACE_SECONDS = 60
|
||||||
|
DEFAULT_HTTP_TIMEOUT = 60.0
|
||||||
|
STATE_SCHEMA_VERSION = 1
|
||||||
|
|
||||||
|
|
||||||
|
# ── Filename matching ─────────────────────────────────────────────────────────
|
||||||
|
#
|
||||||
|
# Thor (Micromate Series IV) filename scheme:
|
||||||
|
# <SERIAL>_<YYYYMMDDHHMMSS>.<KIND>
|
||||||
|
# where SERIAL is the literal device serial (e.g. UM11719, BE9439),
|
||||||
|
# and KIND is IDFH (histogram) or IDFW (waveform).
|
||||||
|
#
|
||||||
|
# Examples:
|
||||||
|
# UM11719_20231219163444.IDFH
|
||||||
|
# UM11719_20231219162723.IDFW
|
||||||
|
# BE9439_20200713124251.IDFH
|
||||||
|
_EVENT_FILENAME_RE = re.compile(
|
||||||
|
r"^([A-Z]{2}\d+)_(\d{14})\.(IDFH|IDFW)$",
|
||||||
|
re.IGNORECASE,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Filenames we explicitly skip even if they look event-shaped.
|
||||||
|
_NON_EVENT_EXTS = {
|
||||||
|
".mlg", # monitor-log files (separate heartbeat path)
|
||||||
|
".txt", # ASCII reports — paired in, not primary
|
||||||
|
".csv", # operator-facing derivative
|
||||||
|
".html", # operator-facing derivative
|
||||||
|
".pdf", # operator-facing derivative
|
||||||
|
".xml", # operator-facing derivative
|
||||||
|
".cdb", # IDFW.CDB cache-database variant — skip
|
||||||
|
".log",
|
||||||
|
".ini",
|
||||||
|
".json",
|
||||||
|
".sfm.json",
|
||||||
|
".bak",
|
||||||
|
".tmp",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def is_event_binary(path: str) -> bool:
|
||||||
|
"""Return True if `path`'s basename looks like a Thor event binary."""
|
||||||
|
name = os.path.basename(path)
|
||||||
|
lname = name.lower()
|
||||||
|
# Explicit reject for compound extensions like .IDFW.CDB
|
||||||
|
if lname.endswith(".idfw.cdb") or lname.endswith(".idfh.cdb"):
|
||||||
|
return False
|
||||||
|
if not _EVENT_FILENAME_RE.match(name):
|
||||||
|
return False
|
||||||
|
ext = os.path.splitext(name)[1].lower()
|
||||||
|
if ext in _NON_EVENT_EXTS:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def parse_event_filename(name: str) -> Optional[Tuple[str, datetime, str]]:
|
||||||
|
"""Parse `<SERIAL>_<YYYYMMDDHHMMSS>.<KIND>` -> (serial, timestamp, kind).
|
||||||
|
|
||||||
|
`kind` is the upper-case extension without the dot — "IDFH" or "IDFW".
|
||||||
|
Returns None if the filename doesn't match.
|
||||||
|
"""
|
||||||
|
m = _EVENT_FILENAME_RE.match(name)
|
||||||
|
if not m:
|
||||||
|
return None
|
||||||
|
serial = m.group(1).upper()
|
||||||
|
try:
|
||||||
|
ts = datetime.strptime(m.group(2), "%Y%m%d%H%M%S")
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
kind = m.group(3).upper()
|
||||||
|
return serial, ts, kind
|
||||||
|
|
||||||
|
|
||||||
|
def idf_report_name(binary_name: str) -> str:
|
||||||
|
"""Thor TXT-export convention: append `.txt` to the binary basename.
|
||||||
|
|
||||||
|
UM11719_20231219163444.IDFH → UM11719_20231219163444.IDFH.txt
|
||||||
|
"""
|
||||||
|
return binary_name + ".txt"
|
||||||
|
|
||||||
|
|
||||||
|
def idf_report_path(binary_path: str) -> str:
|
||||||
|
"""Compute the expected TXT sidecar path for a Thor event binary.
|
||||||
|
|
||||||
|
Thor's TXT exporter writes sidecars into a `TXT/` subfolder of the
|
||||||
|
unit directory (verified against captured example-data). The
|
||||||
|
returned path is the *expected* location — caller is responsible
|
||||||
|
for checking that it actually exists.
|
||||||
|
"""
|
||||||
|
unit_dir = os.path.dirname(binary_path)
|
||||||
|
name = os.path.basename(binary_path)
|
||||||
|
return os.path.join(unit_dir, "TXT", idf_report_name(name))
|
||||||
|
|
||||||
|
|
||||||
|
def is_histogram_event(filename: str) -> bool:
|
||||||
|
"""True if this is an .IDFH (histogram) event. Used purely for
|
||||||
|
log clarity — Thor doesn't always export a TXT for histograms, so
|
||||||
|
a missing-report warning is suppressed for them."""
|
||||||
|
name = os.path.basename(filename)
|
||||||
|
return name.lower().endswith(".idfh")
|
||||||
|
|
||||||
|
|
||||||
|
def serial_from_filename(name: str) -> Optional[str]:
|
||||||
|
"""Extract the serial-number prefix from a Thor event filename."""
|
||||||
|
parsed = parse_event_filename(name)
|
||||||
|
if parsed is None:
|
||||||
|
return None
|
||||||
|
return parsed[0]
|
||||||
|
|
||||||
|
|
||||||
|
# ── State file ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class ForwardState:
|
||||||
|
"""Idempotency record: which event files have we already forwarded?
|
||||||
|
|
||||||
|
State file format (JSON):
|
||||||
|
|
||||||
|
{
|
||||||
|
"version": 1,
|
||||||
|
"forwarded": {
|
||||||
|
"<sha256>": {
|
||||||
|
"filename": "UM11719_20231219163444.IDFW",
|
||||||
|
"size": 8800,
|
||||||
|
"forwarded_at": "2026-05-19T...Z",
|
||||||
|
"had_report": true
|
||||||
|
},
|
||||||
|
...
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Keyed by sha256 (not filename) so identical content is recognised
|
||||||
|
as already-forwarded even if the file moved or got renamed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, path: str):
|
||||||
|
self.path = path
|
||||||
|
self._data: Dict[str, Any] = {"version": STATE_SCHEMA_VERSION, "forwarded": {}}
|
||||||
|
self._load()
|
||||||
|
|
||||||
|
def _load(self) -> None:
|
||||||
|
try:
|
||||||
|
with open(self.path, "r", encoding="utf-8") as f:
|
||||||
|
d = json.load(f)
|
||||||
|
if not isinstance(d, dict):
|
||||||
|
raise ValueError("state file root is not an object")
|
||||||
|
if d.get("version") != STATE_SCHEMA_VERSION:
|
||||||
|
log.warning(
|
||||||
|
"forward state version mismatch (got %r, want %d) — starting fresh",
|
||||||
|
d.get("version"), STATE_SCHEMA_VERSION,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
forwarded = d.get("forwarded")
|
||||||
|
if isinstance(forwarded, dict):
|
||||||
|
self._data["forwarded"] = forwarded
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
except (OSError, ValueError, json.JSONDecodeError) as exc:
|
||||||
|
log.warning("failed to load forward state from %s: %s", self.path, exc)
|
||||||
|
|
||||||
|
def _save(self) -> None:
|
||||||
|
tmp = self.path + ".tmp"
|
||||||
|
try:
|
||||||
|
d = os.path.dirname(self.path)
|
||||||
|
if d and not os.path.isdir(d):
|
||||||
|
os.makedirs(d, exist_ok=True)
|
||||||
|
with open(tmp, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(self._data, f, indent=2, sort_keys=True)
|
||||||
|
f.flush()
|
||||||
|
os.fsync(f.fileno())
|
||||||
|
os.replace(tmp, self.path)
|
||||||
|
except OSError as exc:
|
||||||
|
log.warning("failed to save forward state to %s: %s", self.path, exc)
|
||||||
|
|
||||||
|
def is_forwarded(self, sha256: str) -> bool:
|
||||||
|
return sha256 in self._data["forwarded"]
|
||||||
|
|
||||||
|
def status(self, sha256: str) -> Optional[bool]:
|
||||||
|
"""Return forwarding status for *sha256*.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
None — never forwarded. Eligible for a fresh forward.
|
||||||
|
True — forwarded successfully with its paired report.
|
||||||
|
NOT a candidate for re-forward.
|
||||||
|
False — forwarded WITHOUT its paired `.txt`. Eligible for
|
||||||
|
re-forward IF the TXT now exists, so seismo-relay's
|
||||||
|
upsert refreshes the DB row with authoritative
|
||||||
|
device-side values.
|
||||||
|
|
||||||
|
Legacy entries without a `had_report` key default to True so
|
||||||
|
an upgrade doesn't unexpectedly re-forward every entry.
|
||||||
|
"""
|
||||||
|
entry = self._data["forwarded"].get(sha256)
|
||||||
|
if entry is None:
|
||||||
|
return None
|
||||||
|
return bool(entry.get("had_report", True))
|
||||||
|
|
||||||
|
def mark_forwarded(
|
||||||
|
self,
|
||||||
|
sha256: str,
|
||||||
|
filename: str,
|
||||||
|
size: int,
|
||||||
|
had_report: bool = True,
|
||||||
|
) -> None:
|
||||||
|
"""Record a successful forward.
|
||||||
|
|
||||||
|
Set `had_report=False` when the forward shipped the binary
|
||||||
|
without its paired ASCII report. Such entries are re-checked
|
||||||
|
on subsequent scans and re-forwarded once the TXT appears.
|
||||||
|
|
||||||
|
Idempotent: re-marking an existing sha256 with `had_report=True`
|
||||||
|
is the explicit promotion path used when a re-pair succeeds.
|
||||||
|
"""
|
||||||
|
self._data["forwarded"][sha256] = {
|
||||||
|
"filename": filename,
|
||||||
|
"size": size,
|
||||||
|
"forwarded_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
|
||||||
|
"had_report": had_report,
|
||||||
|
}
|
||||||
|
self._save()
|
||||||
|
|
||||||
|
def count(self) -> int:
|
||||||
|
return len(self._data["forwarded"])
|
||||||
|
|
||||||
|
|
||||||
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def sha256_of_file(path: str) -> str:
|
||||||
|
h = hashlib.sha256()
|
||||||
|
with open(path, "rb") as f:
|
||||||
|
for chunk in iter(lambda: f.read(65536), b""):
|
||||||
|
h.update(chunk)
|
||||||
|
return h.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def _is_quiescent(path: str, now_ts: float, quiescence_seconds: float) -> bool:
|
||||||
|
"""Return True if the file's mtime is at least `quiescence_seconds`
|
||||||
|
in the past — i.e. no longer being written."""
|
||||||
|
try:
|
||||||
|
mtime = os.path.getmtime(path)
|
||||||
|
except OSError:
|
||||||
|
return False
|
||||||
|
return (now_ts - mtime) >= quiescence_seconds
|
||||||
|
|
||||||
|
|
||||||
|
def _iter_unit_dirs(thordata_root: str):
|
||||||
|
"""Yield (project_name, unit_name, unit_path) for every Thor unit
|
||||||
|
folder beneath `thordata_root`.
|
||||||
|
|
||||||
|
THORDATA layout: <root>/<Project>/<UnitSerial>/
|
||||||
|
"""
|
||||||
|
if not os.path.isdir(thordata_root):
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
projects = os.listdir(thordata_root)
|
||||||
|
except OSError:
|
||||||
|
return
|
||||||
|
for proj in projects:
|
||||||
|
proj_path = os.path.join(thordata_root, proj)
|
||||||
|
if not os.path.isdir(proj_path):
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
units = os.listdir(proj_path)
|
||||||
|
except OSError:
|
||||||
|
continue
|
||||||
|
for unit in units:
|
||||||
|
unit_path = os.path.join(proj_path, unit)
|
||||||
|
if not os.path.isdir(unit_path):
|
||||||
|
continue
|
||||||
|
yield proj, unit, unit_path
|
||||||
|
|
||||||
|
|
||||||
|
def _find_txt_in_unit(unit_path: str, binary_name: str,
|
||||||
|
now_ts: float, quiescence_seconds: float,
|
||||||
|
_txt_cache: Dict[str, Dict[str, str]]) -> Optional[str]:
|
||||||
|
"""Look up the matching `.txt` sidecar for `binary_name` inside
|
||||||
|
`<unit_path>/TXT/`. Case-insensitive on Windows-shaped paths.
|
||||||
|
|
||||||
|
Returns the absolute path if a quiescent sidecar exists, else None.
|
||||||
|
`_txt_cache` is mutated to memoize the lower-case → actual-name
|
||||||
|
map for each unit so repeated lookups in one scan don't restat.
|
||||||
|
"""
|
||||||
|
expected = idf_report_name(binary_name)
|
||||||
|
|
||||||
|
if unit_path not in _txt_cache:
|
||||||
|
txt_dir = os.path.join(unit_path, "TXT")
|
||||||
|
listing: Dict[str, str] = {}
|
||||||
|
if os.path.isdir(txt_dir):
|
||||||
|
try:
|
||||||
|
for n in os.listdir(txt_dir):
|
||||||
|
listing[n.lower()] = n
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
_txt_cache[unit_path] = listing
|
||||||
|
|
||||||
|
listing = _txt_cache[unit_path]
|
||||||
|
actual = listing.get(expected.lower())
|
||||||
|
if not actual:
|
||||||
|
return None
|
||||||
|
candidate = os.path.join(unit_path, "TXT", actual)
|
||||||
|
if _is_quiescent(candidate, now_ts, quiescence_seconds):
|
||||||
|
return candidate
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# ── Scan pass ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def find_pending_events(
|
||||||
|
thordata_root: str,
|
||||||
|
state: ForwardState,
|
||||||
|
*,
|
||||||
|
max_age_days: int,
|
||||||
|
quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
|
||||||
|
missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
|
||||||
|
max_per_pass: int = 0,
|
||||||
|
) -> List[Tuple[str, Optional[str]]]:
|
||||||
|
"""
|
||||||
|
Walk `thordata_root` and return the list of (binary_path, txt_path_or_None)
|
||||||
|
pairs that need forwarding.
|
||||||
|
|
||||||
|
Filtering rules:
|
||||||
|
- Filename must match the Thor event filename regex.
|
||||||
|
- File must be quiescent (mtime >= quiescence_seconds in the past).
|
||||||
|
- File must not exceed `max_age_days`.
|
||||||
|
- File's sha256 must NOT already be in the forwarded state
|
||||||
|
(unless it was forwarded without its TXT and the TXT is now
|
||||||
|
present — see ForwardState.status).
|
||||||
|
- If a `<unit>/TXT/<basename>.txt` exists and is quiescent,
|
||||||
|
we pair them. Otherwise, if the binary is older than
|
||||||
|
missing_report_grace_seconds, we forward without the TXT.
|
||||||
|
Younger binaries with a missing TXT are deferred.
|
||||||
|
- When `max_per_pass > 0`, return at most that many pairs.
|
||||||
|
Older files (lower mtime) are forwarded first so backfill
|
||||||
|
proceeds chronologically.
|
||||||
|
"""
|
||||||
|
if not os.path.isdir(thordata_root):
|
||||||
|
log.warning("forward scan: thordata root not found: %s", thordata_root)
|
||||||
|
return []
|
||||||
|
|
||||||
|
now_ts = time.time()
|
||||||
|
max_age_seconds = max(1, int(max_age_days)) * 86400.0
|
||||||
|
|
||||||
|
# First pass: collect every event-binary entry with mtime for sorting.
|
||||||
|
candidates: List[Tuple[float, str, str]] = [] # (mtime, unit_path, binary_path)
|
||||||
|
for _proj, _unit, unit_path in _iter_unit_dirs(thordata_root):
|
||||||
|
try:
|
||||||
|
entries = list(os.scandir(unit_path))
|
||||||
|
except OSError:
|
||||||
|
continue
|
||||||
|
for e in entries:
|
||||||
|
if not e.is_file():
|
||||||
|
continue
|
||||||
|
if not is_event_binary(e.path):
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
mtime = e.stat().st_mtime
|
||||||
|
except OSError:
|
||||||
|
continue
|
||||||
|
if (now_ts - mtime) > max_age_seconds:
|
||||||
|
continue
|
||||||
|
candidates.append((mtime, unit_path, e.path))
|
||||||
|
|
||||||
|
# Sort oldest-first so backfill is chronological.
|
||||||
|
candidates.sort(key=lambda t: t[0])
|
||||||
|
|
||||||
|
pending: List[Tuple[str, Optional[str]]] = []
|
||||||
|
skipped_inflight = 0
|
||||||
|
skipped_already_forwarded = 0
|
||||||
|
txt_cache: Dict[str, Dict[str, str]] = {}
|
||||||
|
|
||||||
|
for mtime, unit_path, binary_path in candidates:
|
||||||
|
if not _is_quiescent(binary_path, now_ts, quiescence_seconds):
|
||||||
|
skipped_inflight += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
digest = sha256_of_file(binary_path)
|
||||||
|
except OSError as exc:
|
||||||
|
log.warning("forward scan: sha256 failed for %s: %s", binary_path, exc)
|
||||||
|
continue
|
||||||
|
fwd_status = state.status(digest)
|
||||||
|
if fwd_status is True:
|
||||||
|
skipped_already_forwarded += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
binary_name = os.path.basename(binary_path)
|
||||||
|
txt_path = _find_txt_in_unit(
|
||||||
|
unit_path, binary_name, now_ts, quiescence_seconds, txt_cache,
|
||||||
|
)
|
||||||
|
|
||||||
|
if fwd_status is False:
|
||||||
|
# Previously forwarded WITHOUT report. Re-forward only
|
||||||
|
# if the TXT is now present so seismo-relay's upsert can
|
||||||
|
# refresh the row with authoritative device values.
|
||||||
|
if txt_path is None:
|
||||||
|
skipped_already_forwarded += 1
|
||||||
|
continue
|
||||||
|
elif txt_path is None:
|
||||||
|
# First-time forward and TXT not yet present. Wait for
|
||||||
|
# the grace period before forwarding alone.
|
||||||
|
if (now_ts - mtime) < missing_report_grace_seconds:
|
||||||
|
skipped_inflight += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
pending.append((binary_path, txt_path))
|
||||||
|
|
||||||
|
if max_per_pass and len(pending) >= max_per_pass:
|
||||||
|
break
|
||||||
|
|
||||||
|
log.debug(
|
||||||
|
"forward scan: %d pending skipped_inflight=%d already_forwarded=%d cap=%d",
|
||||||
|
len(pending), skipped_inflight, skipped_already_forwarded, max_per_pass,
|
||||||
|
)
|
||||||
|
return pending
|
||||||
|
|
||||||
|
|
||||||
|
# ── Multipart upload ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _encode_multipart(
|
||||||
|
parts: List[Tuple[str, str, str, bytes]],
|
||||||
|
) -> Tuple[bytes, str]:
|
||||||
|
"""Encode a list of (field_name, filename, content_type, data) tuples
|
||||||
|
as a multipart/form-data body. Returns (body_bytes, content_type
|
||||||
|
header value)."""
|
||||||
|
boundary = "----ThorWatcherBoundary" + os.urandom(8).hex()
|
||||||
|
chunks: List[bytes] = []
|
||||||
|
for field_name, filename, content_type, data in parts:
|
||||||
|
chunks.append(("--" + boundary + "\r\n").encode("ascii"))
|
||||||
|
chunks.append(
|
||||||
|
(f'Content-Disposition: form-data; name="{field_name}"; '
|
||||||
|
f'filename="{filename}"\r\n').encode("ascii")
|
||||||
|
)
|
||||||
|
chunks.append((f"Content-Type: {content_type}\r\n\r\n").encode("ascii"))
|
||||||
|
chunks.append(data)
|
||||||
|
chunks.append(b"\r\n")
|
||||||
|
chunks.append(("--" + boundary + "--\r\n").encode("ascii"))
|
||||||
|
body = b"".join(chunks)
|
||||||
|
content_type_hdr = f"multipart/form-data; boundary={boundary}"
|
||||||
|
return body, content_type_hdr
|
||||||
|
|
||||||
|
|
||||||
|
def _import_endpoint(sfm_url: str) -> str:
|
||||||
|
"""Compose the import endpoint URL from a base SFM URL."""
|
||||||
|
return sfm_url.rstrip("/") + "/db/import/idf_file"
|
||||||
|
|
||||||
|
|
||||||
|
def forward_event_pair(
|
||||||
|
sfm_url: str,
|
||||||
|
binary_path: str,
|
||||||
|
txt_path: Optional[str],
|
||||||
|
*,
|
||||||
|
serial_hint: Optional[str] = None,
|
||||||
|
timeout: float = DEFAULT_HTTP_TIMEOUT,
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""POST a single event (binary + optional .txt) to the SFM import
|
||||||
|
endpoint.
|
||||||
|
|
||||||
|
Returns a dict mirroring the per-file outcome the server returned
|
||||||
|
on success, or a dict with `status="error"` on transport/HTTP
|
||||||
|
failure.
|
||||||
|
"""
|
||||||
|
binary_name = os.path.basename(binary_path)
|
||||||
|
with open(binary_path, "rb") as f:
|
||||||
|
binary_bytes = f.read()
|
||||||
|
|
||||||
|
parts = [("files", binary_name, "application/octet-stream", binary_bytes)]
|
||||||
|
if txt_path is not None:
|
||||||
|
with open(txt_path, "rb") as f:
|
||||||
|
txt_bytes = f.read()
|
||||||
|
parts.append(("files", os.path.basename(txt_path), "text/plain", txt_bytes))
|
||||||
|
|
||||||
|
body, content_type = _encode_multipart(parts)
|
||||||
|
|
||||||
|
# Auto-derive serial from filename if caller didn't supply one.
|
||||||
|
if not serial_hint:
|
||||||
|
serial_hint = serial_from_filename(binary_name)
|
||||||
|
|
||||||
|
url = _import_endpoint(sfm_url)
|
||||||
|
if serial_hint:
|
||||||
|
sep = "&" if "?" in url else "?"
|
||||||
|
url = f"{url}{sep}serial={serial_hint}"
|
||||||
|
|
||||||
|
req = urllib.request.Request(
|
||||||
|
url, data=body, method="POST",
|
||||||
|
headers={
|
||||||
|
"Content-Type": content_type,
|
||||||
|
"Content-Length": str(len(body)),
|
||||||
|
"User-Agent": "thor-watcher/sfm-forwarder",
|
||||||
|
"Accept": "application/json",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||||||
|
raw = resp.read().decode("utf-8", errors="replace")
|
||||||
|
try:
|
||||||
|
payload = json.loads(raw)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"filename": binary_name,
|
||||||
|
"detail": f"server returned non-JSON: {raw[:200]!r}",
|
||||||
|
}
|
||||||
|
for entry in (payload.get("results") or []):
|
||||||
|
if entry.get("filename") == binary_name and entry.get("status") == "ok":
|
||||||
|
return entry
|
||||||
|
for entry in (payload.get("results") or []):
|
||||||
|
if entry.get("filename") == binary_name:
|
||||||
|
return entry
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"filename": binary_name,
|
||||||
|
"detail": f"unexpected server response: {payload!r}",
|
||||||
|
}
|
||||||
|
except urllib.error.HTTPError as exc:
|
||||||
|
try:
|
||||||
|
body_excerpt = exc.read().decode("utf-8", errors="replace")[:300]
|
||||||
|
except Exception:
|
||||||
|
body_excerpt = ""
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"filename": binary_name,
|
||||||
|
"detail": f"HTTP {exc.code}: {exc.reason} body={body_excerpt!r}",
|
||||||
|
}
|
||||||
|
except urllib.error.URLError as exc:
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"filename": binary_name,
|
||||||
|
"detail": f"connection error: {exc.reason}",
|
||||||
|
}
|
||||||
|
except (OSError, TimeoutError) as exc:
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"filename": binary_name,
|
||||||
|
"detail": f"transport error: {exc}",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ── Top-level orchestration ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def forward_pending(
|
||||||
|
thordata_root: str,
|
||||||
|
sfm_url: str,
|
||||||
|
state: ForwardState,
|
||||||
|
*,
|
||||||
|
max_age_days: int,
|
||||||
|
quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
|
||||||
|
missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
|
||||||
|
timeout: float = DEFAULT_HTTP_TIMEOUT,
|
||||||
|
max_per_pass: int = 0,
|
||||||
|
logger: Optional[Any] = None,
|
||||||
|
) -> Dict[str, int]:
|
||||||
|
"""
|
||||||
|
Run one full pass: find pending events, POST each one, update state.
|
||||||
|
|
||||||
|
Returns a counts dict suitable for logging:
|
||||||
|
|
||||||
|
{
|
||||||
|
"scanned": <int>, # event binaries selected for forward
|
||||||
|
"forwarded": <int>, # successfully POSTed this pass
|
||||||
|
"errors": <int>, # POST failures (will retry next pass)
|
||||||
|
"with_report":<int>, # of forwarded, how many had a paired TXT
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
def _log(msg: str) -> None:
|
||||||
|
if logger:
|
||||||
|
logger(msg)
|
||||||
|
else:
|
||||||
|
log.info(msg)
|
||||||
|
|
||||||
|
pending = find_pending_events(
|
||||||
|
thordata_root, state,
|
||||||
|
max_age_days=max_age_days,
|
||||||
|
quiescence_seconds=quiescence_seconds,
|
||||||
|
missing_report_grace_seconds=missing_report_grace_seconds,
|
||||||
|
max_per_pass=max_per_pass,
|
||||||
|
)
|
||||||
|
|
||||||
|
counts = {"scanned": len(pending), "forwarded": 0, "errors": 0, "with_report": 0}
|
||||||
|
|
||||||
|
for binary_path, txt_path in pending:
|
||||||
|
result = forward_event_pair(
|
||||||
|
sfm_url, binary_path, txt_path,
|
||||||
|
timeout=timeout,
|
||||||
|
)
|
||||||
|
if result.get("status") == "ok":
|
||||||
|
try:
|
||||||
|
digest = sha256_of_file(binary_path)
|
||||||
|
size = os.path.getsize(binary_path)
|
||||||
|
state.mark_forwarded(
|
||||||
|
digest,
|
||||||
|
os.path.basename(binary_path),
|
||||||
|
size,
|
||||||
|
had_report=(txt_path is not None),
|
||||||
|
)
|
||||||
|
except OSError as exc:
|
||||||
|
_log(f"[forward] post-success state save failed for "
|
||||||
|
f"{os.path.basename(binary_path)}: {exc}")
|
||||||
|
counts["forwarded"] += 1
|
||||||
|
if txt_path:
|
||||||
|
counts["with_report"] += 1
|
||||||
|
|
||||||
|
if txt_path:
|
||||||
|
report_token = "+ {} attached".format(os.path.basename(txt_path))
|
||||||
|
elif is_histogram_event(binary_path):
|
||||||
|
report_token = "(histogram, no report expected)"
|
||||||
|
else:
|
||||||
|
report_token = "no report"
|
||||||
|
|
||||||
|
_log(
|
||||||
|
"[forward] OK {} ({}B, {}, inserted={}, skipped={})".format(
|
||||||
|
os.path.basename(binary_path),
|
||||||
|
result.get("filesize", 0),
|
||||||
|
report_token,
|
||||||
|
result.get("inserted", 0),
|
||||||
|
result.get("skipped", 0),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
counts["errors"] += 1
|
||||||
|
_log(
|
||||||
|
f"[forward] ERR {os.path.basename(binary_path)}: "
|
||||||
|
f"{result.get('detail', 'unknown error')}"
|
||||||
|
)
|
||||||
|
|
||||||
|
return counts
|
||||||
|
|
||||||
|
|
||||||
|
# ── Seed-state mode (skip historical backfill on first deploy) ────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def seed_state_from_folder(
|
||||||
|
thordata_root: str,
|
||||||
|
state: ForwardState,
|
||||||
|
*,
|
||||||
|
max_age_days: int = 365,
|
||||||
|
logger: Optional[Any] = None,
|
||||||
|
) -> Dict[str, int]:
|
||||||
|
"""Walk `thordata_root` and mark every existing event binary as
|
||||||
|
already forwarded — without POSTing anything.
|
||||||
|
|
||||||
|
Run this ONCE before enabling sfm_forward_enabled on a machine
|
||||||
|
with a large historical archive. The watcher then starts
|
||||||
|
forwarding only events that appear AFTER the seed run.
|
||||||
|
|
||||||
|
Returns a counts dict:
|
||||||
|
{"scanned": int, "seeded": int, "already_known": int, "skipped_too_old": int}
|
||||||
|
"""
|
||||||
|
def _log(msg: str) -> None:
|
||||||
|
if logger:
|
||||||
|
logger(msg)
|
||||||
|
else:
|
||||||
|
log.info(msg)
|
||||||
|
|
||||||
|
counts = {"scanned": 0, "seeded": 0, "already_known": 0, "skipped_too_old": 0}
|
||||||
|
|
||||||
|
if not os.path.isdir(thordata_root):
|
||||||
|
_log(f"[seed] thordata root not found: {thordata_root}")
|
||||||
|
return counts
|
||||||
|
|
||||||
|
now_ts = time.time()
|
||||||
|
max_age_seconds = max(1, int(max_age_days)) * 86400.0
|
||||||
|
|
||||||
|
for _proj, _unit, unit_path in _iter_unit_dirs(thordata_root):
|
||||||
|
try:
|
||||||
|
entries = [e for e in os.scandir(unit_path) if e.is_file()]
|
||||||
|
except OSError:
|
||||||
|
continue
|
||||||
|
for e in entries:
|
||||||
|
if not is_event_binary(e.path):
|
||||||
|
continue
|
||||||
|
counts["scanned"] += 1
|
||||||
|
try:
|
||||||
|
mtime = e.stat().st_mtime
|
||||||
|
size = e.stat().st_size
|
||||||
|
except OSError:
|
||||||
|
continue
|
||||||
|
if (now_ts - mtime) > max_age_seconds:
|
||||||
|
counts["skipped_too_old"] += 1
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
digest = sha256_of_file(e.path)
|
||||||
|
except OSError as exc:
|
||||||
|
_log(f"[seed] sha256 failed for {e.path}: {exc}")
|
||||||
|
continue
|
||||||
|
if state.is_forwarded(digest):
|
||||||
|
counts["already_known"] += 1
|
||||||
|
continue
|
||||||
|
state.mark_forwarded(digest, e.name, size)
|
||||||
|
counts["seeded"] += 1
|
||||||
|
if counts["seeded"] % 1000 == 0:
|
||||||
|
_log(f"[seed] progress: {counts['seeded']} seeded so far...")
|
||||||
|
|
||||||
|
_log(
|
||||||
|
f"[seed] done. scanned={counts['scanned']} seeded={counts['seeded']} "
|
||||||
|
f"already_known={counts['already_known']} "
|
||||||
|
f"skipped_too_old={counts['skipped_too_old']}"
|
||||||
|
)
|
||||||
|
return counts
|
||||||
|
|
||||||
|
|
||||||
|
# ── CLI entry point ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _main() -> int:
|
||||||
|
"""Command-line interface for one-shot operations.
|
||||||
|
|
||||||
|
python event_forwarder.py --seed-state \\
|
||||||
|
--thordata "C:\\THORDATA" \\
|
||||||
|
--state "<path/to/thor_forwarded.json>" \\
|
||||||
|
[--max-age-days 365]
|
||||||
|
"""
|
||||||
|
import argparse
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="Thor Watcher — SFM event forwarder utilities",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--seed-state", action="store_true",
|
||||||
|
help="Mark every event binary in --thordata as already-forwarded "
|
||||||
|
"(without POSTing). Use this BEFORE enabling sfm_forward "
|
||||||
|
"on a machine with a large historical archive.",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--thordata", required=True,
|
||||||
|
help="Path to the THORDATA root folder.",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--state", required=True,
|
||||||
|
help="Path to the JSON state file. Will be created if missing.",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--max-age-days", type=int, default=365,
|
||||||
|
help="Only seed files newer than this many days (default 365).",
|
||||||
|
)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if not args.seed_state:
|
||||||
|
parser.error("specify --seed-state (no other modes supported yet)")
|
||||||
|
|
||||||
|
print(f"[seed] thordata = {args.thordata}")
|
||||||
|
print(f"[seed] state = {args.state}")
|
||||||
|
print(f"[seed] max_age = {args.max_age_days} days")
|
||||||
|
|
||||||
|
state = ForwardState(args.state)
|
||||||
|
print(f"[seed] state currently has {state.count()} entries")
|
||||||
|
seed_state_from_folder(
|
||||||
|
args.thordata, state,
|
||||||
|
max_age_days=args.max_age_days,
|
||||||
|
logger=lambda m: print(m),
|
||||||
|
)
|
||||||
|
print(f"[seed] state now has {state.count()} entries")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import sys
|
||||||
|
sys.exit(_main())
|
||||||
+103
-14
@@ -1,5 +1,5 @@
|
|||||||
"""
|
"""
|
||||||
Thor Watcher — Series 4 Ingest Agent v0.2.0
|
Thor Watcher — Series 4 Ingest Agent v0.3.0
|
||||||
|
|
||||||
Micromate (Series 4) ingest agent for Terra-View.
|
Micromate (Series 4) ingest agent for Terra-View.
|
||||||
|
|
||||||
@@ -7,6 +7,8 @@ Behavior:
|
|||||||
- Scans C:\THORDATA\<Project>\<UM####>\*.MLG
|
- Scans C:\THORDATA\<Project>\<UM####>\*.MLG
|
||||||
- For each UM####, finds the newest .MLG by timestamp in the filename
|
- For each UM####, finds the newest .MLG by timestamp in the filename
|
||||||
- Posts JSON heartbeat payload to Terra-View backend
|
- Posts JSON heartbeat payload to Terra-View backend
|
||||||
|
- Forwards .IDFH/.IDFW event files (+ TXT sidecars) to a seismo-relay
|
||||||
|
SFM server when sfm_forward_enabled=true. See event_forwarder.py.
|
||||||
- Tray-friendly: run_watcher(state, stop_event) for background thread use
|
- Tray-friendly: run_watcher(state, stop_event) for background thread use
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@@ -22,10 +24,12 @@ from datetime import datetime, timezone
|
|||||||
from typing import Any, Dict, List, Optional, Tuple
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
from socket import gethostname
|
from socket import gethostname
|
||||||
|
|
||||||
|
import event_forwarder
|
||||||
|
|
||||||
|
|
||||||
# ── Version ───────────────────────────────────────────────────────────────────
|
# ── Version ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
VERSION = "0.2.0"
|
VERSION = "0.3.0"
|
||||||
|
|
||||||
|
|
||||||
# ── Config ────────────────────────────────────────────────────────────────────
|
# ── Config ────────────────────────────────────────────────────────────────────
|
||||||
@@ -54,6 +58,17 @@ def load_config(config_path: str) -> Dict[str, Any]:
|
|||||||
"update_source": "gitea",
|
"update_source": "gitea",
|
||||||
"update_url": "",
|
"update_url": "",
|
||||||
"debug": False,
|
"debug": False,
|
||||||
|
|
||||||
|
# SFM event forwarding — default OFF, opt-in via Settings.
|
||||||
|
"sfm_forward_enabled": False,
|
||||||
|
"sfm_url": "", # e.g. "http://10.0.0.44:8200"
|
||||||
|
"sfm_forward_interval": 60, # seconds between forward passes
|
||||||
|
"sfm_quiescence_seconds": 5,
|
||||||
|
"sfm_missing_report_grace_seconds": 60,
|
||||||
|
"sfm_http_timeout": 60,
|
||||||
|
"sfm_state_file": "", # blank → <log_dir>/thor_forwarded.json
|
||||||
|
"sfm_max_forwards_per_pass": 500,
|
||||||
|
"sfm_max_event_age_days": 365,
|
||||||
}
|
}
|
||||||
|
|
||||||
with open(config_path, "r", encoding="utf-8") as f:
|
with open(config_path, "r", encoding="utf-8") as f:
|
||||||
@@ -248,14 +263,17 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
|
|||||||
Main watcher loop. Runs in a background thread when launched from the tray.
|
Main watcher loop. Runs in a background thread when launched from the tray.
|
||||||
|
|
||||||
state keys written each cycle:
|
state keys written each cycle:
|
||||||
state["status"] — "running" | "error" | "starting"
|
state["status"] — "running" | "error" | "starting"
|
||||||
state["api_status"] — "ok" | "fail" | "disabled"
|
state["api_status"] — "ok" | "fail" | "disabled"
|
||||||
state["units"] — list of unit dicts for tray display
|
state["units"] — list of unit dicts for tray display
|
||||||
state["last_scan"] — datetime of last successful scan
|
state["last_scan"] — datetime of last successful scan
|
||||||
state["last_error"] — last error string or None
|
state["last_error"] — last error string or None
|
||||||
state["log_dir"] — directory containing the log file
|
state["log_dir"] — directory containing the log file
|
||||||
state["cfg"] — loaded config dict
|
state["cfg"] — loaded config dict
|
||||||
state["update_available"] — set True when API response signals an update
|
state["update_available"] — set True when API response signals an update
|
||||||
|
state["sfm_status"] — "ok" | "fail" | "disabled" | "ready"
|
||||||
|
state["last_forward"] — datetime of last forwarder pass (or None)
|
||||||
|
state["last_forward_counts"] — dict from event_forwarder.forward_pending
|
||||||
"""
|
"""
|
||||||
# Resolve config path
|
# Resolve config path
|
||||||
if getattr(sys, "frozen", False):
|
if getattr(sys, "frozen", False):
|
||||||
@@ -291,16 +309,56 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
|
|||||||
API_INTERVAL = int(cfg["api_interval"])
|
API_INTERVAL = int(cfg["api_interval"])
|
||||||
ENABLE_LOGGING = bool(cfg["enable_logging"])
|
ENABLE_LOGGING = bool(cfg["enable_logging"])
|
||||||
|
|
||||||
|
# SFM forwarder config
|
||||||
|
SFM_FORWARD_ENABLED = bool(cfg.get("sfm_forward_enabled", False))
|
||||||
|
SFM_URL = str(cfg.get("sfm_url", "")).strip()
|
||||||
|
SFM_FORWARD_INTERVAL = int(cfg.get("sfm_forward_interval", 60))
|
||||||
|
SFM_QUIESCENCE = int(cfg.get("sfm_quiescence_seconds", 5))
|
||||||
|
SFM_GRACE = int(cfg.get("sfm_missing_report_grace_seconds", 60))
|
||||||
|
SFM_HTTP_TIMEOUT = int(cfg.get("sfm_http_timeout", 60))
|
||||||
|
SFM_MAX_PER_PASS = int(cfg.get("sfm_max_forwards_per_pass", 500))
|
||||||
|
SFM_MAX_AGE_DAYS = int(cfg.get("sfm_max_event_age_days", 365))
|
||||||
|
sfm_state_path = str(cfg.get("sfm_state_file", "")).strip() or \
|
||||||
|
os.path.join(state["log_dir"], "thor_forwarded.json")
|
||||||
|
|
||||||
log_message(log_file, ENABLE_LOGGING,
|
log_message(log_file, ENABLE_LOGGING,
|
||||||
"[cfg] THORDATA_PATH={} SCAN_INTERVAL={}s API_INTERVAL={}s API={}".format(
|
"[cfg] THORDATA_PATH={} SCAN_INTERVAL={}s API_INTERVAL={}s API={} SFM={}".format(
|
||||||
THORDATA_PATH, SCAN_INTERVAL, API_INTERVAL, bool(API_URL)
|
THORDATA_PATH, SCAN_INTERVAL, API_INTERVAL, bool(API_URL),
|
||||||
|
bool(SFM_FORWARD_ENABLED and SFM_URL),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
print("[CFG] THORDATA_PATH={} SCAN_INTERVAL={}s API={}".format(
|
print("[CFG] THORDATA_PATH={} SCAN_INTERVAL={}s API={} SFM={}".format(
|
||||||
THORDATA_PATH, SCAN_INTERVAL, bool(API_URL)
|
THORDATA_PATH, SCAN_INTERVAL, bool(API_URL),
|
||||||
|
bool(SFM_FORWARD_ENABLED and SFM_URL),
|
||||||
))
|
))
|
||||||
|
|
||||||
|
# Initialize SFM forwarder state (if enabled)
|
||||||
|
sfm_state_obj: Optional[event_forwarder.ForwardState] = None
|
||||||
|
if SFM_FORWARD_ENABLED and SFM_URL:
|
||||||
|
try:
|
||||||
|
sfm_state_obj = event_forwarder.ForwardState(sfm_state_path)
|
||||||
|
state["sfm_status"] = "ready"
|
||||||
|
log_message(log_file, ENABLE_LOGGING,
|
||||||
|
"[sfm] forwarder ready url={} state_file={} known={}".format(
|
||||||
|
SFM_URL, sfm_state_path, sfm_state_obj.count(),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
print("[SFM] forwarder ready url={} known={}".format(
|
||||||
|
SFM_URL, sfm_state_obj.count(),
|
||||||
|
))
|
||||||
|
except Exception as exc:
|
||||||
|
state["sfm_status"] = "fail"
|
||||||
|
state["last_error"] = "SFM init failed: {}".format(exc)
|
||||||
|
log_message(log_file, ENABLE_LOGGING,
|
||||||
|
"[sfm] init failed: {}".format(exc))
|
||||||
|
else:
|
||||||
|
state["sfm_status"] = "disabled"
|
||||||
|
|
||||||
|
state["last_forward"] = None
|
||||||
|
state["last_forward_counts"] = None
|
||||||
|
|
||||||
last_api_ts = 0.0
|
last_api_ts = 0.0
|
||||||
|
last_forward_ts = 0.0
|
||||||
|
|
||||||
while not stop_event.is_set():
|
while not stop_event.is_set():
|
||||||
try:
|
try:
|
||||||
@@ -362,6 +420,37 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
|
|||||||
else:
|
else:
|
||||||
state["api_status"] = "disabled"
|
state["api_status"] = "disabled"
|
||||||
|
|
||||||
|
# ── SFM event forwarding ───────────────────────────────────────────
|
||||||
|
if sfm_state_obj is not None:
|
||||||
|
now_ts = time.time()
|
||||||
|
if now_ts - last_forward_ts >= SFM_FORWARD_INTERVAL:
|
||||||
|
last_forward_ts = now_ts
|
||||||
|
try:
|
||||||
|
counts = event_forwarder.forward_pending(
|
||||||
|
THORDATA_PATH, SFM_URL, sfm_state_obj,
|
||||||
|
max_age_days=SFM_MAX_AGE_DAYS,
|
||||||
|
quiescence_seconds=SFM_QUIESCENCE,
|
||||||
|
missing_report_grace_seconds=SFM_GRACE,
|
||||||
|
timeout=SFM_HTTP_TIMEOUT,
|
||||||
|
max_per_pass=SFM_MAX_PER_PASS,
|
||||||
|
logger=lambda m: log_message(log_file, ENABLE_LOGGING, m),
|
||||||
|
)
|
||||||
|
state["last_forward"] = datetime.now()
|
||||||
|
state["last_forward_counts"] = counts
|
||||||
|
if counts["errors"] > 0:
|
||||||
|
state["sfm_status"] = "fail"
|
||||||
|
else:
|
||||||
|
state["sfm_status"] = "ok"
|
||||||
|
summary = ("[sfm] pass scanned={scanned} forwarded={forwarded} "
|
||||||
|
"errors={errors} with_report={with_report}").format(**counts)
|
||||||
|
print(summary)
|
||||||
|
log_message(log_file, ENABLE_LOGGING, summary)
|
||||||
|
except Exception as exc:
|
||||||
|
state["sfm_status"] = "fail"
|
||||||
|
msg = "[sfm] pass failed: {}".format(exc)
|
||||||
|
print(msg)
|
||||||
|
log_message(log_file, ENABLE_LOGGING, msg)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
err = "[loop-error] {}".format(e)
|
err = "[loop-error] {}".format(e)
|
||||||
print(err)
|
print(err)
|
||||||
|
|||||||
@@ -0,0 +1,716 @@
|
|||||||
|
"""
|
||||||
|
test_event_forwarder.py — unit tests for Thor Watcher's SFM event forwarder.
|
||||||
|
|
||||||
|
Covers:
|
||||||
|
- is_event_binary() filename matching (positive + negative cases)
|
||||||
|
- parse_event_filename() / serial_from_filename()
|
||||||
|
- idf_report_path() — the TXT/ subfolder convention
|
||||||
|
- ForwardState load/save round-trip + idempotency check
|
||||||
|
- find_pending_events() against the THORDATA/<Project>/<Unit>/ tree,
|
||||||
|
plus quiescence + grace-period + re-pair logic
|
||||||
|
- _encode_multipart() byte-level shape (boundary + headers)
|
||||||
|
- forward_event_pair() end-to-end against a tiny stdlib HTTP server
|
||||||
|
that mimics seismo-relay's POST /db/import/idf_file endpoint
|
||||||
|
- seed_state_from_folder() walks the tree without POSTing
|
||||||
|
|
||||||
|
Stdlib only — runs with `python -m pytest test_event_forwarder.py`
|
||||||
|
on Python 3.8+ (the watcher's compat target).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import http.server
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import event_forwarder as ef
|
||||||
|
|
||||||
|
|
||||||
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _make_thordata(root: Path, project: str, unit: str) -> Path:
|
||||||
|
"""Create a THORDATA/<project>/<unit>/ folder pair; return unit_dir."""
|
||||||
|
unit_dir = root / project / unit
|
||||||
|
unit_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
return unit_dir
|
||||||
|
|
||||||
|
|
||||||
|
def _touch_with_age(p: Path, age_seconds: float, content: bytes = b"x") -> Path:
|
||||||
|
"""Create a file with controlled mtime."""
|
||||||
|
p.write_bytes(content)
|
||||||
|
target = time.time() - age_seconds
|
||||||
|
os.utime(p, (target, target))
|
||||||
|
return p
|
||||||
|
|
||||||
|
|
||||||
|
def _make_event(unit_dir: Path, name: str, age_seconds: float = 100,
|
||||||
|
content: bytes = b"x") -> Path:
|
||||||
|
return _touch_with_age(unit_dir / name, age_seconds, content)
|
||||||
|
|
||||||
|
|
||||||
|
def _make_txt(unit_dir: Path, base_name: str, age_seconds: float = 100,
|
||||||
|
content: bytes = b"r") -> Path:
|
||||||
|
txt_dir = unit_dir / "TXT"
|
||||||
|
txt_dir.mkdir(exist_ok=True)
|
||||||
|
return _touch_with_age(txt_dir / ef.idf_report_name(base_name),
|
||||||
|
age_seconds, content)
|
||||||
|
|
||||||
|
|
||||||
|
# ── is_event_binary() ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestIsEventBinary(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_recognises_typical_thor_filenames(self):
|
||||||
|
for name in [
|
||||||
|
"UM11719_20231219163444.IDFH",
|
||||||
|
"UM11719_20231219162723.IDFW",
|
||||||
|
"BE9439_20200713124251.IDFH",
|
||||||
|
"UM13981_20220808082418.IDFH",
|
||||||
|
# case-insensitive
|
||||||
|
"um11719_20231219163444.idfh",
|
||||||
|
]:
|
||||||
|
self.assertTrue(ef.is_event_binary(name), name)
|
||||||
|
|
||||||
|
def test_rejects_non_event_extensions(self):
|
||||||
|
for name in [
|
||||||
|
"UM11719_20231219163436.MLG", # monitor log
|
||||||
|
"UM11719_20231219163444.IDFH.txt", # report sidecar
|
||||||
|
"UM11719_20231219164135.IDFW.CDB", # cache database variant
|
||||||
|
"UM11719_20231219164135.IDFH.CDB",
|
||||||
|
"agent.log",
|
||||||
|
"config.json",
|
||||||
|
"foo.bak",
|
||||||
|
"bar.tmp",
|
||||||
|
"UM11719_20231219163444.csv",
|
||||||
|
"UM11719_20231219163444.pdf",
|
||||||
|
"UM11719_20231219163444.html",
|
||||||
|
"UM11719_20231219163444.xml",
|
||||||
|
]:
|
||||||
|
self.assertFalse(ef.is_event_binary(name), name)
|
||||||
|
|
||||||
|
def test_rejects_malformed_filenames(self):
|
||||||
|
for name in [
|
||||||
|
"",
|
||||||
|
"no_extension",
|
||||||
|
"UM_20231219163444.IDFH", # missing serial digits
|
||||||
|
"1234_20231219163444.IDFH", # serial must start with letters
|
||||||
|
"UM11719_2023121916.IDFH", # short timestamp
|
||||||
|
"UM11719_20231219163444.IDFX", # wrong kind
|
||||||
|
"UM11719-20231219163444.IDFH", # wrong separator
|
||||||
|
]:
|
||||||
|
self.assertFalse(ef.is_event_binary(name), name)
|
||||||
|
|
||||||
|
def test_parse_event_filename(self):
|
||||||
|
from datetime import datetime
|
||||||
|
parsed = ef.parse_event_filename("UM11719_20231219163444.IDFW")
|
||||||
|
self.assertIsNotNone(parsed)
|
||||||
|
serial, ts, kind = parsed
|
||||||
|
self.assertEqual(serial, "UM11719")
|
||||||
|
self.assertEqual(ts, datetime(2023, 12, 19, 16, 34, 44))
|
||||||
|
self.assertEqual(kind, "IDFW")
|
||||||
|
|
||||||
|
def test_serial_from_filename(self):
|
||||||
|
self.assertEqual(ef.serial_from_filename("UM11719_20231219163444.IDFH"),
|
||||||
|
"UM11719")
|
||||||
|
self.assertEqual(ef.serial_from_filename("BE9439_20200713124251.IDFH"),
|
||||||
|
"BE9439")
|
||||||
|
self.assertIsNone(ef.serial_from_filename("not_an_event.bin"))
|
||||||
|
|
||||||
|
def test_idf_report_path_uses_txt_subfolder(self):
|
||||||
|
binary = "/foo/THORDATA/Project A/UM11719/UM11719_20231219163444.IDFW"
|
||||||
|
self.assertEqual(
|
||||||
|
ef.idf_report_path(binary),
|
||||||
|
os.path.join("/foo/THORDATA/Project A/UM11719",
|
||||||
|
"TXT", "UM11719_20231219163444.IDFW.txt"),
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_is_histogram_event(self):
|
||||||
|
self.assertTrue(ef.is_histogram_event("UM11719_20231219163444.IDFH"))
|
||||||
|
self.assertTrue(ef.is_histogram_event("um11719_20231219163444.idfh"))
|
||||||
|
self.assertFalse(ef.is_histogram_event("UM11719_20231219162723.IDFW"))
|
||||||
|
|
||||||
|
|
||||||
|
# ── ForwardState ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestForwardState(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_round_trip_persists_marked_entries(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
path = os.path.join(tmp, "fwd.json")
|
||||||
|
s = ef.ForwardState(path)
|
||||||
|
self.assertFalse(s.is_forwarded("abc123"))
|
||||||
|
s.mark_forwarded("abc123", "UM11719_20231219163444.IDFW", 8800)
|
||||||
|
self.assertTrue(s.is_forwarded("abc123"))
|
||||||
|
|
||||||
|
# Re-load from disk
|
||||||
|
s2 = ef.ForwardState(path)
|
||||||
|
self.assertTrue(s2.is_forwarded("abc123"))
|
||||||
|
self.assertEqual(s2.count(), 1)
|
||||||
|
|
||||||
|
def test_corrupt_state_file_starts_fresh(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
path = os.path.join(tmp, "fwd.json")
|
||||||
|
with open(path, "w") as f:
|
||||||
|
f.write("not valid json {{{")
|
||||||
|
s = ef.ForwardState(path)
|
||||||
|
self.assertEqual(s.count(), 0)
|
||||||
|
|
||||||
|
def test_version_mismatch_starts_fresh(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
path = os.path.join(tmp, "fwd.json")
|
||||||
|
with open(path, "w") as f:
|
||||||
|
json.dump({"version": 999, "forwarded": {"x": {}}}, f)
|
||||||
|
s = ef.ForwardState(path)
|
||||||
|
self.assertEqual(s.count(), 0)
|
||||||
|
|
||||||
|
def test_legacy_entries_default_to_had_report_true(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
path = os.path.join(tmp, "fwd.json")
|
||||||
|
with open(path, "w") as f:
|
||||||
|
json.dump({
|
||||||
|
"version": 1,
|
||||||
|
"forwarded": {
|
||||||
|
"abc123": {
|
||||||
|
"filename": "UM11719_20231219163444.IDFW",
|
||||||
|
"size": 123,
|
||||||
|
"forwarded_at": "2025-01-01T00:00:00Z",
|
||||||
|
# No had_report field — legacy entry
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, f)
|
||||||
|
state = ef.ForwardState(path)
|
||||||
|
self.assertIs(state.status("abc123"), True)
|
||||||
|
|
||||||
|
def test_status_returns_none_for_unknown_sha(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
state = ef.ForwardState(str(Path(tmp) / "fwd.json"))
|
||||||
|
self.assertIs(state.status("never-seen"), None)
|
||||||
|
|
||||||
|
def test_mark_with_had_report_false_then_promote(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
state = ef.ForwardState(str(Path(tmp) / "fwd.json"))
|
||||||
|
state.mark_forwarded("xyz", "UM11719_20231219163444.IDFW", 100,
|
||||||
|
had_report=False)
|
||||||
|
self.assertIs(state.status("xyz"), False)
|
||||||
|
state.mark_forwarded("xyz", "UM11719_20231219163444.IDFW", 100,
|
||||||
|
had_report=True)
|
||||||
|
self.assertIs(state.status("xyz"), True)
|
||||||
|
|
||||||
|
|
||||||
|
# ── find_pending_events() ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestFindPendingEvents(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_returns_pair_when_both_files_present_and_quiescent(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit_dir = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
|
||||||
|
age_seconds=120, content=b"binary")
|
||||||
|
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
|
||||||
|
age_seconds=100, content=b"report")
|
||||||
|
|
||||||
|
state = ef.ForwardState(str(root / "fwd.json"))
|
||||||
|
pending = ef.find_pending_events(
|
||||||
|
str(root), state, max_age_days=30,
|
||||||
|
quiescence_seconds=5, missing_report_grace_seconds=60,
|
||||||
|
)
|
||||||
|
self.assertEqual(len(pending), 1)
|
||||||
|
self.assertEqual(os.path.basename(pending[0][0]),
|
||||||
|
"UM11719_20231219163444.IDFW")
|
||||||
|
self.assertEqual(os.path.basename(pending[0][1]),
|
||||||
|
"UM11719_20231219163444.IDFW.txt")
|
||||||
|
|
||||||
|
def test_idfh_and_idfw_are_separate_events(self):
|
||||||
|
"""A single timestamp produces both .IDFH and .IDFW — they
|
||||||
|
forward as two independent events with their own state entries."""
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit_dir = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
_make_event(unit_dir, "UM11719_20231219163444.IDFH",
|
||||||
|
age_seconds=120, content=b"histogram")
|
||||||
|
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
|
||||||
|
age_seconds=120, content=b"waveform")
|
||||||
|
_make_txt(unit_dir, "UM11719_20231219163444.IDFH",
|
||||||
|
age_seconds=100, content=b"hreport")
|
||||||
|
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
|
||||||
|
age_seconds=100, content=b"wreport")
|
||||||
|
|
||||||
|
state = ef.ForwardState(str(root / "fwd.json"))
|
||||||
|
pending = ef.find_pending_events(
|
||||||
|
str(root), state, max_age_days=30,
|
||||||
|
quiescence_seconds=5, missing_report_grace_seconds=60,
|
||||||
|
)
|
||||||
|
self.assertEqual(len(pending), 2)
|
||||||
|
names = sorted(os.path.basename(p[0]) for p in pending)
|
||||||
|
self.assertEqual(names, [
|
||||||
|
"UM11719_20231219163444.IDFH",
|
||||||
|
"UM11719_20231219163444.IDFW",
|
||||||
|
])
|
||||||
|
|
||||||
|
def test_pairing_when_txt_is_in_unit_root_does_not_match(self):
|
||||||
|
"""Sidecars MUST live in the TXT/ subfolder. A stray .txt
|
||||||
|
next to the binary is not the canonical location and should
|
||||||
|
not be picked up as a sidecar."""
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit_dir = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
|
||||||
|
age_seconds=200, content=b"bin")
|
||||||
|
# .txt is in the unit dir, not unit/TXT/
|
||||||
|
_touch_with_age(unit_dir / "UM11719_20231219163444.IDFW.txt",
|
||||||
|
age_seconds=100, content=b"misplaced")
|
||||||
|
|
||||||
|
state = ef.ForwardState(str(root / "fwd.json"))
|
||||||
|
pending = ef.find_pending_events(
|
||||||
|
str(root), state, max_age_days=30,
|
||||||
|
quiescence_seconds=5, missing_report_grace_seconds=60,
|
||||||
|
)
|
||||||
|
# Forward proceeds (grace period elapsed), but WITHOUT pairing
|
||||||
|
self.assertEqual(len(pending), 1)
|
||||||
|
self.assertIsNone(pending[0][1])
|
||||||
|
|
||||||
|
def test_skips_if_already_forwarded(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit_dir = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
bin_p = _make_event(unit_dir, "UM11719_20231219163444.IDFW",
|
||||||
|
age_seconds=120, content=b"binary")
|
||||||
|
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
|
||||||
|
age_seconds=100, content=b"report")
|
||||||
|
|
||||||
|
state = ef.ForwardState(str(root / "fwd.json"))
|
||||||
|
digest = ef.sha256_of_file(str(bin_p))
|
||||||
|
state.mark_forwarded(digest, "UM11719_20231219163444.IDFW", len(b"binary"))
|
||||||
|
|
||||||
|
pending = ef.find_pending_events(
|
||||||
|
str(root), state, max_age_days=30,
|
||||||
|
quiescence_seconds=5, missing_report_grace_seconds=60,
|
||||||
|
)
|
||||||
|
self.assertEqual(len(pending), 0)
|
||||||
|
|
||||||
|
def test_skips_if_too_fresh_to_be_quiescent(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit_dir = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
|
||||||
|
age_seconds=1, content=b"binary")
|
||||||
|
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
|
||||||
|
age_seconds=1, content=b"report")
|
||||||
|
|
||||||
|
state = ef.ForwardState(str(root / "fwd.json"))
|
||||||
|
pending = ef.find_pending_events(
|
||||||
|
str(root), state, max_age_days=30,
|
||||||
|
quiescence_seconds=5, missing_report_grace_seconds=60,
|
||||||
|
)
|
||||||
|
self.assertEqual(len(pending), 0)
|
||||||
|
|
||||||
|
def test_forwards_alone_after_grace_when_txt_missing(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit_dir = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
_make_event(unit_dir, "UM11719_20231219163444.IDFH",
|
||||||
|
age_seconds=200, content=b"binary")
|
||||||
|
state = ef.ForwardState(str(root / "fwd.json"))
|
||||||
|
pending = ef.find_pending_events(
|
||||||
|
str(root), state, max_age_days=30,
|
||||||
|
quiescence_seconds=5, missing_report_grace_seconds=60,
|
||||||
|
)
|
||||||
|
self.assertEqual(len(pending), 1)
|
||||||
|
bin_path, txt_path = pending[0]
|
||||||
|
self.assertEqual(os.path.basename(bin_path),
|
||||||
|
"UM11719_20231219163444.IDFH")
|
||||||
|
self.assertIsNone(txt_path)
|
||||||
|
|
||||||
|
def test_re_pair_after_late_arriving_txt(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit_dir = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
bin_p = _make_event(unit_dir, "UM11719_20231219163444.IDFW",
|
||||||
|
age_seconds=200, content=b"binary")
|
||||||
|
state = ef.ForwardState(str(root / "fwd.json"))
|
||||||
|
digest = ef.sha256_of_file(str(bin_p))
|
||||||
|
state.mark_forwarded(digest, "UM11719_20231219163444.IDFW",
|
||||||
|
len(b"binary"), had_report=False)
|
||||||
|
|
||||||
|
# First scan: TXT not present → still skipped.
|
||||||
|
pending = ef.find_pending_events(
|
||||||
|
str(root), state, max_age_days=30,
|
||||||
|
quiescence_seconds=5, missing_report_grace_seconds=60,
|
||||||
|
)
|
||||||
|
self.assertEqual(pending, [])
|
||||||
|
|
||||||
|
# TXT finally appears.
|
||||||
|
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
|
||||||
|
age_seconds=100, content=b"report")
|
||||||
|
pending = ef.find_pending_events(
|
||||||
|
str(root), state, max_age_days=30,
|
||||||
|
quiescence_seconds=5, missing_report_grace_seconds=60,
|
||||||
|
)
|
||||||
|
self.assertEqual(len(pending), 1)
|
||||||
|
self.assertEqual(os.path.basename(pending[0][1]),
|
||||||
|
"UM11719_20231219163444.IDFW.txt")
|
||||||
|
|
||||||
|
def test_defers_when_txt_missing_and_within_grace(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit_dir = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
|
||||||
|
age_seconds=15, content=b"binary")
|
||||||
|
state = ef.ForwardState(str(root / "fwd.json"))
|
||||||
|
pending = ef.find_pending_events(
|
||||||
|
str(root), state, max_age_days=30,
|
||||||
|
quiescence_seconds=5, missing_report_grace_seconds=60,
|
||||||
|
)
|
||||||
|
self.assertEqual(len(pending), 0)
|
||||||
|
|
||||||
|
def test_skips_old_files_beyond_max_age_days(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit_dir = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
|
||||||
|
age_seconds=10 * 86400, content=b"binary")
|
||||||
|
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
|
||||||
|
age_seconds=10 * 86400, content=b"report")
|
||||||
|
state = ef.ForwardState(str(root / "fwd.json"))
|
||||||
|
pending = ef.find_pending_events(
|
||||||
|
str(root), state, max_age_days=1,
|
||||||
|
quiescence_seconds=5, missing_report_grace_seconds=60,
|
||||||
|
)
|
||||||
|
self.assertEqual(len(pending), 0)
|
||||||
|
|
||||||
|
def test_ignores_mlg_and_other_non_event_files(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit_dir = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
_make_event(unit_dir, "UM11719_20231219163436.MLG",
|
||||||
|
age_seconds=120, content=b"mlg")
|
||||||
|
_make_event(unit_dir, "UM11719_20231219164135.IDFW.CDB",
|
||||||
|
age_seconds=120, content=b"cache")
|
||||||
|
_touch_with_age(unit_dir / "agent.log", age_seconds=120, content=b"log")
|
||||||
|
state = ef.ForwardState(str(root / "fwd.json"))
|
||||||
|
pending = ef.find_pending_events(
|
||||||
|
str(root), state, max_age_days=30,
|
||||||
|
quiescence_seconds=5, missing_report_grace_seconds=60,
|
||||||
|
)
|
||||||
|
self.assertEqual(len(pending), 0)
|
||||||
|
|
||||||
|
def test_walks_multiple_projects_and_units(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit_a = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
unit_b = _make_thordata(root, "Project B", "BE9439")
|
||||||
|
_make_event(unit_a, "UM11719_20231219163444.IDFW", age_seconds=200, content=b"a")
|
||||||
|
_make_event(unit_b, "BE9439_20200713131747.IDFW", age_seconds=200, content=b"b")
|
||||||
|
_make_txt(unit_a, "UM11719_20231219163444.IDFW", age_seconds=100, content=b"ar")
|
||||||
|
_make_txt(unit_b, "BE9439_20200713131747.IDFW", age_seconds=100, content=b"br")
|
||||||
|
|
||||||
|
state = ef.ForwardState(str(root / "fwd.json"))
|
||||||
|
pending = ef.find_pending_events(
|
||||||
|
str(root), state, max_age_days=10000, # BE event is from 2020
|
||||||
|
quiescence_seconds=5, missing_report_grace_seconds=60,
|
||||||
|
)
|
||||||
|
self.assertEqual(len(pending), 2)
|
||||||
|
names = sorted(os.path.basename(p[0]) for p in pending)
|
||||||
|
self.assertEqual(names, [
|
||||||
|
"BE9439_20200713131747.IDFW",
|
||||||
|
"UM11719_20231219163444.IDFW",
|
||||||
|
])
|
||||||
|
|
||||||
|
def test_max_per_pass_caps_returned_count(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
for i in range(5):
|
||||||
|
name = "UM11719_2023121916344{}.IDFW".format(i)
|
||||||
|
_make_event(unit, name, age_seconds=120 + i, content=("bin-" + str(i)).encode())
|
||||||
|
_make_txt(unit, name, age_seconds=110 + i, content=b"report")
|
||||||
|
state = ef.ForwardState(str(root / "fwd.json"))
|
||||||
|
pending = ef.find_pending_events(
|
||||||
|
str(root), state, max_age_days=30,
|
||||||
|
quiescence_seconds=5, missing_report_grace_seconds=60,
|
||||||
|
max_per_pass=2,
|
||||||
|
)
|
||||||
|
self.assertEqual(len(pending), 2)
|
||||||
|
|
||||||
|
def test_max_per_pass_returns_oldest_first(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
ages = [200, 150, 100, 50]
|
||||||
|
for i, age in enumerate(ages):
|
||||||
|
name = "UM11719_2023121916344{}.IDFW".format(i)
|
||||||
|
_make_event(unit, name, age_seconds=age, content=("c" + str(i)).encode())
|
||||||
|
_make_txt(unit, name, age_seconds=max(1, age - 10), content=b"r")
|
||||||
|
state = ef.ForwardState(str(root / "fwd.json"))
|
||||||
|
pending = ef.find_pending_events(
|
||||||
|
str(root), state, max_age_days=30,
|
||||||
|
quiescence_seconds=5, missing_report_grace_seconds=60,
|
||||||
|
max_per_pass=2,
|
||||||
|
)
|
||||||
|
names = [os.path.basename(p[0]) for p in pending]
|
||||||
|
# Oldest two should be index 0 (200s) and 1 (150s)
|
||||||
|
self.assertEqual(names, [
|
||||||
|
"UM11719_20231219163440.IDFW",
|
||||||
|
"UM11719_20231219163441.IDFW",
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
|
# ── Seed-state mode ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestSeedStateFromFolder(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_seeds_every_in_window_event_without_posting(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
for i in range(3):
|
||||||
|
_make_event(unit, "UM11719_2023121916344{}.IDFW".format(i),
|
||||||
|
age_seconds=120 + i, content=("e" + str(i)).encode())
|
||||||
|
# Ignored
|
||||||
|
_make_event(unit, "UM11719_20231219163436.MLG", age_seconds=120, content=b"mlg")
|
||||||
|
|
||||||
|
state = ef.ForwardState(str(root / "seed.json"))
|
||||||
|
counts = ef.seed_state_from_folder(str(root), state, max_age_days=30)
|
||||||
|
self.assertEqual(counts["scanned"], 3)
|
||||||
|
self.assertEqual(counts["seeded"], 3)
|
||||||
|
self.assertEqual(counts["already_known"], 0)
|
||||||
|
self.assertEqual(state.count(), 3)
|
||||||
|
|
||||||
|
def test_seeded_files_are_then_skipped_by_normal_scan(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
_make_event(unit, "UM11719_20231219163444.IDFW", age_seconds=120, content=b"x")
|
||||||
|
_make_txt(unit, "UM11719_20231219163444.IDFW", age_seconds=110, content=b"r")
|
||||||
|
_make_event(unit, "UM11719_20231219163444.IDFH", age_seconds=120, content=b"y")
|
||||||
|
_make_txt(unit, "UM11719_20231219163444.IDFH", age_seconds=110, content=b"r")
|
||||||
|
|
||||||
|
state = ef.ForwardState(str(root / "seed.json"))
|
||||||
|
ef.seed_state_from_folder(str(root), state, max_age_days=30)
|
||||||
|
|
||||||
|
pending = ef.find_pending_events(
|
||||||
|
str(root), state, max_age_days=30,
|
||||||
|
quiescence_seconds=5, missing_report_grace_seconds=60,
|
||||||
|
)
|
||||||
|
self.assertEqual(len(pending), 0)
|
||||||
|
|
||||||
|
def test_seed_is_idempotent(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
_make_event(unit, "UM11719_20231219163444.IDFW", age_seconds=120, content=b"x")
|
||||||
|
|
||||||
|
state = ef.ForwardState(str(root / "seed.json"))
|
||||||
|
counts1 = ef.seed_state_from_folder(str(root), state, max_age_days=30)
|
||||||
|
counts2 = ef.seed_state_from_folder(str(root), state, max_age_days=30)
|
||||||
|
self.assertEqual(counts1["seeded"], 1)
|
||||||
|
self.assertEqual(counts2["seeded"], 0)
|
||||||
|
self.assertEqual(counts2["already_known"], 1)
|
||||||
|
self.assertEqual(state.count(), 1)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Multipart encoder ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestMultipartEncoder(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_encodes_two_parts_with_proper_boundary(self):
|
||||||
|
body, content_type = ef._encode_multipart([
|
||||||
|
("files", "a.bin", "application/octet-stream", b"\x01\x02"),
|
||||||
|
("files", "a.txt", "text/plain", b"hello"),
|
||||||
|
])
|
||||||
|
self.assertTrue(content_type.startswith("multipart/form-data; boundary="))
|
||||||
|
boundary = content_type.split("boundary=", 1)[1]
|
||||||
|
self.assertIn(boundary.encode("ascii"), body)
|
||||||
|
|
||||||
|
text = body.decode("latin-1")
|
||||||
|
self.assertIn('name="files"; filename="a.bin"', text)
|
||||||
|
self.assertIn('name="files"; filename="a.txt"', text)
|
||||||
|
self.assertIn("Content-Type: application/octet-stream", text)
|
||||||
|
self.assertIn("Content-Type: text/plain", text)
|
||||||
|
self.assertTrue(text.rstrip("\r\n").endswith(f"--{boundary}--"))
|
||||||
|
|
||||||
|
|
||||||
|
# ── End-to-end forward_event_pair against a fake server ──────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeImportHandler(http.server.BaseHTTPRequestHandler):
|
||||||
|
"""Mimics seismo-relay's POST /db/import/idf_file response."""
|
||||||
|
received = [] # class-level capture for test inspection
|
||||||
|
|
||||||
|
def do_POST(self):
|
||||||
|
length = int(self.headers.get("Content-Length", "0"))
|
||||||
|
body = self.rfile.read(length)
|
||||||
|
ctype = self.headers.get("Content-Type", "")
|
||||||
|
|
||||||
|
parts = body.split(b"--" + ctype.split("boundary=")[-1].encode())
|
||||||
|
filenames = []
|
||||||
|
for p in parts:
|
||||||
|
for line in p.split(b"\r\n"):
|
||||||
|
if b'filename="' in line:
|
||||||
|
fn = line.split(b'filename="', 1)[1].split(b'"', 1)[0]
|
||||||
|
filenames.append(fn.decode("latin-1"))
|
||||||
|
|
||||||
|
self.__class__.received.append({
|
||||||
|
"path": self.path,
|
||||||
|
"ctype": ctype,
|
||||||
|
"filenames": filenames,
|
||||||
|
})
|
||||||
|
|
||||||
|
results = []
|
||||||
|
binary_fn = next(
|
||||||
|
(fn for fn in filenames if not fn.lower().endswith(".txt")),
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
if binary_fn:
|
||||||
|
results.append({
|
||||||
|
"filename": binary_fn,
|
||||||
|
"status": "ok",
|
||||||
|
"stored_filename": binary_fn,
|
||||||
|
"filesize": len(body),
|
||||||
|
"sha256": "00" * 32,
|
||||||
|
"report_attached": any(fn.lower().endswith(".txt") for fn in filenames),
|
||||||
|
"inserted": 1,
|
||||||
|
"skipped": 0,
|
||||||
|
})
|
||||||
|
|
||||||
|
payload = json.dumps({"count": len(results), "results": results}).encode()
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "application/json")
|
||||||
|
self.send_header("Content-Length", str(len(payload)))
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(payload)
|
||||||
|
|
||||||
|
def log_message(self, *_a, **_kw): # silence the test runner
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _start_fake_server():
|
||||||
|
server = http.server.HTTPServer(("127.0.0.1", 0), _FakeImportHandler)
|
||||||
|
threading.Thread(target=server.serve_forever, daemon=True).start()
|
||||||
|
host, port = server.server_address
|
||||||
|
return server, f"http://{host}:{port}"
|
||||||
|
|
||||||
|
|
||||||
|
class TestForwardEventPair(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
_FakeImportHandler.received = []
|
||||||
|
self.server, self.base_url = _start_fake_server()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.server.shutdown()
|
||||||
|
self.server.server_close()
|
||||||
|
|
||||||
|
def test_post_with_paired_report(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
tmp_p = Path(tmp)
|
||||||
|
bin_p = tmp_p / "UM11719_20231219163444.IDFW"
|
||||||
|
txt_p = tmp_p / "UM11719_20231219163444.IDFW.txt"
|
||||||
|
bin_p.write_bytes(b"\x10\x20\x30 binary")
|
||||||
|
txt_p.write_bytes(b'"SerialNumber : UM11719"\n')
|
||||||
|
|
||||||
|
result = ef.forward_event_pair(
|
||||||
|
self.base_url, str(bin_p), str(txt_p), timeout=5.0,
|
||||||
|
)
|
||||||
|
self.assertEqual(result["status"], "ok")
|
||||||
|
self.assertEqual(result["filename"], "UM11719_20231219163444.IDFW")
|
||||||
|
self.assertTrue(result["report_attached"])
|
||||||
|
|
||||||
|
self.assertEqual(len(_FakeImportHandler.received), 1)
|
||||||
|
req = _FakeImportHandler.received[0]
|
||||||
|
# Path includes the serial-hint auto-extracted from the filename
|
||||||
|
self.assertTrue(req["path"].startswith("/db/import/idf_file"))
|
||||||
|
self.assertIn("serial=UM11719", req["path"])
|
||||||
|
self.assertIn("UM11719_20231219163444.IDFW", req["filenames"])
|
||||||
|
self.assertIn("UM11719_20231219163444.IDFW.txt", req["filenames"])
|
||||||
|
|
||||||
|
def test_post_without_report(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
bin_p = Path(tmp) / "UM11719_20231219163444.IDFH"
|
||||||
|
bin_p.write_bytes(b"binary only")
|
||||||
|
|
||||||
|
result = ef.forward_event_pair(
|
||||||
|
self.base_url, str(bin_p), None, timeout=5.0,
|
||||||
|
)
|
||||||
|
self.assertEqual(result["status"], "ok")
|
||||||
|
self.assertFalse(result["report_attached"])
|
||||||
|
req = _FakeImportHandler.received[0]
|
||||||
|
self.assertEqual(req["filenames"], ["UM11719_20231219163444.IDFH"])
|
||||||
|
|
||||||
|
def test_explicit_serial_hint_overrides_auto(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
bin_p = Path(tmp) / "UM11719_20231219163444.IDFW"
|
||||||
|
bin_p.write_bytes(b"x")
|
||||||
|
ef.forward_event_pair(
|
||||||
|
self.base_url, str(bin_p), None,
|
||||||
|
serial_hint="OVERRIDE99", timeout=5.0,
|
||||||
|
)
|
||||||
|
req = _FakeImportHandler.received[0]
|
||||||
|
self.assertIn("serial=OVERRIDE99", req["path"])
|
||||||
|
|
||||||
|
|
||||||
|
# ── forward_pending() smoke test ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestForwardPending(unittest.TestCase):
|
||||||
|
"""End-to-end: tree → find → POST → state-update → no re-POST."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
_FakeImportHandler.received = []
|
||||||
|
self.server, self.base_url = _start_fake_server()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.server.shutdown()
|
||||||
|
self.server.server_close()
|
||||||
|
|
||||||
|
def test_pass_then_re_pass_is_idempotent(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
root = Path(tmp)
|
||||||
|
unit = _make_thordata(root, "Project A", "UM11719")
|
||||||
|
_make_event(unit, "UM11719_20231219163444.IDFW",
|
||||||
|
age_seconds=200, content=b"binary")
|
||||||
|
_make_txt(unit, "UM11719_20231219163444.IDFW",
|
||||||
|
age_seconds=100, content=b"report")
|
||||||
|
_make_event(unit, "UM11719_20231219163444.IDFH",
|
||||||
|
age_seconds=200, content=b"histogram")
|
||||||
|
_make_txt(unit, "UM11719_20231219163444.IDFH",
|
||||||
|
age_seconds=100, content=b"hreport")
|
||||||
|
|
||||||
|
state = ef.ForwardState(str(root / "fwd.json"))
|
||||||
|
counts = ef.forward_pending(
|
||||||
|
str(root), self.base_url, state,
|
||||||
|
max_age_days=30, quiescence_seconds=5,
|
||||||
|
missing_report_grace_seconds=60, timeout=5.0,
|
||||||
|
)
|
||||||
|
self.assertEqual(counts["scanned"], 2)
|
||||||
|
self.assertEqual(counts["forwarded"], 2)
|
||||||
|
self.assertEqual(counts["errors"], 0)
|
||||||
|
self.assertEqual(counts["with_report"], 2)
|
||||||
|
self.assertEqual(state.count(), 2)
|
||||||
|
self.assertEqual(len(_FakeImportHandler.received), 2)
|
||||||
|
|
||||||
|
# Re-pass: nothing pending; no new POSTs.
|
||||||
|
counts2 = ef.forward_pending(
|
||||||
|
str(root), self.base_url, state,
|
||||||
|
max_age_days=30, quiescence_seconds=5,
|
||||||
|
missing_report_grace_seconds=60, timeout=5.0,
|
||||||
|
)
|
||||||
|
self.assertEqual(counts2["scanned"], 0)
|
||||||
|
self.assertEqual(counts2["forwarded"], 0)
|
||||||
|
self.assertEqual(len(_FakeImportHandler.received), 2)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
+163
-5
@@ -1,5 +1,5 @@
|
|||||||
"""
|
"""
|
||||||
Thor Watcher — Settings Dialog v0.2.0
|
Thor Watcher — Settings Dialog v0.3.0
|
||||||
|
|
||||||
Provides a Tkinter settings dialog that doubles as a first-run wizard.
|
Provides a Tkinter settings dialog that doubles as a first-run wizard.
|
||||||
|
|
||||||
@@ -14,6 +14,8 @@ import tkinter as tk
|
|||||||
from tkinter import ttk, filedialog, messagebox
|
from tkinter import ttk, filedialog, messagebox
|
||||||
from socket import gethostname
|
from socket import gethostname
|
||||||
|
|
||||||
|
import series4_ingest as watcher
|
||||||
|
|
||||||
|
|
||||||
# ── Defaults (mirror config.example.json) ────────────────────────────────────
|
# ── Defaults (mirror config.example.json) ────────────────────────────────────
|
||||||
|
|
||||||
@@ -34,6 +36,17 @@ DEFAULTS = {
|
|||||||
"log_retention_days": 30,
|
"log_retention_days": 30,
|
||||||
"update_source": "gitea",
|
"update_source": "gitea",
|
||||||
"update_url": "",
|
"update_url": "",
|
||||||
|
|
||||||
|
# SFM forwarder defaults — mirror series4_ingest.load_config
|
||||||
|
"sfm_forward_enabled": False,
|
||||||
|
"sfm_url": "",
|
||||||
|
"sfm_forward_interval": 60,
|
||||||
|
"sfm_quiescence_seconds": 5,
|
||||||
|
"sfm_missing_report_grace_seconds": 60,
|
||||||
|
"sfm_http_timeout": 60,
|
||||||
|
"sfm_state_file": "",
|
||||||
|
"sfm_max_forwards_per_pass": 500,
|
||||||
|
"sfm_max_event_age_days": 365,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -145,7 +158,8 @@ class SettingsDialog:
|
|||||||
self.saved = False
|
self.saved = False
|
||||||
self.root = parent
|
self.root = parent
|
||||||
|
|
||||||
title = "Thor Watcher — Setup" if wizard else "Thor Watcher — Settings"
|
kind = "Setup" if wizard else "Settings"
|
||||||
|
title = "Thor Watcher v{} — {}".format(watcher.VERSION, kind)
|
||||||
self.root.title(title)
|
self.root.title(title)
|
||||||
self.root.resizable(False, False)
|
self.root.resizable(False, False)
|
||||||
self.root.update_idletasks()
|
self.root.update_idletasks()
|
||||||
@@ -192,6 +206,20 @@ class SettingsDialog:
|
|||||||
self.var_update_source = tk.StringVar(value=src)
|
self.var_update_source = tk.StringVar(value=src)
|
||||||
self.var_update_url = tk.StringVar(value=str(v.get("update_url", "")))
|
self.var_update_url = tk.StringVar(value=str(v.get("update_url", "")))
|
||||||
|
|
||||||
|
# SFM Forwarder
|
||||||
|
sfm_en = v.get("sfm_forward_enabled", False)
|
||||||
|
self.var_sfm_enabled = tk.BooleanVar(
|
||||||
|
value=bool(sfm_en) if isinstance(sfm_en, bool) else str(sfm_en).lower() in ("true", "1", "yes")
|
||||||
|
)
|
||||||
|
self.var_sfm_url = tk.StringVar(value=str(v.get("sfm_url", "")))
|
||||||
|
self.var_sfm_forward_interval = tk.StringVar(value=str(v.get("sfm_forward_interval", 60)))
|
||||||
|
self.var_sfm_quiescence = tk.StringVar(value=str(v.get("sfm_quiescence_seconds", 5)))
|
||||||
|
self.var_sfm_grace = tk.StringVar(value=str(v.get("sfm_missing_report_grace_seconds", 60)))
|
||||||
|
self.var_sfm_http_timeout = tk.StringVar(value=str(v.get("sfm_http_timeout", 60)))
|
||||||
|
self.var_sfm_max_per_pass = tk.StringVar(value=str(v.get("sfm_max_forwards_per_pass", 500)))
|
||||||
|
self.var_sfm_max_age_days = tk.StringVar(value=str(v.get("sfm_max_event_age_days", 365)))
|
||||||
|
self.var_sfm_state_file = tk.StringVar(value=str(v.get("sfm_state_file", "")))
|
||||||
|
|
||||||
# ── UI construction ───────────────────────────────────────────────────────
|
# ── UI construction ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
def _build_ui(self):
|
def _build_ui(self):
|
||||||
@@ -216,6 +244,7 @@ class SettingsDialog:
|
|||||||
self._build_tab_paths(nb)
|
self._build_tab_paths(nb)
|
||||||
self._build_tab_scanning(nb)
|
self._build_tab_scanning(nb)
|
||||||
self._build_tab_logging(nb)
|
self._build_tab_logging(nb)
|
||||||
|
self._build_tab_forwarding(nb)
|
||||||
self._build_tab_updates(nb)
|
self._build_tab_updates(nb)
|
||||||
|
|
||||||
btn_frame = tk.Frame(outer)
|
btn_frame = tk.Frame(outer)
|
||||||
@@ -347,6 +376,114 @@ class SettingsDialog:
|
|||||||
_add_label_check(f, 0, "Enable Logging", self.var_enable_logging)
|
_add_label_check(f, 0, "Enable Logging", self.var_enable_logging)
|
||||||
_add_label_spinbox(f, 1, "Log Retention (days)", self.var_log_retention_days, 1, 365)
|
_add_label_spinbox(f, 1, "Log Retention (days)", self.var_log_retention_days, 1, 365)
|
||||||
|
|
||||||
|
def _build_tab_forwarding(self, nb):
|
||||||
|
f = self._tab_frame(nb, "SFM Forward")
|
||||||
|
|
||||||
|
# Row 0: enable checkbox
|
||||||
|
_add_label_check(f, 0, "Enable SFM Forwarding", self.var_sfm_enabled)
|
||||||
|
|
||||||
|
# Row 1: SFM URL + Test button
|
||||||
|
tk.Label(f, text="SFM URL", anchor="w").grid(
|
||||||
|
row=1, column=0, sticky="w", padx=(8, 4), pady=4
|
||||||
|
)
|
||||||
|
url_frame = tk.Frame(f)
|
||||||
|
url_frame.grid(row=1, column=1, sticky="ew", padx=(0, 8), pady=4)
|
||||||
|
url_frame.columnconfigure(0, weight=1)
|
||||||
|
|
||||||
|
sfm_entry = ttk.Entry(url_frame, textvariable=self.var_sfm_url, width=32)
|
||||||
|
sfm_entry.grid(row=0, column=0, sticky="ew")
|
||||||
|
|
||||||
|
_hint = "http://10.0.0.44:8200"
|
||||||
|
if not self.var_sfm_url.get():
|
||||||
|
sfm_entry.config(foreground="grey")
|
||||||
|
sfm_entry.insert(0, _hint)
|
||||||
|
|
||||||
|
def _on_focus_in(e, ent=sfm_entry, h=_hint):
|
||||||
|
if ent.get() == h:
|
||||||
|
ent.delete(0, tk.END)
|
||||||
|
ent.config(foreground="black")
|
||||||
|
|
||||||
|
def _on_focus_out(e, ent=sfm_entry, h=_hint, v=self.var_sfm_url):
|
||||||
|
if not ent.get():
|
||||||
|
ent.config(foreground="grey")
|
||||||
|
ent.insert(0, h)
|
||||||
|
v.set("")
|
||||||
|
|
||||||
|
sfm_entry.bind("<FocusIn>", _on_focus_in)
|
||||||
|
sfm_entry.bind("<FocusOut>", _on_focus_out)
|
||||||
|
|
||||||
|
self._sfm_test_btn = ttk.Button(url_frame, text="Test", width=6,
|
||||||
|
command=self._test_sfm_connection)
|
||||||
|
self._sfm_test_btn.grid(row=0, column=1, padx=(4, 0))
|
||||||
|
|
||||||
|
self._sfm_test_status = tk.Label(url_frame, text="", anchor="w", width=20)
|
||||||
|
self._sfm_test_status.grid(row=0, column=2, padx=(6, 0))
|
||||||
|
|
||||||
|
# Rows 2-7: timing/limits spinboxes
|
||||||
|
_add_label_spinbox(f, 2, "Forward Interval (sec)", self.var_sfm_forward_interval, 30, 3600)
|
||||||
|
_add_label_spinbox(f, 3, "Quiescence (sec)", self.var_sfm_quiescence, 1, 60)
|
||||||
|
_add_label_spinbox(f, 4, "Missing-Report Grace (sec)", self.var_sfm_grace, 0, 3600)
|
||||||
|
_add_label_spinbox(f, 5, "HTTP Timeout (sec)", self.var_sfm_http_timeout, 5, 300)
|
||||||
|
_add_label_spinbox(f, 6, "Max Forwards Per Pass", self.var_sfm_max_per_pass, 1, 5000)
|
||||||
|
_add_label_spinbox(f, 7, "Max Event Age (days)", self.var_sfm_max_age_days, 1, 3650)
|
||||||
|
|
||||||
|
# Row 8: state file browse
|
||||||
|
def browse_state():
|
||||||
|
p = filedialog.asksaveasfilename(
|
||||||
|
title="Select SFM State File",
|
||||||
|
defaultextension=".json",
|
||||||
|
filetypes=[("JSON files", "*.json"), ("All files", "*.*")],
|
||||||
|
initialfile=os.path.basename(self.var_sfm_state_file.get() or "thor_forwarded.json"),
|
||||||
|
initialdir=os.path.dirname(self.var_sfm_state_file.get() or "C:\\"),
|
||||||
|
)
|
||||||
|
if p:
|
||||||
|
self.var_sfm_state_file.set(p.replace("/", "\\"))
|
||||||
|
|
||||||
|
_add_label_browse_entry(f, 8, "State File", self.var_sfm_state_file, browse_state)
|
||||||
|
|
||||||
|
# Row 9: help text
|
||||||
|
help_text = (
|
||||||
|
"Forwards .IDFH (histogram) and .IDFW (waveform) event files plus their\n"
|
||||||
|
"TXT/<basename>.txt sidecars to a seismo-relay SFM server.\n"
|
||||||
|
"Idempotent: each file is tracked by sha256, so re-scans never re-POST.\n"
|
||||||
|
"If the TXT sidecar appears AFTER the binary was forwarded alone, the\n"
|
||||||
|
"next pass will re-forward so the relay can refresh the DB row with\n"
|
||||||
|
"device-authoritative PPV/ZCFreq/peak values.\n"
|
||||||
|
"State file blank → defaults to <log_dir>\\thor_forwarded.json."
|
||||||
|
)
|
||||||
|
tk.Label(
|
||||||
|
f, text=help_text, justify="left", fg="#555555", wraplength=420,
|
||||||
|
).grid(row=9, column=0, columnspan=2, sticky="w", padx=(8, 8), pady=(8, 4))
|
||||||
|
|
||||||
|
def _test_sfm_connection(self):
|
||||||
|
import urllib.request
|
||||||
|
import urllib.error
|
||||||
|
|
||||||
|
self._sfm_test_status.config(text="Testing...", foreground="grey")
|
||||||
|
self._sfm_test_btn.config(state="disabled")
|
||||||
|
self.root.update_idletasks()
|
||||||
|
|
||||||
|
raw = self.var_sfm_url.get().strip()
|
||||||
|
if not raw or raw == "http://10.0.0.44:8200":
|
||||||
|
self._sfm_test_status.config(text="Enter a URL first", foreground="orange")
|
||||||
|
self._sfm_test_btn.config(state="normal")
|
||||||
|
return
|
||||||
|
|
||||||
|
url = raw.rstrip("/") + "/health"
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(urllib.request.Request(url), timeout=5) as resp:
|
||||||
|
if resp.status == 200:
|
||||||
|
self._sfm_test_status.config(text="Connected!", foreground="green")
|
||||||
|
else:
|
||||||
|
self._sfm_test_status.config(text="HTTP {}".format(resp.status), foreground="orange")
|
||||||
|
except urllib.error.URLError as e:
|
||||||
|
reason = str(e.reason) if hasattr(e, "reason") else str(e)
|
||||||
|
self._sfm_test_status.config(text="Failed: {}".format(reason[:30]), foreground="red")
|
||||||
|
except Exception as e:
|
||||||
|
self._sfm_test_status.config(text="Error: {}".format(str(e)[:30]), foreground="red")
|
||||||
|
finally:
|
||||||
|
self._sfm_test_btn.config(state="normal")
|
||||||
|
|
||||||
def _build_tab_updates(self, nb):
|
def _build_tab_updates(self, nb):
|
||||||
f = self._tab_frame(nb, "Updates")
|
f = self._tab_frame(nb, "Updates")
|
||||||
|
|
||||||
@@ -421,9 +558,15 @@ class SettingsDialog:
|
|||||||
|
|
||||||
def _on_save(self):
|
def _on_save(self):
|
||||||
checks = [
|
checks = [
|
||||||
(self.var_api_interval, "API Interval", 30, 3600),
|
(self.var_api_interval, "API Interval", 30, 3600),
|
||||||
(self.var_scan_interval, "Scan Interval", 10, 3600),
|
(self.var_scan_interval, "Scan Interval", 10, 3600),
|
||||||
(self.var_log_retention_days, "Log Retention Days", 1, 365),
|
(self.var_log_retention_days, "Log Retention Days", 1, 365),
|
||||||
|
(self.var_sfm_forward_interval, "Forward Interval", 30, 3600),
|
||||||
|
(self.var_sfm_quiescence, "Quiescence", 1, 60),
|
||||||
|
(self.var_sfm_grace, "Missing-Report Grace", 0, 3600),
|
||||||
|
(self.var_sfm_http_timeout, "HTTP Timeout", 5, 300),
|
||||||
|
(self.var_sfm_max_per_pass, "Max Forwards Per Pass", 1, 5000),
|
||||||
|
(self.var_sfm_max_age_days, "Max Event Age (days)", 1, 3650),
|
||||||
]
|
]
|
||||||
int_values = {}
|
int_values = {}
|
||||||
for var, name, mn, mx in checks:
|
for var, name, mn, mx in checks:
|
||||||
@@ -442,6 +585,11 @@ class SettingsDialog:
|
|||||||
else:
|
else:
|
||||||
api_url = api_url.rstrip("/") + "/api/series4/heartbeat"
|
api_url = api_url.rstrip("/") + "/api/series4/heartbeat"
|
||||||
|
|
||||||
|
sfm_url = self.var_sfm_url.get().strip()
|
||||||
|
if sfm_url == "http://10.0.0.44:8200":
|
||||||
|
sfm_url = ""
|
||||||
|
sfm_url = sfm_url.rstrip("/") # event_forwarder adds the endpoint path
|
||||||
|
|
||||||
values = {
|
values = {
|
||||||
"thordata_path": self.var_thordata_path.get().strip(),
|
"thordata_path": self.var_thordata_path.get().strip(),
|
||||||
"scan_interval": int_values["Scan Interval"],
|
"scan_interval": int_values["Scan Interval"],
|
||||||
@@ -456,6 +604,16 @@ class SettingsDialog:
|
|||||||
"log_retention_days": int_values["Log Retention Days"],
|
"log_retention_days": int_values["Log Retention Days"],
|
||||||
"update_source": self.var_update_source.get().strip() or "gitea",
|
"update_source": self.var_update_source.get().strip() or "gitea",
|
||||||
"update_url": self.var_update_url.get().strip(),
|
"update_url": self.var_update_url.get().strip(),
|
||||||
|
|
||||||
|
"sfm_forward_enabled": self.var_sfm_enabled.get(),
|
||||||
|
"sfm_url": sfm_url,
|
||||||
|
"sfm_forward_interval": int_values["Forward Interval"],
|
||||||
|
"sfm_quiescence_seconds": int_values["Quiescence"],
|
||||||
|
"sfm_missing_report_grace_seconds": int_values["Missing-Report Grace"],
|
||||||
|
"sfm_http_timeout": int_values["HTTP Timeout"],
|
||||||
|
"sfm_max_forwards_per_pass": int_values["Max Forwards Per Pass"],
|
||||||
|
"sfm_max_event_age_days": int_values["Max Event Age (days)"],
|
||||||
|
"sfm_state_file": self.var_sfm_state_file.get().strip(),
|
||||||
}
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|||||||
+25
-2
@@ -1,5 +1,5 @@
|
|||||||
"""
|
"""
|
||||||
Thor Watcher — System Tray Launcher v0.2.0
|
Thor Watcher — System Tray Launcher v0.3.0
|
||||||
Requires: pystray, Pillow, tkinter (stdlib)
|
Requires: pystray, Pillow, tkinter (stdlib)
|
||||||
|
|
||||||
Run with: pythonw thor_tray.py (no console window)
|
Run with: pythonw thor_tray.py (no console window)
|
||||||
@@ -420,7 +420,25 @@ class WatcherTray:
|
|||||||
else:
|
else:
|
||||||
api_str = "API off"
|
api_str = "API off"
|
||||||
|
|
||||||
return "Running — {} | {} unit(s) | scan {}".format(api_str, unit_count, age_str)
|
base_line = "Running — {} | {} unit(s) | scan {}".format(api_str, unit_count, age_str)
|
||||||
|
|
||||||
|
sfm_status = self.state.get("sfm_status", "disabled")
|
||||||
|
if sfm_status in ("ok", "fail", "ready"):
|
||||||
|
counts = self.state.get("last_forward_counts") or {}
|
||||||
|
fwd = counts.get("forwarded", 0)
|
||||||
|
errs = counts.get("errors", 0)
|
||||||
|
last_fwd = self.state.get("last_forward")
|
||||||
|
if last_fwd is not None:
|
||||||
|
fwd_age = int((datetime.now() - last_fwd).total_seconds())
|
||||||
|
fwd_age_str = "{}s ago".format(fwd_age) if fwd_age < 60 else "{}m ago".format(fwd_age // 60)
|
||||||
|
else:
|
||||||
|
fwd_age_str = "pending"
|
||||||
|
sfm_line = "SFM {} | {} fwd, {} err | last {}".format(
|
||||||
|
sfm_status.upper(), fwd, errs, fwd_age_str,
|
||||||
|
)
|
||||||
|
return base_line + "\n" + sfm_line
|
||||||
|
|
||||||
|
return base_line
|
||||||
|
|
||||||
def _tray_status(self):
|
def _tray_status(self):
|
||||||
status = self.state.get("status", "starting")
|
status = self.state.get("status", "starting")
|
||||||
@@ -431,12 +449,17 @@ class WatcherTray:
|
|||||||
api_status = self.state.get("api_status", "disabled")
|
api_status = self.state.get("api_status", "disabled")
|
||||||
if api_status == "fail":
|
if api_status == "fail":
|
||||||
return "missing" # red — API failing
|
return "missing" # red — API failing
|
||||||
|
sfm_status = self.state.get("sfm_status", "disabled")
|
||||||
|
if api_status == "ok" and sfm_status == "fail":
|
||||||
|
return "pending" # amber — heartbeat OK but forwarder is failing
|
||||||
if api_status == "disabled":
|
if api_status == "disabled":
|
||||||
return "pending" # amber — running but not reporting
|
return "pending" # amber — running but not reporting
|
||||||
return "ok" # green — running and API good
|
return "ok" # green — running and API good
|
||||||
|
|
||||||
def _build_menu(self):
|
def _build_menu(self):
|
||||||
return pystray.Menu(
|
return pystray.Menu(
|
||||||
|
pystray.MenuItem("Thor Watcher v{}".format(_CURRENT_VERSION), None, enabled=False),
|
||||||
|
pystray.Menu.SEPARATOR,
|
||||||
pystray.MenuItem(lambda item: self._status_text(), None, enabled=False),
|
pystray.MenuItem(lambda item: self._status_text(), None, enabled=False),
|
||||||
pystray.Menu.SEPARATOR,
|
pystray.Menu.SEPARATOR,
|
||||||
pystray.MenuItem("Settings...", self._open_settings),
|
pystray.MenuItem("Settings...", self._open_settings),
|
||||||
|
|||||||
Reference in New Issue
Block a user