# Source: thor-watcher/event_forwarder.py (824 lines, 29 KiB, Python)
"""
event_forwarder.py — forward Thor (Micromate Series IV) IDFH/IDFW event
files to a seismo-relay SFM server.
Walks the same `THORDATA_PATH/<Project>/<Unit>/` tree the heartbeat path
scans. For each event binary that hasn't been forwarded yet, pairs it
with its `<unit>/TXT/<binary filename>.txt` ASCII report (when available;
e.g. `UM11719_20231219163444.IDFH.txt`) and POSTs both to seismo-relay's
`/db/import/idf_file` endpoint as one multipart request.
This is a port of `series3-watcher/event_forwarder.py` adapted for the
Thor file layout. Key differences from the series3 forwarder:
- **Filenames are literal `<SERIAL>_<YYYYMMDDHHMMSS>.IDFH|.IDFW`** —
no base36 stem; the serial is right there in the prefix.
- **TXT sidecars live in a `TXT/` subfolder** next to the binaries
(Thor's exporter writes them there, not alongside the binary).
- **IDFH and IDFW are forwarded as independent events.** Each has
its own sha256-keyed state entry and its own POST. A single
timestamp can produce both a histogram (IDFH) and a waveform (IDFW),
and the seismo-relay endpoint dedupes on (serial, timestamp, kind),
so treating them as separate rows is the right model.
Design notes
────────────
- **stdlib only.** Matches the rest of the watcher (`urllib.request`).
Multipart encoding is hand-rolled.
- **Idempotent across restarts.** Forwarded files are tracked by
sha256 in a JSON state file (default: `<log_dir>/thor_forwarded.json`).
Re-scanning the watch tree doesn't re-POST anything.
- **Default-off.** Callers must enable via config
(`sfm_forward_enabled=true` + `sfm_url=...`). Existing 0.2.x
deployments that auto-update stay non-forwarding until an operator
flips the switch.
- **Quiescence guard.** Files modified within the last few seconds
are skipped — Thor writes the .txt after the binary, so we wait
until both look stable before forwarding.
- **Best-effort report pairing.** When the .txt hasn't appeared yet
but the binary is older than `missing_report_grace_seconds`, the
binary is forwarded alone (seismo-relay accepts that and just skips
the rich fields — we'd rather get the binary indexed than block
forever waiting for a TXT that may never arrive, e.g. operator
hasn't enabled the TXT export in Thor).
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
log = logging.getLogger(__name__)
# Default tuning. All overridable via config.json sfm_* keys.
# Minimum age of a file's mtime, in seconds, before it is treated as no
# longer being written.
DEFAULT_QUIESCENCE_SECONDS = 5
# How old (seconds since mtime) a never-forwarded binary must be before it
# is forwarded without a TXT sidecar.
DEFAULT_MISSING_REPORT_GRACE_SECONDS = 60
# Timeout, in seconds, handed to urllib.request.urlopen for each POST.
DEFAULT_HTTP_TIMEOUT = 60.0
# Value written to / required from the "version" key of the JSON state file.
STATE_SCHEMA_VERSION = 1
# ── Filename matching ─────────────────────────────────────────────────────────
#
# Thor (Micromate Series IV) filename scheme:
# <SERIAL>_<YYYYMMDDHHMMSS>.<KIND>
# where SERIAL is the literal device serial (e.g. UM11719, BE9439),
# and KIND is IDFH (histogram) or IDFW (waveform).
#
# Examples:
# UM11719_20231219163444.IDFH
# UM11719_20231219162723.IDFW
# BE9439_20200713124251.IDFH
# Capture groups: (1) serial — two letters followed by one or more digits,
# (2) the 14-digit timestamp, (3) the kind (IDFH or IDFW).
# Matching is case-insensitive; parse_event_filename() upper-cases the
# serial and kind it extracts.
_EVENT_FILENAME_RE = re.compile(
    r"^([A-Z]{2}\d+)_(\d{14})\.(IDFH|IDFW)$",
    re.IGNORECASE,
)
# Filenames we explicitly skip even if they look event-shaped.
# is_event_binary() compares these against the lower-cased final suffix
# returned by os.path.splitext().
_NON_EVENT_EXTS = {
    ".mlg",  # monitor-log files (separate heartbeat path)
    ".txt",  # ASCII reports — paired in, not primary
    ".csv",  # operator-facing derivative
    ".html",  # operator-facing derivative
    ".pdf",  # operator-facing derivative
    ".xml",  # operator-facing derivative
    ".cdb",  # IDFW.CDB cache-database variant — skip
    ".log",
    ".ini",
    ".json",
    # NOTE(review): os.path.splitext() only yields the last suffix
    # (".json" here), so this compound entry can never match by itself.
    ".sfm.json",
    ".bak",
    ".tmp",
}
def is_event_binary(path: str) -> bool:
    """Decide whether `path` names a Thor event binary.

    Only the basename is inspected; the file itself is never touched.
    """
    basename = os.path.basename(path)
    # Compound cache-database names (.IDFW.CDB / .IDFH.CDB) are rejected
    # before anything else is looked at.
    if basename.lower().endswith((".idfw.cdb", ".idfh.cdb")):
        return False
    if _EVENT_FILENAME_RE.match(basename) is None:
        return False
    suffix = os.path.splitext(basename)[1].lower()
    return suffix not in _NON_EVENT_EXTS
def parse_event_filename(name: str) -> Optional[Tuple[str, datetime, str]]:
    """Split `<SERIAL>_<YYYYMMDDHHMMSS>.<KIND>` into its three parts.

    Returns:
        `(serial, timestamp, kind)` where `serial` and `kind` are
        upper-cased (`kind` is "IDFH" or "IDFW", without the dot) and
        `timestamp` is a naive `datetime`; or None when `name` does not
        match the event pattern or its digits are not a valid date/time.
    """
    match = _EVENT_FILENAME_RE.match(name)
    if match is None:
        return None
    raw_serial, raw_stamp, raw_kind = match.groups()
    try:
        stamp = datetime.strptime(raw_stamp, "%Y%m%d%H%M%S")
    except ValueError:
        # Fourteen digits that do not form a real calendar date/time.
        return None
    return raw_serial.upper(), stamp, raw_kind.upper()
def idf_report_name(binary_name: str) -> str:
    """Return the TXT-export filename for a Thor binary filename.

    The convention is the full binary name with `.txt` appended:
    UM11719_20231219163444.IDFH → UM11719_20231219163444.IDFH.txt
    """
    return f"{binary_name}.txt"
def idf_report_path(binary_path: str) -> str:
    """Return where the TXT sidecar for `binary_path` is expected to live.

    The sidecar sits in a `TXT/` subfolder of the directory holding the
    binary and is named per `idf_report_name`. The path is only computed,
    never checked — the caller must verify that the file exists.
    """
    unit_dir, binary_name = os.path.split(binary_path)
    return os.path.join(unit_dir, "TXT", idf_report_name(binary_name))
def is_histogram_event(filename: str) -> bool:
    """Report whether `filename` is an .IDFH (histogram) event.

    The check is a case-insensitive suffix test on the basename. It is
    used for log wording only: a histogram forwarded without a TXT is
    reported as "no report expected" rather than as a missing report.
    """
    return os.path.basename(filename).lower().endswith(".idfh")
def serial_from_filename(name: str) -> Optional[str]:
    """Return the upper-cased serial prefix of a Thor event filename.

    Returns None when `name` is not a parseable event filename.
    """
    parts = parse_event_filename(name)
    return None if parts is None else parts[0]
# ── State file ────────────────────────────────────────────────────────────────
class ForwardState:
    """Idempotency record: which event files have we already forwarded?

    State file format (JSON)::

        {
          "version": 1,
          "forwarded": {
            "<sha256>": {
              "filename": "UM11719_20231219163444.IDFW",
              "size": 8800,
              "forwarded_at": "2026-05-19T...Z",
              "had_report": true
            },
            ...
          }
        }

    Keyed by sha256 (not filename) so identical content is recognised
    as already-forwarded even if the file moved or got renamed.
    """

    def __init__(self, path: str):
        """Bind to the state file at `path` and load it if it exists."""
        self.path = path
        self._data: Dict[str, Any] = {"version": STATE_SCHEMA_VERSION, "forwarded": {}}
        self._load()

    def _load(self) -> None:
        """Populate the in-memory state from `self.path`.

        A missing file is silently treated as empty state. A file with a
        different schema version, an unreadable file, or invalid JSON is
        logged and also leaves the state empty — loading never raises.
        """
        try:
            with open(self.path, "r", encoding="utf-8") as f:
                d = json.load(f)
            if not isinstance(d, dict):
                raise ValueError("state file root is not an object")
            if d.get("version") != STATE_SCHEMA_VERSION:
                log.warning(
                    "forward state version mismatch (got %r, want %d) — starting fresh",
                    d.get("version"), STATE_SCHEMA_VERSION,
                )
                return
            forwarded = d.get("forwarded")
            if isinstance(forwarded, dict):
                self._data["forwarded"] = forwarded
        except FileNotFoundError:
            pass
        except (OSError, ValueError) as exc:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            log.warning("failed to load forward state from %s: %s", self.path, exc)

    def _save(self) -> None:
        """Atomically write the state to `self.path`.

        The JSON is written to `<path>.tmp`, fsynced, then moved over the
        real file with `os.replace`. Failures are logged, not raised; the
        temp file is removed so a failed save leaves no residue.
        """
        tmp = self.path + ".tmp"
        try:
            d = os.path.dirname(self.path)
            if d:
                os.makedirs(d, exist_ok=True)
            with open(tmp, "w", encoding="utf-8") as f:
                json.dump(self._data, f, indent=2, sort_keys=True)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp, self.path)
        except OSError as exc:
            log.warning("failed to save forward state to %s: %s", self.path, exc)
            try:
                os.remove(tmp)
            except OSError:
                # Temp file was never created or is already gone.
                pass

    def is_forwarded(self, sha256: str) -> bool:
        """Return True if `sha256` has any entry in the state."""
        return sha256 in self._data["forwarded"]

    def status(self, sha256: str) -> Optional[bool]:
        """Return forwarding status for *sha256*.

        Returns:
            None  — never forwarded. Eligible for a fresh forward.
            True  — forwarded successfully with its paired report.
                    NOT a candidate for re-forward.
            False — forwarded WITHOUT its paired `.txt`. Eligible for
                    re-forward IF the TXT now exists, so seismo-relay's
                    upsert refreshes the DB row with authoritative
                    device-side values.

        Legacy entries without a `had_report` key default to True so
        an upgrade doesn't unexpectedly re-forward every entry. Entries
        that are not JSON objects (hand-edited or damaged state) are
        likewise reported as True, matching `is_forwarded`.
        """
        forwarded = self._data["forwarded"]
        if sha256 not in forwarded:
            return None
        entry = forwarded[sha256]
        if not isinstance(entry, dict):
            # The key is present, so the content was forwarded at some
            # point; we just cannot tell whether it had a report.
            return True
        return bool(entry.get("had_report", True))

    def mark_forwarded(
        self,
        sha256: str,
        filename: str,
        size: int,
        had_report: bool = True,
    ) -> None:
        """Record a successful forward and persist the state file.

        Set `had_report=False` when the forward shipped the binary
        without its paired ASCII report. Such entries are re-checked
        on subsequent scans and re-forwarded once the TXT appears.

        Idempotent: re-marking an existing sha256 with `had_report=True`
        is the explicit promotion path used when a re-pair succeeds.
        """
        self._data["forwarded"][sha256] = {
            "filename": filename,
            "size": size,
            "forwarded_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "had_report": had_report,
        }
        self._save()

    def count(self) -> int:
        """Return the number of recorded sha256 entries."""
        return len(self._data["forwarded"])
# ── Helpers ───────────────────────────────────────────────────────────────────
def sha256_of_file(path: str) -> str:
    """Return the hex SHA-256 digest of the file at `path`.

    The file is read in 64 KiB blocks so large binaries are never held
    in memory whole. `OSError` from opening or reading propagates.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            block = stream.read(65536)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
