feat: SFM event forwarding added. v0.3.0
This commit is contained in:
+103
-14
@@ -1,5 +1,5 @@
|
||||
"""
|
||||
Thor Watcher — Series 4 Ingest Agent v0.2.0
|
||||
Thor Watcher — Series 4 Ingest Agent v0.3.0
|
||||
|
||||
Micromate (Series 4) ingest agent for Terra-View.
|
||||
|
||||
@@ -7,6 +7,8 @@ Behavior:
|
||||
- Scans C:\THORDATA\<Project>\<UM####>\*.MLG
|
||||
- For each UM####, finds the newest .MLG by timestamp in the filename
|
||||
- Posts JSON heartbeat payload to Terra-View backend
|
||||
- Forwards .IDFH/.IDFW event files (+ TXT sidecars) to a seismo-relay
|
||||
SFM server when sfm_forward_enabled=true. See event_forwarder.py.
|
||||
- Tray-friendly: run_watcher(state, stop_event) for background thread use
|
||||
"""
|
||||
|
||||
@@ -22,10 +24,12 @@ from datetime import datetime, timezone
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
from socket import gethostname
|
||||
|
||||
import event_forwarder
|
||||
|
||||
|
||||
# ── Version ───────────────────────────────────────────────────────────────────
|
||||
|
||||
VERSION = "0.2.0"
|
||||
VERSION = "0.3.0"
|
||||
|
||||
|
||||
# ── Config ────────────────────────────────────────────────────────────────────
|
||||
@@ -54,6 +58,17 @@ def load_config(config_path: str) -> Dict[str, Any]:
|
||||
"update_source": "gitea",
|
||||
"update_url": "",
|
||||
"debug": False,
|
||||
|
||||
# SFM event forwarding — default OFF, opt-in via Settings.
|
||||
"sfm_forward_enabled": False,
|
||||
"sfm_url": "", # e.g. "http://10.0.0.44:8200"
|
||||
"sfm_forward_interval": 60, # seconds between forward passes
|
||||
"sfm_quiescence_seconds": 5,
|
||||
"sfm_missing_report_grace_seconds": 60,
|
||||
"sfm_http_timeout": 60,
|
||||
"sfm_state_file": "", # blank → <log_dir>/thor_forwarded.json
|
||||
"sfm_max_forwards_per_pass": 500,
|
||||
"sfm_max_event_age_days": 365,
|
||||
}
|
||||
|
||||
with open(config_path, "r", encoding="utf-8") as f:
|
||||
@@ -256,14 +271,17 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
|
||||
Main watcher loop. Runs in a background thread when launched from the tray.
|
||||
|
||||
state keys written each cycle:
|
||||
state["status"] — "running" | "error" | "starting"
|
||||
state["api_status"] — "ok" | "fail" | "disabled"
|
||||
state["units"] — list of unit dicts for tray display
|
||||
state["last_scan"] — datetime of last successful scan
|
||||
state["last_error"] — last error string or None
|
||||
state["log_dir"] — directory containing the log file
|
||||
state["cfg"] — loaded config dict
|
||||
state["update_available"] — set True when API response signals an update
|
||||
state["status"] — "running" | "error" | "starting"
|
||||
state["api_status"] — "ok" | "fail" | "disabled"
|
||||
state["units"] — list of unit dicts for tray display
|
||||
state["last_scan"] — datetime of last successful scan
|
||||
state["last_error"] — last error string or None
|
||||
state["log_dir"] — directory containing the log file
|
||||
state["cfg"] — loaded config dict
|
||||
state["update_available"] — set True when API response signals an update
|
||||
state["sfm_status"] — "ok" | "fail" | "disabled" | "ready"
|
||||
state["last_forward"] — datetime of last forwarder pass (or None)
|
||||
state["last_forward_counts"] — dict from event_forwarder.forward_pending
|
||||
"""
|
||||
# Resolve config path
|
||||
if getattr(sys, "frozen", False):
|
||||
@@ -299,16 +317,56 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
|
||||
API_INTERVAL = int(cfg["api_interval"])
|
||||
ENABLE_LOGGING = bool(cfg["enable_logging"])
|
||||
|
||||
# SFM forwarder config
|
||||
SFM_FORWARD_ENABLED = bool(cfg.get("sfm_forward_enabled", False))
|
||||
SFM_URL = str(cfg.get("sfm_url", "")).strip()
|
||||
SFM_FORWARD_INTERVAL = int(cfg.get("sfm_forward_interval", 60))
|
||||
SFM_QUIESCENCE = int(cfg.get("sfm_quiescence_seconds", 5))
|
||||
SFM_GRACE = int(cfg.get("sfm_missing_report_grace_seconds", 60))
|
||||
SFM_HTTP_TIMEOUT = int(cfg.get("sfm_http_timeout", 60))
|
||||
SFM_MAX_PER_PASS = int(cfg.get("sfm_max_forwards_per_pass", 500))
|
||||
SFM_MAX_AGE_DAYS = int(cfg.get("sfm_max_event_age_days", 365))
|
||||
sfm_state_path = str(cfg.get("sfm_state_file", "")).strip() or \
|
||||
os.path.join(state["log_dir"], "thor_forwarded.json")
|
||||
|
||||
log_message(log_file, ENABLE_LOGGING,
|
||||
"[cfg] THORDATA_PATH={} SCAN_INTERVAL={}s API_INTERVAL={}s API={}".format(
|
||||
THORDATA_PATH, SCAN_INTERVAL, API_INTERVAL, bool(API_URL)
|
||||
"[cfg] THORDATA_PATH={} SCAN_INTERVAL={}s API_INTERVAL={}s API={} SFM={}".format(
|
||||
THORDATA_PATH, SCAN_INTERVAL, API_INTERVAL, bool(API_URL),
|
||||
bool(SFM_FORWARD_ENABLED and SFM_URL),
|
||||
)
|
||||
)
|
||||
print("[CFG] THORDATA_PATH={} SCAN_INTERVAL={}s API={}".format(
|
||||
THORDATA_PATH, SCAN_INTERVAL, bool(API_URL)
|
||||
print("[CFG] THORDATA_PATH={} SCAN_INTERVAL={}s API={} SFM={}".format(
|
||||
THORDATA_PATH, SCAN_INTERVAL, bool(API_URL),
|
||||
bool(SFM_FORWARD_ENABLED and SFM_URL),
|
||||
))
|
||||
|
||||
# Initialize SFM forwarder state (if enabled)
|
||||
sfm_state_obj: Optional[event_forwarder.ForwardState] = None
|
||||
if SFM_FORWARD_ENABLED and SFM_URL:
|
||||
try:
|
||||
sfm_state_obj = event_forwarder.ForwardState(sfm_state_path)
|
||||
state["sfm_status"] = "ready"
|
||||
log_message(log_file, ENABLE_LOGGING,
|
||||
"[sfm] forwarder ready url={} state_file={} known={}".format(
|
||||
SFM_URL, sfm_state_path, sfm_state_obj.count(),
|
||||
)
|
||||
)
|
||||
print("[SFM] forwarder ready url={} known={}".format(
|
||||
SFM_URL, sfm_state_obj.count(),
|
||||
))
|
||||
except Exception as exc:
|
||||
state["sfm_status"] = "fail"
|
||||
state["last_error"] = "SFM init failed: {}".format(exc)
|
||||
log_message(log_file, ENABLE_LOGGING,
|
||||
"[sfm] init failed: {}".format(exc))
|
||||
else:
|
||||
state["sfm_status"] = "disabled"
|
||||
|
||||
state["last_forward"] = None
|
||||
state["last_forward_counts"] = None
|
||||
|
||||
last_api_ts = 0.0
|
||||
last_forward_ts = 0.0
|
||||
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
@@ -370,6 +428,37 @@ def run_watcher(state: Dict[str, Any], stop_event: threading.Event) -> None:
|
||||
else:
|
||||
state["api_status"] = "disabled"
|
||||
|
||||
# ── SFM event forwarding ───────────────────────────────────────────
|
||||
if sfm_state_obj is not None:
|
||||
now_ts = time.time()
|
||||
if now_ts - last_forward_ts >= SFM_FORWARD_INTERVAL:
|
||||
last_forward_ts = now_ts
|
||||
try:
|
||||
counts = event_forwarder.forward_pending(
|
||||
THORDATA_PATH, SFM_URL, sfm_state_obj,
|
||||
max_age_days=SFM_MAX_AGE_DAYS,
|
||||
quiescence_seconds=SFM_QUIESCENCE,
|
||||
missing_report_grace_seconds=SFM_GRACE,
|
||||
timeout=SFM_HTTP_TIMEOUT,
|
||||
max_per_pass=SFM_MAX_PER_PASS,
|
||||
logger=lambda m: log_message(log_file, ENABLE_LOGGING, m),
|
||||
)
|
||||
state["last_forward"] = datetime.now()
|
||||
state["last_forward_counts"] = counts
|
||||
if counts["errors"] > 0:
|
||||
state["sfm_status"] = "fail"
|
||||
else:
|
||||
state["sfm_status"] = "ok"
|
||||
summary = ("[sfm] pass scanned={scanned} forwarded={forwarded} "
|
||||
"errors={errors} with_report={with_report}").format(**counts)
|
||||
print(summary)
|
||||
log_message(log_file, ENABLE_LOGGING, summary)
|
||||
except Exception as exc:
|
||||
state["sfm_status"] = "fail"
|
||||
msg = "[sfm] pass failed: {}".format(exc)
|
||||
print(msg)
|
||||
log_message(log_file, ENABLE_LOGGING, msg)
|
||||
|
||||
except Exception as e:
|
||||
err = "[loop-error] {}".format(e)
|
||||
print(err)
|
||||
|
||||
Reference in New Issue
Block a user