feat: add thor/micromate compatibility v0.18.0
This commit is contained in:
@@ -0,0 +1,291 @@
|
||||
"""
|
||||
sfm/idf_ascii_report.py — parse Thor (Micromate Series IV) IDF ASCII reports.
|
||||
|
||||
Thor exports a `.IDFW.txt` or `.IDFH.txt` sidecar next to each `.IDFW`
|
||||
(waveform) or `.IDFH` (histogram) event binary. Each sidecar is a
|
||||
plain-text file with `"Key : Value"` lines covering the full device-
|
||||
authoritative event metadata — PPV per channel, ZC Freq, Time of Peak,
|
||||
Peak Acceleration / Displacement, sensor self-check results, project
|
||||
strings, calibration date, battery level, etc. — followed by a raw
|
||||
waveform-samples block headed by the literal line "Waveform Data Channels".
|
||||
|
||||
This is the Thor analogue of `minimateplus/bw_ascii_report.py` for the
|
||||
Blastware (Series III) report format. The parser is intentionally
|
||||
permissive: we extract everything we recognise into a flat dict and
|
||||
silently ignore anything we don't. Downstream callers parse units
|
||||
(`"0.2119 in/s"` → 0.2119) only on the fields they need.
|
||||
|
||||
Example input (truncated):
|
||||
|
||||
"EventType : Full Waveform"
|
||||
"SampleRate : 1024 sps"
|
||||
"EventTime : 16:27:23"
|
||||
"EventDate : 2023-12-19"
|
||||
"TranPPV : 0.0251 in/s"
|
||||
"VertPPV : 0.2119 in/s"
|
||||
"LongPPV : 0.0282 in/s"
|
||||
"PeakVectorSum : 0.2131 in/s"
|
||||
"MicPSPL : 99.4 dB(L)"
|
||||
"TranZCFreq : 6.5 Hz"
|
||||
"SerialNumber : UM11719"
|
||||
"Version : Micromate ISEE 11.0AK"
|
||||
"FileName : UM11719_20231219162723.IDFW"
|
||||
"BatteryLevel : 3.8 volts"
|
||||
"Calibration : November 22, 2023 by Instantel"
|
||||
"TranTestResults : Passed"
|
||||
"TitleString1 : UPMC Presby-Loc 3-Level1-1R Elevator Rm"
|
||||
Waveform Data Channels
|
||||
Tran Vert Long MicL
|
||||
0.0003 -0.0003 0.0003 0.00013
|
||||
...
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import re
|
||||
from typing import Any, Dict, Optional, Tuple, Union
|
||||
|
||||
|
||||
# Lines look like: "Key : Value" (quotes literal, single ":" separator)
|
||||
_LINE_RE = re.compile(r'^\s*"?([^":]+?)"?\s*:\s*"?(.*?)"?\s*$')
|
||||
|
||||
# Marker that ends the metadata block — everything after is raw sample data.
|
||||
_WAVEFORM_BLOCK_MARKER = "waveform data channels"
|
||||
|
||||
|
||||
def _normalize_key(raw: str) -> str:
|
||||
"""Convert "TranPPV" / "PreTriggerLength" → snake_case."""
|
||||
s = raw.strip()
|
||||
# Insert underscore between lower→upper / digit→letter transitions
|
||||
s = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", s)
|
||||
s = re.sub(r"(?<=[A-Z])(?=[A-Z][a-z])", "_", s)
|
||||
s = s.replace("-", "_").replace(" ", "_")
|
||||
return s.lower()
|
||||
|
||||
|
||||
def _strip_unit_suffix(value: str) -> str:
|
||||
"""Return the numeric part of values like "0.2119 in/s" → "0.2119"."""
|
||||
parts = value.strip().split()
|
||||
return parts[0] if parts else value.strip()
|
||||
|
||||
|
||||
def _parse_float(value: str) -> Optional[float]:
|
||||
try:
|
||||
return float(_strip_unit_suffix(value))
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
|
||||
|
||||
def _parse_int(value: str) -> Optional[int]:
|
||||
try:
|
||||
return int(float(_strip_unit_suffix(value)))
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
|
||||
|
||||
def parse_idf_report(text: Union[str, bytes]) -> Dict[str, Any]:
|
||||
"""
|
||||
Parse a Thor IDFW.txt / IDFH.txt sidecar.
|
||||
|
||||
Returns a flat dict with two kinds of entries:
|
||||
|
||||
- **Raw fields** — every `Key : Value` line, keyed by snake_case
|
||||
of the original key, value as a string (unit suffix preserved).
|
||||
Lets callers grab any field we haven't explicitly normalised.
|
||||
|
||||
- **Derived fields** — a curated set with parsed types:
|
||||
* `serial_number` str
|
||||
* `event_type` str ("Full Waveform" / "Full Histogram")
|
||||
* `event_datetime` ISO-8601 string ("YYYY-MM-DDTHH:MM:SS") when
|
||||
both EventDate and EventTime are present
|
||||
* `sample_rate` int (samples/sec)
|
||||
* `tran_ppv`,`vert_ppv`,`long_ppv` float (in/s)
|
||||
* `mic_ppv` float (dB or psi — same units as MicPSPL)
|
||||
* `peak_vector_sum` float (in/s)
|
||||
* `tran_zc_freq`,`vert_zc_freq`,`long_zc_freq` float (Hz)
|
||||
* `record_time_sec` float (seconds)
|
||||
* `pre_trigger_sec` float (seconds)
|
||||
* `project` str (from TitleString1 — Thor's location)
|
||||
* `client` str (TitleString2)
|
||||
* `operator` str (TitleString3 — company/operator)
|
||||
* `notes` str (TitleString4)
|
||||
* `setup` str
|
||||
* `version` str (firmware)
|
||||
* `battery_volts` float
|
||||
* `calibration_text` str (e.g. "November 22, 2023 by Instantel")
|
||||
* `tran_test_passed`, `vert_test_passed`, `long_test_passed`,
|
||||
`mic_test_passed` bool ("Passed" → True; anything else → False)
|
||||
* `filename` str (FileName line — useful sanity check)
|
||||
|
||||
Stops parsing at the literal "Waveform Data Channels" line; the
|
||||
raw-samples block is left to whoever wants to decode the binary.
|
||||
|
||||
Input may be `str` or `bytes` (`utf-8`/`latin-1` tolerant).
|
||||
"""
|
||||
if isinstance(text, bytes):
|
||||
try:
|
||||
text = text.decode("utf-8")
|
||||
except UnicodeDecodeError:
|
||||
text = text.decode("latin-1", errors="replace")
|
||||
|
||||
raw: Dict[str, str] = {}
|
||||
|
||||
for line in text.splitlines():
|
||||
stripped = line.strip()
|
||||
if not stripped:
|
||||
continue
|
||||
if stripped.lower().startswith(_WAVEFORM_BLOCK_MARKER):
|
||||
break
|
||||
m = _LINE_RE.match(stripped)
|
||||
if not m:
|
||||
continue
|
||||
key = _normalize_key(m.group(1))
|
||||
value = m.group(2).strip()
|
||||
# Multi-value lines (Channel, Units, etc.) — coalesce by appending.
|
||||
if key in raw:
|
||||
raw[key] = raw[key] + "; " + value
|
||||
else:
|
||||
raw[key] = value
|
||||
|
||||
out: Dict[str, Any] = dict(raw) # keep all raw fields
|
||||
|
||||
# ── Derived fields ───────────────────────────────────────────────────────
|
||||
|
||||
def _take(*candidates: str) -> Optional[str]:
|
||||
for c in candidates:
|
||||
if c in raw:
|
||||
return raw[c]
|
||||
return None
|
||||
|
||||
# Event identity
|
||||
if "serial_number" in raw:
|
||||
out["serial_number"] = raw["serial_number"]
|
||||
if "event_type" in raw:
|
||||
out["event_type"] = raw["event_type"]
|
||||
if "file_name" in raw:
|
||||
out["filename"] = raw["file_name"]
|
||||
|
||||
# Combined date+time. Waveform sidecars use "EventDate" / "EventTime";
|
||||
# histogram sidecars use "HistogramStartDate" / "HistogramStartTime".
|
||||
# Prefer the event_* names when both are present.
|
||||
ed = raw.get("event_date") or raw.get("histogram_start_date")
|
||||
et = raw.get("event_time") or raw.get("histogram_start_time")
|
||||
if ed and et:
|
||||
try:
|
||||
dt = datetime.datetime.strptime(f"{ed} {et}", "%Y-%m-%d %H:%M:%S")
|
||||
out["event_datetime"] = dt.isoformat()
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Numeric scalars
|
||||
for key in ("sample_rate",):
|
||||
v = raw.get(key)
|
||||
if v is not None:
|
||||
iv = _parse_int(v)
|
||||
if iv is not None:
|
||||
out[key] = iv
|
||||
|
||||
for key in ("tran_ppv", "vert_ppv", "long_ppv", "peak_vector_sum",
|
||||
"tran_zc_freq", "vert_zc_freq", "long_zc_freq",
|
||||
"tran_peak_acceleration", "vert_peak_acceleration",
|
||||
"long_peak_acceleration",
|
||||
"tran_peak_displacement", "vert_peak_displacement",
|
||||
"long_peak_displacement",
|
||||
"tran_time_of_peak", "vert_time_of_peak", "long_time_of_peak",
|
||||
"mic_time_of_peak", "mic_zc_freq"):
|
||||
v = raw.get(key)
|
||||
if v is not None:
|
||||
fv = _parse_float(v)
|
||||
if fv is not None:
|
||||
out[key] = fv
|
||||
|
||||
# Microphone — Thor reports MicPSPL (dB(L)) which is the closest
|
||||
# analogue to BW's mic_ppv. Stored as a float; units are in the
|
||||
# original raw field (`mic_pspl` raw entry preserves "99.4 dB(L)").
|
||||
mic = raw.get("mic_pspl")
|
||||
if mic is not None:
|
||||
fv = _parse_float(mic)
|
||||
if fv is not None:
|
||||
out["mic_ppv"] = fv
|
||||
|
||||
# Record / pre-trigger duration
|
||||
rt = raw.get("record_time")
|
||||
if rt is not None:
|
||||
fv = _parse_float(rt)
|
||||
if fv is not None:
|
||||
out["record_time_sec"] = fv
|
||||
pt = raw.get("pre_trigger_length")
|
||||
if pt is not None:
|
||||
fv = _parse_float(pt)
|
||||
if fv is not None:
|
||||
out["pre_trigger_sec"] = fv
|
||||
|
||||
# Project / client / operator / location strings. Thor's title
|
||||
# strings are operator-defined; conventional mapping (per Thor's
|
||||
# default TitleNote labels in the example data):
|
||||
# TitleString1 = Location → project (sensor location identifier)
|
||||
# TitleString2 = Client → client
|
||||
# TitleString3 = Company → operator (the monitoring company)
|
||||
# TitleString4 = Notes → notes
|
||||
out["project"] = _take("title_string1")
|
||||
out["client"] = _take("title_string2")
|
||||
out["operator"] = _take("title_string3", "operator")
|
||||
out["notes"] = _take("title_string4", "post_event_note")
|
||||
|
||||
if "setup" in raw:
|
||||
out["setup"] = raw["setup"]
|
||||
if "version" in raw:
|
||||
out["version"] = raw["version"]
|
||||
|
||||
# Battery (e.g. "3.8 volts" → 3.8)
|
||||
bl = raw.get("battery_level")
|
||||
if bl is not None:
|
||||
fv = _parse_float(bl)
|
||||
if fv is not None:
|
||||
out["battery_volts"] = fv
|
||||
|
||||
# Calibration line is free-form (e.g. "November 22, 2023 by Instantel").
|
||||
if "calibration" in raw:
|
||||
out["calibration_text"] = raw["calibration"]
|
||||
|
||||
# Sensor self-check results — bool flags
|
||||
for key, out_key in (
|
||||
("tran_test_results", "tran_test_passed"),
|
||||
("vert_test_results", "vert_test_passed"),
|
||||
("long_test_results", "long_test_passed"),
|
||||
("mic_test_results", "mic_test_passed"),
|
||||
):
|
||||
v = raw.get(key)
|
||||
if v is not None:
|
||||
out[out_key] = v.strip().lower() == "passed"
|
||||
|
||||
return out
|
||||
|
||||
|
||||
def serial_from_filename(name: str) -> Optional[str]:
|
||||
"""Convenience: pull the serial prefix from a Thor event filename.
|
||||
|
||||
Thor uses the literal serial as the filename prefix:
|
||||
UM11719_20231219163444.IDFW → "UM11719"
|
||||
BE9439_20200713124251.IDFH → "BE9439"
|
||||
"""
|
||||
m = re.match(r"^([A-Z]{2}\d+)_\d{14}\.(IDFH|IDFW)(?:\.txt)?$",
|
||||
name, re.IGNORECASE)
|
||||
return m.group(1).upper() if m else None
|
||||
|
||||
|
||||
def parse_event_filename(name: str) -> Optional[Tuple[str, datetime.datetime, str]]:
|
||||
"""Parse `<SERIAL>_<YYYYMMDDHHMMSS>.<KIND>` → (serial, datetime, kind).
|
||||
|
||||
`kind` is "IDFH" or "IDFW" (upper-case). Returns None on no match.
|
||||
"""
|
||||
m = re.match(r"^([A-Z]{2}\d+)_(\d{14})\.(IDFH|IDFW)$",
|
||||
name, re.IGNORECASE)
|
||||
if not m:
|
||||
return None
|
||||
try:
|
||||
ts = datetime.datetime.strptime(m.group(2), "%Y%m%d%H%M%S")
|
||||
except ValueError:
|
||||
return None
|
||||
return m.group(1).upper(), ts, m.group(3).upper()
|
||||
Reference in New Issue
Block a user