#!/usr/bin/env python3
"""
frame_db.py — SQLite frame database for Instantel protocol captures.

Schema:
    captures     — one row per ingested capture pair (deduped by SHA256)
    frames       — one row per parsed frame
    byte_values  — one row per (frame, offset, value) for fast indexed queries

Usage:
    db = FrameDB()          # opens default DB at ~/.seismo_lab/frames.db
    db = FrameDB(path)      # custom path
    cap_id = db.ingest(sessions, s3_path, bw_path)
    rows = db.query_frames(sub=0xF7, direction="S3")
    rows = db.query_by_byte(offset=85, value=0x0A)

    # FrameDB is also a context manager that closes the connection on exit:
    with FrameDB(path) as db:
        ...
"""

from __future__ import annotations

import datetime
import hashlib
import os
import sqlite3
import struct
from pathlib import Path
from typing import Optional


# ─────────────────────────────────────────────────────────────────────────────
# DB location
# ─────────────────────────────────────────────────────────────────────────────

DEFAULT_DB_DIR = Path.home() / ".seismo_lab"
DEFAULT_DB_PATH = DEFAULT_DB_DIR / "frames.db"


# ─────────────────────────────────────────────────────────────────────────────
# Schema
# ─────────────────────────────────────────────────────────────────────────────

# Executed via executescript() on every open; all statements are idempotent
# (IF NOT EXISTS).  PRAGMA foreign_keys is per-connection, hence re-run each time.
_DDL = """
PRAGMA journal_mode=WAL;
PRAGMA foreign_keys=ON;

CREATE TABLE IF NOT EXISTS captures (
    id            INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp     TEXT    NOT NULL,            -- ISO-8601 ingest time
    s3_path       TEXT,
    bw_path       TEXT,
    capture_hash  TEXT    NOT NULL UNIQUE,     -- SHA256 of s3_blob+bw_blob
    notes         TEXT    DEFAULT ''
);

CREATE TABLE IF NOT EXISTS frames (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,
    capture_id   INTEGER NOT NULL REFERENCES captures(id) ON DELETE CASCADE,
    session_idx  INTEGER NOT NULL,
    direction    TEXT    NOT NULL,             -- 'BW' or 'S3'
    sub          INTEGER,                      -- NULL if malformed
    page_key     INTEGER,
    sub_name     TEXT,
    payload      BLOB    NOT NULL,
    payload_len  INTEGER NOT NULL,
    checksum_ok  INTEGER                       -- 1/0/NULL
);

CREATE INDEX IF NOT EXISTS idx_frames_capture  ON frames(capture_id);
CREATE INDEX IF NOT EXISTS idx_frames_sub      ON frames(sub);
CREATE INDEX IF NOT EXISTS idx_frames_page_key ON frames(page_key);
CREATE INDEX IF NOT EXISTS idx_frames_dir      ON frames(direction);

CREATE TABLE IF NOT EXISTS byte_values (
    id        INTEGER PRIMARY KEY AUTOINCREMENT,
    frame_id  INTEGER NOT NULL REFERENCES frames(id) ON DELETE CASCADE,
    offset    INTEGER NOT NULL,
    value     INTEGER NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_bv_frame   ON byte_values(frame_id);
CREATE INDEX IF NOT EXISTS idx_bv_offset  ON byte_values(offset);
CREATE INDEX IF NOT EXISTS idx_bv_value   ON byte_values(value);
CREATE INDEX IF NOT EXISTS idx_bv_off_val ON byte_values(offset, value);
"""


# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────

def _sha256_blobs(s3_blob: bytes, bw_blob: bytes) -> str:
    """
    Return the hex SHA-256 digest of ``s3_blob`` followed by ``bw_blob``.

    NOTE(review): the blobs are hashed back-to-back with no separator or length
    prefix, so e.g. (X, b"") and (b"", X) produce the same digest and would be
    treated as duplicates by ``FrameDB.ingest``.  Changing the scheme would
    invalidate hashes already stored in existing databases, so it is kept.
    """
    h = hashlib.sha256()
    h.update(s3_blob)
    h.update(bw_blob)
    return h.hexdigest()


