perf: monitor caches run state, ~halving live-feed latency

Each monitor poll was sending DOD? + Measure? (two commands), and the NL43
enforces >=1s between commands, so updates were ~2.5s apart. The run state
changes rarely, so cache it and refresh via Measure? only every
MONITOR_STATE_REFRESH_S (default 30s); most polls now send just DOD? (one
rate-limited command) -> ~1.3s/update. Also trim MONITOR_POLL_INTERVAL to
0.25s since the device rate-limit is the real pacer.

request_dod() gains an optional measurement_state arg: when supplied it
reuses that state and skips the Measure? round-trip; None preserves the old
query-every-time behavior.

~1Hz is the device floor for DOD (the >=1s command spacing); DRD's 10Hz
push isn't reachable via polling, but ~1s is a normal cadence for SLM levels.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-09 18:52:13 +00:00
parent 87c06f1519
commit 9d34779171
2 changed files with 35 additions and 12 deletions
+22 -4
View File
@@ -27,9 +27,14 @@ from app.alerts import alert_evaluator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Sleep between DOD polls. Note the 1s device rate-limit (and DOD?+Measure? per # Extra idle between DOD polls. The 1s device rate-limit already paces consecutive
# poll) already paces the effective rate to a few seconds; this is the extra idle. # DOD? commands, so this just needs to be small — the rate-limit is the real floor.
MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "1.0")) MONITOR_POLL_INTERVAL = float(os.getenv("MONITOR_POLL_INTERVAL", "0.25"))
# How often to refresh the run state (Measure?). It changes rarely, so we cache it
# and skip that second rate-limited command on most polls — roughly halving the
# per-update latency (~2.5s -> ~1.3s).
MONITOR_STATE_REFRESH_S = float(os.getenv("MONITOR_STATE_REFRESH_S", "30"))
# If nothing has been broadcast in this many seconds (e.g. device offline and # If nothing has been broadcast in this many seconds (e.g. device offline and
# silent), send a keepalive frame so reverse proxies don't drop the idle WS. # silent), send a keepalive frame so reverse proxies don't drop the idle WS.
@@ -70,6 +75,8 @@ class DeviceMonitor:
self._last_payload: Optional[dict] = None # replayed to new subscribers self._last_payload: Optional[dict] = None # replayed to new subscribers
self._consec_fail = 0 self._consec_fail = 0
self._reachable = True # last broadcast reachability (for transition frames) self._reachable = True # last broadcast reachability (for transition frames)
self._cached_state: Optional[str] = None # run state, refreshed periodically
self._last_state_refresh = 0.0
@property @property
def running(self) -> bool: def running(self) -> bool:
@@ -168,7 +175,18 @@ class DeviceMonitor:
ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password, ftp_username=cfg.ftp_username, ftp_password=cfg.ftp_password,
ftp_port=cfg.ftp_port or 21, ftp_port=cfg.ftp_port or 21,
) )
snap = await client.request_dod() # Refresh the run state only every MONITOR_STATE_REFRESH_S; reuse the
# cached state otherwise so most polls send just DOD? (one rate-limited
# command) instead of DOD? + Measure?.
now = asyncio.get_running_loop().time()
refresh_state = (self._cached_state is None
or now - self._last_state_refresh >= MONITOR_STATE_REFRESH_S)
snap = await client.request_dod(
measurement_state=None if refresh_state else self._cached_state
)
if refresh_state:
self._cached_state = snap.measurement_state
self._last_state_refresh = now
snap.unit_id = self.unit_id snap.unit_id = self.unit_id
persist_snapshot(snap, db) persist_snapshot(snap, db)
db.commit() db.commit()
+8 -3
View File
@@ -680,10 +680,12 @@ class NL43Client:
else: else:
raise ValueError(f"Unknown result code: {result_code}") raise ValueError(f"Unknown result code: {result_code}")
async def request_dod(self) -> NL43Snapshot: async def request_dod(self, measurement_state: Optional[str] = None) -> NL43Snapshot:
"""Request DOD (Data Output Display) snapshot from device. """Request DOD (Data Output Display) snapshot from device.
Returns parsed measurement data from the device display. Returns parsed measurement data from the device display. Pass
measurement_state to reuse a cached run state and skip the extra Measure?
round-trip (the state changes rarely); leave it None to query it.
""" """
# _send_command now handles result code validation and returns the data line # _send_command now handles result code validation and returns the data line
resp = await self._send_command("DOD?\r\n") resp = await self._send_command("DOD?\r\n")
@@ -706,7 +708,10 @@ class NL43Client:
logger.info(f"Parsed {len(parts)} data points from DOD response") logger.info(f"Parsed {len(parts)} data points from DOD response")
# Query actual measurement state (DOD doesn't include this information) # DOD doesn't include the run state. Query it only when not supplied by the
# caller — the monitor passes a cached state most cycles and refreshes it
# occasionally, avoiding a second rate-limited command on every poll.
if measurement_state is None:
try: try:
measurement_state = await self.get_measurement_state() measurement_state = await self.get_measurement_state()
except Exception as e: except Exception as e: