""" models.py β€” Plain-Python data models for the MiniMate Plus protocol library. All models are intentionally simple dataclasses with no protocol logic. They represent *decoded* device data β€” the client layer translates raw frame bytes into these objects, and the SFM API layer serialises them to JSON. Notes on certainty: Fields marked βœ… are confirmed from captured data. Fields marked πŸ”Ά are strongly inferred but not formally proven. Fields marked ❓ are present in the captured payload but not yet decoded. See docs/instantel_protocol_reference.md for full derivation details. """ from __future__ import annotations import struct from dataclasses import dataclass, field from typing import Optional # ── Timestamp ───────────────────────────────────────────────────────────────── @dataclass class Timestamp: """ Event timestamp decoded from the MiniMate Plus wire format. Two source formats exist: 1. 6-byte format (from event index / 1E header β€” not yet decoded in client): [flag:1] [year:2 BE] [unknown:1] [month:1] [day:1] Use Timestamp.from_bytes(). 2. 9-byte format (from Full Waveform Record / 0C, bytes 0–8) βœ… CONFIRMED: [day:1] [sub_code:1] [month:1] [year:2 BE] [unknown:1] [hour:1] [min:1] [sec:1] Use Timestamp.from_waveform_record(). Confirmed 2026-04-01 against Blastware event report (BE11529 thump event): raw bytes: 01 10 04 07 ea 00 00 1c 0c β†’ day=1, sub_code=0x10 (Waveform mode), month=4, year=2026, hour=0, minute=28, second=12 ← matches Blastware "00:28:12 April 1, 2026" The sub_code at byte[1] is the record-mode indicator: 0x10 β†’ Waveform (continuous / single-shot) βœ… other β†’ Histogram (code not yet captured ❓) The year 1995 is the device's factory-default RTC date β€” it appears whenever the battery has been disconnected. Treat 1995 as "clock not set". """ raw: bytes # raw bytes for round-tripping flag: int # byte 0 of 6-byte format, or sub_code from 9-byte format year: int # βœ… unknown_byte: int # separator byte (purpose unclear ❓) month: int # βœ… day: int # βœ… # Time fields β€” populated only from the 9-byte waveform-record format hour: Optional[int] = None # βœ… (waveform record format) minute: Optional[int] = None # βœ… (waveform record format) second: Optional[int] = None # βœ… (waveform record format) @classmethod def from_bytes(cls, data: bytes) -> "Timestamp": """ Decode a 6-byte timestamp (6-byte event-index format). Args: data: exactly 6 bytes from the device payload. Returns: Decoded Timestamp (no time fields). Raises: ValueError: if data is not exactly 6 bytes. """ if len(data) != 6: raise ValueError(f"Timestamp requires exactly 6 bytes, got {len(data)}") flag = data[0] year = struct.unpack_from(">H", data, 1)[0] unknown_byte = data[3] month = data[4] day = data[5] return cls( raw=bytes(data), flag=flag, year=year, unknown_byte=unknown_byte, month=month, day=day, ) @classmethod def from_waveform_record(cls, data: bytes) -> "Timestamp": """ Decode a 9-byte timestamp from the first bytes of a 210-byte waveform record (SUB 0C / Full Waveform Record response). Wire layout (βœ… CONFIRMED 2026-04-01 against Blastware event report): byte[0]: day (uint8) byte[1]: sub_code / mode flag (0x10 = Waveform mode) πŸ”Ά byte[2]: month (uint8) bytes[3–4]: year (big-endian uint16) byte[5]: unknown (0x00 in all observed samples ❓) byte[6]: hour (uint8) byte[7]: minute (uint8) byte[8]: second (uint8) Args: data: at least 9 bytes; only the first 9 are consumed. Returns: Decoded Timestamp with hour/minute/second populated. Raises: ValueError: if data is fewer than 9 bytes. """ if len(data) < 9: raise ValueError( f"Waveform record timestamp requires at least 9 bytes, got {len(data)}" ) day = data[0] sub_code = data[1] # 0x10 = Waveform; histogram code not yet confirmed month = data[2] year = struct.unpack_from(">H", data, 3)[0] unknown_byte = data[5] hour = data[6] minute = data[7] second = data[8] return cls( raw=bytes(data[:9]), flag=sub_code, year=year, unknown_byte=unknown_byte, month=month, day=day, hour=hour, minute=minute, second=second, ) @property def clock_set(self) -> bool: """False when year == 1995 (factory default / battery-lost state).""" return self.year != 1995 def __str__(self) -> str: if not self.clock_set: return f"CLOCK_NOT_SET ({self.year}-{self.month:02d}-{self.day:02d})" date_str = f"{self.year}-{self.month:02d}-{self.day:02d}" if self.hour is not None: return f"{date_str} {self.hour:02d}:{self.minute:02d}:{self.second:02d}" return date_str # ── Device identity ─────────────────────────────────────────────────────────── @dataclass class DeviceInfo: """ Combined device identity information gathered during the startup sequence. Populated from three response SUBs: - SUB EA (SERIAL_NUMBER_RESPONSE): serial, firmware_minor - SUB FE (FULL_CONFIG_RESPONSE): serial (repeat), firmware_version, dsp_version, manufacturer, model - SUB A4 (POLL_RESPONSE): manufacturer (repeat), model (repeat) All string fields are stripped of null padding before storage. """ # ── From SUB EA (SERIAL_NUMBER_RESPONSE) ───────────────────────────────── serial: str # e.g. "BE18189" βœ… firmware_minor: int # 0x11 = 17 for S337.17 βœ… serial_trail_0: Optional[int] = None # unit-specific byte β€” purpose unknown ❓ # ── From SUB FE (FULL_CONFIG_RESPONSE) ──────────────────────────────────── firmware_version: Optional[str] = None # e.g. "S337.17" βœ… dsp_version: Optional[str] = None # e.g. "10.72" βœ… manufacturer: Optional[str] = None # e.g. "Instantel" βœ… model: Optional[str] = None # e.g. "MiniMate Plus" βœ… # ── From SUB 1A (COMPLIANCE_CONFIG_RESPONSE) ────────────────────────────── compliance_config: Optional["ComplianceConfig"] = None # E5 response, read in connect() # ── From SUB 08 (EVENT_INDEX_RESPONSE) ──────────────────────────────────── event_count: Optional[int] = None # stored event count from F7 response πŸ”Ά def __str__(self) -> str: fw = self.firmware_version or f"?.{self.firmware_minor}" mdl = self.model or "MiniMate Plus" return f"{mdl} S/N:{self.serial} FW:{fw}" # ── Channel threshold / scaling ─────────────────────────────────────────────── @dataclass class ChannelConfig: """ Per-channel threshold and scaling values from SUB E5 / SUB 71. Floats are stored in the device in imperial units (in/s for geo channels, psi for MicL). Unit strings embedded in the payload confirm this. Certainty: βœ… CONFIRMED for trigger_level, alarm_level, unit strings. """ label: str # e.g. "Tran", "Vert", "Long", "MicL" βœ… trigger_level: float # in/s (geo) or psi (MicL) βœ… alarm_level: float # in/s (geo) or psi (MicL) βœ… max_range: float # full-scale calibration constant (e.g. 6.206) πŸ”Ά unit_label: str # e.g. "in./s" or "psi" βœ… # ── Peak values for one event ───────────────────────────────────────────────── @dataclass class PeakValues: """ Per-channel peak particle velocity / pressure for a single event, plus the scalar Peak Vector Sum. Extracted from the Full Waveform Record (SUB F3 / 0C response), stored as IEEE 754 big-endian floats in the device's native units (in/s / psi). Per-channel PPV location (βœ… CONFIRMED 2026-04-01): Found by searching for the 4-byte channel label string ("Tran", "Vert", "Long", "MicL") and reading the float at label_offset + 6. Peak Vector Sum (βœ… CONFIRMED 2026-04-01): Fixed offset 87 in the 210-byte record. = √(TranΒ² + VertΒ² + LongΒ²) at the sample instant of maximum combined geo motion. NOT the vector sum of the three per-channel peak values (those may occur at different times). Matches Blastware's "Peak Vector Sum" display exactly. """ tran: Optional[float] = None # Transverse PPV (in/s) βœ… vert: Optional[float] = None # Vertical PPV (in/s) βœ… long: Optional[float] = None # Longitudinal PPV (in/s) βœ… micl: Optional[float] = None # Air overpressure (psi) πŸ”Ά (units uncertain) peak_vector_sum: Optional[float] = None # Scalar geo PVS (in/s) βœ… # ── Project / operator metadata ─────────────────────────────────────────────── @dataclass class ProjectInfo: """ Operator-supplied project and location strings from the Full Waveform Record (SUB F3) and compliance config block (SUB E5 / SUB 71). All fields are optional β€” they may be blank if the operator did not fill them in through Blastware. """ setup_name: Optional[str] = None # "Standard Recording Setup" project: Optional[str] = None # project description client: Optional[str] = None # client name βœ… confirmed offset operator: Optional[str] = None # operator / user name sensor_location: Optional[str] = None # sensor location string notes: Optional[str] = None # extended notes # ── Compliance Config ────────────────────────────────────────────────────────── @dataclass class ComplianceConfig: """ Device compliance and recording configuration from SUB 1A (response E5). Contains device-wide settings like record time, trigger/alarm thresholds, and operator-supplied strings. This is read once during connect() and cached in DeviceInfo. All fields are optional β€” some may not be decoded yet or may be absent from the device configuration. """ raw: Optional[bytes] = None # full 2090-byte payload (for debugging) # Recording parameters (βœ… CONFIRMED from Β§7.6) record_time: Optional[float] = None # seconds (7.0, 10.0, 13.0, etc.) sample_rate: Optional[int] = None # sps (1024, 2048, 4096, etc.) β€” NOT YET FOUND ❓ # Trigger/alarm levels (βœ… CONFIRMED per-channel at Β§7.6) # For now we store the first geo channel (Transverse) as representatives; # full per-channel data would require structured Channel objects. trigger_level_geo: Optional[float] = None # in/s (first geo channel) alarm_level_geo: Optional[float] = None # in/s (first geo channel) max_range_geo: Optional[float] = None # in/s full-scale range # Project/setup strings (sourced from E5 / SUB 71 write payload) # These are the FULL project metadata from compliance config, # complementing the sparse ProjectInfo found in the waveform record (SUB 0C). setup_name: Optional[str] = None # "Standard Recording Setup" project: Optional[str] = None # project description client: Optional[str] = None # client name operator: Optional[str] = None # operator / user name sensor_location: Optional[str] = None # sensor location string notes: Optional[str] = None # extended notes / additional info # ── Event ───────────────────────────────────────────────────────────────────── @dataclass class Event: """ A single seismic event record downloaded from the device. Populated progressively across several request/response pairs: 1. SUB 1E (EVENT_HEADER) β†’ index, timestamp, sample_rate 2. SUB 0C (FULL_WAVEFORM_RECORD) β†’ peak_values, project_info, record_type 3. SUB 5A (BULK_WAVEFORM_STREAM) β†’ raw_samples (downloaded on demand) Fields not yet retrieved are None. """ # ── Identity ────────────────────────────────────────────────────────────── index: int # 0-based event number on device # ── From EVENT_HEADER (SUB 1E) ──────────────────────────────────────────── timestamp: Optional[Timestamp] = None # 6-byte timestamp βœ… sample_rate: Optional[int] = None # samples/sec (e.g. 1024) πŸ”Ά # ── From FULL_WAVEFORM_RECORD (SUB F3) ─────────────────────────────────── peak_values: Optional[PeakValues] = None project_info: Optional[ProjectInfo] = None record_type: Optional[str] = None # e.g. "Histogram", "Waveform" πŸ”Ά # ── From BULK_WAVEFORM_STREAM (SUB 5A) ─────────────────────────────────── # Raw ADC samples keyed by channel label. Not fetched unless explicitly # requested (large data transfer β€” up to several MB per event). raw_samples: Optional[dict] = None # {"Tran": [...], "Vert": [...], ...} # ── Debug / introspection ───────────────────────────────────────────────── # Raw 210-byte waveform record bytes, set when debug mode is active. # Exposed by the SFM server via ?debug=true so field layouts can be verified. _raw_record: Optional[bytes] = field(default=None, repr=False) def __str__(self) -> str: ts = str(self.timestamp) if self.timestamp else "no timestamp" ppv = "" if self.peak_values: pv = self.peak_values parts = [] if pv.tran is not None: parts.append(f"T={pv.tran:.4f}") if pv.vert is not None: parts.append(f"V={pv.vert:.4f}") if pv.long is not None: parts.append(f"L={pv.long:.4f}") if pv.micl is not None: parts.append(f"M={pv.micl:.6f}") ppv = " [" + ", ".join(parts) + " in/s]" return f"Event#{self.index} {ts}{ppv}"