def _interp_bytes(data: bytes, offset: int) -> dict:
    """
    Return a multi-interpretation dict for 1–4 bytes starting at ``offset``.

    Used in the GUI's byte interpretation panel.  Entries are only present when
    enough bytes remain: 8-bit views need 1 byte, 16-bit views 2, 32-bit views 4.
    Returns an empty dict when ``offset`` is negative or at/after the end of
    ``data``.
    """
    result: dict = {}
    # A negative offset would make data[offset] / unpack_from read from the
    # *end* of the payload, which is never what the panel is asking for.
    if offset < 0:
        return result
    remaining = len(data) - offset
    if remaining <= 0:
        return result

    b1 = data[offset]
    result["uint8"] = b1
    result["int8"] = b1 if b1 < 128 else b1 - 256

    # NOTE(review): the 16/32-bit section was partially lost and has been
    # rebuilt.  The struct formats (">H"/"<H", ">f"/"<f", ">I"/"<I") are as
    # originally visible, but the result key names below were reconstructed —
    # confirm them against the GUI code that reads this dict.
    if remaining >= 2:
        u16be = struct.unpack_from(">H", data, offset)[0]
        u16le = struct.unpack_from("<H", data, offset)[0]
        result["uint16_be"] = u16be
        result["uint16_le"] = u16le
        result["int16_be"] = struct.unpack_from(">h", data, offset)[0]
        result["int16_le"] = struct.unpack_from("<h", data, offset)[0]

    if remaining >= 4:
        f32be = struct.unpack_from(">f", data, offset)[0]
        f32le = struct.unpack_from("<f", data, offset)[0]
        u32be = struct.unpack_from(">I", data, offset)[0]
        u32le = struct.unpack_from("<I", data, offset)[0]
        result["float32_be"] = f32be
        result["float32_le"] = f32le
        result["uint32_be"] = u32be
        result["uint32_le"] = u32le
        result["int32_be"] = struct.unpack_from(">i", data, offset)[0]
        result["int32_le"] = struct.unpack_from("<i", data, offset)[0]

    return result


# ─────────────────────────────────────────────────────────────────────────────
# FrameDB
# ─────────────────────────────────────────────────────────────────────────────