def _is_quiescent(path: str, now_ts: float, quiescence_seconds: float) -> bool:
"""Return True if the file's mtime is at least `quiescence_seconds`
in the past — i.e. no longer being written."""
try:
mtime = os.path.getmtime(path)
except OSError:
return False
return (now_ts - mtime) >= quiescence_seconds
def _iter_unit_dirs(thordata_root: str):
"""Yield (project_name, unit_name, unit_path) for every Thor unit
folder beneath `thordata_root`.
THORDATA layout: <root>/<Project>/<UnitSerial>/
"""
if not os.path.isdir(thordata_root):
return
try:
projects = os.listdir(thordata_root)
except OSError:
return
for proj in projects:
proj_path = os.path.join(thordata_root, proj)
if not os.path.isdir(proj_path):
continue
try:
units = os.listdir(proj_path)
except OSError:
continue
for unit in units:
unit_path = os.path.join(proj_path, unit)
if not os.path.isdir(unit_path):
continue
yield proj, unit, unit_path
def _find_txt_in_unit(unit_path: str, binary_name: str,
                      now_ts: float, quiescence_seconds: float,
                      _txt_cache: Dict[str, Dict[str, str]]) -> Optional[str]:
    """Locate the quiescent `.txt` sidecar for `binary_name`, if any.

    The sidecar is searched for in `<unit_path>/TXT/` with a
    case-insensitive name comparison. Returns the sidecar's path when it
    exists and is quiescent, otherwise None.

    `_txt_cache` maps each unit path to a {lower-cased name: actual name}
    dict of its TXT folder. It is filled in on first use per unit so the
    folder is listed only once per scan; the caller owns the dict.
    """
    try:
        names_by_lower = _txt_cache[unit_path]
    except KeyError:
        txt_dir = os.path.join(unit_path, "TXT")
        names_by_lower = {}
        if os.path.isdir(txt_dir):
            try:
                names_by_lower = {n.lower(): n for n in os.listdir(txt_dir)}
            except OSError:
                # Unlistable TXT folder is treated as empty.
                pass
        _txt_cache[unit_path] = names_by_lower

    real_name = names_by_lower.get(idf_report_name(binary_name).lower())
    if not real_name:
        return None
    sidecar = os.path.join(unit_path, "TXT", real_name)
    return sidecar if _is_quiescent(sidecar, now_ts, quiescence_seconds) else None
