#!/usr/bin/env python3
"""
ach_server.py — Minimal inbound ACH (Auto Call Home) server for MiniMate Plus.

This IS your test server.  Run it on any machine on the same network, point a
unit's ACEmanager call-home destination at it, and it will speak the full BW
protocol to the device: handshake, pull device info, download all events,
save everything as JSON.

The key thing this script tells you that no amount of packet sniffing can:
  - Does the device speak first (push) or wait for us to send POLL (pull)?
    If startup() completes normally → it's pull protocol, same as Blastware.
    If startup() times out → the device sent something first; check the
    raw_rx_<timestamp>.bin capture.

Usage
-----
    python bridges/ach_server.py [--port 12345] [--output bridges/captures/]

Setup
-----
1. Run this script on a machine on your local network.
2. In ACEmanager → Application → ALEOS Application Framework (or equivalent)
   find the Call Home / ACH settings.  Set:
       Remote Host: <IP address of the machine running this script>
       Remote Port: 12345
3. Trigger the unit (wait for a vibration event, or use the manual call-home
   button if your firmware version has one).
4. The unit connects.  This script handshakes, downloads all events, and
   saves a timestamped session directory.

Output per session
------------------
    bridges/captures/ach_inbound_<timestamp>/
        device_info.json        — serial number, firmware version, calibration date, etc.
        events.json             — all events: timestamp, PPV per channel, peaks, metadata
        raw_rx_<timestamp>.bin  — raw bytes from the device (S3 side) for Analyzer
        raw_tx_<timestamp>.bin  — raw bytes we sent to the device (BW side) for Analyzer
        session_<timestamp>.log — detailed protocol log

What to look for
----------------
Push vs pull:
    Check session_<timestamp>.log.  If the first line after "Connected" shows
    bytes arriving BEFORE the POLL probe was sent, it's push.  If POLL gets a
    clean response, it's pull.

Frequency:
    Look at raw_rx_<timestamp>.bin in the Analyzer.  SUB 5A (0xA5 responses)
    carry bulk waveform data — if frequency is sent pre-computed there will be
    float32 values before the ADC sample blocks.

ACH-specific framing:
    Does the unit send anything extra before the DLE+STX framing starts?
    raw_rx_<timestamp>.bin will show raw bytes including any preamble.
"""

from __future__ import annotations

import argparse
import datetime
import json
import logging
import socket
import sys
import threading
from pathlib import Path
from typing import Optional

sys.path.insert(0, str(Path(__file__).parent.parent))

from minimateplus.transport import SocketTransport
from minimateplus.client import MiniMateClient
from minimateplus.models import DeviceInfo, Event, MonitorLogEntry
from sfm.database import SeismoDb
from sfm.waveform_store import WaveformStore

log = logging.getLogger("ach_server")


# ── Per-unit state (downloaded events index) ──────────────────────────────────
# Persisted as <output_dir>/ach_state.json
# Format (current — v2):
#   {
#     "BE11529": {
#       "downloaded_events": {                  # key_hex → ISO timestamp string
#         "01110000": "2026-04-11T00:42:17",
#         "0111245a": "2026-04-11T01:04:30"
#       },
#       "max_downloaded_key": "0111245a",
#       "last_seen": "2026-04-11T01:04:36",
#       "serial": "BE11529",
#       "peer": "63.43.212.232:51920"
#     }
#   }
#
# Why (key, timestamp) and not key alone:
#   The device's event-key counter resets to 0x01110000 after every memory
#   erase (internal or external).  A bare-key dedup (the v1 format) cannot
#   distinguish a re-recorded event with the same key from one we already
#   downloaded.  The 0C waveform record's timestamp IS unique per physical
#   event, so we pair (key, timestamp) and treat a key with a different
#   timestamp as a new event regardless of `max_downloaded_key`.
#
# Legacy v1 format (`downloaded_keys: list[str]` only) is auto-migrated on
# read: the keys are kept with an empty-string ("") sentinel timestamp so the
# (key, timestamp) compare always sees a mismatch and forces a one-time
# re-download.  After that pass the state is rewritten in v2 form.
# Serialises every read-modify-write of ach_state.json across session threads.
_state_lock = threading.Lock()


