Add runbook for recovering wedged units and new scripts for device management

- Created a comprehensive runbook (`wedged_unit_recovery.md`) detailing the recovery process for units stuck in a call-home loop, including symptoms, recovery steps, and explanations of the failure mode.
- Added `blind_stop.sh` script to send stop-monitoring commands in a tight loop for unresponsive devices.
- Introduced `rescue_device.sh` script to disable Auto Call Home and erase events from a busy device.
- Implemented `slow_drip.sh` script to send stop-monitoring frames at a slow rate to prevent UART overrun.
- Developed `spam_stop.sh` script to rapidly send stop-monitoring commands to a device.
- Created `watch_unit.sh` script for passive monitoring of device reachability, logging results over time.
This commit is contained in:
2026-05-17 07:58:13 +00:00
parent ae7edac83f
commit 1fff8179d6
8 changed files with 1401 additions and 10 deletions
+69
View File
@@ -491,6 +491,75 @@ class SeismoDb:
)
return cur.rowcount > 0
def delete_event(self, event_id: str) -> Optional[dict]:
"""
Hard-delete one event row by id. Returns the deleted row (so the
caller can clean up any on-disk files referenced by it) or None
if no row matched.
"""
with self._connect() as conn:
row = conn.execute(
"SELECT * FROM events WHERE id = ?", (event_id,),
).fetchone()
if row is None:
return None
conn.execute("DELETE FROM events WHERE id = ?", (event_id,))
return dict(row)
def delete_events_bulk(
self,
serial: Optional[str] = None,
from_dt: Optional[datetime.datetime] = None,
to_dt: Optional[datetime.datetime] = None,
false_trigger: Optional[bool] = None,
ids: Optional[list[str]] = None,
) -> list[dict]:
"""
Hard-delete events matching the given filters. Returns the list
of deleted row dicts. Refuses to delete with no filters at all
(would wipe the whole table) — raises ValueError.
Filter semantics match query_events: serial / from_dt / to_dt /
false_trigger combine with AND. `ids` is an additional inclusion
list (event_id IN (...)); if supplied alongside other filters,
only rows matching all conditions are deleted.
"""
clauses: list[str] = []
params: list = []
if serial:
clauses.append("serial = ?")
params.append(serial)
if from_dt:
clauses.append("timestamp >= ?")
params.append(from_dt.isoformat())
if to_dt:
clauses.append("timestamp <= ?")
params.append(to_dt.isoformat())
if false_trigger is not None:
clauses.append("false_trigger = ?")
params.append(1 if false_trigger else 0)
if ids:
placeholders = ",".join("?" * len(ids))
clauses.append(f"id IN ({placeholders})")
params.extend(ids)
if not clauses:
raise ValueError(
"delete_events_bulk refuses to delete with no filters "
"(would wipe the entire events table)"
)
where = "WHERE " + " AND ".join(clauses)
with self._connect() as conn:
rows = conn.execute(
f"SELECT * FROM events {where}", params,
).fetchall()
if rows:
conn.execute(f"DELETE FROM events {where}", params)
return [dict(r) for r in rows]
def update_event_review(self, event_id: str, review: dict) -> bool:
"""
Sync derived index columns from a sidecar's `review` block.
+728 -10
View File
@@ -36,6 +36,7 @@ from __future__ import annotations
import datetime
import logging
import socket
import sys
import tempfile
import threading
@@ -63,7 +64,9 @@ from minimateplus.protocol import ProtocolError
from minimateplus.models import CallHomeConfig, ComplianceConfig, DeviceInfo, Event, PeakValues, ProjectInfo, Timestamp
from minimateplus.transport import TcpTransport, DEFAULT_TCP_PORT
from minimateplus.blastware_file import write_blastware_file, blastware_filename
from minimateplus.client import _decode_a5_metadata_into, _decode_a5_waveform
from minimateplus.client import _decode_a5_metadata_into, _decode_a5_waveform, _decode_event_count
from minimateplus.framing import build_bw_write_frame, SESSION_RESET, POLL_PROBE, POLL_DATA
from minimateplus.protocol import SUB_STOP_MONITORING
from sfm import event_hdf5
from sfm.cache import SFMCache, get_cache
from sfm.database import SeismoDb
@@ -268,7 +271,8 @@ def _build_client(
baud: int,
host: Optional[str],
tcp_port: int,
timeout: float = 30.0,
timeout: float = 10.0,
connect_timeout: Optional[float] = None,
) -> MiniMateClient:
"""
Return a MiniMateClient configured for either serial or TCP transport.
@@ -276,12 +280,24 @@ def _build_client(
TCP takes priority if *host* is supplied; otherwise *port* (serial) is used.
Raises HTTPException(422) if neither is provided.
Default *timeout* is 10s — the device usually responds in well under a
second over cellular; 10s leaves comfortable headroom for retransmits
while still failing reasonably fast when a unit is wedged.
Use timeout=120.0 (or higher) for endpoints that perform a full 5A waveform
download — a 70-second event at 1024 sps takes 2-3 minutes to transfer over
cellular and each individual recv must complete within the timeout window.
*connect_timeout* (TCP only) overrides the TcpTransport default (10s) for
the initial TCP SYN/handshake. Use a small value (e.g. 5s) in rescue/race
scenarios where the device is busy in another session and you want to
fail fast and retry quickly.
"""
if host:
transport = TcpTransport(host, port=tcp_port)
if connect_timeout is not None:
transport = TcpTransport(host, port=tcp_port, connect_timeout=connect_timeout)
else:
transport = TcpTransport(host, port=tcp_port)
log.debug("TCP transport: %s:%d timeout=%.0fs", host, tcp_port, timeout)
return MiniMateClient(transport=transport, timeout=timeout)
elif port:
@@ -1095,13 +1111,23 @@ def device_monitor_status(
cached["_cached"] = True
return cached
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try:
client.poll()
except Exception as exc:
log.warning("monitor status poll retry: %s", exc)
client.poll()
status = client.get_monitor_status()
try:
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try:
client.poll()
except Exception as exc:
log.warning("monitor status poll retry: %s", exc)
client.poll()
status = client.get_monitor_status()
except HTTPException:
raise
except ProtocolError as exc:
# Includes minimateplus.protocol.TimeoutError ("device unresponsive").
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
result: dict = {"is_monitoring": status.is_monitoring}
if status.battery_v is not None:
@@ -1117,6 +1143,529 @@ def device_monitor_status(
return result
@app.get("/device/events/storage_range")
def device_events_storage_range(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Read the device's event storage range (SUB 0x06) — first and last
stored event keys. POLL handshake + one read; no connect(), no
config reads, no event walk. Completes in ~2 seconds.
Useful for checking whether the device has any stored events
without invoking the slow count_events() 1E/1F chain. Both keys =
`01110000` means the device is empty.
"""
log.info("GET /device/events/storage_range host=%s tcp_port=%s", host, tcp_port)
try:
def _do():
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try:
client.poll()
except Exception as exc:
log.warning("storage_range poll retry: %s", exc)
client.poll()
proto = client._require_proto()
return proto.read_event_storage_range()
rng = _run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
data = bytes(rng.data)
result: dict = {"raw_len": len(data), "raw_hex": data.hex()}
if len(data) >= 8:
first_key = data[-8:-4].hex()
last_key = data[-4:].hex()
result["first_key"] = first_key
result["last_key"] = last_key
result["is_empty"] = (first_key == "01110000" and last_key == "01110000")
return result
@app.get("/device/events/index")
def device_events_index(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Read the device's event index (SUB 0x08) — returns the lifetime
event counter at data[10:12] (uint16 BE). POLL handshake + one
read; no connect(), no config reads, no event walk. ~2 seconds.
Note: this is a LIFETIME counter (events ever recorded) — it does
NOT decrement when events are erased. After an erase, the device
counter resets to 0 only on the next recorded event. For "are
there stored events right now?" use /device/events/storage_range
instead.
"""
log.info("GET /device/events/index host=%s tcp_port=%s", host, tcp_port)
try:
def _do():
with _build_client(port=port, baud=baud, host=host, tcp_port=tcp_port) as client:
try:
client.poll()
except Exception as exc:
log.warning("event_index poll retry: %s", exc)
client.poll()
proto = client._require_proto()
return proto.read_event_index()
idx_raw = _run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
raw = bytes(idx_raw)
result: dict = {"raw_len": len(raw), "raw_hex": raw.hex()}
try:
result["lifetime_count"] = _decode_event_count(raw)
except Exception as exc:
result["decode_error"] = str(exc)
return result
@app.post("/device/events/erase")
def device_events_erase(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
) -> dict:
"""
Erase ALL stored events from the device memory.
Sequence: SUB 0xA3 → 0x1C → 0x06 → 0xA2 (confirmed 2026-04-11).
After this call the unit's event memory is empty and event keys reset
to 0x01110000. The device returns to its normal operating state
automatically — no restart-monitoring call is needed.
Note: this endpoint does NOT touch the ACH server's `ach_state.json`.
If a call-home subsequently lands on the ACH server, its post-erase
detection logic (max(device_keys) vs max_downloaded_key) handles the
key-counter rollback.
"""
log.info("POST /device/events/erase port=%s host=%s tcp_port=%s", port, host, tcp_port)
try:
def _do():
with _build_client(port, baud, host, tcp_port) as client:
client.connect()
client.delete_all_events()
_run_with_retry(_do, is_tcp=_is_tcp(host))
except HTTPException:
raise
except ProtocolError as exc:
raise HTTPException(status_code=502, detail=f"Protocol error: {exc}") from exc
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Connection error: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Device error: {exc}") from exc
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
cleared = get_cache().clear_device(conn_key)
return {
"status": "ok",
"message": "Device event memory cleared",
"cache_cleared": cleared,
}
@app.post("/device/stop_monitoring_blind")
def device_stop_monitoring_blind(
host: str = Query(..., description="TCP host — modem IP"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
connect_timeout: float = Query(5.0, description="TCP connect timeout in seconds (default 5)"),
repeat: int = Query(3, description="How many times to send the frame within one TCP session (default 3)"),
) -> dict:
"""
Fire-and-forget Stop Monitoring (SUB 0x97). TCP-only.
Opens a TCP session, dumps the FULL handshake the device's protocol
state machine expects — `SESSION_RESET + POLL_PROBE + SESSION_RESET +
POLL_DATA` — and then N back-to-back copies of the stop-monitoring
frame. Does NOT read any S3 response. Succeeds as long as the bytes
left the socket.
The POLL handshake bytes are required: monitoring units ignore command
frames received without a preceding POLL exchange. Sending the POLL
bytes "blind" (without reading the responses) still works because the
device processes inbound bytes in order regardless of whether we drain
its outbound buffer.
Idempotent: the device processes extra copies of SUB 0x97 the same as
one (already-stopped is a no-op).
Returns the number of bytes sent. A 503 means the TCP connect failed
(device busy in another session — caller should retry).
"""
log.info(
"POST /device/stop_monitoring_blind host=%s tcp_port=%s connect_timeout=%.1fs repeat=%d",
host, tcp_port, connect_timeout, repeat,
)
if repeat < 1:
repeat = 1
frame = build_bw_write_frame(SUB_STOP_MONITORING, b"")
payload = (
SESSION_RESET + POLL_PROBE
+ SESSION_RESET + POLL_DATA
+ (frame * repeat)
)
t0 = time.monotonic()
transport = TcpTransport(host, port=tcp_port, connect_timeout=connect_timeout)
try:
transport.connect()
except OSError as exc:
raise HTTPException(status_code=503, detail=f"Connection error: {exc}") from exc
try:
transport.write(payload)
except OSError as exc:
transport.disconnect()
raise HTTPException(status_code=502, detail=f"Send error: {exc}") from exc
finally:
transport.disconnect()
return {
"status": "sent",
"bytes_sent": len(payload),
"frame_size": len(frame),
"repeat": repeat,
"elapsed_s": round(time.monotonic() - t0, 3),
}
@app.post("/device/stop_monitoring_slow_drip")
def device_stop_monitoring_slow_drip(
host: str = Query(..., description="TCP host — modem IP"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
duration_s: float = Query(120.0, description="Total time to hold the session open (seconds)"),
interval_s: float = Query(3.0, description="Seconds between drip sends"),
connect_timeout: float = Query(5.0, description="TCP connect timeout"),
) -> dict:
"""
Hold a single TCP session open for *duration_s* seconds and drip
stop-monitoring frames into the device at a slow rate so its UART
RX FIFO has time to drain between sends.
Sequence:
1. Open TCP session.
2. Send the wake preamble: SESSION_RESET + POLL_PROBE +
SESSION_RESET + POLL_DATA (so the device's protocol parser
is primed for a write command).
3. Wait interval_s for the device to drain.
4. Drip-send (SESSION_RESET + stop_monitoring_frame) every
interval_s until duration_s elapses.
5. Opportunistically drain any bytes the device sends back (so
the modem's TX queue doesn't fill up). Successful drains are
counted in `bytes_received` — non-zero strongly suggests the
device has started responding to us.
6. Close.
Designed for units whose firmware is too busy with event-recording
to keep up with high-rate spam. Heavy spam overruns the UART FIFO;
slow drip stays under it.
Compared to spam mode: ~40× fewer bytes/sec on the wire, but each
byte has a much higher chance of actually being parsed.
"""
log.info(
"POST /device/stop_monitoring_slow_drip host=%s tcp_port=%s duration=%.1fs interval=%.2fs connect_timeout=%.1fs",
host, tcp_port, duration_s, interval_s, connect_timeout,
)
duration_s = max(1.0, min(duration_s, 600.0)) # clamp 1s..10min
interval_s = max(0.1, min(interval_s, 30.0))
connect_timeout = max(0.1, connect_timeout)
stop_frame = build_bw_write_frame(SUB_STOP_MONITORING, b"")
preamble = (
SESSION_RESET + POLL_PROBE
+ SESSION_RESET + POLL_DATA
)
t0 = time.monotonic()
drips_sent = 0
bytes_sent = 0
bytes_received = 0
try:
sock = socket.create_connection((host, tcp_port), timeout=connect_timeout)
except OSError as exc:
raise HTTPException(status_code=503, detail=f"Connection error: {exc}") from exc
# Short read timeout so opportunistic drains don't block.
sock.settimeout(0.1)
try:
# Initial wake preamble.
try:
sock.sendall(preamble)
bytes_sent += len(preamble)
except OSError as exc:
raise HTTPException(status_code=502, detail=f"Preamble send failed: {exc}") from exc
# Initial settle.
time.sleep(interval_s)
# Try a non-blocking drain of any response to the wake.
try:
data = sock.recv(4096)
if data:
bytes_received += len(data)
log.info("slow_drip: device responded to wake preamble (%d bytes)", len(data))
except socket.timeout:
pass
except OSError:
pass
deadline = t0 + duration_s
drip = SESSION_RESET + stop_frame # 2 + 21 = 23 bytes per drip
send_error: Optional[str] = None
while time.monotonic() < deadline:
try:
sock.sendall(drip)
bytes_sent += len(drip)
drips_sent += 1
except OSError as exc:
send_error = f"{exc}"
log.warning("slow_drip: send failed after %d drips: %s", drips_sent, exc)
break
# Drain any inbound bytes; ignore timeouts.
try:
data = sock.recv(4096)
if data:
bytes_received += len(data)
except socket.timeout:
pass
except OSError:
pass
# Sleep the interval, but don't oversleep past the deadline.
remaining = deadline - time.monotonic()
if remaining <= 0:
break
time.sleep(min(interval_s, remaining))
finally:
try:
sock.shutdown(socket.SHUT_RDWR)
except OSError:
pass
try:
sock.close()
except OSError:
pass
elapsed = time.monotonic() - t0
log.info(
"slow_drip done — drips=%d bytes_sent=%d bytes_received=%d in %.1fs",
drips_sent, bytes_sent, bytes_received, elapsed,
)
return {
"status": "done",
"duration_s": round(elapsed, 2),
"drips_sent": drips_sent,
"bytes_sent": bytes_sent,
"bytes_received": bytes_received,
"preamble_bytes": len(preamble),
"drip_bytes": len(drip),
"send_error": send_error,
}
@app.post("/device/stop_monitoring_spam")
def device_stop_monitoring_spam(
host: str = Query(..., description="TCP host — modem IP"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
duration_s: float = Query(10.0, description="How long to hammer the device for (seconds)"),
connect_timeout: float = Query(0.5, description="Per-attempt TCP connect timeout (default 0.5s)"),
repeat: int = Query(3, description="Stop frames per TCP session (default 3)"),
) -> dict:
"""
Hammer the device with blind stop-monitoring sessions as fast as
possible for `duration_s` seconds. Each attempt: open TCP → write
SESSION_RESET + POLL handshake + STOP frames × repeat → close. No
response is read.
Designed for units that are aggressively calling home — short
connect_timeout (default 500 ms) means every failed attempt loses
only that much time before retrying, so we can fit several attempts
per second even when the modem is mostly busy with its own outbound
sessions.
Single HTTP call kicks off the whole burst; counters are returned
when it finishes. No streaming; if you want live progress, watch
SFM logs.
"""
log.info(
"POST /device/stop_monitoring_spam host=%s tcp_port=%s duration=%.1fs connect_timeout=%.3fs repeat=%d",
host, tcp_port, duration_s, connect_timeout, repeat,
)
if repeat < 1:
repeat = 1
duration_s = max(0.1, min(duration_s, 300.0)) # clamp 0.1s..5min
connect_timeout = max(0.05, connect_timeout)
frame = build_bw_write_frame(SUB_STOP_MONITORING, b"")
payload = (
SESSION_RESET + POLL_PROBE
+ SESSION_RESET + POLL_DATA
+ (frame * repeat)
)
t0 = time.monotonic()
deadline = t0 + duration_s
sent_ok = 0
connect_failed = 0
write_failed = 0
while time.monotonic() < deadline:
try:
sock = socket.create_connection((host, tcp_port), timeout=connect_timeout)
except OSError:
connect_failed += 1
continue
try:
sock.sendall(payload)
sent_ok += 1
except OSError:
write_failed += 1
finally:
try:
sock.shutdown(socket.SHUT_RDWR)
except OSError:
pass
try:
sock.close()
except OSError:
pass
elapsed = time.monotonic() - t0
total = sent_ok + connect_failed + write_failed
log.info(
"stop_monitoring_spam done — sent=%d connect_failed=%d write_failed=%d in %.2fs",
sent_ok, connect_failed, write_failed, elapsed,
)
return {
"status": "done",
"duration_s": round(elapsed, 2),
"sent_ok": sent_ok,
"connect_failed": connect_failed,
"write_failed": write_failed,
"total_attempts": total,
"rate_attempts_per_s": round(total / elapsed, 1) if elapsed > 0 else 0,
"payload_bytes": len(payload),
}
@app.post("/device/rescue")
def device_rescue(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
baud: int = Query(38400, description="Serial baud rate"),
host: Optional[str] = Query(None, description="TCP host — modem IP or ACH relay"),
tcp_port: int = Query(DEFAULT_TCP_PORT, description=f"TCP port (default {DEFAULT_TCP_PORT})"),
connect_timeout: float = Query(5.0, description="TCP connect timeout in seconds (default 5)"),
recv_timeout: float = Query(5.0, description="Per-frame S3 recv timeout in seconds (default 5)"),
disable_ach: bool = Query(True, description="Disable Auto Call Home on the device before erasing"),
erase: bool = Query(True, description="Erase all stored events after disabling ACH"),
) -> dict:
"""
Rescue an uncooperative unit by squeezing all maintenance work into a
single TCP session.
Designed for devices that are actively calling home to a separate ACH
server (BW or otherwise). While we hold this TCP session open the
modem cannot accept an inbound ACH call, so the order matters:
1. Short-timeout TCP connect (fails fast if the device is busy in
another session — the caller should retry in a tight loop).
2. POLL handshake.
3. (optional) Write call_home config with auto_call_home_enabled=false
so the device stops calling out even after we drop the session.
4. (optional) Erase all stored events (0xA3 → 0x1C → 0x06 → 0xA2).
5. Close the TCP session.
Both `disable_ach` and `erase` default to true. Pass `?erase=false` if
you only want to silence the unit without wiping its events.
Caller pattern (bash):
until curl -sS --max-time 30 -X POST \\
"http://localhost:8001/api/sfm/device/rescue?host=$IP&tcp_port=$P"; do
sleep 1
done
"""
log.info(
"POST /device/rescue host=%s tcp_port=%s connect_timeout=%.1fs recv_timeout=%.1fs disable_ach=%s erase=%s",
host, tcp_port, connect_timeout, recv_timeout, disable_ach, erase,
)
steps: list[dict] = []
t0 = time.monotonic()
try:
with _build_client(
port, baud, host, tcp_port,
timeout=recv_timeout,
connect_timeout=connect_timeout,
) as client:
steps.append({"step": "tcp_connect", "ok": True, "elapsed_s": round(time.monotonic() - t0, 2)})
try:
client.poll()
except Exception as exc:
log.warning("rescue: poll retry: %s", exc)
client.poll()
steps.append({"step": "poll", "ok": True, "elapsed_s": round(time.monotonic() - t0, 2)})
if disable_ach:
client.set_call_home_config(auto_call_home_enabled=False)
steps.append({"step": "disable_ach", "ok": True, "elapsed_s": round(time.monotonic() - t0, 2)})
if erase:
client.delete_all_events()
steps.append({"step": "erase", "ok": True, "elapsed_s": round(time.monotonic() - t0, 2)})
except ProtocolError as exc:
steps.append({"step": "error", "ok": False, "detail": f"protocol: {exc}"})
raise HTTPException(status_code=502, detail={"message": f"Protocol error: {exc}", "steps": steps}) from exc
except OSError as exc:
steps.append({"step": "error", "ok": False, "detail": f"socket: {exc}"})
# Connection refused / timed out → device busy in another session. Caller should retry.
raise HTTPException(status_code=503, detail={"message": f"Connection error: {exc}", "steps": steps}) from exc
except Exception as exc:
steps.append({"step": "error", "ok": False, "detail": str(exc)})
raise HTTPException(status_code=500, detail={"message": f"Device error: {exc}", "steps": steps}) from exc
conn_key = SFMCache.make_conn_key(host, tcp_port, port, baud)
cleared = get_cache().clear_device(conn_key)
return {
"status": "ok",
"elapsed_s": round(time.monotonic() - t0, 2),
"disable_ach": disable_ach,
"erase": erase,
"steps": steps,
"cache_cleared": cleared,
}
@app.post("/device/monitor/start")
def device_monitor_start(
port: Optional[str] = Query(None, description="Serial port (e.g. COM5)"),
@@ -1403,6 +1952,175 @@ def db_set_false_trigger(
return {"status": "ok", "event_id": event_id, "false_trigger": value}
def _cleanup_event_files(row: dict) -> dict:
"""
Best-effort cleanup of on-disk waveform / sidecar / pickle / hdf5 files
associated with a deleted event row. Returns a dict of {kind: bool} for
what was actually removed (true) vs not found / failed (false).
"""
serial = row.get("serial")
bw_name = row.get("blastware_filename")
a5_name = row.get("a5_pickle_filename")
sc_name = row.get("sidecar_filename")
removed: dict = {}
if not serial:
return removed
store = _get_store()
# blastware_filename is the "base" — other files derive their paths from it
# via WaveformStore helpers. Sidecar and a5 may also be stored under their
# own column values if they ever diverged historically.
base_name = bw_name or a5_name or sc_name
if base_name:
bw_path, a5_path = store.paths_for(serial, base_name)
sc_path = store.sidecar_path_for(serial, base_name)
h5_path = store.hdf5_path_for(serial, base_name)
for kind, p in [("blastware", bw_path), ("a5_pickle", a5_path),
("sidecar", sc_path), ("hdf5", h5_path)]:
try:
if p.exists():
p.unlink()
removed[kind] = True
except OSError as exc:
log.warning("file cleanup failed for %s (%s): %s", p, kind, exc)
removed[kind] = False
return removed
@app.delete("/db/events/{event_id}")
def db_delete_event(event_id: str) -> dict:
"""
Hard-delete a single event from the SFM events table and remove any
associated on-disk waveform/sidecar/pickle/hdf5 files.
Returns 404 if the event_id is not found.
"""
log.info("DELETE /db/events/%s", event_id)
deleted = _get_db().delete_event(event_id)
if deleted is None:
raise HTTPException(status_code=404, detail=f"Event {event_id} not found")
files_removed = _cleanup_event_files(deleted)
return {
"status": "ok",
"event_id": event_id,
"files_removed": files_removed,
}
class BulkDeleteBody(BaseModel):
"""Body for POST /db/events/delete_bulk."""
serial: Optional[str] = None
from_dt: Optional[str] = None # ISO-8601
to_dt: Optional[str] = None # ISO-8601
false_trigger: Optional[bool] = None
ids: Optional[list[str]] = None
confirm: bool = False
# Safety: when no `ids` are supplied, require this many max rows to
# actually be deleted; if the matched count exceeds it, the endpoint
# returns a dry-run-style summary instead. Pass None to disable.
max_rows: Optional[int] = 10000
@app.post("/db/events/delete_bulk")
def db_delete_events_bulk(body: BulkDeleteBody) -> dict:
"""
Hard-delete multiple events at once, by filter and/or by id list.
Filters (`serial`, `from_dt`, `to_dt`, `false_trigger`) combine with AND,
matching the same semantics as `GET /db/events`. `ids` is an additional
inclusion list. At least one filter or non-empty `ids` MUST be supplied
— refusing to wipe the whole table.
Safety knobs:
- `confirm` MUST be `true` to actually delete. When false (default),
returns the match count without deleting (dry-run).
- `max_rows` (default 10,000) caps how many rows can be deleted in one
call by-filter; if the match count exceeds it, the endpoint returns
a count summary without deleting. Ignored when only `ids` is used.
Returns:
{
"status": "ok" | "dry_run" | "too_many",
"matched": <int>,
"deleted": <int>, # 0 unless status == "ok"
"files_removed": <int>, # total file unlink successes
"sample_serials": [...], # up to 5 distinct serials touched
}
"""
log.info(
"POST /db/events/delete_bulk serial=%s from=%s to=%s ft=%s ids=%d confirm=%s max=%s",
body.serial, body.from_dt, body.to_dt, body.false_trigger,
len(body.ids or []), body.confirm, body.max_rows,
)
from_parsed = datetime.datetime.fromisoformat(body.from_dt) if body.from_dt else None
to_parsed = datetime.datetime.fromisoformat(body.to_dt) if body.to_dt else None
db = _get_db()
# Dry-run path: count matches without deleting.
rows = db.query_events(
serial=body.serial,
from_dt=from_parsed,
to_dt=to_parsed,
false_trigger=body.false_trigger,
limit=1_000_000, # we want a true count, not a page
offset=0,
)
if body.ids:
id_set = set(body.ids)
rows = [r for r in rows if r["id"] in id_set]
matched = len(rows)
sample_serials = sorted({r.get("serial") for r in rows[:50] if r.get("serial")})[:5]
if not body.confirm:
return {
"status": "dry_run",
"matched": matched,
"deleted": 0,
"files_removed": 0,
"sample_serials": sample_serials,
"hint": "Set confirm=true in the request body to actually delete.",
}
if body.max_rows is not None and not body.ids and matched > body.max_rows:
return {
"status": "too_many",
"matched": matched,
"deleted": 0,
"files_removed": 0,
"sample_serials": sample_serials,
"hint": (
f"Matched {matched} > max_rows={body.max_rows}. Either raise "
f"max_rows in the body, narrow the filter, or supply an "
f"explicit `ids` list."
),
}
try:
deleted_rows = db.delete_events_bulk(
serial=body.serial,
from_dt=from_parsed,
to_dt=to_parsed,
false_trigger=body.false_trigger,
ids=body.ids,
)
except ValueError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
files_removed = 0
for row in deleted_rows:
result = _cleanup_event_files(row)
files_removed += sum(1 for ok in result.values() if ok)
return {
"status": "ok",
"matched": matched,
"deleted": len(deleted_rows),
"files_removed": files_removed,
"sample_serials": sample_serials,
}
# ── /db/events/{id} — waveform file accessors ─────────────────────────────────
#
# These endpoints serve files from the persistent WaveformStore, so a Blastware