# ── Scan pass ─────────────────────────────────────────────────────────────────
def find_pending_events(
    thordata_root: str,
    state: ForwardState,
    *,
    max_age_days: int,
    quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
    missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
    max_per_pass: int = 0,
) -> List[Tuple[str, Optional[str]]]:
    """
    Scan `thordata_root` and list the events that still need forwarding.

    Returns `(binary_path, txt_path_or_None)` pairs, oldest mtime first
    so that backfill proceeds chronologically. A missing root is logged
    and yields an empty list.

    A binary is selected when all of the following hold:
      - its name passes `is_event_binary`;
      - its mtime is no older than `max_age_days` (minimum one day);
      - it is quiescent (mtime >= `quiescence_seconds` in the past);
      - `state.status(sha256)` is not True, i.e. it was never forwarded
        or was forwarded without its report.

    Report pairing:
      - a quiescent sidecar in `<unit>/TXT/` is paired with the binary;
      - a binary already forwarded without a report is selected again
        only once such a sidecar exists;
      - a never-forwarded binary with no sidecar is deferred until it is
        at least `missing_report_grace_seconds` old, then sent alone.

    When `max_per_pass > 0`, at most that many pairs are returned.
    """
    if not os.path.isdir(thordata_root):
        log.warning("forward scan: thordata root not found: %s", thordata_root)
        return []

    scan_started = time.time()
    age_limit = max(1, int(max_age_days)) * 86400.0

    # Phase 1: gather (mtime, unit_dir, binary_path) for every event
    # binary that is young enough to be considered.
    found: List[Tuple[float, str, str]] = []
    for _project, _unit, unit_dir in _iter_unit_dirs(thordata_root):
        try:
            dir_entries = list(os.scandir(unit_dir))
        except OSError:
            continue
        for entry in dir_entries:
            if not entry.is_file():
                continue
            if not is_event_binary(entry.path):
                continue
            try:
                modified = entry.stat().st_mtime
            except OSError:
                continue
            if (scan_started - modified) <= age_limit:
                found.append((modified, unit_dir, entry.path))

    # Oldest first; the sort is keyed on mtime only.
    found.sort(key=lambda item: item[0])

    # Phase 2: hash each candidate and decide whether it is due.
    pending: List[Tuple[str, Optional[str]]] = []
    inflight = 0
    already_done = 0
    sidecar_cache: Dict[str, Dict[str, str]] = {}
    for modified, unit_dir, binary_path in found:
        if not _is_quiescent(binary_path, scan_started, quiescence_seconds):
            inflight += 1
            continue
        try:
            digest = sha256_of_file(binary_path)
        except OSError as exc:
            log.warning("forward scan: sha256 failed for %s: %s", binary_path, exc)
            continue

        previous = state.status(digest)
        if previous is True:
            already_done += 1
            continue

        sidecar = _find_txt_in_unit(
            unit_dir, os.path.basename(binary_path),
            scan_started, quiescence_seconds, sidecar_cache,
        )
        if sidecar is None:
            if previous is False:
                # Already sent without a report and there is still no
                # report — nothing new to tell the server.
                already_done += 1
                continue
            if (scan_started - modified) < missing_report_grace_seconds:
                # First-time forward: give the TXT a chance to appear.
                inflight += 1
                continue

        pending.append((binary_path, sidecar))
        if max_per_pass and len(pending) >= max_per_pass:
            break

    log.debug(
        "forward scan: %d pending skipped_inflight=%d already_forwarded=%d cap=%d",
        len(pending), inflight, already_done, max_per_pass,
    )
    return pending
