diff --git a/.gitignore b/.gitignore index e6e6e12..212a2eb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,28 +1,28 @@ -/bridges/captures/ -/example-events/ - -/manuals/ - -# Python bytecode -__pycache__/ -*.py[cod] - -# Virtual environments -.venv/ -venv/ -env/ - -# Editor / OS -.vscode/ -*.swp -.DS_Store -Thumbs.db - -# Analyzer outputs -*.report -claude_export_*.md - -# Frame database -*.db -*.db-wal -*.db-shm +/bridges/captures/ +/example-events/ + +/manuals/ + +# Python bytecode +__pycache__/ +*.py[cod] + +# Virtual environments +.venv/ +venv/ +env/ + +# Editor / OS +.vscode/ +*.swp +.DS_Store +Thumbs.db + +# Analyzer outputs +*.report +claude_export_*.md + +# Frame database +*.db +*.db-wal +*.db-shm diff --git a/CLAUDE.md b/CLAUDE.md index 50d2f46..b6faa1c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -582,28 +582,32 @@ All confirmed from 4-8-26/2ndtry BW TX/S3 capture (clean start → 30s monitor Standard two-step read (probe at offset 0x00, data at offset 0x2C). Response SUB = 0xFF − 0x1C = **0xE3** (standard formula — no exception). -**Payload length is ~46–49 bytes in BOTH idle and monitoring states** — length alone -is NOT a reliable mode indicator. Earlier note claiming "12 bytes when monitoring" -was wrong (confirmed 2026-04-08 from 4-8-26/mid-monitor captures). +**Payload length is 46–47 bytes IDLE, 48–49 bytes MONITORING** — not a reliable sole +indicator due to 1-byte jitter overlap at the boundary. -**Monitoring flag (CORRECTED 2026-04-08 — full byte diff of 2ndtry capture):** -- `section[6] == 0x00` → unit is **idle** -- `section[6] == 0x10` → unit is **monitoring** +**Monitoring flag (CONFIRMED 2026-04-09 — byte diff of all 144 data frames, 2ndtry capture):** +- `section[1] == 0x00` → unit is **idle** +- `section[1] == 0x10` → unit is **monitoring** -Earlier note claiming `section[1]` was the flag was WRONG — section[1] is always 0x00 in both states. The correction was found by diffing all 0xE3 data frames across the start/stop transitions: `section[6]` is the only byte that flips cleanly at frame #36 (start) and #132 (stop) within the 2ndtry 0xE3 frame sequence. +This is `data[12]` (= `frame.data[12]`). The flag is 0x00 in all 36 IDLE_BEFORE frames, +0x10 in all 98 MONITORING frames, and 0x00 in all 10 IDLE_AFTER frames — 100% accurate. -Battery and memory fields are present in **both** states, but the payload grows by **3 bytes** when monitoring is active (section goes from ~52 to ~55 bytes), shifting subsequent fields by +3. +**HISTORY OF THIS FIELD (do not re-derive):** The original implementation used `section[1]`. +A re-analysis in the prior session incorrectly concluded `section[1]` is always 0x00 and +"corrected" the flag to `section[6]`, which has non-binary values (0xea idle, 0x07 monitoring) +and is device-specific. The 2026-04-09 re-analysis confirms `section[1]` was right. -**Field offsets (relative to `data[11:]` = section):** +**IMPORTANT — `frame.data` has checksum already stripped** by `S3FrameParser._finalise()` +(`raw_payload = body[:-1]`; `data = raw_payload[5:]`). There is NO trailing checksum byte in +`section`. All relative-from-end offsets must account for this. -Battery and memory are at **relative offsets from the end** — the payload can vary by ±1–3 bytes due to counter jitter and monitoring-mode expansion, but these 10 bytes are always anchored at the end: +Battery and memory fields are present in **both** states: | Offset (relative to end) | Field | Type | Notes | |---|---|---|---| -| `section[-11:-9]` | battery voltage × 100 | uint16 BE | `0x02A8` = 680 → 6.80 V | -| `section[-9:-5]` | memory total (bytes) | uint32 BE | e.g. 983026 ≈ 960 KB | -| `section[-5:-1]` | memory free (bytes) | uint32 BE | decreases as events are stored | -| `section[-1]` | frame checksum | — | last byte, skip | +| `section[-10:-8]` | battery voltage × 100 | uint16 BE | `0x02A8` = 680 → 6.80 V | +| `section[-8:-4]` | memory total (bytes) | uint32 BE | e.g. 983026 ≈ 960 KB | +| `section[-4:]` | memory free (bytes) | uint32 BE | decreases as events are stored | ### SESSION_RESET signal (`41 03`) — required for monitoring units @@ -657,7 +661,7 @@ Key findings: **SFM behavior after `POST /device/monitor/start`:** `_pollMonitorConfirm()` polls `/device/monitor/status` every 5 s for up to 60 s, updating the badge on each poll. -Status will show MONITORING once `section[6]` flips to `0x10`. +Status will show MONITORING once `section[1]` flips to `0x10`. ### SUBs known from sensor-check capture (4-8-26) — NOT YET IMPLEMENTED diff --git a/bridges/ach_server.py b/bridges/ach_server.py index 6088431..1c37f86 100644 --- a/bridges/ach_server.py +++ b/bridges/ach_server.py @@ -71,6 +71,28 @@ from minimateplus.models import DeviceInfo, Event log = logging.getLogger("ach_server") +# ── Per-unit state (high-water mark) ────────────────────────────────────────── +# Persisted as /ach_state.json +# Format: { "BE11529": { "event_count": 5, "last_seen": "2026-04-09T..." }, ... } + +_state_lock = threading.Lock() + + +def _load_state(state_path: Path) -> dict: + if state_path.exists(): + try: + with open(state_path) as f: + return json.load(f) + except Exception: + pass + return {} + + +def _save_state(state_path: Path, state: dict) -> None: + with _state_lock: + with open(state_path, "w") as f: + json.dump(state, f, indent=2) + # ── Per-session handler ──────────────────────────────────────────────────────── @@ -79,6 +101,12 @@ class AchSession: Handles one inbound unit connection in its own thread. Wraps the socket in a SocketTransport → MiniMateClient, then runs the standard connect → get_device_info → get_events sequence. + + State tracking (ach_state.json in output_dir): + On each successful download we record how many events the unit had. + On the next call-home we compare: if count hasn't grown, there's nothing + new and we close cleanly without downloading. If it has grown, we + download all events up to the new count and save only the new ones. """ def __init__( @@ -88,12 +116,16 @@ class AchSession: output_dir: Path, timeout: float, events_only: bool, + max_events: Optional[int], + state_path: Path, ) -> None: - self.sock = sock - self.peer = peer - self.output_dir = output_dir - self.timeout = timeout + self.sock = sock + self.peer = peer + self.output_dir = output_dir + self.timeout = timeout self.events_only = events_only + self.max_events = max_events + self.state_path = state_path def run(self) -> None: ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") @@ -126,17 +158,14 @@ class AchSession: log.info("Inbound connection from %s", self.peer) log.info("Session dir: %s", session_dir) - # Wrap the accepted socket in a SocketTransport. - # SocketTransport.connect() is a no-op — the socket is already live. transport = SocketTransport(self.sock, peer=self.peer) - # Tap the transport so we save every raw byte received from the device. - # We monkey-patch read() to write to a file before returning. + # Tap the transport: save every raw byte received from the device. raw_fh = open(raw_path, "wb") - original_read = transport.read + _orig_read = transport.read def tapped_read(n: int) -> bytes: - data = original_read(n) + data = _orig_read(n) if data: raw_fh.write(data) raw_fh.flush() @@ -144,9 +173,11 @@ class AchSession: transport.read = tapped_read # type: ignore[method-assign] + serial: Optional[str] = None + try: client = MiniMateClient(transport=transport, timeout=self.timeout) - client.open() # calls transport.connect() — no-op for SocketTransport + client.open() # ── Step 1: startup handshake ───────────────────────────────────── log.info("Step 1/3: startup handshake (POLL / SUB 5B)") @@ -154,55 +185,106 @@ class AchSession: from minimateplus.protocol import MiniMateProtocol proto = MiniMateProtocol(transport, recv_timeout=self.timeout) proto.startup() - log.info(" ✓ Startup OK — device responded to POLL (pull protocol confirmed)") - log.info(" NOTE: If you see this, the device waited for us to send POLL first.") - log.info(" That means ACH is pull protocol (same as direct BW connection).") + log.info(" ✓ Startup OK — pull protocol confirmed") except Exception as exc: log.error(" ✗ Startup failed: %s", exc) - log.warning(" If startup timed out with bytes in raw_rx.bin → push protocol.") - log.warning(" If raw_rx.bin is empty → unit didn't respond at all.") return # ── Step 2: device info ─────────────────────────────────────────── + device_info = None if not self.events_only: log.info("Step 2/3: reading device info") try: - device_info = client.connect() # SUB FE + 1A + device_info = client.connect() + serial = device_info.serial_number _save_json(session_dir / "device_info.json", _device_info_to_dict(device_info)) log.info( " ✓ Device: serial=%s firmware=%s calibration=%s", - device_info.serial_number, + serial, device_info.firmware_version, device_info.calibration_date, ) except Exception as exc: log.error(" ✗ Device info failed: %s", exc) - # Not fatal — continue to events else: log.info("Step 2/3: skipping device info (--events-only)") - # ── Step 3: download events ──────────────────────────────────────── - log.info("Step 3/3: downloading events") + # ── Step 3: check for new events via high-water mark ─────────────── + log.info("Step 3/3: checking for new events") + + state = _load_state(self.state_path) + unit_key = serial or self.peer # fall back to IP if no serial + last_count = state.get(unit_key, {}).get("event_count", 0) + try: - events = client.get_events(full_waveform=True) - log.info(" ✓ Downloaded %d event(s)", len(events)) - _save_json(session_dir / "events.json", [_event_to_dict(e) for e in events]) - for i, ev in enumerate(events): - log.info( - " Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f", - i, - ev.timestamp.isoformat() if ev.timestamp else "?", - ev.peaks.transverse if ev.peaks else 0, - ev.peaks.vertical if ev.peaks else 0, - ev.peaks.longitudinal if ev.peaks else 0, - ev.peaks.vector_sum if ev.peaks else 0, - ) + current_count = client.count_events() + log.info(" Unit has %d stored event(s); last downloaded count: %d", + current_count, last_count) except Exception as exc: - log.error(" ✗ Event download failed: %s", exc) + log.error(" ✗ count_events failed: %s", exc) + return + + if current_count <= last_count: + log.info(" ✓ No new events since last call-home — nothing to download") + log.info("Session complete (no new events) → %s", session_dir) + return + + new_event_count = current_count - last_count + log.info(" %d new event(s) to download", new_event_count) + + # Download all events up to current_count, apply max_events cap. + # We re-download old events too (get_events always starts from 0), + # but we only SAVE the new ones (the last new_event_count of the list). + stop_idx = current_count - 1 + if self.max_events is not None: + stop_idx = min(stop_idx, self.max_events - 1) + if self.max_events < current_count: + log.warning( + " max_events=%d cap: will download events 0–%d only " + "(unit has %d total)", + self.max_events, stop_idx, current_count, + ) + + try: + all_events = client.get_events( + full_waveform=True, + stop_after_index=stop_idx, + ) + # Only the events beyond last_count are genuinely new + new_events = all_events[last_count:] + log.info(" ✓ Downloaded %d total event(s), %d new", + len(all_events), len(new_events)) + + _save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events]) + if last_count > 0 and len(all_events) > len(new_events): + log.info(" (skipped %d already-seen event(s))", last_count) + + for i, ev in enumerate(new_events): + log.info( + " NEW Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f", + last_count + i, + ev.timestamp.isoformat() if ev.timestamp else "?", + ev.peaks.transverse if ev.peaks else 0, + ev.peaks.vertical if ev.peaks else 0, + ev.peaks.longitudinal if ev.peaks else 0, + ev.peaks.vector_sum if ev.peaks else 0, + ) + + # Update high-water mark + state[unit_key] = { + "event_count": current_count, + "last_seen": datetime.datetime.now().isoformat(), + "serial": serial, + "peer": self.peer, + } + _save_state(self.state_path, state) + + except Exception as exc: + log.error(" ✗ Event download failed: %s", exc, exc_info=True) finally: raw_fh.close() - client.close() + client.close() # closes transport / socket cleanly log.info("Session complete → %s", session_dir) log.info("="*60) @@ -268,15 +350,19 @@ def _event_to_dict(e: Event) -> dict: def serve(args: argparse.Namespace) -> None: output_dir = Path(args.output) output_dir.mkdir(parents=True, exist_ok=True) + state_path = output_dir / "ach_state.json" server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_sock.bind(("0.0.0.0", args.port)) server_sock.listen(5) + max_ev = args.max_events print(f"\n{'='*60}") print(f" ACH inbound server listening on 0.0.0.0:{args.port}") - print(f" Output: {output_dir.resolve()}/ach_inbound_/") + print(f" Output: {output_dir.resolve()}/ach_inbound_/") + print(f" State file: {state_path}") + print(f" Max events per session: {max_ev if max_ev else 'unlimited'}") print(f"{'='*60}") print(f"\n Point your test unit's ACEmanager call-home settings to:") print(f" Remote Host: ") @@ -295,6 +381,8 @@ def serve(args: argparse.Namespace) -> None: output_dir=output_dir, timeout=args.timeout, events_only=args.events_only, + max_events=max_ev, + state_path=state_path, ) t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}") t.start() @@ -336,6 +424,16 @@ def parse_args() -> argparse.Namespace: action="store_true", help="Skip the device-info step and go straight to event download.", ) + p.add_argument( + "--max-events", + type=int, + default=None, + metavar="N", + help=( + "Safety cap: download at most N events per session (default: unlimited). " + "Useful if a unit has many old events stored — prevents a very long first run." + ), + ) p.add_argument( "--verbose", "-v", action="store_true", diff --git a/minimateplus/client.py b/minimateplus/client.py index 9e32c6b..984cbcf 100644 --- a/minimateplus/client.py +++ b/minimateplus/client.py @@ -1755,17 +1755,20 @@ def _decode_monitor_status(data: bytes) -> MonitorStatus: data is the raw S3 frame .data attribute (includes the 11-byte section header, so field offsets below are relative to data[11]). - Monitoring flag (confirmed 4-8-26/2ndtry, full byte diff analysis): - section[6] == 0x00 → idle - section[6] == 0x10 → monitoring + NOTE: frame.data has the checksum byte already stripped by S3FrameParser + (_finalise returns raw_payload[5:] where raw_payload = body[:-1]). + There is NO trailing checksum byte in section. - The payload size varies (52–55+ bytes) but the battery/memory block is - always the last 10 bytes before the trailing checksum byte: + Monitoring flag (confirmed 4-8-26/2ndtry, byte diff of all 144 data frames): + section[1] == 0x00 → idle + section[1] == 0x10 → monitoring - section[-11:-9] battery × 100 uint16 BE (0x02A8 = 6.80 V) - section[-9 :-5] memory_total uint32 BE bytes - section[-5 :-1] memory_free uint32 BE bytes - section[-1] checksum (not data) + The payload length varies (46–49 bytes) — IDLE is 46-47, MONITORING is 48-49. + The battery/memory block is always the last 10 bytes of section (no checksum): + + section[-10:-8] battery × 100 uint16 BE (0x02A8 = 6.80 V) + section[-8 :-4] memory_total uint32 BE bytes + section[-4:] memory_free uint32 BE bytes Values confirmed from 4-8-26/2ndtry capture (BE11529): battery 0x02A8 = 680 → 6.80 V @@ -1780,32 +1783,30 @@ def _decode_monitor_status(data: bytes) -> MonitorStatus: len(data), len(section), section.hex(), ) - # Monitoring flag: section[6] (CORRECTED 2026-04-08 — was wrongly section[1]). - # Byte diff of 2ndtry BW-S3 captures confirms section[6] flips 0x00↔0x10 - # exactly at the start/stop monitoring transitions (0xE3 frame #36 / #132). - is_monitoring = len(section) > 6 and section[6] == 0x10 + # Monitoring flag: section[1] == 0x10. + # Confirmed from byte diff of all 144 0xE3 data frames in 4-8-26/2ndtry capture: + # section[1] = 0x00 in all IDLE frames, 0x10 in all MONITORING frames. + # (section[6] also changes but has non-binary values 0xea/0x07 — device-specific.) + is_monitoring = len(section) > 1 and section[1] == 0x10 battery_v = None memory_total = None memory_free = None - # Battery and memory offsets are RELATIVE TO THE END of the section. - # The payload length varies (52–55+ bytes) depending on monitoring state and - # internal counters, but the battery/memory block is always the last 10 bytes - # before the checksum (section[-1]). + # Battery and memory at relative-from-end offsets. + # Payload length varies (46–49 bytes) but the battery/memory block is always + # the last 10 bytes. No checksum byte — it was stripped by S3FrameParser. # - # section[-11:-9] battery × 100 uint16 BE 0x02A8 = 6.80 V - # section[-9 :-5] memory_total uint32 BE ≈ 960 KB on BE11529 - # section[-5 :-1] memory_free uint32 BE decreases as events fill - # section[-1] frame checksum (not data) + # section[-10:-8] battery × 100 uint16 BE 0x02A8 = 6.80 V + # section[-8 :-4] memory_total uint32 BE ≈ 960 KB on BE11529 + # section[-4:] memory_free uint32 BE decreases as events fill # - # Confirmed stable across IDLE (52b), MONITORING (55b), and counter-jitter - # IDLE variants (53b) from 4-8-26/2ndtry full capture analysis. - if len(section) >= 11: - batt_raw = struct.unpack(">H", section[-11:-9])[0] + # Confirmed stable across IDLE (46b), MONITORING (48-49b) variants. + if len(section) >= 10: + batt_raw = struct.unpack(">H", section[-10:-8])[0] battery_v = batt_raw / 100.0 - memory_total = struct.unpack(">I", section[-9:-5])[0] - memory_free = struct.unpack(">I", section[-5:-1])[0] + memory_total = struct.unpack(">I", section[-8:-4])[0] + memory_free = struct.unpack(">I", section[-4:])[0] return MonitorStatus( is_monitoring=is_monitoring,