"""
models.py — Plain-Python data models for the MiniMate Plus protocol library.

All models are intentionally simple dataclasses with no protocol logic.
They represent *decoded* device data — the client layer translates raw frame
bytes into these objects, and the SFM API layer serialises them to JSON.

Notes on certainty:
    Fields marked ✅ are confirmed from captured data.
    Fields marked 🔶 are strongly inferred but not formally proven.
    Fields marked ❓ are present in the captured payload but not yet decoded.
    See docs/instantel_protocol_reference.md for full derivation details.
"""

from __future__ import annotations

import struct
from dataclasses import dataclass, field
from typing import Optional


# ── Timestamp ─────────────────────────────────────────────────────────────────

@dataclass
class Timestamp:
    """
    6-byte event timestamp decoded from the MiniMate Plus wire format.

    Wire layout: [flag:1] [year:2 BE] [unknown:1] [month:1] [day:1]

    The year 1995 is the device's factory-default RTC date — it appears
    whenever the battery has been disconnected. Treat 1995 as "clock not set".
    """

    raw: bytes          # raw 6-byte sequence for round-tripping
    flag: int           # byte 0 — validity/type flag (usually 0x01) 🔶
    year: int           # bytes 1–2 big-endian uint16 ✅
    unknown_byte: int   # byte 3 — likely hours/minutes ❓
    month: int          # byte 4 ✅
    day: int            # byte 5 ✅

    @classmethod
    def from_bytes(cls, data: bytes) -> "Timestamp":
        """
        Decode a 6-byte timestamp sequence.

        Args:
            data: exactly 6 bytes from the device payload.

        Returns:
            Decoded Timestamp.

        Raises:
            ValueError: if data is not exactly 6 bytes.
        """
        if len(data) != 6:
            raise ValueError(f"Timestamp requires exactly 6 bytes, got {len(data)}")

        # One big-endian unpack mirrors the wire layout field for field:
        # B = flag, H = year, B = unknown, B = month, B = day (1+2+1+1+1 = 6).
        flag, year, unknown_byte, month, day = struct.unpack(">BHBBB", data)
        return cls(
            raw=bytes(data),  # private copy so the caller's buffer can be reused
            flag=flag,
            year=year,
            unknown_byte=unknown_byte,
            month=month,
            day=day,
        )

    @property
    def clock_set(self) -> bool:
        """False when year == 1995 (factory default / battery-lost state)."""
        return self.year != 1995

    def __str__(self) -> str:
        """Return YYYY-MM-DD, wrapped in CLOCK_NOT_SET (...) if the clock is unset."""
        date = f"{self.year}-{self.month:02d}-{self.day:02d}"
        if not self.clock_set:
            return f"CLOCK_NOT_SET ({date})"
        return date


# ── Device identity ───────────────────────────────────────────────────────────

@dataclass
class DeviceInfo:
    """
    Combined device identity information gathered during the startup sequence.

    Populated from three response SUBs:
      - SUB EA (SERIAL_NUMBER_RESPONSE): serial, firmware_minor
      - SUB FE (FULL_CONFIG_RESPONSE): serial (repeat), firmware_version,
        dsp_version, manufacturer, model
      - SUB A4 (POLL_RESPONSE): manufacturer (repeat), model (repeat)

    All string fields are stripped of null padding before storage.
    """

    # ── From SUB EA (SERIAL_NUMBER_RESPONSE) ─────────────────────────────────
    serial: str                             # e.g. "BE18189" ✅
    firmware_minor: int                     # 0x11 = 17 for S337.17 ✅
    serial_trail_0: Optional[int] = None    # unit-specific byte — purpose unknown ❓

    # ── From SUB FE (FULL_CONFIG_RESPONSE) ────────────────────────────────────
    firmware_version: Optional[str] = None  # e.g. "S337.17" ✅
    dsp_version: Optional[str] = None       # e.g. "10.72" ✅
    manufacturer: Optional[str] = None      # e.g. "Instantel" ✅
    model: Optional[str] = None             # e.g. "MiniMate Plus" ✅

    def __str__(self) -> str:
        """Return a one-line summary: model, serial number and firmware."""
        # Until SUB FE has been decoded only the minor firmware number is
        # known, so the unknown major part is shown as "?".
        fw = self.firmware_version or f"?.{self.firmware_minor}"
        mdl = self.model or "MiniMate Plus"
        return f"{mdl} S/N:{self.serial} FW:{fw}"


# ── Channel threshold / scaling ───────────────────────────────────────────────