# ── Multipart upload ──────────────────────────────────────────────────────────
def _encode_multipart(
parts: List[Tuple[str, str, str, bytes]],
) -> Tuple[bytes, str]:
"""Encode a list of (field_name, filename, content_type, data) tuples
as a multipart/form-data body. Returns (body_bytes, content_type
header value)."""
boundary = "----ThorWatcherBoundary" + os.urandom(8).hex()
chunks: List[bytes] = []
for field_name, filename, content_type, data in parts:
chunks.append(("--" + boundary + "\r\n").encode("ascii"))
chunks.append(
(f'Content-Disposition: form-data; name="{field_name}"; '
f'filename="{filename}"\r\n').encode("ascii")
)
chunks.append((f"Content-Type: {content_type}\r\n\r\n").encode("ascii"))
chunks.append(data)
chunks.append(b"\r\n")
chunks.append(("--" + boundary + "--\r\n").encode("ascii"))
body = b"".join(chunks)
content_type_hdr = f"multipart/form-data; boundary={boundary}"
return body, content_type_hdr
def _import_endpoint(sfm_url: str) -> str:
"""Compose the import endpoint URL from a base SFM URL."""
return sfm_url.rstrip("/") + "/db/import/idf_file"
def _forward_error(filename: str, detail: str) -> Dict[str, Any]:
    """Build the `status="error"` result dict reported for `filename`."""
    return {"status": "error", "filename": filename, "detail": detail}


