Files
series3-watcher/series3_tray.py
serversdwn 439feb9942 Feat: Update settings tab implemented.
Auto-updates are now configurable: URL, source (Gitea or private server), and activity logging for auto-updates.
Fix: Update is now hardened to prevent installation of corrupt or incorrect .exe files (security to be hardened further in the future).
2026-03-17 21:08:37 -04:00

543 lines
18 KiB
Python

"""
Series 3 Watcher — System Tray Launcher v1.4.3
Requires: pystray, Pillow, tkinter (stdlib)
Run with: pythonw series3_tray.py (no console window)
or: python series3_tray.py (with console, for debugging)
Put a shortcut to this in shell:startup for auto-start on login.
Python 3.8 compatible — no walrus operators, no f-string = specifier,
no match statements, no 3.9+ syntax.
"""
import os
import sys
import subprocess
import tempfile
import threading
import configparser
import urllib.request
import urllib.error
from datetime import datetime
import pystray
from PIL import Image, ImageDraw
import series3_watcher as watcher
# --------------- Auto-updater ---------------
# Coordinates of the release repository on the Gitea server.
GITEA_REPO = "series3-watcher"
GITEA_USER = "serversdown"
GITEA_BASE = "https://gitea.serversdown.net"
# Releases endpoint; the query string asks for a single entry (limit=1, page=1).
GITEA_API_URL = "{}/api/v1/repos/{}/{}/releases?limit=1&page=1".format(
    GITEA_BASE,
    GITEA_USER,
    GITEA_REPO,
)
# Version reported by the watcher module, or "0.0.0" when it defines none.
if hasattr(watcher, "VERSION"):
    _CURRENT_VERSION = watcher.VERSION
else:
    _CURRENT_VERSION = "0.0.0"
def _version_tuple(v):
    """Convert a version string such as '1.4.0' or 'v1.4.0' into a 3-tuple of ints.

    Only the first three dot-separated components are used; missing ones are
    padded with 0.  Surrounding whitespace and a leading 'v' / 'V' are ignored.
    Each component contributes its leading run of ASCII digits, so a suffixed
    tag such as '1.4.3-beta' yields (1, 4, 3) instead of collapsing the last
    component to 0.  A component with no leading digits counts as 0.

    Args:
        v: Version value; converted with str() before parsing.

    Returns:
        Tuple of exactly three ints, suitable for ordering comparisons.
    """
    text = str(v).strip().lstrip("vV")
    parts = []
    for piece in text.split(".")[:3]:
        digits = ""
        for ch in piece.strip():
            # Stop at the first non-digit so "3-beta" / "3rc1" still count as 3.
            if ch not in "0123456789":
                break
            digits += ch
        parts.append(int(digits) if digits else 0)
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)
def _update_log(msg):
    """Append a timestamped '[updater]' line to the watcher log (best effort).

    The log lives at <LOCALAPPDATA or APPDATA>/Series3Watcher/agent_logs/
    series3_watcher.log; when neither variable is set the path is relative
    to the current working directory.  The directory is created on demand.

    Args:
        msg: Text to record; written after the timestamp and '[updater]' tag.

    Any failure is swallowed on purpose: logging must never break the updater.
    """
    try:
        log_path = os.path.join(
            os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or "",
            "Series3Watcher", "agent_logs", "series3_watcher.log"
        )
        os.makedirs(os.path.dirname(log_path), exist_ok=True)
        # errors="replace": the file is opened with the platform default
        # encoding, and several update messages contain characters (e.g. an
        # em dash, or arbitrary exception text) that not every code page can
        # encode.  Without this the write raises and the whole line is lost.
        with open(log_path, "a", errors="replace") as f:
            f.write("[{}] [updater] {}\n".format(
                datetime.now().strftime("%Y-%m-%d %H:%M:%S"), msg
            ))
    except Exception:
        # Deliberate best effort — see docstring.
        pass
def _check_for_update_gitea():
    """Ask the Gitea releases API whether a newer release is published.

    Returns:
        (tag, download_url) when the first listed release is newer than the
        running version and carries a .exe asset whose name does not contain
        "setup"; (tag, None) when it is newer but has no such asset;
        (None, None) when there is no release, it is not newer, or the
        request/parsing failed (the failure is written to the update log).
    """
    import json as _json
    try:
        request = urllib.request.Request(
            GITEA_API_URL,
            headers={"User-Agent": "series3-watcher/{}".format(_CURRENT_VERSION)},
        )
        with urllib.request.urlopen(request, timeout=8) as response:
            payload = response.read()
        releases = _json.loads(payload.decode("utf-8"))
        if not releases:
            return None, None

        newest = releases[0]
        tag = newest.get("tag_name", "")
        if not _version_tuple(tag) > _version_tuple(_CURRENT_VERSION):
            return None, None

        # Pick the first plain .exe asset; installer ("setup") builds are skipped.
        for asset in newest.get("assets", []):
            filename = asset.get("name", "").lower()
            if filename.endswith(".exe") and "setup" not in filename:
                return tag, asset.get("browser_download_url")

        _update_log("Newer release {} found but no valid .exe asset".format(tag))
        return tag, None
    except Exception as e:
        _update_log("check_for_update (gitea) failed: {}".format(e))
        return None, None
def _check_for_update_url(base_url):
    """Ask a custom update server for the latest version.

    The version is read from <base_url>/api/updates/series3-watcher/version.txt
    and the executable is expected next to it as series3-watcher.exe.

    Args:
        base_url: Root URL of the update server; a trailing "/" is tolerated.

    Returns:
        (tag, exe_url) when the published version is newer than the running
        one, otherwise (None, None) — including when base_url is empty, the
        version file is blank, or the request failed (logged).
    """
    if not base_url:
        _update_log("UPDATE_SOURCE=url but UPDATE_URL is empty — skipping")
        return None, None
    try:
        root = base_url.rstrip("/")
        request = urllib.request.Request(
            root + "/api/updates/series3-watcher/version.txt",
            headers={"User-Agent": "series3-watcher/{}".format(_CURRENT_VERSION)},
        )
        with urllib.request.urlopen(request, timeout=8) as response:
            raw = response.read()
        tag = raw.decode("utf-8").strip()
        is_newer = bool(tag) and _version_tuple(tag) > _version_tuple(_CURRENT_VERSION)
        if not is_newer:
            return None, None
        return tag, root + "/api/updates/series3-watcher/series3-watcher.exe"
    except Exception as e:
        _update_log("check_for_update (url mode) failed: {}".format(e))
        return None, None
def check_for_update():
    """
    Look for an update using the source configured in config.ini.

    UPDATE_SOURCE and UPDATE_URL are read from the [agent] section each time
    this is called.  If the file cannot be read or parsed, the source falls
    back to "gitea" with an empty URL.

    Returns (tag, download_url) as produced by the selected checker, or
    (None, None) immediately when UPDATE_SOURCE = disabled.  Any source value
    other than "url" or "disabled" is treated as gitea.
    """
    try:
        parser = configparser.ConfigParser(inline_comment_prefixes=(";", "#"))
        parser.optionxform = str  # keep option names case-sensitive
        parser.read(CONFIG_PATH, encoding="utf-8")
        agent = parser["agent"] if parser.has_section("agent") else {}
        configured = (
            agent.get("UPDATE_SOURCE", "gitea").strip().lower(),
            agent.get("UPDATE_URL", "").strip(),
        )
    except Exception:
        configured = ("gitea", "")
    update_source, update_url = configured

    if update_source == "disabled":
        return None, None

    _update_log("Checking for update (source={}, version={})".format(
        update_source, _CURRENT_VERSION
    ))
    if update_source != "url":
        return _check_for_update_gitea()
    return _check_for_update_url(update_url)
