feat: add high-water mark state tracking to ach_server + fix monitoring flag
ach_server.py: - Add ach_state.json per-unit state tracking (keyed by serial number) - count_events() before any download; skip session if no new events since last call-home - Download only events beyond the previous high-water mark (all_events[last_count:]) - --max-events N safety cap for first-run units with many stored events - state_path and max_events wired through AchSession constructor and serve() client.py (_decode_monitor_status): - Revert monitoring flag to section[1] == 0x10 (was incorrectly changed to section[6]) - Fix battery/memory offsets to section[-10:-8], [-8:-4], [-4:] (no trailing checksum byte) - Both confirmed by full byte diff of all 144 0xE3 data frames in 4-8-26/2ndtry capture Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+135
-37
@@ -71,6 +71,28 @@ from minimateplus.models import DeviceInfo, Event
|
||||
|
||||
log = logging.getLogger("ach_server")
|
||||
|
||||
# ── Per-unit state (high-water mark) ──────────────────────────────────────────
|
||||
# Persisted as <output_dir>/ach_state.json
|
||||
# Format: { "BE11529": { "event_count": 5, "last_seen": "2026-04-09T..." }, ... }
|
||||
|
||||
_state_lock = threading.Lock()
|
||||
|
||||
|
||||
def _load_state(state_path: Path) -> dict:
|
||||
if state_path.exists():
|
||||
try:
|
||||
with open(state_path) as f:
|
||||
return json.load(f)
|
||||
except Exception:
|
||||
pass
|
||||
return {}
|
||||
|
||||
|
||||
def _save_state(state_path: Path, state: dict) -> None:
|
||||
with _state_lock:
|
||||
with open(state_path, "w") as f:
|
||||
json.dump(state, f, indent=2)
|
||||
|
||||
|
||||
# ── Per-session handler ────────────────────────────────────────────────────────
|
||||
|
||||
@@ -79,6 +101,12 @@ class AchSession:
|
||||
Handles one inbound unit connection in its own thread.
|
||||
Wraps the socket in a SocketTransport → MiniMateClient, then runs the
|
||||
standard connect → get_device_info → get_events sequence.
|
||||
|
||||
State tracking (ach_state.json in output_dir):
|
||||
On each successful download we record how many events the unit had.
|
||||
On the next call-home we compare: if count hasn't grown, there's nothing
|
||||
new and we close cleanly without downloading. If it has grown, we
|
||||
download all events up to the new count and save only the new ones.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -88,12 +116,16 @@ class AchSession:
|
||||
output_dir: Path,
|
||||
timeout: float,
|
||||
events_only: bool,
|
||||
max_events: Optional[int],
|
||||
state_path: Path,
|
||||
) -> None:
|
||||
self.sock = sock
|
||||
self.peer = peer
|
||||
self.output_dir = output_dir
|
||||
self.timeout = timeout
|
||||
self.sock = sock
|
||||
self.peer = peer
|
||||
self.output_dir = output_dir
|
||||
self.timeout = timeout
|
||||
self.events_only = events_only
|
||||
self.max_events = max_events
|
||||
self.state_path = state_path
|
||||
|
||||
def run(self) -> None:
|
||||
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
@@ -126,17 +158,14 @@ class AchSession:
|
||||
log.info("Inbound connection from %s", self.peer)
|
||||
log.info("Session dir: %s", session_dir)
|
||||
|
||||
# Wrap the accepted socket in a SocketTransport.
|
||||
# SocketTransport.connect() is a no-op — the socket is already live.
|
||||
transport = SocketTransport(self.sock, peer=self.peer)
|
||||
|
||||
# Tap the transport so we save every raw byte received from the device.
|
||||
# We monkey-patch read() to write to a file before returning.
|
||||
# Tap the transport: save every raw byte received from the device.
|
||||
raw_fh = open(raw_path, "wb")
|
||||
original_read = transport.read
|
||||
_orig_read = transport.read
|
||||
|
||||
def tapped_read(n: int) -> bytes:
|
||||
data = original_read(n)
|
||||
data = _orig_read(n)
|
||||
if data:
|
||||
raw_fh.write(data)
|
||||
raw_fh.flush()
|
||||
@@ -144,9 +173,11 @@ class AchSession:
|
||||
|
||||
transport.read = tapped_read # type: ignore[method-assign]
|
||||
|
||||
serial: Optional[str] = None
|
||||
|
||||
try:
|
||||
client = MiniMateClient(transport=transport, timeout=self.timeout)
|
||||
client.open() # calls transport.connect() — no-op for SocketTransport
|
||||
client.open()
|
||||
|
||||
# ── Step 1: startup handshake ─────────────────────────────────────
|
||||
log.info("Step 1/3: startup handshake (POLL / SUB 5B)")
|
||||
@@ -154,55 +185,106 @@ class AchSession:
|
||||
from minimateplus.protocol import MiniMateProtocol
|
||||
proto = MiniMateProtocol(transport, recv_timeout=self.timeout)
|
||||
proto.startup()
|
||||
log.info(" ✓ Startup OK — device responded to POLL (pull protocol confirmed)")
|
||||
log.info(" NOTE: If you see this, the device waited for us to send POLL first.")
|
||||
log.info(" That means ACH is pull protocol (same as direct BW connection).")
|
||||
log.info(" ✓ Startup OK — pull protocol confirmed")
|
||||
except Exception as exc:
|
||||
log.error(" ✗ Startup failed: %s", exc)
|
||||
log.warning(" If startup timed out with bytes in raw_rx.bin → push protocol.")
|
||||
log.warning(" If raw_rx.bin is empty → unit didn't respond at all.")
|
||||
return
|
||||
|
||||
# ── Step 2: device info ───────────────────────────────────────────
|
||||
device_info = None
|
||||
if not self.events_only:
|
||||
log.info("Step 2/3: reading device info")
|
||||
try:
|
||||
device_info = client.connect() # SUB FE + 1A
|
||||
device_info = client.connect()
|
||||
serial = device_info.serial_number
|
||||
_save_json(session_dir / "device_info.json", _device_info_to_dict(device_info))
|
||||
log.info(
|
||||
" ✓ Device: serial=%s firmware=%s calibration=%s",
|
||||
device_info.serial_number,
|
||||
serial,
|
||||
device_info.firmware_version,
|
||||
device_info.calibration_date,
|
||||
)
|
||||
except Exception as exc:
|
||||
log.error(" ✗ Device info failed: %s", exc)
|
||||
# Not fatal — continue to events
|
||||
else:
|
||||
log.info("Step 2/3: skipping device info (--events-only)")
|
||||
|
||||
# ── Step 3: download events ────────────────────────────────────────
|
||||
log.info("Step 3/3: downloading events")
|
||||
# ── Step 3: check for new events via high-water mark ───────────────
|
||||
log.info("Step 3/3: checking for new events")
|
||||
|
||||
state = _load_state(self.state_path)
|
||||
unit_key = serial or self.peer # fall back to IP if no serial
|
||||
last_count = state.get(unit_key, {}).get("event_count", 0)
|
||||
|
||||
try:
|
||||
events = client.get_events(full_waveform=True)
|
||||
log.info(" ✓ Downloaded %d event(s)", len(events))
|
||||
_save_json(session_dir / "events.json", [_event_to_dict(e) for e in events])
|
||||
for i, ev in enumerate(events):
|
||||
log.info(
|
||||
" Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f",
|
||||
i,
|
||||
ev.timestamp.isoformat() if ev.timestamp else "?",
|
||||
ev.peaks.transverse if ev.peaks else 0,
|
||||
ev.peaks.vertical if ev.peaks else 0,
|
||||
ev.peaks.longitudinal if ev.peaks else 0,
|
||||
ev.peaks.vector_sum if ev.peaks else 0,
|
||||
)
|
||||
current_count = client.count_events()
|
||||
log.info(" Unit has %d stored event(s); last downloaded count: %d",
|
||||
current_count, last_count)
|
||||
except Exception as exc:
|
||||
log.error(" ✗ Event download failed: %s", exc)
|
||||
log.error(" ✗ count_events failed: %s", exc)
|
||||
return
|
||||
|
||||
if current_count <= last_count:
|
||||
log.info(" ✓ No new events since last call-home — nothing to download")
|
||||
log.info("Session complete (no new events) → %s", session_dir)
|
||||
return
|
||||
|
||||
new_event_count = current_count - last_count
|
||||
log.info(" %d new event(s) to download", new_event_count)
|
||||
|
||||
# Download all events up to current_count, apply max_events cap.
|
||||
# We re-download old events too (get_events always starts from 0),
|
||||
# but we only SAVE the new ones (the last new_event_count of the list).
|
||||
stop_idx = current_count - 1
|
||||
if self.max_events is not None:
|
||||
stop_idx = min(stop_idx, self.max_events - 1)
|
||||
if self.max_events < current_count:
|
||||
log.warning(
|
||||
" max_events=%d cap: will download events 0–%d only "
|
||||
"(unit has %d total)",
|
||||
self.max_events, stop_idx, current_count,
|
||||
)
|
||||
|
||||
try:
|
||||
all_events = client.get_events(
|
||||
full_waveform=True,
|
||||
stop_after_index=stop_idx,
|
||||
)
|
||||
# Only the events beyond last_count are genuinely new
|
||||
new_events = all_events[last_count:]
|
||||
log.info(" ✓ Downloaded %d total event(s), %d new",
|
||||
len(all_events), len(new_events))
|
||||
|
||||
_save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events])
|
||||
if last_count > 0 and len(all_events) > len(new_events):
|
||||
log.info(" (skipped %d already-seen event(s))", last_count)
|
||||
|
||||
for i, ev in enumerate(new_events):
|
||||
log.info(
|
||||
" NEW Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f",
|
||||
last_count + i,
|
||||
ev.timestamp.isoformat() if ev.timestamp else "?",
|
||||
ev.peaks.transverse if ev.peaks else 0,
|
||||
ev.peaks.vertical if ev.peaks else 0,
|
||||
ev.peaks.longitudinal if ev.peaks else 0,
|
||||
ev.peaks.vector_sum if ev.peaks else 0,
|
||||
)
|
||||
|
||||
# Update high-water mark
|
||||
state[unit_key] = {
|
||||
"event_count": current_count,
|
||||
"last_seen": datetime.datetime.now().isoformat(),
|
||||
"serial": serial,
|
||||
"peer": self.peer,
|
||||
}
|
||||
_save_state(self.state_path, state)
|
||||
|
||||
except Exception as exc:
|
||||
log.error(" ✗ Event download failed: %s", exc, exc_info=True)
|
||||
|
||||
finally:
|
||||
raw_fh.close()
|
||||
client.close()
|
||||
client.close() # closes transport / socket cleanly
|
||||
|
||||
log.info("Session complete → %s", session_dir)
|
||||
log.info("="*60)
|
||||
@@ -268,15 +350,19 @@ def _event_to_dict(e: Event) -> dict:
|
||||
def serve(args: argparse.Namespace) -> None:
|
||||
output_dir = Path(args.output)
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
state_path = output_dir / "ach_state.json"
|
||||
|
||||
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
server_sock.bind(("0.0.0.0", args.port))
|
||||
server_sock.listen(5)
|
||||
|
||||
max_ev = args.max_events
|
||||
print(f"\n{'='*60}")
|
||||
print(f" ACH inbound server listening on 0.0.0.0:{args.port}")
|
||||
print(f" Output: {output_dir.resolve()}/ach_inbound_<timestamp>/")
|
||||
print(f" Output: {output_dir.resolve()}/ach_inbound_<timestamp>/")
|
||||
print(f" State file: {state_path}")
|
||||
print(f" Max events per session: {max_ev if max_ev else 'unlimited'}")
|
||||
print(f"{'='*60}")
|
||||
print(f"\n Point your test unit's ACEmanager call-home settings to:")
|
||||
print(f" Remote Host: <this machine's LAN IP>")
|
||||
@@ -295,6 +381,8 @@ def serve(args: argparse.Namespace) -> None:
|
||||
output_dir=output_dir,
|
||||
timeout=args.timeout,
|
||||
events_only=args.events_only,
|
||||
max_events=max_ev,
|
||||
state_path=state_path,
|
||||
)
|
||||
t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}")
|
||||
t.start()
|
||||
@@ -336,6 +424,16 @@ def parse_args() -> argparse.Namespace:
|
||||
action="store_true",
|
||||
help="Skip the device-info step and go straight to event download.",
|
||||
)
|
||||
p.add_argument(
|
||||
"--max-events",
|
||||
type=int,
|
||||
default=None,
|
||||
metavar="N",
|
||||
help=(
|
||||
"Safety cap: download at most N events per session (default: unlimited). "
|
||||
"Useful if a unit has many old events stored — prevents a very long first run."
|
||||
),
|
||||
)
|
||||
p.add_argument(
|
||||
"--verbose", "-v",
|
||||
action="store_true",
|
||||
|
||||
Reference in New Issue
Block a user