#!/usr/bin/env python3
"""
ach_server.py — Minimal inbound ACH (Auto Call Home) server for MiniMate Plus.

This IS your test server.  Run it on any machine on the same network, point a
unit's ACEmanager call-home destination at it, and it will speak the full BW
protocol to the device: handshake, pull device info, download all events, save
everything as JSON.

The key thing this script tells you that no amount of packet sniffing can:
  - Does the device speak first (push) or wait for us to send POLL (pull)?

If startup() completes normally → it's pull protocol, same as Blastware.
If startup() times out → the device sent something first.  Note that capture
files are only written once startup succeeds; a failed startup is reported as
a "Startup failed" warning on the console and leaves nothing on disk.

Usage
-----
    python bridges/ach_server.py [--port 12345] [--output bridges/captures/]

Setup
-----
1. Run this script on a machine on your local network.
2. In ACEmanager → Application → ALEOS Application Framework (or equivalent)
   find the Call Home / ACH settings.  Set:
       Remote Host: <this machine's LAN IP>
       Remote Port: 12345
3. Trigger the unit (wait for a vibration event, or use the manual call-home
   button if your firmware version has one).
4. The unit connects.  This script handshakes, downloads all new events,
   and saves a timestamped session directory.

Output per session
------------------
bridges/captures/ach_inbound_<ts>/
    device_info.json   — serial number, firmware version, calibration date, etc.
    events.json        — newly downloaded events: timestamp, PPV per channel,
                         peaks, metadata
    monitor_log.json   — monitor log entries (written only when there are any)
    raw_rx_<ts>.bin    — raw bytes from the device (S3 side) for Analyzer
    raw_tx_<ts>.bin    — raw bytes we sent to the device (BW side) for Analyzer
    session_<ts>.log   — detailed protocol log

What to look for
----------------
Push vs pull: Check session_<ts>.log.  If the first line after "Connected"
    shows bytes arriving BEFORE the POLL probe was sent, it's push.  If POLL
    gets a clean response, it's pull.

Frequency: Look at raw_rx_<ts>.bin in the Analyzer.  SUB 5A (0xA5 responses)
    carry bulk waveform data — if frequency is sent pre-computed there will be
    float32 values before the ADC sample blocks.

ACH-specific framing: Does the unit send anything extra before the DLE+STX
    framing starts?  raw_rx_<ts>.bin will show raw bytes including any preamble.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import socket
|
|
import sys
|
|
import threading
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from minimateplus.transport import SocketTransport
|
|
from minimateplus.client import MiniMateClient
|
|
from minimateplus.models import DeviceInfo, Event, MonitorLogEntry
|
|
from sfm.database import SeismoDb
|
|
from sfm.waveform_store import WaveformStore
|
|
|
|
# Module-level logger; every message emitted by this file goes out under the
# "ach_server" logger name.
log = logging.getLogger("ach_server")
|
|
|
|
# ── Per-unit state (downloaded events index) ──────────────────────────────────
# Persisted as <output_dir>/ach_state.json
# Format (current — v2):
#   {
#     "BE11529": {
#       "downloaded_events": {            # key_hex → ISO timestamp string
#         "01110000": "2026-04-11T00:42:17",
#         "0111245a": "2026-04-11T01:04:30"
#       },
#       "max_downloaded_key": "0111245a",
#       "last_seen": "2026-04-11T01:04:36",
#       "serial": "BE11529",
#       "peer": "63.43.212.232:51920"
#     }
#   }
#
# Why (key, timestamp) and not key alone:
#   The device's event-key counter resets to 0x01110000 after every memory
#   erase (internal or external).  A bare-key dedup (the v1 format) cannot
#   distinguish a re-recorded event with the same key from one we already
#   downloaded.  The 0C waveform record's timestamp IS unique per physical
#   event, so we pair (key, timestamp) and treat a key with a different
#   timestamp as a new event regardless of `max_downloaded_key`.
#
# Legacy v1 format (`downloaded_keys: list[str]` only) is auto-migrated on
# read: the keys are kept under a sentinel of "" (empty string) timestamp so
# the (key, timestamp) compare always sees a mismatch and forces a one-time
# re-download.  After that pass the state is rewritten in v2 form.
|
|
|
|
# Serializes writers of ach_state.json across session threads (taken in
# _save_state).
# NOTE(review): _load_state does not take this lock, so reads of the state
# file are not synchronized with writes.
_state_lock = threading.Lock()
|
|
|
|
|
|
def _load_state(state_path: Path) -> dict:
    """
    Load ach_state.json, transparently migrating legacy entries to v2.

    Any per-unit entry that lacks ``downloaded_events`` is upgraded in the
    returned dict: a legacy ``downloaded_keys`` list becomes a
    ``downloaded_events`` dict whose values are empty-string timestamps, and
    an entry with neither field gets an empty ``downloaded_events`` dict.  The
    legacy ``downloaded_keys`` field is removed from the returned entry.  The
    file on disk is not touched by this function.

    Args:
        state_path: Location of the JSON state file.

    Returns:
        The (possibly migrated) state dict.  An empty dict is returned when
        the file does not exist, cannot be read or parsed, or does not hold a
        JSON object at the top level.
    """
    if not state_path.exists():
        return {}

    try:
        with open(state_path, encoding="utf-8") as f:
            state = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # Say so loudly: the caller will treat every event as new and the next
        # save replaces whatever was in the damaged file.
        log.warning(
            "ach_state: could not read %s (%s) -- starting from empty state",
            state_path, exc,
        )
        return {}

    if not isinstance(state, dict):
        # A JSON list/scalar would otherwise crash on .items() below.
        log.warning(
            "ach_state: %s does not contain a JSON object (got %s) -- "
            "starting from empty state",
            state_path, type(state).__name__,
        )
        return {}

    # Per-unit migration: legacy list → dict-with-empty-timestamps.
    # Only the per-unit dicts are mutated, never `state` itself, so iterating
    # the live view is safe.
    for unit_key, unit_state in state.items():
        if not isinstance(unit_state, dict):
            continue
        if "downloaded_events" in unit_state:
            continue

        legacy_keys = unit_state.get("downloaded_keys")
        if isinstance(legacy_keys, list):
            # Non-string items cannot be event keys (and might be unhashable).
            migrated = {k: "" for k in legacy_keys if isinstance(k, str)}
            unit_state["downloaded_events"] = migrated
            log.info(
                "ach_state: migrated %s from v1 (downloaded_keys list) → v2 "
                "(downloaded_events dict, %d keys with empty timestamps; "
                "they will re-validate on next session)",
                unit_key, len(migrated),
            )
        else:
            unit_state["downloaded_events"] = {}

        # The legacy field is dropped from the in-memory entry right away.
        unit_state.pop("downloaded_keys", None)

    return state
|
|
|
|
|
|
def _save_state(state_path: Path, state: dict) -> None:
    """
    Persist the per-unit state dict to ``state_path`` as indented JSON.

    The JSON text is produced first, written to a sibling ``*.tmp`` file and
    then renamed over the real file.  That way a serialization error or a
    crash mid-write cannot leave a truncated ach_state.json behind, and a
    concurrent reader does not observe a half-written file.  If the rename is
    refused by the OS the function falls back to rewriting the file in place.

    Args:
        state_path: Destination of the JSON state file.
        state: Mapping of unit key → per-unit state; must be JSON-serializable.
    """
    # Serialize before touching the filesystem: a TypeError here leaves the
    # existing state file untouched.
    payload = json.dumps(state, indent=2)
    tmp_path = state_path.with_name(state_path.name + ".tmp")

    with _state_lock:
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        try:
            tmp_path.replace(state_path)
        except OSError:
            # Rename refused (e.g. the destination is held open elsewhere):
            # fall back to the plain in-place rewrite and tidy up.
            with open(state_path, "w", encoding="utf-8") as f:
                f.write(payload)
            try:
                tmp_path.unlink()
            except OSError:
                pass
|
|
|
|
|
|
# ── Per-session handler ────────────────────────────────────────────────────────
|
|
|
|
class AchSession:
    """
    Handle one inbound unit connection; ``run`` is the thread entry point.

    Wraps the accepted socket in a SocketTransport → MiniMateClient, then runs
    startup handshake → device info → event download → monitor-log collection,
    and persists the results (session directory, waveform store, SQLite DB).

    State tracking (ach_state.json in output_dir):
        After a session the (event key hex → ISO timestamp) pairs observed on
        the device are stored per unit, together with the highest key seen.
        On the next call-home the pairs with a populated timestamp are handed
        to ``client.get_events`` so it can skip the waveform download for
        events that were already fetched.  Stored state is discarded for the
        session when the device's highest key is lower than the stored
        high-water mark (post-erase reset) or when ``force_redownload`` is set.
    """

    def __init__(
        self,
        sock: socket.socket,
        peer: str,
        output_dir: Path,
        timeout: float,
        events_only: bool,
        max_events: Optional[int],
        state_path: Path,
        db: "SeismoDb",
        store: "WaveformStore",
        clear_after_download: bool = False,
        restart_monitoring: bool = False,
        force_redownload: bool = False,
    ) -> None:
        """
        Store the session configuration; no I/O happens here.

        Args:
            sock: Accepted client socket; closed by ``run`` when it finishes.
            peer: "ip:port" label used in logs and as the fallback unit key.
            output_dir: Directory under which the session directory is created.
            timeout: Receive timeout handed to the client and protocol objects.
            events_only: Skip the device-info step when true.
            max_events: Cap on the number of events walked, or None for no cap.
            state_path: Path of ach_state.json.
            db: Database used for events, monitor log and session rows.
            store: Waveform store used to persist downloaded event files.
            clear_after_download: Erase device memory after a complete,
                fully saved download of at least one new event.
            restart_monitoring: Ask the device to start monitoring before the
                connection is closed.
            force_redownload: Ignore ach_state for this session and
                re-download every event currently on the device, regardless
                of any (key, timestamp) match.  Useful as a manual override
                when state has become inconsistent with what is actually on
                disk / in the DB.
        """
        self.sock = sock
        self.peer = peer
        self.output_dir = output_dir
        self.timeout = timeout
        self.events_only = events_only
        self.max_events = max_events
        self.state_path = state_path
        self.db = db
        self.store = store
        self.clear_after_download = clear_after_download
        self.restart_monitoring = restart_monitoring
        self.force_redownload = force_redownload

    def run(self) -> None:
        """
        Run the session, log any failure, and always close the socket.

        Never raises: this is a thread target, so errors are logged instead.
        """
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

        # Session dir and file handler are created lazily — only after startup
        # succeeds.  This prevents internet scanners and dropped connections
        # from littering the output directory with empty session folders.
        try:
            self._run_inner(ts)
        except Exception as exc:
            log.error("Session failed (%s): %s", self.peer, exc, exc_info=True)
        finally:
            try:
                self.sock.close()
            except Exception:
                pass

    @staticmethod
    def _event_timestamp_iso(ev) -> str:
        """
        Return the event's timestamp as an ISO-8601 string, or "" if absent.

        The timestamp object is read through its year/month/day/hour/minute/
        second attributes; a missing (None/0) time component counts as 0.  If
        those attributes cannot form a valid datetime, ``str(timestamp)`` is
        returned instead.
        """
        stamp = getattr(ev, "timestamp", None)
        if stamp is None:
            return ""
        try:
            return datetime.datetime(
                stamp.year, stamp.month, stamp.day,
                stamp.hour or 0, stamp.minute or 0, stamp.second or 0,
            ).isoformat()
        except Exception:
            return str(stamp)

    def _restart_monitoring_if_requested(self, client) -> None:
        """Send the start-monitoring command when --restart-monitoring is set."""
        if not self.restart_monitoring:
            return
        log.info(" Restarting monitoring on device (--restart-monitoring)...")
        try:
            client.start_monitoring()
            log.info(" [OK] Monitoring restarted")
        except Exception as exc:
            log.warning(" [WARN] Failed to restart monitoring: %s", exc)

    def _run_inner(self, ts: str) -> None:
        """
        Execute the full call-home sequence for this connection.

        Args:
            ts: Session timestamp string used in directory and file names.

        A failed startup handshake returns silently (warning only) without
        creating anything on disk.  Exceptions raised outside the guarded
        steps propagate to ``run``.
        """
        # Taken at the very start so the duration stored in the DB covers the
        # whole session rather than just the DB insert.
        session_started = datetime.datetime.now()

        transport = SocketTransport(self.sock, peer=self.peer)

        # Collect raw bytes in memory until startup succeeds, then flush to disk.
        raw_rx_buf: list[bytes] = []  # device → us (S3 side)
        raw_tx_buf: list[bytes] = []  # us → device (BW side)
        _orig_read = transport.read
        _orig_write = transport.write

        def tapped_read(n: int) -> bytes:
            data = _orig_read(n)
            if data:
                raw_rx_buf.append(data)
            return data

        def tapped_write(data: bytes) -> None:
            _orig_write(data)
            if data:
                raw_tx_buf.append(data)

        transport.read = tapped_read  # type: ignore[method-assign]
        transport.write = tapped_write  # type: ignore[method-assign]

        serial: Optional[str] = None

        # ── Step 1: startup handshake ─────────────────────────────────────────
        # Do this BEFORE creating the session directory so that scanner probes
        # and dropped connections leave no trace on disk.
        try:
            from minimateplus.protocol import MiniMateProtocol
            client = MiniMateClient(transport=transport, timeout=self.timeout)
            client.open()
            proto = MiniMateProtocol(transport, recv_timeout=self.timeout)
            proto.startup()
        except Exception as exc:
            log.warning("Startup failed from %s: %s -- ignoring", self.peer, exc)
            return  # no session dir created

        # Startup succeeded — this is a real unit.
        session_dir = self.output_dir / f"ach_inbound_{ts}"
        log_path = session_dir / f"session_{ts}.log"
        raw_rx_path = session_dir / f"raw_rx_{ts}.bin"  # device → us (S3 side)
        raw_tx_path = session_dir / f"raw_tx_{ts}.bin"  # us → device (BW side)

        # Everything acquired below is released in the `finally`; the handles
        # start as None so a failure part-way through setup cleans up only
        # what was actually opened.
        raw_rx_fh = None
        raw_tx_fh = None
        fh = None
        root_logger = logging.getLogger()

        try:
            session_dir.mkdir(parents=True, exist_ok=True)

            # Flush buffered bytes to files and switch to direct file writes.
            raw_rx_fh = open(raw_rx_path, "wb")
            raw_tx_fh = open(raw_tx_path, "wb")
            for chunk in raw_rx_buf:
                raw_rx_fh.write(chunk)
            for chunk in raw_tx_buf:
                raw_tx_fh.write(chunk)
            raw_rx_buf.clear()
            raw_tx_buf.clear()

            def tapped_read_file(n: int) -> bytes:
                data = _orig_read(n)
                if data:
                    raw_rx_fh.write(data)
                    raw_rx_fh.flush()
                return data

            def tapped_write_file(data: bytes) -> None:
                _orig_write(data)
                if data:
                    raw_tx_fh.write(data)
                    raw_tx_fh.flush()

            transport.read = tapped_read_file  # type: ignore[method-assign]
            transport.write = tapped_write_file  # type: ignore[method-assign]

            # Wire up file handler now that the session dir exists.
            # NOTE(review): the handler sits on the root logger, so records
            # from other sessions running concurrently also land in this file.
            fh = logging.FileHandler(log_path, encoding="utf-8")
            fh.setFormatter(logging.Formatter("%(asctime)s %(levelname)-7s %(name)s %(message)s"))
            root_logger.addHandler(fh)

            # ── Step 2: device info ───────────────────────────────────────────
            if not self.events_only:
                log.info("Step 2/3: reading device info")
                try:
                    device_info = client.connect()
                    serial = device_info.serial
                    _save_json(session_dir / "device_info.json", _device_info_to_dict(device_info))
                    log.info(
                        " [OK] Device: serial=%s firmware=%s model=%s events=%d",
                        serial,
                        device_info.firmware_version,
                        device_info.model,
                        device_info.event_count or 0,
                    )
                except Exception as exc:
                    log.error(" [FAIL] Device info failed: %s", exc)
            else:
                log.info("Step 2/3: skipping device info (--events-only)")

            # ── Step 3: check for new events by comparing key sets ────────────
            log.info("Step 3/3: checking for new events")

            state = _load_state(self.state_path)
            unit_key = serial or self.peer  # fall back to IP if no serial
            unit_state = state.get(unit_key, {})

            # downloaded_events is the v2 (key_hex → timestamp_iso) dict.
            # Empty-string timestamps are migrated v1 entries — they force a
            # one-time re-download because they are filtered out of the skip
            # map handed to get_events() below.
            seen_events: dict[str, str] = dict(unit_state.get("downloaded_events", {}))
            max_seen_key: str = unit_state.get("max_downloaded_key", "00000000")

            if self.force_redownload:
                log.info(" --force-redownload-all set — ignoring %d cached "
                         "(key, timestamp) entries for this session",
                         len(seen_events))
                seen_events = {}

            # Walk the event index (browse-mode, no 5A) to get the actual
            # current key list.  The SUB 08 event_count field is a lifetime
            # "total events ever recorded" counter that does NOT decrement on
            # erase — confirmed 2026-04-13.  list_event_keys() via the 1E/1F
            # chain is the only reliable way to know what is actually stored
            # on the device right now.
            log.info(" Checking device key list (browse walk, no waveform download)...")
            try:
                device_keys = client.list_event_keys()
            except Exception as exc:
                log.warning(" list_event_keys failed: %s -- falling back to full download", exc)
                device_keys = None

            current_count = len(device_keys) if device_keys is not None else 0

            log.info(" Unit has %d stored event(s); %d (key, ts) entr(ies) previously downloaded",
                     current_count, len(seen_events))

            if device_keys is not None and current_count == 0:
                log.info(" [OK] No events on device -- nothing to download")
                # Still honor --restart-monitoring: leaving here without it
                # would skip the restart on every empty call-home.
                self._restart_monitoring_if_requested(client)
                log.info("Session complete (no events) -> %s", session_dir)
                return

            # ── Post-erase detection (best-effort, key-only signal) ───────────
            # After erase the device's key counter resets to 01110000.  If the
            # device's current max key is below our high-water mark we know an
            # erase happened.  This catches the cleanest case but does NOT
            # catch erase-then-record-many-events (where the new max may climb
            # past the old max).  The (key, timestamp) check in get_events()
            # is what handles those.
            if device_keys and max_seen_key != "00000000":
                # The element type returned by list_event_keys() is not visible
                # from this file; bytes are rendered as hex so the comparison
                # with the stored hex string is well-defined either way.
                max_device_key = max(
                    k.hex() if isinstance(k, (bytes, bytearray)) else str(k)
                    for k in device_keys
                )
                if max_device_key < max_seen_key:
                    log.info(
                        " Post-erase reset detected: "
                        "device max key %s < historical max %s "
                        "-- discarding stale (key, ts) state for this session",
                        max_device_key, max_seen_key,
                    )
                    seen_events = {}

            # Note: no early-exit "all already downloaded" short-circuit here.
            # Without per-event timestamps we cannot tell whether
            # device_keys ⊆ seen_events.keys() actually means we have those
            # physical events.  get_events() will read 0C on its skip path and
            # decide per event.

            # Apply max_events cap.
            # stop_idx: when we know the count from list_event_keys, use it as
            # an upper bound.  When list_event_keys failed (device_keys is
            # None), pass None — get_events will run until the null sentinel.
            stop_idx: Optional[int] = (current_count - 1) if device_keys is not None else None
            if self.max_events is not None:
                cap = self.max_events - 1
                stop_idx = cap if stop_idx is None else min(stop_idx, cap)
                if device_keys is not None and self.max_events < current_count:
                    log.warning(
                        " max_events=%d cap: will download events 0-%d only "
                        "(unit has %d total)",
                        self.max_events, stop_idx, current_count,
                    )

            # True when the cap may have left events on the device that were
            # never walked (certainly so when the count is known and larger;
            # possibly so when the count is unknown).
            download_truncated = self.max_events is not None and (
                device_keys is None or self.max_events < current_count
            )

            try:
                # Pass the (key → ISO timestamp) map so the client can read 0C
                # on its skip path and only skip 5A when the per-event
                # timestamp matches what we already have.  When
                # force_redownload is set, seen_events was cleared above.
                #
                # Empty-string timestamps (legacy v1 entries) are filtered out
                # so those events fall through to a full 5A download.
                skip_dict = {k: stamp for k, stamp in seen_events.items() if stamp}

                all_events = client.get_events(
                    full_waveform=True,
                    stop_after_index=stop_idx,
                    skip_waveform_for_events=skip_dict if skip_dict else None,
                )

                # New events are those that came back with _a5_frames
                # populated (= 5A actually ran on this session).  Skipped
                # events have no frames because the client matched
                # (key, timestamp) against skip_dict and bypassed 5A.
                new_events = [
                    e for e in all_events
                    if getattr(e, "_a5_frames", None)
                ]
                skipped = len(all_events) - len(new_events)

                log.info(" [OK] Walked %d event(s): %d downloaded, %d skipped (matched (key, ts) in state)",
                         len(all_events), len(new_events), skipped)

                # ── Persist event file + A5 sidecar to the waveform store ──
                # Saves ride alongside the existing JSON dump so the on-disk
                # event file and events.json reference the same set of events.
                waveform_records: dict[str, dict] = {}
                store_failures = 0
                for ev in new_events:
                    try:
                        rec = self.store.save(
                            ev,
                            serial=serial or "UNKNOWN",
                            a5_frames=ev._a5_frames,
                        )
                        if ev._waveform_key is not None:
                            waveform_records[ev._waveform_key.hex()] = rec
                        log.info(
                            " [WAVE] saved %s (%d bytes)",
                            rec["filename"], rec["filesize"],
                        )
                    except Exception as exc:
                        store_failures += 1
                        key_hex = ev._waveform_key.hex() if ev._waveform_key else "????????"
                        log.warning(
                            " [WARN] Waveform store save failed for %s: %s",
                            key_hex, exc,
                        )

                if new_events:
                    _save_json(
                        session_dir / "events.json",
                        [_event_to_dict(e, waveform_records) for e in new_events],
                    )

                    for ev in new_events:
                        pv = ev.peak_values
                        pi = ev.project_info
                        key_hex = ev._waveform_key.hex() if ev._waveform_key else "????????"
                        log.info(
                            " NEW [%s] %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f project=%r",
                            key_hex,
                            str(ev.timestamp) if ev.timestamp else "?",
                            pv.tran if pv else 0,
                            pv.vert if pv else 0,
                            pv.long if pv else 0,
                            pv.peak_vector_sum if pv else 0,
                            pi.project if pi else "",
                        )
                else:
                    log.info(" [OK] No new events since last call-home -- nothing to save")

                # ── Monitor log entries (partial records / continuous monitoring) ──
                # Browse walk (0A + 1F only) to collect monitor log entries
                # for recording intervals where no threshold was crossed.
                # This is a second 1E-based pass over the device's record
                # list, separate from the get_events() download loop above.
                log.info(" Collecting monitor log entries (browse walk)...")
                new_monitor_entries: list[MonitorLogEntry] = []
                # Keys already recorded with a trusted (non-empty) timestamp.
                # This name used to be undefined, so the call below always
                # raised NameError and no monitor log entry was ever collected.
                # NOTE(review): assumes skip_keys takes a collection of key hex
                # strings, matching how ml.key is stored in the state map
                # further down — confirm against get_monitor_log_entries().
                seen_keys = set(skip_dict)
                try:
                    new_monitor_entries = client.get_monitor_log_entries(
                        skip_keys=seen_keys if seen_keys else None,
                    )
                    if new_monitor_entries:
                        _save_json(
                            session_dir / "monitor_log.json",
                            [_monitor_log_entry_to_dict(e) for e in new_monitor_entries],
                        )
                        log.info(
                            " [OK] %d new monitor log entry(s) saved",
                            len(new_monitor_entries),
                        )
                        for ml in new_monitor_entries:
                            log.info(
                                " MONLOG [%s] %s → %s (%s)",
                                ml.key,
                                ml.start_time.isoformat() if ml.start_time else "?",
                                ml.stop_time.isoformat() if ml.stop_time else "?",
                                f"{ml.duration_seconds:.0f}s" if ml.duration_seconds is not None else "?s",
                            )
                    else:
                        log.info(" [OK] No new monitor log entries")
                except Exception as exc:
                    log.warning(
                        " [WARN] Monitor log collection failed: %s -- continuing",
                        exc,
                    )

                # ── Persist to SQLite DB ─────────────────────────────────────
                try:
                    _ev_ins, _ev_skip = self.db.insert_events(
                        new_events,
                        serial=serial or self.peer,
                        session_id=None,
                        waveform_records=waveform_records,
                        device_family="series3",
                    )
                    _ml_ins, _ml_skip = self.db.insert_monitor_log(
                        new_monitor_entries, session_id=None
                    )
                    _session_id = self.db.insert_ach_session(
                        serial=serial or self.peer,
                        peer=self.peer,
                        events_downloaded=_ev_ins,
                        monitor_entries=_ml_ins,
                        duration_seconds=(datetime.datetime.now() - session_started).total_seconds(),
                        session_time=session_started,
                    )
                    log.info(
                        " [DB] session=%s events +%d (skip %d) monitor +%d (skip %d)",
                        _session_id[:8], _ev_ins, _ev_skip, _ml_ins, _ml_skip,
                    )
                except Exception as exc:
                    log.warning(" [WARN] DB write failed: %s -- continuing", exc)

                # ── Optional: erase device memory after successful download ────
                # The erase wipes everything on the device, so it is refused
                # when some events may never have been downloaded or when a
                # downloaded event could not be written to the waveform store.
                erased_successfully = False
                if self.clear_after_download and new_events:
                    if download_truncated:
                        log.warning(
                            " [WARN] Skipping device erase: the --max-events cap "
                            "may have left events on the device undownloaded"
                        )
                    elif store_failures:
                        log.warning(
                            " [WARN] Skipping device erase: %d waveform save(s) failed",
                            store_failures,
                        )
                    else:
                        log.info(" Clearing device memory (--clear-after-download)...")
                        try:
                            client.delete_all_events()
                            log.info(" [OK] Device memory cleared")
                            erased_successfully = True
                        except Exception as exc:
                            log.error(
                                " [WARN] Event deletion failed: %s -- events NOT cleared",
                                exc,
                            )

                # ── Update persistent state ───────────────────────────────────
                # Build a fresh (key → ISO timestamp) map from THIS session's
                # results.  For each event walked, prefer the timestamp we
                # just observed; fall back to whatever was already in
                # seen_events for that key (so we don't lose an entry just
                # because get_events skipped it on the (key, ts) match path).
                current_events_map: dict[str, str] = {}
                for ev in all_events:
                    if ev._waveform_key is None:
                        continue
                    key_hex = ev._waveform_key.hex()
                    current_events_map[key_hex] = (
                        self._event_timestamp_iso(ev) or seen_events.get(key_hex, "")
                    )

                # Monitor-log entries don't have a 0C-style timestamp, but
                # they DO have a start_time; use that so the monitor-log keys
                # are properly entered into the (key, ts) map.
                for ml in new_monitor_entries:
                    key_hex = ml.key
                    started = ml.start_time
                    ts_iso = started.isoformat() if started else seen_events.get(key_hex, "")
                    # If a triggered event already populated this key, keep
                    # whichever has a non-empty timestamp.
                    if not current_events_map.get(key_hex):
                        current_events_map[key_hex] = ts_iso

                if erased_successfully:
                    updated_events: dict[str, str] = {}
                    new_max_key = "00000000"
                    log.info(
                        " State reset after erase -- next session will download "
                        "from key 0 (device counter resets after erase)"
                    )
                else:
                    # Merge: keep prior (key, ts) entries we still have
                    # evidence of (for survivors of any partial failure), plus
                    # this session's authoritative (key, ts) pairs.
                    updated_events = dict(seen_events)
                    updated_events.update(current_events_map)
                    new_max_key = (
                        max(updated_events.keys())
                        if updated_events else max_seen_key
                    )

                # Re-read the file right before writing: the copy loaded at
                # the start of the session may be minutes old, and writing it
                # back would discard entries saved by sessions for other units
                # that finished in the meantime.
                state = _load_state(self.state_path)
                state[unit_key] = {
                    "downloaded_events": updated_events,
                    "max_downloaded_key": new_max_key,
                    "last_seen": datetime.datetime.now().isoformat(),
                    "serial": serial,
                    "peer": self.peer,
                }
                _save_state(self.state_path, state)

            except Exception as exc:
                log.error(" [FAIL] Event download failed: %s", exc, exc_info=True)

            # ── Optional: restart monitoring (runs even if the download failed) ──
            self._restart_monitoring_if_requested(client)

        finally:
            # Each resource is released independently so that one failing
            # close() cannot leak the others (in particular the root-logger
            # handler, which would keep receiving every later log record).
            # The client goes first so anything it sends while closing is
            # still captured.
            try:
                client.close()  # closes transport / socket cleanly
            except Exception as exc:
                log.warning(" [WARN] Closing client failed: %s", exc)
            for capture_fh in (raw_rx_fh, raw_tx_fh):
                if capture_fh is None:
                    continue
                try:
                    capture_fh.close()
                except OSError as exc:
                    log.warning(" [WARN] Closing capture file failed: %s", exc)
            if fh is not None:
                root_logger.removeHandler(fh)
                fh.close()

        log.info("Session complete -> %s", session_dir)
        log.info("="*60)
|
|
|
|
|
|
# ── JSON helpers ───────────────────────────────────────────────────────────────
|
|
|
|
def _save_json(path: Path, obj: object) -> None:
    """
    Write ``obj`` to ``path`` as 2-space-indented JSON.

    Values the json module cannot encode natively are stringified via
    ``default=str``.  The destination is logged at DEBUG level afterwards.
    """
    with open(path, mode="w") as handle:
        json.dump(obj, handle, default=str, indent=2)
    log.debug("Saved %s", path)
|
|
|
|
|
|
def _device_info_to_dict(d: DeviceInfo) -> dict:
    """
    Flatten a DeviceInfo into a plain dict for device_info.json.

    Identity fields are copied straight from ``d``.  The compliance fields
    come from ``d.compliance_config`` and are all None when that config is
    missing (None if 1A read failed).
    """
    flat = {
        "serial": d.serial,
        "firmware_version": d.firmware_version,
        "dsp_version": d.dsp_version,
        "model": d.model,
        "event_count": d.event_count,
    }

    # Compliance config fields, in output order.
    #   geo_adc_scale — hw scale factor (in/s)/V
    #   geo_range     — 0x01=Normal 10in/s, 0x00=Sensitive 1.25in/s (unconfirmed)
    compliance_fields = (
        "setup_name",
        "sample_rate",
        "record_time",
        "trigger_level_geo",
        "alarm_level_geo",
        "geo_adc_scale",
        "geo_range",
        "project",
        "client",
        "operator",
        "sensor_location",
    )
    config = d.compliance_config
    for field_name in compliance_fields:
        flat[field_name] = getattr(config, field_name) if config else None

    return flat
|
|
|
|
|
|
def _event_to_dict(
    e: Event,
    waveform_records: Optional[dict[str, dict]] = None,
) -> dict:
    """
    Convert an Event into the plain dict stored in events.json.

    Args:
        e: Event to serialize.
        waveform_records: Optional map of waveform key hex → record dict as
            collected from the waveform store; supplies the file name fields.

    Returns:
        A dict with the timestamp, project info, peaks, a preview of the raw
        samples (first 20 sample-sets per channel) and the waveform file
        fields, which are None when no record exists for the event.
    """
    peak_vals = e.peak_values
    project = e.project_info

    if peak_vals:
        peaks = {
            "transverse": peak_vals.tran,
            "vertical": peak_vals.vert,
            "longitudinal": peak_vals.long,
            "vector_sum": peak_vals.peak_vector_sum,
            "mic": peak_vals.micl,
        }
    else:
        peaks = {}

    preview = {}
    if e.raw_samples:
        for channel, values in e.raw_samples.items():
            # first 20 sample-sets to keep the file sane
            preview[channel] = values[:20]
        preview["__note__"] = "first 20 sample-sets only; see raw_rx.bin for full waveform"

    store_rec: dict = {}
    if waveform_records and e._waveform_key is not None:
        store_rec = waveform_records.get(e._waveform_key.hex(), {}) or {}

    result = {"timestamp": str(e.timestamp) if e.timestamp else None}
    for attr in ("project", "client", "operator", "sensor_location"):
        result[attr] = getattr(project, attr) if project else None
    result["peaks"] = peaks
    result["raw_samples_preview"] = preview
    result["blastware_filename"] = store_rec.get("filename")
    result["blastware_filesize"] = store_rec.get("filesize")
    result["a5_pickle_filename"] = store_rec.get("a5_pickle_filename")
    return result
|
|
|
|
|
|
def _monitor_log_entry_to_dict(e: MonitorLogEntry) -> dict:
    """
    Convert a MonitorLogEntry into the plain dict stored in monitor_log.json.

    Start and stop times are rendered with ``isoformat()``; a missing time is
    stored as None.  All other fields are copied unchanged.
    """
    started = e.start_time
    stopped = e.stop_time

    start_iso = started.isoformat() if started else None
    stop_iso = stopped.isoformat() if stopped else None

    return dict(
        key=e.key,
        start_time=start_iso,
        stop_time=stop_iso,
        duration_seconds=e.duration_seconds,
        serial=e.serial,
        geo_threshold_ips=e.geo_threshold_ips,
    )
|
|
|
|
|
|
# ── Main server loop ───────────────────────────────────────────────────────────
|
|
|
|
def serve(args: argparse.Namespace) -> None:
    """
    Run the inbound ACH server until interrupted.

    Creates the output directory, database and waveform store, listens on
    ``0.0.0.0:<args.port>`` and hands every accepted (and allow-listed)
    connection to an AchSession running in its own daemon thread.

    Args:
        args: Parsed command-line options as produced by ``parse_args``.

    Raises:
        KeyboardInterrupt: Propagated to the caller after the listening
            socket has been closed.
    """
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)
    state_path = output_dir / "ach_state.json"
    db = SeismoDb(output_dir / "seismo_relay.db")
    store = WaveformStore(output_dir / "waveforms")

    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_sock.bind(("0.0.0.0", args.port))
    server_sock.listen(5)
    # Wake up every second so Ctrl-C is handled promptly on Windows.
    # Without this, accept() blocks indefinitely and ignores KeyboardInterrupt.
    server_sock.settimeout(1.0)

    max_ev = args.max_events
    print(f"\n{'='*60}")
    print(f" ACH inbound server listening on 0.0.0.0:{args.port}")
    print(f" Output: {output_dir.resolve()}/ach_inbound_<timestamp>/")
    print(f" State file: {state_path}")
    print(f" Max events per session: {max_ev if max_ev else 'unlimited'}")
    print(f" Clear device after download: {'YES' if args.clear_after_download else 'no'}")
    print(f" Restart monitoring after download: {'YES' if args.restart_monitoring else 'no'}")
    print(f" Force re-download all (ignore state): {'YES' if args.force_redownload_all else 'no'}")
    print(f"{'='*60}")
    print("\n Point your test unit's ACEmanager call-home settings to:")
    print(" Remote Host: <this machine's LAN IP>")
    print(f" Remote Port: {args.port}")
    print("\n Waiting for inbound connections... (Ctrl-C to stop)\n")

    allow_ips = set(args.allow_ips)
    if allow_ips:
        print(f" Allowlist: {', '.join(sorted(allow_ips))}")
    else:
        print(" Allowlist: NONE -- accepting all IPs (add --allow-ip to restrict)")

    try:
        while True:
            try:
                client_sock, addr = server_sock.accept()
            except socket.timeout:
                continue  # no connection this second; loop back and check for Ctrl-C

            try:
                peer_ip = addr[0]
                peer = f"{addr[0]}:{addr[1]}"

                if allow_ips and peer_ip not in allow_ips:
                    log.info("Rejected connection from %s (not in allowlist)", peer)
                    client_sock.close()
                    continue

                log.info("Accepted connection from %s", peer)
                session = AchSession(
                    sock=client_sock,
                    peer=peer,
                    output_dir=output_dir,
                    timeout=args.timeout,
                    events_only=args.events_only,
                    max_events=max_ev,
                    state_path=state_path,
                    db=db,
                    store=store,
                    clear_after_download=args.clear_after_download,
                    restart_monitoring=args.restart_monitoring,
                    force_redownload=args.force_redownload_all,
                )
                t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}")
                t.start()
            except Exception as exc:
                # KeyboardInterrupt is not an Exception subclass, so Ctrl-C
                # still propagates to the outer `finally`.
                log.error("Accept error: %s", exc)
                # The session thread never took ownership of the socket;
                # close it here so the descriptor does not leak.
                try:
                    client_sock.close()
                except OSError:
                    pass
    finally:
        server_sock.close()
        print("\nServer stopped.")
|
|
|
|
|
|
def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """
    Parse the command-line options of the ACH server.

    Args:
        argv: Argument list to parse.  ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is the behavior existing callers get;
            passing a list allows programmatic use.

    Returns:
        Namespace with ``port``, ``output``, ``timeout``, ``events_only``,
        ``max_events``, ``allow_ips``, ``restart_monitoring``,
        ``clear_after_download``, ``force_redownload_all`` and ``verbose``.
    """
    p = argparse.ArgumentParser(
        description="Minimal inbound ACH server — speak BW protocol to calling MiniMate Plus units.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,  # the module docstring doubles as the help epilog
    )
    p.add_argument(
        "--port", "-p",
        type=int,
        default=12345,
        help="Port to listen on (default: 12345).",
    )
    p.add_argument(
        "--output", "-o",
        default=str(Path(__file__).parent / "captures"),
        metavar="DIR",
        help="Directory to write session captures (default: bridges/captures/).",
    )
    p.add_argument(
        "--timeout", "-t",
        type=float,
        default=30.0,
        help="Protocol receive timeout in seconds (default: 30.0).",
    )
    p.add_argument(
        "--events-only",
        action="store_true",
        help="Skip the device-info step and go straight to event download.",
    )
    p.add_argument(
        "--max-events",
        type=int,
        default=None,
        metavar="N",
        help=(
            "Safety cap: download at most N events per session (default: unlimited). "
            "Useful if a unit has many old events stored — prevents a very long first run."
        ),
    )
    p.add_argument(
        "--allow-ip",
        metavar="IP",
        action="append",
        dest="allow_ips",
        default=[],
        help=(
            "Only accept connections from this IP address (repeat for multiple). "
            "Example: --allow-ip 63.43.212.232 "
            "If not specified, all IPs are accepted (not recommended for public servers)."
        ),
    )
    p.add_argument(
        "--restart-monitoring",
        action="store_true",
        default=False,
        help=(
            "After downloading events, send SUB 0x96 (start monitoring) before "
            "disconnecting. Required for RV55 units whose firmware does not assert "
            "DCD on disconnect — without this the unit stays idle after a call-home."
        ),
    )
    p.add_argument(
        "--clear-after-download",
        action="store_true",
        default=False,
        help=(
            "After successfully downloading new events, erase all events from the "
            "device memory (SUB 0xA3 → 0x1C → 0x06 → 0xA2 sequence, confirmed from "
            "4-11-26 MITM capture). Only fires when at least one new event was saved. "
            "This mirrors the standard Blastware ACH workflow."
        ),
    )
    p.add_argument(
        "--force-redownload-all",
        action="store_true",
        default=False,
        help=(
            "Manual override: ignore ach_state.json's downloaded_events map "
            "for this session and re-download every event currently on the "
            "device, regardless of (key, timestamp) match. Useful when state "
            "has become inconsistent with the on-disk waveform store / DB."
        ),
    )
    p.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable debug logging.",
    )
    return p.parse_args(argv)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: parse options, configure console logging, then serve
    # until the user interrupts with Ctrl-C.
    args = parse_args()

    if args.verbose:
        console_level = logging.DEBUG
    else:
        console_level = logging.INFO
    logging.basicConfig(
        format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
        level=console_level,
    )

    try:
        serve(args)
    except KeyboardInterrupt:
        print("\nStopped.")
|