def _validate_download(tmp_path, exe_path):
    """Return True when the downloaded file passes the three sanity checks.

    Checks, in order: at least 100 KiB; at least half the size of the file at
    exe_path (when that is non-empty); starts with the 'MZ' magic bytes.
    The reason for any rejection is written to the update log.
    """
    try:
        dl_size = os.path.getsize(tmp_path)
        current_size = os.path.getsize(exe_path)
        _update_log("Download complete ({} bytes), validating...".format(dl_size))
        if dl_size < 100 * 1024:
            _update_log("Validation failed: too small ({} bytes) — aborting".format(dl_size))
            return False
        if current_size > 0 and dl_size < current_size * 0.5:
            _update_log("Validation failed: suspiciously small ({} bytes vs current {} bytes) — aborting".format(
                dl_size, current_size
            ))
            return False
        with open(tmp_path, "rb") as _f:
            magic = _f.read(2)
        if magic != b"MZ":
            _update_log("Validation failed: not a valid Windows exe (bad magic bytes) — aborting")
            return False
        _update_log("Validation passed ({} bytes, MZ ok)".format(dl_size))
        return True
    except Exception as e:
        _update_log("Validation error: {} — aborting".format(e))
        return False


def apply_update(download_url):
    """
    Download the new .exe to a temp file, validate it, then hand over to a swap .bat.

    The bat waits briefly, backs up the current exe to "<exe>.old", copies the
    download over the exe (retrying while the copy fails), relaunches the exe
    and deletes the download and itself.  The .exe.old backup is left in place
    as a rollback copy.

    Args:
        download_url: URL of the replacement executable.

    Returns:
        True when the swap bat was launched (the caller should then exit),
        False when the download, validation or launch failed.  On failure the
        temporary download and bat file are removed so repeated attempts do
        not pile up files in the temp directory.

    Note: when the process is not frozen, the swap target is the script path
    taken from sys.argv[0] rather than an executable.
    """
    import shutil  # local import: only needed while applying an update

    exe_path = os.path.abspath(sys.executable if getattr(sys, "frozen", False) else sys.argv[0])
    tmp_path = None
    bat_path = None
    handed_off = False  # True once the bat owns the temp files
    try:
        tmp_fd, tmp_path = tempfile.mkstemp(suffix=".exe", prefix="s3w_update_")
        os.close(tmp_fd)
        _update_log("Downloading update from: {}".format(download_url))
        req = urllib.request.Request(
            download_url,
            headers={"User-Agent": "series3-watcher/{}".format(_CURRENT_VERSION)},
        )
        with urllib.request.urlopen(req, timeout=60) as resp:
            with open(tmp_path, "wb") as f:
                # Stream in chunks instead of holding the whole exe in memory.
                shutil.copyfileobj(resp, f)

        # Three-layer validation before touching the live exe
        if not _validate_download(tmp_path, exe_path):
            return False

        bat_fd, bat_path = tempfile.mkstemp(suffix=".bat", prefix="s3w_swap_")
        os.close(bat_fd)
        bat_content = (
            "@echo off\r\n"
            "ping 127.0.0.1 -n 4 > nul\r\n"
            "copy /Y \"{exe}\" \"{exe}.old\"\r\n"
            "set RETRIES=0\r\n"
            ":retry\r\n"
            "copy /Y \"{new}\" \"{exe}\"\r\n"
            "if errorlevel 1 (\r\n"
            "    set /a RETRIES+=1\r\n"
            "    if %RETRIES% GEQ 5 goto fail\r\n"
            "    ping 127.0.0.1 -n 3 > nul\r\n"
            "    goto retry\r\n"
            ")\r\n"
            "start \"\" \"{exe}\"\r\n"
            "del \"{new}\"\r\n"
            "del \"%~f0\"\r\n"
            "exit /b 0\r\n"
            ":fail\r\n"
            "del \"{new}\"\r\n"
            "del \"%~f0\"\r\n"
            "exit /b 1\r\n"
        ).format(new=tmp_path, exe=exe_path)
        # newline="" disables newline translation; the content already uses
        # "\r\n", which text mode on Windows would otherwise turn into "\r\r\n".
        with open(bat_path, "w", newline="") as f:
            f.write(bat_content)

        _update_log("Launching swap bat — exiting for update")
        subprocess.Popen(
            ["cmd", "/C", bat_path],
            creationflags=subprocess.CREATE_NO_WINDOW if hasattr(subprocess, "CREATE_NO_WINDOW") else 0,
        )
        handed_off = True
        return True
    except Exception as e:
        _update_log("apply_update failed: {}".format(e))
        return False
    finally:
        if not handed_off:
            # Nothing will consume these files any more — remove them.
            for leftover in (tmp_path, bat_path):
                if leftover:
                    try:
                        os.remove(leftover)
                    except OSError:
                        pass
