3f7b5c07b5
- Session directory and log file are now created ONLY after startup() succeeds. Internet scanners and dropped connections no longer litter the output folder. Raw bytes are buffered in memory until startup succeeds, then flushed to disk. - Add --allow-ip IP flag (repeatable) to allowlist specific source IPs. Connections from un-listed IPs are rejected immediately (socket closed, no log). If no --allow-ip flags are given, all IPs are still accepted (original behavior). Usage: --allow-ip 63.43.212.232 --allow-ip 152.1.2.3 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
500 lines
19 KiB
Python
500 lines
19 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
ach_server.py — Minimal inbound ACH (Auto Call Home) server for MiniMate Plus.
|
||
|
||
This IS your test server. Run it on any machine on the same network, point a
|
||
unit's ACEmanager call-home destination at it, and it will speak the full BW
|
||
protocol to the device: handshake, pull device info, download all events, save
|
||
everything as JSON.
|
||
|
||
The key thing this script tells you that no amount of packet sniffing can:
|
||
- Does the device speak first (push) or wait for us to send POLL (pull)?
|
||
|
||
If startup() completes normally → it's pull protocol, same as Blastware.
|
||
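  If startup() times out        → the device probably sent something first. No session
                                  directory (and so no raw_rx_<ts>.bin) is written when
                                  startup fails; check the console log instead.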
|
||
|
||
Usage
|
||
-----
|
||
python bridges/ach_server.py [--port 12345] [--output bridges/captures/]
|
||
|
||
Setup
|
||
-----
|
||
1. Run this script on a machine on your local network.
|
||
2. In ACEmanager → Application → ALEOS Application Framework (or equivalent)
|
||
find the Call Home / ACH settings. Set:
|
||
Remote Host: <this machine's LAN IP>
|
||
Remote Port: 12345
|
||
3. Trigger the unit (wait for a vibration event, or use the manual call-home
|
||
button if your firmware version has one).
|
||
4. The unit connects. This script handshakes, downloads all events,
|
||
and saves a timestamped session directory.
|
||
|
||
Output per session
|
||
------------------
|
||
bridges/captures/ach_inbound_<ts>/
|
||
device_info.json — serial number, firmware version, calibration date, etc.
|
||
events.json — all events: timestamp, PPV per channel, peaks, metadata
|
||
raw_rx_<ts>.bin — raw bytes from the device (S3 side) for Analyzer
|
||
session_<ts>.log — detailed protocol log
|
||
|
||
What to look for
|
||
----------------
|
||
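  Push vs pull:  Watch the console log. session_<ts>.log is only created after
                 startup() succeeds, so it never contains the handshake itself.
                 If bytes arrive BEFORE the POLL probe is answered (startup fails
                 or times out), it's push. If POLL gets a clean response and
                 startup() completes, it's pull.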
|
||
|
||
Frequency: Look at raw_rx.bin in the Analyzer. SUB 5A (0xA5 responses) carry
|
||
bulk waveform data — if frequency is sent pre-computed there will be float32
|
||
values before the ADC sample blocks.
|
||
|
||
ACH-specific framing: Does the unit send anything extra before the DLE+STX
|
||
framing starts? raw_rx.bin will show raw bytes including any preamble.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import datetime
|
||
import json
|
||
import logging
|
||
import socket
|
||
import sys
|
||
import threading
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
# Put the directory two levels above this file at the front of sys.path so the
# `minimateplus` imports below resolve when this file is run directly as a script.
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||
|
||
from minimateplus.transport import SocketTransport
|
||
from minimateplus.client import MiniMateClient
|
||
from minimateplus.models import DeviceInfo, Event
|
||
|
||
# Module-level logger used by the accept loop and by every session thread.
log = logging.getLogger("ach_server")
|
||
|
||
# ── Per-unit state (high-water mark) ──────────────────────────────────────────
|
||
# Persisted as <output_dir>/ach_state.json
|
||
# Format: { "BE11529": { "event_count": 5, "last_seen": "2026-04-09T..." }, ... }
|
||
|
||
_state_lock = threading.Lock()
|
||
|
||
|
||
def _load_state(state_path: Path) -> dict:
|
||
if state_path.exists():
|
||
try:
|
||
with open(state_path) as f:
|
||
return json.load(f)
|
||
except Exception:
|
||
pass
|
||
return {}
|
||
|
||
|
||
def _save_state(state_path: Path, state: dict) -> None:
|
||
with _state_lock:
|
||
with open(state_path, "w") as f:
|
||
json.dump(state, f, indent=2)
|
||
|
||
|
||
# ── Per-session handler ────────────────────────────────────────────────────────
|
||
|
||
class AchSession:
    """
    Handles one inbound unit connection in its own thread.

    Wraps the socket in a SocketTransport → MiniMateClient, then runs the
    startup handshake, reads device info and downloads events.

    State tracking (ach_state.json in output_dir):
      On each successful download we record how many of the unit's events
      have been covered (the high-water mark).
      On the next call-home we compare: if the count hasn't grown, there's
      nothing new and we close cleanly without downloading.  If it has grown,
      we download events from index 0 up to the new count (or the
      --max-events cap) and save only the new ones.  The high-water mark only
      advances as far as the download actually reached, so events cut off by
      the cap are not marked as seen.
    """

    def __init__(
        self,
        sock: socket.socket,
        peer: str,
        output_dir: Path,
        timeout: float,
        events_only: bool,
        max_events: Optional[int],
        state_path: Path,
    ) -> None:
        """Store the connection and the session options.

        Args:
            sock: Accepted client socket; closed by :meth:`run`.
            peer: ``"ip:port"`` label of the remote end, used in logs.
            output_dir: Directory under which the session folder is created.
            timeout: Protocol receive timeout in seconds.
            events_only: Skip the device-info step when true.
            max_events: Cap on the highest event index requested, or ``None``.
            state_path: Location of the shared ``ach_state.json`` file.
        """
        self.sock = sock
        self.peer = peer
        self.output_dir = output_dir
        self.timeout = timeout
        self.events_only = events_only
        self.max_events = max_events
        self.state_path = state_path

    @staticmethod
    def _peer_ip(peer: str) -> str:
        """Return the IP part of an ``"ip:port"`` peer label.

        The source port is ephemeral and changes on every connection, so it
        must not be part of a key that is compared across call-homes.
        """
        return peer.rsplit(":", 1)[0]

    @staticmethod
    def _download_plan(
        current_count: int,
        last_count: int,
        max_events: Optional[int],
    ) -> tuple[int, int]:
        """Work out how far to download and what high-water mark results.

        Args:
            current_count: Number of events the unit reports.
            last_count: High-water mark recorded by the previous session.
            max_events: Optional cap; events with index >= cap are not fetched.

        Returns:
            ``(stop_idx, high_water)`` where *stop_idx* is the last event index
            to request and *high_water* is the count to persist afterwards.
            *high_water* never goes below *last_count* and never claims events
            beyond *stop_idx*.
        """
        stop_idx = current_count - 1
        if max_events is not None:
            stop_idx = min(stop_idx, max_events - 1)
        high_water = max(last_count, stop_idx + 1)
        return stop_idx, high_water

    def run(self) -> None:
        """Thread entry point: run the session and always close the socket."""
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

        # Session dir and file handler are created lazily — only after startup
        # succeeds.  This prevents internet scanners and dropped connections from
        # littering the output directory with empty session folders.
        try:
            self._run_inner(ts)
        except Exception as exc:
            log.error("Session failed (%s): %s", self.peer, exc, exc_info=True)
        finally:
            try:
                self.sock.close()
            except Exception as exc:
                # Best-effort: the socket may already be closed by the client.
                log.debug("Socket close failed (%s): %s", self.peer, exc)

    def _run_inner(self, ts: str) -> None:
        """Run handshake, device-info and event download for one connection.

        Nothing is written to disk unless the startup handshake succeeds.

        Args:
            ts: Timestamp string used in the session directory and file names.
        """
        transport = SocketTransport(self.sock, peer=self.peer)

        # Collect raw bytes in memory until startup succeeds, then flush to disk.
        raw_buf: list[bytes] = []
        _orig_read = transport.read

        def tapped_read(n: int) -> bytes:
            data = _orig_read(n)
            if data:
                raw_buf.append(data)
            return data

        transport.read = tapped_read  # type: ignore[method-assign]

        serial: Optional[str] = None

        # ── Step 1: startup handshake ─────────────────────────────────────────
        # Do this BEFORE creating the session directory so that scanner probes
        # and dropped connections leave no trace on disk.
        try:
            from minimateplus.protocol import MiniMateProtocol
            client = MiniMateClient(transport=transport, timeout=self.timeout)
            client.open()
            proto = MiniMateProtocol(transport, recv_timeout=self.timeout)
            proto.startup()
        except Exception as exc:
            log.warning("Startup failed from %s: %s -- ignoring", self.peer, exc)
            return  # no session dir created

        # Startup succeeded — this is a real unit.  Create session dir now.
        session_dir = self.output_dir / f"ach_inbound_{ts}"
        session_dir.mkdir(parents=True, exist_ok=True)
        log_path = session_dir / f"session_{ts}.log"
        raw_path = session_dir / f"raw_rx_{ts}.bin"

        raw_fh = open(raw_path, "wb")
        fh: Optional[logging.FileHandler] = None
        root_logger = logging.getLogger()

        try:
            # Flush buffered raw bytes to file and switch to direct file writes.
            raw_fh.writelines(raw_buf)
            raw_buf.clear()

            def tapped_read_file(n: int) -> bytes:
                data = _orig_read(n)
                if data:
                    raw_fh.write(data)
                    raw_fh.flush()
                return data

            transport.read = tapped_read_file  # type: ignore[method-assign]

            # Wire up file handler now that the session dir exists.
            fh = logging.FileHandler(log_path, encoding="utf-8")
            fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s"))
            root_logger.addHandler(fh)

            # ── Step 2: device info ───────────────────────────────────────────
            device_info = None
            if not self.events_only:
                log.info("Step 2/3: reading device info")
                try:
                    device_info = client.connect()
                    serial = device_info.serial
                    _save_json(session_dir / "device_info.json", _device_info_to_dict(device_info))
                    log.info(
                        "  [OK] Device: serial=%s  firmware=%s  calibration=%s",
                        serial,
                        device_info.firmware_version,
                        device_info.calibration_date,
                    )
                except Exception as exc:
                    log.error("  [FAIL] Device info failed: %s", exc)
            else:
                log.info("Step 2/3: skipping device info (--events-only)")

            # ── Step 3: check for new events via high-water mark ───────────────
            log.info("Step 3/3: checking for new events")

            state = _load_state(self.state_path)
            # Fall back to the peer's IP (without the ephemeral source port) if
            # no serial is known, so the key matches on the next call-home.
            unit_key = serial or self._peer_ip(self.peer)
            last_count = state.get(unit_key, {}).get("event_count", 0)

            # Use the event count already read from the event index during connect().
            # Falls back to count_events() only if connect() wasn't called or failed.
            if device_info is not None:
                current_count = device_info.event_count
            else:
                try:
                    current_count = client.count_events()
                except Exception as exc:
                    log.error("  [FAIL] count_events failed: %s", exc)
                    return

            log.info("  Unit has %d stored event(s); last downloaded count: %d",
                     current_count, last_count)

            if current_count <= last_count:
                log.info("  [OK] No new events since last call-home -- nothing to download")
                log.info("Session complete (no new events) -> %s", session_dir)
                return

            new_event_count = current_count - last_count
            log.info("  %d new event(s) to download", new_event_count)

            # Download all events up to current_count, apply max_events cap.
            # We re-download old events too (get_events always starts from 0),
            # but we only SAVE the new ones (those at index >= last_count).
            stop_idx, high_water = self._download_plan(
                current_count, last_count, self.max_events
            )
            if self.max_events is not None and self.max_events < current_count:
                log.warning(
                    "  max_events=%d cap: will download events 0–%d only "
                    "(unit has %d total)",
                    self.max_events, stop_idx, current_count,
                )

            if stop_idx < last_count:
                # The cap ends before the first unseen event: downloading would
                # only re-fetch events we already have.
                log.warning(
                    "  max_events=%d cap leaves no new events to download "
                    "(%d already downloaded); raise or remove --max-events",
                    self.max_events, last_count,
                )
                log.info("Session complete (no events downloaded) -> %s", session_dir)
                return

            try:
                all_events = client.get_events(
                    full_waveform=True,
                    stop_after_index=stop_idx,
                )
                # Only the events beyond last_count are genuinely new
                new_events = all_events[last_count:]
                log.info("  [OK] Downloaded %d total event(s), %d new",
                         len(all_events), len(new_events))

                _save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events])
                if last_count > 0 and len(all_events) > len(new_events):
                    log.info("  (skipped %d already-seen event(s))", last_count)

                for i, ev in enumerate(new_events):
                    log.info(
                        "  NEW Event %d: %s  Tran=%.4f  Vert=%.4f  Long=%.4f  VS=%.4f",
                        last_count + i,
                        ev.timestamp.isoformat() if ev.timestamp else "?",
                        ev.peaks.transverse if ev.peaks else 0,
                        ev.peaks.vertical if ev.peaks else 0,
                        ev.peaks.longitudinal if ev.peaks else 0,
                        ev.peaks.vector_sum if ev.peaks else 0,
                    )

                if high_water < current_count:
                    log.warning(
                        "  %d event(s) beyond the max_events cap were NOT downloaded; "
                        "high-water mark set to %d",
                        current_count - high_water, high_water,
                    )

                # Update high-water mark.  Re-read the state file first: the
                # download above can take a long time and another session
                # thread may have recorded its own unit in the meantime.  This
                # narrows (but does not fully close) the lost-update window.
                state = _load_state(self.state_path)
                state[unit_key] = {
                    "event_count": high_water,
                    "last_seen": datetime.datetime.now().isoformat(),
                    "serial": serial,
                    "peer": self.peer,
                }
                _save_state(self.state_path, state)

            except Exception as exc:
                log.error("  [FAIL] Event download failed: %s", exc, exc_info=True)

        finally:
            try:
                raw_fh.close()
                client.close()  # closes transport / socket cleanly
            finally:
                # Always detach the per-session handler, even if close() raised,
                # otherwise it would stay on the root logger for the process lifetime.
                if fh is not None:
                    root_logger.removeHandler(fh)
                    fh.close()

        log.info("Session complete -> %s", session_dir)
        log.info("="*60)
|
||
|
||
|
||
# ── JSON helpers ───────────────────────────────────────────────────────────────
|
||
|
||
def _save_json(path: Path, obj: object) -> None:
    """Dump *obj* to *path* as 2-space-indented JSON.

    Values the ``json`` module cannot encode natively are converted with
    ``str``.  A debug line is logged once the file has been written.
    """
    with open(path, mode="w") as sink:
        json.dump(obj, fp=sink, default=str, indent=2)
    log.debug("Saved %s", path)
|
||
|
||
|
||
def _device_info_to_dict(d: DeviceInfo) -> dict:
|
||
return {
|
||
"serial": d.serial,
|
||
"firmware_version": d.firmware_version,
|
||
"calibration_date": str(d.calibration_date) if d.calibration_date else None,
|
||
"aux_trigger": d.aux_trigger,
|
||
"setup_name": d.setup_name,
|
||
"sample_rate": d.sample_rate,
|
||
"record_time": d.record_time,
|
||
"trigger_level_geo": d.trigger_level_geo,
|
||
"alarm_level_geo": d.alarm_level_geo,
|
||
"max_range_geo": d.max_range_geo,
|
||
"project": d.project,
|
||
"client": d.client,
|
||
"operator": d.operator,
|
||
"sensor_location": d.sensor_location,
|
||
}
|
||
|
||
|
||
def _event_to_dict(e: Event) -> dict:
|
||
peaks = {}
|
||
if e.peaks:
|
||
peaks = {
|
||
"transverse": e.peaks.transverse,
|
||
"vertical": e.peaks.vertical,
|
||
"longitudinal": e.peaks.longitudinal,
|
||
"vector_sum": e.peaks.vector_sum,
|
||
"mic": e.peaks.mic,
|
||
}
|
||
samples = {}
|
||
if e.raw_samples:
|
||
samples = {
|
||
ch: vals[:20] # first 20 sample-sets to keep the file sane
|
||
for ch, vals in e.raw_samples.items()
|
||
}
|
||
samples["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform"
|
||
return {
|
||
"timestamp": e.timestamp.isoformat() if e.timestamp else None,
|
||
"project": e.project,
|
||
"client": e.client,
|
||
"operator": e.operator,
|
||
"sensor_location": e.sensor_location,
|
||
"peaks": peaks,
|
||
"raw_samples_preview": samples,
|
||
}
|
||
|
||
|
||
# ── Main server loop ───────────────────────────────────────────────────────────
|
||
|
||
def serve(args: argparse.Namespace) -> None:
    """Listen for inbound unit connections and run one session thread per peer.

    Binds a TCP socket on ``0.0.0.0:<args.port>``, prints a start-up banner,
    then accepts connections until interrupted.  When an allowlist is given,
    connections from other source IPs are closed immediately.  Each accepted
    connection is handed to an :class:`AchSession` running in a daemon thread.

    Args:
        args: Parsed command-line options (``output``, ``port``, ``timeout``,
            ``events_only``, ``max_events``, ``allow_ips``).

    Raises:
        OSError: If the listening socket cannot be bound.
        KeyboardInterrupt: Propagated to the caller after the socket is closed.
    """
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)
    state_path = output_dir / "ach_state.json"

    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_sock.bind(("0.0.0.0", args.port))
        server_sock.listen(5)
    except BaseException:
        # Don't leak the descriptor when the port is unavailable.
        server_sock.close()
        raise

    max_ev = args.max_events
    print(f"\n{'='*60}")
    print(f"  ACH inbound server listening on 0.0.0.0:{args.port}")
    print(f"  Output: {output_dir.resolve()}/ach_inbound_<timestamp>/")
    print(f"  State file: {state_path}")
    # `is not None` so that an explicit 0 is shown as a cap, which is how the
    # sessions treat it, rather than as "unlimited".
    print(f"  Max events per session: {max_ev if max_ev is not None else 'unlimited'}")
    print(f"{'='*60}")
    print(f"\n  Point your test unit's ACEmanager call-home settings to:")
    print(f"    Remote Host: <this machine's LAN IP>")
    print(f"    Remote Port: {args.port}")
    print(f"\n  Waiting for inbound connections...  (Ctrl-C to stop)\n")

    allow_ips = set(args.allow_ips or ())
    if allow_ips:
        print(f"  Allowlist: {', '.join(sorted(allow_ips))}")
    else:
        print("  Allowlist: NONE -- accepting all IPs (add --allow-ip to restrict)")

    try:
        while True:
            client_sock = None
            try:
                client_sock, addr = server_sock.accept()
                peer_ip = addr[0]
                peer = f"{addr[0]}:{addr[1]}"

                if allow_ips and peer_ip not in allow_ips:
                    log.info("Rejected connection from %s (not in allowlist)", peer)
                    client_sock.close()
                    continue

                log.info("Accepted connection from %s", peer)
                session = AchSession(
                    sock=client_sock,
                    peer=peer,
                    output_dir=output_dir,
                    timeout=args.timeout,
                    events_only=args.events_only,
                    max_events=max_ev,
                    state_path=state_path,
                )
                t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}")
                # Must stay the last statement in this try: once the thread is
                # running it owns the socket, and the handler below would
                # otherwise close it underneath the session.
                t.start()
            except Exception as exc:
                # KeyboardInterrupt is not an Exception subclass, so Ctrl-C
                # still propagates to the outer finally.
                log.error("Accept error: %s", exc)
                if client_sock is not None:
                    # No session thread took ownership (e.g. the thread could
                    # not be started), so close the socket here.
                    try:
                        client_sock.close()
                    except OSError:
                        pass
    finally:
        server_sock.close()
        print("\nServer stopped.")
|
||
|
||
|
||
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse ``sys.argv``.

    The module docstring is shown verbatim as the help epilog.  Options are
    declared in a table and registered in order, so ``--help`` lists them in
    the order they appear below.
    """
    parser = argparse.ArgumentParser(
        description="Minimal inbound ACH server — speak BW protocol to calling MiniMate Plus units.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )

    option_table = [
        (
            ("--port", "-p"),
            {
                "type": int,
                "default": 12345,
                "help": "Port to listen on (default: 12345).",
            },
        ),
        (
            ("--output", "-o"),
            {
                "default": str(Path(__file__).parent / "captures"),
                "metavar": "DIR",
                "help": "Directory to write session captures (default: bridges/captures/).",
            },
        ),
        (
            ("--timeout", "-t"),
            {
                "type": float,
                "default": 30.0,
                "help": "Protocol receive timeout in seconds (default: 30.0).",
            },
        ),
        (
            ("--events-only",),
            {
                "action": "store_true",
                "help": "Skip the device-info step and go straight to event download.",
            },
        ),
        (
            ("--max-events",),
            {
                "type": int,
                "default": None,
                "metavar": "N",
                "help": (
                    "Safety cap: download at most N events per session (default: unlimited). "
                    "Useful if a unit has many old events stored — prevents a very long first run."
                ),
            },
        ),
        (
            ("--allow-ip",),
            {
                "metavar": "IP",
                "action": "append",
                "dest": "allow_ips",
                "default": [],
                "help": (
                    "Only accept connections from this IP address (repeat for multiple). "
                    "Example: --allow-ip 63.43.212.232 "
                    "If not specified, all IPs are accepted (not recommended for public servers)."
                ),
            },
        ),
        (
            ("--verbose", "-v"),
            {
                "action": "store_true",
                "help": "Enable debug logging.",
            },
        ),
    ]
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)

    return parser.parse_args()
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: parse options, configure console logging, then
    # serve until the user interrupts with Ctrl-C.
    args = parse_args()
    if args.verbose:
        console_level = logging.DEBUG
    else:
        console_level = logging.INFO
    logging.basicConfig(
        format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
        level=console_level,
    )
    try:
        serve(args)
    except KeyboardInterrupt:
        print("\nStopped.")
|