class FrameDB:
    """
    SQLite-backed store of parsed capture frames.

    Opens (creating parent directories and the schema if needed) the database
    at ``path``; defaults to ``DEFAULT_DB_PATH``.  Usable as a context manager,
    in which case the connection is closed on exit.
    """

    def __init__(self, path: Optional[str | Path] = None) -> None:
        if path is None:
            path = DEFAULT_DB_PATH
        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)
        self.path = path
        # check_same_thread=False allows use from threads other than the
        # creator.  NOTE(review): no lock is taken anywhere in this class, so
        # serializing concurrent access is the caller's responsibility.
        self._con = sqlite3.connect(str(path), check_same_thread=False)
        self._con.row_factory = sqlite3.Row
        self._init_schema()

    def __enter__(self) -> FrameDB:
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def _init_schema(self) -> None:
        """Create tables/indexes if missing and apply per-connection PRAGMAs."""
        self._con.executescript(_DDL)
        self._con.commit()

    def close(self) -> None:
        """Close the underlying SQLite connection."""
        self._con.close()

    # ── Ingest ────────────────────────────────────────────────────────────

    def ingest(
        self,
        sessions: list,                 # list[Session] from s3_analyzer
        s3_path: Optional[Path],
        bw_path: Optional[Path],
        notes: str = "",
    ) -> Optional[int]:
        """
        Ingest a list of sessions into the DB.

        The capture row and all of its frames and byte values are written in a
        single transaction: if anything fails part-way, nothing is kept and the
        exception propagates.

        Args:
            sessions: objects exposing ``all_frames`` (annotated frames).
            s3_path: S3-side capture file, or None.  A missing file contributes
                empty bytes to the dedup hash.
            bw_path: BW-side capture file, or None (same rule as ``s3_path``).
            notes: free-text note stored on the capture row.

        Returns:
            The new capture_id, or None if a capture with the same content hash
            is already in the DB.
        """
        s3_blob = s3_path.read_bytes() if s3_path and s3_path.exists() else b""
        bw_blob = bw_path.read_bytes() if bw_path and bw_path.exists() else b""
        cap_hash = _sha256_blobs(s3_blob, bw_blob)

        # Dedup check
        existing = self._con.execute(
            "SELECT id FROM captures WHERE capture_hash=?", (cap_hash,)
        ).fetchone()
        if existing:
            return None  # already in DB

        # NOTE: naive local time, as before; stored format is unchanged.
        ts = datetime.datetime.now().isoformat(timespec="seconds")

        # `with self._con` commits on success and rolls back on any exception,
        # so a failure mid-ingest cannot leave a partial capture in an open
        # transaction for some later commit() to persist.
        with self._con:
            cur = self._con.execute(
                "INSERT INTO captures (timestamp, s3_path, bw_path, capture_hash, notes) "
                "VALUES (?, ?, ?, ?, ?)",
                (
                    ts,
                    str(s3_path) if s3_path else None,
                    str(bw_path) if bw_path else None,
                    cap_hash,
                    notes,
                ),
            )
            cap_id = cur.lastrowid
            for sess in sessions:
                for af in sess.all_frames:
                    frame_id = self._insert_frame(cap_id, af)
                    self._insert_byte_values(frame_id, af.frame.payload)
        return cap_id

    def _insert_frame(self, cap_id: int, af) -> int:
        """
        Insert one AnnotatedFrame; return its rowid.

        ``sub``/``page_key`` are stored as NULL when ``af.header`` is falsy.
        ``checksum_ok`` is 1 when ``checksum_valid is True``, 0 when it is
        ``False``, and NULL otherwise.
        """
        sub = af.header.sub if af.header else None
        page_key = af.header.page_key if af.header else None

        chk_ok = None
        if af.frame.checksum_valid is True:
            chk_ok = 1
        elif af.frame.checksum_valid is False:
            chk_ok = 0

        cur = self._con.execute(
            "INSERT INTO frames "
            "(capture_id, session_idx, direction, sub, page_key, sub_name, payload, payload_len, checksum_ok) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (
                cap_id,
                af.session_idx,
                af.source,
                sub,
                page_key,
                af.sub_name,
                af.frame.payload,
                len(af.frame.payload),
                chk_ok,
            ),
        )
        return cur.lastrowid

    def _insert_byte_values(self, frame_id: int, payload: bytes) -> None:
        """Insert one (frame_id, offset, value) row per byte of ``payload``."""
        self._con.executemany(
            "INSERT INTO byte_values (frame_id, offset, value) VALUES (?, ?, ?)",
            ((frame_id, i, b) for i, b in enumerate(payload)),
        )

    # ── Queries ───────────────────────────────────────────────────────────

    def list_captures(self) -> list[sqlite3.Row]:
        """Return all captures, newest first, each with a ``frame_count`` column."""
        return self._con.execute(
            "SELECT id, timestamp, s3_path, bw_path, notes, "
            "  (SELECT COUNT(*) FROM frames WHERE capture_id=captures.id) AS frame_count "
            "FROM captures ORDER BY id DESC"
        ).fetchall()

    def query_frames(
        self,
        capture_id: Optional[int] = None,
        direction: Optional[str] = None,   # "BW" or "S3"
        sub: Optional[int] = None,
        page_key: Optional[int] = None,
        limit: int = 500,
    ) -> list[sqlite3.Row]:
        """
        Query the frames table with optional filters (None = no filter).

        Returns up to ``limit`` rows ordered by id, with columns:
            id, capture_id, session_idx, direction, sub, page_key,
            sub_name, payload, payload_len, checksum_ok
        """
        # Column names are fixed literals; only values are bound as parameters.
        filters = (
            ("capture_id=?", capture_id),
            ("direction=?", direction),
            ("sub=?", sub),
            ("page_key=?", page_key),
        )
        clauses = [clause for clause, value in filters if value is not None]
        params: list = [value for _, value in filters if value is not None]

        where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
        sql = f"SELECT * FROM frames {where} ORDER BY id LIMIT ?"
        params.append(limit)
        return self._con.execute(sql, params).fetchall()

    def query_by_byte(
        self,
        offset: int,
        value: Optional[int] = None,
        capture_id: Optional[int] = None,
        direction: Optional[str] = None,
        sub: Optional[int] = None,
        limit: int = 500,
    ) -> list[sqlite3.Row]:
        """
        Return frames that have a byte at ``offset`` (equal to ``value`` if given).

        Joins byte_values -> frames for indexed lookup.  Each row carries all
        frame columns plus ``q_offset`` and ``q_value`` for the matched byte.
        """
        filters = (
            ("bv.value=?", value),
            ("f.capture_id=?", capture_id),
            ("f.direction=?", direction),
            ("f.sub=?", sub),
        )
        clauses = ["bv.offset=?"]
        params: list = [offset]
        for clause, val in filters:
            if val is not None:
                clauses.append(clause)
                params.append(val)

        where = "WHERE " + " AND ".join(clauses)
        sql = (
            f"SELECT f.*, bv.offset AS q_offset, bv.value AS q_value "
            f"FROM byte_values bv "
            f"JOIN frames f ON f.id=bv.frame_id "
            f"{where} "
            f"ORDER BY f.id LIMIT ?"
        )
        params.append(limit)
        return self._con.execute(sql, params).fetchall()

    def get_frame_payload(self, frame_id: int) -> Optional[bytes]:
        """Return the payload of frame ``frame_id``, or None if it does not exist."""
        row = self._con.execute(
            "SELECT payload FROM frames WHERE id=?", (frame_id,)
        ).fetchone()
        return bytes(row["payload"]) if row else None

    def get_distinct_subs(self, capture_id: Optional[int] = None) -> list[int]:
        """Return sorted distinct non-NULL ``sub`` values, optionally for one capture."""
        if capture_id is not None:
            rows = self._con.execute(
                "SELECT DISTINCT sub FROM frames "
                "WHERE capture_id=? AND sub IS NOT NULL ORDER BY sub",
                (capture_id,),
            ).fetchall()
        else:
            rows = self._con.execute(
                "SELECT DISTINCT sub FROM frames WHERE sub IS NOT NULL ORDER BY sub"
            ).fetchall()
        return [r[0] for r in rows]

    def get_distinct_offsets(self, capture_id: Optional[int] = None) -> list[int]:
        """Return sorted distinct byte offsets present, optionally for one capture."""
        if capture_id is not None:
            rows = self._con.execute(
                "SELECT DISTINCT bv.offset FROM byte_values bv "
                "JOIN frames f ON f.id=bv.frame_id "
                "WHERE f.capture_id=? ORDER BY bv.offset",
                (capture_id,),
            ).fetchall()
        else:
            rows = self._con.execute(
                "SELECT DISTINCT offset FROM byte_values ORDER BY offset"
            ).fetchall()
        return [r[0] for r in rows]

    def interpret_offset(self, payload: bytes, offset: int) -> dict:
        """Return multi-format interpretation of bytes starting at offset."""
        return _interp_bytes(payload, offset)

    def get_stats(self) -> dict:
        """Return row counts for the captures, frames and byte_values tables."""
        captures = self._con.execute("SELECT COUNT(*) FROM captures").fetchone()[0]
        frames = self._con.execute("SELECT COUNT(*) FROM frames").fetchone()[0]
        bv_rows = self._con.execute("SELECT COUNT(*) FROM byte_values").fetchone()[0]
        return {"captures": captures, "frames": frames, "byte_value_rows": bv_rows}