a03c77af09
calibration_date, aux_trigger, setup_name etc. don't exist directly on DeviceInfo — they live in DeviceInfo.compliance_config (ComplianceConfig). _device_info_to_dict now accesses them via cc = d.compliance_config. Log line updated to show serial/firmware/model/event_count instead. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
510 lines
20 KiB
Python
510 lines
20 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
ach_server.py — Minimal inbound ACH (Auto Call Home) server for MiniMate Plus.
|
||
|
||
This IS your test server. Run it on any machine on the same network, point a
|
||
unit's ACEmanager call-home destination at it, and it will speak the full BW
|
||
protocol to the device: handshake, pull device info, download all events, save
|
||
everything as JSON.
|
||
|
||
The key thing this script tells you that no amount of packet sniffing can:
|
||
- Does the device speak first (push) or wait for us to send POLL (pull)?
|
||
|
||
If startup() completes normally → it's pull protocol, same as Blastware.
|
||
If startup() times out → the device sent something first; check raw_rx.bin.
|
||
|
||
Usage
|
||
-----
|
||
python bridges/ach_server.py [--port 12345] [--output bridges/captures/]
|
||
|
||
Setup
|
||
-----
|
||
1. Run this script on a machine on your local network.
|
||
2. In ACEmanager → Application → ALEOS Application Framework (or equivalent)
|
||
find the Call Home / ACH settings. Set:
|
||
Remote Host: <this machine's LAN IP>
|
||
Remote Port: 12345
|
||
3. Trigger the unit (wait for a vibration event, or use the manual call-home
|
||
button if your firmware version has one).
|
||
4. The unit connects. This script handshakes, downloads all events,
|
||
and saves a timestamped session directory.
|
||
|
||
Output per session
|
||
------------------
|
||
bridges/captures/ach_inbound_<ts>/
|
||
device_info.json — serial number, firmware version, calibration date, etc.
|
||
events.json — all events: timestamp, PPV per channel, peaks, metadata
|
||
raw_rx_<ts>.bin — raw bytes from the device (S3 side) for Analyzer
|
||
session_<ts>.log — detailed protocol log
|
||
|
||
What to look for
|
||
----------------
|
||
Push vs pull: Check session_<ts>.log. If the first line after "Connected"
|
||
shows bytes arriving BEFORE the POLL probe was sent, it's push. If POLL
|
||
gets a clean response, it's pull.
|
||
|
||
Frequency: Look at raw_rx.bin in the Analyzer. SUB 5A (0xA5 responses) carry
|
||
bulk waveform data — if frequency is sent pre-computed there will be float32
|
||
values before the ADC sample blocks.
|
||
|
||
ACH-specific framing: Does the unit send anything extra before the DLE+STX
|
||
framing starts? raw_rx.bin will show raw bytes including any preamble.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import datetime
|
||
import json
|
||
import logging
|
||
import socket
|
||
import sys
|
||
import threading
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||
|
||
from minimateplus.transport import SocketTransport
|
||
from minimateplus.client import MiniMateClient
|
||
from minimateplus.models import DeviceInfo, Event
|
||
|
||
log = logging.getLogger("ach_server")
|
||
|
||
# ── Per-unit state (high-water mark) ──────────────────────────────────────────
|
||
# Persisted as <output_dir>/ach_state.json
|
||
# Format: { "BE11529": { "event_count": 5, "last_seen": "2026-04-09T..." }, ... }
|
||
|
||
_state_lock = threading.Lock()
|
||
|
||
|
||
def _load_state(state_path: Path) -> dict:
|
||
if state_path.exists():
|
||
try:
|
||
with open(state_path) as f:
|
||
return json.load(f)
|
||
except Exception:
|
||
pass
|
||
return {}
|
||
|
||
|
||
def _save_state(state_path: Path, state: dict) -> None:
|
||
with _state_lock:
|
||
with open(state_path, "w") as f:
|
||
json.dump(state, f, indent=2)
|
||
|
||
|
||
# ── Per-session handler ────────────────────────────────────────────────────────
|
||
|
||
class AchSession:
|
||
"""
|
||
Handles one inbound unit connection in its own thread.
|
||
Wraps the socket in a SocketTransport → MiniMateClient, then runs the
|
||
standard connect → get_device_info → get_events sequence.
|
||
|
||
State tracking (ach_state.json in output_dir):
|
||
On each successful download we record how many events the unit had.
|
||
On the next call-home we compare: if count hasn't grown, there's nothing
|
||
new and we close cleanly without downloading. If it has grown, we
|
||
download all events up to the new count and save only the new ones.
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
sock: socket.socket,
|
||
peer: str,
|
||
output_dir: Path,
|
||
timeout: float,
|
||
events_only: bool,
|
||
max_events: Optional[int],
|
||
state_path: Path,
|
||
) -> None:
|
||
self.sock = sock
|
||
self.peer = peer
|
||
self.output_dir = output_dir
|
||
self.timeout = timeout
|
||
self.events_only = events_only
|
||
self.max_events = max_events
|
||
self.state_path = state_path
|
||
|
||
def run(self) -> None:
|
||
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
|
||
# Session dir and file handler are created lazily — only after startup
|
||
# succeeds. This prevents internet scanners and dropped connections from
|
||
# littering the output directory with empty session folders.
|
||
try:
|
||
self._run_inner(ts)
|
||
except Exception as exc:
|
||
log.error("Session failed (%s): %s", self.peer, exc, exc_info=True)
|
||
finally:
|
||
try:
|
||
self.sock.close()
|
||
except Exception:
|
||
pass
|
||
|
||
def _run_inner(self, ts: str) -> None:
|
||
transport = SocketTransport(self.sock, peer=self.peer)
|
||
|
||
# Collect raw bytes in memory until startup succeeds, then flush to disk.
|
||
raw_buf: list[bytes] = []
|
||
_orig_read = transport.read
|
||
|
||
def tapped_read(n: int) -> bytes:
|
||
data = _orig_read(n)
|
||
if data:
|
||
raw_buf.append(data)
|
||
return data
|
||
|
||
transport.read = tapped_read # type: ignore[method-assign]
|
||
|
||
serial: Optional[str] = None
|
||
|
||
# ── Step 1: startup handshake ─────────────────────────────────────────
|
||
# Do this BEFORE creating the session directory so that scanner probes
|
||
# and dropped connections leave no trace on disk.
|
||
try:
|
||
from minimateplus.protocol import MiniMateProtocol
|
||
client = MiniMateClient(transport=transport, timeout=self.timeout)
|
||
client.open()
|
||
proto = MiniMateProtocol(transport, recv_timeout=self.timeout)
|
||
proto.startup()
|
||
except Exception as exc:
|
||
log.warning("Startup failed from %s: %s -- ignoring", self.peer, exc)
|
||
return # no session dir created
|
||
|
||
# Startup succeeded — this is a real unit. Create session dir now.
|
||
session_dir = self.output_dir / f"ach_inbound_{ts}"
|
||
session_dir.mkdir(parents=True, exist_ok=True)
|
||
log_path = session_dir / f"session_{ts}.log"
|
||
raw_path = session_dir / f"raw_rx_{ts}.bin"
|
||
|
||
# Flush buffered raw bytes to file and switch to direct file writes.
|
||
raw_fh = open(raw_path, "wb")
|
||
for chunk in raw_buf:
|
||
raw_fh.write(chunk)
|
||
raw_buf.clear()
|
||
|
||
def tapped_read_file(n: int) -> bytes:
|
||
data = _orig_read(n)
|
||
if data:
|
||
raw_fh.write(data)
|
||
raw_fh.flush()
|
||
return data
|
||
|
||
transport.read = tapped_read_file # type: ignore[method-assign]
|
||
|
||
# Wire up file handler now that the session dir exists.
|
||
fh = logging.FileHandler(log_path, encoding="utf-8")
|
||
fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s"))
|
||
root_logger = logging.getLogger()
|
||
root_logger.addHandler(fh)
|
||
|
||
try:
|
||
# ── Step 2: device info ───────────────────────────────────────────
|
||
device_info = None
|
||
if not self.events_only:
|
||
log.info("Step 2/3: reading device info")
|
||
try:
|
||
device_info = client.connect()
|
||
serial = device_info.serial
|
||
_save_json(session_dir / "device_info.json", _device_info_to_dict(device_info))
|
||
log.info(
|
||
" [OK] Device: serial=%s firmware=%s model=%s events=%d",
|
||
serial,
|
||
device_info.firmware_version,
|
||
device_info.model,
|
||
device_info.event_count or 0,
|
||
)
|
||
except Exception as exc:
|
||
log.error(" [FAIL] Device info failed: %s", exc)
|
||
else:
|
||
log.info("Step 2/3: skipping device info (--events-only)")
|
||
|
||
# ── Step 3: check for new events via high-water mark ───────────────
|
||
log.info("Step 3/3: checking for new events")
|
||
|
||
state = _load_state(self.state_path)
|
||
unit_key = serial or self.peer # fall back to IP if no serial
|
||
last_count = state.get(unit_key, {}).get("event_count", 0)
|
||
|
||
# Use the event count already read from the event index during connect().
|
||
# This is fast (no extra round-trips) and confirmed accurate (matches LCD).
|
||
# Falls back to count_events() only if connect() wasn't called.
|
||
if device_info is not None:
|
||
current_count = device_info.event_count
|
||
else:
|
||
try:
|
||
current_count = client.count_events()
|
||
except Exception as exc:
|
||
log.error(" [FAIL] count_events failed: %s", exc)
|
||
return
|
||
|
||
log.info(" Unit has %d stored event(s); last downloaded count: %d",
|
||
current_count, last_count)
|
||
|
||
if current_count <= last_count:
|
||
log.info(" [OK] No new events since last call-home -- nothing to download")
|
||
log.info("Session complete (no new events) -> %s", session_dir)
|
||
return
|
||
|
||
new_event_count = current_count - last_count
|
||
log.info(" %d new event(s) to download", new_event_count)
|
||
|
||
# Download all events up to current_count, apply max_events cap.
|
||
# We re-download old events too (get_events always starts from 0),
|
||
# but we only SAVE the new ones (the last new_event_count of the list).
|
||
stop_idx = current_count - 1
|
||
if self.max_events is not None:
|
||
stop_idx = min(stop_idx, self.max_events - 1)
|
||
if self.max_events < current_count:
|
||
log.warning(
|
||
" max_events=%d cap: will download events 0–%d only "
|
||
"(unit has %d total)",
|
||
self.max_events, stop_idx, current_count,
|
||
)
|
||
|
||
try:
|
||
all_events = client.get_events(
|
||
full_waveform=True,
|
||
stop_after_index=stop_idx,
|
||
)
|
||
# Only the events beyond last_count are genuinely new
|
||
new_events = all_events[last_count:]
|
||
log.info(" [OK] Downloaded %d total event(s), %d new",
|
||
len(all_events), len(new_events))
|
||
|
||
_save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events])
|
||
if last_count > 0 and len(all_events) > len(new_events):
|
||
log.info(" (skipped %d already-seen event(s))", last_count)
|
||
|
||
for i, ev in enumerate(new_events):
|
||
log.info(
|
||
" NEW Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f",
|
||
last_count + i,
|
||
ev.timestamp.isoformat() if ev.timestamp else "?",
|
||
ev.peaks.transverse if ev.peaks else 0,
|
||
ev.peaks.vertical if ev.peaks else 0,
|
||
ev.peaks.longitudinal if ev.peaks else 0,
|
||
ev.peaks.vector_sum if ev.peaks else 0,
|
||
)
|
||
|
||
# Update high-water mark
|
||
state[unit_key] = {
|
||
"event_count": current_count,
|
||
"last_seen": datetime.datetime.now().isoformat(),
|
||
"serial": serial,
|
||
"peer": self.peer,
|
||
}
|
||
_save_state(self.state_path, state)
|
||
|
||
except Exception as exc:
|
||
log.error(" [FAIL] Event download failed: %s", exc, exc_info=True)
|
||
|
||
finally:
|
||
raw_fh.close()
|
||
client.close() # closes transport / socket cleanly
|
||
root_logger.removeHandler(fh)
|
||
fh.close()
|
||
|
||
log.info("Session complete -> %s", session_dir)
|
||
log.info("="*60)
|
||
|
||
|
||
# ── JSON helpers ───────────────────────────────────────────────────────────────
|
||
|
||
def _save_json(path: Path, obj: object) -> None:
|
||
with open(path, "w") as f:
|
||
json.dump(obj, f, indent=2, default=str)
|
||
log.debug("Saved %s", path)
|
||
|
||
|
||
def _device_info_to_dict(d: DeviceInfo) -> dict:
|
||
cc = d.compliance_config
|
||
return {
|
||
"serial": d.serial,
|
||
"firmware_version": d.firmware_version,
|
||
"dsp_version": d.dsp_version,
|
||
"model": d.model,
|
||
"event_count": d.event_count,
|
||
# compliance config fields (None if 1A read failed)
|
||
"setup_name": cc.setup_name if cc else None,
|
||
"sample_rate": cc.sample_rate if cc else None,
|
||
"record_time": cc.record_time if cc else None,
|
||
"trigger_level_geo": cc.trigger_level_geo if cc else None,
|
||
"alarm_level_geo": cc.alarm_level_geo if cc else None,
|
||
"max_range_geo": cc.max_range_geo if cc else None,
|
||
"project": cc.project if cc else None,
|
||
"client": cc.client if cc else None,
|
||
"operator": cc.operator if cc else None,
|
||
"sensor_location": cc.sensor_location if cc else None,
|
||
}
|
||
|
||
|
||
def _event_to_dict(e: Event) -> dict:
|
||
peaks = {}
|
||
if e.peaks:
|
||
peaks = {
|
||
"transverse": e.peaks.transverse,
|
||
"vertical": e.peaks.vertical,
|
||
"longitudinal": e.peaks.longitudinal,
|
||
"vector_sum": e.peaks.vector_sum,
|
||
"mic": e.peaks.mic,
|
||
}
|
||
samples = {}
|
||
if e.raw_samples:
|
||
samples = {
|
||
ch: vals[:20] # first 20 sample-sets to keep the file sane
|
||
for ch, vals in e.raw_samples.items()
|
||
}
|
||
samples["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform"
|
||
return {
|
||
"timestamp": e.timestamp.isoformat() if e.timestamp else None,
|
||
"project": e.project,
|
||
"client": e.client,
|
||
"operator": e.operator,
|
||
"sensor_location": e.sensor_location,
|
||
"peaks": peaks,
|
||
"raw_samples_preview": samples,
|
||
}
|
||
|
||
|
||
# ── Main server loop ───────────────────────────────────────────────────────────
|
||
|
||
def serve(args: argparse.Namespace) -> None:
|
||
output_dir = Path(args.output)
|
||
output_dir.mkdir(parents=True, exist_ok=True)
|
||
state_path = output_dir / "ach_state.json"
|
||
|
||
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||
server_sock.bind(("0.0.0.0", args.port))
|
||
server_sock.listen(5)
|
||
# Wake up every second so Ctrl-C is handled promptly on Windows.
|
||
# Without this, accept() blocks indefinitely and ignores KeyboardInterrupt.
|
||
server_sock.settimeout(1.0)
|
||
|
||
max_ev = args.max_events
|
||
print(f"\n{'='*60}")
|
||
print(f" ACH inbound server listening on 0.0.0.0:{args.port}")
|
||
print(f" Output: {output_dir.resolve()}/ach_inbound_<timestamp>/")
|
||
print(f" State file: {state_path}")
|
||
print(f" Max events per session: {max_ev if max_ev else 'unlimited'}")
|
||
print(f"{'='*60}")
|
||
print(f"\n Point your test unit's ACEmanager call-home settings to:")
|
||
print(f" Remote Host: <this machine's LAN IP>")
|
||
print(f" Remote Port: {args.port}")
|
||
print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n")
|
||
|
||
allow_ips = set(args.allow_ips)
|
||
if allow_ips:
|
||
print(f" Allowlist: {', '.join(sorted(allow_ips))}")
|
||
else:
|
||
print(" Allowlist: NONE -- accepting all IPs (add --allow-ip to restrict)")
|
||
|
||
try:
|
||
while True:
|
||
try:
|
||
client_sock, addr = server_sock.accept()
|
||
except socket.timeout:
|
||
continue # no connection this second; loop back and check for Ctrl-C
|
||
try:
|
||
peer_ip = addr[0]
|
||
peer = f"{addr[0]}:{addr[1]}"
|
||
|
||
if allow_ips and peer_ip not in allow_ips:
|
||
log.info("Rejected connection from %s (not in allowlist)", peer)
|
||
client_sock.close()
|
||
continue
|
||
|
||
log.info("Accepted connection from %s", peer)
|
||
session = AchSession(
|
||
sock=client_sock,
|
||
peer=peer,
|
||
output_dir=output_dir,
|
||
timeout=args.timeout,
|
||
events_only=args.events_only,
|
||
max_events=max_ev,
|
||
state_path=state_path,
|
||
)
|
||
t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}")
|
||
t.start()
|
||
except KeyboardInterrupt:
|
||
raise
|
||
except Exception as exc:
|
||
log.error("Accept error: %s", exc)
|
||
finally:
|
||
server_sock.close()
|
||
print("\nServer stopped.")
|
||
|
||
|
||
def parse_args() -> argparse.Namespace:
|
||
p = argparse.ArgumentParser(
|
||
description="Minimal inbound ACH server — speak BW protocol to calling MiniMate Plus units.",
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
epilog=__doc__,
|
||
)
|
||
p.add_argument(
|
||
"--port", "-p",
|
||
type=int,
|
||
default=12345,
|
||
help="Port to listen on (default: 12345).",
|
||
)
|
||
p.add_argument(
|
||
"--output", "-o",
|
||
default=str(Path(__file__).parent / "captures"),
|
||
metavar="DIR",
|
||
help="Directory to write session captures (default: bridges/captures/).",
|
||
)
|
||
p.add_argument(
|
||
"--timeout", "-t",
|
||
type=float,
|
||
default=30.0,
|
||
help="Protocol receive timeout in seconds (default: 30.0).",
|
||
)
|
||
p.add_argument(
|
||
"--events-only",
|
||
action="store_true",
|
||
help="Skip the device-info step and go straight to event download.",
|
||
)
|
||
p.add_argument(
|
||
"--max-events",
|
||
type=int,
|
||
default=None,
|
||
metavar="N",
|
||
help=(
|
||
"Safety cap: download at most N events per session (default: unlimited). "
|
||
"Useful if a unit has many old events stored — prevents a very long first run."
|
||
),
|
||
)
|
||
p.add_argument(
|
||
"--allow-ip",
|
||
metavar="IP",
|
||
action="append",
|
||
dest="allow_ips",
|
||
default=[],
|
||
help=(
|
||
"Only accept connections from this IP address (repeat for multiple). "
|
||
"Example: --allow-ip 63.43.212.232 "
|
||
"If not specified, all IPs are accepted (not recommended for public servers)."
|
||
),
|
||
)
|
||
p.add_argument(
|
||
"--verbose", "-v",
|
||
action="store_true",
|
||
help="Enable debug logging.",
|
||
)
|
||
return p.parse_args()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
args = parse_args()
|
||
logging.basicConfig(
|
||
level=logging.DEBUG if args.verbose else logging.INFO,
|
||
format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
|
||
)
|
||
try:
|
||
serve(args)
|
||
except KeyboardInterrupt:
|
||
print("\nStopped.")
|