""" micromate/idf_ascii_report.py — parse Thor (Micromate Series IV) IDF ASCII reports. Thor exports a `.IDFW.txt` or `.IDFH.txt` sidecar next to each `.IDFW` (waveform) or `.IDFH` (histogram) event binary. Each sidecar is a plain-text file with `"Key : Value"` lines covering the full device- authoritative event metadata — PPV per channel, ZC Freq, Time of Peak, Peak Acceleration / Displacement, sensor self-check results, project strings, calibration date, battery level, etc. — followed by a raw waveform-samples block headed by the literal line "Waveform Data Channels". This is the Thor analogue of `minimateplus/bw_ascii_report.py` for the Blastware (Series III) report format. The parser is intentionally permissive: we extract everything we recognise into a flat dict and silently ignore anything we don't. Downstream callers parse units (`"0.2119 in/s"` → 0.2119) only on the fields they need. Example input (truncated): "EventType : Full Waveform" "SampleRate : 1024 sps" "EventTime : 16:27:23" "EventDate : 2023-12-19" "TranPPV : 0.0251 in/s" "VertPPV : 0.2119 in/s" "LongPPV : 0.0282 in/s" "PeakVectorSum : 0.2131 in/s" "MicPSPL : 99.4 dB(L)" "TranZCFreq : 6.5 Hz" "SerialNumber : UM11719" "Version : Micromate ISEE 11.0AK" "FileName : UM11719_20231219162723.IDFW" "BatteryLevel : 3.8 volts" "Calibration : November 22, 2023 by Instantel" "TranTestResults : Passed" "TitleString1 : UPMC Presby-Loc 3-Level1-1R Elevator Rm" Waveform Data Channels Tran Vert Long MicL 0.0003 -0.0003 0.0003 0.00013 ... """ from __future__ import annotations import datetime import re from typing import Any, Dict, Optional, Tuple, Union # Lines look like: "Key : Value" (quotes literal, single ":" separator) _LINE_RE = re.compile(r'^\s*"?([^":]+?)"?\s*:\s*"?(.*?)"?\s*$') # Marker that ends the metadata block — everything after is raw sample data. _WAVEFORM_BLOCK_MARKER = "waveform data channels" def _normalize_key(raw: str) -> str: """Convert "TranPPV" / "PreTriggerLength" → snake_case.""" s = raw.strip() # Insert underscore between lower→upper / digit→letter transitions s = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", s) s = re.sub(r"(?<=[A-Z])(?=[A-Z][a-z])", "_", s) s = s.replace("-", "_").replace(" ", "_") return s.lower() def _strip_unit_suffix(value: str) -> str: """Return the numeric part of values like "0.2119 in/s" → "0.2119". Also strips Thor's below/above-threshold prefixes: "<0.005 in/s" → "0.005" (below-noise-floor reading) ">100 Hz" → "100" (above-measurement-range reading) """ parts = value.strip().split() token = parts[0] if parts else value.strip() if token.startswith("<") or token.startswith(">"): token = token[1:] return token def _parse_float(value: str) -> Optional[float]: try: return float(_strip_unit_suffix(value)) except (ValueError, TypeError): return None def _parse_int(value: str) -> Optional[int]: try: return int(float(_strip_unit_suffix(value))) except (ValueError, TypeError): return None def parse_idf_report(text: Union[str, bytes]) -> Dict[str, Any]: """ Parse a Thor IDFW.txt / IDFH.txt sidecar. Returns a flat dict with two kinds of entries: - **Raw fields** — every `Key : Value` line, keyed by snake_case of the original key, value as a string (unit suffix preserved). Lets callers grab any field we haven't explicitly normalised. - **Derived fields** — a curated set with parsed types: * `serial_number` str * `event_type` str ("Full Waveform" / "Full Histogram") * `event_datetime` ISO-8601 string ("YYYY-MM-DDTHH:MM:SS") when both EventDate and EventTime are present * `sample_rate` int (samples/sec) * `tran_ppv`,`vert_ppv`,`long_ppv` float (in/s) * `mic_ppv` float (dB or psi — same units as MicPSPL) * `peak_vector_sum` float (in/s) * `tran_zc_freq`,`vert_zc_freq`,`long_zc_freq` float (Hz) * `record_time_sec` float (seconds) * `pre_trigger_sec` float (seconds) * `project` str (from TitleString1 — Thor's location) * `client` str (TitleString2) * `operator` str (TitleString3 — company/operator) * `notes` str (TitleString4) * `setup` str * `version` str (firmware) * `battery_volts` float * `calibration_text` str (e.g. "November 22, 2023 by Instantel") * `tran_test_passed`, `vert_test_passed`, `long_test_passed`, `mic_test_passed` bool ("Passed" → True; anything else → False) * `filename` str (FileName line — useful sanity check) Stops parsing at the literal "Waveform Data Channels" line; the raw-samples block is left to whoever wants to decode the binary. Input may be `str` or `bytes` (`utf-8`/`latin-1` tolerant). """ if isinstance(text, bytes): try: text = text.decode("utf-8") except UnicodeDecodeError: text = text.decode("latin-1", errors="replace") raw: Dict[str, str] = {} for line in text.splitlines(): stripped = line.strip() if not stripped: continue if stripped.lower().startswith(_WAVEFORM_BLOCK_MARKER): break m = _LINE_RE.match(stripped) if not m: continue key = _normalize_key(m.group(1)) value = m.group(2).strip() # Multi-value lines (Channel, Units, etc.) — coalesce by appending. if key in raw: raw[key] = raw[key] + "; " + value else: raw[key] = value out: Dict[str, Any] = dict(raw) # keep all raw fields # ── Derived fields ─────────────────────────────────────────────────────── def _take(*candidates: str) -> Optional[str]: for c in candidates: if c in raw: return raw[c] return None # Event identity if "serial_number" in raw: out["serial_number"] = raw["serial_number"] if "event_type" in raw: out["event_type"] = raw["event_type"] if "file_name" in raw: out["filename"] = raw["file_name"] # Combined date+time. Waveform sidecars use "EventDate" / "EventTime"; # histogram sidecars use "HistogramStartDate" / "HistogramStartTime". # Prefer the event_* names when both are present. ed = raw.get("event_date") or raw.get("histogram_start_date") et = raw.get("event_time") or raw.get("histogram_start_time") if ed and et: try: dt = datetime.datetime.strptime(f"{ed} {et}", "%Y-%m-%d %H:%M:%S") out["event_datetime"] = dt.isoformat() except ValueError: pass # Numeric scalars. For every field we typify here, we MUST drop the # raw string copy from `out` when parsing fails — Thor writes things # like "<0.005 in/s" (below threshold) and "N/A" (not measured) that # would otherwise linger in `out` as strings, sneak into SQLite REAL # columns via permissive type affinity, and then crash the JS # frontend on `.toFixed(...)`. int_fields = ("sample_rate",) for key in int_fields: v = raw.get(key) if v is None: continue iv = _parse_int(v) if iv is not None: out[key] = iv else: out.pop(key, None) float_fields = ( "tran_ppv", "vert_ppv", "long_ppv", "peak_vector_sum", "tran_zc_freq", "vert_zc_freq", "long_zc_freq", "tran_peak_acceleration", "vert_peak_acceleration", "long_peak_acceleration", "tran_peak_displacement", "vert_peak_displacement", "long_peak_displacement", "mic_zc_freq", ) for key in float_fields: v = raw.get(key) if v is None: continue fv = _parse_float(v) if fv is not None: out[key] = fv else: out.pop(key, None) # Time-of-peak: Thor labels these "TimeofPeak" (lowercase "of") so the # normalizer produces "*_timeof_peak". Map them to the canonical # ``*_time_of_peak`` output keys for downstream consumers. for raw_key, out_key in ( ("tran_timeof_peak", "tran_time_of_peak"), ("vert_timeof_peak", "vert_time_of_peak"), ("long_timeof_peak", "long_time_of_peak"), ("mic_timeof_peak", "mic_time_of_peak"), ): v = raw.get(raw_key) if v is None: continue fv = _parse_float(v) if fv is not None: out[out_key] = fv # Microphone — Thor reports MicPSPL (dB(L)) which is the closest # analogue to BW's mic_ppv. The raw "99.4 dB(L)" string stays in # `out` under the original `mic_pspl` key for display; the parsed # float goes in `mic_ppv`. mic = raw.get("mic_pspl") if mic is not None: fv = _parse_float(mic) if fv is not None: out["mic_ppv"] = fv # Record / pre-trigger duration — same drop-on-failure discipline. rt = raw.get("record_time") if rt is not None: fv = _parse_float(rt) if fv is not None: out["record_time_sec"] = fv pt = raw.get("pre_trigger_length") if pt is not None: fv = _parse_float(pt) if fv is not None: out["pre_trigger_sec"] = fv # Project / client / operator / location strings. Thor's title # strings are operator-defined; conventional mapping (per Thor's # default TitleNote labels in the example data): # TitleString1 = Location → project (sensor location identifier) # TitleString2 = Client → client # TitleString3 = Company → operator (the monitoring company) # TitleString4 = Notes → notes out["project"] = _take("title_string1") out["client"] = _take("title_string2") out["operator"] = _take("title_string3", "operator") out["notes"] = _take("title_string4", "post_event_note") if "setup" in raw: out["setup"] = raw["setup"] if "version" in raw: out["version"] = raw["version"] # Battery (e.g. "3.8 volts" → 3.8) bl = raw.get("battery_level") if bl is not None: fv = _parse_float(bl) if fv is not None: out["battery_volts"] = fv # Calibration line is free-form (e.g. "November 22, 2023 by Instantel"). if "calibration" in raw: out["calibration_text"] = raw["calibration"] # Sensor self-check results — bool flags for key, out_key in ( ("tran_test_results", "tran_test_passed"), ("vert_test_results", "vert_test_passed"), ("long_test_results", "long_test_passed"), ("mic_test_results", "mic_test_passed"), ): v = raw.get(key) if v is not None: out[out_key] = v.strip().lower() == "passed" return out def serial_from_filename(name: str) -> Optional[str]: """Convenience: pull the serial prefix from a Thor event filename. Thor uses the literal serial as the filename prefix: UM11719_20231219163444.IDFW → "UM11719" BE9439_20200713124251.IDFH → "BE9439" """ m = re.match(r"^([A-Z]{2}\d+)_\d{14}\.(IDFH|IDFW)(?:\.txt)?$", name, re.IGNORECASE) return m.group(1).upper() if m else None def parse_event_filename(name: str) -> Optional[Tuple[str, datetime.datetime, str]]: """Parse `_.` → (serial, datetime, kind). `kind` is "IDFH" or "IDFW" (upper-case). Returns None on no match. """ m = re.match(r"^([A-Z]{2}\d+)_(\d{14})\.(IDFH|IDFW)$", name, re.IGNORECASE) if not m: return None try: ts = datetime.datetime.strptime(m.group(2), "%Y%m%d%H%M%S") except ValueError: return None return m.group(1).upper(), ts, m.group(3).upper()