1 Commits

4 changed files with 299 additions and 1 deletions
+14
View File
@@ -1362,6 +1362,20 @@ def _decode_waveform_record_into(data: bytes, event: Event) -> None:
Modifies event in-place.
"""
# ── Always preserve the raw 210 bytes ─────────────────────────────────────
# The 0C record carries far more than just peaks + project strings:
# ZC Freq, Time of Peak, Peak Acceleration, Peak Displacement, Vector
# Sum Time, MicL Time of Peak, and the per-channel sensor self-check
# results (Test Freq / Ratio / Pass-Fail) all live somewhere in this
# 210-byte block. Their byte offsets are not yet mapped — keeping the
# raw bytes lets us decode those fields offline once we have a paired
# (raw 0C, BW-report) sample to fit against. Cheap to keep around
# (210 bytes per event).
try:
event._raw_record = bytes(data[:210])
except Exception:
pass
# ── Record type + format detection ────────────────────────────────────────
# `record_type` is the user-facing label ("Waveform" for any triggered
# event regardless of timestamp-header layout). `fmt` is the internal
+16 -1
View File
@@ -15,6 +15,7 @@ declared in `event_to_sidecar_dict()`.
from __future__ import annotations
import base64
import datetime
import hashlib
import json
@@ -135,6 +136,20 @@ def event_to_sidecar_dict(
captured_at = captured_at or datetime.datetime.utcnow()
# Stash raw 0C record bytes in `extensions.raw_records` so future
# field-decoding work (Peak Acceleration, ZC Freq, Time of Peak,
# sensor self-check results, etc.) can run offline against committed
# sidecars without a live device. Cheap (~280 bytes base64) and
# forward-compatible (older readers ignore unknown extensions keys).
ext_dict: dict = dict(extensions) if extensions else {}
raw_0c = getattr(event, "_raw_record", None)
if raw_0c:
rr = ext_dict.setdefault("raw_records", {})
# Don't clobber a raw_0c that callers explicitly passed in via
# `extensions=...` (e.g. round-trip preservation in patch_sidecar).
rr.setdefault("waveform_record_b64", base64.b64encode(raw_0c).decode("ascii"))
rr.setdefault("waveform_record_len", len(raw_0c))
return {
"schema_version": SCHEMA_VERSION,
"kind": SIDECAR_KIND,
@@ -174,7 +189,7 @@ def event_to_sidecar_dict(
"notes": "",
},
"extensions": extensions or {},
"extensions": ext_dict,
}
+216
View File
@@ -0,0 +1,216 @@
"""
sfm.dump_0c — inspect the raw 210-byte SUB 0C waveform record stored in a
sidecar JSON's `extensions.raw_records.waveform_record_b64`.
Usage:
python -m sfm.dump_0c <sidecar.sfm.json> [<sidecar.sfm.json> ...]
Prints, for each input:
- A header summarising the sidecar's metadata-block claims (peaks,
project, timestamp) — the "what BW says this event measured" view.
- A 16-byte-wide hex dump of the raw 0C record, annotated with known
field anchors (STRT, channel labels, project strings).
- A "candidate float regions" scan that brute-forces every byte
position as a float32 BE and prints any that yield a value in a
plausible range (1e-7 to 1e3) — useful for hunting where Peak
Acceleration / Peak Displacement / ZC Freq / Time of Peak live.
Pairing the printed candidates with the BW Event Report values lets
us nail down byte offsets for the missing fields without a live
device.
"""
from __future__ import annotations
import argparse
import base64
import json
import struct
import sys
from pathlib import Path
# ── Annotations for known anchors in a 210-byte 0C record ──────────────────
# Anchors we look for and label inline in the hex dump. Each is a needle
# (bytes to find) and a short label. Found via .find() — the first
# occurrence wins.
_ANCHORS = [
(b"Tran", "Tran label (PPV @ +6, PVS @ -12)"),
(b"Vert", "Vert label (PPV @ +6)"),
(b"Long", "Long label (PPV @ +6)"),
(b"MicL", "MicL label (peak psi @ +6)"),
(b"Project:", "Project: label"),
(b"Client:", "Client: label"),
(b"User Name:", "User Name: label"),
(b"Seis Loc:", "Seis Loc: label"),
(b"Extended Notes", "Extended Notes label"),
]
def _hex_dump(data: bytes, anchors: dict[int, str]) -> str:
"""Return a 16-byte-wide hex+ASCII dump, with anchor labels printed
on the line that contains the anchor's start byte."""
lines = []
for off in range(0, len(data), 16):
chunk = data[off : off + 16]
hex_part = " ".join(f"{b:02x}" for b in chunk)
ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
line = f" {off:04x} {hex_part:<47} |{ascii_part}|"
# If any anchor lands on a byte in this row, append a tag
tags = [
f"[{a:#04x}: {label}]"
for a, label in anchors.items()
if off <= a < off + 16
]
if tags:
line += " " + " ".join(tags)
lines.append(line)
return "\n".join(lines)
def _scan_float32_be(data: bytes, lo: float, hi: float) -> list[tuple[int, float]]:
"""Brute-force every offset where data[off:off+4] is a float32 BE in
(lo, hi). Includes negatives in the symmetric range."""
hits = []
for i in range(len(data) - 3):
try:
v = struct.unpack_from(">f", data, i)[0]
except struct.error:
continue
if v != v: # NaN
continue
if abs(v) < 1e-30 or abs(v) > 1e10: # crap range
continue
a = abs(v)
if lo <= a <= hi:
hits.append((i, v))
return hits
def _scan_uint16_be(data: bytes, lo: int, hi: int) -> list[tuple[int, int]]:
"""Find every offset where uint16 BE is in [lo, hi]."""
hits = []
for i in range(len(data) - 1):
v = (data[i] << 8) | data[i + 1]
if lo <= v <= hi:
hits.append((i, v))
return hits
def _summarize_sidecar(side: dict) -> str:
ev = side.get("event", {})
pv = side.get("peak_values", {})
pi = side.get("project_info", {})
bw = side.get("blastware", {})
return (
f" serial: {ev.get('serial')}\n"
f" timestamp: {ev.get('timestamp')}\n"
f" waveform: {ev.get('waveform_key')} ({ev.get('record_type')})\n"
f" sample_rate:{ev.get('sample_rate')} sps rectime:{ev.get('rectime_seconds')}s\n"
f" bw file: {bw.get('filename')} ({bw.get('filesize')} B)\n"
f" peaks: "
f"Tran={pv.get('transverse'):.5f} "
f"Vert={pv.get('vertical'):.5f} "
f"Long={pv.get('longitudinal'):.5f} "
f"PVS={pv.get('vector_sum'):.5f} in/s "
f"Mic={pv.get('mic_psi'):.6e} psi"
if all(pv.get(k) is not None for k in
("transverse", "vertical", "longitudinal", "vector_sum", "mic_psi"))
else f" peaks: {pv}\n project: {pi}"
) + (
f"\n project: {pi.get('project')!r} / {pi.get('client')!r} / "
f"operator={pi.get('operator')!r} loc={pi.get('sensor_location')!r}"
)
def dump_one(path: Path) -> int:
side = json.loads(path.read_text(encoding="utf-8"))
raw_b64 = (
side.get("extensions", {})
.get("raw_records", {})
.get("waveform_record_b64")
)
if not raw_b64:
print(f"\n=== {path} ===")
print(" ! no extensions.raw_records.waveform_record_b64 — sidecar")
print(" pre-dates raw-0C persistence (added in v0.15.x). Re-save")
print(" the event from the device to capture the bytes.")
return 1
raw = base64.b64decode(raw_b64)
# Build anchor map
anchors: dict[int, str] = {}
for needle, label in _ANCHORS:
i = raw.find(needle)
if i >= 0:
anchors[i] = label
print(f"\n=== {path} ===")
print("metadata claimed by sidecar:")
print(_summarize_sidecar(side))
print(f"\nraw 0C record ({len(raw)} bytes):")
print(_hex_dump(raw, anchors))
# Float32 BE candidates in geo-relevant ranges
geo_hits = _scan_float32_be(raw, 1e-5, 50.0)
# Filter: only show hits that are NOT trivially the per-channel labels'
# +6 PPV floats already documented (those will land in any sweep too).
print("\nfloat32 BE candidates (1e-5 .. 50.0):")
for off, v in geo_hits:
annotation = ""
for needle, _ in _ANCHORS[:4]: # geo + mic labels
i = raw.find(needle)
if i >= 0 and off == i + 6:
annotation = f"{needle.decode()} PPV (label+6)"
break
print(f" {off:#04x} ({off:3d}) {v:>+15.6f}{annotation}")
print("\nuint16 BE candidates ZC-Freq-ish (1..200):")
for off, v in _scan_uint16_be(raw, 1, 200):
if v < 5: # too noisy at very low end
continue
print(f" {off:#04x} ({off:3d}) = {v}")
print("\nuint16 BE candidates Time-of-Peak-ish if stored as ms (1..30000):")
for off, v in _scan_uint16_be(raw, 1, 30000):
if v < 100: # noise filter
continue
# Only the first ~80 are worth showing — too many hits otherwise
if off > 80:
break
print(f" {off:#04x} ({off:3d}) = {v} ms ?")
print()
return 0
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser(
description="Inspect a saved 0C waveform record from a sidecar JSON.",
)
p.add_argument(
"sidecars",
nargs="+",
type=Path,
help="Path(s) to <event>.sfm.json sidecar file(s).",
)
args = p.parse_args(argv)
rc = 0
for path in args.sidecars:
try:
rc |= dump_one(path)
except Exception as exc:
print(f"\n=== {path} ===\n ERROR: {exc}", file=sys.stderr)
rc |= 2
return rc
if __name__ == "__main__":
sys.exit(main())
+53
View File
@@ -127,6 +127,59 @@ def test_sidecar_write_and_read_round_trip(tmp_path: Path):
assert loaded["source"]["kind"] == "sfm-ach"
def test_sidecar_persists_raw_0c_record_in_extensions(tmp_path: Path):
"""An Event with _raw_record populated should land its 210 bytes
base64-encoded in extensions.raw_records.waveform_record_b64, so
later analysis (e.g. mapping Peak Acceleration / Time of Peak / ZC
Freq byte offsets) can run offline against the saved sidecar."""
import base64
ev, _ = _make_synthetic_event()
# Synthesize a 210-byte 0C record with embedded label needles so
# the dump tool's anchor scan has something to find.
raw = bytearray(210)
raw[10:14] = b"Tran"
raw[60:64] = b"Vert"
raw[110:114] = b"Long"
raw[160:164] = b"MicL"
ev._raw_record = bytes(raw)
d = event_file_io.event_to_sidecar_dict(
ev, serial="BE11529",
blastware_filename="M529LKIQ.7M0W", blastware_filesize=1024,
blastware_sha256="x" * 64, source_kind="sfm-live",
)
rr = d["extensions"]["raw_records"]
assert rr["waveform_record_len"] == 210
decoded = base64.b64decode(rr["waveform_record_b64"])
assert decoded == ev._raw_record
# Round-trip through write/read
path = tmp_path / "raw0c.sfm.json"
event_file_io.write_sidecar(path, d)
loaded = event_file_io.read_sidecar(path)
assert (
base64.b64decode(loaded["extensions"]["raw_records"]["waveform_record_b64"])
== ev._raw_record
)
def test_sidecar_omits_raw_records_when_event_has_no_0c(tmp_path: Path):
"""Events without a _raw_record (e.g. constructed by importers that
never see 0C) should NOT add an empty raw_records block keep the
sidecar clean for those flows."""
ev, _ = _make_synthetic_event()
assert ev._raw_record is None
d = event_file_io.event_to_sidecar_dict(
ev, serial="BE11529",
blastware_filename="M529LKIQ.7M0W", blastware_filesize=1024,
blastware_sha256="x" * 64, source_kind="bw-import",
)
assert d["extensions"] == {}
def test_sidecar_rejects_unsupported_schema_version(tmp_path: Path):
path = tmp_path / "future.sfm.json"
path.write_text(json.dumps({