backfill: overlay bw_report onto Event before DB upsert
Mirror what the ingest path does: BW's reported peaks (and sample_rate
/ record_time) take precedence over codec output where present.
Without this, --force backfill silently overwrites bw_report-overlaid
DB columns with codec-derived peaks. That is wrong for events the codec
doesn't fully decode (waveform walker edge cases on SP0/SS0/SV0-style
events, the histogram byte[5]!=0 sub-format that isn't yet
reverse-engineered), and it produces PVS=0 on real high-amplitude
events. This bit us on prod on 2026-05-22, when three top-10 waveform
events ended up at PVS=0 (rolled back the same day; this fix is the
proper resolution).
New helper minimateplus.event_file_io.apply_bw_report_dict_to_event
operates on the projected sidecar dict shape (the structure
_bw_report_to_dict produces, which is what gets preserved in the
sidecar). Mirrors apply_report_to_event's semantics: only writes
fields where bw_report has a non-None value, no-ops cleanly on
empty / None input.
Dev validation against prod snapshot:
  pre  : 1839.7315 pvs_sum; 356 events with DB PVS ≠ sidecar bw_report
  post : 2016.4902 pvs_sum; 2 events still mismatched (both have NULL
         timestamp + duplicate rows, edge case)
Both edge-case events DO get the correct value written by the new
backfill — their stale rows from prior backfills remain because
UNIQUE(serial, timestamp) doesn't fire on NULL. Separate dedup
cleanup needed for those 2 events (0.014% of corpus); not blocking.
Backfill remains idempotent + bw_report preservation still passes
(0 WIPED, 0 CHANGED on the 3rd consecutive run).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -254,6 +254,60 @@ def apply_report_to_event(event: Event, report: BwAsciiReport) -> None:
|
|||||||
event.rectime_seconds = report.record_time_s
|
event.rectime_seconds = report.record_time_s
|
||||||
|
|
||||||
|
|
||||||
|
def apply_bw_report_dict_to_event(event: Event, bw_report: Optional[dict]) -> None:
    """Overlay a projected sidecar ``bw_report`` dict onto ``event`` in place.

    Mirror of ``apply_report_to_event`` for the projected sidecar
    dict shape (as produced by ``_bw_report_to_dict``).

    Why this exists
    ───────────────
    The ingest path holds a live ``BwAsciiReport`` parsed straight from
    the ``_ASCII.TXT`` and uses ``apply_report_to_event`` to overlay
    device-authoritative peaks onto the codec output before insert.

    The backfill path doesn't have the original ``.TXT`` (it's not
    retained in the waveform store), but it does have the preserved
    ``bw_report`` block from the sidecar — which contains the same
    projected fields.  Re-overlaying those during a backfill keeps the
    DB peak columns aligned with what BW reports rather than letting
    the codec output (which may be incomplete for unhandled formats or
    walker edge cases) win by default.

    Args:
        event: Event to mutate.  ``peak_values``, ``sample_rate`` and
            ``rectime_seconds`` are the only attributes touched.
        bw_report: Sidecar ``bw_report`` block, or ``None``.  Keys read:
            ``peaks.{tran,vert,long}.ppv_ips``, ``peaks.vector_sum.ips``,
            ``mic.pspl_dbl``, ``recording.sample_rate_sps`` and
            ``recording.record_time_s``.

    Returns immediately when ``bw_report`` is ``None`` or empty.  For a
    non-empty report, a missing or ``None`` sub-block / field is skipped
    and the corresponding Event value is left alone ("report wins where
    present").  Note that a non-empty report always attaches a fresh
    ``PeakValues()`` when the event has none, even if no peak field ends
    up being written.
    """
    if not bw_report:
        return

    # Ensure there is a PeakValues to write into before any overlay runs.
    if event.peak_values is None:
        event.peak_values = PeakValues()
    pv = event.peak_values

    peaks = bw_report.get("peaks") or {}

    # Per-axis PPV: the sidecar key doubles as the PeakValues attribute name.
    for axis in ("tran", "vert", "long"):
        ppv = (peaks.get(axis) or {}).get("ppv_ips")
        if ppv is not None:
            setattr(pv, axis, ppv)

    vs_ips = (peaks.get("vector_sum") or {}).get("ips")
    if vs_ips is not None:
        pv.peak_vector_sum = vs_ips

    # Mic level is stored via _dbl_to_psi; a zero or negative reading is
    # skipped rather than converted.
    pspl = (bw_report.get("mic") or {}).get("pspl_dbl")
    if pspl is not None and pspl > 0:
        pv.micl = _dbl_to_psi(pspl)

    rec = bw_report.get("recording") or {}

    # Truthiness check: a sample rate of 0 is skipped as well as None.
    sr = rec.get("sample_rate_sps")
    if sr:
        event.sample_rate = sr

    # Unlike the sample rate, a record time of 0 is still written.
    rt = rec.get("record_time_s")
    if rt is not None:
        event.rectime_seconds = rt
