Files
seismo-relay/bridges/ach_server.py
T
claude 4921b0489a fix: correct Event and PeakValues field names in ach_server serialization
Event model uses peak_values (not peaks) and project_info (not direct fields).
PeakValues fields are tran/vert/long/micl/peak_vector_sum (not transverse etc).
ProjectInfo fields accessed via ev.project_info.project etc.

Also fix ev.timestamp serialization: use str() instead of .isoformat() since
Timestamp is a custom dataclass, not datetime.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-16 21:14:58 +00:00

515 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
ach_server.py — Minimal inbound ACH (Auto Call Home) server for MiniMate Plus.
This IS your test server. Run it on any machine on the same network, point a
unit's ACEmanager call-home destination at it, and it will speak the full BW
protocol to the device: handshake, pull device info, download all events, save
everything as JSON.
The key thing this script tells you that no amount of packet sniffing can:
- Does the device speak first (push) or wait for us to send POLL (pull)?
If startup() completes normally → it's pull protocol, same as Blastware.
If startup() times out → the device sent something first; check raw_rx.bin.
Usage
-----
python bridges/ach_server.py [--port 12345] [--output bridges/captures/]
Setup
-----
1. Run this script on a machine on your local network.
2. In ACEmanager → Application → ALEOS Application Framework (or equivalent)
find the Call Home / ACH settings. Set:
Remote Host: <this machine's LAN IP>
Remote Port: 12345
3. Trigger the unit (wait for a vibration event, or use the manual call-home
button if your firmware version has one).
4. The unit connects. This script handshakes, downloads all events,
and saves a timestamped session directory.
Output per session
------------------
bridges/captures/ach_inbound_<ts>/
device_info.json — serial number, firmware version, calibration date, etc.
events.json — all events: timestamp, PPV per channel, peaks, metadata
raw_rx_<ts>.bin — raw bytes from the device (S3 side) for Analyzer
session_<ts>.log — detailed protocol log
What to look for
----------------
Push vs pull: Check session_<ts>.log. If the first line after "Connected"
shows bytes arriving BEFORE the POLL probe was sent, it's push. If POLL
gets a clean response, it's pull.
Frequency: Look at raw_rx.bin in the Analyzer. SUB 5A (0xA5 responses) carry
bulk waveform data — if frequency is sent pre-computed there will be float32
values before the ADC sample blocks.
ACH-specific framing: Does the unit send anything extra before the DLE+STX
framing starts? raw_rx.bin will show raw bytes including any preamble.
"""
from __future__ import annotations
import argparse
import datetime
import json
import logging
import socket
import sys
import threading
from pathlib import Path
from typing import Optional
sys.path.insert(0, str(Path(__file__).parent.parent))
from minimateplus.transport import SocketTransport
from minimateplus.client import MiniMateClient
from minimateplus.models import DeviceInfo, Event
log = logging.getLogger("ach_server")
# ── Per-unit state (high-water mark) ──────────────────────────────────────────
# Persisted as <output_dir>/ach_state.json
# Format: { "BE11529": { "event_count": 5, "last_seen": "2026-04-09T..." }, ... }
_state_lock = threading.Lock()
def _load_state(state_path: Path) -> dict:
if state_path.exists():
try:
with open(state_path) as f:
return json.load(f)
except Exception:
pass
return {}
def _save_state(state_path: Path, state: dict) -> None:
with _state_lock:
with open(state_path, "w") as f:
json.dump(state, f, indent=2)
# ── Per-session handler ────────────────────────────────────────────────────────
class AchSession:
"""
Handles one inbound unit connection in its own thread.
Wraps the socket in a SocketTransport → MiniMateClient, then runs the
standard connect → get_device_info → get_events sequence.
State tracking (ach_state.json in output_dir):
On each successful download we record how many events the unit had.
On the next call-home we compare: if count hasn't grown, there's nothing
new and we close cleanly without downloading. If it has grown, we
download all events up to the new count and save only the new ones.
"""
def __init__(
self,
sock: socket.socket,
peer: str,
output_dir: Path,
timeout: float,
events_only: bool,
max_events: Optional[int],
state_path: Path,
) -> None:
self.sock = sock
self.peer = peer
self.output_dir = output_dir
self.timeout = timeout
self.events_only = events_only
self.max_events = max_events
self.state_path = state_path
def run(self) -> None:
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
# Session dir and file handler are created lazily — only after startup
# succeeds. This prevents internet scanners and dropped connections from
# littering the output directory with empty session folders.
try:
self._run_inner(ts)
except Exception as exc:
log.error("Session failed (%s): %s", self.peer, exc, exc_info=True)
finally:
try:
self.sock.close()
except Exception:
pass
def _run_inner(self, ts: str) -> None:
transport = SocketTransport(self.sock, peer=self.peer)
# Collect raw bytes in memory until startup succeeds, then flush to disk.
raw_buf: list[bytes] = []
_orig_read = transport.read
def tapped_read(n: int) -> bytes:
data = _orig_read(n)
if data:
raw_buf.append(data)
return data
transport.read = tapped_read # type: ignore[method-assign]
serial: Optional[str] = None
# ── Step 1: startup handshake ─────────────────────────────────────────
# Do this BEFORE creating the session directory so that scanner probes
# and dropped connections leave no trace on disk.
try:
from minimateplus.protocol import MiniMateProtocol
client = MiniMateClient(transport=transport, timeout=self.timeout)
client.open()
proto = MiniMateProtocol(transport, recv_timeout=self.timeout)
proto.startup()
except Exception as exc:
log.warning("Startup failed from %s: %s -- ignoring", self.peer, exc)
return # no session dir created
# Startup succeeded — this is a real unit. Create session dir now.
session_dir = self.output_dir / f"ach_inbound_{ts}"
session_dir.mkdir(parents=True, exist_ok=True)
log_path = session_dir / f"session_{ts}.log"
raw_path = session_dir / f"raw_rx_{ts}.bin"
# Flush buffered raw bytes to file and switch to direct file writes.
raw_fh = open(raw_path, "wb")
for chunk in raw_buf:
raw_fh.write(chunk)
raw_buf.clear()
def tapped_read_file(n: int) -> bytes:
data = _orig_read(n)
if data:
raw_fh.write(data)
raw_fh.flush()
return data
transport.read = tapped_read_file # type: ignore[method-assign]
# Wire up file handler now that the session dir exists.
fh = logging.FileHandler(log_path, encoding="utf-8")
fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s"))
root_logger = logging.getLogger()
root_logger.addHandler(fh)
try:
# ── Step 2: device info ───────────────────────────────────────────
device_info = None
if not self.events_only:
log.info("Step 2/3: reading device info")
try:
device_info = client.connect()
serial = device_info.serial
_save_json(session_dir / "device_info.json", _device_info_to_dict(device_info))
log.info(
" [OK] Device: serial=%s firmware=%s model=%s events=%d",
serial,
device_info.firmware_version,
device_info.model,
device_info.event_count or 0,
)
except Exception as exc:
log.error(" [FAIL] Device info failed: %s", exc)
else:
log.info("Step 2/3: skipping device info (--events-only)")
# ── Step 3: check for new events via high-water mark ───────────────
log.info("Step 3/3: checking for new events")
state = _load_state(self.state_path)
unit_key = serial or self.peer # fall back to IP if no serial
last_count = state.get(unit_key, {}).get("event_count", 0)
# Use the event count already read from the event index during connect().
# This is fast (no extra round-trips) and confirmed accurate (matches LCD).
# Falls back to count_events() only if connect() wasn't called.
if device_info is not None:
current_count = device_info.event_count
else:
try:
current_count = client.count_events()
except Exception as exc:
log.error(" [FAIL] count_events failed: %s", exc)
return
log.info(" Unit has %d stored event(s); last downloaded count: %d",
current_count, last_count)
if current_count <= last_count:
log.info(" [OK] No new events since last call-home -- nothing to download")
log.info("Session complete (no new events) -> %s", session_dir)
return
new_event_count = current_count - last_count
log.info(" %d new event(s) to download", new_event_count)
# Download all events up to current_count, apply max_events cap.
# We re-download old events too (get_events always starts from 0),
# but we only SAVE the new ones (the last new_event_count of the list).
stop_idx = current_count - 1
if self.max_events is not None:
stop_idx = min(stop_idx, self.max_events - 1)
if self.max_events < current_count:
log.warning(
" max_events=%d cap: will download events 0%d only "
"(unit has %d total)",
self.max_events, stop_idx, current_count,
)
try:
all_events = client.get_events(
full_waveform=True,
stop_after_index=stop_idx,
)
# Only the events beyond last_count are genuinely new
new_events = all_events[last_count:]
log.info(" [OK] Downloaded %d total event(s), %d new",
len(all_events), len(new_events))
_save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events])
if last_count > 0 and len(all_events) > len(new_events):
log.info(" (skipped %d already-seen event(s))", last_count)
for i, ev in enumerate(new_events):
pv = ev.peak_values
pi = ev.project_info
log.info(
" NEW Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f project=%r",
last_count + i,
str(ev.timestamp) if ev.timestamp else "?",
pv.tran if pv else 0,
pv.vert if pv else 0,
pv.long if pv else 0,
pv.peak_vector_sum if pv else 0,
pi.project if pi else "",
)
# Update high-water mark
state[unit_key] = {
"event_count": current_count,
"last_seen": datetime.datetime.now().isoformat(),
"serial": serial,
"peer": self.peer,
}
_save_state(self.state_path, state)
except Exception as exc:
log.error(" [FAIL] Event download failed: %s", exc, exc_info=True)
finally:
raw_fh.close()
client.close() # closes transport / socket cleanly
root_logger.removeHandler(fh)
fh.close()
log.info("Session complete -> %s", session_dir)
log.info("="*60)
# ── JSON helpers ───────────────────────────────────────────────────────────────
def _save_json(path: Path, obj: object) -> None:
with open(path, "w") as f:
json.dump(obj, f, indent=2, default=str)
log.debug("Saved %s", path)
def _device_info_to_dict(d: DeviceInfo) -> dict:
cc = d.compliance_config
return {
"serial": d.serial,
"firmware_version": d.firmware_version,
"dsp_version": d.dsp_version,
"model": d.model,
"event_count": d.event_count,
# compliance config fields (None if 1A read failed)
"setup_name": cc.setup_name if cc else None,
"sample_rate": cc.sample_rate if cc else None,
"record_time": cc.record_time if cc else None,
"trigger_level_geo": cc.trigger_level_geo if cc else None,
"alarm_level_geo": cc.alarm_level_geo if cc else None,
"max_range_geo": cc.max_range_geo if cc else None,
"project": cc.project if cc else None,
"client": cc.client if cc else None,
"operator": cc.operator if cc else None,
"sensor_location": cc.sensor_location if cc else None,
}
def _event_to_dict(e: Event) -> dict:
pv = e.peak_values
pi = e.project_info
peaks = {}
if pv:
peaks = {
"transverse": pv.tran,
"vertical": pv.vert,
"longitudinal": pv.long,
"vector_sum": pv.peak_vector_sum,
"mic": pv.micl,
}
samples = {}
if e.raw_samples:
samples = {
ch: vals[:20] # first 20 sample-sets to keep the file sane
for ch, vals in e.raw_samples.items()
}
samples["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform"
return {
"timestamp": str(e.timestamp) if e.timestamp else None,
"project": pi.project if pi else None,
"client": pi.client if pi else None,
"operator": pi.operator if pi else None,
"sensor_location": pi.sensor_location if pi else None,
"peaks": peaks,
"raw_samples_preview": samples,
}
# ── Main server loop ───────────────────────────────────────────────────────────
def serve(args: argparse.Namespace) -> None:
output_dir = Path(args.output)
output_dir.mkdir(parents=True, exist_ok=True)
state_path = output_dir / "ach_state.json"
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind(("0.0.0.0", args.port))
server_sock.listen(5)
# Wake up every second so Ctrl-C is handled promptly on Windows.
# Without this, accept() blocks indefinitely and ignores KeyboardInterrupt.
server_sock.settimeout(1.0)
max_ev = args.max_events
print(f"\n{'='*60}")
print(f" ACH inbound server listening on 0.0.0.0:{args.port}")
print(f" Output: {output_dir.resolve()}/ach_inbound_<timestamp>/")
print(f" State file: {state_path}")
print(f" Max events per session: {max_ev if max_ev else 'unlimited'}")
print(f"{'='*60}")
print(f"\n Point your test unit's ACEmanager call-home settings to:")
print(f" Remote Host: <this machine's LAN IP>")
print(f" Remote Port: {args.port}")
print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n")
allow_ips = set(args.allow_ips)
if allow_ips:
print(f" Allowlist: {', '.join(sorted(allow_ips))}")
else:
print(" Allowlist: NONE -- accepting all IPs (add --allow-ip to restrict)")
try:
while True:
try:
client_sock, addr = server_sock.accept()
except socket.timeout:
continue # no connection this second; loop back and check for Ctrl-C
try:
peer_ip = addr[0]
peer = f"{addr[0]}:{addr[1]}"
if allow_ips and peer_ip not in allow_ips:
log.info("Rejected connection from %s (not in allowlist)", peer)
client_sock.close()
continue
log.info("Accepted connection from %s", peer)
session = AchSession(
sock=client_sock,
peer=peer,
output_dir=output_dir,
timeout=args.timeout,
events_only=args.events_only,
max_events=max_ev,
state_path=state_path,
)
t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}")
t.start()
except KeyboardInterrupt:
raise
except Exception as exc:
log.error("Accept error: %s", exc)
finally:
server_sock.close()
print("\nServer stopped.")
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
description="Minimal inbound ACH server — speak BW protocol to calling MiniMate Plus units.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
p.add_argument(
"--port", "-p",
type=int,
default=12345,
help="Port to listen on (default: 12345).",
)
p.add_argument(
"--output", "-o",
default=str(Path(__file__).parent / "captures"),
metavar="DIR",
help="Directory to write session captures (default: bridges/captures/).",
)
p.add_argument(
"--timeout", "-t",
type=float,
default=30.0,
help="Protocol receive timeout in seconds (default: 30.0).",
)
p.add_argument(
"--events-only",
action="store_true",
help="Skip the device-info step and go straight to event download.",
)
p.add_argument(
"--max-events",
type=int,
default=None,
metavar="N",
help=(
"Safety cap: download at most N events per session (default: unlimited). "
"Useful if a unit has many old events stored — prevents a very long first run."
),
)
p.add_argument(
"--allow-ip",
metavar="IP",
action="append",
dest="allow_ips",
default=[],
help=(
"Only accept connections from this IP address (repeat for multiple). "
"Example: --allow-ip 63.43.212.232 "
"If not specified, all IPs are accepted (not recommended for public servers)."
),
)
p.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable debug logging.",
)
return p.parse_args()
if __name__ == "__main__":
args = parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
)
try:
serve(args)
except KeyboardInterrupt:
print("\nStopped.")