ef2c38e7db
Add full decode pipeline for 0x2C partial records from the device's event list, representing continuous monitoring intervals where no threshold was crossed. These records appear interleaved with full triggered events in the browse walk and were previously ignored. minimateplus/models.py - Add MonitorLogEntry dataclass: key, start_time, stop_time, serial, geo_threshold_ips, raw_header, duration_seconds property minimateplus/protocol.py - read_waveform_header() now returns (data_rsp.data, length) — full payload including the record-type byte at position 0 — instead of the sliced header. Callers that need the old slice use raw_data[11:11+length] as before. minimateplus/client.py - Add _decode_0a_partial_header(): auto-detects 9-byte (sub_code=0x10) vs 10-byte (sub_code=0x03) timestamp format, handles 1-byte inter-timestamp gap, extracts serial via BE anchor and geo threshold via Geo: anchor. - Add get_monitor_log_entries(skip_keys=None): browse walk (1E → 0A → 1F), decodes partial records, skips full records and already-seen keys. minimateplus/__init__.py - Export MonitorLogEntry bridges/ach_server.py - After get_events(), call get_monitor_log_entries(skip_keys=seen_keys) and save new entries to monitor_log.json in the session directory. - Add _monitor_log_entry_to_dict() helper. - Include monitor log keys in downloaded_keys for state persistence. CLAUDE.md / CHANGELOG.md - Document 0x2C partial record layout (timestamp format, ASCII metadata region, 1-byte gap edge case) confirmed from 4-11-26 MITM capture. - Version bump to v0.10.0; update What's next. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
697 lines
30 KiB
Python
697 lines
30 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
ach_server.py — Minimal inbound ACH (Auto Call Home) server for MiniMate Plus.
|
|
|
|
This IS your test server. Run it on any machine on the same network, point a
|
|
unit's ACEmanager call-home destination at it, and it will speak the full BW
|
|
protocol to the device: handshake, pull device info, download all events, save
|
|
everything as JSON.
|
|
|
|
The key thing this script tells you that no amount of packet sniffing can:
|
|
- Does the device speak first (push) or wait for us to send POLL (pull)?
|
|
|
|
If startup() completes normally → it's pull protocol, same as Blastware.
|
|
If startup() times out → the device sent something first; check raw_rx.bin.
|
|
|
|
Usage
|
|
-----
|
|
python bridges/ach_server.py [--port 12345] [--output bridges/captures/]
|
|
|
|
Setup
|
|
-----
|
|
1. Run this script on a machine on your local network.
|
|
2. In ACEmanager → Application → ALEOS Application Framework (or equivalent)
|
|
find the Call Home / ACH settings. Set:
|
|
Remote Host: <this machine's LAN IP>
|
|
Remote Port: 12345
|
|
3. Trigger the unit (wait for a vibration event, or use the manual call-home
|
|
button if your firmware version has one).
|
|
4. The unit connects. This script handshakes, downloads all events,
|
|
and saves a timestamped session directory.
|
|
|
|
Output per session
|
|
------------------
|
|
bridges/captures/ach_inbound_<ts>/
|
|
device_info.json — serial number, firmware version, calibration date, etc.
|
|
events.json — all events: timestamp, PPV per channel, peaks, metadata
|
|
raw_rx_<ts>.bin — raw bytes from the device (S3 side) for Analyzer
|
|
session_<ts>.log — detailed protocol log
|
|
|
|
What to look for
|
|
----------------
|
|
Push vs pull: Check session_<ts>.log. If the first line after "Connected"
|
|
shows bytes arriving BEFORE the POLL probe was sent, it's push. If POLL
|
|
gets a clean response, it's pull.
|
|
|
|
Frequency: Look at raw_rx.bin in the Analyzer. SUB 5A (0xA5 responses) carry
|
|
bulk waveform data — if frequency is sent pre-computed there will be float32
|
|
values before the ADC sample blocks.
|
|
|
|
ACH-specific framing: Does the unit send anything extra before the DLE+STX
|
|
framing starts? raw_rx.bin will show raw bytes including any preamble.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import socket
|
|
import sys
|
|
import threading
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from minimateplus.transport import SocketTransport
|
|
from minimateplus.client import MiniMateClient
|
|
from minimateplus.models import DeviceInfo, Event, MonitorLogEntry
|
|
|
|
log = logging.getLogger("ach_server")
|
|
|
|
# ── Per-unit state (downloaded-key set) ───────────────────────────────────────
|
|
# Persisted as <output_dir>/ach_state.json
|
|
# Format:
|
|
# {
|
|
# "BE11529": {
|
|
# "downloaded_keys": ["01110000", "0111245a"], # hex keys already on disk
|
|
# "max_downloaded_key": "0111245a", # highest key ever seen
|
|
# "last_seen": "2026-04-11T01:04:36"
|
|
# }
|
|
# }
|
|
#
|
|
# Key-based deduplication works well within a single "key generation" (between
|
|
# erases). After the device memory is erased the event counter resets to
|
|
# 0x01110000, so the first new event has the SAME key as the very first event
|
|
# we ever downloaded. We detect this situation with max_downloaded_key:
|
|
#
|
|
# if max(current_device_keys) < max_downloaded_key
|
|
# → device was wiped and keys have restarted → treat all device keys as new
|
|
#
|
|
# After our own erase (--clear-after-download) we also explicitly clear
|
|
# downloaded_keys and max_downloaded_key so the next session starts fresh.
|
|
|
|
_state_lock = threading.Lock()
|
|
|
|
|
|
def _load_state(state_path: Path) -> dict:
|
|
if state_path.exists():
|
|
try:
|
|
with open(state_path) as f:
|
|
return json.load(f)
|
|
except Exception:
|
|
pass
|
|
return {}
|
|
|
|
|
|
def _save_state(state_path: Path, state: dict) -> None:
|
|
with _state_lock:
|
|
with open(state_path, "w") as f:
|
|
json.dump(state, f, indent=2)
|
|
|
|
|
|
# ── Per-session handler ────────────────────────────────────────────────────────
|
|
|
|
class AchSession:
|
|
"""
|
|
Handles one inbound unit connection in its own thread.
|
|
Wraps the socket in a SocketTransport → MiniMateClient, then runs the
|
|
standard connect → get_device_info → get_events sequence.
|
|
|
|
State tracking (ach_state.json in output_dir):
|
|
On each successful download we record the SET of event keys downloaded.
|
|
On the next call-home we compare: if all device keys are already in the
|
|
set, there's nothing new. If any key is new (including after the device
|
|
was wiped and re-recorded), we download and save only those events.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
sock: socket.socket,
|
|
peer: str,
|
|
output_dir: Path,
|
|
timeout: float,
|
|
events_only: bool,
|
|
max_events: Optional[int],
|
|
state_path: Path,
|
|
clear_after_download: bool = False,
|
|
) -> None:
|
|
self.sock = sock
|
|
self.peer = peer
|
|
self.output_dir = output_dir
|
|
self.timeout = timeout
|
|
self.events_only = events_only
|
|
self.max_events = max_events
|
|
self.state_path = state_path
|
|
self.clear_after_download = clear_after_download
|
|
|
|
def run(self) -> None:
|
|
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
|
# Session dir and file handler are created lazily — only after startup
|
|
# succeeds. This prevents internet scanners and dropped connections from
|
|
# littering the output directory with empty session folders.
|
|
try:
|
|
self._run_inner(ts)
|
|
except Exception as exc:
|
|
log.error("Session failed (%s): %s", self.peer, exc, exc_info=True)
|
|
finally:
|
|
try:
|
|
self.sock.close()
|
|
except Exception:
|
|
pass
|
|
|
|
def _run_inner(self, ts: str) -> None:
|
|
transport = SocketTransport(self.sock, peer=self.peer)
|
|
|
|
# Collect raw bytes in memory until startup succeeds, then flush to disk.
|
|
raw_buf: list[bytes] = []
|
|
_orig_read = transport.read
|
|
|
|
def tapped_read(n: int) -> bytes:
|
|
data = _orig_read(n)
|
|
if data:
|
|
raw_buf.append(data)
|
|
return data
|
|
|
|
transport.read = tapped_read # type: ignore[method-assign]
|
|
|
|
serial: Optional[str] = None
|
|
|
|
# ── Step 1: startup handshake ─────────────────────────────────────────
|
|
# Do this BEFORE creating the session directory so that scanner probes
|
|
# and dropped connections leave no trace on disk.
|
|
try:
|
|
from minimateplus.protocol import MiniMateProtocol
|
|
client = MiniMateClient(transport=transport, timeout=self.timeout)
|
|
client.open()
|
|
proto = MiniMateProtocol(transport, recv_timeout=self.timeout)
|
|
proto.startup()
|
|
except Exception as exc:
|
|
log.warning("Startup failed from %s: %s -- ignoring", self.peer, exc)
|
|
return # no session dir created
|
|
|
|
# Startup succeeded — this is a real unit. Create session dir now.
|
|
session_dir = self.output_dir / f"ach_inbound_{ts}"
|
|
session_dir.mkdir(parents=True, exist_ok=True)
|
|
log_path = session_dir / f"session_{ts}.log"
|
|
raw_path = session_dir / f"raw_rx_{ts}.bin"
|
|
|
|
# Flush buffered raw bytes to file and switch to direct file writes.
|
|
raw_fh = open(raw_path, "wb")
|
|
for chunk in raw_buf:
|
|
raw_fh.write(chunk)
|
|
raw_buf.clear()
|
|
|
|
def tapped_read_file(n: int) -> bytes:
|
|
data = _orig_read(n)
|
|
if data:
|
|
raw_fh.write(data)
|
|
raw_fh.flush()
|
|
return data
|
|
|
|
transport.read = tapped_read_file # type: ignore[method-assign]
|
|
|
|
# Wire up file handler now that the session dir exists.
|
|
fh = logging.FileHandler(log_path, encoding="utf-8")
|
|
fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s"))
|
|
root_logger = logging.getLogger()
|
|
root_logger.addHandler(fh)
|
|
|
|
try:
|
|
# ── Step 2: device info ───────────────────────────────────────────
|
|
device_info = None
|
|
if not self.events_only:
|
|
log.info("Step 2/3: reading device info")
|
|
try:
|
|
device_info = client.connect()
|
|
serial = device_info.serial
|
|
_save_json(session_dir / "device_info.json", _device_info_to_dict(device_info))
|
|
log.info(
|
|
" [OK] Device: serial=%s firmware=%s model=%s events=%d",
|
|
serial,
|
|
device_info.firmware_version,
|
|
device_info.model,
|
|
device_info.event_count or 0,
|
|
)
|
|
except Exception as exc:
|
|
log.error(" [FAIL] Device info failed: %s", exc)
|
|
else:
|
|
log.info("Step 2/3: skipping device info (--events-only)")
|
|
|
|
# ── Step 3: check for new events by comparing key sets ────────────
|
|
log.info("Step 3/3: checking for new events")
|
|
|
|
state = _load_state(self.state_path)
|
|
unit_key = serial or self.peer # fall back to IP if no serial
|
|
unit_state = state.get(unit_key, {})
|
|
seen_keys: set[str] = set(unit_state.get("downloaded_keys", []))
|
|
# Highest event key ever downloaded from this unit (hex string, 8 chars).
|
|
# Used to detect post-erase key reuse — see comment block above.
|
|
max_seen_key: str = unit_state.get("max_downloaded_key", "00000000")
|
|
|
|
# Use the event count already read from the event index during connect().
|
|
# This is fast (no extra round-trips) and confirmed accurate (matches LCD).
|
|
# Falls back to count_events() only if connect() wasn't called.
|
|
if device_info is not None:
|
|
current_count = device_info.event_count or 0
|
|
else:
|
|
try:
|
|
current_count = client.count_events()
|
|
except Exception as exc:
|
|
log.error(" [FAIL] count_events failed: %s", exc)
|
|
return
|
|
|
|
log.info(" Unit has %d stored event(s); %d key(s) previously downloaded",
|
|
current_count, len(seen_keys))
|
|
|
|
if current_count == 0:
|
|
log.info(" [OK] No events on device -- nothing to download")
|
|
log.info("Session complete (no events) -> %s", session_dir)
|
|
return
|
|
|
|
# Fast pre-check: walk the event index (browse-mode, no 5A) to get
|
|
# the current key list, then bail early if everything is already seen.
|
|
# This avoids calling get_events() at all when there's nothing new.
|
|
log.info(" Checking device key list (browse walk, no waveform download)...")
|
|
try:
|
|
device_keys = client.list_event_keys()
|
|
except Exception as exc:
|
|
log.warning(" list_event_keys failed: %s -- falling back to full download", exc)
|
|
device_keys = None
|
|
|
|
if device_keys is not None:
|
|
# ── Post-erase detection ──────────────────────────────────────
|
|
# After the device memory is erased, new events start from key
|
|
# 01110000 again — the same keys we already downloaded. Detect
|
|
# this by comparing the device's current highest key against the
|
|
# historical maximum. If the device has rolled back below our
|
|
# high-water mark, its counter was reset and we must treat all
|
|
# its keys as new, regardless of what seen_keys contains.
|
|
if device_keys and max_seen_key != "00000000":
|
|
max_device_key = max(device_keys) # lexicographic; safe because
|
|
# keys share the same 4-char prefix
|
|
if max_device_key < max_seen_key:
|
|
log.info(
|
|
" Post-erase reset detected: "
|
|
"device max key %s < historical max %s "
|
|
"-- treating all device keys as new",
|
|
max_device_key, max_seen_key,
|
|
)
|
|
seen_keys = set() # discard stale dedup info for this session
|
|
|
|
new_key_set = set(device_keys) - seen_keys
|
|
log.info(" Device has %d key(s): %d new, %d already seen",
|
|
len(device_keys), len(new_key_set), len(device_keys) - len(new_key_set))
|
|
if not new_key_set:
|
|
log.info(" [OK] All events already downloaded -- nothing to do")
|
|
# Refresh state timestamp; preserve max_seen_key unchanged.
|
|
state[unit_key] = {
|
|
"downloaded_keys": sorted(seen_keys | set(device_keys)),
|
|
"max_downloaded_key": max_seen_key,
|
|
"last_seen": datetime.datetime.now().isoformat(),
|
|
"serial": serial,
|
|
"peer": self.peer,
|
|
}
|
|
_save_state(self.state_path, state)
|
|
log.info("Session complete (no new events) -> %s", session_dir)
|
|
return
|
|
else:
|
|
new_key_set = None # unknown; proceed with full download
|
|
|
|
# Apply max_events cap
|
|
stop_idx = current_count - 1
|
|
if self.max_events is not None:
|
|
stop_idx = min(stop_idx, self.max_events - 1)
|
|
if self.max_events < current_count:
|
|
log.warning(
|
|
" max_events=%d cap: will download events 0-%d only "
|
|
"(unit has %d total)",
|
|
self.max_events, stop_idx, current_count,
|
|
)
|
|
|
|
try:
|
|
all_events = client.get_events(
|
|
full_waveform=True,
|
|
stop_after_index=stop_idx,
|
|
skip_waveform_for_keys=seen_keys if seen_keys else None,
|
|
)
|
|
|
|
# Filter to events whose keys we haven't saved before.
|
|
new_events = [
|
|
e for e in all_events
|
|
if e._waveform_key is None
|
|
or e._waveform_key.hex() not in seen_keys
|
|
]
|
|
skipped = len(all_events) - len(new_events)
|
|
|
|
log.info(" [OK] Downloaded %d event(s): %d new, %d skipped (already seen)",
|
|
len(all_events), len(new_events), skipped)
|
|
if skipped:
|
|
log.info(" (skipped %d already-downloaded event(s))", skipped)
|
|
|
|
if new_events:
|
|
_save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events])
|
|
|
|
for ev in new_events:
|
|
pv = ev.peak_values
|
|
pi = ev.project_info
|
|
key_hex = ev._waveform_key.hex() if ev._waveform_key else "????????"
|
|
log.info(
|
|
" NEW [%s] %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f project=%r",
|
|
key_hex,
|
|
str(ev.timestamp) if ev.timestamp else "?",
|
|
pv.tran if pv else 0,
|
|
pv.vert if pv else 0,
|
|
pv.long if pv else 0,
|
|
pv.peak_vector_sum if pv else 0,
|
|
pi.project if pi else "",
|
|
)
|
|
else:
|
|
log.info(" [OK] No new events since last call-home -- nothing to save")
|
|
|
|
# ── Monitor log entries (partial records / continuous monitoring) ──
|
|
# Browse walk (0A + 1F only) to collect monitor log entries for
|
|
# recording intervals where no threshold was crossed. This is a
|
|
# second 1E-based pass over the device's record list, separate from
|
|
# the get_events() download loop above.
|
|
log.info(" Collecting monitor log entries (browse walk)...")
|
|
new_monitor_entries: list[MonitorLogEntry] = []
|
|
try:
|
|
new_monitor_entries = client.get_monitor_log_entries(
|
|
skip_keys=seen_keys if seen_keys else None,
|
|
)
|
|
if new_monitor_entries:
|
|
_save_json(
|
|
session_dir / "monitor_log.json",
|
|
[_monitor_log_entry_to_dict(e) for e in new_monitor_entries],
|
|
)
|
|
log.info(
|
|
" [OK] %d new monitor log entry(s) saved",
|
|
len(new_monitor_entries),
|
|
)
|
|
for ml in new_monitor_entries:
|
|
log.info(
|
|
" MONLOG [%s] %s → %s (%s)",
|
|
ml.key,
|
|
ml.start_time.isoformat() if ml.start_time else "?",
|
|
ml.stop_time.isoformat() if ml.stop_time else "?",
|
|
f"{ml.duration_seconds:.0f}s" if ml.duration_seconds is not None else "?s",
|
|
)
|
|
else:
|
|
log.info(" [OK] No new monitor log entries")
|
|
except Exception as exc:
|
|
log.warning(
|
|
" [WARN] Monitor log collection failed: %s -- continuing",
|
|
exc,
|
|
)
|
|
|
|
# ── Optional: erase device memory after successful download ────
|
|
erased_successfully = False
|
|
if self.clear_after_download and new_events:
|
|
log.info(" Clearing device memory (--clear-after-download)...")
|
|
try:
|
|
client.delete_all_events()
|
|
log.info(" [OK] Device memory cleared")
|
|
erased_successfully = True
|
|
except Exception as exc:
|
|
log.error(
|
|
" [WARN] Event deletion failed: %s -- events NOT cleared",
|
|
exc,
|
|
)
|
|
|
|
# ── Update persistent state ───────────────────────────────────
|
|
# Include both triggered-event keys and monitor-log keys in the
|
|
# downloaded set so they are not re-processed on the next call-home.
|
|
current_event_keys = [
|
|
e._waveform_key.hex()
|
|
for e in all_events
|
|
if e._waveform_key is not None
|
|
]
|
|
current_monitor_keys = [e.key for e in new_monitor_entries]
|
|
current_keys = current_event_keys + current_monitor_keys
|
|
|
|
if erased_successfully:
|
|
# Device memory is clear. Reset downloaded_keys and the
|
|
# high-water mark so the next call-home starts fresh and
|
|
# doesn't mis-identify the recycled key 01110000 as "seen".
|
|
updated_keys = []
|
|
new_max_key = "00000000"
|
|
log.info(
|
|
" State reset after erase -- next session will download "
|
|
"from key 0 (device counter resets after erase)"
|
|
)
|
|
else:
|
|
# Normal (no erase): union of previously-seen + all keys on
|
|
# device now. Includes already-seen survivors so we never
|
|
# re-download them if the device somehow keeps old records.
|
|
updated_keys = sorted(set(seen_keys) | set(current_keys))
|
|
new_max_key = updated_keys[-1] if updated_keys else max_seen_key
|
|
|
|
state[unit_key] = {
|
|
"downloaded_keys": updated_keys,
|
|
"max_downloaded_key": new_max_key,
|
|
"last_seen": datetime.datetime.now().isoformat(),
|
|
"serial": serial,
|
|
"peer": self.peer,
|
|
}
|
|
_save_state(self.state_path, state)
|
|
|
|
except Exception as exc:
|
|
log.error(" [FAIL] Event download failed: %s", exc, exc_info=True)
|
|
|
|
finally:
|
|
raw_fh.close()
|
|
client.close() # closes transport / socket cleanly
|
|
root_logger.removeHandler(fh)
|
|
fh.close()
|
|
|
|
log.info("Session complete -> %s", session_dir)
|
|
log.info("="*60)
|
|
|
|
|
|
# ── JSON helpers ───────────────────────────────────────────────────────────────
|
|
|
|
def _save_json(path: Path, obj: object) -> None:
|
|
with open(path, "w") as f:
|
|
json.dump(obj, f, indent=2, default=str)
|
|
log.debug("Saved %s", path)
|
|
|
|
|
|
def _device_info_to_dict(d: DeviceInfo) -> dict:
|
|
cc = d.compliance_config
|
|
return {
|
|
"serial": d.serial,
|
|
"firmware_version": d.firmware_version,
|
|
"dsp_version": d.dsp_version,
|
|
"model": d.model,
|
|
"event_count": d.event_count,
|
|
# compliance config fields (None if 1A read failed)
|
|
"setup_name": cc.setup_name if cc else None,
|
|
"sample_rate": cc.sample_rate if cc else None,
|
|
"record_time": cc.record_time if cc else None,
|
|
"trigger_level_geo": cc.trigger_level_geo if cc else None,
|
|
"alarm_level_geo": cc.alarm_level_geo if cc else None,
|
|
"max_range_geo": cc.max_range_geo if cc else None,
|
|
"project": cc.project if cc else None,
|
|
"client": cc.client if cc else None,
|
|
"operator": cc.operator if cc else None,
|
|
"sensor_location": cc.sensor_location if cc else None,
|
|
}
|
|
|
|
|
|
def _event_to_dict(e: Event) -> dict:
|
|
pv = e.peak_values
|
|
pi = e.project_info
|
|
peaks = {}
|
|
if pv:
|
|
peaks = {
|
|
"transverse": pv.tran,
|
|
"vertical": pv.vert,
|
|
"longitudinal": pv.long,
|
|
"vector_sum": pv.peak_vector_sum,
|
|
"mic": pv.micl,
|
|
}
|
|
samples = {}
|
|
if e.raw_samples:
|
|
samples = {
|
|
ch: vals[:20] # first 20 sample-sets to keep the file sane
|
|
for ch, vals in e.raw_samples.items()
|
|
}
|
|
samples["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform"
|
|
return {
|
|
"timestamp": str(e.timestamp) if e.timestamp else None,
|
|
"project": pi.project if pi else None,
|
|
"client": pi.client if pi else None,
|
|
"operator": pi.operator if pi else None,
|
|
"sensor_location": pi.sensor_location if pi else None,
|
|
"peaks": peaks,
|
|
"raw_samples_preview": samples,
|
|
}
|
|
|
|
|
|
def _monitor_log_entry_to_dict(e: MonitorLogEntry) -> dict:
|
|
return {
|
|
"key": e.key,
|
|
"start_time": e.start_time.isoformat() if e.start_time else None,
|
|
"stop_time": e.stop_time.isoformat() if e.stop_time else None,
|
|
"duration_seconds": e.duration_seconds,
|
|
"serial": e.serial,
|
|
"geo_threshold_ips": e.geo_threshold_ips,
|
|
}
|
|
|
|
|
|
# ── Main server loop ───────────────────────────────────────────────────────────
|
|
|
|
def serve(args: argparse.Namespace) -> None:
|
|
output_dir = Path(args.output)
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
state_path = output_dir / "ach_state.json"
|
|
|
|
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
server_sock.bind(("0.0.0.0", args.port))
|
|
server_sock.listen(5)
|
|
# Wake up every second so Ctrl-C is handled promptly on Windows.
|
|
# Without this, accept() blocks indefinitely and ignores KeyboardInterrupt.
|
|
server_sock.settimeout(1.0)
|
|
|
|
max_ev = args.max_events
|
|
print(f"\n{'='*60}")
|
|
print(f" ACH inbound server listening on 0.0.0.0:{args.port}")
|
|
print(f" Output: {output_dir.resolve()}/ach_inbound_<timestamp>/")
|
|
print(f" State file: {state_path}")
|
|
print(f" Max events per session: {max_ev if max_ev else 'unlimited'}")
|
|
print(f" Clear device after download: {'YES' if args.clear_after_download else 'no'}")
|
|
print(f"{'='*60}")
|
|
print(f"\n Point your test unit's ACEmanager call-home settings to:")
|
|
print(f" Remote Host: <this machine's LAN IP>")
|
|
print(f" Remote Port: {args.port}")
|
|
print(f"\n Waiting for inbound connections... (Ctrl-C to stop)\n")
|
|
|
|
allow_ips = set(args.allow_ips)
|
|
if allow_ips:
|
|
print(f" Allowlist: {', '.join(sorted(allow_ips))}")
|
|
else:
|
|
print(" Allowlist: NONE -- accepting all IPs (add --allow-ip to restrict)")
|
|
|
|
try:
|
|
while True:
|
|
try:
|
|
client_sock, addr = server_sock.accept()
|
|
except socket.timeout:
|
|
continue # no connection this second; loop back and check for Ctrl-C
|
|
try:
|
|
peer_ip = addr[0]
|
|
peer = f"{addr[0]}:{addr[1]}"
|
|
|
|
if allow_ips and peer_ip not in allow_ips:
|
|
log.info("Rejected connection from %s (not in allowlist)", peer)
|
|
client_sock.close()
|
|
continue
|
|
|
|
log.info("Accepted connection from %s", peer)
|
|
session = AchSession(
|
|
sock=client_sock,
|
|
peer=peer,
|
|
output_dir=output_dir,
|
|
timeout=args.timeout,
|
|
events_only=args.events_only,
|
|
max_events=max_ev,
|
|
state_path=state_path,
|
|
clear_after_download=args.clear_after_download,
|
|
)
|
|
t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}")
|
|
t.start()
|
|
except KeyboardInterrupt:
|
|
raise
|
|
except Exception as exc:
|
|
log.error("Accept error: %s", exc)
|
|
finally:
|
|
server_sock.close()
|
|
print("\nServer stopped.")
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
p = argparse.ArgumentParser(
|
|
description="Minimal inbound ACH server — speak BW protocol to calling MiniMate Plus units.",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog=__doc__,
|
|
)
|
|
p.add_argument(
|
|
"--port", "-p",
|
|
type=int,
|
|
default=12345,
|
|
help="Port to listen on (default: 12345).",
|
|
)
|
|
p.add_argument(
|
|
"--output", "-o",
|
|
default=str(Path(__file__).parent / "captures"),
|
|
metavar="DIR",
|
|
help="Directory to write session captures (default: bridges/captures/).",
|
|
)
|
|
p.add_argument(
|
|
"--timeout", "-t",
|
|
type=float,
|
|
default=30.0,
|
|
help="Protocol receive timeout in seconds (default: 30.0).",
|
|
)
|
|
p.add_argument(
|
|
"--events-only",
|
|
action="store_true",
|
|
help="Skip the device-info step and go straight to event download.",
|
|
)
|
|
p.add_argument(
|
|
"--max-events",
|
|
type=int,
|
|
default=None,
|
|
metavar="N",
|
|
help=(
|
|
"Safety cap: download at most N events per session (default: unlimited). "
|
|
"Useful if a unit has many old events stored — prevents a very long first run."
|
|
),
|
|
)
|
|
p.add_argument(
|
|
"--allow-ip",
|
|
metavar="IP",
|
|
action="append",
|
|
dest="allow_ips",
|
|
default=[],
|
|
help=(
|
|
"Only accept connections from this IP address (repeat for multiple). "
|
|
"Example: --allow-ip 63.43.212.232 "
|
|
"If not specified, all IPs are accepted (not recommended for public servers)."
|
|
),
|
|
)
|
|
p.add_argument(
|
|
"--clear-after-download",
|
|
action="store_true",
|
|
default=False,
|
|
help=(
|
|
"After successfully downloading new events, erase all events from the "
|
|
"device memory (SUB 0xA3 → 0x1C → 0x06 → 0xA2 sequence, confirmed from "
|
|
"4-11-26 MITM capture). Only fires when at least one new event was saved. "
|
|
"This mirrors the standard Blastware ACH workflow."
|
|
),
|
|
)
|
|
p.add_argument(
|
|
"--verbose", "-v",
|
|
action="store_true",
|
|
help="Enable debug logging.",
|
|
)
|
|
return p.parse_args()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
args = parse_args()
|
|
logging.basicConfig(
|
|
level=logging.DEBUG if args.verbose else logging.INFO,
|
|
format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
|
|
)
|
|
try:
|
|
serve(args)
|
|
except KeyboardInterrupt:
|
|
print("\nStopped.")
|