feat: Add monitoring functionality to MiniMate protocol and web interface

- Introduced new SUBs for monitoring status, start, and stop commands in protocol.py.
- Implemented read_monitor_status, start_monitoring, and stop_monitoring methods in MiniMateProtocol class.
- Added new API endpoints for monitoring status retrieval and control in server.py.
- Enhanced the web application with a monitoring panel, including battery and memory status display.
- Created a new Python script to parse SUB 0x1C response frames for monitoring status.
- Documented the monitoring status response format and field locations in markdown and text files.
This commit is contained in:
2026-04-08 14:34:42 -04:00
parent 8545daac04
commit a41e7a9e1a
9 changed files with 1121 additions and 10 deletions
+82
View File
@@ -614,6 +614,88 @@ def device_config_project(
return device_config(body=body, port=port, baud=baud, host=host, tcp_port=tcp_port)
# ── Monitoring endpoints ───────────────────────────────────────────────────────
@app.get("/device/monitor/status")
def device_monitor_status(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Read monitoring status from the device.
Returns is_monitoring bool, battery voltage, and memory usage (total + free bytes).
Battery and memory are only present when the unit is idle (not monitoring).
"""
transport = _make_transport(port=port, baud=baud, host=host, tcp_port=tcp_port)
with MiniMateClient(transport=transport) as client:
try:
client.connect()
except Exception as exc:
log.warning("monitor status connect retry: %s", exc)
client.connect()
status = client.get_monitor_status()
result: dict = {"is_monitoring": status.is_monitoring}
if status.battery_v is not None:
result["battery_v"] = round(status.battery_v, 2)
if status.memory_total is not None:
result["memory_total_bytes"] = status.memory_total
result["memory_total_kb"] = round(status.memory_total / 1024, 1)
if status.memory_free is not None:
result["memory_free_bytes"] = status.memory_free
result["memory_free_kb"] = round(status.memory_free / 1024, 1)
return result
@app.post("/device/monitor/start")
def device_monitor_start(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Command the device to start monitoring (recording triggered events).
Sends SUB 0x96 and waits for ack SUB 0x69.
"""
transport = _make_transport(port=port, baud=baud, host=host, tcp_port=tcp_port)
with MiniMateClient(transport=transport) as client:
try:
client.connect()
except Exception as exc:
log.warning("start monitoring connect retry: %s", exc)
client.connect()
client.start_monitoring()
return {"status": "started"}
@app.post("/device/monitor/stop")
def device_monitor_stop(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Command the device to stop monitoring.
Sends SUB 0x97 and waits for ack SUB 0x68.
"""
transport = _make_transport(port=port, baud=baud, host=host, tcp_port=tcp_port)
with MiniMateClient(transport=transport) as client:
try:
client.connect()
except Exception as exc:
log.warning("stop monitoring connect retry: %s", exc)
client.connect()
client.stop_monitoring()
return {"status": "stopped"}
# ── Entry point ────────────────────────────────────────────────────────────────
if __name__ == "__main__":