Files
seismo-relay/parsers/frame_db.py
serversdwn 154a11d057 Add s3_analyzer.py for live protocol analysis of Instantel MiniMate Plus RS-232
- Implement functionality to read and parse raw_s3.bin and raw_bw.bin files.
- Define protocol constants and mappings for various command and response identifiers.
- Create data structures for frames, sessions, and diffs to facilitate analysis.
- Develop functions for annotating frames, splitting sessions, and generating reports.
- Include live mode for continuous monitoring and reporting of protocol frames.
- Add command-line interface for user interaction and configuration.
2026-03-10 05:00:55 -04:00

338 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
frame_db.py — SQLite frame database for Instantel protocol captures.
Schema:
captures — one row per ingested capture pair (deduped by SHA256)
frames — one row per parsed frame
byte_values — one row per (frame, offset, value) for fast indexed queries
Usage:
db = FrameDB() # opens default DB at ~/.seismo_lab/frames.db
db = FrameDB(path) # custom path
cap_id = db.ingest(sessions, s3_path, bw_path)
rows = db.query_frames(sub=0xF7, direction="S3")
rows = db.query_by_byte(offset=85, value=0x0A)
"""
from __future__ import annotations
import hashlib
import os
import sqlite3
import struct
from pathlib import Path
from typing import Optional
# ─────────────────────────────────────────────────────────────────────────────
# DB location
# ─────────────────────────────────────────────────────────────────────────────
# Default DB lives in a per-user hidden directory; created on first open.
DEFAULT_DB_DIR = Path.home() / ".seismo_lab"
DEFAULT_DB_PATH = DEFAULT_DB_DIR / "frames.db"
# ─────────────────────────────────────────────────────────────────────────────
# Schema
# ─────────────────────────────────────────────────────────────────────────────
# Idempotent DDL, executed on every FrameDB() open (CREATE ... IF NOT EXISTS).
# WAL journal mode lets readers run concurrently with an ingest writer.
# byte_values is a deliberately denormalized one-row-per-byte table so that
# "which frames have value V at offset O" is a pure index lookup.
_DDL = """
PRAGMA journal_mode=WAL;
PRAGMA foreign_keys=ON;
CREATE TABLE IF NOT EXISTS captures (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL, -- ISO-8601 ingest time
s3_path TEXT,
bw_path TEXT,
capture_hash TEXT NOT NULL UNIQUE, -- SHA256 of s3_blob+bw_blob
notes TEXT DEFAULT ''
);
CREATE TABLE IF NOT EXISTS frames (
id INTEGER PRIMARY KEY AUTOINCREMENT,
capture_id INTEGER NOT NULL REFERENCES captures(id) ON DELETE CASCADE,
session_idx INTEGER NOT NULL,
direction TEXT NOT NULL, -- 'BW' or 'S3'
sub INTEGER, -- NULL if malformed
page_key INTEGER,
sub_name TEXT,
payload BLOB NOT NULL,
payload_len INTEGER NOT NULL,
checksum_ok INTEGER -- 1/0/NULL
);
CREATE INDEX IF NOT EXISTS idx_frames_capture ON frames(capture_id);
CREATE INDEX IF NOT EXISTS idx_frames_sub ON frames(sub);
CREATE INDEX IF NOT EXISTS idx_frames_page_key ON frames(page_key);
CREATE INDEX IF NOT EXISTS idx_frames_dir ON frames(direction);
CREATE TABLE IF NOT EXISTS byte_values (
id INTEGER PRIMARY KEY AUTOINCREMENT,
frame_id INTEGER NOT NULL REFERENCES frames(id) ON DELETE CASCADE,
offset INTEGER NOT NULL,
value INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_bv_frame ON byte_values(frame_id);
CREATE INDEX IF NOT EXISTS idx_bv_offset ON byte_values(offset);
CREATE INDEX IF NOT EXISTS idx_bv_value ON byte_values(value);
CREATE INDEX IF NOT EXISTS idx_bv_off_val ON byte_values(offset, value);
"""
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _sha256_blobs(s3_blob: bytes, bw_blob: bytes) -> str:
h = hashlib.sha256()
h.update(s3_blob)
h.update(bw_blob)
return h.hexdigest()
def _interp_bytes(data: bytes, offset: int) -> dict:
"""
Return multi-interpretation dict for 14 bytes starting at offset.
Used in the GUI's byte interpretation panel.
"""
result: dict = {}
remaining = len(data) - offset
if remaining <= 0:
return result
b1 = data[offset]
result["uint8"] = b1
result["int8"] = b1 if b1 < 128 else b1 - 256
if remaining >= 2:
u16be = struct.unpack_from(">H", data, offset)[0]
u16le = struct.unpack_from("<H", data, offset)[0]
result["uint16_be"] = u16be
result["uint16_le"] = u16le
if remaining >= 4:
f32be = struct.unpack_from(">f", data, offset)[0]
f32le = struct.unpack_from("<f", data, offset)[0]
u32be = struct.unpack_from(">I", data, offset)[0]
u32le = struct.unpack_from("<I", data, offset)[0]
result["float32_be"] = round(f32be, 6)
result["float32_le"] = round(f32le, 6)
result["uint32_be"] = u32be
result["uint32_le"] = u32le
return result
# ─────────────────────────────────────────────────────────────────────────────
# FrameDB class
# ─────────────────────────────────────────────────────────────────────────────
class FrameDB:
    """SQLite-backed store for parsed Instantel protocol frames.

    Owns a single ``sqlite3`` connection (WAL mode, foreign keys on —
    see ``_DDL``). Captures are deduplicated by the SHA256 of their raw
    S3+BW blobs. Usable as a context manager::

        with FrameDB() as db:
            db.ingest(sessions, s3_path, bw_path)
    """

    def __init__(self, path: Optional[Path] = None) -> None:
        """Open (creating if needed) the database at *path*.

        Defaults to ``~/.seismo_lab/frames.db``; parent directories are
        created automatically.
        """
        if path is None:
            path = DEFAULT_DB_PATH
        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)
        self.path = path
        # check_same_thread=False: callers may query from other threads;
        # NOTE(review): sqlite3 connections are not thread-safe for
        # concurrent writes — confirm callers serialize ingest.
        self._con = sqlite3.connect(str(path), check_same_thread=False)
        self._con.row_factory = sqlite3.Row
        self._init_schema()

    def _init_schema(self) -> None:
        """Apply the idempotent DDL (CREATE ... IF NOT EXISTS) and commit."""
        self._con.executescript(_DDL)
        self._con.commit()

    def close(self) -> None:
        """Close the underlying connection."""
        self._con.close()

    # Context-manager support: close the connection on block exit.
    def __enter__(self) -> "FrameDB":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    # ── Ingest ────────────────────────────────────────────────────────────
    def ingest(
        self,
        sessions: list,  # list[Session] from s3_analyzer
        s3_path: Optional[Path],
        bw_path: Optional[Path],
        notes: str = "",
    ) -> Optional[int]:
        """
        Ingest a list of sessions into the DB.

        Reads the raw capture files (missing/None paths count as empty
        blobs) and dedupes on the SHA256 of s3_blob+bw_blob.
        Returns capture_id, or None if already ingested (duplicate hash).
        Rolls back and re-raises if any insert fails, so a failed ingest
        never leaves a partial capture in the database.
        """
        import datetime  # local import: only needed at ingest time

        s3_blob = s3_path.read_bytes() if s3_path and s3_path.exists() else b""
        bw_blob = bw_path.read_bytes() if bw_path and bw_path.exists() else b""
        # Capture identity: SHA256 over s3_blob followed by bw_blob.
        cap_hash = hashlib.sha256(s3_blob + bw_blob).hexdigest()
        # Dedup check
        row = self._con.execute(
            "SELECT id FROM captures WHERE capture_hash=?", (cap_hash,)
        ).fetchone()
        if row is not None:
            return None  # already in DB
        ts = datetime.datetime.now().isoformat(timespec="seconds")
        try:
            cur = self._con.execute(
                "INSERT INTO captures (timestamp, s3_path, bw_path, capture_hash, notes) "
                "VALUES (?, ?, ?, ?, ?)",
                (ts,
                 str(s3_path) if s3_path else None,
                 str(bw_path) if bw_path else None,
                 cap_hash, notes),
            )
            cap_id = cur.lastrowid
            for sess in sessions:
                for af in sess.all_frames:
                    frame_id = self._insert_frame(cap_id, af)
                    self._insert_byte_values(frame_id, af.frame.payload)
        except Exception:
            # Discard the half-inserted capture rather than leaving it in
            # the open transaction for a later unrelated commit.
            self._con.rollback()
            raise
        self._con.commit()
        return cap_id

    def _insert_frame(self, cap_id: int, af) -> int:
        """Insert one AnnotatedFrame; return its rowid."""
        # Malformed frames have no parsed header: store NULL sub/page_key.
        sub = af.header.sub if af.header else None
        page_key = af.header.page_key if af.header else None
        # Map tri-state checksum_valid (True/False/None) -> 1/0/NULL.
        chk_ok = None
        if af.frame.checksum_valid is True:
            chk_ok = 1
        elif af.frame.checksum_valid is False:
            chk_ok = 0
        cur = self._con.execute(
            "INSERT INTO frames "
            "(capture_id, session_idx, direction, sub, page_key, sub_name, payload, payload_len, checksum_ok) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (cap_id, af.session_idx, af.source,
             sub, page_key, af.sub_name,
             af.frame.payload, len(af.frame.payload), chk_ok),
        )
        return cur.lastrowid

    def _insert_byte_values(self, frame_id: int, payload: bytes) -> None:
        """Insert one row per byte in payload into byte_values."""
        self._con.executemany(
            "INSERT INTO byte_values (frame_id, offset, value) VALUES (?, ?, ?)",
            [(frame_id, i, b) for i, b in enumerate(payload)],
        )

    # ── Queries ───────────────────────────────────────────────────────────
    def list_captures(self) -> list[sqlite3.Row]:
        """All captures, newest first, each with its frame_count."""
        return self._con.execute(
            "SELECT id, timestamp, s3_path, bw_path, notes, "
            " (SELECT COUNT(*) FROM frames WHERE capture_id=captures.id) AS frame_count "
            "FROM captures ORDER BY id DESC"
        ).fetchall()

    def query_frames(
        self,
        capture_id: Optional[int] = None,
        direction: Optional[str] = None,  # "BW" or "S3"
        sub: Optional[int] = None,
        page_key: Optional[int] = None,
        limit: int = 500,
    ) -> list[sqlite3.Row]:
        """
        Query frames table with optional equality filters (None = no filter).

        Returns rows with: id, capture_id, session_idx, direction, sub,
        page_key, sub_name, payload, payload_len, checksum_ok
        """
        clauses: list[str] = []
        params: list = []
        for column, val in (
            ("capture_id", capture_id),
            ("direction", direction),
            ("sub", sub),
            ("page_key", page_key),
        ):
            if val is not None:
                clauses.append(f"{column}=?")
                params.append(val)
        where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
        params.append(limit)
        return self._con.execute(
            f"SELECT * FROM frames {where} ORDER BY id LIMIT ?", params
        ).fetchall()

    def query_by_byte(
        self,
        offset: int,
        value: Optional[int] = None,
        capture_id: Optional[int] = None,
        direction: Optional[str] = None,
        sub: Optional[int] = None,
        limit: int = 500,
    ) -> list[sqlite3.Row]:
        """
        Return frames that have a specific byte at a specific offset.

        Joins byte_values -> frames so the (offset, value) index is used;
        the matched offset/value come back as q_offset / q_value columns.
        """
        clauses = ["bv.offset=?"]
        params: list = [offset]
        for column, val in (
            ("bv.value", value),
            ("f.capture_id", capture_id),
            ("f.direction", direction),
            ("f.sub", sub),
        ):
            if val is not None:
                clauses.append(f"{column}=?")
                params.append(val)
        sql = (
            "SELECT f.*, bv.offset AS q_offset, bv.value AS q_value "
            "FROM byte_values bv "
            "JOIN frames f ON f.id=bv.frame_id "
            "WHERE " + " AND ".join(clauses) + " "
            "ORDER BY f.id LIMIT ?"
        )
        params.append(limit)
        return self._con.execute(sql, params).fetchall()

    def get_frame_payload(self, frame_id: int) -> Optional[bytes]:
        """Raw payload bytes of one frame, or None if the id is unknown."""
        row = self._con.execute(
            "SELECT payload FROM frames WHERE id=?", (frame_id,)
        ).fetchone()
        return bytes(row["payload"]) if row else None

    def get_distinct_subs(self, capture_id: Optional[int] = None) -> list[int]:
        """Sorted distinct sub ids (optionally restricted to one capture)."""
        sql = "SELECT DISTINCT sub FROM frames WHERE sub IS NOT NULL"
        params: list = []
        if capture_id is not None:
            sql += " AND capture_id=?"
            params.append(capture_id)
        sql += " ORDER BY sub"
        return [r[0] for r in self._con.execute(sql, params).fetchall()]

    def get_distinct_offsets(self, capture_id: Optional[int] = None) -> list[int]:
        """Sorted distinct payload offsets (optionally for one capture)."""
        if capture_id is not None:
            rows = self._con.execute(
                "SELECT DISTINCT bv.offset FROM byte_values bv "
                "JOIN frames f ON f.id=bv.frame_id WHERE f.capture_id=? ORDER BY bv.offset",
                (capture_id,),
            ).fetchall()
        else:
            rows = self._con.execute(
                "SELECT DISTINCT offset FROM byte_values ORDER BY offset"
            ).fetchall()
        return [r[0] for r in rows]

    def interpret_offset(self, payload: bytes, offset: int) -> dict:
        """Return multi-format interpretation of bytes starting at offset."""
        return _interp_bytes(payload, offset)

    def get_stats(self) -> dict:
        """Row counts for all three tables: captures, frames, byte_value_rows."""
        captures = self._con.execute("SELECT COUNT(*) FROM captures").fetchone()[0]
        frames = self._con.execute("SELECT COUNT(*) FROM frames").fetchone()[0]
        bv_rows = self._con.execute("SELECT COUNT(*) FROM byte_values").fetchone()[0]
        return {"captures": captures, "frames": frames, "byte_value_rows": bv_rows}