@dataclass
class ChannelConfig:
    """
    Per-channel threshold and scaling values from SUB E5 / SUB 71.

    Floats are stored in the device in imperial units (in/s for geo channels,
    psi for MicL). Unit strings embedded in the payload confirm this.

    Certainty: ✅ CONFIRMED for trigger_level, alarm_level, unit strings.
    """

    label: str            # e.g. "Tran", "Vert", "Long", "MicL" ✅
    trigger_level: float  # in/s (geo) or psi (MicL) ✅
    alarm_level: float    # in/s (geo) or psi (MicL) ✅
    max_range: float      # full-scale calibration constant (e.g. 6.206) 🔶
    unit_label: str       # e.g. "in./s" or "psi" ✅


# ── Peak values for one event ─────────────────────────────────────────────────

@dataclass
class PeakValues:
    """
    Per-channel peak particle velocity / pressure for a single event.

    Extracted from the Full Waveform Record (SUB F3), stored as IEEE 754
    big-endian floats in the device's native units (in/s / psi).
    """

    tran: Optional[float] = None  # Transverse PPV (in/s) ✅
    vert: Optional[float] = None  # Vertical PPV (in/s) ✅
    long: Optional[float] = None  # Longitudinal PPV (in/s) ✅
    micl: Optional[float] = None  # Air overpressure (psi) 🔶 (units uncertain)


# ── Project / operator metadata ───────────────────────────────────────────────

@dataclass
class ProjectInfo:
    """
    Operator-supplied project and location strings from the Full Waveform
    Record (SUB F3) and compliance config block (SUB E5 / SUB 71).

    All fields are optional — they may be blank if the operator did not fill
    them in through Blastware.
    """

    setup_name: Optional[str] = None       # "Standard Recording Setup"
    project: Optional[str] = None          # project description
    client: Optional[str] = None           # client name ✅ confirmed offset
    operator: Optional[str] = None         # operator / user name
    sensor_location: Optional[str] = None  # sensor location string
    notes: Optional[str] = None            # extended notes


# ── Event ─────────────────────────────────────────────────────────────────────

@dataclass
class Event:
    """
    A single seismic event record downloaded from the device.

    Populated progressively across several request/response pairs:
      1. SUB 1E (EVENT_HEADER) → index, timestamp, sample_rate
      2. SUB 0C (FULL_WAVEFORM_RECORD) → peak_values, project_info, record_type
      3. SUB 5A (BULK_WAVEFORM_STREAM) → raw_samples (downloaded on demand)

    Fields not yet retrieved are None.

    NOTE(review): step 2 cites SUB 0C, while the field comments below and the
    PeakValues / ProjectInfo docstrings cite SUB F3 for the Full Waveform
    Record — confirm which code is intended and align them.
    """

    # ── Identity ──────────────────────────────────────────────────────────────
    index: int  # 0-based event number on device

    # ── From EVENT_HEADER (SUB 1E) ────────────────────────────────────────────
    timestamp: Optional[Timestamp] = None  # 6-byte timestamp ✅
    sample_rate: Optional[int] = None      # samples/sec (e.g. 1024) 🔶

    # ── From FULL_WAVEFORM_RECORD (SUB F3) ───────────────────────────────────
    peak_values: Optional[PeakValues] = None
    project_info: Optional[ProjectInfo] = None
    record_type: Optional[str] = None      # e.g. "Histogram", "Waveform" 🔶

    # ── From BULK_WAVEFORM_STREAM (SUB 5A) ───────────────────────────────────
    # Raw ADC samples keyed by channel label. Not fetched unless explicitly
    # requested (large data transfer — up to several MB per event).
    raw_samples: Optional[dict] = None     # {"Tran": [...], "Vert": [...], ...}

    def __str__(self) -> str:
        """
        Return a one-line summary: event index, timestamp and known peaks.

        The bracketed peak section is included only when at least one channel
        peak is available, so a PeakValues object with every channel unset
        does not produce an empty " [ in/s]" suffix.
        """
        ts = str(self.timestamp) if self.timestamp is not None else "no timestamp"

        ppv = ""
        pv = self.peak_values
        if pv is not None:
            # (tag, value, decimal places) — MicL values are printed with six
            # decimals, the geophone channels with four.
            readings = (
                ("T", pv.tran, 4),
                ("V", pv.vert, 4),
                ("L", pv.long, 4),
                ("M", pv.micl, 6),
            )
            parts = [
                f"{tag}={value:.{places}f}"
                for tag, value, places in readings
                if value is not None
            ]
            if parts:
                # NOTE(review): the trailing "in/s" label covers the whole
                # list, including the MicL value that PeakValues documents as
                # psi. The format is kept as-is for backward compatibility.
                ppv = " [" + ", ".join(parts) + " in/s]"

        return f"Event#{self.index} {ts}{ppv}"