Files

905 lines
39 KiB
Python

#!/usr/bin/env python3
"""
ach_server.py — Minimal inbound ACH (Auto Call Home) server for MiniMate Plus.
This IS your test server. Run it on any machine on the same network, point a
unit's ACEmanager call-home destination at it, and it will speak the full BW
protocol to the device: handshake, pull device info, download all events, save
everything as JSON.
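The key thing this script tells you that no amount of packet sniffing can:
- Does the device speak first (push) or wait for us to send POLL (pull)?
If startup() completes normally → it's pull protocol, same as Blastware.
If startup() fails or times out → the device may have sent something first.
Note that a failed startup is only logged as a warning: no session directory
is created for it, so no raw_rx_<ts>.bin is written for that connection.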
Usage
-----
python bridges/ach_server.py [--port 12345] [--output bridges/captures/]
Setup
-----
1. Run this script on a machine on your local network.
2. In ACEmanager → Application → ALEOS Application Framework (or equivalent)
find the Call Home / ACH settings. Set:
Remote Host: <this machine's LAN IP>
Remote Port: 12345
3. Trigger the unit (wait for a vibration event, or use the manual call-home
button if your firmware version has one).
4. The unit connects. This script handshakes, downloads all events,
and saves a timestamped session directory.
Output per session
------------------
bridges/captures/ach_inbound_<ts>/
device_info.json — serial number, firmware version, calibration date, etc.
events.json — all events: timestamp, PPV per channel, peaks, metadata
raw_rx_<ts>.bin — raw bytes from the device (S3 side) for Analyzer
raw_tx_<ts>.bin — raw bytes we sent to the device (BW side) for Analyzer
session_<ts>.log — detailed protocol log
What to look for
----------------
Push vs pull: Check session_<ts>.log. If the first line after "Connected"
shows bytes arriving BEFORE the POLL probe was sent, it's push. If POLL
gets a clean response, it's pull.
Frequency: Look at raw_rx.bin in the Analyzer. SUB 5A (0xA5 responses) carry
bulk waveform data — if frequency is sent pre-computed there will be float32
values before the ADC sample blocks.
ACH-specific framing: Does the unit send anything extra before the DLE+STX
framing starts? raw_rx.bin will show raw bytes including any preamble.
"""
from __future__ import annotations
import argparse
import datetime
import json
import logging
import socket
import sys
import threading
from pathlib import Path
from typing import Optional
sys.path.insert(0, str(Path(__file__).parent.parent))
from minimateplus.transport import SocketTransport
from minimateplus.client import MiniMateClient
from minimateplus.models import DeviceInfo, Event, MonitorLogEntry
from sfm.database import SeismoDb
from sfm.waveform_store import WaveformStore
log = logging.getLogger("ach_server")
# ── Per-unit state (downloaded events index) ──────────────────────────────────
# Persisted as <output_dir>/ach_state.json
# Format (current — v2):
# {
# "BE11529": {
# "downloaded_events": { # key_hex → ISO timestamp string
# "01110000": "2026-04-11T00:42:17",
# "0111245a": "2026-04-11T01:04:30"
# },
# "max_downloaded_key": "0111245a",
# "last_seen": "2026-04-11T01:04:36",
# "serial": "BE11529",
# "peer": "63.43.212.232:51920"
# }
# }
#
# Why (key, timestamp) and not key alone:
# The device's event-key counter resets to 0x01110000 after every memory
# erase (internal or external). A bare-key dedup (the v1 format) cannot
# distinguish a re-recorded event with the same key from one we already
# downloaded. The 0C waveform record's timestamp IS unique per physical
# event, so we pair (key, timestamp) and treat a key with a different
# timestamp as a new event regardless of `max_downloaded_key`.
#
# Legacy v1 format (`downloaded_keys: list[str]` only) is auto-migrated on
# read: each legacy key is stored with an empty-string ("") timestamp, so the
# (key, timestamp) comparison can never match and the event is re-downloaded
# once. After that pass the state is rewritten in v2 form.
# Held by _save_state() while ach_state.json is being written; sessions run in
# separate threads (one per accepted connection), so writes can overlap.
_state_lock = threading.Lock()
def _load_state(state_path: Path) -> dict:
    """
    Load ach_state.json, migrating legacy v1 entries to the v2 schema.

    Any unit entry that lacks `downloaded_events` gets one: legacy
    `downloaded_keys: list` entries become `{key: ""}` (empty timestamps),
    anything else becomes an empty dict. The legacy `downloaded_keys` field
    is dropped from the returned state.

    Returns:
        The (possibly migrated) state dict, or {} when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
    """
    if not state_path.exists():
        return {}
    try:
        with open(state_path, encoding="utf-8") as f:
            state = json.load(f)
    except Exception as exc:
        # Best-effort: a broken state file must not kill the session, but say
        # so -- falling back to {} means every event looks "new" again.
        log.warning(
            "ach_state: could not read %s (%s) -- starting with empty state",
            state_path, exc,
        )
        return {}
    if not isinstance(state, dict):
        # Valid JSON but not an object (e.g. a list); nothing usable in it.
        log.warning(
            "ach_state: %s does not contain a JSON object -- starting with empty state",
            state_path,
        )
        return {}

    # Per-unit migration: legacy list → dict-with-empty-timestamps
    for unit_key, unit_state in state.items():
        if not isinstance(unit_state, dict):
            continue
        if "downloaded_events" in unit_state:
            continue
        legacy_keys = unit_state.get("downloaded_keys")
        if isinstance(legacy_keys, list):
            unit_state["downloaded_events"] = {k: "" for k in legacy_keys}
            log.info(
                "ach_state: migrated %s from v1 (downloaded_keys list) → v2 "
                "(downloaded_events dict, %d keys with empty timestamps; "
                "they will re-validate on next session)",
                unit_key, len(legacy_keys),
            )
        else:
            unit_state["downloaded_events"] = {}
        # The legacy field is redundant once migrated; drop it so it is not
        # written back on the next save.
        unit_state.pop("downloaded_keys", None)
    return state
def _save_state(state_path: Path, state: dict) -> None:
    """
    Write `state` to `state_path` as indented JSON, replacing the old file.

    The JSON is first written to a sibling "<name>.tmp" file and then moved
    over the real file, so a crash or serialisation error part-way through
    never leaves a truncated ach_state.json behind (the previous contents
    stay in place instead). The whole operation runs under _state_lock.

    Raises:
        TypeError / ValueError: if `state` is not JSON-serialisable.
        OSError: if the temp file cannot be written or moved into place.
    """
    tmp_path = state_path.with_name(state_path.name + ".tmp")
    with _state_lock:
        with open(tmp_path, "w") as f:
            json.dump(state, f, indent=2)
        # Only reached when the dump succeeded; swap the complete file in.
        tmp_path.replace(state_path)
# ── Per-session handler ────────────────────────────────────────────────────────
class AchSession:
    """
    Handles one inbound unit connection in its own thread.

    Wraps the socket in a SocketTransport → MiniMateClient, then runs the
    startup handshake → device info → event download sequence.

    State tracking (ach_state.json, at `state_path`):
        After each event walk we record, per unit, a map of event key (hex)
        → ISO timestamp for what was seen on the device. On the next
        call-home the entries with a non-empty timestamp are handed to the
        client so it can skip the waveform download for events whose
        (key, timestamp) pair matches; everything else is downloaded again.
    """

    def __init__(
        self,
        sock: socket.socket,
        peer: str,
        output_dir: Path,
        timeout: float,
        events_only: bool,
        max_events: Optional[int],
        state_path: Path,
        db: "SeismoDb",
        store: "WaveformStore",
        clear_after_download: bool = False,
        restart_monitoring: bool = False,
        force_redownload: bool = False,
    ) -> None:
        """Store the connection and the per-session options; performs no I/O."""
        self.sock = sock
        self.peer = peer
        self.output_dir = output_dir
        self.timeout = timeout
        self.events_only = events_only
        self.max_events = max_events
        self.state_path = state_path
        self.db = db
        self.store = store
        self.clear_after_download = clear_after_download
        self.restart_monitoring = restart_monitoring
        # `force_redownload` tells this session to ignore ach_state and
        # re-download every event currently on the device, regardless of any
        # (key, timestamp) match. Useful as a manual override when state has
        # become inconsistent with what's actually on disk / in the DB.
        self.force_redownload = force_redownload

    def run(self) -> None:
        """
        Thread entry point: run the session, log any failure, close the socket.

        Never raises for ordinary errors -- a failed session is logged with its
        traceback and the socket is closed regardless of the outcome.
        """
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        # Session dir and file handler are created lazily — only after startup
        # succeeds. This prevents internet scanners and dropped connections from
        # littering the output directory with empty session folders.
        try:
            self._run_inner(ts)
        except Exception as exc:
            log.error("Session failed (%s): %s", self.peer, exc, exc_info=True)
        finally:
            try:
                self.sock.close()
            except Exception:
                # Best-effort close; the socket may already be closed.
                pass

    def _run_inner(self, ts: str) -> None:
        """
        Run one complete call-home session.

        Steps: startup handshake → (optional) device info → event walk and
        download → monitor-log collection → DB write → optional erase →
        state update → optional monitoring restart.

        Args:
            ts: Timestamp string used to name the session directory and the
                files inside it.
        """
        # Taken here so the duration stored in the DB covers the whole session.
        session_start = datetime.datetime.now()

        transport = SocketTransport(self.sock, peer=self.peer)

        # Collect raw bytes in memory until startup succeeds, then flush to disk.
        raw_rx_buf: list[bytes] = []  # device → us (S3 side)
        raw_tx_buf: list[bytes] = []  # us → device (BW side)
        _orig_read = transport.read
        _orig_write = transport.write

        def tapped_read(n: int) -> bytes:
            data = _orig_read(n)
            if data:
                raw_rx_buf.append(data)
            return data

        def tapped_write(data: bytes) -> None:
            _orig_write(data)
            if data:
                raw_tx_buf.append(data)

        transport.read = tapped_read  # type: ignore[method-assign]
        transport.write = tapped_write  # type: ignore[method-assign]

        serial: Optional[str] = None

        # ── Step 1: startup handshake ─────────────────────────────────────────
        # Do this BEFORE creating the session directory so that scanner probes
        # and dropped connections leave no trace on disk.
        try:
            from minimateplus.protocol import MiniMateProtocol
            client = MiniMateClient(transport=transport, timeout=self.timeout)
            client.open()
            proto = MiniMateProtocol(transport, recv_timeout=self.timeout)
            proto.startup()
        except Exception as exc:
            log.warning("Startup failed from %s: %s -- ignoring", self.peer, exc)
            return  # no session dir created

        # Startup succeeded — this is a real unit. Create session dir now.
        session_dir = self.output_dir / f"ach_inbound_{ts}"
        session_dir.mkdir(parents=True, exist_ok=True)
        log_path = session_dir / f"session_{ts}.log"
        raw_rx_path = session_dir / f"raw_rx_{ts}.bin"  # device → us (S3 side)
        raw_tx_path = session_dir / f"raw_tx_{ts}.bin"  # us → device (BW side)

        # Flush buffered bytes to files and switch to direct file writes.
        raw_rx_fh = open(raw_rx_path, "wb")
        raw_tx_fh = open(raw_tx_path, "wb")
        for chunk in raw_rx_buf:
            raw_rx_fh.write(chunk)
        for chunk in raw_tx_buf:
            raw_tx_fh.write(chunk)
        raw_rx_buf.clear()
        raw_tx_buf.clear()

        def tapped_read_file(n: int) -> bytes:
            data = _orig_read(n)
            if data:
                raw_rx_fh.write(data)
                raw_rx_fh.flush()
            return data

        def tapped_write_file(data: bytes) -> None:
            _orig_write(data)
            if data:
                raw_tx_fh.write(data)
                raw_tx_fh.flush()

        transport.read = tapped_read_file  # type: ignore[method-assign]
        transport.write = tapped_write_file  # type: ignore[method-assign]

        # Wire up file handler now that the session dir exists.
        fh = logging.FileHandler(log_path, encoding="utf-8")
        fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s"))
        root_logger = logging.getLogger()
        root_logger.addHandler(fh)

        try:
            # ── Step 2: device info ───────────────────────────────────────────
            device_info = None
            if not self.events_only:
                log.info("Step 2/3: reading device info")
                try:
                    device_info = client.connect()
                    serial = device_info.serial
                    _save_json(session_dir / "device_info.json", _device_info_to_dict(device_info))
                    log.info(
                        " [OK] Device: serial=%s firmware=%s model=%s events=%d",
                        serial,
                        device_info.firmware_version,
                        device_info.model,
                        device_info.event_count or 0,
                    )
                except Exception as exc:
                    log.error(" [FAIL] Device info failed: %s", exc)
            else:
                log.info("Step 2/3: skipping device info (--events-only)")

            # ── Step 3: check for new events by comparing key sets ────────────
            log.info("Step 3/3: checking for new events")
            state = _load_state(self.state_path)
            unit_key = serial or self.peer  # fall back to IP if no serial
            unit_state = state.get(unit_key, {})
            # downloaded_events is the v2 (key_hex → timestamp_iso) dict.
            # Empty-string timestamps are migrated v1 entries — they force a
            # one-time re-download because the (key, timestamp) compare always
            # mismatches against any non-empty timestamp from a fresh 0C read.
            seen_events: dict[str, str] = dict(unit_state.get("downloaded_events", {}))
            max_seen_key: str = unit_state.get("max_downloaded_key", "00000000")
            if self.force_redownload:
                log.info(" --force-redownload-all set — ignoring %d cached "
                         "(key, timestamp) entries for this session",
                         len(seen_events))
                seen_events = {}

            # Walk the event index (browse-mode, no 5A) to get the actual current
            # key list. The SUB 08 event_count field is a lifetime "total events
            # ever recorded" counter that does NOT decrement on erase — confirmed
            # 2026-04-13. list_event_keys() via the 1E/1F chain is the only
            # reliable way to know what is actually stored on the device right now.
            log.info(" Checking device key list (browse walk, no waveform download)...")
            try:
                device_keys = client.list_event_keys()
            except Exception as exc:
                log.warning(" list_event_keys failed: %s -- falling back to full download", exc)
                device_keys = None
            current_count = len(device_keys) if device_keys is not None else 0
            log.info(" Unit has %d stored event(s); %d (key, ts) entr(ies) previously downloaded",
                     current_count, len(seen_events))

            if device_keys is not None and current_count == 0:
                log.info(" [OK] No events on device -- nothing to download")
                log.info("Session complete (no events) -> %s", session_dir)
                return

            if device_keys is not None:
                # ── Post-erase detection (best-effort, key-only signal) ───────
                # After erase the device's key counter resets to 01110000.
                # If the device's current max key is below our high-water mark
                # we know erase happened. This catches the cleanest case but
                # does NOT catch erase-then-record-many-events (where the new
                # max may climb past the old max). The (key, timestamp) check
                # in get_events() is what handles those.
                # NOTE(review): this compares list_event_keys() items with a hex
                # string -- assumes they are hex strings too; confirm.
                if device_keys and max_seen_key != "00000000":
                    max_device_key = max(device_keys)
                    if max_device_key < max_seen_key:
                        log.info(
                            " Post-erase reset detected: "
                            "device max key %s < historical max %s "
                            "-- discarding stale (key, ts) state for this session",
                            max_device_key, max_seen_key,
                        )
                        seen_events = {}
                # Note: no early-exit "all already downloaded" short-circuit
                # here. Without per-event timestamps we cannot tell whether
                # device_keys ⊆ seen_events.keys() actually means we have
                # those physical events. get_events() will read 0C on its
                # skip path and decide per event.

            # Apply max_events cap
            # stop_idx: when we know the count from list_event_keys, use it as
            # an upper bound. When list_event_keys failed (device_keys is None),
            # pass None — get_events will run until the null sentinel naturally.
            stop_idx: Optional[int] = (current_count - 1) if device_keys is not None else None
            if self.max_events is not None:
                cap = self.max_events - 1
                stop_idx = cap if stop_idx is None else min(stop_idx, cap)
                if device_keys is not None and self.max_events < current_count:
                    log.warning(
                        " max_events=%d cap: will download events 0-%d only "
                        "(unit has %d total)",
                        self.max_events, stop_idx, current_count,
                    )

            try:
                # Pass `seen_events` (key → ISO timestamp) so the client can
                # read 0C on its skip path and only skip 5A when the per-event
                # timestamp matches what we already have on disk. When force_-
                # redownload is set, seen_events was already cleared above.
                #
                # Filter out empty-string timestamps (legacy v1 entries) — the
                # client's 0C-on-skip-path only trusts entries with a
                # populated timestamp; otherwise it falls through to a full
                # 5A download.
                skip_dict = {k: seen_ts for k, seen_ts in seen_events.items() if seen_ts}
                all_events = client.get_events(
                    full_waveform=True,
                    stop_after_index=stop_idx,
                    skip_waveform_for_events=skip_dict if skip_dict else None,
                )

                # New events are those that came back with _a5_frames populated
                # (= 5A actually ran on this session). Skipped events have
                # _a5_frames = None because the client matched (key, timestamp)
                # against skip_dict and bypassed 5A.
                new_events = [
                    e for e in all_events
                    if getattr(e, "_a5_frames", None)
                ]
                skipped = len(all_events) - len(new_events)
                log.info(" [OK] Walked %d event(s): %d downloaded, %d skipped (matched (key, ts) in state)",
                         len(all_events), len(new_events), skipped)

                # ── Persist event file + A5 sidecar to the waveform store ──
                # Saves ride alongside the existing JSON dump so the on-disk
                # event file and events.json reference the same set of events.
                waveform_records: dict[str, dict] = {}
                for ev in new_events:
                    if not ev._a5_frames:
                        continue
                    try:
                        rec = self.store.save(
                            ev,
                            serial=serial or "UNKNOWN",
                            a5_frames=ev._a5_frames,
                        )
                        if ev._waveform_key is not None:
                            waveform_records[ev._waveform_key.hex()] = rec
                        log.info(
                            " [WAVE] saved %s (%d bytes)",
                            rec["filename"], rec["filesize"],
                        )
                    except Exception as exc:
                        key_hex = ev._waveform_key.hex() if ev._waveform_key else "????????"
                        log.warning(
                            " [WARN] Waveform store save failed for %s: %s",
                            key_hex, exc,
                        )

                if new_events:
                    _save_json(
                        session_dir / "events.json",
                        [_event_to_dict(e, waveform_records) for e in new_events],
                    )
                    for ev in new_events:
                        pv = ev.peak_values
                        pi = ev.project_info
                        key_hex = ev._waveform_key.hex() if ev._waveform_key else "????????"
                        log.info(
                            " NEW [%s] %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f project=%r",
                            key_hex,
                            str(ev.timestamp) if ev.timestamp else "?",
                            pv.tran if pv else 0,
                            pv.vert if pv else 0,
                            pv.long if pv else 0,
                            pv.peak_vector_sum if pv else 0,
                            pi.project if pi else "",
                        )
                else:
                    log.info(" [OK] No new events since last call-home -- nothing to save")

                # ── Monitor log entries (partial records / continuous monitoring) ──
                # Browse walk (0A + 1F only) to collect monitor log entries for
                # recording intervals where no threshold was crossed. This is a
                # second 1E-based pass over the device's record list, separate from
                # the get_events() download loop above.
                log.info(" Collecting monitor log entries (browse walk)...")
                new_monitor_entries: list[MonitorLogEntry] = []
                # Keys we already hold with a trusted (non-empty) timestamp.
                # This name used to be undefined here, which made the call
                # below fail with NameError on every session.
                # NOTE(review): assumes skip_keys takes hex-string keys in the
                # same form as MonitorLogEntry.key -- confirm against the client.
                seen_keys = set(skip_dict)
                try:
                    new_monitor_entries = client.get_monitor_log_entries(
                        skip_keys=seen_keys if seen_keys else None,
                    )
                    if new_monitor_entries:
                        _save_json(
                            session_dir / "monitor_log.json",
                            [_monitor_log_entry_to_dict(e) for e in new_monitor_entries],
                        )
                        log.info(
                            " [OK] %d new monitor log entry(s) saved",
                            len(new_monitor_entries),
                        )
                        for ml in new_monitor_entries:
                            log.info(
                                " MONLOG [%s] %s%s (%s)",
                                ml.key,
                                ml.start_time.isoformat() if ml.start_time else "?",
                                ml.stop_time.isoformat() if ml.stop_time else "?",
                                f"{ml.duration_seconds:.0f}s" if ml.duration_seconds is not None else "?s",
                            )
                    else:
                        log.info(" [OK] No new monitor log entries")
                except Exception as exc:
                    log.warning(
                        " [WARN] Monitor log collection failed: %s -- continuing",
                        exc,
                    )

                # ── Persist to SQLite DB ─────────────────────────────────────
                try:
                    _ev_ins, _ev_skip = self.db.insert_events(
                        new_events,
                        serial=serial or self.peer,
                        session_id=None,
                        waveform_records=waveform_records,
                        device_family="series3",
                    )
                    _ml_ins, _ml_skip = self.db.insert_monitor_log(
                        new_monitor_entries, session_id=None
                    )
                    _session_id = self.db.insert_ach_session(
                        serial=serial or self.peer,
                        peer=self.peer,
                        events_downloaded=_ev_ins,
                        monitor_entries=_ml_ins,
                        duration_seconds=(datetime.datetime.now() - session_start).total_seconds(),
                        session_time=session_start,
                    )
                    log.info(
                        " [DB] session=%s events +%d (skip %d) monitor +%d (skip %d)",
                        _session_id[:8], _ev_ins, _ev_skip, _ml_ins, _ml_skip,
                    )
                except Exception as exc:
                    log.warning(" [WARN] DB write failed: %s -- continuing", exc)

                # ── Optional: erase device memory after successful download ────
                erased_successfully = False
                if self.clear_after_download and new_events:
                    log.info(" Clearing device memory (--clear-after-download)...")
                    try:
                        client.delete_all_events()
                        log.info(" [OK] Device memory cleared")
                        erased_successfully = True
                    except Exception as exc:
                        log.error(
                            " [WARN] Event deletion failed: %s -- events NOT cleared",
                            exc,
                        )

                # ── Update persistent state ───────────────────────────────────
                # Build a fresh (key → ISO timestamp) map from THIS session's
                # results. For each event currently on the device, prefer the
                # timestamp we just observed (from 0C); fall back to whatever
                # was already in seen_events for that key (so we don't lose an
                # entry just because get_events skipped it on the (key, ts)
                # match path).
                def _ts_iso(ev) -> str:
                    ev_ts = getattr(ev, "timestamp", None)
                    if ev_ts is None:
                        return ""
                    try:
                        return datetime.datetime(
                            ev_ts.year, ev_ts.month, ev_ts.day,
                            ev_ts.hour or 0, ev_ts.minute or 0, ev_ts.second or 0,
                        ).isoformat()
                    except Exception:
                        return str(ev_ts)

                current_events_map: dict[str, str] = {}
                for ev in all_events:
                    if ev._waveform_key is None:
                        continue
                    key_hex = ev._waveform_key.hex()
                    ts_iso = _ts_iso(ev) or seen_events.get(key_hex, "")
                    current_events_map[key_hex] = ts_iso

                # Monitor-log entries don't have a 0C-style timestamp, but
                # they DO have a start_time; use that so the monitor-log keys
                # are properly entered into the (key, ts) map.
                for ml in new_monitor_entries:
                    key_hex = ml.key
                    ml_start = ml.start_time
                    ts_iso = ml_start.isoformat() if ml_start else seen_events.get(key_hex, "")
                    # If a triggered event already populated this key, keep
                    # whichever has a non-empty timestamp.
                    if key_hex not in current_events_map or not current_events_map[key_hex]:
                        current_events_map[key_hex] = ts_iso

                if erased_successfully:
                    updated_events: dict[str, str] = {}
                    new_max_key = "00000000"
                    log.info(
                        " State reset after erase -- next session will download "
                        "from key 0 (device counter resets after erase)"
                    )
                else:
                    # Merge: keep prior (key, ts) entries we still have evidence
                    # of (for survivors of any partial failure), plus this
                    # session's authoritative (key, ts) pairs.
                    updated_events = dict(seen_events)
                    updated_events.update(current_events_map)
                    new_max_key = (
                        max(updated_events.keys())
                        if updated_events else max_seen_key
                    )

                state[unit_key] = {
                    "downloaded_events": updated_events,
                    "max_downloaded_key": new_max_key,
                    "last_seen": datetime.datetime.now().isoformat(),
                    "serial": serial,
                    "peer": self.peer,
                }
                _save_state(self.state_path, state)
            except Exception as exc:
                log.error(" [FAIL] Event download failed: %s", exc, exc_info=True)

            # ── Optional: restart monitoring after successful download ─────────
            if self.restart_monitoring:
                log.info(" Restarting monitoring on device (--restart-monitoring)...")
                try:
                    client.start_monitoring()
                    log.info(" [OK] Monitoring restarted")
                except Exception as exc:
                    log.warning(" [WARN] Failed to restart monitoring: %s", exc)
        finally:
            try:
                raw_rx_fh.close()
                raw_tx_fh.close()
                client.close()  # closes transport / socket cleanly
            finally:
                # Always detach the per-session handler, even if a close above
                # raised; otherwise it would stay on the root logger.
                root_logger.removeHandler(fh)
                fh.close()
            log.info("Session complete -> %s", session_dir)
            log.info("=" * 60)
# ── JSON helpers ───────────────────────────────────────────────────────────────
def _save_json(path: Path, obj: object) -> None:
    """Dump `obj` to `path` as indented JSON; non-JSON values are written via str()."""
    with open(path, "w") as out_file:
        json.dump(obj, out_file, default=str, indent=2)
    log.debug("Saved %s", path)
def _device_info_to_dict(d: DeviceInfo) -> dict:
    """
    Flatten a DeviceInfo into a plain dict for JSON output.

    The identity fields come straight from `d`; the remaining fields are read
    from `d.compliance_config` and are all None when that config is falsy.
    """
    compliance = d.compliance_config
    flat = {
        "serial": d.serial,
        "firmware_version": d.firmware_version,
        "dsp_version": d.dsp_version,
        "model": d.model,
        "event_count": d.event_count,
    }
    # compliance config fields (None if 1A read failed)
    compliance_fields = (
        "setup_name",
        "sample_rate",
        "record_time",
        "trigger_level_geo",
        "alarm_level_geo",
        "geo_adc_scale",  # hw scale factor (in/s)/V
        "geo_range",  # 0x01=Normal 10in/s, 0x00=Sensitive 1.25in/s (unconfirmed)
        "project",
        "client",
        "operator",
        "sensor_location",
    )
    for field_name in compliance_fields:
        flat[field_name] = getattr(compliance, field_name) if compliance else None
    return flat
def _event_to_dict(
    e: Event,
    waveform_records: Optional[dict[str, dict]] = None,
) -> dict:
    """
    Build the events.json entry for one event.

    Args:
        e: The event to serialise.
        waveform_records: Optional map of waveform key (hex) → store record;
            when it has an entry for this event, its filename / filesize /
            a5_pickle_filename are copied into the result.

    Returns:
        A dict with the timestamp, project metadata, peak values, a preview
        of the raw samples (first 20 per channel) and the store file details.
    """
    peak = e.peak_values
    project = e.project_info

    if peak:
        peaks = {
            "transverse": peak.tran,
            "vertical": peak.vert,
            "longitudinal": peak.long,
            "vector_sum": peak.peak_vector_sum,
            "mic": peak.micl,
        }
    else:
        peaks = {}

    preview = {}
    if e.raw_samples:
        # first 20 sample-sets to keep the file sane
        for channel, values in e.raw_samples.items():
            preview[channel] = values[:20]
        preview["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform"

    store_rec: dict = {}
    if waveform_records and e._waveform_key is not None:
        # `or {}` also covers a record that is stored as None / empty.
        store_rec = waveform_records.get(e._waveform_key.hex(), {}) or {}

    result = {"timestamp": str(e.timestamp) if e.timestamp else None}
    for field_name in ("project", "client", "operator", "sensor_location"):
        result[field_name] = getattr(project, field_name) if project else None
    result["peaks"] = peaks
    result["raw_samples_preview"] = preview
    result["blastware_filename"] = store_rec.get("filename")
    result["blastware_filesize"] = store_rec.get("filesize")
    result["a5_pickle_filename"] = store_rec.get("a5_pickle_filename")
    return result
def _monitor_log_entry_to_dict(e: MonitorLogEntry) -> dict:
    """Convert a monitor-log entry to a JSON-ready dict (times as ISO strings or None)."""
    started = e.start_time
    stopped = e.stop_time
    entry = {"key": e.key}
    entry["start_time"] = started.isoformat() if started else None
    entry["stop_time"] = stopped.isoformat() if stopped else None
    entry["duration_seconds"] = e.duration_seconds
    entry["serial"] = e.serial
    entry["geo_threshold_ips"] = e.geo_threshold_ips
    return entry
# ── Main server loop ───────────────────────────────────────────────────────────
def serve(args: argparse.Namespace) -> None:
    """
    Listen for inbound unit connections and run one AchSession thread per peer.

    Creates the output directory, the SQLite DB and the waveform store, then
    accepts connections on 0.0.0.0:<args.port> until interrupted. Peers not in
    the allowlist (when one is given) are closed immediately.

    Args:
        args: Parsed command-line options, as returned by parse_args().

    Raises:
        KeyboardInterrupt: propagated to the caller after the listening
            socket has been closed.
    """
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)
    state_path = output_dir / "ach_state.json"
    db = SeismoDb(output_dir / "seismo_relay.db")
    store = WaveformStore(output_dir / "waveforms")

    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_sock.bind(("0.0.0.0", args.port))
    server_sock.listen(5)
    # Wake up every second so Ctrl-C is handled promptly on Windows.
    # Without this, accept() blocks indefinitely and ignores KeyboardInterrupt.
    server_sock.settimeout(1.0)

    max_ev = args.max_events
    print(f"\n{'='*60}")
    print(f" ACH inbound server listening on 0.0.0.0:{args.port}")
    print(f" Output: {output_dir.resolve()}/ach_inbound_<timestamp>/")
    print(f" State file: {state_path}")
    print(f" Max events per session: {max_ev if max_ev else 'unlimited'}")
    print(f" Clear device after download: {'YES' if args.clear_after_download else 'no'}")
    print(f" Restart monitoring after download: {'YES' if args.restart_monitoring else 'no'}")
    print(f" Force re-download all (ignore state): {'YES' if args.force_redownload_all else 'no'}")
    print(f"{'='*60}")
    print("\n Point your test unit's ACEmanager call-home settings to:")
    print(" Remote Host: <this machine's LAN IP>")
    print(f" Remote Port: {args.port}")
    print("\n Waiting for inbound connections... (Ctrl-C to stop)\n")

    allow_ips = set(args.allow_ips)
    if allow_ips:
        print(f" Allowlist: {', '.join(sorted(allow_ips))}")
    else:
        print(" Allowlist: NONE -- accepting all IPs (add --allow-ip to restrict)")

    try:
        while True:
            try:
                client_sock, addr = server_sock.accept()
            except socket.timeout:
                continue  # no connection this second; loop back and check for Ctrl-C

            try:
                peer_ip = addr[0]
                peer = f"{addr[0]}:{addr[1]}"
                if allow_ips and peer_ip not in allow_ips:
                    log.info("Rejected connection from %s (not in allowlist)", peer)
                    client_sock.close()
                    continue
                log.info("Accepted connection from %s", peer)
                session = AchSession(
                    sock=client_sock,
                    peer=peer,
                    output_dir=output_dir,
                    timeout=args.timeout,
                    events_only=args.events_only,
                    max_events=max_ev,
                    state_path=state_path,
                    db=db,
                    store=store,
                    clear_after_download=args.clear_after_download,
                    restart_monitoring=args.restart_monitoring,
                    force_redownload=args.force_redownload_all,
                )
                t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}")
                t.start()
            except Exception as exc:
                # KeyboardInterrupt is not an Exception subclass, so Ctrl-C
                # still propagates to the outer finally.
                log.error("Accept error: %s", exc)
                # The session thread never took ownership of the socket, so
                # nobody else will close it.
                try:
                    client_sock.close()
                except OSError:
                    pass
    finally:
        server_sock.close()
        print("\nServer stopped.")
def parse_args() -> argparse.Namespace:
    """Build the command-line parser for the ACH server and parse sys.argv."""
    parser = argparse.ArgumentParser(
        description="Minimal inbound ACH server — speak BW protocol to calling MiniMate Plus units.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,  # the module docstring doubles as the extended help text
    )
    add = parser.add_argument

    # ── Listener / output ─────────────────────────────────────────────────────
    add("--port", "-p", type=int, default=12345,
        help="Port to listen on (default: 12345).")
    add("--output", "-o", metavar="DIR",
        default=str(Path(__file__).parent / "captures"),
        help="Directory to write session captures (default: bridges/captures/).")
    add("--timeout", "-t", type=float, default=30.0,
        help="Protocol receive timeout in seconds (default: 30.0).")

    # ── Session behaviour ─────────────────────────────────────────────────────
    add("--events-only", action="store_true",
        help="Skip the device-info step and go straight to event download.")
    add("--max-events", type=int, default=None, metavar="N",
        help=(
            "Safety cap: download at most N events per session (default: unlimited). "
            "Useful if a unit has many old events stored — prevents a very long first run."
        ))
    add("--allow-ip", metavar="IP", action="append", dest="allow_ips", default=[],
        help=(
            "Only accept connections from this IP address (repeat for multiple). "
            "Example: --allow-ip 63.43.212.232 "
            "If not specified, all IPs are accepted (not recommended for public servers)."
        ))
    add("--restart-monitoring", action="store_true",
        help=(
            "After downloading events, send SUB 0x96 (start monitoring) before "
            "disconnecting. Required for RV55 units whose firmware does not assert "
            "DCD on disconnect — without this the unit stays idle after a call-home."
        ))
    add("--clear-after-download", action="store_true",
        help=(
            "After successfully downloading new events, erase all events from the "
            "device memory (SUB 0xA3 → 0x1C → 0x06 → 0xA2 sequence, confirmed from "
            "4-11-26 MITM capture). Only fires when at least one new event was saved. "
            "This mirrors the standard Blastware ACH workflow."
        ))
    add("--force-redownload-all", action="store_true",
        help=(
            "Manual override: ignore ach_state.json's downloaded_events map "
            "for this session and re-download every event currently on the "
            "device, regardless of (key, timestamp) match. Useful when state "
            "has become inconsistent with the on-disk waveform store / DB."
        ))

    # ── Diagnostics ───────────────────────────────────────────────────────────
    add("--verbose", "-v", action="store_true",
        help="Enable debug logging.")

    return parser.parse_args()
# Script entry point: parse options, configure logging, run the server.
if __name__ == "__main__":
    args = parse_args()
    # --verbose switches the root logger from INFO to DEBUG.
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
    )
    try:
        serve(args)
    except KeyboardInterrupt:
        # serve() has already closed the listening socket in its finally block.
        print("\nStopped.")