217 lines
7.3 KiB
Python
217 lines
7.3 KiB
Python
"""
|
|
sfm.dump_0c — inspect the raw 210-byte SUB 0C waveform record stored in a
|
|
sidecar JSON's `extensions.raw_records.waveform_record_b64`.
|
|
|
|
Usage:
|
|
|
|
python -m sfm.dump_0c <sidecar.sfm.json> [<sidecar.sfm.json> ...]
|
|
|
|
Prints, for each input:
|
|
- A header summarising the sidecar's metadata-block claims (peaks,
|
|
project, timestamp) — the "what BW says this event measured" view.
|
|
- A 16-byte-wide hex dump of the raw 0C record, annotated with known
|
|
field anchors (STRT, channel labels, project strings).
|
|
- A "candidate float regions" scan that brute-forces every byte
|
|
position as a float32 BE and prints any that yield a value in a
|
|
plausible range (1e-7 to 1e3) — useful for hunting where Peak
|
|
Acceleration / Peak Displacement / ZC Freq / Time of Peak live.
|
|
|
|
Pairing the printed candidates with the BW Event Report values lets
|
|
us nail down byte offsets for the missing fields without a live
|
|
device.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import base64
|
|
import json
|
|
import struct
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
|
|
# ── Annotations for known anchors in a 210-byte 0C record ──────────────────
|
|
|
|
# Anchors we look for and label inline in the hex dump. Each is a needle
|
|
# (bytes to find) and a short label. Found via .find() — the first
|
|
# occurrence wins.
|
|
_ANCHORS = [
|
|
(b"Tran", "Tran label (PPV @ +6, PVS @ -12)"),
|
|
(b"Vert", "Vert label (PPV @ +6)"),
|
|
(b"Long", "Long label (PPV @ +6)"),
|
|
(b"MicL", "MicL label (peak psi @ +6)"),
|
|
(b"Project:", "Project: label"),
|
|
(b"Client:", "Client: label"),
|
|
(b"User Name:", "User Name: label"),
|
|
(b"Seis Loc:", "Seis Loc: label"),
|
|
(b"Extended Notes", "Extended Notes label"),
|
|
]
|
|
|
|
|
|
def _hex_dump(data: bytes, anchors: dict[int, str]) -> str:
|
|
"""Return a 16-byte-wide hex+ASCII dump, with anchor labels printed
|
|
on the line that contains the anchor's start byte."""
|
|
lines = []
|
|
for off in range(0, len(data), 16):
|
|
chunk = data[off : off + 16]
|
|
hex_part = " ".join(f"{b:02x}" for b in chunk)
|
|
ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
|
|
line = f" {off:04x} {hex_part:<47} |{ascii_part}|"
|
|
|
|
# If any anchor lands on a byte in this row, append a tag
|
|
tags = [
|
|
f"[{a:#04x}: {label}]"
|
|
for a, label in anchors.items()
|
|
if off <= a < off + 16
|
|
]
|
|
if tags:
|
|
line += " " + " ".join(tags)
|
|
lines.append(line)
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _scan_float32_be(data: bytes, lo: float, hi: float) -> list[tuple[int, float]]:
|
|
"""Brute-force every offset where data[off:off+4] is a float32 BE in
|
|
(lo, hi). Includes negatives in the symmetric range."""
|
|
hits = []
|
|
for i in range(len(data) - 3):
|
|
try:
|
|
v = struct.unpack_from(">f", data, i)[0]
|
|
except struct.error:
|
|
continue
|
|
if v != v: # NaN
|
|
continue
|
|
if abs(v) < 1e-30 or abs(v) > 1e10: # crap range
|
|
continue
|
|
a = abs(v)
|
|
if lo <= a <= hi:
|
|
hits.append((i, v))
|
|
return hits
|
|
|
|
|
|
def _scan_uint16_be(data: bytes, lo: int, hi: int) -> list[tuple[int, int]]:
|
|
"""Find every offset where uint16 BE is in [lo, hi]."""
|
|
hits = []
|
|
for i in range(len(data) - 1):
|
|
v = (data[i] << 8) | data[i + 1]
|
|
if lo <= v <= hi:
|
|
hits.append((i, v))
|
|
return hits
|
|
|
|
|
|
def _summarize_sidecar(side: dict) -> str:
|
|
ev = side.get("event", {})
|
|
pv = side.get("peak_values", {})
|
|
pi = side.get("project_info", {})
|
|
bw = side.get("blastware", {})
|
|
return (
|
|
f" serial: {ev.get('serial')}\n"
|
|
f" timestamp: {ev.get('timestamp')}\n"
|
|
f" waveform: {ev.get('waveform_key')} ({ev.get('record_type')})\n"
|
|
f" sample_rate:{ev.get('sample_rate')} sps rectime:{ev.get('rectime_seconds')}s\n"
|
|
f" bw file: {bw.get('filename')} ({bw.get('filesize')} B)\n"
|
|
f" peaks: "
|
|
f"Tran={pv.get('transverse'):.5f} "
|
|
f"Vert={pv.get('vertical'):.5f} "
|
|
f"Long={pv.get('longitudinal'):.5f} "
|
|
f"PVS={pv.get('vector_sum'):.5f} in/s "
|
|
f"Mic={pv.get('mic_psi'):.6e} psi"
|
|
if all(pv.get(k) is not None for k in
|
|
("transverse", "vertical", "longitudinal", "vector_sum", "mic_psi"))
|
|
else f" peaks: {pv}\n project: {pi}"
|
|
) + (
|
|
f"\n project: {pi.get('project')!r} / {pi.get('client')!r} / "
|
|
f"operator={pi.get('operator')!r} loc={pi.get('sensor_location')!r}"
|
|
)
|
|
|
|
|
|
def dump_one(path: Path) -> int:
|
|
side = json.loads(path.read_text(encoding="utf-8"))
|
|
|
|
raw_b64 = (
|
|
side.get("extensions", {})
|
|
.get("raw_records", {})
|
|
.get("waveform_record_b64")
|
|
)
|
|
if not raw_b64:
|
|
print(f"\n=== {path} ===")
|
|
print(" ! no extensions.raw_records.waveform_record_b64 — sidecar")
|
|
print(" pre-dates raw-0C persistence (added in v0.15.x). Re-save")
|
|
print(" the event from the device to capture the bytes.")
|
|
return 1
|
|
|
|
raw = base64.b64decode(raw_b64)
|
|
|
|
# Build anchor map
|
|
anchors: dict[int, str] = {}
|
|
for needle, label in _ANCHORS:
|
|
i = raw.find(needle)
|
|
if i >= 0:
|
|
anchors[i] = label
|
|
|
|
print(f"\n=== {path} ===")
|
|
print("metadata claimed by sidecar:")
|
|
print(_summarize_sidecar(side))
|
|
|
|
print(f"\nraw 0C record ({len(raw)} bytes):")
|
|
print(_hex_dump(raw, anchors))
|
|
|
|
# Float32 BE candidates in geo-relevant ranges
|
|
geo_hits = _scan_float32_be(raw, 1e-5, 50.0)
|
|
# Filter: only show hits that are NOT trivially the per-channel labels'
|
|
# +6 PPV floats already documented (those will land in any sweep too).
|
|
print("\nfloat32 BE candidates (1e-5 .. 50.0):")
|
|
for off, v in geo_hits:
|
|
annotation = ""
|
|
for needle, _ in _ANCHORS[:4]: # geo + mic labels
|
|
i = raw.find(needle)
|
|
if i >= 0 and off == i + 6:
|
|
annotation = f" ← {needle.decode()} PPV (label+6)"
|
|
break
|
|
print(f" {off:#04x} ({off:3d}) {v:>+15.6f}{annotation}")
|
|
|
|
print("\nuint16 BE candidates ZC-Freq-ish (1..200):")
|
|
for off, v in _scan_uint16_be(raw, 1, 200):
|
|
if v < 5: # too noisy at very low end
|
|
continue
|
|
print(f" {off:#04x} ({off:3d}) = {v}")
|
|
|
|
print("\nuint16 BE candidates Time-of-Peak-ish if stored as ms (1..30000):")
|
|
for off, v in _scan_uint16_be(raw, 1, 30000):
|
|
if v < 100: # noise filter
|
|
continue
|
|
# Only the first ~80 are worth showing — too many hits otherwise
|
|
if off > 80:
|
|
break
|
|
print(f" {off:#04x} ({off:3d}) = {v} ms ?")
|
|
|
|
print()
|
|
return 0
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
|
|
p = argparse.ArgumentParser(
|
|
description="Inspect a saved 0C waveform record from a sidecar JSON.",
|
|
)
|
|
p.add_argument(
|
|
"sidecars",
|
|
nargs="+",
|
|
type=Path,
|
|
help="Path(s) to <event>.sfm.json sidecar file(s).",
|
|
)
|
|
args = p.parse_args(argv)
|
|
|
|
rc = 0
|
|
for path in args.sidecars:
|
|
try:
|
|
rc |= dump_one(path)
|
|
except Exception as exc:
|
|
print(f"\n=== {path} ===\n ERROR: {exc}", file=sys.stderr)
|
|
rc |= 2
|
|
return rc
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|