Python-based time management application with UDP discovery, TCP protocol communication, time sync, and drift monitoring. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
78 lines
2.4 KiB
Python
78 lines
2.4 KiB
Python
"""ASCII 프로토콜 (시간 동기화 cmd30, 시간 읽기 cmd31)"""
|
|
|
|
import socket
|
|
from datetime import datetime
|
|
|
|
from config import TCP_TIMEOUT, TCP_RECV_BUFFER, PROTOCOL_ENCODING
|
|
from utils.time_format import datetime_to_protocol, protocol_to_datetime
|
|
|
|
|
|
def sync_time(ip: str, port: int, dt: datetime | None = None) -> tuple[bool, str]:
|
|
"""컨트롤러에 PC 시간 동기화 (명령 30).
|
|
|
|
패킷: ![0030YYMMDDdHHMMSS!]
|
|
예시: ![003026020912 30229!] (2026-02-09 월 23:02:29)
|
|
응답: ![00300!] (성공) / ![0030F!] (실패)
|
|
|
|
Returns: (성공여부, 메시지)
|
|
"""
|
|
if dt is None:
|
|
dt = datetime.now()
|
|
|
|
time_str = datetime_to_protocol(dt)
|
|
cmd = f"![0030{time_str}!]"
|
|
|
|
try:
|
|
resp = _tcp_send_recv(ip, port, cmd)
|
|
except Exception as e:
|
|
return False, f"연결 실패: {e}"
|
|
|
|
if "![00300!]" in resp:
|
|
return True, "동기화 성공"
|
|
elif "![0030F!]" in resp:
|
|
return False, "동기화 실패 (컨트롤러 응답 F)"
|
|
else:
|
|
return False, f"알 수 없는 응답: {resp}"
|
|
|
|
|
|
def read_time(ip: str, port: int) -> tuple[datetime | None, datetime, datetime, str]:
|
|
"""컨트롤러 시간 읽기 (명령 31).
|
|
|
|
패킷: ![0031!]
|
|
응답: ![0031YYMMDDdHHMMSS!] (13자 시간 데이터)
|
|
|
|
Returns: (컨트롤러시간 | None, PC시간_before, PC시간_after, 에러메시지)
|
|
"""
|
|
cmd = "![0031!]"
|
|
|
|
pc_before = datetime.now()
|
|
try:
|
|
resp = _tcp_send_recv(ip, port, cmd)
|
|
except Exception as e:
|
|
pc_after = datetime.now()
|
|
return None, pc_before, pc_after, f"연결 실패: {e}"
|
|
pc_after = datetime.now()
|
|
|
|
# 응답 파싱: ![0031YYMMDDdHHMMSS!]
|
|
# "![0031" 뒤에 13자의 시간 데이터
|
|
try:
|
|
start = resp.index("![0031") + 6
|
|
time_data = resp[start:start + 13]
|
|
ctrl_time = protocol_to_datetime(time_data)
|
|
return ctrl_time, pc_before, pc_after, ""
|
|
except (ValueError, IndexError) as e:
|
|
return None, pc_before, pc_after, f"응답 파싱 실패: {resp} ({e})"
|
|
|
|
|
|
def _tcp_send_recv(ip: str, port: int, cmd: str) -> str:
|
|
"""TCP 연결하여 명령 전송 후 응답 수신."""
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sock.settimeout(TCP_TIMEOUT)
|
|
try:
|
|
sock.connect((ip, port))
|
|
sock.sendall(cmd.encode(PROTOCOL_ENCODING))
|
|
resp = sock.recv(TCP_RECV_BUFFER)
|
|
return resp.decode(PROTOCOL_ENCODING, errors="replace")
|
|
finally:
|
|
sock.close()
|