101 lines
2.9 KiB
Python
101 lines
2.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Diagnostic poller for NL-43 TCP connectivity.
|
|
|
|
Every interval, open a TCP connection, send DOD?, read response, and log results.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import datetime as dt
|
|
import socket
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# ---- Configuration (edit as needed) ----
|
|
HOST = "192.168.0.10"
|
|
PORT = 2255
|
|
INTERVAL_SECONDS = 5 * 60
|
|
CONNECT_TIMEOUT_SECONDS = 5.0
|
|
READ_TIMEOUT_SECONDS = 5.0
|
|
LOG_PATH = Path("nl43_dod_poll.log")
|
|
# ---------------------------------------
|
|
|
|
|
|
def _timestamp() -> str:
|
|
return dt.datetime.utcnow().isoformat(timespec="seconds") + "Z"
|
|
|
|
|
|
def _read_line(sock_file) -> str:
|
|
line = sock_file.readline()
|
|
if not line:
|
|
raise ConnectionError("Socket closed before full response")
|
|
return line.decode("ascii", errors="ignore").strip()
|
|
|
|
|
|
def _poll_once() -> tuple[bool, str, str, str, str]:
|
|
sock = None
|
|
result_code = ""
|
|
data_line = ""
|
|
try:
|
|
sock = socket.create_connection((HOST, PORT), timeout=CONNECT_TIMEOUT_SECONDS)
|
|
sock.settimeout(READ_TIMEOUT_SECONDS)
|
|
|
|
sock.sendall(b"DOD?\r\n")
|
|
|
|
with sock.makefile("rb") as sock_file:
|
|
result_code = _read_line(sock_file)
|
|
if result_code.startswith("$"):
|
|
result_code = result_code[1:].strip()
|
|
|
|
if result_code != "R+0000":
|
|
return False, "other", f"device_result={result_code}", result_code, data_line
|
|
|
|
data_line = _read_line(sock_file)
|
|
if data_line.startswith("$"):
|
|
data_line = data_line[1:].strip()
|
|
|
|
return True, "none", "ok", result_code, data_line
|
|
except socket.timeout:
|
|
return False, "timeout", "socket_timeout", result_code, data_line
|
|
except ConnectionRefusedError:
|
|
return False, "refused", "connection_refused", result_code, data_line
|
|
except OSError as exc:
|
|
return False, "other", f"os_error={exc.__class__.__name__}", result_code, data_line
|
|
except Exception as exc:
|
|
return False, "other", f"error={exc.__class__.__name__}", result_code, data_line
|
|
finally:
|
|
if sock is not None:
|
|
try:
|
|
sock.shutdown(socket.SHUT_RDWR)
|
|
except OSError:
|
|
pass
|
|
sock.close()
|
|
|
|
|
|
def _log_line(text: str) -> None:
|
|
print(text, flush=True)
|
|
with LOG_PATH.open("a", encoding="ascii") as handle:
|
|
handle.write(text + "\n")
|
|
|
|
|
|
def main() -> None:
|
|
while True:
|
|
start = time.monotonic()
|
|
ok, error_type, detail, result_code, data_line = _poll_once()
|
|
|
|
status = "success" if ok else "failure"
|
|
msg = (
|
|
f"ts={_timestamp()} status={status} error_type={error_type} "
|
|
f"detail={detail} result_code={result_code} data={data_line}"
|
|
)
|
|
_log_line(msg)
|
|
|
|
elapsed = time.monotonic() - start
|
|
sleep_for = max(0.0, INTERVAL_SECONDS - elapsed)
|
|
time.sleep(sleep_for)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|