feat: windows installer with remote updates and remote management added.
This commit is contained in:
407
series3_tray.py
Normal file
407
series3_tray.py
Normal file
@@ -0,0 +1,407 @@
|
||||
"""
|
||||
Series 3 Watcher — System Tray Launcher v1.4.0
|
||||
Requires: pystray, Pillow, tkinter (stdlib)
|
||||
|
||||
Run with: pythonw series3_tray.py (no console window)
|
||||
or: python series3_tray.py (with console, for debugging)
|
||||
|
||||
Put a shortcut to this in shell:startup for auto-start on login.
|
||||
|
||||
Python 3.8 compatible — no walrus operators, no f-string = specifier,
|
||||
no match statements, no 3.9+ syntax.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
import tempfile
|
||||
import threading
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from datetime import datetime
|
||||
|
||||
import pystray
|
||||
from PIL import Image, ImageDraw
|
||||
|
||||
import series3_watcher as watcher
|
||||
|
||||
|
||||
# --------------- Auto-updater ---------------
|
||||
|
||||
GITEA_BASE = "https://gitea.serversdown.net"
|
||||
GITEA_USER = "serversdown"
|
||||
GITEA_REPO = "series3-watcher"
|
||||
GITEA_API_URL = "{}/api/v1/repos/{}/{}/releases?limit=1&page=1".format(
|
||||
GITEA_BASE, GITEA_USER, GITEA_REPO
|
||||
)
|
||||
|
||||
# Populated from watcher version string at startup
|
||||
_CURRENT_VERSION = getattr(watcher, "VERSION", "0.0.0")
|
||||
|
||||
|
||||
def _version_tuple(v):
|
||||
"""Convert '1.4.0' -> (1, 4, 0) for comparison. Non-numeric parts -> 0."""
|
||||
parts = []
|
||||
for p in str(v).lstrip("v").split(".")[:3]:
|
||||
try:
|
||||
parts.append(int(p))
|
||||
except ValueError:
|
||||
parts.append(0)
|
||||
while len(parts) < 3:
|
||||
parts.append(0)
|
||||
return tuple(parts)
|
||||
|
||||
|
||||
def check_for_update():
|
||||
"""
|
||||
Query Gitea for the latest release.
|
||||
Returns (tag, download_url) if an update is available, else (None, None).
|
||||
"""
|
||||
import json as _json
|
||||
try:
|
||||
req = urllib.request.Request(
|
||||
GITEA_API_URL,
|
||||
headers={"User-Agent": "series3-watcher/{}".format(_CURRENT_VERSION)},
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=8) as resp:
|
||||
releases = _json.loads(resp.read().decode("utf-8"))
|
||||
if not releases:
|
||||
return None, None
|
||||
latest = releases[0]
|
||||
tag = latest.get("tag_name", "")
|
||||
if _version_tuple(tag) <= _version_tuple(_CURRENT_VERSION):
|
||||
return None, None
|
||||
# Find the .exe asset
|
||||
assets = latest.get("assets", [])
|
||||
for asset in assets:
|
||||
name = asset.get("name", "")
|
||||
if name.lower().endswith(".exe"):
|
||||
return tag, asset.get("browser_download_url")
|
||||
return tag, None
|
||||
except Exception:
|
||||
return None, None
|
||||
|
||||
|
||||
def apply_update(download_url):
|
||||
"""
|
||||
Download new .exe to a temp file, write a swap .bat, launch it, exit.
|
||||
The bat waits for us to exit, then swaps the files and relaunches.
|
||||
"""
|
||||
exe_path = os.path.abspath(sys.executable if getattr(sys, "frozen", False) else sys.argv[0])
|
||||
|
||||
try:
|
||||
tmp_fd, tmp_path = tempfile.mkstemp(suffix=".exe", prefix="s3w_update_")
|
||||
os.close(tmp_fd)
|
||||
|
||||
req = urllib.request.Request(
|
||||
download_url,
|
||||
headers={"User-Agent": "series3-watcher/{}".format(_CURRENT_VERSION)},
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=60) as resp:
|
||||
with open(tmp_path, "wb") as f:
|
||||
f.write(resp.read())
|
||||
|
||||
bat_fd, bat_path = tempfile.mkstemp(suffix=".bat", prefix="s3w_swap_")
|
||||
os.close(bat_fd)
|
||||
|
||||
bat_content = (
|
||||
"@echo off\r\n"
|
||||
"ping 127.0.0.1 -n 4 > nul\r\n"
|
||||
"copy /Y \"{new}\" \"{exe}\"\r\n"
|
||||
"start \"\" \"{exe}\"\r\n"
|
||||
"del \"%~f0\"\r\n"
|
||||
).format(new=tmp_path, exe=exe_path)
|
||||
|
||||
with open(bat_path, "w") as f:
|
||||
f.write(bat_content)
|
||||
|
||||
subprocess.Popen(
|
||||
["cmd", "/C", bat_path],
|
||||
creationflags=subprocess.CREATE_NO_WINDOW if hasattr(subprocess, "CREATE_NO_WINDOW") else 0,
|
||||
)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
# --------------- Paths ---------------
|
||||
|
||||
HERE = os.path.dirname(os.path.abspath(__file__))
|
||||
CONFIG_PATH = os.path.join(HERE, "config.ini")
|
||||
|
||||
|
||||
# --------------- Icon drawing ---------------
|
||||
|
||||
COLORS = {
|
||||
"ok": (60, 200, 80), # green
|
||||
"pending": (230, 180, 0), # amber
|
||||
"missing": (210, 40, 40), # red
|
||||
"error": (160, 40, 200), # purple
|
||||
"starting": (120, 120, 120), # grey
|
||||
}
|
||||
|
||||
ICON_SIZE = 64
|
||||
|
||||
|
||||
def make_icon(status):
|
||||
"""Draw a solid filled circle on a transparent background."""
|
||||
color = COLORS.get(status, COLORS["starting"])
|
||||
img = Image.new("RGBA", (ICON_SIZE, ICON_SIZE), (0, 0, 0, 0))
|
||||
draw = ImageDraw.Draw(img)
|
||||
margin = 6
|
||||
draw.ellipse(
|
||||
[margin, margin, ICON_SIZE - margin, ICON_SIZE - margin],
|
||||
fill=color,
|
||||
)
|
||||
return img
|
||||
|
||||
|
||||
# --------------- First-run check ---------------
|
||||
|
||||
def ensure_config():
|
||||
"""
|
||||
If config.ini is missing, launch the first-run wizard.
|
||||
Returns True if config is ready, False if user cancelled.
|
||||
"""
|
||||
if os.path.exists(CONFIG_PATH):
|
||||
return True
|
||||
|
||||
# Import here to avoid pulling in tkinter unless needed
|
||||
from settings_dialog import show_dialog
|
||||
saved = show_dialog(CONFIG_PATH, wizard=True)
|
||||
if not saved:
|
||||
_show_cancel_message()
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _show_cancel_message():
|
||||
"""Show a plain messagebox telling the user the app cannot start."""
|
||||
try:
|
||||
import tkinter as tk
|
||||
from tkinter import messagebox
|
||||
root = tk.Tk()
|
||||
root.withdraw()
|
||||
messagebox.showwarning(
|
||||
"Series 3 Watcher",
|
||||
"No configuration was saved.\nThe application will now exit.",
|
||||
)
|
||||
root.destroy()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# --------------- Tray app ---------------
|
||||
|
||||
class WatcherTray:
|
||||
def __init__(self):
|
||||
self.state = {}
|
||||
self.stop_event = threading.Event()
|
||||
self._watcher_thread = None
|
||||
self._icon = None
|
||||
# Lock guards _rebuild_menu calls from the updater thread
|
||||
self._menu_lock = threading.Lock()
|
||||
|
||||
# --- Watcher thread management ---
|
||||
|
||||
def _start_watcher(self):
|
||||
self.stop_event.clear()
|
||||
self._watcher_thread = threading.Thread(
|
||||
target=watcher.run_watcher,
|
||||
args=(self.state, self.stop_event),
|
||||
daemon=True,
|
||||
name="watcher",
|
||||
)
|
||||
self._watcher_thread.start()
|
||||
|
||||
def _stop_watcher(self):
|
||||
self.stop_event.set()
|
||||
if self._watcher_thread is not None:
|
||||
self._watcher_thread.join(timeout=10)
|
||||
self._watcher_thread = None
|
||||
|
||||
def _restart_watcher(self):
|
||||
"""Stop any running watcher and start a fresh one."""
|
||||
self._stop_watcher()
|
||||
self.stop_event = threading.Event()
|
||||
self.state["status"] = "starting"
|
||||
self.state["units"] = []
|
||||
self.state["last_scan"] = None
|
||||
self.state["last_error"] = None
|
||||
self._start_watcher()
|
||||
|
||||
# --- Menu item callbacks ---
|
||||
|
||||
def _open_settings(self, icon, item):
|
||||
"""Open the settings dialog. On save, restart watcher thread."""
|
||||
from settings_dialog import show_dialog
|
||||
saved = show_dialog(CONFIG_PATH, wizard=False)
|
||||
if saved:
|
||||
self._restart_watcher()
|
||||
# Rebuild menu so status label refreshes
|
||||
if self._icon is not None:
|
||||
with self._menu_lock:
|
||||
self._icon.menu = self._build_menu()
|
||||
|
||||
def _open_logs(self, icon, item):
|
||||
log_dir = self.state.get("log_dir")
|
||||
if not log_dir:
|
||||
log_dir = HERE
|
||||
if os.path.exists(log_dir):
|
||||
subprocess.Popen(["explorer", log_dir])
|
||||
else:
|
||||
parent = os.path.dirname(log_dir)
|
||||
if os.path.exists(parent):
|
||||
subprocess.Popen(["explorer", parent])
|
||||
else:
|
||||
subprocess.Popen(["explorer", HERE])
|
||||
|
||||
def _exit(self, icon, item):
|
||||
self.stop_event.set()
|
||||
icon.stop()
|
||||
|
||||
# --- Dynamic menu text helpers ---
|
||||
|
||||
def _status_text(self):
|
||||
status = self.state.get("status", "starting")
|
||||
last_err = self.state.get("last_error")
|
||||
last_scan = self.state.get("last_scan")
|
||||
|
||||
if status == "error":
|
||||
return "Status: Error — {}".format(last_err or "unknown")
|
||||
if status == "starting":
|
||||
return "Status: Starting..."
|
||||
if last_scan is not None:
|
||||
age_secs = int((datetime.now() - last_scan).total_seconds())
|
||||
if age_secs < 60:
|
||||
age_str = "{}s ago".format(age_secs)
|
||||
else:
|
||||
age_str = "{}m ago".format(age_secs // 60)
|
||||
unit_count = len(self.state.get("units", []))
|
||||
return "Status: {} | {} unit(s) | scan {}".format(
|
||||
status.upper(), unit_count, age_str
|
||||
)
|
||||
return "Status: {}".format(status.upper())
|
||||
|
||||
def _build_units_submenu(self):
|
||||
units = self.state.get("units", [])
|
||||
if not units:
|
||||
items = [pystray.MenuItem("No units detected", None, enabled=False)]
|
||||
else:
|
||||
items = []
|
||||
for u in units:
|
||||
label = "{uid} — {status} ({age:.1f}h ago)".format(
|
||||
uid=u["uid"],
|
||||
status=u["status"],
|
||||
age=u["age_hours"],
|
||||
)
|
||||
items.append(pystray.MenuItem(label, None, enabled=False))
|
||||
return pystray.Menu(*items)
|
||||
|
||||
def _build_menu(self):
|
||||
# Capture current text/submenu at build time; pystray will call
|
||||
# callables each render, but static strings are fine for infrequent
|
||||
# menu rebuilds. We use callables for the dynamic items so that the
|
||||
# text shown on hover/open is current.
|
||||
status_text = self._status_text()
|
||||
units_submenu = self._build_units_submenu()
|
||||
|
||||
return pystray.Menu(
|
||||
pystray.MenuItem(status_text, None, enabled=False),
|
||||
pystray.Menu.SEPARATOR,
|
||||
pystray.MenuItem("Units", units_submenu),
|
||||
pystray.Menu.SEPARATOR,
|
||||
pystray.MenuItem("Settings...", self._open_settings),
|
||||
pystray.MenuItem("Open Log Folder", self._open_logs),
|
||||
pystray.Menu.SEPARATOR,
|
||||
pystray.MenuItem("Exit", self._exit),
|
||||
)
|
||||
|
||||
# --- Icon / menu update loop ---
|
||||
|
||||
def _icon_updater(self):
|
||||
"""Periodically refresh the tray icon color and menu to match watcher state."""
|
||||
last_status = None
|
||||
update_check_counter = 0 # check for updates every ~5 min (30 * 10s ticks)
|
||||
|
||||
while not self.stop_event.is_set():
|
||||
status = self.state.get("status", "starting")
|
||||
|
||||
if self._icon is not None:
|
||||
# Always rebuild menu every cycle so unit list and scan age stay fresh
|
||||
with self._menu_lock:
|
||||
self._icon.menu = self._build_menu()
|
||||
|
||||
if status != last_status:
|
||||
self._icon.icon = make_icon(status)
|
||||
self._icon.title = "Series 3 Watcher — {}".format(status.upper())
|
||||
last_status = status
|
||||
|
||||
# Check if terra-view signalled an update via heartbeat response
|
||||
if self.state.get("update_available"):
|
||||
self.state["update_available"] = False
|
||||
self._do_update()
|
||||
return # exit loop; swap bat will relaunch
|
||||
|
||||
# Periodic Gitea update check (every ~5 min)
|
||||
update_check_counter += 1
|
||||
if update_check_counter >= 30:
|
||||
update_check_counter = 0
|
||||
tag, url = check_for_update()
|
||||
if tag and url:
|
||||
self._do_update(url)
|
||||
return # exit loop; swap bat will relaunch
|
||||
|
||||
self.stop_event.wait(timeout=10)
|
||||
|
||||
def _do_update(self, download_url=None):
|
||||
"""Notify tray icon then apply update. If url is None, fetch it first."""
|
||||
if download_url is None:
|
||||
_, download_url = check_for_update()
|
||||
if not download_url:
|
||||
return
|
||||
|
||||
if self._icon is not None:
|
||||
self._icon.title = "Series 3 Watcher — Updating..."
|
||||
self._icon.icon = make_icon("starting")
|
||||
|
||||
success = apply_update(download_url)
|
||||
if success:
|
||||
self.stop_event.set()
|
||||
if self._icon is not None:
|
||||
self._icon.stop()
|
||||
# If update failed, just keep running silently
|
||||
|
||||
# --- Entry point ---
|
||||
|
||||
def run(self):
|
||||
self._start_watcher()
|
||||
|
||||
icon_img = make_icon("starting")
|
||||
self._icon = pystray.Icon(
|
||||
name="series3_watcher",
|
||||
icon=icon_img,
|
||||
title="Series 3 Watcher — Starting...",
|
||||
menu=self._build_menu(),
|
||||
)
|
||||
|
||||
updater = threading.Thread(
|
||||
target=self._icon_updater, daemon=True, name="icon-updater"
|
||||
)
|
||||
updater.start()
|
||||
|
||||
self._icon.run()
|
||||
|
||||
|
||||
# --------------- Entry point ---------------
|
||||
|
||||
def main():
|
||||
if not ensure_config():
|
||||
sys.exit(0)
|
||||
|
||||
app = WatcherTray()
|
||||
app.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user