feat: add high-water mark state tracking to ach_server + fix monitoring flag

ach_server.py:
- Add ach_state.json per-unit state tracking (keyed by serial number)
- count_events() before any download; skip session if no new events since last call-home
- Download only events beyond the previous high-water mark (all_events[last_count:])
- --max-events N safety cap for first-run units with many stored events
- state_path and max_events wired through AchSession constructor and serve()

client.py (_decode_monitor_status):
- Revert monitoring flag to section[1] == 0x10 (was incorrectly changed to section[6])
- Fix battery/memory offsets to section[-10:-8], [-8:-4], [-4:] (no trailing checksum byte)
- Both confirmed by full byte diff of all 144 0xE3 data frames in 4-8-26/2ndtry capture

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-09 14:38:44 -04:00
committed by serversdown
parent cf7d838bf4
commit 0358acb51d
4 changed files with 210 additions and 107 deletions
+28 -28
View File
@@ -1,28 +1,28 @@
/bridges/captures/ /bridges/captures/
/example-events/ /example-events/
/manuals/ /manuals/
# Python bytecode # Python bytecode
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
# Virtual environments # Virtual environments
.venv/ .venv/
venv/ venv/
env/ env/
# Editor / OS # Editor / OS
.vscode/ .vscode/
*.swp *.swp
.DS_Store .DS_Store
Thumbs.db Thumbs.db
# Analyzer outputs # Analyzer outputs
*.report *.report
claude_export_*.md claude_export_*.md
# Frame database # Frame database
*.db *.db
*.db-wal *.db-wal
*.db-shm *.db-shm
+19 -15
View File
@@ -582,28 +582,32 @@ All confirmed from 4-8-26/2ndtry BW TX/S3 capture (clean start → 30s monitor
Standard two-step read (probe at offset 0x00, data at offset 0x2C). Standard two-step read (probe at offset 0x00, data at offset 0x2C).
Response SUB = 0xFF 0x1C = **0xE3** (standard formula — no exception). Response SUB = 0xFF 0x1C = **0xE3** (standard formula — no exception).
**Payload length is ~4649 bytes in BOTH idle and monitoring states** — length alone **Payload length is 4647 bytes IDLE, 4849 bytes MONITORING** — not a reliable sole
is NOT a reliable mode indicator. Earlier note claiming "12 bytes when monitoring" indicator due to 1-byte jitter overlap at the boundary.
was wrong (confirmed 2026-04-08 from 4-8-26/mid-monitor captures).
**Monitoring flag (CORRECTED 2026-04-08 full byte diff of 2ndtry capture):** **Monitoring flag (CONFIRMED 2026-04-09 — byte diff of all 144 data frames, 2ndtry capture):**
- `section[6] == 0x00` → unit is **idle** - `section[1] == 0x00` → unit is **idle**
- `section[6] == 0x10` → unit is **monitoring** - `section[1] == 0x10` → unit is **monitoring**
Earlier note claiming `section[1]` was the flag was WRONG — section[1] is always 0x00 in both states. The correction was found by diffing all 0xE3 data frames across the start/stop transitions: `section[6]` is the only byte that flips cleanly at frame #36 (start) and #132 (stop) within the 2ndtry 0xE3 frame sequence. This is `data[12]` (= `frame.data[12]`). The flag is 0x00 in all 36 IDLE_BEFORE frames,
0x10 in all 98 MONITORING frames, and 0x00 in all 10 IDLE_AFTER frames — 100% accurate.
Battery and memory fields are present in **both** states, but the payload grows by **3 bytes** when monitoring is active (section goes from ~52 to ~55 bytes), shifting subsequent fields by +3. **HISTORY OF THIS FIELD (do not re-derive):** The original implementation used `section[1]`.
A re-analysis in the prior session incorrectly concluded `section[1]` is always 0x00 and
"corrected" the flag to `section[6]`, which has non-binary values (0xea idle, 0x07 monitoring)
and is device-specific. The 2026-04-09 re-analysis confirms `section[1]` was right.
**Field offsets (relative to `data[11:]` = section):** **IMPORTANT — `frame.data` has checksum already stripped** by `S3FrameParser._finalise()`
(`raw_payload = body[:-1]`; `data = raw_payload[5:]`). There is NO trailing checksum byte in
`section`. All relative-from-end offsets must account for this.
Battery and memory are at **relative offsets from the end** — the payload can vary by ±13 bytes due to counter jitter and monitoring-mode expansion, but these 10 bytes are always anchored at the end: Battery and memory fields are present in **both** states:
| Offset (relative to end) | Field | Type | Notes | | Offset (relative to end) | Field | Type | Notes |
|---|---|---|---| |---|---|---|---|
| `section[-11:-9]` | battery voltage × 100 | uint16 BE | `0x02A8` = 680 → 6.80 V | | `section[-10:-8]` | battery voltage × 100 | uint16 BE | `0x02A8` = 680 → 6.80 V |
| `section[-9:-5]` | memory total (bytes) | uint32 BE | e.g. 983026 ≈ 960 KB | | `section[-8:-4]` | memory total (bytes) | uint32 BE | e.g. 983026 ≈ 960 KB |
| `section[-5:-1]` | memory free (bytes) | uint32 BE | decreases as events are stored | | `section[-4:]` | memory free (bytes) | uint32 BE | decreases as events are stored |
| `section[-1]` | frame checksum | — | last byte, skip |
### SESSION_RESET signal (`41 03`) — required for monitoring units ### SESSION_RESET signal (`41 03`) — required for monitoring units
@@ -657,7 +661,7 @@ Key findings:
**SFM behavior after `POST /device/monitor/start`:** `_pollMonitorConfirm()` polls **SFM behavior after `POST /device/monitor/start`:** `_pollMonitorConfirm()` polls
`/device/monitor/status` every 5 s for up to 60 s, updating the badge on each poll. `/device/monitor/status` every 5 s for up to 60 s, updating the badge on each poll.
Status will show MONITORING once `section[6]` flips to `0x10`. Status will show MONITORING once `section[1]` flips to `0x10`.
### SUBs known from sensor-check capture (4-8-26) — NOT YET IMPLEMENTED ### SUBs known from sensor-check capture (4-8-26) — NOT YET IMPLEMENTED
+135 -37
View File
@@ -71,6 +71,28 @@ from minimateplus.models import DeviceInfo, Event
log = logging.getLogger("ach_server") log = logging.getLogger("ach_server")
# ── Per-unit state (high-water mark) ──────────────────────────────────────────
# Persisted as <output_dir>/ach_state.json
# Format: { "BE11529": { "event_count": 5, "last_seen": "2026-04-09T..." }, ... }
_state_lock = threading.Lock()
def _load_state(state_path: Path) -> dict:
if state_path.exists():
try:
with open(state_path) as f:
return json.load(f)
except Exception:
pass
return {}
def _save_state(state_path: Path, state: dict) -> None:
with _state_lock:
with open(state_path, "w") as f:
json.dump(state, f, indent=2)
# ── Per-session handler ──────────────────────────────────────────────────────── # ── Per-session handler ────────────────────────────────────────────────────────
@@ -79,6 +101,12 @@ class AchSession:
Handles one inbound unit connection in its own thread. Handles one inbound unit connection in its own thread.
Wraps the socket in a SocketTransport → MiniMateClient, then runs the Wraps the socket in a SocketTransport → MiniMateClient, then runs the
standard connect → get_device_info → get_events sequence. standard connect → get_device_info → get_events sequence.
State tracking (ach_state.json in output_dir):
On each successful download we record how many events the unit had.
On the next call-home we compare: if count hasn't grown, there's nothing
new and we close cleanly without downloading. If it has grown, we
download all events up to the new count and save only the new ones.
""" """
def __init__( def __init__(
@@ -88,12 +116,16 @@ class AchSession:
output_dir: Path, output_dir: Path,
timeout: float, timeout: float,
events_only: bool, events_only: bool,
max_events: Optional[int],
state_path: Path,
) -> None: ) -> None:
self.sock = sock self.sock = sock
self.peer = peer self.peer = peer
self.output_dir = output_dir self.output_dir = output_dir
self.timeout = timeout self.timeout = timeout
self.events_only = events_only self.events_only = events_only
self.max_events = max_events
self.state_path = state_path
def run(self) -> None: def run(self) -> None:
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
@@ -126,17 +158,14 @@ class AchSession:
log.info("Inbound connection from %s", self.peer) log.info("Inbound connection from %s", self.peer)
log.info("Session dir: %s", session_dir) log.info("Session dir: %s", session_dir)
# Wrap the accepted socket in a SocketTransport.
# SocketTransport.connect() is a no-op — the socket is already live.
transport = SocketTransport(self.sock, peer=self.peer) transport = SocketTransport(self.sock, peer=self.peer)
# Tap the transport so we save every raw byte received from the device. # Tap the transport: save every raw byte received from the device.
# We monkey-patch read() to write to a file before returning.
raw_fh = open(raw_path, "wb") raw_fh = open(raw_path, "wb")
original_read = transport.read _orig_read = transport.read
def tapped_read(n: int) -> bytes: def tapped_read(n: int) -> bytes:
data = original_read(n) data = _orig_read(n)
if data: if data:
raw_fh.write(data) raw_fh.write(data)
raw_fh.flush() raw_fh.flush()
@@ -144,9 +173,11 @@ class AchSession:
transport.read = tapped_read # type: ignore[method-assign] transport.read = tapped_read # type: ignore[method-assign]
serial: Optional[str] = None
try: try:
client = MiniMateClient(transport=transport, timeout=self.timeout) client = MiniMateClient(transport=transport, timeout=self.timeout)
client.open() # calls transport.connect() — no-op for SocketTransport client.open()
# ── Step 1: startup handshake ───────────────────────────────────── # ── Step 1: startup handshake ─────────────────────────────────────
log.info("Step 1/3: startup handshake (POLL / SUB 5B)") log.info("Step 1/3: startup handshake (POLL / SUB 5B)")
@@ -154,55 +185,106 @@ class AchSession:
from minimateplus.protocol import MiniMateProtocol from minimateplus.protocol import MiniMateProtocol
proto = MiniMateProtocol(transport, recv_timeout=self.timeout) proto = MiniMateProtocol(transport, recv_timeout=self.timeout)
proto.startup() proto.startup()
log.info(" ✓ Startup OK — device responded to POLL (pull protocol confirmed)") log.info(" ✓ Startup OK — pull protocol confirmed")
log.info(" NOTE: If you see this, the device waited for us to send POLL first.")
log.info(" That means ACH is pull protocol (same as direct BW connection).")
except Exception as exc: except Exception as exc:
log.error(" ✗ Startup failed: %s", exc) log.error(" ✗ Startup failed: %s", exc)
log.warning(" If startup timed out with bytes in raw_rx.bin → push protocol.")
log.warning(" If raw_rx.bin is empty → unit didn't respond at all.")
return return
# ── Step 2: device info ─────────────────────────────────────────── # ── Step 2: device info ───────────────────────────────────────────
device_info = None
if not self.events_only: if not self.events_only:
log.info("Step 2/3: reading device info") log.info("Step 2/3: reading device info")
try: try:
device_info = client.connect() # SUB FE + 1A device_info = client.connect()
serial = device_info.serial_number
_save_json(session_dir / "device_info.json", _device_info_to_dict(device_info)) _save_json(session_dir / "device_info.json", _device_info_to_dict(device_info))
log.info( log.info(
" ✓ Device: serial=%s firmware=%s calibration=%s", " ✓ Device: serial=%s firmware=%s calibration=%s",
device_info.serial_number, serial,
device_info.firmware_version, device_info.firmware_version,
device_info.calibration_date, device_info.calibration_date,
) )
except Exception as exc: except Exception as exc:
log.error(" ✗ Device info failed: %s", exc) log.error(" ✗ Device info failed: %s", exc)
# Not fatal — continue to events
else: else:
log.info("Step 2/3: skipping device info (--events-only)") log.info("Step 2/3: skipping device info (--events-only)")
# ── Step 3: download events ──────────────────────────────────────── # ── Step 3: check for new events via high-water mark ───────────────
log.info("Step 3/3: downloading events") log.info("Step 3/3: checking for new events")
state = _load_state(self.state_path)
unit_key = serial or self.peer # fall back to IP if no serial
last_count = state.get(unit_key, {}).get("event_count", 0)
try: try:
events = client.get_events(full_waveform=True) current_count = client.count_events()
log.info(" ✓ Downloaded %d event(s)", len(events)) log.info(" Unit has %d stored event(s); last downloaded count: %d",
_save_json(session_dir / "events.json", [_event_to_dict(e) for e in events]) current_count, last_count)
for i, ev in enumerate(events):
log.info(
" Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f",
i,
ev.timestamp.isoformat() if ev.timestamp else "?",
ev.peaks.transverse if ev.peaks else 0,
ev.peaks.vertical if ev.peaks else 0,
ev.peaks.longitudinal if ev.peaks else 0,
ev.peaks.vector_sum if ev.peaks else 0,
)
except Exception as exc: except Exception as exc:
log.error("Event download failed: %s", exc) log.error("count_events failed: %s", exc)
return
if current_count <= last_count:
log.info(" ✓ No new events since last call-home — nothing to download")
log.info("Session complete (no new events) → %s", session_dir)
return
new_event_count = current_count - last_count
log.info(" %d new event(s) to download", new_event_count)
# Download all events up to current_count, apply max_events cap.
# We re-download old events too (get_events always starts from 0),
# but we only SAVE the new ones (the last new_event_count of the list).
stop_idx = current_count - 1
if self.max_events is not None:
stop_idx = min(stop_idx, self.max_events - 1)
if self.max_events < current_count:
log.warning(
" max_events=%d cap: will download events 0%d only "
"(unit has %d total)",
self.max_events, stop_idx, current_count,
)
try:
all_events = client.get_events(
full_waveform=True,
stop_after_index=stop_idx,
)
# Only the events beyond last_count are genuinely new
new_events = all_events[last_count:]
log.info(" ✓ Downloaded %d total event(s), %d new",
len(all_events), len(new_events))
_save_json(session_dir / "events.json", [_event_to_dict(e) for e in new_events])
if last_count > 0 and len(all_events) > len(new_events):
log.info(" (skipped %d already-seen event(s))", last_count)
for i, ev in enumerate(new_events):
log.info(
" NEW Event %d: %s Tran=%.4f Vert=%.4f Long=%.4f VS=%.4f",
last_count + i,
ev.timestamp.isoformat() if ev.timestamp else "?",
ev.peaks.transverse if ev.peaks else 0,
ev.peaks.vertical if ev.peaks else 0,
ev.peaks.longitudinal if ev.peaks else 0,
ev.peaks.vector_sum if ev.peaks else 0,
)
# Update high-water mark
state[unit_key] = {
"event_count": current_count,
"last_seen": datetime.datetime.now().isoformat(),
"serial": serial,
"peer": self.peer,
}
_save_state(self.state_path, state)
except Exception as exc:
log.error(" ✗ Event download failed: %s", exc, exc_info=True)
finally: finally:
raw_fh.close() raw_fh.close()
client.close() client.close() # closes transport / socket cleanly
log.info("Session complete → %s", session_dir) log.info("Session complete → %s", session_dir)
log.info("="*60) log.info("="*60)
@@ -268,15 +350,19 @@ def _event_to_dict(e: Event) -> dict:
def serve(args: argparse.Namespace) -> None: def serve(args: argparse.Namespace) -> None:
output_dir = Path(args.output) output_dir = Path(args.output)
output_dir.mkdir(parents=True, exist_ok=True) output_dir.mkdir(parents=True, exist_ok=True)
state_path = output_dir / "ach_state.json"
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind(("0.0.0.0", args.port)) server_sock.bind(("0.0.0.0", args.port))
server_sock.listen(5) server_sock.listen(5)
max_ev = args.max_events
print(f"\n{'='*60}") print(f"\n{'='*60}")
print(f" ACH inbound server listening on 0.0.0.0:{args.port}") print(f" ACH inbound server listening on 0.0.0.0:{args.port}")
print(f" Output: {output_dir.resolve()}/ach_inbound_<timestamp>/") print(f" Output: {output_dir.resolve()}/ach_inbound_<timestamp>/")
print(f" State file: {state_path}")
print(f" Max events per session: {max_ev if max_ev else 'unlimited'}")
print(f"{'='*60}") print(f"{'='*60}")
print(f"\n Point your test unit's ACEmanager call-home settings to:") print(f"\n Point your test unit's ACEmanager call-home settings to:")
print(f" Remote Host: <this machine's LAN IP>") print(f" Remote Host: <this machine's LAN IP>")
@@ -295,6 +381,8 @@ def serve(args: argparse.Namespace) -> None:
output_dir=output_dir, output_dir=output_dir,
timeout=args.timeout, timeout=args.timeout,
events_only=args.events_only, events_only=args.events_only,
max_events=max_ev,
state_path=state_path,
) )
t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}") t = threading.Thread(target=session.run, daemon=True, name=f"ach-{peer}")
t.start() t.start()
@@ -336,6 +424,16 @@ def parse_args() -> argparse.Namespace:
action="store_true", action="store_true",
help="Skip the device-info step and go straight to event download.", help="Skip the device-info step and go straight to event download.",
) )
p.add_argument(
"--max-events",
type=int,
default=None,
metavar="N",
help=(
"Safety cap: download at most N events per session (default: unlimited). "
"Useful if a unit has many old events stored — prevents a very long first run."
),
)
p.add_argument( p.add_argument(
"--verbose", "-v", "--verbose", "-v",
action="store_true", action="store_true",
+28 -27
View File
@@ -1755,17 +1755,20 @@ def _decode_monitor_status(data: bytes) -> MonitorStatus:
data is the raw S3 frame .data attribute (includes the 11-byte section data is the raw S3 frame .data attribute (includes the 11-byte section
header, so field offsets below are relative to data[11]). header, so field offsets below are relative to data[11]).
Monitoring flag (confirmed 4-8-26/2ndtry, full byte diff analysis): NOTE: frame.data has the checksum byte already stripped by S3FrameParser
section[6] == 0x00 → idle (_finalise returns raw_payload[5:] where raw_payload = body[:-1]).
section[6] == 0x10 → monitoring There is NO trailing checksum byte in section.
The payload size varies (5255+ bytes) but the battery/memory block is Monitoring flag (confirmed 4-8-26/2ndtry, byte diff of all 144 data frames):
always the last 10 bytes before the trailing checksum byte: section[1] == 0x00 → idle
section[1] == 0x10 → monitoring
section[-11:-9] battery × 100 uint16 BE (0x02A8 = 6.80 V) The payload length varies (4649 bytes) — IDLE is 46-47, MONITORING is 48-49.
section[-9 :-5] memory_total uint32 BE bytes The battery/memory block is always the last 10 bytes of section (no checksum):
section[-5 :-1] memory_free uint32 BE bytes
section[-1] checksum (not data) section[-10:-8] battery × 100 uint16 BE (0x02A8 = 6.80 V)
section[-8 :-4] memory_total uint32 BE bytes
section[-4:] memory_free uint32 BE bytes
Values confirmed from 4-8-26/2ndtry capture (BE11529): Values confirmed from 4-8-26/2ndtry capture (BE11529):
battery 0x02A8 = 680 → 6.80 V battery 0x02A8 = 680 → 6.80 V
@@ -1780,32 +1783,30 @@ def _decode_monitor_status(data: bytes) -> MonitorStatus:
len(data), len(section), section.hex(), len(data), len(section), section.hex(),
) )
# Monitoring flag: section[6] (CORRECTED 2026-04-08 — was wrongly section[1]). # Monitoring flag: section[1] == 0x10.
# Byte diff of 2ndtry BW-S3 captures confirms section[6] flips 0x00↔0x10 # Confirmed from byte diff of all 144 0xE3 data frames in 4-8-26/2ndtry capture:
# exactly at the start/stop monitoring transitions (0xE3 frame #36 / #132). # section[1] = 0x00 in all IDLE frames, 0x10 in all MONITORING frames.
is_monitoring = len(section) > 6 and section[6] == 0x10 # (section[6] also changes but has non-binary values 0xea/0x07 — device-specific.)
is_monitoring = len(section) > 1 and section[1] == 0x10
battery_v = None battery_v = None
memory_total = None memory_total = None
memory_free = None memory_free = None
# Battery and memory offsets are RELATIVE TO THE END of the section. # Battery and memory at relative-from-end offsets.
# The payload length varies (5255+ bytes) depending on monitoring state and # Payload length varies (4649 bytes) but the battery/memory block is always
# internal counters, but the battery/memory block is always the last 10 bytes # the last 10 bytes. No checksum byte — it was stripped by S3FrameParser.
# before the checksum (section[-1]).
# #
# section[-11:-9] battery × 100 uint16 BE 0x02A8 = 6.80 V # section[-10:-8] battery × 100 uint16 BE 0x02A8 = 6.80 V
# section[-9 :-5] memory_total uint32 BE ≈ 960 KB on BE11529 # section[-8 :-4] memory_total uint32 BE ≈ 960 KB on BE11529
# section[-5 :-1] memory_free uint32 BE decreases as events fill # section[-4:] memory_free uint32 BE decreases as events fill
# section[-1] frame checksum (not data)
# #
# Confirmed stable across IDLE (52b), MONITORING (55b), and counter-jitter # Confirmed stable across IDLE (46b), MONITORING (48-49b) variants.
# IDLE variants (53b) from 4-8-26/2ndtry full capture analysis. if len(section) >= 10:
if len(section) >= 11: batt_raw = struct.unpack(">H", section[-10:-8])[0]
batt_raw = struct.unpack(">H", section[-11:-9])[0]
battery_v = batt_raw / 100.0 battery_v = batt_raw / 100.0
memory_total = struct.unpack(">I", section[-9:-5])[0] memory_total = struct.unpack(">I", section[-8:-4])[0]
memory_free = struct.unpack(">I", section[-5:-1])[0] memory_free = struct.unpack(">I", section[-4:])[0]
return MonitorStatus( return MonitorStatus(
is_monitoring=is_monitoring, is_monitoring=is_monitoring,