# --------------- Paths ---------------
# HERE is the executable/script directory, used for bundled assets (icon.ico etc.).
# A frozen build keeps config.ini under AppData so normal users can write it
# without UAC issues; running from source (dev mode) keeps it next to the script.
if getattr(sys, "frozen", False):
    HERE = os.path.dirname(os.path.abspath(sys.executable))
    _appdata = os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA") or HERE
    CONFIG_DIR = os.path.join(_appdata, "Series3Watcher")
    os.makedirs(CONFIG_DIR, exist_ok=True)
else:
    HERE = os.path.dirname(os.path.abspath(__file__))
    CONFIG_DIR = HERE
CONFIG_PATH = os.path.join(CONFIG_DIR, "config.ini")
# --------------- Icon drawing ---------------
# Edge length, in pixels, of the square image the icon is drawn on.
ICON_SIZE = 64
# RGB fill colour for each status key understood by make_icon().
COLORS = {
    "ok": (60, 200, 80),  # green
    "pending": (230, 180, 0),  # amber
    "missing": (210, 40, 40),  # red
    "error": (160, 40, 200),  # purple
    "starting": (120, 120, 120),  # grey
}


def make_icon(status):
    """Return an RGBA image holding a solid circle coloured for `status`.

    Unknown status keys use the "starting" colour.  The circle is inset by a
    6-pixel margin on a fully transparent ICON_SIZE x ICON_SIZE canvas.
    """
    fill = COLORS[status] if status in COLORS else COLORS["starting"]
    canvas = Image.new("RGBA", (ICON_SIZE, ICON_SIZE), (0, 0, 0, 0))
    inset = 6
    far_edge = ICON_SIZE - inset
    ImageDraw.Draw(canvas).ellipse([inset, inset, far_edge, far_edge], fill=fill)
    return canvas
# --------------- First-run check ---------------
def ensure_config():
    """
    Make sure config.ini exists, running the first-run wizard when it does not.

    Returns True if a config file is present (already there, or saved by the
    wizard) and False if the user left the wizard without saving, in which
    case a warning message is shown first.
    """
    if not os.path.exists(CONFIG_PATH):
        # Deferred import keeps tkinter out of the process unless it is needed.
        from settings_dialog import show_dialog
        if not show_dialog(CONFIG_PATH, wizard=True):
            _show_cancel_message()
            return False
    return True
def _show_cancel_message():
    """Tell the user, via a warning box, that the app is exiting unconfigured.

    Best effort: if tkinter cannot be imported or the dialog fails, the error
    is ignored and nothing is shown.
    """
    try:
        import tkinter
        import tkinter.messagebox

        # A hidden root window is needed so only the message box appears.
        hidden_root = tkinter.Tk()
        hidden_root.withdraw()
        tkinter.messagebox.showwarning(
            "Series 3 Watcher",
            "No configuration was saved.\nThe application will now exit.",
        )
        hidden_root.destroy()
    except Exception:
        pass
