5 Commits

12 changed files with 3449 additions and 486 deletions
BIN
View File
Binary file not shown.
+36
View File
@@ -5,6 +5,42 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.3.0] - 2026-05-19
### Added
- `event_forwarder.py` — forwards `.IDFH` (histogram) and `.IDFW` (waveform) event files plus their `TXT/<basename>.txt` sidecars to a seismo-relay SFM server's new `/db/import/idf_file` endpoint
- Sha256-keyed `thor_forwarded.json` state file for idempotency across restarts and re-scans (default path: `<log_dir>/thor_forwarded.json`)
- "SFM Forward" tab in Settings dialog: enable/URL/Test, forward interval, quiescence, missing-report grace, HTTP timeout, max forwards per pass, max event age, state file picker
- Forwarder status line in tray menu: `SFM OK | N fwd, M err | last 30s ago`
- Tray icon goes amber when the SFM forwarder is failing but the API heartbeat is still healthy
- Re-pair logic: events forwarded without their TXT are re-forwarded once the sidecar appears so the relay can refresh DB rows with device-authoritative PPV/ZCFreq/peak values
- `event_forwarder.py --seed-state` CLI for skipping historical backfill on a first deploy
- Version badge: `Thor Watcher vX.Y.Z` shown at the top of the tray menu and in the Settings dialog title bar — operators no longer have to crack open the .exe properties to tell which version is running
### Changed
- Bumped `VERSION` to `0.3.0`
- Settings dialog tab order: Connection / Paths / Scanning / Logging / **SFM Forward** / Updates
## [0.2.0] - 2026-03-20
### Added
- `thor_tray.py` — system tray launcher with status icon (green/amber/red/grey), Settings and Open Log Folder menu items
- `thor_settings_dialog.py` — Tkinter settings dialog with first-run wizard; tabs for Connection, Paths, Scanning, Logging, Updates
- Hardened auto-updater: three-layer download validation (100 KB floor, 50% relative size floor, MZ magic bytes), safer swap bat with 5-retry cap and `:fail` exit, `.exe.old` backup
- Configurable update source: `update_source` (gitea / url / disabled), `update_url` for custom server
- Remote push support: `update_available` flag in API heartbeat response triggers update regardless of `update_source` setting
- `build.bat` — PyInstaller build script; outputs versioned exe for Gitea and plain copy for Inno Setup
- `installer.iss` — Inno Setup installer script with startup shortcut
- `_update_log()` helper writes timestamped `[updater]` lines to the watcher log
- `log_tail` included in heartbeat payload (last 25 lines) for terra-view display
- `run_watcher(state, stop_event)` pattern in `series4_ingest.py` for background thread use from tray
### Changed
- `series4_ingest.py` refactored into tray-friendly background thread module; `main()` retained for standalone use
- Config key `sfm_endpoint` renamed to `api_url` for consistency with series3-watcher
- Heartbeat payload now uses `source_id`, `source_type`, `version` fields matching terra-view WatcherAgent model
- AppData folder: `ThorWatcher` (was not previously defined)
## [0.1.1] - 2025-12-08 ## [0.1.1] - 2025-12-08
### Changed ### Changed
+128 -155
View File
@@ -1,205 +1,178 @@
# Series 4 Ingest Agent # Thor Watcher
**Version:** 0.1.2 **Version:** 0.3.0
Micromate (Series 4) ingest agent for Seismo Fleet Manager (SFM). Micromate (Series 4) watcher agent for Terra-View fleet management. Runs as a Windows system tray application, scans THORDATA for Micromate unit activity, sends heartbeat data to Terra-View, and (optionally) forwards `.IDFH`/`.IDFW` event files to a seismo-relay SFM server.
---
## Overview ## Overview
Series 4 Ingest Agent is a Python-based monitoring tool that scans for Micromate unit activity logs and reports their status. It monitors MLG (Micromate Log) files in the THORDATA directory structure, determines unit health based on last activity, and optionally transmits heartbeat data to a Seismo Fleet Manager backend. Thor Watcher monitors `C:\THORDATA\<Project>\<UM####>\*.MLG` files, determines each unit's last activity from the MLG filename timestamp, and periodically posts a heartbeat payload to the Terra-View backend. It runs silently in the system tray and auto-starts on login.
## Features ---
- **Automatic MLG File Discovery**: Scans `C:\THORDATA\<Project>\<UM####>\*.MLG` for Micromate units
- **Intelligent Unit Tracking**: Automatically identifies the newest MLG file per unit based on timestamp
- **Status Classification**: Categorizes units as OK, LATE, or STALE based on configurable age thresholds
- **Console Heartbeat Display**: Real-time status dashboard with formatted output
- **SFM Backend Integration**: Optional HTTP POST of JSON telemetry to Fleet Manager
- **External Configuration**: JSON-based configuration file for easy customization
- **Robust Error Handling**: Graceful degradation with informative warnings
- **No External Dependencies**: Uses Python standard library only (urllib, json, datetime, etc.)
## Requirements
- Python 3.6 or higher
- Windows OS (designed for THOR PC/VM environments)
- Access to THORDATA directory structure
## Installation ## Installation
1. Clone or copy this repository to your THOR PC/VM 1. Run `thor-watcher-setup.exe`
2. Ensure Python 3.6+ is installed 2. On first launch the setup wizard will appear — enter your THORDATA path and Terra-View URL
3. Configure `config.json` (see Configuration section) 3. The app starts in the system tray and launches automatically on login
4. Run the agent:
```bash ---
python series4_ingest.py
## Building from Source
Requires Python 3.10+ and pip on PATH.
```bat
build.bat
``` ```
Produces:
- `dist\thor-watcher-0.3.0.exe` — upload to Gitea release
- `dist\thor-watcher.exe` — use with Inno Setup
Then run Inno Setup Compiler on `installer.iss` to produce `thor-watcher-setup.exe`.
---
## Configuration ## Configuration
All configuration is managed through `config.json` in the same directory as the script. Config is stored at:
```
%LOCALAPPDATA%\ThorWatcher\config.json
```
### config.json Structure Managed through the Settings dialog (right-click tray icon → Settings). A `config.example.json` is included as reference.
### Config Keys
| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `thordata_path` | string | `C:\THORDATA` | Root THORDATA directory |
| `scan_interval` | integer | `60` | Seconds between scans |
| `api_url` | string | `""` | Terra-View heartbeat URL (e.g. `http://10.0.0.40:8000/api/series4/heartbeat`) |
| `api_timeout` | integer | `5` | HTTP request timeout in seconds |
| `api_interval` | integer | `300` | Seconds between API heartbeat POSTs |
| `source_id` | string | hostname | Identifier for this machine in Terra-View |
| `source_type` | string | `series4_watcher` | Agent type (do not change) |
| `local_timezone` | string | `America/New_York` | Timezone of the field machine — used to convert MLG timestamps to UTC |
| `enable_logging` | boolean | `true` | Write log file |
| `log_file` | string | `%LOCALAPPDATA%\ThorWatcher\agent_logs\thor_watcher.log` | Log file path |
| `log_retention_days` | integer | `30` | Days before log is auto-cleared |
| `update_source` | string | `gitea` | Auto-update source: `gitea`, `url`, or `disabled` |
| `update_url` | string | `""` | Base URL for `url` mode (e.g. Terra-View server) |
| `sfm_forward_enabled` | boolean | `false` | Forward `.IDFH`/`.IDFW` event files to a seismo-relay SFM server |
| `sfm_url` | string | `""` | Base URL of the seismo-relay SFM server (e.g. `http://10.0.0.44:8200`) |
| `sfm_forward_interval` | integer | `60` | Seconds between forwarder passes |
| `sfm_quiescence_seconds` | integer | `5` | Skip files modified within the last N seconds (avoid in-flight files) |
| `sfm_missing_report_grace_seconds` | integer | `60` | Forward a binary without its `.txt` sidecar if it hasn't appeared after N seconds |
| `sfm_http_timeout` | integer | `60` | HTTP timeout per forward POST |
| `sfm_state_file` | string | `""` | Path to the sha256-keyed state file. Blank → `<log_dir>\thor_forwarded.json` |
| `sfm_max_forwards_per_pass` | integer | `500` | Cap per pass to drip-feed large backfills |
| `sfm_max_event_age_days` | integer | `365` | Skip event files older than this many days |
---
## Event Forwarding
When `sfm_forward_enabled` is true and `sfm_url` is set, Thor Watcher walks the THORDATA tree each `sfm_forward_interval` seconds, finds `.IDFH` (histogram) and `.IDFW` (waveform) event binaries plus their `TXT/<basename>.txt` ASCII sidecars, and POSTs them to seismo-relay's `/db/import/idf_file` endpoint.
- **Idempotent.** Every forwarded file is recorded by sha256 in `thor_forwarded.json`. Re-scans never re-POST.
- **Default off.** Operators must explicitly enable from the Settings → SFM Forward tab.
- **Re-pair logic.** If a binary is forwarded before its TXT sidecar appears (after the grace period), it's flagged `had_report=false` and re-forwarded once the TXT arrives so the SFM database row can be refreshed with device-authoritative PPV/ZCFreq/peak values.
- **TXT export must be enabled in Thor.** Thor's TXT sidecars are not produced automatically — operators should enable TXT export so the relay can extract rich metadata. Forwards without a TXT are still useful (binary gets indexed; rich fields stay NULL).
- **Backfill seeding.** To skip a large historical archive on first deploy, run `python event_forwarder.py --seed-state --thordata C:\THORDATA --state <state file>` before flipping the switch.
---
## Tray Icon Colors
| Color | Meaning |
|-------|---------|
| Green | Running, API reporting OK (and SFM forwarder healthy when enabled) |
| Amber | Running, API disabled OR SFM forwarder failing while API is healthy |
| Red | Running, API failing |
| Purple | Error — check logs |
| Grey | Starting up |
---
## Auto-Updater
Thor Watcher checks for updates every ~5 minutes. When a new release is found it downloads and validates the exe, then relaunches via a swap bat — no manual intervention needed.
**Update sources:**
- `gitea` — checks the Gitea release page (default)
- `url` — fetches `version.txt` and `thor-watcher.exe` from a custom server (e.g. Terra-View)
- `disabled` — no automatic checks; remote push from Terra-View still works
**Download validation:** 100 KB minimum size, 50% relative size floor vs current exe, MZ magic bytes check.
Remote update push from Terra-View Watcher Manager works regardless of `update_source` setting.
---
## Heartbeat Payload
Posted to `api_url` on each API interval:
```json ```json
{ {
"thordata_path": "C:\\THORDATA", "source_id": "THOR-PC",
"scan_interval": 60, "source_type": "series4_watcher",
"late_days": 2, "version": "0.3.0",
"stale_days": 60, "generated_at": "2026-03-20T14:30:00Z",
"sfm_endpoint": "http://<sfm backend ip>:8001/api/series4/heartbeat" "log_tail": ["...last 25 log lines..."],
"sfm_timeout": 5,
"debug": true
}
```
### Configuration Options
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `thordata_path` | string | `C:\THORDATA` | Root directory containing project/unit MLG files |
| `scan_interval` | integer | `60` | Time in seconds between scans |
| `late_days` | integer | `2` | Days before a unit is marked as LATE |
| `stale_days` | integer | `60` | Days before a unit is marked as STALE |
| `sfm_endpoint` | string | `""` | SFM backend URL (leave empty to disable HTTP) |
| `sfm_timeout` | integer | `5` | HTTP request timeout in seconds |
| `debug` | boolean | `true` | Enable/disable debug logging |
### Fallback Behavior
If `config.json` is missing or malformed, the agent will:
- Display a warning message
- Use built-in default values
- Continue running normally
## Usage
### Start the Agent
```bash
python series4_ingest.py
```
### Console Output Example
```
Series 4 Ingest Agent — Micromate Heartbeat (v0.1.2)
THORDATA root: C:\THORDATA
Now: 2025-12-08 14:30:00
--------------------------------------------------------------------------------
UM11719 OK Age: 1h 12m Last: 2025-12-08 13:18:00 Project: Clearwater - ECMS 57940
UM12345 LATE Age: 3d 5h Last: 2025-12-05 09:15:30 Project: Site Alpha
UM98765 STALE Age: 65d Last: 2025-10-04 08:22:15 Project: Legacy Site
--------------------------------------------------------------------------------
Total units: 3
Next scan in 60 seconds...
```
### Stop the Agent
Press `Ctrl+C` to gracefully stop the agent.
## Status Classification
Units are classified based on the age of their last MLG file:
- **OK**: Last activity within `late_days` threshold (default: < 2 days)
- **LATE**: Last activity between `late_days` and `stale_days` (default: 2-60 days)
- **STALE**: Last activity exceeds `stale_days` threshold (default: ≥ 60 days)
## SFM Backend Integration
When `sfm_endpoint` is configured, the agent sends JSON payloads to the Seismo Fleet Manager:
### Payload Structure
```json
{
"source": "series4_ingest",
"generated_at": "2025-12-08T14:30:00",
"units": [ "units": [
{ {
"unit_id": "UM11719", "unit_id": "UM11719",
"type": "micromate", "last_call": "2026-03-20T13:18:00Z",
"project_hint": "Clearwater - ECMS 57940", "age_minutes": 72,
"last_call": "2025-12-08T13:18:00", "mlg_path": "C:\\THORDATA\\Project A\\UM11719\\UM11719_20260320131800.MLG",
"status": "OK", "project_hint": "Project A"
"age_days": 0.05,
"age_hours": 1.2,
"mlg_path": "C:\\THORDATA\\Clearwater - ECMS 57940\\UM11719\\UM11719_20251208131800.MLG"
} }
] ]
} }
``` ```
### Disabling SFM Integration ---
Set `sfm_endpoint` to an empty string in `config.json`: ## THORDATA Directory Structure
```json
{
"sfm_endpoint": ""
}
```
## MLG File Format
The agent expects MLG files to follow this naming convention:
```
UM<unit_number>_YYYYMMDDHHMMSS.MLG
```
Example: `UM11719_20251208131800.MLG`
- `UM11719`: Unit ID
- `20251208131800`: Timestamp (2025-12-08 13:18:00)
- `.MLG`: Micromate Log file extension
## Directory Structure
Expected THORDATA folder structure:
``` ```
C:\THORDATA\ C:\THORDATA\
├── Project A\ ├── Project A\
│ ├── UM11719\ │ ├── UM11719\
│ │ ├── UM11719_20251208131800.MLG │ │ ├── UM11719_20260320131800.MLG
│ │ ── UM11719_20251207095430.MLG │ │ ── UM11719_20260319095430.MLG
│ │ └── ...
│ └── UM12345\ │ └── UM12345\
│ └── UM12345_20251205091530.MLG │ └── UM12345_20260318091530.MLG
└── Project B\ └── Project B\
└── UM98765\ └── UM98765\
└── UM98765_20251004082215.MLG └── UM98765_20260301082215.MLG
``` ```
---
## Troubleshooting ## Troubleshooting
### Config file not found **Tray icon is amber:** API URL is not configured or disabled — open Settings and enter Terra-View URL.
If you see `[WARN] Config file not found`, create `config.json` in the same directory as `series4_ingest.py`.
### THORDATA path doesn't exist **Tray icon is red:** API is failing — check Terra-View is reachable, URL is correct, and the network is up.
Verify the `thordata_path` in `config.json` points to the correct directory.
### No units found **Units showing wrong time in Terra-View:** Check `local_timezone` in Settings matches the field machine's timezone.
Ensure MLG files exist in the expected directory structure and follow the naming convention.
### SFM POST failures **No units found:** Verify `thordata_path` is correct and MLG files exist following the `UM####_YYYYMMDDHHMMSS.MLG` naming convention.
Check that:
- `sfm_endpoint` URL is correct and accessible
- Network connectivity is available
- SFM backend is running and accepting requests
## License **Auto-updater not working:** Check the log file for `[updater]` lines. On first deploy, verify the Gitea release has a `thor-watcher-X.X.X.exe` asset (not a setup exe).
Proprietary - Internal use only ---
## Version History ## Version History
See [CHANGELOG.md](CHANGELOG.md) for detailed version history. See [CHANGELOG.md](CHANGELOG.md) for detailed version history.
## Support ---
For issues or questions, contact the Seismo development team. *Proprietary — Terra-Mechanics Inc. Internal use only.*
+31
View File
@@ -0,0 +1,31 @@
@echo off
echo Building thor-watcher.exe...
pip install pyinstaller pystray Pillow
REM Extract version from series4_ingest.py (looks for: VERSION = "0.2.0")
for /f "tokens=3 delims= " %%V in ('findstr /C:"VERSION = " series4_ingest.py') do set RAW_VER=%%V
set VERSION=%RAW_VER:"=%
set EXE_NAME=thor-watcher-%VERSION%
echo Version: %VERSION%
echo Output: dist\%EXE_NAME%.exe
REM Check whether icon.ico exists alongside this script.
if exist "%~dp0icon.ico" (
pyinstaller --onefile --windowed --name "%EXE_NAME%" ^
--icon="%~dp0icon.ico" ^
--add-data "%~dp0icon.ico;." ^
thor_tray.py
) else (
echo [INFO] icon.ico not found -- building without custom icon.
pyinstaller --onefile --windowed --name "%EXE_NAME%" thor_tray.py
)
REM Copy versioned exe to plain name for Inno Setup
copy /Y "dist\%EXE_NAME%.exe" "dist\thor-watcher.exe"
echo.
echo Done.
echo Gitea upload: dist\%EXE_NAME%.exe
echo Inno Setup: dist\thor-watcher.exe (copy of above)
pause
+24 -5
View File
@@ -1,9 +1,28 @@
{ {
"thordata_path": "C:\\THORDATA", "thordata_path": "C:\\THORDATA",
"scan_interval": 60, "scan_interval": 60,
"late_days": 2,
"stale_days": 60, "api_url": "",
"sfm_endpoint": "http://<server_address>:8001/api/series4/heartbeat", "api_timeout": 5,
"sfm_timeout": 5, "api_interval": 300,
"debug": true "source_id": "",
"source_type": "series4_watcher",
"local_timezone": "America/New_York",
"enable_logging": true,
"log_file": "C:\\Users\\%USERNAME%\\AppData\\Local\\ThorWatcher\\agent_logs\\thor_watcher.log",
"log_retention_days": 30,
"update_source": "gitea",
"update_url": "",
"sfm_forward_enabled": false,
"sfm_url": "",
"sfm_forward_interval": 60,
"sfm_quiescence_seconds": 5,
"sfm_missing_report_grace_seconds": 60,
"sfm_http_timeout": 60,
"sfm_state_file": "",
"sfm_max_forwards_per_pass": 500,
"sfm_max_event_age_days": 365
} }
+62
View File
@@ -0,0 +1,62 @@
# Thor File Stucture Guide
This document is to explain how Thor formatis the file structure for units that call in via thor Autocall home.
## Main Stucture
Thor saves its data in a folder located in C:/ called 'THORDATA'. it then Creates folders for each user entered project. When a unit is added to that project, it creates a folder in project with that unit's serial number. Raw events (.IDFH for histogram and .IDFW for waveforms) plus .MLG monitor logs are then saved in this folder. if a unit is not assigned a project, it saves into a default project folder. If it matters, there is also a daily log file that gets created in the folder 'Logs'
In each unit's folder, there are various formats saved in their own individual folders too. Most cases have CSV, HTML, PDF, TXT, and XML
Here is the structure illustrated:
C:/THORDATA
├───Mon-Fayette Express Way - Sec 53A1
│ └───UM11402
│ ├───CSV
│ ├───HTML
│ ├───PDF
│ ├───TXT
│ └───XML
├───P.J. Dick - 5th and Halket
│ ├───UM11719
│ │ ├───CSV
│ │ ├───HTML
│ │ ├───PDF
│ │ ├───TXT
│ │ └───XML
│ UM12420
│ ├───CSV
│ ├───HTML
│ ├───PDF
│ ├───TXT
│ └───XML
Here is an expanded project folder with two events in it
└───Clearwater - ECMS 57940
└───BE9439
│ BE9439_20200713124250.MLG
│ BE9439_20200713124251.IDFH
│ BE9439_20200713131747.IDFW
│ BE9439_20200713131747.IDFW.CDB
├───CSV
│ BE9439_20200713124251.IDFH.csv
│ BE9439_20200713131747.IDFW.csv
├───PDF
│ BE9439_20200713124251.IDFH.pdf
│ BE9439_20200713131747.IDFW.pdf
├───TXT
│ BE9439_20200713124251.IDFH.txt
│ BE9439_20200713131747.IDFW.txt
└───XML
BE9439_20200713124251_IDFH_XML.XML
BE9439_20200713131747_IDFW_XML.XML
+823
View File
@@ -0,0 +1,823 @@
"""
event_forwarder.py — forward Thor (Micromate Series IV) IDFH/IDFW event
files to a seismo-relay SFM server.
Walks the same `THORDATA_PATH/<Project>/<Unit>/` tree the heartbeat path
scans. For each event binary that hasn't been forwarded yet, pairs it
with its `<unit>/TXT/<basename>.txt` ASCII report (when available) and
POSTs both to seismo-relay's `/db/import/idf_file` endpoint as one
multipart request.
This is a port of `series3-watcher/event_forwarder.py` adapted for the
Thor file layout. Key differences from the series3 forwarder:
- **Filenames are literal `<SERIAL>_<YYYYMMDDHHMMSS>.IDFH|.IDFW`** —
no base36 stem; the serial is right there in the prefix.
- **TXT sidecars live in a `TXT/` subfolder** next to the binaries
(Thor's exporter writes them there, not alongside the binary).
- **IDFH and IDFW are forwarded as independent events.** Each has
its own sha256-keyed state entry and its own POST. A single
timestamp can produce both a histogram (IDFH) and a waveform (IDFW),
and the seismo-relay endpoint dedupes on (serial, timestamp, kind),
so treating them as separate rows is the right model.
Design notes
────────────
- **stdlib only.** Matches the rest of the watcher (`urllib.request`).
Multipart encoding is hand-rolled.
- **Idempotent across restarts.** Forwarded files are tracked by
sha256 in a JSON state file (default: `<log_dir>/thor_forwarded.json`).
Re-scanning the watch tree doesn't re-POST anything.
- **Default-off.** Callers must enable via config
(`sfm_forward_enabled=true` + `sfm_url=...`). Existing 0.2.x
deployments that auto-update stay non-forwarding until an operator
flips the switch.
- **Quiescence guard.** Files modified within the last few seconds
are skipped — Thor writes the .txt after the binary, so we wait
until both look stable before forwarding.
- **Best-effort report pairing.** When the .txt hasn't appeared yet
but the binary is older than `missing_report_grace_seconds`, the
binary is forwarded alone (seismo-relay accepts that and just skips
the rich fields — we'd rather get the binary indexed than block
forever waiting for a TXT that may never arrive, e.g. operator
hasn't enabled the TXT export in Thor).
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
log = logging.getLogger(__name__)
# Default tuning. All overridable via config.json sfm_* keys.
DEFAULT_QUIESCENCE_SECONDS = 5
DEFAULT_MISSING_REPORT_GRACE_SECONDS = 60
DEFAULT_HTTP_TIMEOUT = 60.0
STATE_SCHEMA_VERSION = 1
# ── Filename matching ─────────────────────────────────────────────────────────
#
# Thor (Micromate Series IV) filename scheme:
# <SERIAL>_<YYYYMMDDHHMMSS>.<KIND>
# where SERIAL is the literal device serial (e.g. UM11719, BE9439),
# and KIND is IDFH (histogram) or IDFW (waveform).
#
# Examples:
# UM11719_20231219163444.IDFH
# UM11719_20231219162723.IDFW
# BE9439_20200713124251.IDFH
_EVENT_FILENAME_RE = re.compile(
r"^([A-Z]{2}\d+)_(\d{14})\.(IDFH|IDFW)$",
re.IGNORECASE,
)
# Filenames we explicitly skip even if they look event-shaped.
_NON_EVENT_EXTS = {
".mlg", # monitor-log files (separate heartbeat path)
".txt", # ASCII reports — paired in, not primary
".csv", # operator-facing derivative
".html", # operator-facing derivative
".pdf", # operator-facing derivative
".xml", # operator-facing derivative
".cdb", # IDFW.CDB cache-database variant — skip
".log",
".ini",
".json",
".sfm.json",
".bak",
".tmp",
}
def is_event_binary(path: str) -> bool:
"""Return True if `path`'s basename looks like a Thor event binary."""
name = os.path.basename(path)
lname = name.lower()
# Explicit reject for compound extensions like .IDFW.CDB
if lname.endswith(".idfw.cdb") or lname.endswith(".idfh.cdb"):
return False
if not _EVENT_FILENAME_RE.match(name):
return False
ext = os.path.splitext(name)[1].lower()
if ext in _NON_EVENT_EXTS:
return False
return True
def parse_event_filename(name: str) -> Optional[Tuple[str, datetime, str]]:
"""Parse `<SERIAL>_<YYYYMMDDHHMMSS>.<KIND>` -> (serial, timestamp, kind).
`kind` is the upper-case extension without the dot — "IDFH" or "IDFW".
Returns None if the filename doesn't match.
"""
m = _EVENT_FILENAME_RE.match(name)
if not m:
return None
serial = m.group(1).upper()
try:
ts = datetime.strptime(m.group(2), "%Y%m%d%H%M%S")
except ValueError:
return None
kind = m.group(3).upper()
return serial, ts, kind
def idf_report_name(binary_name: str) -> str:
"""Thor TXT-export convention: append `.txt` to the binary basename.
UM11719_20231219163444.IDFH → UM11719_20231219163444.IDFH.txt
"""
return binary_name + ".txt"
def idf_report_path(binary_path: str) -> str:
"""Compute the expected TXT sidecar path for a Thor event binary.
Thor's TXT exporter writes sidecars into a `TXT/` subfolder of the
unit directory (verified against captured example-data). The
returned path is the *expected* location — caller is responsible
for checking that it actually exists.
"""
unit_dir = os.path.dirname(binary_path)
name = os.path.basename(binary_path)
return os.path.join(unit_dir, "TXT", idf_report_name(name))
def is_histogram_event(filename: str) -> bool:
"""True if this is an .IDFH (histogram) event. Used purely for
log clarity — Thor doesn't always export a TXT for histograms, so
a missing-report warning is suppressed for them."""
name = os.path.basename(filename)
return name.lower().endswith(".idfh")
def serial_from_filename(name: str) -> Optional[str]:
"""Extract the serial-number prefix from a Thor event filename."""
parsed = parse_event_filename(name)
if parsed is None:
return None
return parsed[0]
# ── State file ────────────────────────────────────────────────────────────────
class ForwardState:
"""Idempotency record: which event files have we already forwarded?
State file format (JSON):
{
"version": 1,
"forwarded": {
"<sha256>": {
"filename": "UM11719_20231219163444.IDFW",
"size": 8800,
"forwarded_at": "2026-05-19T...Z",
"had_report": true
},
...
}
}
Keyed by sha256 (not filename) so identical content is recognised
as already-forwarded even if the file moved or got renamed.
"""
def __init__(self, path: str):
self.path = path
self._data: Dict[str, Any] = {"version": STATE_SCHEMA_VERSION, "forwarded": {}}
self._load()
def _load(self) -> None:
try:
with open(self.path, "r", encoding="utf-8") as f:
d = json.load(f)
if not isinstance(d, dict):
raise ValueError("state file root is not an object")
if d.get("version") != STATE_SCHEMA_VERSION:
log.warning(
"forward state version mismatch (got %r, want %d) — starting fresh",
d.get("version"), STATE_SCHEMA_VERSION,
)
return
forwarded = d.get("forwarded")
if isinstance(forwarded, dict):
self._data["forwarded"] = forwarded
except FileNotFoundError:
pass
except (OSError, ValueError, json.JSONDecodeError) as exc:
log.warning("failed to load forward state from %s: %s", self.path, exc)
def _save(self) -> None:
tmp = self.path + ".tmp"
try:
d = os.path.dirname(self.path)
if d and not os.path.isdir(d):
os.makedirs(d, exist_ok=True)
with open(tmp, "w", encoding="utf-8") as f:
json.dump(self._data, f, indent=2, sort_keys=True)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, self.path)
except OSError as exc:
log.warning("failed to save forward state to %s: %s", self.path, exc)
def is_forwarded(self, sha256: str) -> bool:
return sha256 in self._data["forwarded"]
def status(self, sha256: str) -> Optional[bool]:
"""Return forwarding status for *sha256*.
Returns:
None — never forwarded. Eligible for a fresh forward.
True — forwarded successfully with its paired report.
NOT a candidate for re-forward.
False — forwarded WITHOUT its paired `.txt`. Eligible for
re-forward IF the TXT now exists, so seismo-relay's
upsert refreshes the DB row with authoritative
device-side values.
Legacy entries without a `had_report` key default to True so
an upgrade doesn't unexpectedly re-forward every entry.
"""
entry = self._data["forwarded"].get(sha256)
if entry is None:
return None
return bool(entry.get("had_report", True))
def mark_forwarded(
self,
sha256: str,
filename: str,
size: int,
had_report: bool = True,
) -> None:
"""Record a successful forward.
Set `had_report=False` when the forward shipped the binary
without its paired ASCII report. Such entries are re-checked
on subsequent scans and re-forwarded once the TXT appears.
Idempotent: re-marking an existing sha256 with `had_report=True`
is the explicit promotion path used when a re-pair succeeds.
"""
self._data["forwarded"][sha256] = {
"filename": filename,
"size": size,
"forwarded_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
"had_report": had_report,
}
self._save()
def count(self) -> int:
return len(self._data["forwarded"])
# ── Helpers ───────────────────────────────────────────────────────────────────
def sha256_of_file(path: str) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
h.update(chunk)
return h.hexdigest()
def _is_quiescent(path: str, now_ts: float, quiescence_seconds: float) -> bool:
"""Return True if the file's mtime is at least `quiescence_seconds`
in the past — i.e. no longer being written."""
try:
mtime = os.path.getmtime(path)
except OSError:
return False
return (now_ts - mtime) >= quiescence_seconds
def _iter_unit_dirs(thordata_root: str):
"""Yield (project_name, unit_name, unit_path) for every Thor unit
folder beneath `thordata_root`.
THORDATA layout: <root>/<Project>/<UnitSerial>/
"""
if not os.path.isdir(thordata_root):
return
try:
projects = os.listdir(thordata_root)
except OSError:
return
for proj in projects:
proj_path = os.path.join(thordata_root, proj)
if not os.path.isdir(proj_path):
continue
try:
units = os.listdir(proj_path)
except OSError:
continue
for unit in units:
unit_path = os.path.join(proj_path, unit)
if not os.path.isdir(unit_path):
continue
yield proj, unit, unit_path
def _find_txt_in_unit(unit_path: str, binary_name: str,
now_ts: float, quiescence_seconds: float,
_txt_cache: Dict[str, Dict[str, str]]) -> Optional[str]:
"""Look up the matching `.txt` sidecar for `binary_name` inside
`<unit_path>/TXT/`. Case-insensitive on Windows-shaped paths.
Returns the absolute path if a quiescent sidecar exists, else None.
`_txt_cache` is mutated to memoize the lower-case → actual-name
map for each unit so repeated lookups in one scan don't restat.
"""
expected = idf_report_name(binary_name)
if unit_path not in _txt_cache:
txt_dir = os.path.join(unit_path, "TXT")
listing: Dict[str, str] = {}
if os.path.isdir(txt_dir):
try:
for n in os.listdir(txt_dir):
listing[n.lower()] = n
except OSError:
pass
_txt_cache[unit_path] = listing
listing = _txt_cache[unit_path]
actual = listing.get(expected.lower())
if not actual:
return None
candidate = os.path.join(unit_path, "TXT", actual)
if _is_quiescent(candidate, now_ts, quiescence_seconds):
return candidate
return None
# ── Scan pass ─────────────────────────────────────────────────────────────────
def find_pending_events(
thordata_root: str,
state: ForwardState,
*,
max_age_days: int,
quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
max_per_pass: int = 0,
) -> List[Tuple[str, Optional[str]]]:
"""
Walk `thordata_root` and return the list of (binary_path, txt_path_or_None)
pairs that need forwarding.
Filtering rules:
- Filename must match the Thor event filename regex.
- File must be quiescent (mtime >= quiescence_seconds in the past).
- File must not exceed `max_age_days`.
- File's sha256 must NOT already be in the forwarded state
(unless it was forwarded without its TXT and the TXT is now
present — see ForwardState.status).
- If a `<unit>/TXT/<basename>.txt` exists and is quiescent,
we pair them. Otherwise, if the binary is older than
missing_report_grace_seconds, we forward without the TXT.
Younger binaries with a missing TXT are deferred.
- When `max_per_pass > 0`, return at most that many pairs.
Older files (lower mtime) are forwarded first so backfill
proceeds chronologically.
"""
if not os.path.isdir(thordata_root):
log.warning("forward scan: thordata root not found: %s", thordata_root)
return []
now_ts = time.time()
max_age_seconds = max(1, int(max_age_days)) * 86400.0
# First pass: collect every event-binary entry with mtime for sorting.
candidates: List[Tuple[float, str, str]] = [] # (mtime, unit_path, binary_path)
for _proj, _unit, unit_path in _iter_unit_dirs(thordata_root):
try:
entries = list(os.scandir(unit_path))
except OSError:
continue
for e in entries:
if not e.is_file():
continue
if not is_event_binary(e.path):
continue
try:
mtime = e.stat().st_mtime
except OSError:
continue
if (now_ts - mtime) > max_age_seconds:
continue
candidates.append((mtime, unit_path, e.path))
# Sort oldest-first so backfill is chronological.
candidates.sort(key=lambda t: t[0])
pending: List[Tuple[str, Optional[str]]] = []
skipped_inflight = 0
skipped_already_forwarded = 0
txt_cache: Dict[str, Dict[str, str]] = {}
for mtime, unit_path, binary_path in candidates:
if not _is_quiescent(binary_path, now_ts, quiescence_seconds):
skipped_inflight += 1
continue
try:
digest = sha256_of_file(binary_path)
except OSError as exc:
log.warning("forward scan: sha256 failed for %s: %s", binary_path, exc)
continue
fwd_status = state.status(digest)
if fwd_status is True:
skipped_already_forwarded += 1
continue
binary_name = os.path.basename(binary_path)
txt_path = _find_txt_in_unit(
unit_path, binary_name, now_ts, quiescence_seconds, txt_cache,
)
if fwd_status is False:
# Previously forwarded WITHOUT report. Re-forward only
# if the TXT is now present so seismo-relay's upsert can
# refresh the row with authoritative device values.
if txt_path is None:
skipped_already_forwarded += 1
continue
elif txt_path is None:
# First-time forward and TXT not yet present. Wait for
# the grace period before forwarding alone.
if (now_ts - mtime) < missing_report_grace_seconds:
skipped_inflight += 1
continue
pending.append((binary_path, txt_path))
if max_per_pass and len(pending) >= max_per_pass:
break
log.debug(
"forward scan: %d pending skipped_inflight=%d already_forwarded=%d cap=%d",
len(pending), skipped_inflight, skipped_already_forwarded, max_per_pass,
)
return pending
# ── Multipart upload ──────────────────────────────────────────────────────────
def _encode_multipart(
parts: List[Tuple[str, str, str, bytes]],
) -> Tuple[bytes, str]:
"""Encode a list of (field_name, filename, content_type, data) tuples
as a multipart/form-data body. Returns (body_bytes, content_type
header value)."""
boundary = "----ThorWatcherBoundary" + os.urandom(8).hex()
chunks: List[bytes] = []
for field_name, filename, content_type, data in parts:
chunks.append(("--" + boundary + "\r\n").encode("ascii"))
chunks.append(
(f'Content-Disposition: form-data; name="{field_name}"; '
f'filename="{filename}"\r\n').encode("ascii")
)
chunks.append((f"Content-Type: {content_type}\r\n\r\n").encode("ascii"))
chunks.append(data)
chunks.append(b"\r\n")
chunks.append(("--" + boundary + "--\r\n").encode("ascii"))
body = b"".join(chunks)
content_type_hdr = f"multipart/form-data; boundary={boundary}"
return body, content_type_hdr
def _import_endpoint(sfm_url: str) -> str:
"""Compose the import endpoint URL from a base SFM URL."""
return sfm_url.rstrip("/") + "/db/import/idf_file"
def forward_event_pair(
sfm_url: str,
binary_path: str,
txt_path: Optional[str],
*,
serial_hint: Optional[str] = None,
timeout: float = DEFAULT_HTTP_TIMEOUT,
) -> Dict[str, Any]:
"""POST a single event (binary + optional .txt) to the SFM import
endpoint.
Returns a dict mirroring the per-file outcome the server returned
on success, or a dict with `status="error"` on transport/HTTP
failure.
"""
binary_name = os.path.basename(binary_path)
with open(binary_path, "rb") as f:
binary_bytes = f.read()
parts = [("files", binary_name, "application/octet-stream", binary_bytes)]
if txt_path is not None:
with open(txt_path, "rb") as f:
txt_bytes = f.read()
parts.append(("files", os.path.basename(txt_path), "text/plain", txt_bytes))
body, content_type = _encode_multipart(parts)
# Auto-derive serial from filename if caller didn't supply one.
if not serial_hint:
serial_hint = serial_from_filename(binary_name)
url = _import_endpoint(sfm_url)
if serial_hint:
sep = "&" if "?" in url else "?"
url = f"{url}{sep}serial={serial_hint}"
req = urllib.request.Request(
url, data=body, method="POST",
headers={
"Content-Type": content_type,
"Content-Length": str(len(body)),
"User-Agent": "thor-watcher/sfm-forwarder",
"Accept": "application/json",
},
)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read().decode("utf-8", errors="replace")
try:
payload = json.loads(raw)
except json.JSONDecodeError:
return {
"status": "error",
"filename": binary_name,
"detail": f"server returned non-JSON: {raw[:200]!r}",
}
for entry in (payload.get("results") or []):
if entry.get("filename") == binary_name and entry.get("status") == "ok":
return entry
for entry in (payload.get("results") or []):
if entry.get("filename") == binary_name:
return entry
return {
"status": "error",
"filename": binary_name,
"detail": f"unexpected server response: {payload!r}",
}
except urllib.error.HTTPError as exc:
try:
body_excerpt = exc.read().decode("utf-8", errors="replace")[:300]
except Exception:
body_excerpt = ""
return {
"status": "error",
"filename": binary_name,
"detail": f"HTTP {exc.code}: {exc.reason} body={body_excerpt!r}",
}
except urllib.error.URLError as exc:
return {
"status": "error",
"filename": binary_name,
"detail": f"connection error: {exc.reason}",
}
except (OSError, TimeoutError) as exc:
return {
"status": "error",
"filename": binary_name,
"detail": f"transport error: {exc}",
}
# ── Top-level orchestration ───────────────────────────────────────────────────
def forward_pending(
thordata_root: str,
sfm_url: str,
state: ForwardState,
*,
max_age_days: int,
quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
timeout: float = DEFAULT_HTTP_TIMEOUT,
max_per_pass: int = 0,
logger: Optional[Any] = None,
) -> Dict[str, int]:
"""
Run one full pass: find pending events, POST each one, update state.
Returns a counts dict suitable for logging:
{
"scanned": <int>, # event binaries selected for forward
"forwarded": <int>, # successfully POSTed this pass
"errors": <int>, # POST failures (will retry next pass)
"with_report":<int>, # of forwarded, how many had a paired TXT
}
"""
def _log(msg: str) -> None:
if logger:
logger(msg)
else:
log.info(msg)
pending = find_pending_events(
thordata_root, state,
max_age_days=max_age_days,
quiescence_seconds=quiescence_seconds,
missing_report_grace_seconds=missing_report_grace_seconds,
max_per_pass=max_per_pass,
)
counts = {"scanned": len(pending), "forwarded": 0, "errors": 0, "with_report": 0}
for binary_path, txt_path in pending:
result = forward_event_pair(
sfm_url, binary_path, txt_path,
timeout=timeout,
)
if result.get("status") == "ok":
try:
digest = sha256_of_file(binary_path)
size = os.path.getsize(binary_path)
state.mark_forwarded(
digest,
os.path.basename(binary_path),
size,
had_report=(txt_path is not None),
)
except OSError as exc:
_log(f"[forward] post-success state save failed for "
f"{os.path.basename(binary_path)}: {exc}")
counts["forwarded"] += 1
if txt_path:
counts["with_report"] += 1
if txt_path:
report_token = "+ {} attached".format(os.path.basename(txt_path))
elif is_histogram_event(binary_path):
report_token = "(histogram, no report expected)"
else:
report_token = "no report"
_log(
"[forward] OK {} ({}B, {}, inserted={}, skipped={})".format(
os.path.basename(binary_path),
result.get("filesize", 0),
report_token,
result.get("inserted", 0),
result.get("skipped", 0),
)
)
else:
counts["errors"] += 1
_log(
f"[forward] ERR {os.path.basename(binary_path)}: "
f"{result.get('detail', 'unknown error')}"
)
return counts
# ── Seed-state mode (skip historical backfill on first deploy) ────────────────
def seed_state_from_folder(
thordata_root: str,
state: ForwardState,
*,
max_age_days: int = 365,
logger: Optional[Any] = None,
) -> Dict[str, int]:
"""Walk `thordata_root` and mark every existing event binary as
already forwarded — without POSTing anything.
Run this ONCE before enabling sfm_forward_enabled on a machine
with a large historical archive. The watcher then starts
forwarding only events that appear AFTER the seed run.
Returns a counts dict:
{"scanned": int, "seeded": int, "already_known": int, "skipped_too_old": int}
"""
def _log(msg: str) -> None:
if logger:
logger(msg)
else:
log.info(msg)
counts = {"scanned": 0, "seeded": 0, "already_known": 0, "skipped_too_old": 0}
if not os.path.isdir(thordata_root):
_log(f"[seed] thordata root not found: {thordata_root}")
return counts
now_ts = time.time()
max_age_seconds = max(1, int(max_age_days)) * 86400.0
for _proj, _unit, unit_path in _iter_unit_dirs(thordata_root):
try:
entries = [e for e in os.scandir(unit_path) if e.is_file()]
except OSError:
continue
for e in entries:
if not is_event_binary(e.path):
continue
counts["scanned"] += 1
try:
mtime = e.stat().st_mtime
size = e.stat().st_size
except OSError:
continue
if (now_ts - mtime) > max_age_seconds:
counts["skipped_too_old"] += 1
continue
try:
digest = sha256_of_file(e.path)
except OSError as exc:
_log(f"[seed] sha256 failed for {e.path}: {exc}")
continue
if state.is_forwarded(digest):
counts["already_known"] += 1
continue
state.mark_forwarded(digest, e.name, size)
counts["seeded"] += 1
if counts["seeded"] % 1000 == 0:
_log(f"[seed] progress: {counts['seeded']} seeded so far...")
_log(
f"[seed] done. scanned={counts['scanned']} seeded={counts['seeded']} "
f"already_known={counts['already_known']} "
f"skipped_too_old={counts['skipped_too_old']}"
)
return counts
# ── CLI entry point ─────────────────────────────────────────────────────────
def _main() -> int:
"""Command-line interface for one-shot operations.
python event_forwarder.py --seed-state \\
--thordata "C:\\THORDATA" \\
--state "<path/to/thor_forwarded.json>" \\
[--max-age-days 365]
"""
import argparse
parser = argparse.ArgumentParser(
description="Thor Watcher — SFM event forwarder utilities",
)
parser.add_argument(
"--seed-state", action="store_true",
help="Mark every event binary in --thordata as already-forwarded "
"(without POSTing). Use this BEFORE enabling sfm_forward "
"on a machine with a large historical archive.",
)
parser.add_argument(
"--thordata", required=True,
help="Path to the THORDATA root folder.",
)
parser.add_argument(
"--state", required=True,
help="Path to the JSON state file. Will be created if missing.",
)
parser.add_argument(
"--max-age-days", type=int, default=365,
help="Only seed files newer than this many days (default 365).",
)
args = parser.parse_args()
if not args.seed_state:
parser.error("specify --seed-state (no other modes supported yet)")
print(f"[seed] thordata = {args.thordata}")
print(f"[seed] state = {args.state}")
print(f"[seed] max_age = {args.max_age_days} days")
state = ForwardState(args.state)
print(f"[seed] state currently has {state.count()} entries")
seed_state_from_folder(
args.thordata, state,
max_age_days=args.max_age_days,
logger=lambda m: print(m),
)
print(f"[seed] state now has {state.count()} entries")
return 0
if __name__ == "__main__":
import sys
sys.exit(_main())
+41
View File
@@ -0,0 +1,41 @@
; Inno Setup script for Thor Watcher
; Run through Inno Setup Compiler after building dist\thor-watcher.exe
[Setup]
AppName=Thor Watcher
AppVersion=0.2.0
AppPublisher=Terra-Mechanics Inc.
DefaultDirName={pf}\ThorWatcher
DefaultGroupName=Thor Watcher
OutputBaseFilename=thor-watcher-setup
Compression=lzma
SolidCompression=yes
; Require admin rights so we can write to Program Files
PrivilegesRequired=admin
[Tasks]
Name: "desktopicon"; Description: "Create a &desktop icon"; GroupDescription: "Additional icons:"; Flags: unchecked
[Dirs]
; Create the agent_logs folder so the watcher can write logs on first run
Name: "{app}\agent_logs"
[Files]
; Main executable — built by build.bat / PyInstaller
Source: "dist\thor-watcher.exe"; DestDir: "{app}"; Flags: ignoreversion
[Icons]
; Start Menu shortcut
Name: "{group}\Thor Watcher"; Filename: "{app}\thor-watcher.exe"
; Start Menu uninstall shortcut
Name: "{group}\Uninstall Thor Watcher"; Filename: "{uninstallexe}"
; Desktop shortcut (optional — controlled by [Tasks] above)
Name: "{commondesktop}\Thor Watcher"; Filename: "{app}\thor-watcher.exe"; Tasks: desktopicon
; Startup folder shortcut so the tray app launches on login
Name: "{userstartup}\Thor Watcher"; Filename: "{app}\thor-watcher.exe"
[Run]
; Offer to launch the app after install (unchecked by default)
Filename: "{app}\thor-watcher.exe"; \
Description: "Launch Thor Watcher"; \
Flags: nowait postinstall skipifsilent unchecked
+364 -326
View File
@@ -1,145 +1,145 @@
""" """
Series 4 Ingest Agent v0.1.2 Thor Watcher — Series 4 Ingest Agent v0.3.0
Micromate (Series 4) ingest agent for Seismo Fleet Manager (SFM). Micromate (Series 4) ingest agent for Terra-View.
Behavior: Behavior:
- Scans C:\THORDATA\<Project>\<UM####>\*.MLG - Scans C:\THORDATA\<Project>\<UM####>\*.MLG
- For each UM####, finds the newest .MLG by timestamp in the filename - For each UM####, finds the newest .MLG by timestamp in the filename
- Computes "age" from last_call -> now - Posts JSON heartbeat payload to Terra-View backend
- Classifies status as OK / LATE / STALE - Forwards .IDFH/.IDFW event files (+ TXT sidecars) to a seismo-relay
- Prints a console heartbeat SFM server when sfm_forward_enabled=true. See event_forwarder.py.
- (Optional) Posts JSON payload to SFM backend - Tray-friendly: run_watcher(state, stop_event) for background thread use
No roster. SFM backend decides what to do with each unit.
""" """
import os import os
import re import re
import sys
import time import time
import json import json
import sys import threading
from datetime import datetime, timedelta, timezone import urllib.request
from typing import Dict, Any, Optional, Tuple import urllib.error
from zoneinfo import ZoneInfo from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
from socket import gethostname
try: import event_forwarder
# urllib is in stdlib; used instead of requests for portability
import urllib.request
import urllib.error
except ImportError:
urllib = None # type: ignore
# ---------------- Config ----------------
def load_config(config_path: str = "config.json") -> Dict[str, Any]: # ── Version ───────────────────────────────────────────────────────────────────
VERSION = "0.3.0"
# ── Config ────────────────────────────────────────────────────────────────────
def load_config(config_path: str) -> Dict[str, Any]:
""" """
Load configuration from a JSON file. Load configuration from config.json.
Merges with defaults so any missing key is always present.
Falls back to defaults if file doesn't exist or has errors. Raises on file-not-found or malformed JSON (caller handles).
""" """
defaults = { defaults: Dict[str, Any] = {
"thordata_path": r"C:\THORDATA", "thordata_path": r"C:\THORDATA",
"scan_interval": 60, "scan_interval": 60,
"late_days": 2, "api_url": "",
"stale_days": 60, "api_timeout": 5,
"sfm_endpoint": "", "api_interval": 300,
"sfm_timeout": 5, "source_id": "",
"debug": True "source_type": "series4_watcher",
"local_timezone": "America/New_York",
"enable_logging": True,
"log_file": os.path.join(
os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or "C:\\",
"ThorWatcher", "agent_logs", "thor_watcher.log"
),
"log_retention_days": 30,
"update_source": "gitea",
"update_url": "",
"debug": False,
# SFM event forwarding — default OFF, opt-in via Settings.
"sfm_forward_enabled": False,
"sfm_url": "", # e.g. "http://10.0.0.44:8200"
"sfm_forward_interval": 60, # seconds between forward passes
"sfm_quiescence_seconds": 5,
"sfm_missing_report_grace_seconds": 60,
"sfm_http_timeout": 60,
"sfm_state_file": "", # blank → <log_dir>/thor_forwarded.json
"sfm_max_forwards_per_pass": 500,
"sfm_max_event_age_days": 365,
} }
# Try to find config file relative to script location with open(config_path, "r", encoding="utf-8") as f:
script_dir = os.path.dirname(os.path.abspath(__file__)) raw = json.load(f)
full_config_path = os.path.join(script_dir, config_path)
if not os.path.exists(full_config_path): return {**defaults, **raw}
print(f"[WARN] Config file not found at {full_config_path}, using defaults", file=sys.stderr)
return defaults
# ── Logging ───────────────────────────────────────────────────────────────────
def log_message(path: str, enabled: bool, msg: str) -> None:
if not enabled:
return
try: try:
with open(full_config_path, 'r') as f: d = os.path.dirname(path) or "."
config = json.load(f) if not os.path.exists(d):
# Merge with defaults to ensure all keys exist os.makedirs(d)
return {**defaults, **config} with open(path, "a", encoding="utf-8") as f:
except json.JSONDecodeError as e: f.write("{} {}\n".format(datetime.now(timezone.utc).isoformat(), msg))
print(f"[WARN] Invalid JSON in config file: {e}, using defaults", file=sys.stderr) except Exception:
return defaults pass
except Exception as e:
print(f"[WARN] Error loading config file: {e}, using defaults", file=sys.stderr)
return defaults
# Load configuration
config = load_config()
THORDATA_PATH = config["thordata_path"]
SCAN_INTERVAL = config["scan_interval"]
LATE_DAYS = config["late_days"]
STALE_DAYS = config["stale_days"]
SFM_ENDPOINT = config["sfm_endpoint"]
SFM_TIMEOUT = config["sfm_timeout"]
DEBUG = config["debug"]
# Regex: UM12345_YYYYMMDDHHMMSS.MLG
MLG_PATTERN = re.compile(r"^(UM\d+)_([0-9]{14})\.MLG$", re.IGNORECASE)
# ---------------- Helpers ---------------- def _read_log_tail(log_file: str, n: int = 25) -> Optional[List[str]]:
"""Return the last n lines of the log file as a list, or None."""
if not log_file:
return None
try:
with open(log_file, "r", errors="replace") as f:
lines = f.readlines()
return [line.rstrip("\n") for line in lines[-n:]]
except Exception:
return None
def debug(msg: str) -> None: # ── MLG filename parsing ──────────────────────────────────────────────────────
if DEBUG:
print(f"[DEBUG] {msg}", file=sys.stderr, flush=True) # Matches: UM12345_20251204193042.MLG
_MLG_PATTERN = re.compile(r"^(UM\d+)_([0-9]{14})\.MLG$", re.IGNORECASE)
def parse_mlg_filename(name: str) -> Optional[Tuple[str, datetime]]: def parse_mlg_filename(name: str) -> Optional[Tuple[str, datetime]]:
""" """Parse UM####_YYYYMMDDHHMMSS.MLG -> (unit_id, timestamp) or None."""
Parse a Micromate MLG filename of the form: m = _MLG_PATTERN.match(name)
UM12345_20251204193042.MLG
Returns:
(unit_id, timestamp) or None if pattern doesn't match.
"""
m = MLG_PATTERN.match(name)
if not m: if not m:
return None return None
unit_id_raw = m.group(1) # e.g. "UM12345" unit_id = m.group(1).upper()
ts_str = m.group(2) # "YYYYMMDDHHMMSS"
try: try:
ts = datetime.strptime(ts_str, "%Y%m%d%H%M%S") ts = datetime.strptime(m.group(2), "%Y%m%d%H%M%S")
except ValueError: except ValueError:
return None return None
# Normalize unit_id to uppercase for consistency return unit_id, ts
return unit_id_raw.upper(), ts
# ── THORDATA scanner ──────────────────────────────────────────────────────────
def scan_thordata(root: str) -> Dict[str, Dict[str, Any]]: def scan_thordata(root: str) -> Dict[str, Dict[str, Any]]:
""" """
Scan THORDATA folder for Micromate MLG files. Scan THORDATA folder structure: <root>/<Project>/<UM####>/*.MLG
Expected structure:
C:\THORDATA\<Project>\<UM####>\*.MLG
Returns: Returns:
unit_map: { { "UM12345": { "unit_id", "project", "last_call" (datetime naive local), "mlg_path" }, ... }
"UM12345": {
"unit_id": "UM12345",
"project": "Clearwater - ECMS 57940",
"last_call": datetime(...),
"mlg_path": "C:\\THORDATA\\Clearwater...\\UM12345_....MLG"
},
...
}
""" """
unit_map: Dict[str, Dict[str, Any]] = {} unit_map: Dict[str, Dict[str, Any]] = {}
if not os.path.isdir(root): if not os.path.isdir(root):
debug(f"THORDATA_PATH does not exist or is not a directory: {root}")
return unit_map return unit_map
try: try:
project_names = os.listdir(root) project_names = os.listdir(root)
except OSError as e: except OSError:
debug(f"Failed to list THORDATA root '{root}': {e}")
return unit_map return unit_map
for project_name in project_names: for project_name in project_names:
@@ -147,11 +147,9 @@ def scan_thordata(root: str) -> Dict[str, Dict[str, Any]]:
if not os.path.isdir(project_path): if not os.path.isdir(project_path):
continue continue
# Each project contains UM#### subfolders
try: try:
unit_dirs = os.listdir(project_path) unit_dirs = os.listdir(project_path)
except OSError as e: except OSError:
debug(f"Failed to list project '{project_path}': {e}")
continue continue
for unit_name in unit_dirs: for unit_name in unit_dirs:
@@ -159,280 +157,320 @@ def scan_thordata(root: str) -> Dict[str, Dict[str, Any]]:
if not os.path.isdir(unit_path): if not os.path.isdir(unit_path):
continue continue
# We expect folder names like "UM12345"
# but we'll parse filenames anyway, so we don't rely on folder naming.
try: try:
files = os.listdir(unit_path) files = os.listdir(unit_path)
except OSError as e: except OSError:
debug(f"Failed to list unit folder '{unit_path}': {e}")
continue continue
for fname in files: for fname in files:
if not fname.upper().endswith(".MLG"): if not fname.upper().endswith(".MLG"):
continue continue
parsed = parse_mlg_filename(fname) parsed = parse_mlg_filename(fname)
if not parsed: if not parsed:
continue continue
unit_id, ts = parsed unit_id, ts = parsed
full_path = os.path.join(unit_path, fname) full_path = os.path.join(unit_path, fname)
current = unit_map.get(unit_id) current = unit_map.get(unit_id)
if current is None or ts > current["last_call"]: if current is None or ts > current["last_call"]:
unit_map[unit_id] = { unit_map[unit_id] = {
"unit_id": unit_id, "unit_id": unit_id,
"project": project_name, "project": project_name,
"last_call": ts, "last_call": ts,
"mlg_path": full_path, "mlg_path": full_path,
} }
return unit_map return unit_map
def determine_status(last_call: datetime, now: Optional[datetime] = None) -> Tuple[str, float]: # ── API payload ───────────────────────────────────────────────────────────────
"""
Determine status (OK / LATE / STALE) based on age in days.
Returns: def build_api_payload(unit_map: Dict[str, Dict[str, Any]], cfg: Dict[str, Any]) -> dict:
(status, age_days) """Build the Terra-View JSON heartbeat payload."""
"""
if now is None:
now = datetime.now()
age = now - last_call
# Protect against clocks being off; don't go negative.
if age.total_seconds() < 0:
age = timedelta(seconds=0)
age_days = age.total_seconds() / 86400.0
if age_days < LATE_DAYS:
status = "OK"
elif age_days < STALE_DAYS:
status = "LATE"
else:
status = "STALE"
return status, age_days
def format_age(td: timedelta) -> str:
"""
Format a timedelta into a human-readable age string.
Examples:
1d 2h
3h 15m
42m
"""
total_seconds = int(td.total_seconds())
if total_seconds < 0:
total_seconds = 0
days, rem = divmod(total_seconds, 86400)
hours, rem = divmod(rem, 3600)
minutes, _ = divmod(rem, 60)
parts = []
if days > 0:
parts.append(f"{days}d")
if hours > 0:
parts.append(f"{hours}h")
if days == 0 and minutes > 0:
parts.append(f"{minutes}m") # only show minutes if < 1d
if not parts:
return "0m"
return " ".join(parts)
def clear_console() -> None:
"""Clear the console screen (Windows / *nix)."""
if os.name == "nt":
os.system("cls")
else:
os.system("clear")
def print_heartbeat(unit_map: Dict[str, Dict[str, Any]]) -> None:
"""
Print a console heartbeat table of all units.
Example:
UM11719 OK Age: 1h 12m Last: 2025-12-04 19:30:42 Project: Clearwater - ECMS 57940
"""
now = datetime.now()
clear_console()
print("Series 4 Ingest Agent — Micromate Heartbeat (v0.1.2)")
print(f"THORDATA root: {THORDATA_PATH}")
print(f"Now: {now.strftime('%Y-%m-%d %H:%M:%S')}")
print("-" * 80)
if not unit_map:
print("No units found (no .MLG files detected).")
return
# Sort by unit_id for stable output
for unit_id in sorted(unit_map.keys()):
entry = unit_map[unit_id]
last_call = entry["last_call"]
project = entry["project"]
age_td = now - last_call
status, _age_days = determine_status(last_call, now)
age_str = format_age(age_td)
last_str = last_call.strftime("%Y-%m-%d %H:%M:%S")
print(
f"{unit_id:<8} {status:<6} Age: {age_str:<8} "
f"Last: {last_str} Project: {project}"
)
print("-" * 80)
print(f"Total units: {len(unit_map)}")
print(f"Next scan in {SCAN_INTERVAL} seconds...")
sys.stdout.flush()
def build_sfm_payload(unit_map: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
"""
Build a JSON-serializable payload for SFM backend.
All timestamps are converted to UTC for transmission (standard practice).
Terra-View stores UTC and converts to local time for display.
Structure (example):
{
"source": "series4_ingest",
"generated_at": "2025-12-04T20:01:00Z",
"units": [
{
"unit_id": "UM11719",
"type": "micromate",
"project_hint": "Clearwater - ECMS 57940",
"last_call": "2025-12-05T00:30:42Z",
"status": "OK",
"age_days": 0.04,
"age_hours": 0.9,
"mlg_path": "C:\\THORDATA\\Clearwater...\\UM11719_....MLG"
},
...
]
}
"""
now_local = datetime.now()
now_utc = datetime.now(timezone.utc) now_utc = datetime.now(timezone.utc)
local_tz = ZoneInfo("America/New_York") now_local = datetime.now()
payload_units = []
source_id = (cfg.get("source_id") or "").strip() or gethostname()
# Resolve local timezone for MLG timestamp conversion
try:
from zoneinfo import ZoneInfo
local_tz = ZoneInfo(cfg.get("local_timezone") or "America/New_York")
except Exception:
local_tz = None
units = []
for unit_id, entry in unit_map.items(): for unit_id, entry in unit_map.items():
last_call: datetime = entry["last_call"] last_call: datetime = entry["last_call"]
project = entry["project"] age_seconds = max(0.0, (now_local - last_call).total_seconds())
mlg_path = entry["mlg_path"] age_minutes = int(age_seconds // 60)
# Use local time for status calculation (age comparison) # MLG timestamps are local naive — convert to UTC for transmission
status, age_days = determine_status(last_call, now_local) try:
age_hours = age_days * 24.0 if local_tz is not None:
last_call_utc = last_call.replace(tzinfo=local_tz).astimezone(timezone.utc)
last_call_str = last_call_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
else:
# Fallback: send as-is with Z and accept the inaccuracy
last_call_str = last_call.strftime("%Y-%m-%dT%H:%M:%SZ")
except Exception:
last_call_str = last_call.strftime("%Y-%m-%dT%H:%M:%SZ")
# Convert last_call from local time to UTC for transmission units.append({
last_call_utc = last_call.replace(tzinfo=local_tz).astimezone(timezone.utc) "unit_id": unit_id,
"last_call": last_call_str,
"age_minutes": age_minutes,
"mlg_path": entry["mlg_path"],
"project_hint": entry["project"],
})
payload_units.append( return {
{ "source_id": source_id,
"unit_id": unit_id, "source_type": cfg.get("source_type", "series4_watcher"),
"type": "micromate", "version": VERSION,
"project_hint": project,
"last_call": last_call_utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
"status": status,
"age_days": age_days,
"age_hours": age_hours,
"mlg_path": mlg_path,
}
)
payload = {
"source": "series4_ingest",
"generated_at": now_utc.strftime("%Y-%m-%dT%H:%M:%SZ"), "generated_at": now_utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
"units": payload_units, "units": units,
} }
return payload
def emit_sfm_payload(unit_map: Dict[str, Dict[str, Any]]) -> None: def send_api_payload(payload: dict, api_url: str, timeout: int) -> Optional[dict]:
""" """POST payload to Terra-View. Returns parsed JSON response or None on failure."""
Send heartbeat payload to SFM backend, if SFM_ENDPOINT is configured. if not api_url:
return None
This is intentionally conservative:
- If SFM_ENDPOINT is empty -> do nothing
- If any error occurs -> print to stderr, but do not crash the agent
"""
if not SFM_ENDPOINT:
return
if urllib is None:
print(
"[WARN] urllib not available; cannot POST to SFM. "
"Install standard Python or disable SFM_ENDPOINT.",
file=sys.stderr,
)
return
payload = build_sfm_payload(unit_map)
data = json.dumps(payload).encode("utf-8") data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request( req = urllib.request.Request(
SFM_ENDPOINT, api_url, data=data,
data=data,
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
method="POST",
) )
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
print("[API] POST success: {}".format(resp.status))
try:
return json.loads(resp.read().decode("utf-8"))
except Exception:
return {}
except urllib.error.URLError as e:
print("[API] POST failed: {}".format(e))
return None
except Exception as e:
print("[API] Unexpected error: {}".format(e))
return None
# ── Watcher loop (tray-friendly) ──────────────────────────────────────────────
def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
"""
Main watcher loop. Runs in a background thread when launched from the tray.
state keys written each cycle:
state["status"] — "running" | "error" | "starting"
state["api_status"] — "ok" | "fail" | "disabled"
state["units"] — list of unit dicts for tray display
state["last_scan"] — datetime of last successful scan
state["last_error"] — last error string or None
state["log_dir"] — directory containing the log file
state["cfg"] — loaded config dict
state["update_available"] — set True when API response signals an update
state["sfm_status"] — "ok" | "fail" | "disabled" | "ready"
state["last_forward"] — datetime of last forwarder pass (or None)
state["last_forward_counts"] — dict from event_forwarder.forward_pending
"""
# Resolve config path
if getattr(sys, "frozen", False):
_appdata = os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or ""
config_dir = os.path.join(_appdata, "ThorWatcher")
else:
config_dir = os.path.dirname(os.path.abspath(__file__)) or "."
config_path = os.path.join(config_dir, "config.json")
state["status"] = "starting"
state["units"] = []
state["last_scan"] = None
state["last_error"] = None
state["log_dir"] = None
state["cfg"] = {}
state["update_available"] = False
try: try:
with urllib.request.urlopen(req, timeout=SFM_TIMEOUT) as resp: cfg = load_config(config_path)
_ = resp.read() # we don't care about the body for now
debug(f"SFM POST OK: HTTP {resp.status}")
except urllib.error.URLError as e:
print(f"[WARN] Failed to POST to SFM: {e}", file=sys.stderr)
except Exception as e: except Exception as e:
print(f"[WARN] Unexpected error during SFM POST: {e}", file=sys.stderr) state["status"] = "error"
state["last_error"] = "Config load failed: {}".format(e)
return
state["cfg"] = cfg
log_file = cfg["log_file"]
state["log_dir"] = os.path.dirname(log_file) or config_dir
THORDATA_PATH = cfg["thordata_path"]
SCAN_INTERVAL = int(cfg["scan_interval"])
API_URL = cfg["api_url"]
API_TIMEOUT = int(cfg["api_timeout"])
API_INTERVAL = int(cfg["api_interval"])
ENABLE_LOGGING = bool(cfg["enable_logging"])
# SFM forwarder config
SFM_FORWARD_ENABLED = bool(cfg.get("sfm_forward_enabled", False))
SFM_URL = str(cfg.get("sfm_url", "")).strip()
SFM_FORWARD_INTERVAL = int(cfg.get("sfm_forward_interval", 60))
SFM_QUIESCENCE = int(cfg.get("sfm_quiescence_seconds", 5))
SFM_GRACE = int(cfg.get("sfm_missing_report_grace_seconds", 60))
SFM_HTTP_TIMEOUT = int(cfg.get("sfm_http_timeout", 60))
SFM_MAX_PER_PASS = int(cfg.get("sfm_max_forwards_per_pass", 500))
SFM_MAX_AGE_DAYS = int(cfg.get("sfm_max_event_age_days", 365))
sfm_state_path = str(cfg.get("sfm_state_file", "")).strip() or \
os.path.join(state["log_dir"], "thor_forwarded.json")
log_message(log_file, ENABLE_LOGGING,
"[cfg] THORDATA_PATH={} SCAN_INTERVAL={}s API_INTERVAL={}s API={} SFM={}".format(
THORDATA_PATH, SCAN_INTERVAL, API_INTERVAL, bool(API_URL),
bool(SFM_FORWARD_ENABLED and SFM_URL),
)
)
print("[CFG] THORDATA_PATH={} SCAN_INTERVAL={}s API={} SFM={}".format(
THORDATA_PATH, SCAN_INTERVAL, bool(API_URL),
bool(SFM_FORWARD_ENABLED and SFM_URL),
))
# Initialize SFM forwarder state (if enabled)
sfm_state_obj: Optional[event_forwarder.ForwardState] = None
if SFM_FORWARD_ENABLED and SFM_URL:
try:
sfm_state_obj = event_forwarder.ForwardState(sfm_state_path)
state["sfm_status"] = "ready"
log_message(log_file, ENABLE_LOGGING,
"[sfm] forwarder ready url={} state_file={} known={}".format(
SFM_URL, sfm_state_path, sfm_state_obj.count(),
)
)
print("[SFM] forwarder ready url={} known={}".format(
SFM_URL, sfm_state_obj.count(),
))
except Exception as exc:
state["sfm_status"] = "fail"
state["last_error"] = "SFM init failed: {}".format(exc)
log_message(log_file, ENABLE_LOGGING,
"[sfm] init failed: {}".format(exc))
else:
state["sfm_status"] = "disabled"
state["last_forward"] = None
state["last_forward_counts"] = None
last_api_ts = 0.0
last_forward_ts = 0.0
while not stop_event.is_set():
try:
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print("-" * 80)
print("Heartbeat @ {}".format(now_str))
print("-" * 80)
unit_map = scan_thordata(THORDATA_PATH)
now_local = datetime.now()
unit_list = []
for uid in sorted(unit_map.keys()):
entry = unit_map[uid]
last_call = entry["last_call"]
age_seconds = max(0.0, (now_local - last_call).total_seconds())
age_minutes = int(age_seconds // 60)
unit_list.append({
"uid": uid,
"age_minutes": age_minutes,
"last_call": last_call.strftime("%Y-%m-%d %H:%M:%S"),
"mlg_path": entry["mlg_path"],
"project": entry["project"],
})
line = "{uid:<8} Age: {h}h {m}m Last: {last} Project: {proj}".format(
uid=uid,
h=age_minutes // 60,
m=age_minutes % 60,
last=last_call.strftime("%Y-%m-%d %H:%M:%S"),
proj=entry["project"],
)
print(line)
log_message(log_file, ENABLE_LOGGING, line)
if not unit_list:
msg = "[info] No Micromate units found in THORDATA"
print(msg)
log_message(log_file, ENABLE_LOGGING, msg)
state["status"] = "running"
state["units"] = unit_list
state["last_scan"] = datetime.now()
state["last_error"] = None
# ── API heartbeat ──────────────────────────────────────────────────
if API_URL:
now_ts = time.time()
if now_ts - last_api_ts >= API_INTERVAL:
payload = build_api_payload(unit_map, cfg)
payload["log_tail"] = _read_log_tail(log_file, 25)
response = send_api_payload(payload, API_URL, API_TIMEOUT)
last_api_ts = now_ts
if response is not None:
state["api_status"] = "ok"
if response.get("update_available"):
state["update_available"] = True
else:
state["api_status"] = "fail"
else:
state["api_status"] = "disabled"
# ── SFM event forwarding ───────────────────────────────────────────
if sfm_state_obj is not None:
now_ts = time.time()
if now_ts - last_forward_ts >= SFM_FORWARD_INTERVAL:
last_forward_ts = now_ts
try:
counts = event_forwarder.forward_pending(
THORDATA_PATH, SFM_URL, sfm_state_obj,
max_age_days=SFM_MAX_AGE_DAYS,
quiescence_seconds=SFM_QUIESCENCE,
missing_report_grace_seconds=SFM_GRACE,
timeout=SFM_HTTP_TIMEOUT,
max_per_pass=SFM_MAX_PER_PASS,
logger=lambda m: log_message(log_file, ENABLE_LOGGING, m),
)
state["last_forward"] = datetime.now()
state["last_forward_counts"] = counts
if counts["errors"] > 0:
state["sfm_status"] = "fail"
else:
state["sfm_status"] = "ok"
summary = ("[sfm] pass scanned={scanned} forwarded={forwarded} "
"errors={errors} with_report={with_report}").format(**counts)
print(summary)
log_message(log_file, ENABLE_LOGGING, summary)
except Exception as exc:
state["sfm_status"] = "fail"
msg = "[sfm] pass failed: {}".format(exc)
print(msg)
log_message(log_file, ENABLE_LOGGING, msg)
except Exception as e:
err = "[loop-error] {}".format(e)
print(err)
log_message(log_file, ENABLE_LOGGING, err)
state["status"] = "error"
state["last_error"] = str(e)
stop_event.wait(timeout=SCAN_INTERVAL)
# ── Standalone entry point ────────────────────────────────────────────────────
def main() -> None: def main() -> None:
print("Starting Series 4 Ingest Agent (Micromate) v0.1.2") state: Dict[str, Any] = {}
print(f"THORDATA_PATH = {THORDATA_PATH}") stop_event = threading.Event()
print(f"SCAN_INTERVAL = {SCAN_INTERVAL} seconds")
print(f"LATE_DAYS = {LATE_DAYS}, STALE_DAYS = {STALE_DAYS}")
if not os.path.isdir(THORDATA_PATH):
print(f"[WARN] THORDATA_PATH does not exist: {THORDATA_PATH}", file=sys.stderr)
loop_counter = 0
try: try:
while True: run_watcher(state, stop_event)
loop_counter += 1
print(f"\n[LOOP] Iteration {loop_counter} starting...", flush=True)
try:
unit_map = scan_thordata(THORDATA_PATH)
debug(f"scan_thordata found {len(unit_map)} units")
print_heartbeat(unit_map)
emit_sfm_payload(unit_map)
print("[LOOP] Iteration complete, entering sleep...", flush=True)
except Exception as e:
# Catch-all so a single error doesn't kill the loop
print(f"[ERROR] Exception in main loop: {e}", file=sys.stderr)
sys.stderr.flush()
# Sleep in 1-second chunks to avoid VM time drift weirdness
for i in range(SCAN_INTERVAL):
time.sleep(1)
print("[LOOP] Woke up for next scan", flush=True)
except KeyboardInterrupt: except KeyboardInterrupt:
print("\nSeries 4 Ingest Agent stopped by user.") print("\nStopping...")
stop_event.set()
if __name__ == "__main__": if __name__ == "__main__":
+716
View File
@@ -0,0 +1,716 @@
"""
test_event_forwarder.py — unit tests for Thor Watcher's SFM event forwarder.
Covers:
- is_event_binary() filename matching (positive + negative cases)
- parse_event_filename() / serial_from_filename()
- idf_report_path() — the TXT/ subfolder convention
- ForwardState load/save round-trip + idempotency check
- find_pending_events() against the THORDATA/<Project>/<Unit>/ tree,
plus quiescence + grace-period + re-pair logic
- _encode_multipart() byte-level shape (boundary + headers)
- forward_event_pair() end-to-end against a tiny stdlib HTTP server
that mimics seismo-relay's POST /db/import/idf_file endpoint
- seed_state_from_folder() walks the tree without POSTing
Stdlib only — runs with `python -m pytest test_event_forwarder.py`
on Python 3.8+ (the watcher's compat target).
"""
from __future__ import annotations
import http.server
import json
import os
import tempfile
import threading
import time
import unittest
from pathlib import Path
import event_forwarder as ef
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_thordata(root: Path, project: str, unit: str) -> Path:
"""Create a THORDATA/<project>/<unit>/ folder pair; return unit_dir."""
unit_dir = root / project / unit
unit_dir.mkdir(parents=True, exist_ok=True)
return unit_dir
def _touch_with_age(p: Path, age_seconds: float, content: bytes = b"x") -> Path:
"""Create a file with controlled mtime."""
p.write_bytes(content)
target = time.time() - age_seconds
os.utime(p, (target, target))
return p
def _make_event(unit_dir: Path, name: str, age_seconds: float = 100,
content: bytes = b"x") -> Path:
return _touch_with_age(unit_dir / name, age_seconds, content)
def _make_txt(unit_dir: Path, base_name: str, age_seconds: float = 100,
content: bytes = b"r") -> Path:
txt_dir = unit_dir / "TXT"
txt_dir.mkdir(exist_ok=True)
return _touch_with_age(txt_dir / ef.idf_report_name(base_name),
age_seconds, content)
# ── is_event_binary() ────────────────────────────────────────────────────────
class TestIsEventBinary(unittest.TestCase):
def test_recognises_typical_thor_filenames(self):
for name in [
"UM11719_20231219163444.IDFH",
"UM11719_20231219162723.IDFW",
"BE9439_20200713124251.IDFH",
"UM13981_20220808082418.IDFH",
# case-insensitive
"um11719_20231219163444.idfh",
]:
self.assertTrue(ef.is_event_binary(name), name)
def test_rejects_non_event_extensions(self):
for name in [
"UM11719_20231219163436.MLG", # monitor log
"UM11719_20231219163444.IDFH.txt", # report sidecar
"UM11719_20231219164135.IDFW.CDB", # cache database variant
"UM11719_20231219164135.IDFH.CDB",
"agent.log",
"config.json",
"foo.bak",
"bar.tmp",
"UM11719_20231219163444.csv",
"UM11719_20231219163444.pdf",
"UM11719_20231219163444.html",
"UM11719_20231219163444.xml",
]:
self.assertFalse(ef.is_event_binary(name), name)
def test_rejects_malformed_filenames(self):
for name in [
"",
"no_extension",
"UM_20231219163444.IDFH", # missing serial digits
"1234_20231219163444.IDFH", # serial must start with letters
"UM11719_2023121916.IDFH", # short timestamp
"UM11719_20231219163444.IDFX", # wrong kind
"UM11719-20231219163444.IDFH", # wrong separator
]:
self.assertFalse(ef.is_event_binary(name), name)
def test_parse_event_filename(self):
from datetime import datetime
parsed = ef.parse_event_filename("UM11719_20231219163444.IDFW")
self.assertIsNotNone(parsed)
serial, ts, kind = parsed
self.assertEqual(serial, "UM11719")
self.assertEqual(ts, datetime(2023, 12, 19, 16, 34, 44))
self.assertEqual(kind, "IDFW")
def test_serial_from_filename(self):
self.assertEqual(ef.serial_from_filename("UM11719_20231219163444.IDFH"),
"UM11719")
self.assertEqual(ef.serial_from_filename("BE9439_20200713124251.IDFH"),
"BE9439")
self.assertIsNone(ef.serial_from_filename("not_an_event.bin"))
def test_idf_report_path_uses_txt_subfolder(self):
binary = "/foo/THORDATA/Project A/UM11719/UM11719_20231219163444.IDFW"
self.assertEqual(
ef.idf_report_path(binary),
os.path.join("/foo/THORDATA/Project A/UM11719",
"TXT", "UM11719_20231219163444.IDFW.txt"),
)
def test_is_histogram_event(self):
self.assertTrue(ef.is_histogram_event("UM11719_20231219163444.IDFH"))
self.assertTrue(ef.is_histogram_event("um11719_20231219163444.idfh"))
self.assertFalse(ef.is_histogram_event("UM11719_20231219162723.IDFW"))
# ── ForwardState ─────────────────────────────────────────────────────────────
class TestForwardState(unittest.TestCase):
def test_round_trip_persists_marked_entries(self):
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "fwd.json")
s = ef.ForwardState(path)
self.assertFalse(s.is_forwarded("abc123"))
s.mark_forwarded("abc123", "UM11719_20231219163444.IDFW", 8800)
self.assertTrue(s.is_forwarded("abc123"))
# Re-load from disk
s2 = ef.ForwardState(path)
self.assertTrue(s2.is_forwarded("abc123"))
self.assertEqual(s2.count(), 1)
def test_corrupt_state_file_starts_fresh(self):
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "fwd.json")
with open(path, "w") as f:
f.write("not valid json {{{")
s = ef.ForwardState(path)
self.assertEqual(s.count(), 0)
def test_version_mismatch_starts_fresh(self):
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "fwd.json")
with open(path, "w") as f:
json.dump({"version": 999, "forwarded": {"x": {}}}, f)
s = ef.ForwardState(path)
self.assertEqual(s.count(), 0)
def test_legacy_entries_default_to_had_report_true(self):
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "fwd.json")
with open(path, "w") as f:
json.dump({
"version": 1,
"forwarded": {
"abc123": {
"filename": "UM11719_20231219163444.IDFW",
"size": 123,
"forwarded_at": "2025-01-01T00:00:00Z",
# No had_report field — legacy entry
}
}
}, f)
state = ef.ForwardState(path)
self.assertIs(state.status("abc123"), True)
def test_status_returns_none_for_unknown_sha(self):
with tempfile.TemporaryDirectory() as tmp:
state = ef.ForwardState(str(Path(tmp) / "fwd.json"))
self.assertIs(state.status("never-seen"), None)
def test_mark_with_had_report_false_then_promote(self):
with tempfile.TemporaryDirectory() as tmp:
state = ef.ForwardState(str(Path(tmp) / "fwd.json"))
state.mark_forwarded("xyz", "UM11719_20231219163444.IDFW", 100,
had_report=False)
self.assertIs(state.status("xyz"), False)
state.mark_forwarded("xyz", "UM11719_20231219163444.IDFW", 100,
had_report=True)
self.assertIs(state.status("xyz"), True)
# ── find_pending_events() ────────────────────────────────────────────────────
class TestFindPendingEvents(unittest.TestCase):
def test_returns_pair_when_both_files_present_and_quiescent(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=120, content=b"binary")
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=100, content=b"report")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 1)
self.assertEqual(os.path.basename(pending[0][0]),
"UM11719_20231219163444.IDFW")
self.assertEqual(os.path.basename(pending[0][1]),
"UM11719_20231219163444.IDFW.txt")
def test_idfh_and_idfw_are_separate_events(self):
"""A single timestamp produces both .IDFH and .IDFW — they
forward as two independent events with their own state entries."""
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
_make_event(unit_dir, "UM11719_20231219163444.IDFH",
age_seconds=120, content=b"histogram")
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=120, content=b"waveform")
_make_txt(unit_dir, "UM11719_20231219163444.IDFH",
age_seconds=100, content=b"hreport")
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=100, content=b"wreport")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 2)
names = sorted(os.path.basename(p[0]) for p in pending)
self.assertEqual(names, [
"UM11719_20231219163444.IDFH",
"UM11719_20231219163444.IDFW",
])
def test_pairing_when_txt_is_in_unit_root_does_not_match(self):
"""Sidecars MUST live in the TXT/ subfolder. A stray .txt
next to the binary is not the canonical location and should
not be picked up as a sidecar."""
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=200, content=b"bin")
# .txt is in the unit dir, not unit/TXT/
_touch_with_age(unit_dir / "UM11719_20231219163444.IDFW.txt",
age_seconds=100, content=b"misplaced")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
# Forward proceeds (grace period elapsed), but WITHOUT pairing
self.assertEqual(len(pending), 1)
self.assertIsNone(pending[0][1])
def test_skips_if_already_forwarded(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
bin_p = _make_event(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=120, content=b"binary")
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=100, content=b"report")
state = ef.ForwardState(str(root / "fwd.json"))
digest = ef.sha256_of_file(str(bin_p))
state.mark_forwarded(digest, "UM11719_20231219163444.IDFW", len(b"binary"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_skips_if_too_fresh_to_be_quiescent(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=1, content=b"binary")
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=1, content=b"report")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_forwards_alone_after_grace_when_txt_missing(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
_make_event(unit_dir, "UM11719_20231219163444.IDFH",
age_seconds=200, content=b"binary")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 1)
bin_path, txt_path = pending[0]
self.assertEqual(os.path.basename(bin_path),
"UM11719_20231219163444.IDFH")
self.assertIsNone(txt_path)
def test_re_pair_after_late_arriving_txt(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
bin_p = _make_event(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=200, content=b"binary")
state = ef.ForwardState(str(root / "fwd.json"))
digest = ef.sha256_of_file(str(bin_p))
state.mark_forwarded(digest, "UM11719_20231219163444.IDFW",
len(b"binary"), had_report=False)
# First scan: TXT not present → still skipped.
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(pending, [])
# TXT finally appears.
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=100, content=b"report")
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 1)
self.assertEqual(os.path.basename(pending[0][1]),
"UM11719_20231219163444.IDFW.txt")
def test_defers_when_txt_missing_and_within_grace(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=15, content=b"binary")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_skips_old_files_beyond_max_age_days(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
_make_event(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=10 * 86400, content=b"binary")
_make_txt(unit_dir, "UM11719_20231219163444.IDFW",
age_seconds=10 * 86400, content=b"report")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=1,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_ignores_mlg_and_other_non_event_files(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_dir = _make_thordata(root, "Project A", "UM11719")
_make_event(unit_dir, "UM11719_20231219163436.MLG",
age_seconds=120, content=b"mlg")
_make_event(unit_dir, "UM11719_20231219164135.IDFW.CDB",
age_seconds=120, content=b"cache")
_touch_with_age(unit_dir / "agent.log", age_seconds=120, content=b"log")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_walks_multiple_projects_and_units(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit_a = _make_thordata(root, "Project A", "UM11719")
unit_b = _make_thordata(root, "Project B", "BE9439")
_make_event(unit_a, "UM11719_20231219163444.IDFW", age_seconds=200, content=b"a")
_make_event(unit_b, "BE9439_20200713131747.IDFW", age_seconds=200, content=b"b")
_make_txt(unit_a, "UM11719_20231219163444.IDFW", age_seconds=100, content=b"ar")
_make_txt(unit_b, "BE9439_20200713131747.IDFW", age_seconds=100, content=b"br")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=10000, # BE event is from 2020
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 2)
names = sorted(os.path.basename(p[0]) for p in pending)
self.assertEqual(names, [
"BE9439_20200713131747.IDFW",
"UM11719_20231219163444.IDFW",
])
def test_max_per_pass_caps_returned_count(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit = _make_thordata(root, "Project A", "UM11719")
for i in range(5):
name = "UM11719_2023121916344{}.IDFW".format(i)
_make_event(unit, name, age_seconds=120 + i, content=("bin-" + str(i)).encode())
_make_txt(unit, name, age_seconds=110 + i, content=b"report")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
max_per_pass=2,
)
self.assertEqual(len(pending), 2)
def test_max_per_pass_returns_oldest_first(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit = _make_thordata(root, "Project A", "UM11719")
ages = [200, 150, 100, 50]
for i, age in enumerate(ages):
name = "UM11719_2023121916344{}.IDFW".format(i)
_make_event(unit, name, age_seconds=age, content=("c" + str(i)).encode())
_make_txt(unit, name, age_seconds=max(1, age - 10), content=b"r")
state = ef.ForwardState(str(root / "fwd.json"))
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
max_per_pass=2,
)
names = [os.path.basename(p[0]) for p in pending]
# Oldest two should be index 0 (200s) and 1 (150s)
self.assertEqual(names, [
"UM11719_20231219163440.IDFW",
"UM11719_20231219163441.IDFW",
])
# ── Seed-state mode ──────────────────────────────────────────────────────────
class TestSeedStateFromFolder(unittest.TestCase):
def test_seeds_every_in_window_event_without_posting(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit = _make_thordata(root, "Project A", "UM11719")
for i in range(3):
_make_event(unit, "UM11719_2023121916344{}.IDFW".format(i),
age_seconds=120 + i, content=("e" + str(i)).encode())
# Ignored
_make_event(unit, "UM11719_20231219163436.MLG", age_seconds=120, content=b"mlg")
state = ef.ForwardState(str(root / "seed.json"))
counts = ef.seed_state_from_folder(str(root), state, max_age_days=30)
self.assertEqual(counts["scanned"], 3)
self.assertEqual(counts["seeded"], 3)
self.assertEqual(counts["already_known"], 0)
self.assertEqual(state.count(), 3)
def test_seeded_files_are_then_skipped_by_normal_scan(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit = _make_thordata(root, "Project A", "UM11719")
_make_event(unit, "UM11719_20231219163444.IDFW", age_seconds=120, content=b"x")
_make_txt(unit, "UM11719_20231219163444.IDFW", age_seconds=110, content=b"r")
_make_event(unit, "UM11719_20231219163444.IDFH", age_seconds=120, content=b"y")
_make_txt(unit, "UM11719_20231219163444.IDFH", age_seconds=110, content=b"r")
state = ef.ForwardState(str(root / "seed.json"))
ef.seed_state_from_folder(str(root), state, max_age_days=30)
pending = ef.find_pending_events(
str(root), state, max_age_days=30,
quiescence_seconds=5, missing_report_grace_seconds=60,
)
self.assertEqual(len(pending), 0)
def test_seed_is_idempotent(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit = _make_thordata(root, "Project A", "UM11719")
_make_event(unit, "UM11719_20231219163444.IDFW", age_seconds=120, content=b"x")
state = ef.ForwardState(str(root / "seed.json"))
counts1 = ef.seed_state_from_folder(str(root), state, max_age_days=30)
counts2 = ef.seed_state_from_folder(str(root), state, max_age_days=30)
self.assertEqual(counts1["seeded"], 1)
self.assertEqual(counts2["seeded"], 0)
self.assertEqual(counts2["already_known"], 1)
self.assertEqual(state.count(), 1)
# ── Multipart encoder ────────────────────────────────────────────────────────
class TestMultipartEncoder(unittest.TestCase):
def test_encodes_two_parts_with_proper_boundary(self):
body, content_type = ef._encode_multipart([
("files", "a.bin", "application/octet-stream", b"\x01\x02"),
("files", "a.txt", "text/plain", b"hello"),
])
self.assertTrue(content_type.startswith("multipart/form-data; boundary="))
boundary = content_type.split("boundary=", 1)[1]
self.assertIn(boundary.encode("ascii"), body)
text = body.decode("latin-1")
self.assertIn('name="files"; filename="a.bin"', text)
self.assertIn('name="files"; filename="a.txt"', text)
self.assertIn("Content-Type: application/octet-stream", text)
self.assertIn("Content-Type: text/plain", text)
self.assertTrue(text.rstrip("\r\n").endswith(f"--{boundary}--"))
# ── End-to-end forward_event_pair against a fake server ──────────────────────
class _FakeImportHandler(http.server.BaseHTTPRequestHandler):
"""Mimics seismo-relay's POST /db/import/idf_file response."""
received = [] # class-level capture for test inspection
def do_POST(self):
length = int(self.headers.get("Content-Length", "0"))
body = self.rfile.read(length)
ctype = self.headers.get("Content-Type", "")
parts = body.split(b"--" + ctype.split("boundary=")[-1].encode())
filenames = []
for p in parts:
for line in p.split(b"\r\n"):
if b'filename="' in line:
fn = line.split(b'filename="', 1)[1].split(b'"', 1)[0]
filenames.append(fn.decode("latin-1"))
self.__class__.received.append({
"path": self.path,
"ctype": ctype,
"filenames": filenames,
})
results = []
binary_fn = next(
(fn for fn in filenames if not fn.lower().endswith(".txt")),
None,
)
if binary_fn:
results.append({
"filename": binary_fn,
"status": "ok",
"stored_filename": binary_fn,
"filesize": len(body),
"sha256": "00" * 32,
"report_attached": any(fn.lower().endswith(".txt") for fn in filenames),
"inserted": 1,
"skipped": 0,
})
payload = json.dumps({"count": len(results), "results": results}).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(payload)))
self.end_headers()
self.wfile.write(payload)
def log_message(self, *_a, **_kw): # silence the test runner
pass
def _start_fake_server():
server = http.server.HTTPServer(("127.0.0.1", 0), _FakeImportHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
host, port = server.server_address
return server, f"http://{host}:{port}"
class TestForwardEventPair(unittest.TestCase):
def setUp(self):
_FakeImportHandler.received = []
self.server, self.base_url = _start_fake_server()
def tearDown(self):
self.server.shutdown()
self.server.server_close()
def test_post_with_paired_report(self):
with tempfile.TemporaryDirectory() as tmp:
tmp_p = Path(tmp)
bin_p = tmp_p / "UM11719_20231219163444.IDFW"
txt_p = tmp_p / "UM11719_20231219163444.IDFW.txt"
bin_p.write_bytes(b"\x10\x20\x30 binary")
txt_p.write_bytes(b'"SerialNumber : UM11719"\n')
result = ef.forward_event_pair(
self.base_url, str(bin_p), str(txt_p), timeout=5.0,
)
self.assertEqual(result["status"], "ok")
self.assertEqual(result["filename"], "UM11719_20231219163444.IDFW")
self.assertTrue(result["report_attached"])
self.assertEqual(len(_FakeImportHandler.received), 1)
req = _FakeImportHandler.received[0]
# Path includes the serial-hint auto-extracted from the filename
self.assertTrue(req["path"].startswith("/db/import/idf_file"))
self.assertIn("serial=UM11719", req["path"])
self.assertIn("UM11719_20231219163444.IDFW", req["filenames"])
self.assertIn("UM11719_20231219163444.IDFW.txt", req["filenames"])
def test_post_without_report(self):
with tempfile.TemporaryDirectory() as tmp:
bin_p = Path(tmp) / "UM11719_20231219163444.IDFH"
bin_p.write_bytes(b"binary only")
result = ef.forward_event_pair(
self.base_url, str(bin_p), None, timeout=5.0,
)
self.assertEqual(result["status"], "ok")
self.assertFalse(result["report_attached"])
req = _FakeImportHandler.received[0]
self.assertEqual(req["filenames"], ["UM11719_20231219163444.IDFH"])
def test_explicit_serial_hint_overrides_auto(self):
with tempfile.TemporaryDirectory() as tmp:
bin_p = Path(tmp) / "UM11719_20231219163444.IDFW"
bin_p.write_bytes(b"x")
ef.forward_event_pair(
self.base_url, str(bin_p), None,
serial_hint="OVERRIDE99", timeout=5.0,
)
req = _FakeImportHandler.received[0]
self.assertIn("serial=OVERRIDE99", req["path"])
# ── forward_pending() smoke test ─────────────────────────────────────────────
class TestForwardPending(unittest.TestCase):
"""End-to-end: tree → find → POST → state-update → no re-POST."""
def setUp(self):
_FakeImportHandler.received = []
self.server, self.base_url = _start_fake_server()
def tearDown(self):
self.server.shutdown()
self.server.server_close()
def test_pass_then_re_pass_is_idempotent(self):
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
unit = _make_thordata(root, "Project A", "UM11719")
_make_event(unit, "UM11719_20231219163444.IDFW",
age_seconds=200, content=b"binary")
_make_txt(unit, "UM11719_20231219163444.IDFW",
age_seconds=100, content=b"report")
_make_event(unit, "UM11719_20231219163444.IDFH",
age_seconds=200, content=b"histogram")
_make_txt(unit, "UM11719_20231219163444.IDFH",
age_seconds=100, content=b"hreport")
state = ef.ForwardState(str(root / "fwd.json"))
counts = ef.forward_pending(
str(root), self.base_url, state,
max_age_days=30, quiescence_seconds=5,
missing_report_grace_seconds=60, timeout=5.0,
)
self.assertEqual(counts["scanned"], 2)
self.assertEqual(counts["forwarded"], 2)
self.assertEqual(counts["errors"], 0)
self.assertEqual(counts["with_report"], 2)
self.assertEqual(state.count(), 2)
self.assertEqual(len(_FakeImportHandler.received), 2)
# Re-pass: nothing pending; no new POSTs.
counts2 = ef.forward_pending(
str(root), self.base_url, state,
max_age_days=30, quiescence_seconds=5,
missing_report_grace_seconds=60, timeout=5.0,
)
self.assertEqual(counts2["scanned"], 0)
self.assertEqual(counts2["forwarded"], 0)
self.assertEqual(len(_FakeImportHandler.received), 2)
if __name__ == "__main__":
unittest.main()
+669
View File
@@ -0,0 +1,669 @@
"""
Thor Watcher — Settings Dialog v0.3.0
Provides a Tkinter settings dialog that doubles as a first-run wizard.
Public API:
show_dialog(config_path, wizard=False) -> bool
Returns True if the user saved, False if they cancelled.
"""
import os
import json
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from socket import gethostname
import series4_ingest as watcher
# ── Defaults (mirror config.example.json) ────────────────────────────────────
DEFAULTS = {
"thordata_path": r"C:\THORDATA",
"scan_interval": 60,
"api_url": "",
"api_timeout": 5,
"api_interval": 300,
"source_id": "",
"source_type": "series4_watcher",
"local_timezone": "America/New_York",
"enable_logging": True,
"log_file": os.path.join(
os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or "C:\\",
"ThorWatcher", "agent_logs", "thor_watcher.log"
),
"log_retention_days": 30,
"update_source": "gitea",
"update_url": "",
# SFM forwarder defaults — mirror series4_ingest.load_config
"sfm_forward_enabled": False,
"sfm_url": "",
"sfm_forward_interval": 60,
"sfm_quiescence_seconds": 5,
"sfm_missing_report_grace_seconds": 60,
"sfm_http_timeout": 60,
"sfm_state_file": "",
"sfm_max_forwards_per_pass": 500,
"sfm_max_event_age_days": 365,
}
# ── Config I/O ────────────────────────────────────────────────────────────────
def _load_config(config_path):
"""Load existing config.json, merged with DEFAULTS for any missing key."""
values = dict(DEFAULTS)
if not os.path.exists(config_path):
return values
try:
with open(config_path, "r", encoding="utf-8") as f:
raw = json.load(f)
values.update(raw)
except Exception:
pass
return values
def _save_config(config_path, values):
"""Write values dict to config_path as JSON."""
config_dir = os.path.dirname(config_path)
if config_dir and not os.path.exists(config_dir):
os.makedirs(config_dir)
with open(config_path, "w", encoding="utf-8") as f:
json.dump(values, f, indent=2)
# ── Widget helpers ────────────────────────────────────────────────────────────
def _make_spinbox(parent, from_, to, width=8):
try:
sb = ttk.Spinbox(parent, from_=from_, to=to, width=width)
except AttributeError:
sb = tk.Spinbox(parent, from_=from_, to=to, width=width)
return sb
def _add_label_entry(frame, row, label_text, var, hint=None, readonly=False):
tk.Label(frame, text=label_text, anchor="w").grid(
row=row, column=0, sticky="w", padx=(8, 4), pady=4
)
state = "readonly" if readonly else "normal"
entry = ttk.Entry(frame, textvariable=var, width=42, state=state)
entry.grid(row=row, column=1, sticky="ew", padx=(0, 8), pady=4)
if hint and not var.get():
entry.config(foreground="grey")
entry.insert(0, hint)
def _on_focus_in(event, e=entry, h=hint, v=var):
if e.get() == h:
e.delete(0, tk.END)
e.config(foreground="black")
def _on_focus_out(event, e=entry, h=hint, v=var):
if not e.get():
e.config(foreground="grey")
e.insert(0, h)
v.set("")
entry.bind("<FocusIn>", _on_focus_in)
entry.bind("<FocusOut>", _on_focus_out)
return entry
def _add_label_spinbox(frame, row, label_text, var, from_, to):
tk.Label(frame, text=label_text, anchor="w").grid(
row=row, column=0, sticky="w", padx=(8, 4), pady=4
)
sb = _make_spinbox(frame, from_=from_, to=to, width=8)
sb.grid(row=row, column=1, sticky="w", padx=(0, 8), pady=4)
sb.delete(0, tk.END)
sb.insert(0, str(var.get()))
def _on_change(*args):
var.set(sb.get())
sb.config(command=_on_change)
sb.bind("<KeyRelease>", _on_change)
return sb
def _add_label_check(frame, row, label_text, var):
cb = ttk.Checkbutton(frame, text=label_text, variable=var)
cb.grid(row=row, column=0, columnspan=2, sticky="w", padx=(8, 8), pady=4)
return cb
def _add_label_browse_entry(frame, row, label_text, var, browse_fn):
tk.Label(frame, text=label_text, anchor="w").grid(
row=row, column=0, sticky="w", padx=(8, 4), pady=4
)
inner = tk.Frame(frame)
inner.grid(row=row, column=1, sticky="ew", padx=(0, 8), pady=4)
inner.columnconfigure(0, weight=1)
entry = ttk.Entry(inner, textvariable=var, width=36)
entry.grid(row=0, column=0, sticky="ew")
btn = ttk.Button(inner, text="Browse...", command=browse_fn, width=9)
btn.grid(row=0, column=1, padx=(4, 0))
return entry
# ── Main dialog class ─────────────────────────────────────────────────────────
class SettingsDialog:
def __init__(self, parent, config_path, wizard=False):
self.config_path = config_path
self.wizard = wizard
self.saved = False
self.root = parent
kind = "Setup" if wizard else "Settings"
title = "Thor Watcher v{}{}".format(watcher.VERSION, kind)
self.root.title(title)
self.root.resizable(False, False)
self.root.update_idletasks()
self._values = _load_config(config_path)
self._build_vars()
self._build_ui()
self.root.grab_set()
self.root.protocol("WM_DELETE_WINDOW", self._on_cancel)
# ── Variable setup ────────────────────────────────────────────────────────
def _build_vars(self):
v = self._values
# Connection
raw_url = str(v.get("api_url", ""))
_suffix = "/api/series4/heartbeat"
if raw_url.endswith(_suffix):
raw_url = raw_url[:-len(_suffix)]
self.var_api_url = tk.StringVar(value=raw_url)
self.var_api_interval = tk.StringVar(value=str(v.get("api_interval", 300)))
self.var_source_id = tk.StringVar(value=str(v.get("source_id", "")))
self.var_source_type = tk.StringVar(value=str(v.get("source_type", "series4_watcher")))
# Paths
self.var_thordata_path = tk.StringVar(value=str(v.get("thordata_path", r"C:\THORDATA")))
self.var_log_file = tk.StringVar(value=str(v.get("log_file", DEFAULTS["log_file"])))
# Scanning
self.var_scan_interval = tk.StringVar(value=str(v.get("scan_interval", 60)))
# Logging
en = v.get("enable_logging", True)
self.var_enable_logging = tk.BooleanVar(value=bool(en) if isinstance(en, bool) else str(en).lower() in ("true", "1", "yes"))
self.var_log_retention_days = tk.StringVar(value=str(v.get("log_retention_days", 30)))
# Updates
src = str(v.get("update_source", "gitea")).lower()
if src not in ("gitea", "url", "disabled"):
src = "gitea"
self.var_local_timezone = tk.StringVar(value=str(v.get("local_timezone", "America/New_York")))
self.var_update_source = tk.StringVar(value=src)
self.var_update_url = tk.StringVar(value=str(v.get("update_url", "")))
# SFM Forwarder
sfm_en = v.get("sfm_forward_enabled", False)
self.var_sfm_enabled = tk.BooleanVar(
value=bool(sfm_en) if isinstance(sfm_en, bool) else str(sfm_en).lower() in ("true", "1", "yes")
)
self.var_sfm_url = tk.StringVar(value=str(v.get("sfm_url", "")))
self.var_sfm_forward_interval = tk.StringVar(value=str(v.get("sfm_forward_interval", 60)))
self.var_sfm_quiescence = tk.StringVar(value=str(v.get("sfm_quiescence_seconds", 5)))
self.var_sfm_grace = tk.StringVar(value=str(v.get("sfm_missing_report_grace_seconds", 60)))
self.var_sfm_http_timeout = tk.StringVar(value=str(v.get("sfm_http_timeout", 60)))
self.var_sfm_max_per_pass = tk.StringVar(value=str(v.get("sfm_max_forwards_per_pass", 500)))
self.var_sfm_max_age_days = tk.StringVar(value=str(v.get("sfm_max_event_age_days", 365)))
self.var_sfm_state_file = tk.StringVar(value=str(v.get("sfm_state_file", "")))
# ── UI construction ───────────────────────────────────────────────────────
def _build_ui(self):
outer = tk.Frame(self.root, padx=10, pady=8)
outer.pack(fill="both", expand=True)
if self.wizard:
welcome = (
"Welcome to Thor Watcher!\n\n"
"No configuration file was found. Please review the settings below\n"
"and click \"Save & Start\" when you are ready."
)
tk.Label(
outer, text=welcome, justify="left",
wraplength=460, fg="#1a5276", font=("TkDefaultFont", 9, "bold"),
).pack(fill="x", pady=(0, 8))
nb = ttk.Notebook(outer)
nb.pack(fill="both", expand=True)
self._build_tab_connection(nb)
self._build_tab_paths(nb)
self._build_tab_scanning(nb)
self._build_tab_logging(nb)
self._build_tab_forwarding(nb)
self._build_tab_updates(nb)
btn_frame = tk.Frame(outer)
btn_frame.pack(fill="x", pady=(10, 0))
save_label = "Save & Start" if self.wizard else "Save"
ttk.Button(btn_frame, text=save_label, command=self._on_save, width=14).pack(side="right", padx=(4, 0))
ttk.Button(btn_frame, text="Cancel", command=self._on_cancel, width=10).pack(side="right")
def _tab_frame(self, nb, title):
outer = tk.Frame(nb, padx=4, pady=4)
nb.add(outer, text=title)
outer.columnconfigure(1, weight=1)
return outer
def _build_tab_connection(self, nb):
f = self._tab_frame(nb, "Connection")
# URL row with Test button
tk.Label(f, text="Terra-View URL", anchor="w").grid(
row=0, column=0, sticky="w", padx=(8, 4), pady=4
)
url_frame = tk.Frame(f)
url_frame.grid(row=0, column=1, sticky="ew", padx=(0, 8), pady=4)
url_frame.columnconfigure(0, weight=1)
url_entry = ttk.Entry(url_frame, textvariable=self.var_api_url, width=32)
url_entry.grid(row=0, column=0, sticky="ew")
_hint = "http://192.168.x.x:8000"
if not self.var_api_url.get():
url_entry.config(foreground="grey")
url_entry.insert(0, _hint)
def _on_focus_in(e):
if url_entry.get() == _hint:
url_entry.delete(0, tk.END)
url_entry.config(foreground="black")
def _on_focus_out(e):
if not url_entry.get():
url_entry.config(foreground="grey")
url_entry.insert(0, _hint)
self.var_api_url.set("")
url_entry.bind("<FocusIn>", _on_focus_in)
url_entry.bind("<FocusOut>", _on_focus_out)
self._test_btn = ttk.Button(url_frame, text="Test", width=6,
command=self._test_connection)
self._test_btn.grid(row=0, column=1, padx=(4, 0))
self._test_status = tk.Label(url_frame, text="", anchor="w", width=20)
self._test_status.grid(row=0, column=2, padx=(6, 0))
_add_label_spinbox(f, 1, "API Interval (sec)", self.var_api_interval, 30, 3600)
source_id_hint = "Defaults to hostname ({})".format(gethostname())
_add_label_entry(f, 2, "Source ID", self.var_source_id, hint=source_id_hint)
_add_label_entry(f, 3, "Source Type", self.var_source_type, readonly=True)
_add_label_entry(f, 4, "Local Timezone", self.var_local_timezone,
hint="e.g. America/New_York, America/Chicago")
tk.Label(
f,
text="Used to convert MLG file timestamps (local time) to UTC for terra-view.",
justify="left", fg="#555555", wraplength=340,
).grid(row=5, column=0, columnspan=2, sticky="w", padx=(8, 8), pady=(0, 4))
def _test_connection(self):
import urllib.request
import urllib.error
self._test_status.config(text="Testing...", foreground="grey")
self._test_btn.config(state="disabled")
self.root.update_idletasks()
raw = self.var_api_url.get().strip()
if not raw or raw == "http://192.168.x.x:8000":
self._test_status.config(text="Enter a URL first", foreground="orange")
self._test_btn.config(state="normal")
return
url = raw.rstrip("/") + "/health"
try:
with urllib.request.urlopen(urllib.request.Request(url), timeout=5) as resp:
if resp.status == 200:
self._test_status.config(text="Connected!", foreground="green")
else:
self._test_status.config(text="HTTP {}".format(resp.status), foreground="orange")
except urllib.error.URLError as e:
reason = str(e.reason) if hasattr(e, "reason") else str(e)
self._test_status.config(text="Failed: {}".format(reason[:30]), foreground="red")
except Exception as e:
self._test_status.config(text="Error: {}".format(str(e)[:30]), foreground="red")
finally:
self._test_btn.config(state="normal")
def _build_tab_paths(self, nb):
f = self._tab_frame(nb, "Paths")
def browse_thordata():
d = filedialog.askdirectory(
title="Select THORDATA Folder",
initialdir=self.var_thordata_path.get() or "C:\\",
)
if d:
self.var_thordata_path.set(d.replace("/", "\\"))
_add_label_browse_entry(f, 0, "THORDATA Path", self.var_thordata_path, browse_thordata)
def browse_log():
p = filedialog.asksaveasfilename(
title="Select Log File",
defaultextension=".log",
filetypes=[("Log files", "*.log"), ("Text files", "*.txt"), ("All files", "*.*")],
initialfile=os.path.basename(self.var_log_file.get() or "thor_watcher.log"),
initialdir=os.path.dirname(self.var_log_file.get() or "C:\\"),
)
if p:
self.var_log_file.set(p.replace("/", "\\"))
_add_label_browse_entry(f, 1, "Log File", self.var_log_file, browse_log)
def _build_tab_scanning(self, nb):
f = self._tab_frame(nb, "Scanning")
_add_label_spinbox(f, 0, "Scan Interval (sec)", self.var_scan_interval, 10, 3600)
def _build_tab_logging(self, nb):
f = self._tab_frame(nb, "Logging")
_add_label_check(f, 0, "Enable Logging", self.var_enable_logging)
_add_label_spinbox(f, 1, "Log Retention (days)", self.var_log_retention_days, 1, 365)
def _build_tab_forwarding(self, nb):
f = self._tab_frame(nb, "SFM Forward")
# Row 0: enable checkbox
_add_label_check(f, 0, "Enable SFM Forwarding", self.var_sfm_enabled)
# Row 1: SFM URL + Test button
tk.Label(f, text="SFM URL", anchor="w").grid(
row=1, column=0, sticky="w", padx=(8, 4), pady=4
)
url_frame = tk.Frame(f)
url_frame.grid(row=1, column=1, sticky="ew", padx=(0, 8), pady=4)
url_frame.columnconfigure(0, weight=1)
sfm_entry = ttk.Entry(url_frame, textvariable=self.var_sfm_url, width=32)
sfm_entry.grid(row=0, column=0, sticky="ew")
_hint = "http://10.0.0.44:8200"
if not self.var_sfm_url.get():
sfm_entry.config(foreground="grey")
sfm_entry.insert(0, _hint)
def _on_focus_in(e, ent=sfm_entry, h=_hint):
if ent.get() == h:
ent.delete(0, tk.END)
ent.config(foreground="black")
def _on_focus_out(e, ent=sfm_entry, h=_hint, v=self.var_sfm_url):
if not ent.get():
ent.config(foreground="grey")
ent.insert(0, h)
v.set("")
sfm_entry.bind("<FocusIn>", _on_focus_in)
sfm_entry.bind("<FocusOut>", _on_focus_out)
self._sfm_test_btn = ttk.Button(url_frame, text="Test", width=6,
command=self._test_sfm_connection)
self._sfm_test_btn.grid(row=0, column=1, padx=(4, 0))
self._sfm_test_status = tk.Label(url_frame, text="", anchor="w", width=20)
self._sfm_test_status.grid(row=0, column=2, padx=(6, 0))
# Rows 2-7: timing/limits spinboxes
_add_label_spinbox(f, 2, "Forward Interval (sec)", self.var_sfm_forward_interval, 30, 3600)
_add_label_spinbox(f, 3, "Quiescence (sec)", self.var_sfm_quiescence, 1, 60)
_add_label_spinbox(f, 4, "Missing-Report Grace (sec)", self.var_sfm_grace, 0, 3600)
_add_label_spinbox(f, 5, "HTTP Timeout (sec)", self.var_sfm_http_timeout, 5, 300)
_add_label_spinbox(f, 6, "Max Forwards Per Pass", self.var_sfm_max_per_pass, 1, 5000)
_add_label_spinbox(f, 7, "Max Event Age (days)", self.var_sfm_max_age_days, 1, 3650)
# Row 8: state file browse
def browse_state():
p = filedialog.asksaveasfilename(
title="Select SFM State File",
defaultextension=".json",
filetypes=[("JSON files", "*.json"), ("All files", "*.*")],
initialfile=os.path.basename(self.var_sfm_state_file.get() or "thor_forwarded.json"),
initialdir=os.path.dirname(self.var_sfm_state_file.get() or "C:\\"),
)
if p:
self.var_sfm_state_file.set(p.replace("/", "\\"))
_add_label_browse_entry(f, 8, "State File", self.var_sfm_state_file, browse_state)
# Row 9: help text
help_text = (
"Forwards .IDFH (histogram) and .IDFW (waveform) event files plus their\n"
"TXT/<basename>.txt sidecars to a seismo-relay SFM server.\n"
"Idempotent: each file is tracked by sha256, so re-scans never re-POST.\n"
"If the TXT sidecar appears AFTER the binary was forwarded alone, the\n"
"next pass will re-forward so the relay can refresh the DB row with\n"
"device-authoritative PPV/ZCFreq/peak values.\n"
"State file blank → defaults to <log_dir>\\thor_forwarded.json."
)
tk.Label(
f, text=help_text, justify="left", fg="#555555", wraplength=420,
).grid(row=9, column=0, columnspan=2, sticky="w", padx=(8, 8), pady=(8, 4))
def _test_sfm_connection(self):
import urllib.request
import urllib.error
self._sfm_test_status.config(text="Testing...", foreground="grey")
self._sfm_test_btn.config(state="disabled")
self.root.update_idletasks()
raw = self.var_sfm_url.get().strip()
if not raw or raw == "http://10.0.0.44:8200":
self._sfm_test_status.config(text="Enter a URL first", foreground="orange")
self._sfm_test_btn.config(state="normal")
return
url = raw.rstrip("/") + "/health"
try:
with urllib.request.urlopen(urllib.request.Request(url), timeout=5) as resp:
if resp.status == 200:
self._sfm_test_status.config(text="Connected!", foreground="green")
else:
self._sfm_test_status.config(text="HTTP {}".format(resp.status), foreground="orange")
except urllib.error.URLError as e:
reason = str(e.reason) if hasattr(e, "reason") else str(e)
self._sfm_test_status.config(text="Failed: {}".format(reason[:30]), foreground="red")
except Exception as e:
self._sfm_test_status.config(text="Error: {}".format(str(e)[:30]), foreground="red")
finally:
self._sfm_test_btn.config(state="normal")
def _build_tab_updates(self, nb):
f = self._tab_frame(nb, "Updates")
tk.Label(f, text="Auto-Update Source", anchor="w").grid(
row=0, column=0, sticky="w", padx=(8, 4), pady=(8, 2)
)
radio_frame = tk.Frame(f)
radio_frame.grid(row=0, column=1, sticky="w", padx=(0, 8), pady=(8, 2))
ttk.Radiobutton(
radio_frame, text="Gitea (default)",
variable=self.var_update_source, value="gitea",
command=self._on_update_source_change,
).grid(row=0, column=0, sticky="w", padx=(0, 12))
ttk.Radiobutton(
radio_frame, text="Custom URL",
variable=self.var_update_source, value="url",
command=self._on_update_source_change,
).grid(row=0, column=1, sticky="w", padx=(0, 12))
ttk.Radiobutton(
radio_frame, text="Disabled",
variable=self.var_update_source, value="disabled",
command=self._on_update_source_change,
).grid(row=0, column=2, sticky="w")
tk.Label(f, text="Update Server URL", anchor="w").grid(
row=1, column=0, sticky="w", padx=(8, 4), pady=4
)
self._update_url_entry = ttk.Entry(f, textvariable=self.var_update_url, width=42)
self._update_url_entry.grid(row=1, column=1, sticky="ew", padx=(0, 8), pady=4)
tk.Label(
f,
text=(
"Gitea: checks the Gitea release page automatically every 5 minutes.\n"
"Custom URL: fetches version.txt and thor-watcher.exe from a web\n"
"server — use when Gitea is not reachable (e.g. terra-view URL).\n"
"Disabled: no automatic update checks. Remote push from terra-view\n"
"still works when disabled."
),
justify="left", fg="#555555", wraplength=380,
).grid(row=2, column=0, columnspan=2, sticky="w", padx=(8, 8), pady=(4, 8))
self._on_update_source_change()
def _on_update_source_change(self):
if self.var_update_source.get() == "url":
self._update_url_entry.config(state="normal")
else:
self._update_url_entry.config(state="disabled")
# ── Validation ────────────────────────────────────────────────────────────
def _get_int_var(self, var, name, min_val, max_val):
raw = str(var.get()).strip()
try:
val = int(raw)
except ValueError:
messagebox.showerror("Validation Error",
"{} must be an integer (got: {!r}).".format(name, raw))
return None
if val < min_val or val > max_val:
messagebox.showerror("Validation Error",
"{} must be between {} and {} (got {}).".format(name, min_val, max_val, val))
return None
return val
# ── Save / Cancel ─────────────────────────────────────────────────────────
def _on_save(self):
checks = [
(self.var_api_interval, "API Interval", 30, 3600),
(self.var_scan_interval, "Scan Interval", 10, 3600),
(self.var_log_retention_days, "Log Retention Days", 1, 365),
(self.var_sfm_forward_interval, "Forward Interval", 30, 3600),
(self.var_sfm_quiescence, "Quiescence", 1, 60),
(self.var_sfm_grace, "Missing-Report Grace", 0, 3600),
(self.var_sfm_http_timeout, "HTTP Timeout", 5, 300),
(self.var_sfm_max_per_pass, "Max Forwards Per Pass", 1, 5000),
(self.var_sfm_max_age_days, "Max Event Age (days)", 1, 3650),
]
int_values = {}
for var, name, mn, mx in checks:
result = self._get_int_var(var, name, mn, mx)
if result is None:
return
int_values[name] = result
source_id = self.var_source_id.get().strip()
if source_id.startswith("Defaults to hostname"):
source_id = ""
api_url = self.var_api_url.get().strip()
if api_url == "http://192.168.x.x:8000" or not api_url:
api_url = ""
else:
api_url = api_url.rstrip("/") + "/api/series4/heartbeat"
sfm_url = self.var_sfm_url.get().strip()
if sfm_url == "http://10.0.0.44:8200":
sfm_url = ""
sfm_url = sfm_url.rstrip("/") # event_forwarder adds the endpoint path
values = {
"thordata_path": self.var_thordata_path.get().strip(),
"scan_interval": int_values["Scan Interval"],
"api_url": api_url,
"api_timeout": 5,
"api_interval": int_values["API Interval"],
"source_id": source_id,
"source_type": self.var_source_type.get().strip() or "series4_watcher",
"local_timezone": self.var_local_timezone.get().strip() or "America/New_York",
"enable_logging": self.var_enable_logging.get(),
"log_file": self.var_log_file.get().strip(),
"log_retention_days": int_values["Log Retention Days"],
"update_source": self.var_update_source.get().strip() or "gitea",
"update_url": self.var_update_url.get().strip(),
"sfm_forward_enabled": self.var_sfm_enabled.get(),
"sfm_url": sfm_url,
"sfm_forward_interval": int_values["Forward Interval"],
"sfm_quiescence_seconds": int_values["Quiescence"],
"sfm_missing_report_grace_seconds": int_values["Missing-Report Grace"],
"sfm_http_timeout": int_values["HTTP Timeout"],
"sfm_max_forwards_per_pass": int_values["Max Forwards Per Pass"],
"sfm_max_event_age_days": int_values["Max Event Age (days)"],
"sfm_state_file": self.var_sfm_state_file.get().strip(),
}
try:
_save_config(self.config_path, values)
except Exception as e:
messagebox.showerror("Save Error", "Could not write config.json:\n{}".format(e))
return
self.saved = True
self.root.destroy()
def _on_cancel(self):
self.saved = False
self.root.destroy()
# ── Public API ────────────────────────────────────────────────────────────────
def show_dialog(config_path, wizard=False):
"""
Open the settings dialog.
Parameters
----------
config_path : str
Absolute path to config.json (read if exists, written on Save).
wizard : bool
If True, shows first-run welcome message and "Save & Start" button.
Returns
-------
bool
True if the user saved, False if they cancelled.
"""
root = tk.Tk()
root.withdraw()
top = tk.Toplevel(root)
top.deiconify()
dlg = SettingsDialog(top, config_path, wizard=wizard)
top.update_idletasks()
w = top.winfo_reqwidth()
h = top.winfo_reqheight()
sw = top.winfo_screenwidth()
sh = top.winfo_screenheight()
top.geometry("{}x{}+{}+{}".format(w, h, (sw - w) // 2, (sh - h) // 2))
root.wait_window(top)
root.destroy()
return dlg.saved
+555
View File
@@ -0,0 +1,555 @@
"""
Thor Watcher — System Tray Launcher v0.3.0
Requires: pystray, Pillow, tkinter (stdlib)
Run with: pythonw thor_tray.py (no console window)
or: python thor_tray.py (with console, for debugging)
Put a shortcut to this in shell:startup for auto-start on login.
"""
import os
import sys
import json
import subprocess
import tempfile
import threading
import urllib.request
import urllib.error
from datetime import datetime
import pystray
from PIL import Image, ImageDraw
import series4_ingest as watcher
# ── Auto-updater ──────────────────────────────────────────────────────────────
GITEA_BASE = "https://gitea.serversdown.net"
GITEA_USER = "serversdown"
GITEA_REPO = "thor-watcher"
GITEA_API_URL = "{}/api/v1/repos/{}/{}/releases?limit=1&page=1".format(
GITEA_BASE, GITEA_USER, GITEA_REPO
)
_CURRENT_VERSION = getattr(watcher, "VERSION", "0.0.0")
def _version_tuple(v):
"""Convert '0.2.0' -> (0, 2, 0) for comparison."""
parts = []
for p in str(v).lstrip("v").split(".")[:3]:
try:
parts.append(int(p))
except ValueError:
parts.append(0)
while len(parts) < 3:
parts.append(0)
return tuple(parts)
def _update_log(msg):
"""Append a timestamped [updater] line to the watcher log."""
try:
log_path = os.path.join(
os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or "",
"ThorWatcher", "agent_logs", "thor_watcher.log"
)
os.makedirs(os.path.dirname(log_path), exist_ok=True)
with open(log_path, "a") as f:
f.write("[{}] [updater] {}\n".format(
datetime.now().strftime("%Y-%m-%d %H:%M:%S"), msg
))
except Exception:
pass
def _check_for_update_gitea():
"""Query Gitea API for latest release. Returns (tag, download_url) or (None, None)."""
try:
req = urllib.request.Request(
GITEA_API_URL,
headers={"User-Agent": "thor-watcher/{}".format(_CURRENT_VERSION)},
)
with urllib.request.urlopen(req, timeout=8) as resp:
releases = json.loads(resp.read().decode("utf-8"))
if not releases:
return None, None
latest = releases[0]
tag = latest.get("tag_name", "")
if _version_tuple(tag) <= _version_tuple(_CURRENT_VERSION):
return None, None
assets = latest.get("assets", [])
for asset in assets:
name = asset.get("name", "").lower()
if name.endswith(".exe") and "setup" not in name:
return tag, asset.get("browser_download_url")
_update_log("Newer release {} found but no valid .exe asset".format(tag))
return tag, None
except Exception as e:
_update_log("check_for_update (gitea) failed: {}".format(e))
return None, None
def _check_for_update_url(base_url):
"""Query a custom URL server for latest version. Returns (tag, download_url) or (None, None)."""
if not base_url:
_update_log("update_source=url but update_url is empty — skipping")
return None, None
try:
ver_url = base_url.rstrip("/") + "/api/updates/thor-watcher/version.txt"
req = urllib.request.Request(
ver_url,
headers={"User-Agent": "thor-watcher/{}".format(_CURRENT_VERSION)},
)
with urllib.request.urlopen(req, timeout=8) as resp:
tag = resp.read().decode("utf-8").strip()
if not tag:
return None, None
if _version_tuple(tag) <= _version_tuple(_CURRENT_VERSION):
return None, None
exe_url = base_url.rstrip("/") + "/api/updates/thor-watcher/thor-watcher.exe"
return tag, exe_url
except Exception as e:
_update_log("check_for_update (url mode) failed: {}".format(e))
return None, None
def check_for_update():
"""
Check for an update using the configured source (gitea, url, or disabled).
Reads update_source and update_url from config.json at check time.
Returns (tag, download_url) if an update is available, else (None, None).
"""
try:
cfg = _read_config()
update_source = str(cfg.get("update_source", "gitea")).strip().lower()
update_url = str(cfg.get("update_url", "")).strip()
except Exception:
update_source = "gitea"
update_url = ""
if update_source == "disabled":
return None, None
_update_log("Checking for update (source={}, version={})".format(
update_source, _CURRENT_VERSION
))
if update_source == "url":
return _check_for_update_url(update_url)
else:
return _check_for_update_gitea()
def apply_update(download_url):
"""
Download new .exe, validate it, write swap .bat, launch it, exit.
Backs up old exe to .exe.old before replacing.
"""
exe_path = os.path.abspath(sys.executable if getattr(sys, "frozen", False) else sys.argv[0])
try:
tmp_fd, tmp_path = tempfile.mkstemp(suffix=".exe", prefix="tw_update_")
os.close(tmp_fd)
_update_log("Downloading update from: {}".format(download_url))
req = urllib.request.Request(
download_url,
headers={"User-Agent": "thor-watcher/{}".format(_CURRENT_VERSION)},
)
with urllib.request.urlopen(req, timeout=60) as resp:
with open(tmp_path, "wb") as f:
f.write(resp.read())
# Three-layer validation
try:
dl_size = os.path.getsize(tmp_path)
current_size = os.path.getsize(exe_path)
_update_log("Download complete ({} bytes), validating...".format(dl_size))
if dl_size < 100 * 1024:
_update_log("Validation failed: too small ({} bytes) — aborting".format(dl_size))
os.remove(tmp_path)
return False
if current_size > 0 and dl_size < current_size * 0.5:
_update_log("Validation failed: suspiciously small ({} vs {} bytes) — aborting".format(
dl_size, current_size))
os.remove(tmp_path)
return False
with open(tmp_path, "rb") as _f:
magic = _f.read(2)
if magic != b"MZ":
_update_log("Validation failed: not a valid Windows exe — aborting")
os.remove(tmp_path)
return False
_update_log("Validation passed ({} bytes, MZ ok)".format(dl_size))
except Exception as e:
_update_log("Validation error: {} — aborting".format(e))
try:
os.remove(tmp_path)
except Exception:
pass
return False
bat_fd, bat_path = tempfile.mkstemp(suffix=".bat", prefix="tw_swap_")
os.close(bat_fd)
bat_content = (
"@echo off\r\n"
"ping 127.0.0.1 -n 4 > nul\r\n"
"copy /Y \"{exe}\" \"{exe}.old\"\r\n"
"set RETRIES=0\r\n"
":retry\r\n"
"copy /Y \"{new}\" \"{exe}\"\r\n"
"if errorlevel 1 (\r\n"
" set /a RETRIES+=1\r\n"
" if %RETRIES% GEQ 5 goto fail\r\n"
" ping 127.0.0.1 -n 3 > nul\r\n"
" goto retry\r\n"
")\r\n"
"start \"\" \"{exe}\"\r\n"
"del \"{new}\"\r\n"
"del \"%~f0\"\r\n"
"exit /b 0\r\n"
":fail\r\n"
"del \"{new}\"\r\n"
"del \"%~f0\"\r\n"
"exit /b 1\r\n"
).format(new=tmp_path, exe=exe_path)
with open(bat_path, "w") as f:
f.write(bat_content)
_update_log("Launching swap bat — exiting for update")
subprocess.Popen(
["cmd", "/C", bat_path],
creationflags=subprocess.CREATE_NO_WINDOW if hasattr(subprocess, "CREATE_NO_WINDOW") else 0,
)
return True
except Exception as e:
_update_log("apply_update failed: {}".format(e))
return False
# ── Config helpers ────────────────────────────────────────────────────────────
def _read_config():
"""Read config.json from the appropriate location and return as dict."""
if getattr(sys, "frozen", False):
_appdata = os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or ""
config_dir = os.path.join(_appdata, "ThorWatcher")
else:
config_dir = os.path.dirname(os.path.abspath(__file__)) or "."
config_path = os.path.join(config_dir, "config.json")
with open(config_path, "r", encoding="utf-8") as f:
return json.load(f)
# ── Paths ─────────────────────────────────────────────────────────────────────
if getattr(sys, "frozen", False):
HERE = os.path.dirname(os.path.abspath(sys.executable))
else:
HERE = os.path.dirname(os.path.abspath(__file__))
if getattr(sys, "frozen", False):
_appdata = os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or HERE
CONFIG_DIR = os.path.join(_appdata, "ThorWatcher")
os.makedirs(CONFIG_DIR, exist_ok=True)
else:
CONFIG_DIR = HERE
CONFIG_PATH = os.path.join(CONFIG_DIR, "config.json")
# ── Icon drawing ──────────────────────────────────────────────────────────────
COLORS = {
"ok": (60, 200, 80), # green
"pending": (230, 180, 0), # amber
"missing": (210, 40, 40), # red
"error": (160, 40, 200), # purple
"starting": (120, 120, 120), # grey
}
ICON_SIZE = 64
def make_icon(status):
"""Draw a plain colored circle for the system tray."""
color = COLORS.get(status, COLORS["starting"])
img = Image.new("RGBA", (ICON_SIZE, ICON_SIZE), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
margin = 6
draw.ellipse(
[margin, margin, ICON_SIZE - margin, ICON_SIZE - margin],
fill=color,
)
return img
# ── First-run check ───────────────────────────────────────────────────────────
def ensure_config():
"""
If config.json is missing, launch the first-run wizard.
Returns True if config is ready, False if user cancelled.
"""
if os.path.exists(CONFIG_PATH):
return True
from thor_settings_dialog import show_dialog
saved = show_dialog(CONFIG_PATH, wizard=True)
if not saved:
_show_cancel_message()
return False
return True
def _show_cancel_message():
try:
import tkinter as tk
from tkinter import messagebox
root = tk.Tk()
root.withdraw()
messagebox.showwarning(
"Thor Watcher",
"No configuration was saved.\nThe application will now exit.",
)
root.destroy()
except Exception:
pass
# ── Tray app ──────────────────────────────────────────────────────────────────
class WatcherTray:
def __init__(self):
self.state = {}
self.stop_event = threading.Event()
self._watcher_thread = None
self._icon = None
self._menu_lock = threading.Lock()
# ── Watcher thread management ─────────────────────────────────────────────
def _start_watcher(self):
self.stop_event.clear()
self._watcher_thread = threading.Thread(
target=watcher.run_watcher,
args=(self.state, self.stop_event),
daemon=True,
name="watcher",
)
self._watcher_thread.start()
def _stop_watcher(self):
self.stop_event.set()
if self._watcher_thread is not None:
self._watcher_thread.join(timeout=10)
self._watcher_thread = None
def _restart_watcher(self):
self._stop_watcher()
self.stop_event = threading.Event()
self.state["status"] = "starting"
self.state["units"] = []
self.state["last_scan"] = None
self.state["last_error"] = None
self._start_watcher()
# ── Menu callbacks ────────────────────────────────────────────────────────
def _open_settings(self, icon, item):
def _run():
from thor_settings_dialog import show_dialog
saved = show_dialog(CONFIG_PATH, wizard=False)
if saved:
self._restart_watcher()
threading.Thread(target=_run, daemon=True, name="settings-dialog").start()
def _open_logs(self, icon, item):
log_dir = self.state.get("log_dir")
if not log_dir:
log_dir = HERE
if os.path.exists(log_dir):
subprocess.Popen(["explorer", log_dir])
else:
parent = os.path.dirname(log_dir)
if os.path.exists(parent):
subprocess.Popen(["explorer", parent])
else:
subprocess.Popen(["explorer", HERE])
def _exit(self, icon, item):
self.stop_event.set()
icon.stop()
# ── Dynamic menu text ─────────────────────────────────────────────────────
def _status_text(self):
status = self.state.get("status", "starting")
last_err = self.state.get("last_error")
last_scan = self.state.get("last_scan")
api_status = self.state.get("api_status", "disabled")
unit_count = len(self.state.get("units", []))
if status == "error":
return "Error — {}".format(last_err or "unknown")
if status == "starting":
return "Starting..."
if last_scan is not None:
age_secs = int((datetime.now() - last_scan).total_seconds())
age_str = "{}s ago".format(age_secs) if age_secs < 60 else "{}m ago".format(age_secs // 60)
else:
age_str = "never"
if api_status == "ok":
api_str = "API OK"
elif api_status == "fail":
api_str = "API FAIL"
else:
api_str = "API off"
base_line = "Running — {} | {} unit(s) | scan {}".format(api_str, unit_count, age_str)
sfm_status = self.state.get("sfm_status", "disabled")
if sfm_status in ("ok", "fail", "ready"):
counts = self.state.get("last_forward_counts") or {}
fwd = counts.get("forwarded", 0)
errs = counts.get("errors", 0)
last_fwd = self.state.get("last_forward")
if last_fwd is not None:
fwd_age = int((datetime.now() - last_fwd).total_seconds())
fwd_age_str = "{}s ago".format(fwd_age) if fwd_age < 60 else "{}m ago".format(fwd_age // 60)
else:
fwd_age_str = "pending"
sfm_line = "SFM {} | {} fwd, {} err | last {}".format(
sfm_status.upper(), fwd, errs, fwd_age_str,
)
return base_line + "\n" + sfm_line
return base_line
def _tray_status(self):
status = self.state.get("status", "starting")
if status == "error":
return "error"
if status == "starting":
return "starting"
api_status = self.state.get("api_status", "disabled")
if api_status == "fail":
return "missing" # red — API failing
sfm_status = self.state.get("sfm_status", "disabled")
if api_status == "ok" and sfm_status == "fail":
return "pending" # amber — heartbeat OK but forwarder is failing
if api_status == "disabled":
return "pending" # amber — running but not reporting
return "ok" # green — running and API good
def _build_menu(self):
return pystray.Menu(
pystray.MenuItem("Thor Watcher v{}".format(_CURRENT_VERSION), None, enabled=False),
pystray.Menu.SEPARATOR,
pystray.MenuItem(lambda item: self._status_text(), None, enabled=False),
pystray.Menu.SEPARATOR,
pystray.MenuItem("Settings...", self._open_settings),
pystray.MenuItem("Open Log Folder", self._open_logs),
pystray.Menu.SEPARATOR,
pystray.MenuItem("Exit", self._exit),
)
# ── Icon/menu update loop ─────────────────────────────────────────────────
def _icon_updater(self):
"""Periodically refresh the tray icon and check for updates."""
last_status = None
update_check_counter = 0 # check every ~5 min (30 × 10s ticks)
while not self.stop_event.is_set():
icon_status = self._tray_status()
if self._icon is not None:
with self._menu_lock:
self._icon.menu = self._build_menu()
if icon_status != last_status:
self._icon.icon = make_icon(icon_status)
self._icon.title = "Thor Watcher — {}".format(self._status_text())
last_status = icon_status
# Terra-View push-triggered update
if self.state.get("update_available"):
self.state["update_available"] = False
self._do_update()
return
# Periodic update check
update_check_counter += 1
if update_check_counter >= 30:
update_check_counter = 0
tag, url = check_for_update()
if tag and url:
self._do_update(url)
return
self.stop_event.wait(timeout=10)
def _do_update(self, download_url=None):
"""Notify tray then apply update. If url is None, fetch it first."""
if download_url is None:
_, download_url = check_for_update()
if not download_url:
return
if self._icon is not None:
self._icon.title = "Thor Watcher — Updating..."
self._icon.icon = make_icon("starting")
success = apply_update(download_url)
if success:
self.stop_event.set()
if self._icon is not None:
self._icon.stop()
# ── Entry point ───────────────────────────────────────────────────────────
def run(self):
self._start_watcher()
icon_img = make_icon("starting")
self._icon = pystray.Icon(
name="thor_watcher",
icon=icon_img,
title="Thor Watcher — Starting...",
menu=self._build_menu(),
)
updater = threading.Thread(
target=self._icon_updater, daemon=True, name="icon-updater"
)
updater.start()
self._icon.run()
# ── Entry point ───────────────────────────────────────────────────────────────
def main():
if not ensure_config():
sys.exit(0)
app = WatcherTray()
app.run()
if __name__ == "__main__":
main()