def _load_state(state_path: Path) -> dict:
    """
    Load ach_state.json, transparently migrating any legacy
    `downloaded_keys: list` entries into the v2 `downloaded_events: dict`
    schema.

    Returns the (possibly migrated) state dict.  Returns an empty dict when
    the file is missing, unreadable, not valid JSON, or does not hold a JSON
    object at the top level; the unreadable cases are logged as warnings so a
    corrupt state file does not go unnoticed.
    """
    if not state_path.exists():
        return {}
    try:
        with open(state_path, encoding="utf-8") as f:
            state = json.load(f)
    except Exception as exc:
        log.warning("ach_state: could not read %s (%s) -- starting with empty state",
                    state_path, exc)
        return {}
    if not isinstance(state, dict):
        log.warning("ach_state: %s does not contain a JSON object -- starting with empty state",
                    state_path)
        return {}

    # Per-unit migration: legacy list → dict-with-empty-timestamps
    for unit_key, unit_state in state.items():
        if not isinstance(unit_state, dict):
            continue
        if "downloaded_events" in unit_state:
            continue
        legacy_keys = unit_state.get("downloaded_keys")
        if isinstance(legacy_keys, list):
            unit_state["downloaded_events"] = {k: "" for k in legacy_keys}
            log.info(
                "ach_state: migrated %s from v1 (downloaded_keys list) → v2 "
                "(downloaded_events dict, %d keys with empty timestamps; "
                "they will re-validate on next session)",
                unit_key, len(legacy_keys),
            )
        else:
            unit_state["downloaded_events"] = {}
        # The legacy field is dropped from the in-memory state right away, so
        # the next save writes pure v2.
        unit_state.pop("downloaded_keys", None)

    return state