def forward_event_pair(
    sfm_url: str,
    binary_path: str,
    txt_path: Optional[str],
    *,
    serial_hint: Optional[str] = None,
    timeout: float = DEFAULT_HTTP_TIMEOUT,
) -> Dict[str, Any]:
    """POST a single event (binary + optional .txt) to the SFM import
    endpoint.

    Args:
        sfm_url: Base URL of the SFM server; the import path is appended.
        binary_path: Event binary to upload.
        txt_path: Paired ASCII report, or None to send the binary alone.
        serial_hint: Serial sent as the `serial` query parameter. When
            empty, it is derived from the binary's filename.
        timeout: Socket timeout in seconds for the request.

    Returns:
        The server's per-file result entry for the binary (preferring an
        entry whose status is "ok"), or a dict with `status="error"`,
        `filename` and `detail` for every failure: unreadable input
        file, invalid URL, HTTP/transport error, or a response that is
        not the expected JSON shape. This function does not raise for
        those conditions, so one bad file cannot abort a forwarding pass.
    """
    # Function-scope imports keep this block self-contained.
    from http.client import HTTPException
    from urllib.parse import quote

    binary_name = os.path.basename(binary_path)

    # The file may have been moved or deleted since the scan that
    # selected it; report that as a normal per-file error.
    try:
        with open(binary_path, "rb") as f:
            binary_bytes = f.read()
    except OSError as exc:
        return _forward_error(binary_name, f"cannot read binary: {exc}")
    parts = [("files", binary_name, "application/octet-stream", binary_bytes)]

    if txt_path is not None:
        # Fail rather than silently dropping the report: the caller
        # records "had report" based on txt_path being set.
        try:
            with open(txt_path, "rb") as f:
                txt_bytes = f.read()
        except OSError as exc:
            return _forward_error(binary_name, f"cannot read report: {exc}")
        parts.append(("files", os.path.basename(txt_path), "text/plain", txt_bytes))

    body, content_type = _encode_multipart(parts)

    # Auto-derive serial from filename if caller didn't supply one.
    if not serial_hint:
        serial_hint = serial_from_filename(binary_name)
    url = _import_endpoint(sfm_url)
    if serial_hint:
        sep = "&" if "?" in url else "?"
        # Percent-encode so a caller-supplied hint cannot corrupt the query.
        url = f"{url}{sep}serial={quote(str(serial_hint), safe='')}"

    try:
        req = urllib.request.Request(
            url, data=body, method="POST",
            headers={
                "Content-Type": content_type,
                "Content-Length": str(len(body)),
                "User-Agent": "thor-watcher/sfm-forwarder",
                "Accept": "application/json",
            },
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        try:
            body_excerpt = exc.read().decode("utf-8", errors="replace")[:300]
        except Exception:
            body_excerpt = ""
        return _forward_error(
            binary_name,
            f"HTTP {exc.code}: {exc.reason} body={body_excerpt!r}",
        )
    except urllib.error.URLError as exc:
        return _forward_error(binary_name, f"connection error: {exc.reason}")
    except (OSError, HTTPException) as exc:
        # HTTPException covers protocol-level failures such as
        # IncompleteRead, which are not OSError subclasses.
        return _forward_error(binary_name, f"transport error: {exc}")
    except ValueError as exc:
        # urllib rejects malformed URLs (e.g. missing scheme) this way.
        return _forward_error(binary_name, f"invalid SFM URL {url!r}: {exc}")

    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return _forward_error(
            binary_name, f"server returned non-JSON: {raw[:200]!r}",
        )

    # Tolerate any JSON shape: only dict entries inside a "results" list
    # on a dict payload are considered.
    results = payload.get("results") if isinstance(payload, dict) else None
    if not isinstance(results, list):
        results = []
    matches = [
        entry for entry in results
        if isinstance(entry, dict) and entry.get("filename") == binary_name
    ]
    for entry in matches:
        if entry.get("status") == "ok":
            return entry
    if matches:
        return matches[0]
    return _forward_error(binary_name, f"unexpected server response: {payload!r}")
# ── Top-level orchestration ───────────────────────────────────────────────────
def forward_pending(
    thordata_root: str,
    sfm_url: str,
    state: ForwardState,
    *,
    max_age_days: int,
    quiescence_seconds: float = DEFAULT_QUIESCENCE_SECONDS,
    missing_report_grace_seconds: float = DEFAULT_MISSING_REPORT_GRACE_SECONDS,
    timeout: float = DEFAULT_HTTP_TIMEOUT,
    max_per_pass: int = 0,
    logger: Optional[Any] = None,
) -> Dict[str, int]:
    """
    Run one forwarding pass: scan, POST each pending event, record it.

    Messages go to `logger(msg)` when a logger callable is given,
    otherwise to this module's `log.info`.

    Returns a counts dict suitable for logging:
        {
          "scanned":     <int>,  # event binaries selected for forward
          "forwarded":   <int>,  # successfully POSTed this pass
          "errors":      <int>,  # POST failures (will retry next pass)
          "with_report": <int>,  # of forwarded, how many had a paired TXT
        }
    """
    emit = logger if logger else log.info

    pending = find_pending_events(
        thordata_root, state,
        max_age_days=max_age_days,
        quiescence_seconds=quiescence_seconds,
        missing_report_grace_seconds=missing_report_grace_seconds,
        max_per_pass=max_per_pass,
    )
    tally = {"scanned": len(pending), "forwarded": 0, "errors": 0, "with_report": 0}

    for binary_path, txt_path in pending:
        short_name = os.path.basename(binary_path)
        outcome = forward_event_pair(sfm_url, binary_path, txt_path, timeout=timeout)

        if outcome.get("status") != "ok":
            tally["errors"] += 1
            emit(
                f"[forward] ERR {short_name}: "
                f"{outcome.get('detail', 'unknown error')}"
            )
            continue

        # The POST succeeded; a failure to hash/stat the file here only
        # means the event is not recorded and will be selected again.
        try:
            state.mark_forwarded(
                sha256_of_file(binary_path),
                short_name,
                os.path.getsize(binary_path),
                had_report=(txt_path is not None),
            )
        except OSError as exc:
            emit(f"[forward] post-success state save failed for "
                 f"{short_name}: {exc}")

        tally["forwarded"] += 1
        if txt_path:
            tally["with_report"] += 1
            report_token = "+ {} attached".format(os.path.basename(txt_path))
        elif is_histogram_event(binary_path):
            report_token = "(histogram, no report expected)"
        else:
            report_token = "no report"
        emit(
            "[forward] OK  {} ({}B, {}, inserted={}, skipped={})".format(
                short_name,
                outcome.get("filesize", 0),
                report_token,
                outcome.get("inserted", 0),
                outcome.get("skipped", 0),
            )
        )

    return tally
# ── Seed-state mode (skip historical backfill on first deploy) ────────────────
def seed_state_from_folder(
    thordata_root: str,
    state: ForwardState,
    *,
    max_age_days: int = 365,
    logger: Optional[Any] = None,
) -> Dict[str, int]:
    """Record every existing event binary as forwarded, without POSTing.

    Intended to be run ONCE before enabling sfm_forward_enabled on a
    machine with a large historical archive, so that the watcher only
    forwards events that appear AFTER the seed run. Binaries older than
    `max_age_days` (minimum one day) are counted but not seeded.

    Messages go to `logger(msg)` when given, else to `log.info`.

    Returns a counts dict:
        {"scanned": int, "seeded": int, "already_known": int, "skipped_too_old": int}
    """
    emit = logger if logger else log.info
    tally = {"scanned": 0, "seeded": 0, "already_known": 0, "skipped_too_old": 0}

    if not os.path.isdir(thordata_root):
        emit(f"[seed] thordata root not found: {thordata_root}")
        return tally

    started = time.time()
    age_limit = max(1, int(max_age_days)) * 86400.0

    for _project, _unit, unit_dir in _iter_unit_dirs(thordata_root):
        try:
            files = [entry for entry in os.scandir(unit_dir) if entry.is_file()]
        except OSError:
            continue
        for entry in files:
            if not is_event_binary(entry.path):
                continue
            tally["scanned"] += 1
            try:
                info = entry.stat()
            except OSError:
                continue
            if (started - info.st_mtime) > age_limit:
                tally["skipped_too_old"] += 1
                continue
            try:
                digest = sha256_of_file(entry.path)
            except OSError as exc:
                emit(f"[seed] sha256 failed for {entry.path}: {exc}")
                continue
            if state.is_forwarded(digest):
                tally["already_known"] += 1
                continue
            # NOTE(review): ForwardState.mark_forwarded rewrites the whole
            # state file on every call, so seeding N files does N saves.
            state.mark_forwarded(digest, entry.name, info.st_size)
            tally["seeded"] += 1
            if tally["seeded"] % 1000 == 0:
                emit(f"[seed] progress: {tally['seeded']} seeded so far...")

    emit(
        f"[seed] done. scanned={tally['scanned']} seeded={tally['seeded']} "
        f"already_known={tally['already_known']} "
        f"skipped_too_old={tally['skipped_too_old']}"
    )
    return tally
# ── CLI entry point ─────────────────────────────────────────────────────────
def _main() -> int:
    """Run the one-shot command-line utilities; returns the exit code.

    The only supported mode is `--seed-state`:

        python event_forwarder.py --seed-state \\
            --thordata "C:\\THORDATA" \\
            --state "<path/to/thor_forwarded.json>" \\
            [--max-age-days 365]
    """
    import argparse

    cli = argparse.ArgumentParser(
        description="Thor Watcher — SFM event forwarder utilities",
    )
    cli.add_argument(
        "--seed-state",
        action="store_true",
        help="Mark every event binary in --thordata as already-forwarded "
             "(without POSTing).  Use this BEFORE enabling sfm_forward "
             "on a machine with a large historical archive.",
    )
    cli.add_argument(
        "--thordata",
        required=True,
        help="Path to the THORDATA root folder.",
    )
    cli.add_argument(
        "--state",
        required=True,
        help="Path to the JSON state file.  Will be created if missing.",
    )
    cli.add_argument(
        "--max-age-days",
        type=int,
        default=365,
        help="Only seed files newer than this many days (default 365).",
    )
    opts = cli.parse_args()

    # parser.error() prints usage and exits, so nothing below runs
    # unless seeding was requested.
    if not opts.seed_state:
        cli.error("specify --seed-state (no other modes supported yet)")

    print(f"[seed] thordata = {opts.thordata}")
    print(f"[seed] state    = {opts.state}")
    print(f"[seed] max_age  = {opts.max_age_days} days")

    state = ForwardState(opts.state)
    print(f"[seed] state currently has {state.count()} entries")
    seed_state_from_folder(
        opts.thordata,
        state,
        max_age_days=opts.max_age_days,
        logger=print,
    )
    print(f"[seed] state now has {state.count()} entries")
    return 0
# Script entry point: exit with the status code returned by the CLI.
if __name__ == "__main__":
    raise SystemExit(_main())