v0.12.6 #10

Merged
serversdown merged 43 commits from seismo-lab-new into main 2026-05-04 13:22:56 -04:00
3 changed files with 551 additions and 2 deletions
Showing only changes of commit 625b0a4dfc - Show all commits
+10 -2
View File
@@ -21,7 +21,15 @@ Typical usage (TCP / modem):
from .client import MiniMateClient
from .models import DeviceInfo, Event, MonitorLogEntry
from .transport import SerialTransport, TcpTransport
from .transport import CapturingTransport, SerialTransport, TcpTransport
__version__ = "0.1.0"
__all__ = ["MiniMateClient", "DeviceInfo", "Event", "MonitorLogEntry", "SerialTransport", "TcpTransport"]
__all__ = [
"MiniMateClient",
"DeviceInfo",
"Event",
"MonitorLogEntry",
"SerialTransport",
"TcpTransport",
"CapturingTransport",
]
+99
View File
@@ -454,3 +454,102 @@ class SocketTransport(TcpTransport):
def __repr__(self) -> str:
return f"SocketTransport(peer={self.host!r})"
# ── Capturing transport (MITM-style raw byte mirror) ──────────────────────────
class CapturingTransport(BaseTransport):
"""
Wraps another BaseTransport and mirrors every byte to two raw capture files:
raw_bw_<...>.bin — bytes WE wrote to the device (BW-side TX)
raw_s3_<...>.bin — bytes the device wrote back (S3-side TX)
The file naming and on-wire byte layout are identical to the captures
produced by `bridges/ach_mitm.py`, so the resulting `.bin` files can be
loaded directly by the Analyzer (File > Open Capture) and parsed by the
same tooling used for genuine Blastware MITM captures.
All BaseTransport methods are forwarded to the inner transport; the only
side-effect is that successful read/write byte streams are appended to the
two open binary files.
Args:
inner: An already-built BaseTransport (SerialTransport / TcpTransport).
bw_path: File path for the "BW TX" stream (bytes we send). Opened "wb".
s3_path: File path for the "S3 TX" stream (bytes the device sends).
Opened "wb".
Example:
with CapturingTransport(TcpTransport("1.2.3.4", 9034),
"raw_bw.bin", "raw_s3.bin") as t:
client = MiniMateClient(transport=t)
client.connect()
client.get_events()
# both .bin files now hold the full bidirectional capture.
"""
def __init__(self, inner: BaseTransport, bw_path: str, s3_path: str) -> None:
self._inner = inner
self._bw_path = bw_path
self._s3_path = s3_path
self._bw_fh = None
self._s3_fh = None
# Forward inner attrs so callers can introspect (e.g. .host, .port).
self.host = getattr(inner, "host", None)
self.port = getattr(inner, "port", None)
# ── BaseTransport interface ───────────────────────────────────────────────
def connect(self) -> None:
if self._bw_fh is None:
self._bw_fh = open(self._bw_path, "wb", buffering=0)
if self._s3_fh is None:
self._s3_fh = open(self._s3_path, "wb", buffering=0)
self._inner.connect()
def disconnect(self) -> None:
try:
self._inner.disconnect()
finally:
for fh_attr in ("_bw_fh", "_s3_fh"):
fh = getattr(self, fh_attr)
if fh is not None:
try:
fh.flush()
fh.close()
except Exception:
pass
setattr(self, fh_attr, None)
@property
def is_connected(self) -> bool:
return self._inner.is_connected
def write(self, data: bytes) -> None:
self._inner.write(data)
if data and self._bw_fh is not None:
try:
self._bw_fh.write(data)
except Exception:
pass
def read(self, n: int) -> bytes:
got = self._inner.read(n)
if got and self._s3_fh is not None:
try:
self._s3_fh.write(got)
except Exception:
pass
return got
@property
def bw_path(self) -> str:
return self._bw_path
@property
def s3_path(self) -> str:
return self._s3_path
def __repr__(self) -> str:
return f"CapturingTransport({self._inner!r}, bw={self._bw_path!r}, s3={self._s3_path!r})"
+442
View File
@@ -1997,6 +1997,434 @@ class ConsolePanel(tk.Frame):
self.after(100, self._poll_q)
# ─────────────────────────────────────────────────────────────────────────────
# Download panel — connect to a device, run get_events(), capture wire bytes
# ─────────────────────────────────────────────────────────────────────────────
class DownloadPanel(tk.Frame):
"""
Connect directly to a MiniMate Plus and download events while transparently
saving every wire byte in the same format as a Blastware MITM capture.
Each download produces a session directory containing:
seismo_dl_<ts>/raw_bw_<ts>.bin — bytes WE sent (BW TX)
seismo_dl_<ts>/raw_s3_<ts>.bin — bytes the unit sent (S3 TX)
These files are byte-for-byte compatible with the captures produced by
`bridges/ach_mitm.py` and load directly in the Analyzer tab.
Use this when you want to reproduce or troubleshoot a flow that Blastware
is doing — any session captured here can be diffed against a real BW
capture to confirm wire-level parity.
"""
TAG_TX = "tx"
TAG_RX_RAW = "rx_raw"
TAG_PARSED = "parsed"
TAG_ERROR = "error"
TAG_STATUS = "status"
TAG_HEAD = "head"
MAX_LINES = 5000
def __init__(self, parent: tk.Widget, on_capture_ready=None, **kw):
"""
on_capture_ready(bw_path, s3_path, label) — invoked when a capture
completes successfully so the parent can hand the files to the Analyzer.
"""
super().__init__(parent, bg=BG2, **kw)
self._on_capture_ready = on_capture_ready
self._q: queue.Queue = queue.Queue()
self._running = False
self._cmd_btns: list[tk.Button] = []
self._last_paths: Optional[tuple[str, str, str]] = None # (bw, s3, label)
self._build()
self._poll_q()
# ── build ─────────────────────────────────────────────────────────────
def _build(self) -> None:
pad = {"padx": 5, "pady": 3}
cfg = tk.Frame(self, bg=BG2)
cfg.pack(side=tk.TOP, fill=tk.X, padx=6, pady=4)
# Transport radio
self._transport_var = tk.StringVar(value="tcp")
tk.Radiobutton(
cfg, text="TCP", variable=self._transport_var, value="tcp",
bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2,
font=MONO, command=self._on_transport_change,
).grid(row=0, column=0, padx=(0, 4))
tk.Radiobutton(
cfg, text="Serial", variable=self._transport_var, value="serial",
bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2,
font=MONO, command=self._on_transport_change,
).grid(row=0, column=1, padx=(0, 12))
# TCP fields
self._tcp_frame = tk.Frame(cfg, bg=BG2)
self._tcp_frame.grid(row=0, column=2, sticky="w")
tk.Label(self._tcp_frame, text="Host:", bg=BG2, fg=FG, font=MONO
).pack(side=tk.LEFT, **pad)
self._host_var = tk.StringVar(value="127.0.0.1")
tk.Entry(self._tcp_frame, textvariable=self._host_var, width=18,
bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO,
).pack(side=tk.LEFT, padx=2)
tk.Label(self._tcp_frame, text="Port:", bg=BG2, fg=FG, font=MONO
).pack(side=tk.LEFT, padx=(10, 4))
self._tcp_port_var = tk.StringVar(value="9034")
tk.Entry(self._tcp_frame, textvariable=self._tcp_port_var, width=6,
bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO,
).pack(side=tk.LEFT, padx=2)
# Serial fields (hidden by default)
self._serial_frame = tk.Frame(cfg, bg=BG2)
tk.Label(self._serial_frame, text="Port:", bg=BG2, fg=FG, font=MONO
).pack(side=tk.LEFT, **pad)
self._port_var = tk.StringVar(value="COM5")
tk.Entry(self._serial_frame, textvariable=self._port_var, width=10,
bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO,
).pack(side=tk.LEFT, padx=2)
tk.Label(self._serial_frame, text="Baud:", bg=BG2, fg=FG, font=MONO
).pack(side=tk.LEFT, padx=(10, 4))
self._baud_var = tk.StringVar(value="38400")
tk.Entry(self._serial_frame, textvariable=self._baud_var, width=8,
bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO,
).pack(side=tk.LEFT, padx=2)
# Timeout
tk.Label(cfg, text="Timeout:", bg=BG2, fg=FG, font=MONO
).grid(row=0, column=3, padx=(18, 4))
self._timeout_var = tk.StringVar(value="60")
tk.Entry(cfg, textvariable=self._timeout_var, width=5,
bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO,
).grid(row=0, column=4, padx=2)
tk.Label(cfg, text="s", bg=BG2, fg=FG_DIM, font=MONO
).grid(row=0, column=5)
# Row 1 — output dir + label
tk.Label(cfg, text="Save to:", bg=BG2, fg=FG, font=MONO
).grid(row=1, column=0, columnspan=2, sticky="e", padx=4, pady=4)
self._dir_var = tk.StringVar(
value=str(SCRIPT_DIR / "bridges" / "captures"))
tk.Entry(cfg, textvariable=self._dir_var, width=46,
bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO,
).grid(row=1, column=2, columnspan=3, sticky="we", padx=4, pady=4)
tk.Button(cfg, text="Browse", bg=BG3, fg=FG, relief="flat",
cursor="hand2", font=MONO, command=self._choose_dir
).grid(row=1, column=5, padx=4, pady=4)
tk.Label(cfg, text="Label:", bg=BG2, fg=FG, font=MONO
).grid(row=2, column=0, columnspan=2, sticky="e", padx=4, pady=4)
self._label_var = tk.StringVar(value="")
tk.Entry(cfg, textvariable=self._label_var, width=46,
bg=BG3, fg=FG, insertbackground=FG, relief="flat", font=MONO,
).grid(row=2, column=2, columnspan=3, sticky="we", padx=4, pady=4)
tk.Label(cfg, text="(optional)", bg=BG2, fg=FG_DIM, font=MONO_SM
).grid(row=2, column=5, sticky="w", padx=4)
# Row 2 — full waveform toggle
opts = tk.Frame(self, bg=BG2)
opts.pack(side=tk.TOP, fill=tk.X, padx=6, pady=(0, 4))
self._full_wf_var = tk.BooleanVar(value=False)
tk.Checkbutton(
opts, text="Full waveform (download raw ADC samples too)",
variable=self._full_wf_var,
bg=BG2, fg=FG, selectcolor=BG3, activebackground=BG2, font=MONO,
).pack(side=tk.LEFT, padx=4)
# Command button row
cmd_row = tk.Frame(self, bg=BG2)
cmd_row.pack(side=tk.TOP, fill=tk.X, padx=6, pady=(0, 4))
for label, cmd in [
("Connect Only", "connect"),
("List Event Keys", "list_keys"),
("Download Events", "download_events"),
]:
btn = tk.Button(
cmd_row, text=label, bg=ACCENT, fg="#ffffff",
relief="flat", padx=10, cursor="hand2", font=MONO,
command=lambda c=cmd: self._run_command(c),
)
btn.pack(side=tk.LEFT, padx=4)
self._cmd_btns.append(btn)
self._open_btn = tk.Button(
cmd_row, text="Open in Analyzer", bg=BG3, fg=FG_DIM,
relief="flat", padx=10, cursor="hand2", font=MONO,
command=self._open_in_analyzer, state="disabled",
)
self._open_btn.pack(side=tk.LEFT, padx=14)
self._status_var = tk.StringVar(value="Ready")
tk.Label(cmd_row, textvariable=self._status_var,
bg=BG2, fg=FG_DIM, font=MONO).pack(side=tk.LEFT, padx=14)
# Console output
self._console = scrolledtext.ScrolledText(
self, height=22, font=MONO_SM,
bg=BG, fg=FG, insertbackground=FG,
relief="flat", state="disabled",
)
self._console.pack(fill=tk.BOTH, expand=True, padx=6, pady=4)
self._console.tag_configure(self.TAG_TX, foreground=ACCENT)
self._console.tag_configure(self.TAG_RX_RAW, foreground=COL_S3)
self._console.tag_configure(self.TAG_PARSED, foreground=GREEN)
self._console.tag_configure(self.TAG_ERROR, foreground=RED)
self._console.tag_configure(self.TAG_STATUS, foreground=FG_DIM)
self._console.tag_configure(self.TAG_HEAD, foreground=YELLOW, font=MONO_B)
# Bottom bar
bot = tk.Frame(self, bg=BG2)
bot.pack(side=tk.BOTTOM, fill=tk.X, padx=6, pady=4)
tk.Button(bot, text="Clear", bg=BG3, fg=FG, relief="flat",
padx=10, cursor="hand2", font=MONO,
command=self._clear_console).pack(side=tk.LEFT, padx=4)
# ── transport toggle ──────────────────────────────────────────────────
def _on_transport_change(self) -> None:
if self._transport_var.get() == "tcp":
self._serial_frame.grid_remove()
self._tcp_frame.grid(row=0, column=2, sticky="w")
else:
self._tcp_frame.grid_remove()
self._serial_frame.grid(row=0, column=2, sticky="w")
def _choose_dir(self) -> None:
d = filedialog.askdirectory(initialdir=self._dir_var.get())
if d:
self._dir_var.set(d)
# ── console helpers ───────────────────────────────────────────────────
def _append(self, text: str, tag: str = "status") -> None:
self._console.configure(state="normal")
self._console.insert(tk.END, text, tag)
line_count = int(self._console.index("end-1c").split(".")[0])
if line_count > self.MAX_LINES:
self._console.delete("1.0", f"{line_count - self.MAX_LINES}.0")
self._console.see(tk.END)
self._console.configure(state="disabled")
def _clear_console(self) -> None:
self._console.configure(state="normal")
self._console.delete("1.0", tk.END)
self._console.configure(state="disabled")
# ── command dispatch ──────────────────────────────────────────────────
def _set_buttons_state(self, state: str) -> None:
for btn in self._cmd_btns:
btn.configure(state=state)
def _run_command(self, cmd: str) -> None:
if self._running:
return
try:
tcp_port = int(self._tcp_port_var.get().strip() or "9034")
baud = int(self._baud_var.get().strip() or "38400")
timeout = float(self._timeout_var.get().strip() or "60")
except ValueError:
messagebox.showerror("Error", "Invalid numeric field.")
return
cfg = {
"transport": self._transport_var.get(),
"host": self._host_var.get().strip(),
"tcp_port": tcp_port,
"port": self._port_var.get().strip(),
"baud": baud,
"timeout": timeout,
"cmd": cmd,
"out_dir": self._dir_var.get().strip() or ".",
"label": self._label_var.get().strip(),
"full_waveform": bool(self._full_wf_var.get()),
}
self._running = True
self._set_buttons_state("disabled")
self._status_var.set("Running…")
threading.Thread(target=self._worker, args=(cfg,), daemon=True).start()
# ── worker thread ─────────────────────────────────────────────────────
def _worker(self, cfg: dict) -> None:
q = self._q
def post(kind: str, text: str) -> None:
q.put((kind, text))
try:
from minimateplus.transport import ( # noqa: WPS433
CapturingTransport,
SerialTransport,
TcpTransport,
)
from minimateplus.client import MiniMateClient # noqa: WPS433
except ImportError as exc:
post("error", f"Import error: {exc}\n")
q.put(("done", None))
return
# Build session directory with timestamp + optional label
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
sess_name = f"seismo_dl_{ts}"
if cfg["label"]:
safe_label = "".join(
ch if ch.isalnum() or ch in ("-", "_") else "_"
for ch in cfg["label"]
)
sess_name = f"{sess_name}_{safe_label}"
try:
session_dir = Path(cfg["out_dir"]) / sess_name
session_dir.mkdir(parents=True, exist_ok=True)
except OSError as exc:
post("error", f"Cannot create session dir: {exc}\n")
q.put(("done", None))
return
bw_path = str(session_dir / f"raw_bw_{ts}.bin")
s3_path = str(session_dir / f"raw_s3_{ts}.bin")
# Build inner transport
if cfg["transport"] == "tcp":
host = cfg["host"]
tcp_port = cfg["tcp_port"]
post("status", f"Connecting {host}:{tcp_port} (TCP)…")
inner = TcpTransport(host, tcp_port, connect_timeout=cfg["timeout"])
else:
port = cfg["port"]
baud = cfg["baud"]
post("status", f"Opening {port} @ {baud} baud…")
inner = SerialTransport(port, baud)
transport = CapturingTransport(inner, bw_path, s3_path)
post("head", f"\n── Session {sess_name} ─────────────────────────────\n")
post("status", f"BW capture: {bw_path}")
post("status", f"S3 capture: {s3_path}")
client = MiniMateClient(transport=transport, timeout=cfg["timeout"])
success = False
try:
with client:
post("head", "\n── connect() — POLL + serial + config + index ──\n")
info = client.connect()
post("parsed", f" serial: {info.serial!r}\n")
if getattr(info, "firmware", None):
post("parsed", f" firmware: {info.firmware!r}\n")
if getattr(info, "model", None):
post("parsed", f" model: {info.model!r}\n")
if cfg["cmd"] == "connect":
success = True
elif cfg["cmd"] == "list_keys":
post("head", "\n── list_event_keys() — browse 1E/0A/1F walk ──\n")
keys = client.list_event_keys()
if not keys:
post("parsed", " (no events stored)\n")
else:
post("parsed", f" {len(keys)} event(s):\n")
for i, k in enumerate(keys):
post("parsed", f" [{i}] {k}\n")
success = True
elif cfg["cmd"] == "download_events":
post("head", "\n── get_events() — full download ──────────────\n")
if cfg["full_waveform"]:
post("status", "Full-waveform mode (raw ADC samples).")
events = client.get_events(full_waveform=cfg["full_waveform"])
post("parsed", f" downloaded {len(events)} event(s)\n")
for ev in events:
ts_str = (
ev.event_time.isoformat(sep=" ", timespec="seconds")
if getattr(ev, "event_time", None) else "?"
)
ppv = getattr(ev, "peaks", None)
ppv_str = ""
if ppv is not None:
try:
ppv_str = f" PPV={ppv.peak_vector_sum:.4f} in/s"
except Exception:
pass
key = (
ev._waveform_key.hex()
if getattr(ev, "_waveform_key", None) else "?"
)
post("parsed",
f" [{ev.index:2d}] key={key} {ts_str}{ppv_str}\n")
success = True
else:
post("error", f"Unknown command: {cfg['cmd']}\n")
post("status", "Done.")
except Exception as exc:
post("error", f"\nError: {exc}\n")
finally:
# Capture files are flushed on transport.disconnect() (via __exit__).
try:
bw_size = Path(bw_path).stat().st_size
s3_size = Path(s3_path).stat().st_size
post("status",
f"Capture closed. BW={bw_size}B S3={s3_size}B")
post("head", f"\n── Capture saved → {session_dir} ─────────\n")
if success:
q.put(("ready", (bw_path, s3_path, sess_name)))
except OSError:
pass
q.put(("done", None))
# ── queue poll ────────────────────────────────────────────────────────
def _poll_q(self) -> None:
try:
while True:
kind, payload = self._q.get_nowait()
if kind == "tx":
self._append(payload, self.TAG_TX)
elif kind == "rx_raw":
self._append(payload, self.TAG_RX_RAW)
elif kind == "parsed":
self._append(payload, self.TAG_PARSED)
elif kind == "error":
self._append(payload, self.TAG_ERROR)
elif kind == "head":
self._append(payload, self.TAG_HEAD)
elif kind == "status":
self._status_var.set(str(payload))
self._append(f" [{payload}]\n", self.TAG_STATUS)
elif kind == "ready":
bw_path, s3_path, label = payload
self._last_paths = (bw_path, s3_path, label)
self._open_btn.configure(state="normal", fg=FG)
elif kind == "done":
self._running = False
self._set_buttons_state("normal")
self._status_var.set("Ready")
except queue.Empty:
pass
finally:
self.after(100, self._poll_q)
# ── analyzer hand-off ─────────────────────────────────────────────────
def _open_in_analyzer(self) -> None:
if not self._last_paths or not self._on_capture_ready:
return
bw_path, s3_path, label = self._last_paths
self._on_capture_ready(bw_path, s3_path, label)
# ─────────────────────────────────────────────────────────────────────────────
# Main application window
# ─────────────────────────────────────────────────────────────────────────────
@@ -2046,6 +2474,12 @@ class SeismoLab(tk.Tk):
)
nb.add(self._serial_watch_panel, text=" Serial Watch ")
self._download_panel = DownloadPanel(
nb,
on_capture_ready=self._on_download_capture_ready,
)
nb.add(self._download_panel, text=" Download ")
self._nb = nb
self.protocol("WM_DELETE_WINDOW", self._on_close)
@@ -2080,6 +2514,14 @@ class SeismoLab(tk.Tk):
self._analyzer_panel.s3_var.set(raw_s3_path)
self._nb.select(1)
def _on_download_capture_ready(self, bw_path: str, s3_path: str, label: str) -> None:
"""Download capture done → load both BW + S3 files into Analyzer and run."""
self._analyzer_panel.stop_live()
self._analyzer_panel.s3_var.set(s3_path)
self._analyzer_panel.bw_var.set(bw_path)
self._analyzer_panel._run_analyze()
self._nb.select(1)
def _on_close(self) -> None:
self._bridge_panel.stop_bridge()
self._serial_watch_panel._stop()