Files
seismo-relay/minimateplus/models.py
2026-03-30 23:23:29 -04:00

216 lines
9.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
models.py — Plain-Python data models for the MiniMate Plus protocol library.
All models are intentionally simple dataclasses with no protocol logic.
They represent *decoded* device data — the client layer translates raw frame
bytes into these objects, and the SFM API layer serialises them to JSON.
Notes on certainty:
Fields marked ✅ are confirmed from captured data.
Fields marked 🔶 are strongly inferred but not formally proven.
Fields marked ❓ are present in the captured payload but not yet decoded.
See docs/instantel_protocol_reference.md for full derivation details.
"""
from __future__ import annotations
import struct
from dataclasses import dataclass, field
from typing import Optional
# ── Timestamp ─────────────────────────────────────────────────────────────────
@dataclass
class Timestamp:
"""
6-byte event timestamp decoded from the MiniMate Plus wire format.
Wire layout: [flag:1] [year:2 BE] [unknown:1] [month:1] [day:1]
The year 1995 is the device's factory-default RTC date — it appears
whenever the battery has been disconnected. Treat 1995 as "clock not set".
"""
raw: bytes # raw 6-byte sequence for round-tripping
flag: int # byte 0 — validity/type flag (usually 0x01) 🔶
year: int # bytes 12 big-endian uint16 ✅
unknown_byte: int # byte 3 — likely hours/minutes ❓
month: int # byte 4 ✅
day: int # byte 5 ✅
@classmethod
def from_bytes(cls, data: bytes) -> "Timestamp":
"""
Decode a 6-byte timestamp sequence.
Args:
data: exactly 6 bytes from the device payload.
Returns:
Decoded Timestamp.
Raises:
ValueError: if data is not exactly 6 bytes.
"""
if len(data) != 6:
raise ValueError(f"Timestamp requires exactly 6 bytes, got {len(data)}")
flag = data[0]
year = struct.unpack_from(">H", data, 1)[0]
unknown_byte = data[3]
month = data[4]
day = data[5]
return cls(
raw=bytes(data),
flag=flag,
year=year,
unknown_byte=unknown_byte,
month=month,
day=day,
)
@property
def clock_set(self) -> bool:
"""False when year == 1995 (factory default / battery-lost state)."""
return self.year != 1995
def __str__(self) -> str:
if not self.clock_set:
return f"CLOCK_NOT_SET ({self.year}-{self.month:02d}-{self.day:02d})"
return f"{self.year}-{self.month:02d}-{self.day:02d}"
# ── Device identity ───────────────────────────────────────────────────────────
@dataclass
class DeviceInfo:
"""
Combined device identity information gathered during the startup sequence.
Populated from three response SUBs:
- SUB EA (SERIAL_NUMBER_RESPONSE): serial, firmware_minor
- SUB FE (FULL_CONFIG_RESPONSE): serial (repeat), firmware_version,
dsp_version, manufacturer, model
- SUB A4 (POLL_RESPONSE): manufacturer (repeat), model (repeat)
All string fields are stripped of null padding before storage.
"""
# ── From SUB EA (SERIAL_NUMBER_RESPONSE) ─────────────────────────────────
serial: str # e.g. "BE18189" ✅
firmware_minor: int # 0x11 = 17 for S337.17 ✅
serial_trail_0: Optional[int] = None # unit-specific byte — purpose unknown ❓
# ── From SUB FE (FULL_CONFIG_RESPONSE) ────────────────────────────────────
firmware_version: Optional[str] = None # e.g. "S337.17" ✅
dsp_version: Optional[str] = None # e.g. "10.72" ✅
manufacturer: Optional[str] = None # e.g. "Instantel" ✅
model: Optional[str] = None # e.g. "MiniMate Plus" ✅
def __str__(self) -> str:
fw = self.firmware_version or f"?.{self.firmware_minor}"
mdl = self.model or "MiniMate Plus"
return f"{mdl} S/N:{self.serial} FW:{fw}"
# ── Channel threshold / scaling ───────────────────────────────────────────────
@dataclass
class ChannelConfig:
"""
Per-channel threshold and scaling values from SUB E5 / SUB 71.
Floats are stored in the device in imperial units (in/s for geo channels,
psi for MicL). Unit strings embedded in the payload confirm this.
Certainty: ✅ CONFIRMED for trigger_level, alarm_level, unit strings.
"""
label: str # e.g. "Tran", "Vert", "Long", "MicL" ✅
trigger_level: float # in/s (geo) or psi (MicL) ✅
alarm_level: float # in/s (geo) or psi (MicL) ✅
max_range: float # full-scale calibration constant (e.g. 6.206) 🔶
unit_label: str # e.g. "in./s" or "psi" ✅
# ── Peak values for one event ─────────────────────────────────────────────────
@dataclass
class PeakValues:
"""
Per-channel peak particle velocity / pressure for a single event.
Extracted from the Full Waveform Record (SUB F3), stored as IEEE 754
big-endian floats in the device's native units (in/s / psi).
"""
tran: Optional[float] = None # Transverse PPV (in/s) ✅
vert: Optional[float] = None # Vertical PPV (in/s) ✅
long: Optional[float] = None # Longitudinal PPV (in/s) ✅
micl: Optional[float] = None # Air overpressure (psi) 🔶 (units uncertain)
# ── Project / operator metadata ───────────────────────────────────────────────
@dataclass
class ProjectInfo:
"""
Operator-supplied project and location strings from the Full Waveform
Record (SUB F3) and compliance config block (SUB E5 / SUB 71).
All fields are optional — they may be blank if the operator did not fill
them in through Blastware.
"""
setup_name: Optional[str] = None # "Standard Recording Setup"
project: Optional[str] = None # project description
client: Optional[str] = None # client name ✅ confirmed offset
operator: Optional[str] = None # operator / user name
sensor_location: Optional[str] = None # sensor location string
notes: Optional[str] = None # extended notes
# ── Event ─────────────────────────────────────────────────────────────────────
@dataclass
class Event:
"""
A single seismic event record downloaded from the device.
Populated progressively across several request/response pairs:
1. SUB 1E (EVENT_HEADER) → index, timestamp, sample_rate
2. SUB 0C (FULL_WAVEFORM_RECORD) → peak_values, project_info, record_type
3. SUB 5A (BULK_WAVEFORM_STREAM) → raw_samples (downloaded on demand)
Fields not yet retrieved are None.
"""
# ── Identity ──────────────────────────────────────────────────────────────
index: int # 0-based event number on device
# ── From EVENT_HEADER (SUB 1E) ────────────────────────────────────────────
timestamp: Optional[Timestamp] = None # 6-byte timestamp ✅
sample_rate: Optional[int] = None # samples/sec (e.g. 1024) 🔶
# ── From FULL_WAVEFORM_RECORD (SUB F3) ───────────────────────────────────
peak_values: Optional[PeakValues] = None
project_info: Optional[ProjectInfo] = None
record_type: Optional[str] = None # e.g. "Histogram", "Waveform" 🔶
# ── From BULK_WAVEFORM_STREAM (SUB 5A) ───────────────────────────────────
# Raw ADC samples keyed by channel label. Not fetched unless explicitly
# requested (large data transfer — up to several MB per event).
raw_samples: Optional[dict] = None # {"Tran": [...], "Vert": [...], ...}
def __str__(self) -> str:
ts = str(self.timestamp) if self.timestamp else "no timestamp"
ppv = ""
if self.peak_values:
pv = self.peak_values
parts = []
if pv.tran is not None:
parts.append(f"T={pv.tran:.4f}")
if pv.vert is not None:
parts.append(f"V={pv.vert:.4f}")
if pv.long is not None:
parts.append(f"L={pv.long:.4f}")
if pv.micl is not None:
parts.append(f"M={pv.micl:.6f}")
ppv = " [" + ", ".join(parts) + " in/s]"
return f"Event#{self.index} {ts}{ppv}"