|
||||||
|
|
||||||
|
|
||||||
def _project_info_to_dict(pi: Optional[ProjectInfo]) -> dict:
|
def _project_info_to_dict(pi: Optional[ProjectInfo]) -> dict:
|
||||||
if pi is None:
|
if pi is None:
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -309,6 +309,23 @@ def main(argv=None) -> int:
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# Overlay BW ASCII report fields onto the rebuilt Event
|
||||||
|
# BEFORE the sidecar + DB write. Mirrors what the ingest
|
||||||
|
# path does — BW's reported peaks (and sample_rate /
|
||||||
|
# record_time) win over codec output where present.
|
||||||
|
#
|
||||||
|
# Without this step, --force backfill silently overwrites
|
||||||
|
# the bw_report-overlaid DB columns with codec-derived
|
||||||
|
# values, which is wrong for events the codec doesn't
|
||||||
|
# fully decode (e.g. waveform walker edge cases on
|
||||||
|
# SP0/SS0/SV0-style events, or histogram sub-formats with
|
||||||
|
# byte[5]!=0 that aren't yet RE'd). Net effect was PVS=0
|
||||||
|
# on three top-10 events on 2026-05-22.
|
||||||
|
if preserved_bw_report:
|
||||||
|
event_file_io.apply_bw_report_dict_to_event(
|
||||||
|
ev, preserved_bw_report,
|
||||||
|
)
|
||||||
|
|
||||||
sidecar = event_file_io.event_to_sidecar_dict(
|
sidecar = event_file_io.event_to_sidecar_dict(
|
||||||
ev,
|
ev,
|
||||||
serial=serial,
|
serial=serial,
|
||||||
|
|||||||
@@ -529,6 +529,77 @@ def test_save_imported_bw_round_trip(tmp_path: Path):
|
|||||||
assert stored_path.read_bytes() == src.read_bytes()
|
assert stored_path.read_bytes() == src.read_bytes()
|
||||||
|
|
||||||
|
|
||||||
|
# ── apply_bw_report_dict_to_event ────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_apply_bw_report_dict_overlays_peaks_and_recording():
    """Verbatim mirror of the data shape produced by `_bw_report_to_dict`
    when projecting a parsed `BwAsciiReport` into the sidecar.  Confirms
    each field overlays onto Event correctly so the backfill path
    matches ingest behavior.

    The Event starts without assigning ``peak_values``, so this also
    checks that the helper leaves a populated ``peak_values`` behind.
    """
    ev = Event(index=0)
    bw_report = {
        "peaks": {
            "tran": {"ppv_ips": 9.84375},
            "vert": {"ppv_ips": 0.305},
            "long": {"ppv_ips": 0.405},
            "vector_sum": {"ips": 14.86736},
        },
        "mic": {"pspl_dbl": 115.9},
        "recording": {"sample_rate_sps": 1024, "record_time_s": 3.0},
    }
    event_file_io.apply_bw_report_dict_to_event(ev, bw_report)
    assert ev.peak_values is not None
    # Peaks are copied through unchanged, so exact equality is safe here.
    assert ev.peak_values.tran == 9.84375
    assert ev.peak_values.vert == 0.305
    assert ev.peak_values.long == 0.405
    assert ev.peak_values.peak_vector_sum == 14.86736
    # MicL is converted dB → psi via _dbl_to_psi — just confirm non-zero
    assert ev.peak_values.micl is not None and ev.peak_values.micl > 0
    assert ev.sample_rate == 1024
    assert ev.rectime_seconds == 3.0
|
||||||
|
|
||||||
|
|
||||||
|
def test_apply_bw_report_dict_overwrites_codec_peaks():
    """bw_report values must replace whatever the codec produced.

    This is the regression behind the 2026-05-22 prod backfill: DB peaks
    were overwritten with codec output (including PVS=0 on the three top
    events) when they should have stayed bw_report-overlaid.
    """
    from minimateplus.models import PeakValues

    # Start from codec output that is clearly wrong (incomplete decode).
    event = Event(index=0)
    event.peak_values = PeakValues(
        tran=2.09,
        vert=0.0,
        long=0.0,
        peak_vector_sum=0.0,
    )

    reported_ppv = {"tran": 9.84, "vert": 4.95, "long": 8.05}
    reported_pvs = 14.95

    overlay = {
        "peaks": {axis: {"ppv_ips": ppv} for axis, ppv in reported_ppv.items()},
    }
    overlay["peaks"]["vector_sum"] = {"ips": reported_pvs}

    event_file_io.apply_bw_report_dict_to_event(event, overlay)

    # Every codec-derived peak must now carry the reported value.
    for axis, expected in reported_ppv.items():
        assert getattr(event.peak_values, axis) == expected
    assert event.peak_values.peak_vector_sum == reported_pvs
|
||||||
|
|
||||||
|
|
||||||
|
def test_apply_bw_report_dict_no_op_on_empty():
    """None / empty dict / missing keys should leave Event untouched.

    Besides the three axis peaks, this snapshots the vector sum, mic
    level, sample rate and record time before the call and checks they
    are unchanged afterwards, so "untouched" covers every field the
    helper can write.
    """
    from minimateplus.models import PeakValues

    empties = (
        None,
        {},
        {"peaks": {}},
        {"peaks": {"tran": {}}},
        # Sub-blocks present but explicitly null / empty.
        {"peaks": None, "mic": None, "recording": None},
        {"mic": {}, "recording": {}},
    )
    for empty in empties:
        ev = Event(index=0)
        ev.peak_values = PeakValues(tran=1.0, vert=2.0, long=3.0)
        before = (
            ev.peak_values.peak_vector_sum,
            ev.peak_values.micl,
            getattr(ev, "sample_rate", None),
            getattr(ev, "rectime_seconds", None),
        )

        event_file_io.apply_bw_report_dict_to_event(ev, empty)

        # Unchanged
        assert ev.peak_values.tran == 1.0, empty
        assert ev.peak_values.vert == 2.0, empty
        assert ev.peak_values.long == 3.0, empty
        after = (
            ev.peak_values.peak_vector_sum,
            ev.peak_values.micl,
            getattr(ev, "sample_rate", None),
            getattr(ev, "rectime_seconds", None),
        )
        assert after == before, empty
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
if pytest is not None:
|
if pytest is not None:
|
||||||
pytest.main([__file__, "-v"])
|
pytest.main([__file__, "-v"])
|
||||||
|
|||||||
Reference in New Issue
Block a user