diff --git a/nl43_dod_poll.py b/nl43_dod_poll.py new file mode 100644 index 0000000..3691c2e --- /dev/null +++ b/nl43_dod_poll.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +""" +Diagnostic poller for NL-43 TCP connectivity. + +Every interval, open a TCP connection, send DOD?, read response, and log results. +""" + +from __future__ import annotations + +import datetime as dt +import socket +import time +from pathlib import Path + +# ---- Configuration (edit as needed) ---- +HOST = "192.168.0.10" +PORT = 2255 +INTERVAL_SECONDS = 5 * 60 +CONNECT_TIMEOUT_SECONDS = 5.0 +READ_TIMEOUT_SECONDS = 5.0 +LOG_PATH = Path("nl43_dod_poll.log") +# --------------------------------------- + + +def _timestamp() -> str: + return dt.datetime.utcnow().isoformat(timespec="seconds") + "Z" + + +def _read_line(sock_file) -> str: + line = sock_file.readline() + if not line: + raise ConnectionError("Socket closed before full response") + return line.decode("ascii", errors="ignore").strip() + + +def _poll_once() -> tuple[bool, str, str, str, str]: + sock = None + result_code = "" + data_line = "" + try: + sock = socket.create_connection((HOST, PORT), timeout=CONNECT_TIMEOUT_SECONDS) + sock.settimeout(READ_TIMEOUT_SECONDS) + + sock.sendall(b"DOD?\r\n") + + with sock.makefile("rb") as sock_file: + result_code = _read_line(sock_file) + if result_code.startswith("$"): + result_code = result_code[1:].strip() + + if result_code != "R+0000": + return False, "other", f"device_result={result_code}", result_code, data_line + + data_line = _read_line(sock_file) + if data_line.startswith("$"): + data_line = data_line[1:].strip() + + return True, "none", "ok", result_code, data_line + except socket.timeout: + return False, "timeout", "socket_timeout", result_code, data_line + except ConnectionRefusedError: + return False, "refused", "connection_refused", result_code, data_line + except OSError as exc: + return False, "other", f"os_error={exc.__class__.__name__}", result_code, data_line + except Exception as exc: + return False, "other", f"error={exc.__class__.__name__}", result_code, data_line + finally: + if sock is not None: + try: + sock.shutdown(socket.SHUT_RDWR) + except OSError: + pass + sock.close() + + +def _log_line(text: str) -> None: + print(text, flush=True) + with LOG_PATH.open("a", encoding="ascii") as handle: + handle.write(text + "\n") + + +def main() -> None: + while True: + start = time.monotonic() + ok, error_type, detail, result_code, data_line = _poll_once() + + status = "success" if ok else "failure" + msg = ( + f"ts={_timestamp()} status={status} error_type={error_type} " + f"detail={detail} result_code={result_code} data={data_line}" + ) + _log_line(msg) + + elapsed = time.monotonic() - start + sleep_for = max(0.0, INTERVAL_SECONDS - elapsed) + time.sleep(sleep_for) + + +if __name__ == "__main__": + main()