feat: implement set_project_info functionality and add POC test script

This commit is contained in:
2026-04-07 02:49:17 -04:00
parent bcc044655a
commit 7005ae766d
4 changed files with 283 additions and 1 deletions
+104
View File
@@ -54,6 +54,24 @@ from .transport import SerialTransport, BaseTransport
log = logging.getLogger(__name__)
# ── Module-level constants ────────────────────────────────────────────────────
# Trigger config payload hardcoded from 3-11-26 BW TX capture (BW frame 108).
# No SUB 0x22 read exists in the capture — Blastware writes this fixed blob.
# 29 bytes: [00][1A][D5][00][00][10][03][08][0A] + 18×FF + [00][00]
_TRIGGER_DATA_HARDCODED: bytes = bytes.fromhex(
"001ad500001003080a"
"ffffffffffffffffffffffffffffffffffffff"
"0000"
)
# Compliance ASCII slot format (confirmed from 3-11-26 capture + label search):
# Each label occupies a 64-byte slot.
# Value starts at slot_start + 22, max 42 bytes, null-padded.
_COMPLIANCE_SLOT_SIZE = 64
_COMPLIANCE_VALUE_OFFSET = 22
_COMPLIANCE_VALUE_MAX = _COMPLIANCE_SLOT_SIZE - _COMPLIANCE_VALUE_OFFSET # 42
# ── MiniMateClient ────────────────────────────────────────────────────────────
@@ -599,6 +617,92 @@ class MiniMateClient:
log.info("push_config_raw: complete")
def set_project_info(
self,
project: Optional[str] = None,
client_name: Optional[str] = None,
operator: Optional[str] = None,
seis_loc: Optional[str] = None,
notes: Optional[str] = None,
) -> None:
"""
POC: Read the current config from the device, patch ASCII project fields
in the compliance block, and write the modified config back.
Only fields passed as non-None are modified. All other bytes are
round-tripped verbatim.
Label slot format confirmed from 3-11-26 BW TX capture:
- Compliance buffer is 2126 bytes (three E5 data frames concatenated).
- Each label ("Project:", "Client:", etc.) anchors a 64-byte slot.
- ASCII value starts at slot_start + 22, max 42 bytes, null-padded.
Known labels and their confirmed positions in the 2126-byte buffer:
b"Project:" → value at 125 + 22 = 147
b"Client:" → value at 189 + 22 = 211
b"User Name:" → value at 253 + 22 = 275
b"Seis Loc:" → value at 317 + 22 = 339
b"Extended Notes" → value at 381 + 22 = 403
Write payloads:
event_index_data : 88 bytes — read live from device via SUB 08
compliance_data : 2128 bytes — 2126 read bytes + 2-byte footer \\x00\\x00
(checksum formula unknown for POC; device may ignore it)
trigger_data : 29 bytes — hardcoded from 3-11-26 capture
waveform_data : 204 bytes — read live from device via SUB 09
Raises:
RuntimeError: if not connected.
ProtocolError: if any read or write step fails.
"""
proto = self._require_proto()
# 1. Read current payloads from the device
log.info("set_project_info: reading event index (SUB 08)")
event_index_data = proto.read_event_index() # 88 bytes
log.info("set_project_info: reading compliance config (SUB 1A)")
compliance_raw = bytearray(proto.read_compliance_config()) # 2126 bytes
log.info("set_project_info: reading waveform data (SUB 09)")
waveform_data = proto.read_waveform_data_raw() # 204 bytes
trigger_data = _TRIGGER_DATA_HARDCODED # 29 bytes
# 2. Patch ASCII fields inside the compliance buffer
def _set_field(label: bytes, value: Optional[str]) -> None:
if value is None:
return
idx = compliance_raw.find(label)
if idx < 0:
log.warning(
"set_project_info: label %r not found in compliance data", label
)
return
val_bytes = value.encode("ascii", errors="replace")[: _COMPLIANCE_VALUE_MAX - 1]
padded = val_bytes + b"\x00" * (_COMPLIANCE_VALUE_MAX - len(val_bytes))
compliance_raw[idx + _COMPLIANCE_VALUE_OFFSET : idx + _COMPLIANCE_SLOT_SIZE] = padded
log.debug(
"set_project_info: set %r%r (at offset %d)", label, value, idx
)
_set_field(b"Project:", project)
_set_field(b"Client:", client_name)
_set_field(b"User Name:", operator)
_set_field(b"Seis Loc:", seis_loc)
_set_field(b"Extended Notes", notes)
# 3. Append 2-byte footer (checksum formula unknown; \x00\x00 for POC)
compliance_data = bytes(compliance_raw) + b"\x00\x00" # 2128 bytes
log.info(
"set_project_info: compliance payload ready (%d bytes)", len(compliance_data)
)
# 4. Push the full write sequence to the device
self.push_config_raw(event_index_data, compliance_data, trigger_data, waveform_data)
log.info("set_project_info: complete")
# ── Internal helpers ──────────────────────────────────────────────────────
def _require_proto(self) -> MiniMateProtocol:
+34
View File
@@ -413,6 +413,40 @@ class MiniMateProtocol:
)
return header_bytes, length
def read_waveform_data_raw(self) -> bytes:
"""
Send the SUB 09 (WAVEFORM_DATA) two-step read and return the raw
202-byte (0xCA) waveform data block.
This is the "waveform data" block that Blastware reads from the device
before the write sequence (confirmed from 3-11-26 BW TX capture BW[80-81]).
The returned bytes are used verbatim as the ``waveform_data`` payload for
``write_waveform_data()`` / ``push_config_raw()``.
Returns:
Raw data section starting at data[11:], typically 204 bytes.
(data[11 : 11 + 0xCA] = 202 bytes on some firmware; the actual
length may be 204 depending on firmware version.)
Raises:
ProtocolError: on timeout, bad checksum, or wrong response SUB.
"""
SUB_WAVEFORM_DATA = 0x09
rsp_sub = _expected_rsp_sub(SUB_WAVEFORM_DATA) # 0xFF - 0x09 = 0xF6
length = DATA_LENGTHS[SUB_WAVEFORM_DATA] # 0xCA = 202
log.debug("read_waveform_data_raw: 09 probe")
self._send(build_bw_frame(SUB_WAVEFORM_DATA, 0))
self._recv_one(expected_sub=rsp_sub)
log.debug("read_waveform_data_raw: 09 data request offset=0x%02X", length)
self._send(build_bw_frame(SUB_WAVEFORM_DATA, length))
data_rsp = self._recv_one(expected_sub=rsp_sub)
raw = data_rsp.data[11:]
log.debug("read_waveform_data_raw: got %d bytes", len(raw))
return raw
def read_waveform_record(self, key4: bytes) -> bytes:
"""
Send the SUB 0C (WAVEFORM_RECORD / FULL_WAVEFORM_RECORD) two-step read.
+68
View File
@@ -0,0 +1,68 @@
"""
poc_set_project.py — POC test for set_project_info() against a live MiniMate Plus.
Usage:
python poc_set_project.py [--host IP] [--port PORT]
Default target: BE11529 at 63.43.212.232:9034
"""
import argparse
import logging
import sys
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(levelname)-7s %(name)s %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger("poc_set_project")
from minimateplus import MiniMateClient
from minimateplus.transport import TcpTransport
DEFAULT_HOST = "63.43.212.232"
DEFAULT_PORT = 9034
def main() -> None:
ap = argparse.ArgumentParser(description="POC: write project info to MiniMate Plus")
ap.add_argument("--host", default=DEFAULT_HOST, help="Modem IP address")
ap.add_argument("--port", type=int, default=DEFAULT_PORT, help="TCP port")
ap.add_argument("--project", default="POC Write Test")
ap.add_argument("--client-name", default="Terra-Mechanics Inc.")
ap.add_argument("--operator", default="B. Harrison")
ap.add_argument("--seis-loc", default="Lab Bench - POC")
ap.add_argument("--notes", default="set_project_info POC 2026-04-07")
args = ap.parse_args()
log.info("Connecting to %s:%d", args.host, args.port)
transport = TcpTransport(args.host, port=args.port)
with MiniMateClient(transport=transport, timeout=60.0) as client:
log.info("Performing POLL handshake + identity read …")
info = client.connect()
log.info("Connected: serial=%s firmware=%s", info.serial, info.firmware_version)
log.info("Calling set_project_info() …")
client.set_project_info(
project=args.project,
client_name=args.client_name,
operator=args.operator,
seis_loc=args.seis_loc,
notes=args.notes,
)
log.info("set_project_info() returned — write sequence complete")
log.info("Done. Reconnect Blastware to verify the fields were written.")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
sys.exit(0)
except Exception as exc:
log.exception("Fatal: %s", exc)
sys.exit(1)
+77 -1
View File
@@ -40,9 +40,10 @@ from typing import Optional
# FastAPI / Pydantic
try:
from fastapi import FastAPI, HTTPException, Query
from fastapi import Body, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import uvicorn
except ImportError:
print(
@@ -475,6 +476,81 @@ def device_event_waveform(
}
# ── Write endpoints ───────────────────────────────────────────────────────────
class ProjectInfoBody(BaseModel):
"""Request body for POST /device/config/project."""
project: Optional[str] = None
client_name: Optional[str] = None
operator: Optional[str] = None
seis_loc: Optional[str] = None
notes: Optional[str] = None
@app.post("/device/config/project")
def device_config_project(
body: ProjectInfoBody,
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
POC: Read the current device config, patch ASCII project/client/operator/
sensor-location/notes fields, and write the modified config back.
Only fields included in the JSON body (non-null) are modified. All other
bytes are round-tripped verbatim from the device.
Supply either *port* (serial) or *host* (TCP/modem).
Example body:
{
"project": "Bridge Inspection 2026",
"client_name": "City of Portland",
"operator": "Brian Harrison",
"seis_loc": "South Abutment",
"notes": "Pre-blast baseline"
}
Returns:
{"status": "ok", "message": "..."} on success.
Raises:
502 on protocol errors (timeout, bad ack, etc.).
422 if neither port nor host is provided.
"""
log.info(
"POST /device/config/project port=%s host=%s body=%s",
port, host, body.model_dump(exclude_none=True),
)
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
client.connect()
client.set_project_info(
project=body.project,
client_name=body.client_name,
operator=body.operator,
seis_loc=body.seis_loc,
notes=body.notes,
)
_run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
changed = body.model_dump(exclude_none=True)
msg = "Config written. Fields updated: " + (", ".join(changed.keys()) or "none")
return {"status": "ok", "message": msg, "updated_fields": changed}
# ── Entry point ────────────────────────────────────────────────────────────────
if __name__ == "__main__":