merge full s3 codec decoded #23

Merged
serversdown merged 18 commits from codec-re into main 2026-05-20 13:45:33 -04:00
12 changed files with 1774 additions and 78 deletions
Showing only changes of commit 84ee68f889 - Show all commits
+27
View File
@@ -4,6 +4,33 @@ All notable changes to seismo-relay are documented here.
---
## v0.16.0 — 2026-05-11
The "BW ACH ingestion" release. When paired with **series3-watcher v1.5.0**, every Blastware ACH event (binary + `_ASCII.TXT` report) lands in SeismoDb with device-authoritative peaks, project metadata, sensor self-check, and ZC/Time-of-Peak data — without depending on the still-undecoded waveform body codec. This is the end-to-end product win discussed in v0.15.0's "out of scope" notes: sortable / filterable monthly-summary review of historical events, populated from the BW ASCII export rather than re-decoded samples.
### Added — `/db/import/blastware_file` rich-metadata ingestion
- **Paired BW ASCII reports.** The endpoint now accepts the `<binary>_<ext>_ASCII.TXT` partner BW writes alongside each event. Pairing handles both filename conventions: ACH (`M529LK44_AB0_ASCII.TXT`) and manual-export (`M529LK44.AB0.TXT`). When both present, ACH wins.
- **`minimateplus/bw_ascii_report.py`** (new) — parser + `BwAsciiReport` dataclass for BW's per-event ASCII export. Handles every field BW writes: identity, trigger config, per-channel PPV / ZC Freq / Time of Peak / Peak Acceleration / Peak Displacement, Peak Vector Sum + time, MicL PSPL / Time of Peak / ZC Freq, sensor self-check (Test Freq / Test Ratio / Test Amplitude / Pass-Fail per channel), monitor log, PC SW version.
- **Position-based user-notes parsing.** BW's Compliance Setup → Notes tab labels (Project / Client / User Name / Seis Loc) are *operator-editable* — an operator can rename them to "Building:", "Site Address:", etc. Rather than maintain a label-spelling map, the parser uses positional matching between the `Units :` and `Geo Range :` anchors in the ASCII output. The four canonical slots (project / client / operator / sensor_location) populate by position regardless of label; the original labels BW wrote are preserved in `report.user_note_labels` for downstream UIs (terra-view) to display verbatim.
- **`bw_report` sidecar block.** New top-level block in `.sfm.json` carrying the parsed BW report (trigger config, peaks with per-channel stats, mic block, sensor_check, monitor_log, PC SW version, operator-label labels).
- **`apply_report_to_event(event, report)` helper.** Overlays the report's device-authoritative fields onto an in-memory `Event` so `SeismoDb.insert_events()` writes correct DB columns instead of the broken-codec values from `_peaks_from_samples()`.
### Fixed — three compounding bugs that left forwarded events with garbage data
- **Import endpoint inserted under `serial="UNKNOWN"`.** `_serial_from_event(ev)` was a stub that always returned `None`; the BW-filename-decoded serial that `WaveformStore` had already resolved was never surfaced to `db.insert_events`. Now uses `rec["serial"]` as the authoritative source. `scripts/repair_unknown_serials.py` repairs existing DB rows.
- **`/db/units` ignored events from non-ACH ingest paths.** `query_units()` only aggregated from `ach_sessions` — events that arrived via `save_imported_bw()` were never visible in the fleet overview even though they populated `events` correctly. Now unions both tables.
- **Re-imports left stale DB rows.** The `IntegrityError` handler in `insert_events()` only refreshed filename / sidecar columns when a duplicate `(serial, timestamp)` arrived. Peak values, project info, sample_rate, record_type stayed locked at whatever the first (often broken-codec) insert wrote. Now the upsert path refreshes every device-authoritative column from the new data while preserving `false_trigger` and immutable fields (`id`, `created_at`).
- **Server-side TXT pairing only knew the legacy convention.** The endpoint stripped `.TXT` and looked up `<binary>` — which works for manual exports (`<binary>.TXT`) but not BW ACH (`<stem>_<ext>_ASCII.TXT`). Reports were arriving in the multipart but silently dropped. Now recognises both conventions and registers each report under all matching binary names.
### Migration
For existing deployments where events were forwarded by an older watcher (broken pairing) or imported during the UNKNOWN-bucketing window:
1. `python -m scripts.repair_unknown_serials --db <path> --apply` to re-attribute `serial="UNKNOWN"` rows.
2. Delete the watcher's `sfm_forwarded.json` state file and let it re-forward. The server's upsert path will refresh the existing DB rows with the report's authoritative values.
3. Operator review state (`false_trigger`, sidecar `review` block) is preserved across the re-import.
## v0.15.0 — 2026-05-07
### Added
+2
View File
@@ -1353,6 +1353,8 @@ body) because writing a dial string may require DLE escaping for embedded contro
## What's next
**See [README.md → Roadmap (Future)](README.md#roadmap-future) for the canonical deferred-work list.** This section is kept as a status log of in-progress / recently-shipped technical details (encoding schemes, byte layouts, etc.) that are too low-level for the README's roadmap.
- **Database** — SQLite store for events + monitor log entries; dedup by key; queryable
- **Histograms** — decode histogram-mode A5 data (noise floor tracking)
- **Blastware-compatible file output** — `write_blastware_file()` and `write_mlg()` implemented. `blastware_filename()` generates correct Blastware filenames (AB0 for direct, AB0W/AB0H for ACH). **Confirmed BYTE-PERFECT against BW reference (v0.14.3, 2026-05-05):** when fed the BW 5-1-26 3-sec capture's A5 frames, the SFM-built file matches BW's saved `M529LKIQ.G10` byte-for-byte (8708 bytes, 0 differences). Live SFM downloads of event 0 (3-sec) and event 1 (3-sec continuation) both open cleanly in Blastware with full Event Reports, frequency analysis, and waveform plots. Body assembly is just contiguous concatenation of frame contributions in stream order (probe → meta@0x1002 → meta@0x1004 → samples → TERM); no stripping, no overlay, no special handling. Histogram+Continuous mode deferred (5A stream for those events embeds histogram interval records that may need different handling — untested under v0.14.x). Extension mapping: extensions encode timestamp (AB0T for ACH, AB0 for direct), NOT recording mode. Filename format: `<prefix_letter><serial3><4-char-base36-stem><ext>`
+44 -13
View File
@@ -1,4 +1,4 @@
# seismo-relay `v0.15.0`
# seismo-relay `v0.16.0`
A ground-up replacement for **Blastware** — Instantel's aging Windows-only
software for managing MiniMate Plus seismographs.
@@ -14,11 +14,12 @@ over direct RS-232 or cellular modem (Sierra Wireless RV50 / RV55).
> byte-perfect against Blastware captures across 2-sec, 3-sec, and 10-sec
> events.** Generated `.G10` / `.AB0` files open cleanly in Blastware with
> full Event Reports, frequency analysis, and waveform plots.
> **v0.15.0 (2026-05-07)** adds layered per-event storage (BW binary +
> raw 5A pickle + HDF5 + `.sfm.json` sidecar), a plot-ready
> `sfm.plot.v1` JSON shape with server-side ADC-to-physical-units
> conversion, and a BW-file importer for ingesting externally-produced
> events. See [CHANGELOG.md](CHANGELOG.md) for full version history.
> **v0.16.0 (2026-05-11)** adds BW ASCII report ingestion to
> `/db/import/blastware_file` — paired with **series3-watcher v1.5.0**,
> every Blastware ACH event lands in SeismoDb with device-authoritative
> peaks, project metadata, sensor self-check, and ZC/Time-of-Peak data,
> without depending on the still-undecoded waveform body codec.
> See [CHANGELOG.md](CHANGELOG.md) for full version history.
---
@@ -356,10 +357,40 @@ Use **com0com** or **VSPD** to create the virtual COM pair on Windows.
## Roadmap (Future)
- [ ] Verify 30-sec event download — body may exceed `0xFFFF` and force the device into a different `end_key` encoding (none of 2/3/10-sec test cases hit this boundary)
- [ ] Terra-view integration — seismo-relay router, unit detail page, VISON-style event listing
- [ ] Vibration summary reports — highest legit PPV per project → Word doc (false trigger filtering first)
- [ ] Compliance config encoder — build raw write payloads from a `ComplianceConfig` object
- [ ] Modem manager — push RV50/RV55 configs via Sierra Wireless API
- [ ] Histogram mode recording support (5A stream analysis for mode 0x03)
- [ ] Call Home dial_string write support (requires DLE escaping for embedded control characters)
### High-impact (unblocks product features)
- [ ] **Waveform body codec reverse-engineering.** The 5A bulk-stream body is some kind of compressed/encoded format (not raw int16 LE as previously assumed — see §7.6.1 retraction in `docs/instantel_protocol_reference.md`). Structural framing is ~50% decoded on branch `claude/codec-re-cBGNe` (tagged-block walker, segment counters); per-byte sample mapping is still open. Until this lands, the in-app waveform viewer renders garbage and BW-import peak values fall back to `_peaks_from_samples()` saturation noise. Workaround: pair every BW-imported event with its `_ASCII.TXT` so the device-authoritative peaks land in the DB regardless of codec.
- [ ] **In-app waveform viewer accuracy.** Depends on codec decode. Plot.v1 JSON pipeline + viewer skeleton already exist; will start showing real waveforms automatically once `_decode_a5_waveform` produces correct samples.
- [ ] **Terra-view integration** — seismo-relay router, unit detail page, VISON-style event listing.
- [ ] **Vibration summary reports** — highest legit PPV per project → Word doc (false-trigger filtering first).
### BW ASCII report parser enhancements (built in v0.16.0)
- [ ] **Histogram-specific structural fields.** Current parser handles the shared fields (PPV, ZC Freq, sensor self-check, project) but silently drops histogram-only fields: `Histogram Start/Stop Time`, `Histogram Start/Stop Date`, `Number of Intervals`, `Interval Size`, per-channel `Peak Time` + `Peak Date` (absolute timestamps rather than the waveform's `Time of Peak` relative seconds).
- [ ] **Histogram interval bin-table parsing.** Trailing 792-row table (per-interval Peak/Freq per channel + MicL) in histogram TXTs is unparsed. Probably too big for the sidecar JSON; may want a separate `.histogram.h5` companion file.
- [ ] **`>100 Hz` value parsing.** Histogram TXTs use `>100 Hz` for out-of-range ZC freq; current `_parse_number()` returns `None` for these (loses information).
### Ingestion gaps
- [ ] **MLG forwarding.** `series3-watcher` forwards event binaries + their `_ASCII.TXT` reports, but skips `.MLG` per-unit monitor log files entirely. Adding an `POST /db/import/mlg_file` endpoint + watcher scan path would populate `monitor_log` for non-ACH-routed units (coverage queries, "was this unit monitoring on date X" lookups).
- [ ] **0C-record raw bytes persistence in the sidecar.** Currently on branch `claude/codec-re-cBGNe` as commit `a187124`; cherry-pick if useful as a standalone fix. Preserves the 210-byte 0C record under `extensions.raw_records.waveform_record_b64` so future field-offset analysis (Peak Acceleration / Time of Peak / etc. — the fields BW computes client-side from samples) can run offline.
### Operational
- [ ] **`series3-watcher` file archive manager** — 90-day-old events moved to `<watch_folder>_archive/<year>/<month>/` subfolders. Plan drafted in `claude/codec-re-cBGNe`'s plan-mode session; awaiting a 5-minute test on whether Blastware UI walks subfolders before any code lands (determines layout: in-place subfolders vs sibling archive).
- [ ] **Compliance config encoder** — build raw write payloads from a `ComplianceConfig` object.
- [ ] **Modem manager** — push RV50/RV55 configs via Sierra Wireless API.
- [ ] **Call Home dial_string write support** (requires DLE escaping for embedded control characters).
- [ ] **Histogram mode recording support** (5A stream analysis for mode 0x03 — separate from histogram ASCII parsing above).
### Test coverage
- [ ] Verify 30-sec event download — body may exceed `0xFFFF` and force the device into a different `end_key` encoding (none of the 2/3/10-sec test cases hit this boundary).
- [ ] Histogram mode (0x03) write via SFM — confirmed working for Single Shot / Continuous / Histogram+Continuous; Histogram (0x03) needs a live test from a non-Histogram starting state.
### Lower-priority cleanups
- [ ] Compliance write anchor-9 cleanup — when changing recording_mode via SFM, a spurious `0x10` may persist after Histogram→other mode transitions. Doesn't affect device operation but differs from BW's byte-perfect output.
- [ ] Locate "Sensor Check" byte in compliance config (need capture with Disabled vs Before-monitoring).
- [ ] Call Home — map time slots 3/4 offsets; confirm `modem_power_relay_enabled`.
- [ ] RV55 DCD/DTR — newer RV55 firmware doesn't assert DCD by default; units don't resume monitoring after call-home disconnect (`--restart-monitoring` flag deferred).
+522
View File
@@ -0,0 +1,522 @@
"""
minimateplus/bw_ascii_report.py parser for Blastware's per-event ASCII
report (the .TXT file BW writes alongside each saved event binary).
The ASCII export is the authoritative source for every "rich" per-event
field that BW computes from the waveform but never persists in the BW
binary itself:
- Per-channel PPV (Tran / Vert / Long / MicL)
- Peak Vector Sum + Peak Vector Sum Time
- Per-channel ZC Freq, Time of Peak, Peak Acceleration, Peak Displacement
- MicL PSPL, MicL Time of Peak, MicL ZC Freq
- Per-channel Sensor Self-Check (Test Freq / Test Ratio / Test Results)
- MicL Test Amplitude (mV)
- Battery, calibration date, monitor-log timestamps
Persisting these values into the SFM database lets the monthly-summary
review workflow ("show me events at Location X with PVS > 0.5") work
without depending on the (still-undecoded) waveform body codec.
Format (verified against decode-re/5-8-26 4-event bundle):
- One field per line, wrapped in double quotes: `"Field Name : Value"`
- Field/value separator: literal ` : ` (space-colon-space).
- Some field names contain an internal `:` already (e.g. `"Project:"`),
so we split on the FIRST ` : ` only.
- Some fields have unit suffixes: `"0.500 in/s"` / `"7.5 Hz"` / `"533 mv"`.
- A `"Monitor Log(s)"` marker line is followed by tab-separated rows
of `start_time<TAB>stop_time<TAB>description`.
- Final `"PC SW Version : ..."` line ends the metadata block.
- A blank line separates metadata from the sample table.
- Sample table starts with ` Tran <TAB> Vert <TAB>...`, then
one row per sample (tab-separated, right-padded numeric values).
- Geo channel values are in in/s; MicL in dB(L) (or 0.000 below threshold).
Because some metadata fields have whitespace quirks ("MicL Time of
Peak" has two spaces; the leading "Project:" value has its own colon),
we normalise whitespace in the key before lookup.
"""
from __future__ import annotations
import datetime
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
# ─────────────────────────────────────────────────────────────────────────────
# Output dataclasses
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class ChannelStats:
"""Per-channel derived stats, populated from an event report."""
ppv_ips: Optional[float] = None # in/s (geo channels only)
zc_freq_hz: Optional[float] = None # Hz
time_of_peak_s: Optional[float] = None # seconds (relative to trigger; can be negative)
peak_accel_g: Optional[float] = None # g (geo channels only)
peak_disp_in: Optional[float] = None # in (geo channels only)
@dataclass
class MicStats:
"""MicL-specific stats."""
weighting: Optional[str] = None # e.g. "Linear Weighting"
pspl_dbl: Optional[float] = None # dB(L)
zc_freq_hz: Optional[float] = None
time_of_peak_s: Optional[float] = None
@dataclass
class SensorCheck:
"""Per-channel sensor self-check result.
Geo channels report a frequency + ratio; MicL reports a frequency +
amplitude (mV). All channels also have a Pass/Fail string.
"""
test_freq_hz: Optional[float] = None
test_ratio: Optional[float] = None # geo channels only
test_amplitude_mv: Optional[float] = None # MicL only
test_results: Optional[str] = None # "Passed" / "Failed"
@dataclass
class MonitorLogEntry:
"""One row of the trailing Monitor Log(s) block."""
start_time: Optional[datetime.datetime] = None
stop_time: Optional[datetime.datetime] = None
description: Optional[str] = None
@dataclass
class BwAsciiReport:
"""Structured representation of one BW per-event ASCII export."""
# ── Identity ─────────────────────────────────────────────────────────────
event_type: Optional[str] = None # e.g. "Full Waveform"
serial: Optional[str] = None # e.g. "BE11529"
version: Optional[str] = None # firmware version line
file_name: Optional[str] = None # e.g. "M529LK44.AB0"
event_datetime: Optional[datetime.datetime] = None # parsed from Event Time + Event Date
# ── Trigger / recording config ──────────────────────────────────────────
trigger_channel: Optional[str] = None # e.g. "Vert" or "From Unit"
geo_trigger_level_ips: Optional[float] = None
pretrig_s: Optional[float] = None # negative seconds
record_time_s: Optional[float] = None
record_stop_mode: Optional[str] = None
sample_rate_sps: Optional[int] = None
battery_volts: Optional[float] = None
calibration_date: Optional[datetime.date] = None
calibration_by: Optional[str] = None # e.g. "Instantel"
units: Optional[str] = None # e.g. "in/s and dB(L)"
# ── Operator-supplied metadata ──────────────────────────────────────────
# Parsed by POSITION from the 4-line "User Notes" block BW writes
# between the `Units :` and `Geo Range :` lines. Position-based so
# the values populate correctly even when an operator renames the
# labels in Blastware's Compliance Setup → Notes tab (the 4 labels
# are user-editable, e.g. "Seis Loc:" → "Building:" → "Site Address:").
# The original labels BW wrote are preserved in `user_note_labels`
# so terra-view can render them as the operator named them.
project: Optional[str] = None # position 1 (BW default label "Project:")
client: Optional[str] = None # position 2 (BW default label "Client:")
operator: Optional[str] = None # position 3 (BW default label "User Name:")
sensor_location: Optional[str] = None # position 4 (BW default label "Seis Loc:")
# Maps canonical slot name → the literal label BW wrote in the ASCII
# export. Empty if the User Notes block wasn't present. Example
# when the operator renamed slot 4 to "Building:":
# {"project": "Project:", "client": "Client:",
# "operator": "User Name:", "sensor_location": "Building:"}
user_note_labels: Dict[str, str] = field(default_factory=dict)
# ── Geo channel scaling ─────────────────────────────────────────────────
geo_range_ips: Optional[float] = None # 10.000 / 1.250
# ── Per-channel derived stats (geo + mic) ───────────────────────────────
channels: Dict[str, ChannelStats] = field(default_factory=dict)
mic: MicStats = field(default_factory=MicStats)
# ── Vector sum ──────────────────────────────────────────────────────────
peak_vector_sum_ips: Optional[float] = None
peak_vector_sum_time_s: Optional[float] = None
# ── Sensor self-check (per channel) ─────────────────────────────────────
sensor_check: Dict[str, SensorCheck] = field(default_factory=dict)
# ── Monitor log + tooling version ───────────────────────────────────────
monitor_log: List[MonitorLogEntry] = field(default_factory=list)
pc_sw_version: Optional[str] = None
# ── Sample table (optional; only parsed if requested) ───────────────────
# Each entry: (Tran, Vert, Long, MicL) in the report's units (geo
# channels in in/s, MicL in dB(L)). None when parse_samples=False.
samples: Optional[List[Tuple[float, float, float, float]]] = None
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
_KEY_NORMALISE_RE = re.compile(r"\s+")
_NUMERIC_RE = re.compile(r"^-?\d+(?:\.\d+)?")
def _normalise_key(k: str) -> str:
"""Collapse whitespace runs (incl. tabs) and strip — handles BW's
"MicL Time of Peak" double-space and leading-colon quirks."""
return _KEY_NORMALISE_RE.sub(" ", k).strip()
def _strip_quotes(line: str) -> str:
line = line.rstrip("\r\n")
if len(line) >= 2 and line.startswith('"') and line.endswith('"'):
return line[1:-1]
return line
def _parse_number(value: str) -> Optional[float]:
"""Pull the leading numeric portion out of a value like "0.500 in/s"."""
m = _NUMERIC_RE.match(value.strip())
if not m:
return None
try:
return float(m.group(0))
except ValueError:
return None
def _parse_int(value: str) -> Optional[int]:
n = _parse_number(value)
return None if n is None else int(round(n))
# Months exactly as BW writes them.
_MONTHS = {
"January": 1, "February": 2, "March": 3, "April": 4,
"May": 5, "June": 6, "July": 7, "August": 8,
"September": 9, "October": 10, "November": 11, "December": 12,
# Short forms used in monitor-log rows ("Apr 23 /26").
"Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "Jun": 6, "Jul": 7,
"Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12,
}
def _parse_event_date(s: str) -> Optional[datetime.date]:
"""Parse "April 23, 2026" or "May 8, 2026" → date."""
s = s.strip()
parts = s.replace(",", " ").split()
if len(parts) < 3:
return None
month_name, day_str, year_str = parts[0], parts[1], parts[2]
month = _MONTHS.get(month_name)
if month is None:
return None
try:
return datetime.date(int(year_str), month, int(day_str))
except ValueError:
return None
def _parse_event_time(s: str) -> Optional[datetime.time]:
"""Parse "15:56:35" → time."""
s = s.strip()
try:
h, m, sec = s.split(":")
return datetime.time(int(h), int(m), int(sec))
except (ValueError, IndexError):
return None
def _parse_calibration(value: str) -> Tuple[Optional[datetime.date], Optional[str]]:
"""Parse "April 29, 2025 by Instantel" → (date, "Instantel")."""
parts = value.split(" by ", 1)
date = _parse_event_date(parts[0])
by = parts[1].strip() if len(parts) > 1 else None
return date, by
def _parse_monitor_row(line: str) -> Optional[MonitorLogEntry]:
"""Parse a tab-separated monitor log row.
Format: `<start>\t<stop>\t<desc>` where each timestamp is BW's
short form "Mon DD /YY HH:MM:SS" (e.g. "Apr 23 /26 15:46:16").
Year is encoded as a 2-digit suffix; we expand "/26" 2026.
"""
parts = line.split("\t")
if len(parts) < 2:
return None
start = _parse_monitor_ts(parts[0])
stop = _parse_monitor_ts(parts[1])
desc = parts[2].strip() if len(parts) > 2 else None
if start is None and stop is None and not desc:
return None
return MonitorLogEntry(start_time=start, stop_time=stop, description=desc)
def _parse_monitor_ts(s: str) -> Optional[datetime.datetime]:
"""Parse "Apr 23 /26 15:46:16" → datetime."""
s = s.strip()
parts = s.split()
if len(parts) < 4:
return None
month = _MONTHS.get(parts[0])
if month is None:
return None
try:
day = int(parts[1])
# parts[2] looks like "/26" → century-flip to 2026
yy = int(parts[2].lstrip("/"))
year = 2000 + yy if yy < 80 else 1900 + yy
h, m, sec = (int(x) for x in parts[3].split(":"))
return datetime.datetime(year, month, day, h, m, sec)
except (ValueError, IndexError):
return None
# ── User-notes positional slot map ──────────────────────────────────────────
#
# Blastware's Compliance Setup → Notes tab shows four operator-supplied
# fields whose LABELS the operator can rename (see screenshot in
# project archive). Defaults are "Project:" / "Client:" /
# "User Name:" / "Seis Loc:", but an operator using a different
# convention can rename them to anything ("Building:", "Site:",
# "Address:", etc.). The ASCII export reflects whatever the operator
# typed, so label-based matching is fragile.
#
# What IS reliable: BW always writes the 4 user-notes lines in the
# same order, contiguously between the `Units :` line and the
# `Geo Range :` line. We parse them by POSITION and preserve the
# operator's labels in `report.user_note_labels` so terra-view can
# render them as the operator intended.
_USER_NOTE_SLOTS = ("project", "client", "operator", "sensor_location")
# ─────────────────────────────────────────────────────────────────────────────
# Top-level parser
# ─────────────────────────────────────────────────────────────────────────────
def parse_report(text: Union[str, bytes], *, parse_samples: bool = False) -> BwAsciiReport:
"""Parse a BW per-event ASCII export into a structured BwAsciiReport.
Set ``parse_samples=True`` to also populate ``report.samples`` with
the trailing sample table. Default False because the table is
huge and most callers only want metadata for indexing.
"""
if isinstance(text, bytes):
text = text.decode("ascii", errors="replace")
report = BwAsciiReport()
# Pre-create channel stat slots so callers can rely on them existing.
for ch in ("Tran", "Vert", "Long", "MicL"):
report.channels.setdefault(ch, ChannelStats())
report.sensor_check.setdefault(ch, SensorCheck())
lines = text.splitlines()
i = 0
n = len(lines)
in_monitor_log_section = False
event_time_str: Optional[str] = None
event_date: Optional[datetime.date] = None
# User-notes block detection. We enter the block after parsing
# the "Units :" line and exit on the "Geo Range :" line. Inside,
# the first 4 unmatched `<label> : <value>` lines are assigned to
# the 4 canonical operator-supplied slots by POSITION (project,
# client, operator, sensor_location) regardless of what the
# operator named the labels in BW's Compliance Setup → Notes tab.
in_user_notes_block = False
user_note_position = 0
while i < n:
raw_line = lines[i]
i += 1
# Blank line marks the start of the sample table.
if raw_line.strip() == "":
break
line = _strip_quotes(raw_line)
# Monitor log section: "Monitor Log(s)" header followed by N rows
# (still inside double-quoted lines), terminated by a non-row line
# like "PC SW Version : ..." or a blank line.
if not in_monitor_log_section and line.strip() == "Monitor Log(s)":
in_monitor_log_section = True
continue
if in_monitor_log_section:
# Heuristic: monitor rows contain a tab; the next "Field : Value"
# line ends the section.
if "\t" in line:
entry = _parse_monitor_row(line)
if entry:
report.monitor_log.append(entry)
continue
# Falls through to the field parser below; clear the flag.
in_monitor_log_section = False
# "Field : Value" — split on FIRST occurrence of " : "
idx = line.find(" : ")
if idx < 0:
continue
key = _normalise_key(line[:idx])
value = line[idx + 3 :].strip()
# ── Identity / config ────────────────────────────────────────────────
if key == "Event Type": report.event_type = value
elif key == "Serial Number": report.serial = value
elif key == "Version": report.version = value
elif key == "File Name": report.file_name = value
elif key == "Event Time": event_time_str = value
elif key == "Event Date": event_date = _parse_event_date(value)
elif key == "Trigger": report.trigger_channel = value
elif key == "Geo Trigger Level": report.geo_trigger_level_ips = _parse_number(value)
elif key == "Pre-trigger Length": report.pretrig_s = _parse_number(value)
elif key == "Record Time": report.record_time_s = _parse_number(value)
elif key == "Record Stop Mode": report.record_stop_mode = value
elif key == "Sample Rate": report.sample_rate_sps = _parse_int(value)
elif key == "Battery Level": report.battery_volts = _parse_number(value)
elif key == "Calibration":
report.calibration_date, report.calibration_by = _parse_calibration(value)
elif key == "Units":
report.units = value
# Entering the user-notes block. Next ~4 lines until
# "Geo Range :" are the operator-supplied notes.
in_user_notes_block = True
user_note_position = 0
elif key == "Geo Range":
# Exiting the user-notes block.
in_user_notes_block = False
report.geo_range_ips = _parse_number(value)
# User-notes block: assign by position (operator may have
# renamed the labels, so we don't trust them). Preserve the
# original labels in `user_note_labels` for downstream UIs
# (terra-view) that want to display them as the operator
# named them.
elif in_user_notes_block and user_note_position < len(_USER_NOTE_SLOTS):
slot = _USER_NOTE_SLOTS[user_note_position]
setattr(report, slot, value)
report.user_note_labels[slot] = key
user_note_position += 1
# ── Per-channel stats ────────────────────────────────────────────────
# All match the pattern "{Channel} <stat-name>"
elif key in (
"Tran PPV", "Vert PPV", "Long PPV",
"Tran ZC Freq", "Vert ZC Freq", "Long ZC Freq",
"Tran Time of Peak", "Vert Time of Peak", "Long Time of Peak",
"Tran Peak Acceleration", "Vert Peak Acceleration", "Long Peak Acceleration",
"Tran Peak Displacement", "Vert Peak Displacement", "Long Peak Displacement",
):
ch_name, stat = key.split(" ", 1)
cs = report.channels.setdefault(ch_name, ChannelStats())
num = _parse_number(value)
if stat == "PPV": cs.ppv_ips = num
elif stat == "ZC Freq": cs.zc_freq_hz = num
elif stat == "Time of Peak": cs.time_of_peak_s = num
elif stat == "Peak Acceleration": cs.peak_accel_g = num
elif stat == "Peak Displacement": cs.peak_disp_in = num
# ── Vector Sum ───────────────────────────────────────────────────────
elif key == "Peak Vector Sum":
report.peak_vector_sum_ips = _parse_number(value)
elif key == "Peak Vector Sum Time":
report.peak_vector_sum_time_s = _parse_number(value)
# ── Microphone block ────────────────────────────────────────────────
elif key == "Microphone":
report.mic.weighting = value
elif key == "MicL PSPL":
report.mic.pspl_dbl = _parse_number(value)
# Mirror onto the "MicL" entry in channels so callers querying
# `channels["MicL"].ppv_ips` see something — but it's dB(L), not
# in/s, so we store as-is in the MicStats and mark the channel.
elif key == "MicL Time of Peak":
report.mic.time_of_peak_s = _parse_number(value)
cs = report.channels.setdefault("MicL", ChannelStats())
cs.time_of_peak_s = report.mic.time_of_peak_s
elif key == "MicL ZC Freq":
report.mic.zc_freq_hz = _parse_number(value)
cs = report.channels.setdefault("MicL", ChannelStats())
cs.zc_freq_hz = report.mic.zc_freq_hz
# ── Sensor self-check ────────────────────────────────────────────────
elif key in (
"Tran Test Freq", "Vert Test Freq", "Long Test Freq", "MicL Test Freq",
"Tran Test Ratio", "Vert Test Ratio", "Long Test Ratio",
"MicL Test Amplitude",
"Tran Test Results", "Vert Test Results", "Long Test Results", "MicL Test Results",
):
ch_name, stat = key.split(" ", 1)
sc = report.sensor_check.setdefault(ch_name, SensorCheck())
if stat == "Test Freq": sc.test_freq_hz = _parse_number(value)
elif stat == "Test Ratio": sc.test_ratio = _parse_number(value)
elif stat == "Test Amplitude": sc.test_amplitude_mv = _parse_number(value)
elif stat == "Test Results": sc.test_results = value
# ── Trailer ─────────────────────────────────────────────────────────
elif key == "PC SW Version":
report.pc_sw_version = value
# Unknown keys are silently dropped — forward-compat for future
# BW versions that may add fields.
# Combine event date + time into a datetime
if event_date is not None and event_time_str is not None:
t = _parse_event_time(event_time_str)
if t is not None:
report.event_datetime = datetime.datetime.combine(event_date, t)
if parse_samples:
report.samples = _parse_sample_table(lines, i)
return report
def _parse_sample_table(
lines: List[str], start: int,
) -> List[Tuple[float, float, float, float]]:
"""Parse the trailing sample table.
The table starts with a header row (" Tran <TAB>...") and continues
until EOF. Each data row is a tab-separated quartet of numeric values.
"""
samples: List[Tuple[float, float, float, float]] = []
seen_header = False
for line in lines[start:]:
line = line.rstrip("\r\n")
if not line.strip():
continue
cols = [c.strip() for c in line.split("\t") if c.strip()]
if not seen_header:
# Header row contains channel names; numeric rows don't.
if any(c in ("Tran", "Vert", "Long", "MicL") for c in cols):
seen_header = True
continue
if len(cols) < 4:
continue
try:
samples.append((
float(cols[0]), float(cols[1]),
float(cols[2]), float(cols[3]),
))
except ValueError:
continue
return samples
def parse_report_file(
path: Union[str, Path], *, parse_samples: bool = False,
) -> BwAsciiReport:
"""Convenience: read a .TXT file from disk and parse it."""
return parse_report(Path(path).read_bytes(), parse_samples=parse_samples)
+241 -9
View File
@@ -26,6 +26,12 @@ from typing import Optional, Union
from .models import Event, PeakValues, ProjectInfo, Timestamp
from . import blastware_file as _bw # avoid circular reference at module load
from .bw_ascii_report import BwAsciiReport
# Reference pressure for dB(L) → psi conversion (20 µPa expressed in psi).
# Same constant as sfm/sfm_webapp.html so server-side and browser-side
# conversions agree.
_DBL_REF_PSI = 2.9e-9
log = logging.getLogger(__name__)
@@ -41,7 +47,7 @@ SIDECAR_KIND = "sfm.event"
# bumped without a `pip install` re-run — leading to confusing stale
# version stamps in sidecars. Bump this constant and CHANGELOG.md
# together at release time.
TOOL_VERSION = "0.15.0"
TOOL_VERSION = "0.16.0"
try:
# Best-effort: prefer the installed metadata when it's NEWER than the
@@ -94,6 +100,158 @@ def _peak_values_to_dict(pv: Optional[PeakValues]) -> dict:
}
def _bw_report_to_dict(report: BwAsciiReport) -> dict:
"""Project a parsed BW ASCII report into the sidecar's `bw_report` block.
All fields are rendered as plain JSON-compatible types (no datetime
objects). Channels are uniformly lowercased for stable JSON keys.
"""
def _ch(ch_name: str) -> dict:
cs = report.channels.get(ch_name)
if cs is None:
return {}
out = {
"ppv_ips": cs.ppv_ips,
"zc_freq_hz": cs.zc_freq_hz,
"time_of_peak_s": cs.time_of_peak_s,
"peak_accel_g": cs.peak_accel_g,
"peak_disp_in": cs.peak_disp_in,
}
# Drop all-None entries — keeps the JSON tidy for partial reports.
return {k: v for k, v in out.items() if v is not None}
def _sc(ch_name: str) -> dict:
sc = report.sensor_check.get(ch_name)
if sc is None:
return {}
out = {
"freq_hz": sc.test_freq_hz,
"ratio": sc.test_ratio,
"amplitude_mv": sc.test_amplitude_mv,
"result": sc.test_results,
}
return {k: v for k, v in out.items() if v is not None}
monitor_log = []
for entry in report.monitor_log:
e = {
"start": entry.start_time.isoformat() if entry.start_time else None,
"stop": entry.stop_time.isoformat() if entry.stop_time else None,
"description": entry.description,
}
monitor_log.append({k: v for k, v in e.items() if v is not None})
return {
"available": True,
"event_type": report.event_type,
"version": report.version,
"trigger": {
"channel": report.trigger_channel,
"geo_level_ips": report.geo_trigger_level_ips,
},
"recording": {
"sample_rate_sps": report.sample_rate_sps,
"record_time_s": report.record_time_s,
"pretrig_s": report.pretrig_s,
"stop_mode": report.record_stop_mode,
"geo_range_ips": report.geo_range_ips,
"units": report.units,
},
"device": {
"battery_volts": report.battery_volts,
"calibration_date": report.calibration_date.isoformat() if report.calibration_date else None,
"calibration_by": report.calibration_by,
},
"peaks": {
"tran": _ch("Tran"),
"vert": _ch("Vert"),
"long": _ch("Long"),
"vector_sum": {
"ips": report.peak_vector_sum_ips,
"time_s": report.peak_vector_sum_time_s,
},
},
"mic": {
"weighting": report.mic.weighting,
"pspl_dbl": report.mic.pspl_dbl,
"zc_freq_hz": report.mic.zc_freq_hz,
"time_of_peak_s": report.mic.time_of_peak_s,
},
"sensor_check": {
"tran": _sc("Tran"),
"vert": _sc("Vert"),
"long": _sc("Long"),
"mic": _sc("MicL"),
},
"monitor_log": monitor_log,
"pc_sw_version": report.pc_sw_version,
}
def _dbl_to_psi(pspl_dbl: float) -> float:
"""Convert dB(L) sound pressure level back to psi. Uses the same
20 µPa reference (= 2.9e-9 psi) as the webapp so server-side and
browser-side conversions agree."""
return _DBL_REF_PSI * (10.0 ** (pspl_dbl / 20.0))
def apply_report_to_event(event: Event, report: BwAsciiReport) -> None:
"""Overlay device-authoritative fields from a parsed BW ASCII report
onto an in-memory Event, IN-PLACE.
Why this exists
`read_blastware_file()` parses the BW binary and fills `Event.peak_values`
via `_peaks_from_samples()` which runs the (still-undecoded) BW body
codec assuming raw int16 LE and produces ±32K-shaped noise on every
channel. Result: peak values land in the SeismoDb event row as
~10 in/s on every event regardless of the actual signal.
When a paired BW ASCII report is available, the report carries the
device's own authoritative peak / project / sample-rate / record-time
values. This helper folds those onto the Event before it flows to
`SeismoDb.insert_events()`, so the DB columns reflect the report
rather than the broken-codec output.
Fields overlaid (only when the report supplies a non-None value):
- peak_values.tran / .vert / .long (from report.channels)
- peak_values.peak_vector_sum (from report.peak_vector_sum_ips)
- peak_values.micl (psi) (from report.mic.pspl_dbl psi)
- project_info.project / .client / .operator / .sensor_location
- sample_rate (from report.sample_rate_sps)
- rectime_seconds (from report.record_time_s)
Fields NOT touched (operator-edit / parser-output preserved):
- timestamp, raw_samples, record_type, total_samples,
pretrig_samples, _waveform_key, _a5_frames, _raw_record
- false_trigger and review state (those live on the sidecar, not on Event)
"""
if event.peak_values is None:
event.peak_values = PeakValues()
pv = event.peak_values
ch = report.channels
if (t := ch.get("Tran")) and t.ppv_ips is not None: pv.tran = t.ppv_ips
if (v := ch.get("Vert")) and v.ppv_ips is not None: pv.vert = v.ppv_ips
if (l := ch.get("Long")) and l.ppv_ips is not None: pv.long = l.ppv_ips
if report.peak_vector_sum_ips is not None:
pv.peak_vector_sum = report.peak_vector_sum_ips
if report.mic.pspl_dbl is not None and report.mic.pspl_dbl > 0:
pv.micl = _dbl_to_psi(report.mic.pspl_dbl)
if event.project_info is None:
event.project_info = ProjectInfo()
pi = event.project_info
if report.project: pi.project = report.project
if report.client: pi.client = report.client
if report.operator: pi.operator = report.operator
if report.sensor_location: pi.sensor_location = report.sensor_location
if report.sample_rate_sps:
event.sample_rate = report.sample_rate_sps
if report.record_time_s is not None:
event.rectime_seconds = report.record_time_s
def _project_info_to_dict(pi: Optional[ProjectInfo]) -> dict:
if pi is None:
return {
@@ -123,35 +281,104 @@ def event_to_sidecar_dict(
captured_at: Optional[datetime.datetime] = None,
review: Optional[dict] = None,
extensions: Optional[dict] = None,
bw_report: Optional[BwAsciiReport] = None,
) -> dict:
"""
Build a v1 sidecar dict from an Event + the surrounding metadata.
Pure helper no file I/O. Callers stitch the result into a sidecar
via `write_sidecar()` (or POST it back via the PATCH endpoint).
When *bw_report* is supplied (e.g. by the ACH-forwarded import path
where Blastware writes a per-event ASCII report alongside the binary),
its decoded fields are folded into the sidecar:
- A new top-level ``bw_report`` block carries the rich derived
per-channel stats (Peak Acceleration, Peak Displacement, ZC Freq,
Time of Peak), the Peak Vector Sum + time, the per-channel sensor
self-check results, and monitor-log timestamps.
- ``peak_values`` is overlaid from the report (the report's PPV/PVS
values are computed by the device firmware and are authoritative;
anything ``read_blastware_file()`` derived from samples is
approximate at best until the body codec is decoded).
- ``project_info`` is overlaid from the report when the report
supplies a non-empty value (the report mirrors the device's
compliance config, which is what BW shows in its event report).
- ``event.timestamp`` is overlaid from the report's Event Date +
Event Time (BW's report timestamps are second-resolution and
match the binary's footer; we prefer the report value because
the BW-binary footer timestamp can drift on some firmware).
"""
if source_kind not in {"sfm-live", "sfm-ach", "bw-import"}:
raise ValueError(f"unknown source_kind: {source_kind!r}")
captured_at = captured_at or datetime.datetime.utcnow()
return {
"schema_version": SCHEMA_VERSION,
"kind": SIDECAR_KIND,
# ── Overlay event fields from the report when present ───────────────────
timestamp_iso = _ts_iso(event.timestamp)
if bw_report and bw_report.event_datetime:
timestamp_iso = bw_report.event_datetime.isoformat()
"event": {
# Build peak_values, optionally overlaid from the report. The report
# stores Mic peak as PSPL (dB(L)); we convert to psi to match the
# existing peak_values.mic_psi field.
peak_dict = _peak_values_to_dict(event.peak_values)
if bw_report:
ch = bw_report.channels
if (t := ch.get("Tran")) and t.ppv_ips is not None: peak_dict["transverse"] = t.ppv_ips
if (v := ch.get("Vert")) and v.ppv_ips is not None: peak_dict["vertical"] = v.ppv_ips
if (l := ch.get("Long")) and l.ppv_ips is not None: peak_dict["longitudinal"] = l.ppv_ips
if bw_report.peak_vector_sum_ips is not None:
peak_dict["vector_sum"] = bw_report.peak_vector_sum_ips
if bw_report.mic.pspl_dbl is not None and bw_report.mic.pspl_dbl > 0:
peak_dict["mic_psi"] = _dbl_to_psi(bw_report.mic.pspl_dbl)
# Project info: overlay from report (the report mirrors the
# session-start compliance config that BW renders in event reports).
proj_dict = _project_info_to_dict(event.project_info)
if bw_report:
if bw_report.project: proj_dict["project"] = bw_report.project
if bw_report.client: proj_dict["client"] = bw_report.client
if bw_report.operator: proj_dict["operator"] = bw_report.operator
if bw_report.sensor_location: proj_dict["sensor_location"] = bw_report.sensor_location
# Event-block fields: overlay from report where available.
event_block = {
"serial": serial,
"timestamp": _ts_iso(event.timestamp),
"timestamp": timestamp_iso,
"waveform_key": event._waveform_key.hex() if event._waveform_key else None,
"record_type": event.record_type,
"sample_rate": event.sample_rate,
"rectime_seconds": event.rectime_seconds,
"total_samples": event.total_samples,
"pretrig_samples": event.pretrig_samples,
},
}
if bw_report:
# Report values are authoritative — they're the user-configured
# values BW reads back, not STRT-derived guesses. In particular
# `event.rectime_seconds` from `read_blastware_file()` reads
# STRT[18] which is actually the `0x46` record-type marker (= 70)
# rather than the user's Record Time setting. Always overwrite.
if bw_report.sample_rate_sps:
event_block["sample_rate"] = bw_report.sample_rate_sps
if bw_report.record_time_s is not None:
event_block["rectime_seconds"] = bw_report.record_time_s
# Derive total_samples + pretrig_samples per channel from the
# report's sample_rate × times. These match the row count of
# the report's sample table (verified: event-c reports 1024 sps
# × (1.0 + 0.25) = 1280 rows).
if (sr := bw_report.sample_rate_sps) and bw_report.record_time_s is not None:
pretrig_s = abs(bw_report.pretrig_s) if bw_report.pretrig_s is not None else 0.0
event_block["total_samples"] = int(round(sr * (bw_report.record_time_s + pretrig_s)))
event_block["pretrig_samples"] = int(round(sr * pretrig_s))
"peak_values": _peak_values_to_dict(event.peak_values),
"project_info": _project_info_to_dict(event.project_info),
out = {
"schema_version": SCHEMA_VERSION,
"kind": SIDECAR_KIND,
"event": event_block,
"peak_values": peak_dict,
"project_info": proj_dict,
"blastware": {
"filename": blastware_filename,
@@ -177,6 +404,11 @@ def event_to_sidecar_dict(
"extensions": extensions or {},
}
if bw_report:
out["bw_report"] = _bw_report_to_dict(bw_report)
return out
# ── Sidecar IO ────────────────────────────────────────────────────────────────
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "seismo-relay"
version = "0.15.0"
version = "0.16.0"
description = "Python client and REST server for MiniMate Plus seismographs"
requires-python = ">=3.10"
dependencies = [
+151
View File
@@ -0,0 +1,151 @@
"""
scripts/repair_unknown_serials.py re-attribute events stuck under
`serial = 'UNKNOWN'` to their correct serial by decoding the BW filename.
Why this is needed
The /db/import/blastware_file endpoint had a bug (fixed in commit a032fa5+1
on the ach-report-ingestion branch) where every forwarded event was inserted
with serial='UNKNOWN' because the endpoint's `_serial_from_event(ev)` stub
returned None and never consulted the BW-filename serial that
`WaveformStore.save_imported_bw()` had already decoded.
Effect on a server that ran a buggy version: every forwarded event's
SeismoDb row has `serial='UNKNOWN'`, even though the on-disk waveform
store has correctly bucketed the files into `BE<NNNN>/` folders. So
the BW binaries / sidecars / HDF5s are fine, but `/db/units` and
`/db/events?serial=...` queries don't surface the events.
This script
Walks the events table looking for rows with `serial='UNKNOWN'` and
re-attributes each one to the serial decoded from its
`blastware_filename` column. If the row's serial would collide with
an existing row (already-correct duplicate from a later re-forward),
the UNKNOWN row is deleted. Otherwise the row's `serial` column is
updated in-place.
Idempotent: re-running after a successful repair finds zero matching
rows and exits cleanly.
Usage
# Dry-run (default): print what would change, don't touch the DB
python -m scripts.repair_unknown_serials --db bridges/captures/seismo_relay.db
# Apply the repair
python -m scripts.repair_unknown_serials --db bridges/captures/seismo_relay.db --apply
"""
from __future__ import annotations
import argparse
import sqlite3
import sys
from pathlib import Path
# Reach into sfm.waveform_store for the serial decoder. This script
# is run from the repo root via `python -m scripts.repair_unknown_serials`.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from sfm.waveform_store import _serial_from_bw_filename
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser(
description="Re-attribute events stuck under serial='UNKNOWN'.",
)
p.add_argument(
"--db", required=True, type=Path,
help="Path to seismo_relay.db (e.g. bridges/captures/seismo_relay.db)",
)
p.add_argument(
"--apply", action="store_true",
help="Apply the repair. Without this flag the script runs in "
"dry-run mode and only reports what would change.",
)
args = p.parse_args(argv)
if not args.db.exists():
print(f"DB not found: {args.db}", file=sys.stderr)
return 2
conn = sqlite3.connect(str(args.db))
conn.row_factory = sqlite3.Row
rows = list(conn.execute(
"SELECT id, serial, timestamp, blastware_filename "
" FROM events "
" WHERE serial = 'UNKNOWN' "
" ORDER BY timestamp",
))
print(f"Found {len(rows)} UNKNOWN-serial rows in events table.")
if not rows:
return 0
updated = 0
deleted = 0
unresolved = 0
by_serial: dict[str, int] = {}
for row in rows:
rid = row["id"]
ts = row["timestamp"]
bw_name = row["blastware_filename"]
new_serial = _serial_from_bw_filename(bw_name) if bw_name else None
if not new_serial:
print(f" ⚠ id={rid[:8]} ts={ts} filename={bw_name!r}"
f"cannot decode serial from filename; skipping")
unresolved += 1
continue
# Check for an existing row at the target (serial, timestamp).
existing = conn.execute(
"SELECT id FROM events WHERE serial = ? AND timestamp = ?",
(new_serial, ts),
).fetchone()
action: str
if existing is None:
# Safe to UPDATE in place.
if args.apply:
conn.execute(
"UPDATE events SET serial = ? WHERE id = ?",
(new_serial, rid),
)
action = "UPDATE"
updated += 1
else:
# A correctly-attributed row already exists. Drop the
# UNKNOWN duplicate.
if args.apply:
conn.execute("DELETE FROM events WHERE id = ?", (rid,))
action = "DELETE (dup)"
deleted += 1
by_serial[new_serial] = by_serial.get(new_serial, 0) + 1
print(f" {action:14s} id={rid[:8]} ts={ts} "
f"filename={bw_name}{new_serial}")
if args.apply:
conn.commit()
conn.close()
print()
print(f"Summary:")
print(f" UNKNOWN rows scanned: {len(rows)}")
print(f" Updated to real serial: {updated}")
print(f" Deleted (duplicate of an ")
print(f" already-correct row): {deleted}")
print(f" Unresolved (bad filename): {unresolved}")
print()
if by_serial:
print(f"Per-serial breakdown of repaired rows:")
for serial, count in sorted(by_serial.items()):
print(f" {serial:12s} {count}")
if not args.apply:
print()
print("(dry-run — re-run with --apply to commit)")
return 0
if __name__ == "__main__":
sys.exit(main())
+117 -23
View File
@@ -374,24 +374,60 @@ class SeismoDb:
inserted += 1
except sqlite3.IntegrityError:
skipped += 1
# Upsert waveform fields onto the existing dedup row so a
# re-download via the live endpoint refreshes filename /
# size / sidecar without churning the rest of the row.
if rec and ts:
# UPSERT path: a row for this (serial, timestamp) already
# exists. Refresh every device-authoritative field from
# the new data so that a re-import with better data (e.g.
# a watcher re-forward where the previous attempt missed
# the paired BW ASCII report) replaces stale peaks /
# project info / sample_rate.
#
# Preserved (not in this UPDATE):
# id, waveform_key, session_id, created_at — immutable / FK
# false_trigger — operator review state
#
# Behaviour change vs prior versions: this UPDATE used
# to only refresh filename / filesize / a5_pickle /
# sidecar fields. As a result, the first insert's
# broken-codec peak values were locked in forever even
# if subsequent re-forwards arrived with correct
# report-derived values. Now every re-import lifts the
# DB row up to whatever the latest Event carries.
conn.execute(
"""
UPDATE events
SET blastware_filename = ?,
SET tran_ppv = ?,
vert_ppv = ?,
long_ppv = ?,
peak_vector_sum = ?,
mic_ppv = ?,
project = ?,
client = ?,
operator = ?,
sensor_location = ?,
sample_rate = ?,
record_type = ?,
blastware_filename = ?,
blastware_filesize = ?,
a5_pickle_filename = ?,
sidecar_filename = ?
WHERE serial = ? AND timestamp = ?
""",
(
rec.get("filename"),
rec.get("filesize"),
rec.get("a5_pickle_filename"),
rec.get("sidecar_filename"),
pv.tran if pv else None,
pv.vert if pv else None,
pv.long if pv else None,
pv.peak_vector_sum if pv else None,
pv.micl if pv else None,
pi.project if pi else None,
pi.client if pi else None,
pi.operator if pi else None,
pi.sensor_location if pi else None,
ev.sample_rate,
ev.record_type,
rec.get("filename") if rec else None,
rec.get("filesize") if rec else None,
rec.get("a5_pickle_filename") if rec else None,
rec.get("sidecar_filename") if rec else None,
serial,
ts,
),
@@ -564,21 +600,79 @@ class SeismoDb:
def query_units(self) -> list[dict]:
"""
Return one row per known serial with summary stats:
last_seen, total_events, total_monitor_entries.
Return one row per known serial with summary stats.
Aggregates from BOTH source tables:
- `events` populated by every ingest path
(live ACH, /db/import/blastware_file
from the series3-watcher forwarder, etc.)
- `ach_sessions` only populated by the live ACH server;
empty for events that came in via the
BW-importer route.
Earlier this method only joined on `ach_sessions`, which made
watcher-forwarded units invisible to the SFM webapp's fleet
overview even though their events were correctly populated in
`events`. Now we union the two and surface every serial that
has activity in either table.
Fields:
serial unit serial number (e.g. "BE11529")
last_seen most recent of MAX(events.timestamp)
and MAX(ach_sessions.session_time)
total_events COUNT(*) from `events` (the
authoritative count regardless of
ingest path)
total_monitor_entries from `ach_sessions`, 0 when absent
total_sessions COUNT(*) from `ach_sessions`, 0 when absent
"""
with self._connect() as conn:
rows = conn.execute(
"""
SELECT
s.serial,
MAX(s.session_time) AS last_seen,
SUM(s.events_downloaded) AS total_events,
SUM(s.monitor_entries) AS total_monitor_entries,
COUNT(*) AS total_sessions
FROM ach_sessions s
GROUP BY s.serial
ORDER BY last_seen DESC
event_stats = {
row["serial"]: row
for row in conn.execute(
"""
SELECT serial,
MAX(timestamp) AS last_event_at,
COUNT(*) AS total_events
FROM events
GROUP BY serial
""",
).fetchall()
return [dict(r) for r in rows]
}
session_stats = {
row["serial"]: row
for row in conn.execute(
"""
SELECT serial,
MAX(session_time) AS last_session_at,
SUM(monitor_entries) AS total_monitor_entries,
COUNT(*) AS total_sessions
FROM ach_sessions
GROUP BY serial
""",
).fetchall()
}
all_serials = set(event_stats) | set(session_stats)
units = []
for serial in all_serials:
e = event_stats.get(serial)
s = session_stats.get(serial)
last_event_at = e["last_event_at"] if e else None
last_session_at = s["last_session_at"] if s else None
# Prefer whichever timestamp is more recent
last_seen = max(
(t for t in (last_event_at, last_session_at) if t),
default=None,
)
units.append({
"serial": serial,
"last_seen": last_seen,
"total_events": e["total_events"] if e else 0,
"total_monitor_entries": s["total_monitor_entries"] if s else 0,
"total_sessions": s["total_sessions"] if s else 0,
})
# Sort by last_seen desc; serials with no timestamp at all sink to the bottom.
units.sort(key=lambda u: u.get("last_seen") or "", reverse=True)
return units
+83 -6
View File
@@ -1619,6 +1619,21 @@ async def db_import_blastware_file(
writes a .sfm.json sidecar with source.kind = "bw-import".
2. Upsert a row into `events` (dedup'd on serial+timestamp).
**Paired BW ASCII reports.** When Blastware's ACH writes events,
it also emits a per-event report alongside each binary as
``<binary>.TXT`` (e.g. ``M529LK44.AB0`` + ``M529LK44.AB0.TXT``).
If a request includes ``.TXT`` files matching a binary's filename,
the report is parsed and its decoded fields land in the sidecar's
``bw_report`` block including device-authoritative peaks, ZC
Freq, Peak Acceleration, Peak Displacement, Time of Peak, sensor
self-check results, and monitor-log timestamps. The daemon-
forwarded ACH workflow should always send both files together
so the SFM database has the rich metadata for sort/filter/report.
Pairing is by exact filename match (case-insensitive on the
extension): a binary named ``foo.AB0`` is paired with a report
named ``foo.AB0.TXT`` or ``foo.AB0.txt``.
Response includes per-file outcomes so the caller can see which
landed cleanly and which failed (e.g. malformed file, unknown
serial, etc.).
@@ -1627,46 +1642,108 @@ async def db_import_blastware_file(
db = _get_db()
results: list[dict] = []
# Read every upload up front (UploadFile.read() is one-shot under
# FastAPI's spooled-tempfile backing) and split into binaries vs
# paired ASCII reports.
binaries: list[tuple[str, bytes]] = []
reports: dict[str, bytes] = {} # keyed by lower-cased stem (without .txt)
for upload in files:
name = upload.filename or ""
try:
content = await upload.read()
except Exception as exc:
results.append({
"filename": upload.filename, "status": "error",
"filename": name or "<unnamed>", "status": "error",
"detail": f"read failed: {exc}",
})
continue
if name.lower().endswith(".txt"):
# Pair the report back to its binary. BW writes ASCII
# reports under two conventions:
#
# 1. ACH convention (Blastware's official Auto Call Home):
# binary: M529LK44.AB0
# report: M529LK44_AB0_ASCII.TXT
# (replaces the "." with "_", appends "_ASCII.TXT")
#
# 2. Manual-export convention (operator clicks Save As Text
# in BW's UI):
# binary: M529LK44.AB0
# report: M529LK44.AB0.TXT
# (literal binary filename + ".TXT" suffix)
#
# We register BOTH possible binary names as keys so the
# subsequent lookup matches whichever convention was used.
stripped = name[:-4] # remove ".TXT"
# ACH convention: strip "_ASCII" and convert the last "_"
# back to "." to recover the binary's filename.
if stripped.lower().endswith("_ascii"):
inner = stripped[:-6] # remove "_ASCII"
under = inner.rfind("_")
if under >= 0:
ach_binary = inner[:under] + "." + inner[under + 1 :]
reports[ach_binary.lower()] = content
# Legacy convention: the stripped name IS the binary's name.
reports[stripped.lower()] = content
else:
binaries.append((name, content))
for filename, content in binaries:
report_bytes = reports.get(filename.lower())
try:
ev, rec = store.save_imported_bw(
content,
source_path=Path(upload.filename or "imported.bw"),
source_path=Path(filename or "imported.bw"),
serial_hint=serial,
bw_report_text=report_bytes,
)
# WaveformStore decoded the serial from the BW filename
# (e.g. T104… → BE18104) and surfaces it on `rec`. Use that
# rather than the placeholder `_serial_from_event(ev)` stub,
# which always returned None and was silently bucketing every
# forwarded event into serial="UNKNOWN" in the DB.
resolved_serial = (
serial
or rec.get("serial")
or _serial_from_event(ev)
or "UNKNOWN"
)
inserted, skipped = db.insert_events(
[ev],
serial=(serial or _serial_from_event(ev) or "UNKNOWN"),
serial=resolved_serial,
waveform_records={
ev._waveform_key.hex(): rec
if ev._waveform_key else None
} if ev._waveform_key else None,
)
results.append({
"filename": upload.filename,
"filename": filename,
"status": "ok",
"stored_filename": rec["filename"],
"filesize": rec["filesize"],
"sha256": rec["sha256"],
"serial": resolved_serial,
"report_attached": report_bytes is not None,
"inserted": inserted,
"skipped": skipped,
})
except Exception as exc:
log.error("import failed for %s: %s", upload.filename, exc, exc_info=True)
log.error("import failed for %s: %s", filename, exc, exc_info=True)
results.append({
"filename": upload.filename, "status": "error",
"filename": filename, "status": "error",
"detail": str(exc),
})
# Surface unmatched .txt uploads so the daemon can detect mis-pairings.
used_report_keys = {fn.lower() for fn, _ in binaries}
for stem in reports.keys() - used_report_keys:
results.append({
"filename": stem + ".txt",
"status": "warning",
"detail": "BW ASCII report supplied but no matching binary in this upload",
})
return {"count": len(results), "results": results}
+45 -4
View File
@@ -34,7 +34,7 @@ import logging
import pickle
import shutil
from pathlib import Path
from typing import Optional
from typing import Optional, Union
from minimateplus import event_file_io
from minimateplus.blastware_file import blastware_filename, write_blastware_file
@@ -258,6 +258,7 @@ class WaveformStore:
source_path: Path,
*,
serial_hint: Optional[str] = None,
bw_report_text: Optional[Union[str, bytes]] = None,
) -> tuple[Event, dict]:
"""
Ingest a Blastware event file produced by an external tool
@@ -267,10 +268,17 @@ class WaveformStore:
Workflow:
1. Parse the bytes via event_file_io.read_blastware_file (writes
a temp file to do that, since the parser takes a path).
2. Resolve serial from BW filename (`<P><serial3>...`) or use
2. Optionally parse a paired BW ASCII event report (the .TXT
file BW writes alongside the binary). When supplied, its
decoded fields land in the sidecar's `bw_report` block AND
overlay the device-authoritative peak values into the
top-level `peak_values` block. This is the right path for
the ACH-forwarder daemon use case where Blastware's own
ACH writes both files into the watch folder.
3. Resolve serial from BW filename (`<P><serial3>...`) or use
serial_hint. Falls back to "UNKNOWN".
3. Copy the BW bytes verbatim into <root>/<serial>/<filename>.
4. Write the .sfm.json sidecar with source.kind = "bw-import"
4. Copy the BW bytes verbatim into <root>/<serial>/<filename>.
5. Write the .sfm.json sidecar with source.kind = "bw-import"
and a5_pickle_filename = None. Does NOT write a .a5.pkl
(no A5 source available; byte-for-byte regeneration not
possible the on-disk BW file IS the byte-for-byte source).
@@ -292,6 +300,37 @@ class WaveformStore:
except FileNotFoundError:
pass
# Parse the BW ASCII report if one was supplied. Failures here
# are non-fatal: we still write the binary + sidecar without the
# rich derived fields.
bw_report = None
if bw_report_text is not None:
try:
from minimateplus.bw_ascii_report import parse_report
bw_report = parse_report(bw_report_text)
except Exception as exc:
log.warning(
"save_imported_bw: BW report parse failed: %s — continuing without it",
exc,
)
# If we have a report, overlay its device-authoritative fields
# (peaks, project, sample_rate, record_time) onto the Event
# BEFORE handing it to db.insert_events(). Without this overlay
# the DB row gets `peak_values` from _peaks_from_samples(), which
# runs the still-undecoded waveform codec on the BW body and
# produces ±10 in/s saturation values on every channel for every
# event. The sidecar JSON had the correct values via
# event_to_sidecar_dict(bw_report=...) but the DB columns didn't.
if bw_report is not None:
try:
event_file_io.apply_report_to_event(ev, bw_report)
except Exception as exc:
log.warning(
"save_imported_bw: failed to overlay report onto event: %s",
exc,
)
# Resolve serial. blastware_filename derives a 4-char prefix from
# the numeric serial (e.g. BE11529 → M529); we go the other way
# via the source filename if a hint wasn't given.
@@ -345,6 +384,7 @@ class WaveformStore:
source_kind="bw-import",
a5_pickle_filename=None,
review=existing_review,
bw_report=bw_report,
)
event_file_io.write_sidecar(sidecar_path, sidecar)
@@ -360,6 +400,7 @@ class WaveformStore:
"a5_pickle_filename": None,
"hdf5_filename": hdf5_filename,
"sidecar_filename": sidecar_path.name,
"serial": serial,
}
def load_a5(self, serial: str, filename: str) -> Optional[list[S3Frame]]:
+407
View File
@@ -0,0 +1,407 @@
"""
test_bw_ascii_report.py parser for Blastware's per-event ASCII export.
Run:
python -m pytest tests/test_bw_ascii_report.py -q
"""
from __future__ import annotations
import datetime
import os
import sys
from pathlib import Path
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from minimateplus.bw_ascii_report import (
BwAsciiReport,
parse_report,
parse_report_file,
)
FIXTURES = Path(__file__).parent.parent / "decode-re" / "5-8-26"
def _fixture(event_name: str) -> Path:
"""Find the .TXT file inside a fixture event folder."""
for p in (FIXTURES / event_name).iterdir():
if p.suffix.lower() == ".txt":
return p
raise FileNotFoundError(f"no .TXT in {FIXTURES / event_name}")
# ── Identity / config ───────────────────────────────────────────────────────
def test_event_c_identity_and_config():
r = parse_report_file(_fixture("event-c"))
assert r.event_type == "Full Waveform"
assert r.serial == "BE11529"
assert r.file_name == "M529LK44.AB0"
assert r.event_datetime == datetime.datetime(2026, 4, 23, 15, 56, 35)
assert r.trigger_channel == "Vert"
assert r.geo_trigger_level_ips == pytest.approx(0.5)
assert r.pretrig_s == pytest.approx(-0.25)
assert r.record_time_s == pytest.approx(1.0)
assert r.record_stop_mode == "Fixed"
assert r.sample_rate_sps == 1024
assert r.battery_volts == pytest.approx(6.8)
assert r.calibration_date == datetime.date(2025, 4, 29)
assert r.calibration_by == "Instantel"
assert r.units == "in/s and dB(L)"
def test_event_c_operator_metadata():
r = parse_report_file(_fixture("event-c"))
# The "Project: : value" pattern (key has its own trailing colon)
# is handled by stripping the colon at lookup time.
assert r.project == "Test4-21-26"
assert r.client == "Test-Client1"
assert r.operator == "Brian and claude"
assert r.sensor_location == "catbed"
def test_event_c_geo_range():
r = parse_report_file(_fixture("event-c"))
assert r.geo_range_ips == pytest.approx(10.0)
# ── Per-channel derived stats ───────────────────────────────────────────────
def test_event_c_per_channel_stats():
r = parse_report_file(_fixture("event-c"))
tran = r.channels["Tran"]
assert tran.ppv_ips == pytest.approx(0.065)
assert tran.zc_freq_hz == pytest.approx(47.0)
assert tran.time_of_peak_s == pytest.approx(0.007)
assert tran.peak_accel_g == pytest.approx(0.066)
assert tran.peak_disp_in == pytest.approx(0.001)
vert = r.channels["Vert"]
assert vert.ppv_ips == pytest.approx(0.610)
assert vert.zc_freq_hz == pytest.approx(16.0)
assert vert.time_of_peak_s == pytest.approx(0.024)
assert vert.peak_accel_g == pytest.approx(0.437)
assert vert.peak_disp_in == pytest.approx(0.006)
long_ = r.channels["Long"]
assert long_.ppv_ips == pytest.approx(0.070)
assert long_.zc_freq_hz == pytest.approx(22.0)
assert long_.time_of_peak_s == pytest.approx(0.019)
assert long_.peak_accel_g == pytest.approx(0.040)
assert long_.peak_disp_in == pytest.approx(0.001)
def test_event_c_micl_stats():
r = parse_report_file(_fixture("event-c"))
# MicL specific block
assert r.mic.weighting == "Linear Weighting"
assert r.mic.pspl_dbl == pytest.approx(88.0)
assert r.mic.zc_freq_hz == pytest.approx(57.0)
assert r.mic.time_of_peak_s == pytest.approx(-0.004)
# Mirrored onto channels["MicL"] for uniform per-channel access
micl_ch = r.channels["MicL"]
assert micl_ch.zc_freq_hz == pytest.approx(57.0)
assert micl_ch.time_of_peak_s == pytest.approx(-0.004)
def test_event_c_vector_sum():
r = parse_report_file(_fixture("event-c"))
assert r.peak_vector_sum_ips == pytest.approx(0.612)
assert r.peak_vector_sum_time_s == pytest.approx(0.024)
# ── Sensor self-check ───────────────────────────────────────────────────────
def test_event_c_sensor_check_geo_channels():
r = parse_report_file(_fixture("event-c"))
for ch_name, expected_freq, expected_ratio in [
("Tran", 7.4, 3.7),
("Vert", 7.6, 3.5),
("Long", 7.5, 3.8),
]:
sc = r.sensor_check[ch_name]
assert sc.test_freq_hz == pytest.approx(expected_freq), ch_name
assert sc.test_ratio == pytest.approx(expected_ratio), ch_name
assert sc.test_results == "Passed", ch_name
# Geo channels don't have an Test Amplitude
assert sc.test_amplitude_mv is None
def test_event_c_sensor_check_micl():
r = parse_report_file(_fixture("event-c"))
sc = r.sensor_check["MicL"]
assert sc.test_freq_hz == pytest.approx(20.1)
assert sc.test_amplitude_mv == pytest.approx(533.0)
assert sc.test_results == "Passed"
# MicL doesn't have a ratio — it has amplitude instead
assert sc.test_ratio is None
# ── Monitor log + tooling ───────────────────────────────────────────────────
def test_event_c_monitor_log_and_pc_version():
r = parse_report_file(_fixture("event-c"))
assert len(r.monitor_log) == 1
e = r.monitor_log[0]
assert e.start_time == datetime.datetime(2026, 4, 23, 15, 46, 16)
assert e.stop_time == datetime.datetime(2026, 4, 23, 15, 56, 36)
assert e.description == "Event recorded."
assert r.pc_sw_version == "V 10.74"
# ── Sample table ─────────────────────────────────────────────────────────────
def test_event_c_sample_table_parsed_when_requested():
r = parse_report_file(_fixture("event-c"), parse_samples=True)
# 1 sec event @ 1024 sps + 0.25 sec pretrig = 1280 samples
assert r.samples is not None
assert len(r.samples) == 1280, f"expected 1280 samples, got {len(r.samples)}"
# First row: "0.000 \t0.005 \t0.005 \t-81.94"
t, v, l, m = r.samples[0]
assert t == pytest.approx(0.000)
assert v == pytest.approx(0.005)
assert l == pytest.approx(0.005)
assert m == pytest.approx(-81.94)
def test_event_c_sample_table_skipped_by_default():
r = parse_report_file(_fixture("event-c"))
assert r.samples is None
# ── Cross-event smoke ───────────────────────────────────────────────────────
@pytest.mark.parametrize("event_name", ["event-a", "event-b", "event-c", "event-d"])
def test_all_fixtures_parse_without_error(event_name):
"""Every fixture in the bundle must parse cleanly with the same parser."""
r = parse_report_file(_fixture(event_name))
# Common invariants: serial, event_datetime, sample rate, all four
# channels surfaced.
assert r.serial == "BE11529"
assert r.event_datetime is not None
assert r.sample_rate_sps in (1024, 2048, 4096)
for ch in ("Tran", "Vert", "Long", "MicL"):
assert ch in r.channels
assert ch in r.sensor_check
# PVS should be present and positive on triggered events
if r.peak_vector_sum_ips is not None:
assert r.peak_vector_sum_ips >= 0
# ── Edge cases / defensive parsing ──────────────────────────────────────────
def test_parse_empty_input():
r = parse_report("")
assert r.serial is None
assert r.event_datetime is None
assert all(cs.ppv_ips is None for cs in r.channels.values())
def test_parse_unknown_keys_ignored():
"""Forward-compat: future BW versions may add fields we don't recognise.
Those should be silently dropped, not raise."""
text = (
'"Serial Number : BE99999"\n'
'"Future Field That Does Not Exist : 42 widgets"\n'
'"Tran PPV : 0.123 in/s"\n'
)
r = parse_report(text)
assert r.serial == "BE99999"
assert r.channels["Tran"].ppv_ips == pytest.approx(0.123)
def test_parse_numeric_with_units_strips_unit():
text = (
'"Vert PPV : 1.275 in/s"\n'
'"Vert ZC Freq : 23 Hz"\n'
'"MicL Test Amplitude : 569 mv"\n'
)
r = parse_report(text)
assert r.channels["Vert"].ppv_ips == pytest.approx(1.275)
assert r.channels["Vert"].zc_freq_hz == pytest.approx(23.0)
assert r.sensor_check["MicL"].test_amplitude_mv == pytest.approx(569.0)
def test_parse_handles_micl_double_space_in_key():
"""BW writes "MicL Time of Peak" with TWO spaces; the parser must
normalise whitespace before key lookup."""
text = (
'"MicL Time of Peak : 0.012 sec"\n'
'"MicL ZC Freq : 51 Hz"\n'
)
r = parse_report(text)
assert r.mic.time_of_peak_s == pytest.approx(0.012)
assert r.mic.zc_freq_hz == pytest.approx(51.0)
# ── Position-based user-notes parsing ───────────────────────────────────────
#
# The 4 user-supplied note slots (Project / Client / User Name / Seis Loc
# by default) have OPERATOR-EDITABLE labels in BW's Compliance Setup →
# Notes tab. An operator could rename them to "Building:", "Site:",
# "Address:", etc. and the ASCII export would write those labels
# verbatim. We parse by POSITION between the `Units :` and `Geo Range :`
# anchors, NOT by matching the label text.
def _wrap_user_notes(*lines: str) -> str:
"""Helper: wrap N user-note lines in the minimal context the parser
needs (`Units :` opens the block, `Geo Range :` closes it)."""
body = ['"Units : in/s and dB(L)"']
body.extend('"' + l + '"' for l in lines)
body.append('"Geo Range : 10.000 in/s"')
return "\n".join(body) + "\n"
def test_user_notes_default_labels_populate_by_position():
"""The BW-default labels (Project / Client / User Name / Seis Loc)
populate the four canonical slots in order."""
r = parse_report(_wrap_user_notes(
"Project: : Test4-21-26",
"Client: : Acme Inc",
"User Name: : Brian",
"Seis Loc: : Catbed",
))
assert r.project == "Test4-21-26"
assert r.client == "Acme Inc"
assert r.operator == "Brian"
assert r.sensor_location == "Catbed"
assert r.user_note_labels == {
"project": "Project:",
"client": "Client:",
"operator": "User Name:",
"sensor_location": "Seis Loc:",
}
def test_user_notes_operator_renamed_labels_still_populate():
"""If the operator renames the labels in BW's UI (e.g. "Seis Loc:"
"Building:"), the values STILL populate the canonical slots by
position and the operator's labels are preserved in
`user_note_labels` for terra-view to display."""
r = parse_report(_wrap_user_notes(
"Building : Main Office",
"Project Manager : Brian",
"Inspector : Claude",
"Site Address : 123 Main St",
))
assert r.project == "Main Office"
assert r.client == "Brian"
assert r.operator == "Claude"
assert r.sensor_location == "123 Main St"
assert r.user_note_labels == {
"project": "Building",
"client": "Project Manager",
"operator": "Inspector",
"sensor_location": "Site Address",
}
def test_user_notes_with_histogram_label_spelling():
"""Histogram exports use 'Seis. Location:' (with period and colon)
instead of 'Seis Loc:'. Position-based parsing handles both."""
r = parse_report(_wrap_user_notes(
"Project: : Plum Cont.- Rainbow Run",
"Client: : Plum Contracting In.c",
"User Name: : Terra-Mechanics Inc.",
"Seis. Location: : Loc #1 - 2652 Hepner",
))
assert r.project == "Plum Cont.- Rainbow Run"
assert r.client == "Plum Contracting In.c"
assert r.operator == "Terra-Mechanics Inc."
assert r.sensor_location == "Loc #1 - 2652 Hepner"
# And the histogram's specific label spelling is preserved
assert r.user_note_labels["sensor_location"] == "Seis. Location:"
def test_user_notes_outside_block_are_ignored():
"""Lines that look like user-notes but appear OUTSIDE the
UnitsGeo Range range don't get assigned to user-note slots."""
# No Units anchor — these lines shouldn't populate user-note slots
text = (
'"Serial Number : BE11529"\n'
'"Project: : SHOULD NOT POPULATE"\n'
)
r = parse_report(text)
assert r.serial == "BE11529"
assert r.project is None
def test_user_notes_partial_block_only_fills_present_slots():
"""If BW writes fewer than 4 user-notes (e.g. operator disabled
Extended Notes mid-block), only the present positions populate;
later slots stay None."""
r = parse_report(_wrap_user_notes(
"Project: : Just-a-project",
"Client: : Just-a-client",
))
assert r.project == "Just-a-project"
assert r.client == "Just-a-client"
assert r.operator is None
assert r.sensor_location is None
def test_user_notes_extra_lines_beyond_four_are_dropped():
"""If somehow more than 4 lines appear in the user-notes block
(e.g. BW adds an Extended Notes line), only the first 4 are
captured slots 5+ have nowhere to go."""
r = parse_report(_wrap_user_notes(
"L1 : v1",
"L2 : v2",
"L3 : v3",
"L4 : v4",
"L5 : v5", # ignored — no fifth slot
))
assert r.project == "v1"
assert r.client == "v2"
assert r.operator == "v3"
assert r.sensor_location == "v4"
# 5th label not captured
assert "L5" not in r.user_note_labels.values()
def test_real_histogram_fixture_populates_sensor_location():
"""End-to-end: the histogram fixture uses 'Seis. Location:' — must
successfully populate sensor_location via position-based parsing."""
fixture_dir = (
Path(__file__).parent.parent / "example-events" / "histogram"
)
if not fixture_dir.exists():
pytest.skip("histogram fixtures not present")
txt = next(fixture_dir.glob("*_ASCII.TXT"), None)
if txt is None:
pytest.skip("no histogram TXT in fixture dir")
r = parse_report_file(txt)
assert r.sensor_location is not None
assert len(r.sensor_location) > 0
assert r.user_note_labels.get("sensor_location") is not None
# Sanity: other shared fields still parse correctly
assert r.serial is not None
assert r.serial.startswith("BE")
assert r.geo_range_ips is not None
+112
View File
@@ -294,6 +294,114 @@ def test_read_blastware_file_round_trip(tmp_path: Path):
assert parsed.peak_values.peak_vector_sum == 0.0
def test_save_imported_bw_with_paired_report(tmp_path: Path):
"""save_imported_bw + a paired BW ASCII report fold the report's
rich derived fields into the sidecar. This is the daemon-forwarded
ACH workflow: BW writes <event>.AB0 and <event>.AB0.TXT side by side;
the daemon ships both; we overlay the report-decoded values onto the
sidecar (peaks, project, plus the rich `bw_report` block)."""
from minimateplus.blastware_file import write_blastware_file, blastware_filename
from sfm.waveform_store import WaveformStore
ev, frames = _make_synthetic_event()
fname = blastware_filename(ev, "BE11529")
src = tmp_path / fname
write_blastware_file(ev, frames, src)
# Use one of the real BW ASCII exports as the paired report.
report_path = (
Path(__file__).parent.parent
/ "decode-re" / "5-8-26" / "event-c" / "M529LK44.AB0.TXT"
)
if not report_path.exists():
import pytest as _pt
_pt.skip("decode-re fixtures not present")
report_bytes = report_path.read_bytes()
store = WaveformStore(tmp_path / "waveforms")
parsed_ev, rec = store.save_imported_bw(
src.read_bytes(),
source_path=src,
bw_report_text=report_bytes,
)
sc = store.load_sidecar("BE11529", fname)
assert sc is not None
# ── bw_report block populated with the rich fields ──────────────────
assert "bw_report" in sc
br = sc["bw_report"]
assert br["available"] is True
assert br["event_type"] == "Full Waveform"
assert br["recording"]["sample_rate_sps"] == 1024
assert br["recording"]["geo_range_ips"] == 10.0
# Per-channel derived stats
assert br["peaks"]["tran"]["ppv_ips"] == 0.065
assert br["peaks"]["vert"]["ppv_ips"] == 0.610
assert br["peaks"]["long"]["ppv_ips"] == 0.070
assert br["peaks"]["vert"]["peak_accel_g"] == 0.437
assert br["peaks"]["vert"]["peak_disp_in"] == 0.006
assert br["peaks"]["tran"]["zc_freq_hz"] == 47.0
assert br["peaks"]["vector_sum"]["ips"] == 0.612
assert br["peaks"]["vector_sum"]["time_s"] == 0.024
# Sensor self-check per channel
assert br["sensor_check"]["tran"]["freq_hz"] == 7.4
assert br["sensor_check"]["tran"]["ratio"] == 3.7
assert br["sensor_check"]["tran"]["result"] == "Passed"
assert br["sensor_check"]["mic"]["amplitude_mv"] == 533.0
# Mic block
assert br["mic"]["weighting"] == "Linear Weighting"
assert br["mic"]["pspl_dbl"] == 88.0
# Monitor log roundtripped
assert len(br["monitor_log"]) == 1
assert "2026-04-23T15:46:16" in br["monitor_log"][0]["start"]
assert br["pc_sw_version"] == "V 10.74"
# ── Overlay onto canonical peak_values ──────────────────────────────
# Report values win over the broken-codec samples-derived peaks.
assert sc["peak_values"]["transverse"] == 0.065
assert sc["peak_values"]["vertical"] == 0.610
assert sc["peak_values"]["longitudinal"] == 0.070
assert sc["peak_values"]["vector_sum"] == 0.612
# Mic PSPL converted to psi (dbl=88 → 10^(88/20) * 2.9e-9)
assert sc["peak_values"]["mic_psi"] is not None
assert 1e-5 < sc["peak_values"]["mic_psi"] < 1e-3
# ── Overlay onto project_info ───────────────────────────────────────
assert sc["project_info"]["project"] == "Test4-21-26"
assert sc["project_info"]["client"] == "Test-Client1"
assert sc["project_info"]["operator"] == "Brian and claude"
assert sc["project_info"]["sensor_location"] == "catbed"
# ── Event timestamp overlaid from report ───────────────────────────
assert sc["event"]["timestamp"] == "2026-04-23T15:56:35"
def test_save_imported_bw_without_report_works_unchanged(tmp_path: Path):
"""Calling save_imported_bw with no bw_report_text behaves exactly
as before no `bw_report` block, peak_values come from samples."""
from minimateplus.blastware_file import write_blastware_file, blastware_filename
from sfm.waveform_store import WaveformStore
ev, frames = _make_synthetic_event()
fname = blastware_filename(ev, "BE11529")
src = tmp_path / fname
write_blastware_file(ev, frames, src)
store = WaveformStore(tmp_path / "waveforms")
store.save_imported_bw(src.read_bytes(), source_path=src)
sc = store.load_sidecar("BE11529", fname)
assert sc is not None
assert "bw_report" not in sc # block is absent without a report
# Synthetic event has zero samples → peaks all zero (was true before this change)
assert sc["peak_values"]["transverse"] == 0.0
def test_save_imported_bw_round_trip(tmp_path: Path):
"""save_imported_bw stores a copy + sidecar with source.kind = bw-import."""
from minimateplus.blastware_file import write_blastware_file, blastware_filename
@@ -310,6 +418,10 @@ def test_save_imported_bw_round_trip(tmp_path: Path):
assert rec["filename"] == fname
assert rec["a5_pickle_filename"] is None # no A5 source for BW imports
# The serial decoded from the BW filename surfaces on the record so
# the import endpoint can use it when calling SeismoDb.insert_events()
# (otherwise forwarded events would all bucket into serial="UNKNOWN").
assert rec["serial"] == "BE11529"
sc = store.load_sidecar("BE11529", fname)
assert sc is not None
assert sc["source"]["kind"] == "bw-import"