# --------------- Tray app ---------------
class WatcherTray:
    """Tray icon application that supervises the background watcher thread.

    Two events are kept on purpose:

    * ``stop_event`` stops the *current* watcher thread and is replaced on
      every restart.
    * ``_exit_event`` ends the icon-updater loop and is only set when the
      application itself is going away (Exit menu item or successful update).

    Sharing one event for both made a watcher restart (triggered by saving
    settings) also terminate the icon-updater thread.
    """

    def __init__(self):
        # Dict handed to watcher.run_watcher; read here for status display.
        self.state = {}
        # Passed to the watcher thread; set to ask that thread to stop.
        self.stop_event = threading.Event()
        # Set once when the tray application is shutting down.
        self._exit_event = threading.Event()
        self._watcher_thread = None
        self._icon = None
        # Lock guards menu rebuilds performed by the updater thread
        self._menu_lock = threading.Lock()

    # --- Watcher thread management ---
    def _start_watcher(self):
        """Start watcher.run_watcher on a daemon thread using the current stop_event."""
        self.stop_event.clear()
        self._watcher_thread = threading.Thread(
            target=watcher.run_watcher,
            args=(self.state, self.stop_event),
            daemon=True,
            name="watcher",
        )
        self._watcher_thread.start()

    def _stop_watcher(self):
        """Signal the watcher thread to stop and wait up to 10 s for it to finish."""
        self.stop_event.set()
        if self._watcher_thread is not None:
            self._watcher_thread.join(timeout=10)
            self._watcher_thread = None

    def _restart_watcher(self):
        """Stop any running watcher and start a fresh one."""
        self._stop_watcher()
        # A new Event is used so a watcher thread that outlived the join
        # timeout still sees its own (set) event and cannot be revived.
        self.stop_event = threading.Event()
        self.state["status"] = "starting"
        self.state["units"] = []
        self.state["last_scan"] = None
        self.state["last_error"] = None
        self._start_watcher()

    # --- Menu item callbacks ---
    def _open_settings(self, icon, item):
        """Open the settings dialog in its own thread so the tray stays responsive."""
        def _run():
            from settings_dialog import show_dialog
            saved = show_dialog(CONFIG_PATH, wizard=False)
            if saved:
                self._restart_watcher()
        threading.Thread(target=_run, daemon=True, name="settings-dialog").start()

    def _open_logs(self, icon, item):
        """Open the log folder in Explorer, falling back to its parent, then HERE."""
        log_dir = self.state.get("log_dir") or HERE
        for candidate in (log_dir, os.path.dirname(log_dir)):
            if os.path.exists(candidate):
                subprocess.Popen(["explorer", candidate])
                return
        subprocess.Popen(["explorer", HERE])

    def _exit(self, icon, item):
        """Exit menu callback: stop the watcher, end the updater loop, stop the icon."""
        self.stop_event.set()
        self._exit_event.set()
        icon.stop()

    # --- Dynamic menu text helpers ---
    def _status_text(self):
        """Return the one-line status string shown in the menu and tooltip."""
        status = self.state.get("status", "starting")
        last_err = self.state.get("last_error")
        last_scan = self.state.get("last_scan")
        api_status = self.state.get("api_status", "disabled")
        unit_count = len(self.state.get("units", []))
        if status == "error":
            return "Error — {}".format(last_err or "unknown")
        if status == "starting":
            return "Starting..."
        # Scan age
        if last_scan is not None:
            age_secs = int((datetime.now() - last_scan).total_seconds())
            age_str = "{}s ago".format(age_secs) if age_secs < 60 else "{}m ago".format(age_secs // 60)
        else:
            age_str = "never"
        # API status label
        if api_status == "ok":
            api_str = "API OK"
        elif api_status == "fail":
            api_str = "API FAIL"
        else:
            api_str = "API off"
        return "Running — {} | {} unit(s) | scan {}".format(api_str, unit_count, age_str)

    def _tray_status(self):
        """Return the icon status key based on watcher + API health."""
        status = self.state.get("status", "starting")
        if status == "error":
            return "error"
        if status == "starting":
            return "starting"
        api_status = self.state.get("api_status", "disabled")
        if api_status == "fail":
            return "missing"  # red — API failing
        if api_status == "disabled":
            return "pending"  # amber — running but not reporting
        return "ok"  # green — running and API good

    def _build_menu(self):
        """Build the tray menu; the first (disabled) entry shows the live status text."""
        return pystray.Menu(
            pystray.MenuItem(lambda item: self._status_text(), None, enabled=False),
            pystray.Menu.SEPARATOR,
            pystray.MenuItem("Settings...", self._open_settings),
            pystray.MenuItem("Open Log Folder", self._open_logs),
            pystray.Menu.SEPARATOR,
            pystray.MenuItem("Exit", self._exit),
        )

    # --- Icon / menu update loop ---
    def _icon_updater(self):
        """Periodically refresh the tray icon, tooltip and menu, and poll for updates.

        Runs until the application exits or an update has been handed to the
        swap bat.  A failed update attempt does not end the loop.
        """
        last_status = None
        update_check_counter = 0  # check for updates every ~5 min (30 * 10s ticks)
        while not self._exit_event.is_set():
            icon_status = self._tray_status()
            if self._icon is not None:
                # Always rebuild menu every cycle so status text stays fresh
                with self._menu_lock:
                    self._icon.menu = self._build_menu()
                # The tooltip carries scan age / unit count, so refresh it
                # every cycle too; the image is only redrawn on a status change.
                self._icon.title = "Series 3 Watcher — {}".format(self._status_text())
                if icon_status != last_status:
                    self._icon.icon = make_icon(icon_status)
                    last_status = icon_status

            # Check if terra-view signalled an update via heartbeat response
            if self.state.get("update_available"):
                self.state["update_available"] = False
                if self._do_update():
                    return  # exit loop; swap bat will relaunch
                last_status = None  # force the "Updating..." icon to be replaced

            # Periodic update check (every ~5 min)
            update_check_counter += 1
            if update_check_counter >= 30:
                update_check_counter = 0
                tag, url = check_for_update()
                if tag and url:
                    if self._do_update(url):
                        return  # exit loop; swap bat will relaunch
                    last_status = None  # force the "Updating..." icon to be replaced

            self._exit_event.wait(timeout=10)

    def _do_update(self, download_url=None):
        """Notify tray icon then apply update. If url is None, fetch it first.

        Returns:
            True when the update was handed to the swap bat and the tray is
            shutting down; False when no download URL was available or the
            update failed (the app keeps running, details are in the log).
        """
        if download_url is None:
            _, download_url = check_for_update()
        if not download_url:
            return False

        if self._icon is not None:
            self._icon.title = "Series 3 Watcher — Updating..."
            self._icon.icon = make_icon("starting")

        if not apply_update(download_url):
            # If update failed, keep running silently — error is in the log
            return False

        self.stop_event.set()
        self._exit_event.set()
        if self._icon is not None:
            self._icon.stop()
        return True

    # --- Entry point ---
    def run(self):
        """Start the watcher and updater threads, then run the tray icon loop."""
        self._start_watcher()
        icon_img = make_icon("starting")
        self._icon = pystray.Icon(
            name="series3_watcher",
            icon=icon_img,
            title="Series 3 Watcher — Starting...",
            menu=self._build_menu(),
        )
        updater = threading.Thread(
            target=self._icon_updater, daemon=True, name="icon-updater"
        )
        updater.start()
        self._icon.run()
# --------------- Entry point ---------------
def main():
    """Run the tray application, exiting with status 0 if no config is available."""
    config_ready = ensure_config()
    if not config_ready:
        sys.exit(0)
    WatcherTray().run()


if __name__ == "__main__":
    main()