def _write_state_file(state_path: Path, state: dict) -> None:
    """
    Write `state` to `state_path` atomically.  Caller must hold `_state_lock`.

    The JSON is written to a sibling "<name>.tmp" file and then renamed over
    the target, so an interrupted write never leaves a truncated state file
    (which would otherwise load as empty and force a full re-download).
    """
    tmp_path = state_path.with_name(state_path.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
    tmp_path.replace(state_path)


def _save_state(state_path: Path, state: dict) -> None:
    """Persist the complete `state` dict to `state_path` under the state lock."""
    with _state_lock:
        _write_state_file(state_path, state)


def _update_unit_state(state_path: Path, unit_key: str, unit_state: dict) -> dict:
    """
    Replace one unit's entry in the state file and return the full new state.

    The file is re-read, patched and rewritten while holding `_state_lock`.
    Sessions run in parallel threads; writing back a snapshot loaded at the
    start of a (long) session would silently discard entries that other
    sessions saved in the meantime.
    """
    with _state_lock:
        state = _load_state(state_path)
        state[unit_key] = unit_state
        _write_state_file(state_path, state)
    return state


# ── Pure helpers used by the session handler ──────────────────────────────────

def _event_timestamp_iso(ev: object) -> str:
    """
    Return the event's `timestamp` attribute as an ISO-8601 string.

    Returns "" when the event has no timestamp.  Missing (None/0) hour, minute
    or second fields are treated as 0.  If the timestamp cannot be turned into
    a `datetime` (missing fields, out-of-range values) `str(timestamp)` is
    returned instead.
    """
    ts = getattr(ev, "timestamp", None)
    if ts is None:
        return ""
    try:
        return datetime.datetime(
            ts.year, ts.month, ts.day,
            ts.hour or 0, ts.minute or 0, ts.second or 0,
        ).isoformat()
    except Exception:
        return str(ts)


def _compute_stop_index(device_count: Optional[int],
                        max_events: Optional[int]) -> Optional[int]:
    """
    Return the last event index to download (inclusive), or None for "no bound".

    device_count: number of events on the device, or None when the key walk
                  failed and the count is unknown.
    max_events:   the --max-events cap, or None for unlimited.
    """
    stop_idx = (device_count - 1) if device_count is not None else None
    if max_events is not None:
        cap = max_events - 1
        stop_idx = cap if stop_idx is None else min(stop_idx, cap)
    return stop_idx


def _build_events_map(all_events: list, monitor_entries: list,
                      seen_events: dict[str, str]) -> dict[str, str]:
    """
    Build this session's (key_hex → ISO timestamp) map.

    For every walked event the timestamp observed in this session wins; when
    it is missing we fall back to whatever `seen_events` already held for that
    key.  Monitor-log entries contribute their `start_time`, but never
    overwrite a non-empty timestamp that a triggered event already supplied.
    """
    current: dict[str, str] = {}
    for ev in all_events:
        if ev._waveform_key is None:
            continue
        key_hex = ev._waveform_key.hex()
        current[key_hex] = _event_timestamp_iso(ev) or seen_events.get(key_hex, "")

    for ml in monitor_entries:
        key_hex = ml.key
        start = ml.start_time
        ts_iso = start.isoformat() if start else seen_events.get(key_hex, "")
        if not current.get(key_hex):
            current[key_hex] = ts_iso
    return current


# ── Per-session handler ────────────────────────────────────────────────────────

class AchSession:
    """
    Handles one inbound unit connection in its own thread.

    Wraps the socket in a SocketTransport → MiniMateClient, then runs the
    startup handshake → device info → event download sequence.

    State tracking (ach_state.json, path given by `state_path`):
      After each event walk that completes we record, per unit, a map of event key (hex) →
      ISO timestamp.  On the next call-home that map is handed to the client
      so it can skip the waveform download for events whose (key, timestamp)
      pair is already known.  Keys alone are not enough: the device's key
      counter restarts after a memory erase, so a re-recorded event can reuse
      a key.
    """

    def __init__(
        self,
        sock: socket.socket,
        peer: str,
        output_dir: Path,
        timeout: float,
        events_only: bool,
        max_events: Optional[int],
        state_path: Path,
        db: "SeismoDb",
        store: "WaveformStore",
        clear_after_download: bool = False,
        restart_monitoring: bool = False,
        force_redownload: bool = False,
    ) -> None:
        self.sock = sock
        self.peer = peer
        self.output_dir = output_dir
        self.timeout = timeout
        self.events_only = events_only
        self.max_events = max_events
        self.state_path = state_path
        self.db = db
        self.store = store
        self.clear_after_download = clear_after_download
        self.restart_monitoring = restart_monitoring
        # `force_redownload` tells this session to ignore ach_state and
        # re-download every event currently on the device, regardless of any
        # (key, timestamp) match.  Useful as a manual override when state has
        # become inconsistent with what's actually on disk / in the DB.
        self.force_redownload = force_redownload

    def run(self) -> None:
        """Thread entry point: run the session, log any failure, close the socket."""
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        # Session dir and file handler are created lazily — only after startup
        # succeeds.  This prevents internet scanners and dropped connections
        # from littering the output directory with empty session folders.
        try:
            self._run_inner(ts)
        except Exception as exc:
            log.error("Session failed (%s): %s", self.peer, exc, exc_info=True)
        finally:
            try:
                self.sock.close()
            except Exception:
                pass

    def _run_inner(self, ts: str) -> None:
        """
        Handshake with the unit, then set up the capture files and the
        per-session log and run the download steps.

        `ts` is the timestamp string used in the session directory / file names.
        """
        session_start = datetime.datetime.now()
        transport = SocketTransport(self.sock, peer=self.peer)

        # Collect raw bytes in memory until startup succeeds, then flush to disk.
        raw_rx_buf: list[bytes] = []   # device → us (S3 side)
        raw_tx_buf: list[bytes] = []   # us → device (BW side)

        _orig_read = transport.read
        _orig_write = transport.write

        def tapped_read(n: int) -> bytes:
            data = _orig_read(n)
            if data:
                raw_rx_buf.append(data)
            return data

        def tapped_write(data: bytes) -> None:
            _orig_write(data)
            if data:
                raw_tx_buf.append(data)

        transport.read = tapped_read    # type: ignore[method-assign]
        transport.write = tapped_write  # type: ignore[method-assign]

        # ── Step 1: startup handshake ─────────────────────────────────────────
        # Do this BEFORE creating the session directory so that scanner probes
        # and dropped connections leave no trace on disk.
        try:
            from minimateplus.protocol import MiniMateProtocol
            client = MiniMateClient(transport=transport, timeout=self.timeout)
            client.open()
            proto = MiniMateProtocol(transport, recv_timeout=self.timeout)
            proto.startup()
        except Exception as exc:
            log.warning("Startup failed from %s: %s -- ignoring", self.peer, exc)
            return  # no session dir created

        # Startup succeeded — this is a real unit.  Create session dir now.
        session_dir = self.output_dir / f"ach_inbound_{ts}"
        session_dir.mkdir(parents=True, exist_ok=True)

        log_path = session_dir / f"session_{ts}.log"
        raw_rx_path = session_dir / f"raw_rx_{ts}.bin"   # device → us (S3 side)
        raw_tx_path = session_dir / f"raw_tx_{ts}.bin"   # us → device (BW side)

        with open(raw_rx_path, "wb") as raw_rx_fh, open(raw_tx_path, "wb") as raw_tx_fh:
            # Flush buffered bytes to files and switch to direct file writes.
            raw_rx_fh.write(b"".join(raw_rx_buf))
            raw_tx_fh.write(b"".join(raw_tx_buf))
            raw_rx_buf.clear()
            raw_tx_buf.clear()

            def tapped_read_file(n: int) -> bytes:
                data = _orig_read(n)
                if data:
                    raw_rx_fh.write(data)
                    raw_rx_fh.flush()
                return data

            def tapped_write_file(data: bytes) -> None:
                _orig_write(data)
                if data:
                    raw_tx_fh.write(data)
                    raw_tx_fh.flush()

            transport.read = tapped_read_file    # type: ignore[method-assign]
            transport.write = tapped_write_file  # type: ignore[method-assign]

            # Wire up file handler now that the session dir exists.
            fh = logging.FileHandler(log_path, encoding="utf-8")
            fh.setFormatter(logging.Formatter("%(asctime)s  %(levelname)-7s  %(name)s  %(message)s"))
            root_logger = logging.getLogger()
            root_logger.addHandler(fh)

            try:
                self._run_session(client, session_dir, session_start)
            finally:
                # Close the client while the capture files are still open:
                # anything it sends or receives on close goes through the
                # file taps above.  A failing close must not prevent the
                # log handler from being detached.
                try:
                    client.close()  # closes transport / socket cleanly
                except Exception as exc:
                    log.warning("  [WARN] client.close() failed: %s", exc)
                root_logger.removeHandler(fh)
                fh.close()
                log.info("Session complete -> %s", session_dir)
                log.info("=" * 60)

    # ── Session steps ─────────────────────────────────────────────────────────

    def _run_session(self, client, session_dir: Path,
                     session_start: datetime.datetime) -> None:
        """Run steps 2 and 3, then the optional monitoring restart."""
        serial = self._read_device_info(client, session_dir)
        self._sync_events(client, session_dir, serial, session_start)

        # ── Optional: restart monitoring ──────────────────────────────────────
        # Runs whenever the event step returned normally: after a download,
        # after a failed download, and also when the unit had no events.
        if self.restart_monitoring:
            log.info("  Restarting monitoring on device (--restart-monitoring)...")
            try:
                client.start_monitoring()
                log.info("  [OK] Monitoring restarted")
            except Exception as exc:
                log.warning("  [WARN] Failed to restart monitoring: %s", exc)

    def _read_device_info(self, client, session_dir: Path) -> Optional[str]:
        """
        Step 2: read device info and save it as device_info.json.

        Returns the unit's serial, or None when the step is skipped
        (--events-only) or the read failed.
        """
        if self.events_only:
            log.info("Step 2/3: skipping device info (--events-only)")
            return None

        serial: Optional[str] = None
        log.info("Step 2/3: reading device info")
        try:
            device_info = client.connect()
            serial = device_info.serial
            _save_json(session_dir / "device_info.json", _device_info_to_dict(device_info))
            log.info(
                "  [OK] Device: serial=%s  firmware=%s  model=%s  events=%d",
                serial,
                device_info.firmware_version,
                device_info.model,
                device_info.event_count or 0,
            )
        except Exception as exc:
            log.error("  [FAIL] Device info failed: %s", exc)
        return serial

    def _sync_events(self, client, session_dir: Path, serial: Optional[str],
                     session_start: datetime.datetime) -> None:
        """
        Step 3: download events not yet recorded in ach_state.json, collect
        monitor-log entries, persist everything, optionally erase the device,
        and update the per-unit state.
        """
        log.info("Step 3/3: checking for new events")

        state = _load_state(self.state_path)
        unit_key = serial or self.peer  # fall back to IP if no serial
        unit_state = state.get(unit_key, {})
        if not isinstance(unit_state, dict):
            unit_state = {}  # tolerate a hand-edited / malformed entry
        # downloaded_events is the v2 (key_hex → timestamp_iso) dict.
        # Empty-string timestamps are migrated v1 entries — they force a
        # one-time re-download because the (key, timestamp) compare always
        # mismatches against any non-empty timestamp from a fresh 0C read.
        seen_events: dict[str, str] = dict(unit_state.get("downloaded_events", {}))
        max_seen_key: str = unit_state.get("max_downloaded_key", "00000000")

        if self.force_redownload:
            log.info("  --force-redownload-all set — ignoring %d cached "
                     "(key, timestamp) entries for this session",
                     len(seen_events))
            seen_events = {}

        # Walk the event index (browse-mode, no 5A) to get the actual current
        # key list.  The SUB 08 event_count field is a lifetime "total events
        # ever recorded" counter that does NOT decrement on erase — confirmed
        # 2026-04-13.  list_event_keys() via the 1E/1F chain is the only
        # reliable way to know what is actually stored on the device right now.
        log.info("  Checking device key list (browse walk, no waveform download)...")
        try:
            device_keys = client.list_event_keys()
        except Exception as exc:
            log.warning("  list_event_keys failed: %s -- falling back to full download", exc)
            device_keys = None

        current_count = len(device_keys) if device_keys is not None else 0
        log.info("  Unit has %d stored event(s); %d (key, ts) entr(ies) previously downloaded",
                 current_count, len(seen_events))

        if device_keys is not None and current_count == 0:
            log.info("  [OK] No events on device -- nothing to download")
            log.info("Session complete (no events) -> %s", session_dir)
            return

        if device_keys is not None and max_seen_key != "00000000":
            # ── Post-erase detection (best-effort, key-only signal) ───────────
            # After erase the device's key counter resets to 01110000.  If the
            # device's current max key is below our high-water mark we know an
            # erase happened.  This catches the cleanest case but does NOT
            # catch erase-then-record-many-events (where the new max may climb
            # past the old max).  The (key, timestamp) check in get_events()
            # is what handles those.
            max_device_key = max(device_keys)
            if max_device_key < max_seen_key:
                log.info(
                    "  Post-erase reset detected: "
                    "device max key %s < historical max %s "
                    "-- discarding stale (key, ts) state for this session",
                    max_device_key, max_seen_key,
                )
                seen_events = {}
        # Note: no early-exit "all already downloaded" short-circuit here.
        # Without per-event timestamps we cannot tell whether
        # device_keys ⊆ seen_events.keys() actually means we have those
        # physical events.  get_events() reads 0C on its skip path and
        # decides per event.

        # stop_idx: when the count is known use it as an upper bound; when
        # list_event_keys failed pass None so get_events runs until the null
        # sentinel.  The --max-events cap is applied on top.
        stop_idx = _compute_stop_index(
            current_count if device_keys is not None else None,
            self.max_events,
        )
        if (self.max_events is not None and device_keys is not None
                and self.max_events < current_count):
            log.warning(
                "  max_events=%d cap: will download events 0-%d only "
                "(unit has %d total)",
                self.max_events, stop_idx, current_count,
            )

        try:
            # Pass the known (key → ISO timestamp) pairs so the client can
            # read 0C on its skip path and only skip 5A when the per-event
            # timestamp matches what we already have.  Empty-string timestamps
            # (legacy v1 entries) are filtered out so those events fall
            # through to a full 5A download.
            skip_dict = {k: ts for k, ts in seen_events.items() if ts}
            all_events = client.get_events(
                full_waveform=True,
                stop_after_index=stop_idx,
                skip_waveform_for_events=skip_dict if skip_dict else None,
            )

            # New events are those that came back with _a5_frames populated
            # (= 5A actually ran in this session).  Skipped events have
            # _a5_frames = None because the client matched (key, timestamp)
            # against skip_dict and bypassed 5A.
            new_events = [e for e in all_events if getattr(e, "_a5_frames", None)]
            skipped = len(all_events) - len(new_events)
            log.info("  [OK] Walked %d event(s): %d downloaded, %d skipped (matched (key, ts) in state)",
                     len(all_events), len(new_events), skipped)

            waveform_records = self._store_waveforms(new_events, serial)
            self._save_events_json(session_dir, new_events, waveform_records)

            # Only keys with a confirmed timestamp are offered as skip keys,
            # consistent with the event skip rule above; a duplicate that is
            # re-collected is reported as skipped by the DB insert.
            # NOTE(review): assumes get_monitor_log_entries() takes a
            # collection of hex key strings — confirm against the client.
            new_monitor_entries = self._collect_monitor_log(
                client, session_dir, set(skip_dict),
            )

            self._persist_to_db(new_events, new_monitor_entries, waveform_records,
                                serial, session_start)

            # ── Optional: erase device memory after successful download ──────
            # NOTE(review): the erase fires even if a waveform-store or DB
            # write above failed (those are warnings only) — confirm that the
            # raw capture alone is an acceptable fallback.
            erased_successfully = False
            if self.clear_after_download and new_events:
                log.info("  Clearing device memory (--clear-after-download)...")
                try:
                    client.delete_all_events()
                    log.info("  [OK] Device memory cleared")
                    erased_successfully = True
                except Exception as exc:
                    log.error(
                        "  [WARN] Event deletion failed: %s -- events NOT cleared",
                        exc,
                    )

            # ── Update persistent state ──────────────────────────────────────
            current_events_map = _build_events_map(
                all_events, new_monitor_entries, seen_events,
            )
            if erased_successfully:
                updated_events: dict[str, str] = {}
                new_max_key = "00000000"
                log.info(
                    "  State reset after erase -- next session will download "
                    "from key 0 (device counter resets after erase)"
                )
            else:
                # Merge: keep prior (key, ts) entries we still have evidence
                # of (for survivors of any partial failure), plus this
                # session's authoritative (key, ts) pairs.
                updated_events = dict(seen_events)
                updated_events.update(current_events_map)
                new_max_key = max(updated_events) if updated_events else max_seen_key

            # Patch only this unit's entry on top of the state file as it is
            # NOW, so parallel sessions for other units are not overwritten.
            _update_unit_state(self.state_path, unit_key, {
                "downloaded_events": updated_events,
                "max_downloaded_key": new_max_key,
                "last_seen": datetime.datetime.now().isoformat(),
                "serial": serial,
                "peer": self.peer,
            })

        except Exception as exc:
            log.error("  [FAIL] Event download failed: %s", exc, exc_info=True)

    def _store_waveforms(self, new_events: list, serial: Optional[str]) -> dict[str, dict]:
        """
        Save each new event (plus its A5 frames) to the waveform store.

        Returns a map of key_hex → record dict as returned by the store.
        A failing save is logged and does not stop the remaining saves.
        """
        waveform_records: dict[str, dict] = {}
        for ev in new_events:
            try:
                rec = self.store.save(
                    ev,
                    serial=serial or "UNKNOWN",
                    a5_frames=ev._a5_frames,
                )
                if ev._waveform_key is not None:
                    waveform_records[ev._waveform_key.hex()] = rec
                log.info(
                    "    [WAVE] saved %s (%d bytes)",
                    rec["filename"], rec["filesize"],
                )
            except Exception as exc:
                key_hex = ev._waveform_key.hex() if ev._waveform_key else "????????"
                log.warning(
                    "    [WARN] Waveform store save failed for %s: %s",
                    key_hex, exc,
                )
        return waveform_records

    def _save_events_json(self, session_dir: Path, new_events: list,
                          waveform_records: dict[str, dict]) -> None:
        """Write events.json for the new events and log a one-line summary of each."""
        if not new_events:
            log.info("  [OK] No new events since last call-home -- nothing to save")
            return

        _save_json(
            session_dir / "events.json",
            [_event_to_dict(e, waveform_records) for e in new_events],
        )
        for ev in new_events:
            pv = ev.peak_values
            pi = ev.project_info
            key_hex = ev._waveform_key.hex() if ev._waveform_key else "????????"
            log.info(
                "    NEW [%s] %s  Tran=%.4f  Vert=%.4f  Long=%.4f  VS=%.4f  project=%r",
                key_hex,
                str(ev.timestamp) if ev.timestamp else "?",
                pv.tran if pv else 0,
                pv.vert if pv else 0,
                pv.long if pv else 0,
                pv.peak_vector_sum if pv else 0,
                pi.project if pi else "",
            )

    def _collect_monitor_log(self, client, session_dir: Path,
                             skip_keys: set) -> list:
        """
        Collect monitor-log entries and save them as monitor_log.json.

        This is a second pass over the device's record list, separate from
        the get_events() download.  `skip_keys` is handed to the client as
        the set of keys it may skip (None when empty).

        Returns the collected entries; an empty list when the client call
        failed.  Failures are logged and never abort the session.
        """
        log.info("  Collecting monitor log entries (browse walk)...")
        entries: list = []
        try:
            entries = client.get_monitor_log_entries(
                skip_keys=skip_keys if skip_keys else None,
            )
            if entries:
                _save_json(
                    session_dir / "monitor_log.json",
                    [_monitor_log_entry_to_dict(e) for e in entries],
                )
                log.info(
                    "  [OK] %d new monitor log entry(s) saved",
                    len(entries),
                )
                for ml in entries:
                    log.info(
                        "    MONLOG [%s] %s → %s  (%s)",
                        ml.key,
                        ml.start_time.isoformat() if ml.start_time else "?",
                        ml.stop_time.isoformat() if ml.stop_time else "?",
                        f"{ml.duration_seconds:.0f}s" if ml.duration_seconds is not None else "?s",
                    )
            else:
                log.info("  [OK] No new monitor log entries")
        except Exception as exc:
            log.warning(
                "  [WARN] Monitor log collection failed: %s -- continuing",
                exc,
            )
        return entries

    def _persist_to_db(self, new_events: list, new_monitor_entries: list,
                       waveform_records: dict[str, dict], serial: Optional[str],
                       session_start: datetime.datetime) -> None:
        """
        Insert events, monitor-log entries and the session row into the DB.

        `session_start` is the moment the connection handling began; it is
        stored as the session time and used to compute the session duration.
        A DB failure is logged and does not abort the session.
        """
        try:
            ev_ins, ev_skip = self.db.insert_events(
                new_events,
                serial=serial or self.peer,
                session_id=None,
                waveform_records=waveform_records,
            )
            ml_ins, ml_skip = self.db.insert_monitor_log(
                new_monitor_entries, session_id=None
            )
            session_id = self.db.insert_ach_session(
                serial=serial or self.peer,
                peer=self.peer,
                events_downloaded=ev_ins,
                monitor_entries=ml_ins,
                duration_seconds=(datetime.datetime.now() - session_start).total_seconds(),
                session_time=session_start,
            )
            log.info(
                "  [DB] session=%s  events +%d (skip %d)  monitor +%d (skip %d)",
                session_id[:8], ev_ins, ev_skip, ml_ins, ml_skip,
            )
        except Exception as exc:
            log.warning("  [WARN] DB write failed: %s -- continuing", exc)


# ── JSON helpers ───────────────────────────────────────────────────────────────

def _save_json(path: Path, obj: object) -> None:
    """Write `obj` to `path` as indented JSON; non-JSON values are stringified."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, indent=2, default=str)
    log.debug("Saved %s", path)


def _device_info_to_dict(d: "DeviceInfo") -> dict:
    """Flatten a DeviceInfo (and its optional compliance config) into a JSON-ready dict."""
    cc = d.compliance_config
    return {
        "serial": d.serial,
        "firmware_version": d.firmware_version,
        "dsp_version": d.dsp_version,
        "model": d.model,
        "event_count": d.event_count,
        # compliance config fields (None if 1A read failed)
        "setup_name": cc.setup_name if cc else None,
        "sample_rate": cc.sample_rate if cc else None,
        "record_time": cc.record_time if cc else None,
        "trigger_level_geo": cc.trigger_level_geo if cc else None,
        "alarm_level_geo": cc.alarm_level_geo if cc else None,
        "geo_adc_scale": cc.geo_adc_scale if cc else None,  # hw scale factor (in/s)/V
        "geo_range": cc.geo_range if cc else None,  # 0x01=Normal 10in/s, 0x00=Sensitive 1.25in/s (unconfirmed)
        "project": cc.project if cc else None,
        "client": cc.client if cc else None,
        "operator": cc.operator if cc else None,
        "sensor_location": cc.sensor_location if cc else None,
    }


def _event_to_dict(
    e: "Event",
    waveform_records: Optional[dict[str, dict]] = None,
) -> dict:
    """
    Convert an Event into a JSON-ready dict.

    `waveform_records` (key_hex → store record) supplies the file name/size
    fields for events that were saved to the waveform store; those fields are
    None otherwise.  Only the first 20 sample-sets per channel are included.
    """
    pv = e.peak_values
    pi = e.project_info

    peaks = {}
    if pv:
        peaks = {
            "transverse": pv.tran,
            "vertical": pv.vert,
            "longitudinal": pv.long,
            "vector_sum": pv.peak_vector_sum,
            "mic": pv.micl,
        }

    samples = {}
    if e.raw_samples:
        samples = {
            ch: vals[:20]  # first 20 sample-sets to keep the file sane
            for ch, vals in e.raw_samples.items()
        }
        samples["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform"

    rec: dict = {}
    if waveform_records and e._waveform_key is not None:
        rec = waveform_records.get(e._waveform_key.hex(), {}) or {}

    return {
        "timestamp": str(e.timestamp) if e.timestamp else None,
        "project": pi.project if pi else None,
        "client": pi.client if pi else None,
        "operator": pi.operator if pi else None,
        "sensor_location": pi.sensor_location if pi else None,
        "peaks": peaks,
        "raw_samples_preview": samples,
        "blastware_filename": rec.get("filename"),
        "blastware_filesize": rec.get("filesize"),
        "a5_pickle_filename": rec.get("a5_pickle_filename"),
    }


def _monitor_log_entry_to_dict(e: "MonitorLogEntry") -> dict:
    """Convert a MonitorLogEntry into a JSON-ready dict (times as ISO strings)."""
    return {
        "key": e.key,
        "start_time": e.start_time.isoformat() if e.start_time else None,
        "stop_time": e.stop_time.isoformat() if e.stop_time else None,
        "duration_seconds": e.duration_seconds,
        "serial": e.serial,
        "geo_threshold_ips": e.geo_threshold_ips,
    }


# ── Main server loop ───────────────────────────────────────────────────────────

def serve(args: argparse.Namespace) -> None:
    """
    Listen on 0.0.0.0:<args.port> and hand every accepted connection to an
    AchSession running in its own daemon thread.  Runs until interrupted.
    """
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)
    state_path = output_dir / "ach_state.json"

    db = SeismoDb(output_dir / "seismo_relay.db")
    store = WaveformStore(output_dir / "waveforms")

    max_ev = args.max_events
    allow_ips = set(args.allow_ips or ())

    # The context manager closes the listening socket on every exit path,
    # including a failing bind().
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_sock:
        server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_sock.bind(("0.0.0.0", args.port))
        server_sock.listen(5)
        # Wake up every second so Ctrl-C is handled promptly on Windows.
        # Without this, accept() blocks indefinitely and ignores KeyboardInterrupt.
        server_sock.settimeout(1.0)

        print(f"\n{'='*60}")
        print(f"  ACH inbound server listening on 0.0.0.0:{args.port}")
        print(f"  Output:     {output_dir.resolve()}/ach_inbound_<timestamp>/")
        print(f"  State file: {state_path}")
        print(f"  Max events per session: {max_ev if max_ev else 'unlimited'}")
        print(f"  Clear device after download: {'YES' if args.clear_after_download else 'no'}")
        print(f"  Restart monitoring after download: {'YES' if args.restart_monitoring else 'no'}")
        print(f"  Force re-download all (ignore state): {'YES' if args.force_redownload_all else 'no'}")
        print(f"{'='*60}")
        print("\n  Point your test unit's ACEmanager call-home settings to:")
        print("    Remote Host: <IP address of this machine>")
        print(f"    Remote Port: {args.port}")
        print("\n  Waiting for inbound connections... (Ctrl-C to stop)\n")

        if allow_ips:
            print(f"  Allowlist: {', '.join(sorted(allow_ips))}")
        else:
            print("  Allowlist: NONE -- accepting all IPs (add --allow-ip to restrict)")

        try:
            while True:
                try:
                    client_sock, addr = server_sock.accept()
                except socket.timeout:
                    continue  # no connection this second; loop back and check for Ctrl-C

                # KeyboardInterrupt is not an Exception subclass, so it passes
                # through the handler below and stops the server.
                try:
                    peer_ip = addr[0]
                    peer = f"{addr[0]}:{addr[1]}"
                    if allow_ips and peer_ip not in allow_ips:
                        log.info("Rejected connection from %s (not in allowlist)", peer)
                        client_sock.close()
                        continue
                    log.info("Accepted connection from %s", peer)
                    session = AchSession(
                        sock=client_sock,
                        peer=peer,
                        output_dir=output_dir,
                        timeout=args.timeout,
                        events_only=args.events_only,
                        max_events=max_ev,
                        state_path=state_path,
                        db=db,
                        store=store,
                        clear_after_download=args.clear_after_download,
                        restart_monitoring=args.restart_monitoring,
                        force_redownload=args.force_redownload_all,
                    )
                    t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}")
                    t.start()
                except Exception as exc:
                    log.error("Accept error: %s", exc)
                    # No session thread owns the socket at this point; close
                    # it here so it does not leak.
                    try:
                        client_sock.close()
                    except Exception:
                        pass
        finally:
            print("\nServer stopped.")


def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """
    Parse the command line.

    `argv` is the argument list to parse; None (the default) parses
    sys.argv[1:], which is what the script entry point uses.
    """
    p = argparse.ArgumentParser(
        description="Minimal inbound ACH server — speak BW protocol to calling MiniMate Plus units.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    p.add_argument(
        "--port", "-p",
        type=int,
        default=12345,
        help="Port to listen on (default: 12345).",
    )
    p.add_argument(
        "--output", "-o",
        default=str(Path(__file__).parent / "captures"),
        metavar="DIR",
        help="Directory to write session captures (default: bridges/captures/).",
    )
    p.add_argument(
        "--timeout", "-t",
        type=float,
        default=30.0,
        help="Protocol receive timeout in seconds (default: 30.0).",
    )
    p.add_argument(
        "--events-only",
        action="store_true",
        help="Skip the device-info step and go straight to event download.",
    )
    p.add_argument(
        "--max-events",
        type=int,
        default=None,
        metavar="N",
        help=(
            "Safety cap: download at most N events per session (default: unlimited). "
            "Useful if a unit has many old events stored — prevents a very long first run."
        ),
    )
    p.add_argument(
        "--allow-ip",
        metavar="IP",
        action="append",
        dest="allow_ips",
        default=[],
        help=(
            "Only accept connections from this IP address (repeat for multiple). "
            "Example: --allow-ip 63.43.212.232 "
            "If not specified, all IPs are accepted (not recommended for public servers)."
        ),
    )
    p.add_argument(
        "--restart-monitoring",
        action="store_true",
        default=False,
        help=(
            "After downloading events, send SUB 0x96 (start monitoring) before "
            "disconnecting. Required for RV55 units whose firmware does not assert "
            "DCD on disconnect — without this the unit stays idle after a call-home."
        ),
    )
    p.add_argument(
        "--clear-after-download",
        action="store_true",
        default=False,
        help=(
            "After successfully downloading new events, erase all events from the "
            "device memory (SUB 0xA3 → 0x1C → 0x06 → 0xA2 sequence, confirmed from "
            "4-11-26 MITM capture). Only fires when at least one new event was saved. "
            "This mirrors the standard Blastware ACH workflow."
        ),
    )
    p.add_argument(
        "--force-redownload-all",
        action="store_true",
        default=False,
        help=(
            "Manual override: ignore ach_state.json's downloaded_events map "
            "for this session and re-download every event currently on the "
            "device, regardless of (key, timestamp) match. Useful when state "
            "has become inconsistent with the on-disk waveform store / DB."
        ),
    )
    p.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable debug logging.",
    )
    return p.parse_args(argv)


if __name__ == "__main__":
    args = parse_args()
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s  %(levelname)-7s  %(name)s  %(message)s",
    )
    try:
        serve(args)
    except KeyboardInterrupt:
        print("\nStopped.")