Files
time_manager/network/tcp_protocol.py
insulee 3c14e1e401 Initial commit: Dabit Time Manager project
Python-based time management application with UDP discovery,
TCP protocol communication, time sync, and drift monitoring.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 11:10:55 +09:00

78 lines
2.4 KiB
Python

"""ASCII 프로토콜 (시간 동기화 cmd30, 시간 읽기 cmd31)"""
import socket
from datetime import datetime
from config import TCP_TIMEOUT, TCP_RECV_BUFFER, PROTOCOL_ENCODING
from utils.time_format import datetime_to_protocol, protocol_to_datetime
def sync_time(ip: str, port: int, dt: datetime | None = None) -> tuple[bool, str]:
    """Push a timestamp to the controller (command 30).

    Request packet: ``![0030YYMMDDdHHMMSS!]``
    Reply packet:   ``![00300!]`` on success, ``![0030F!]`` on failure.

    Example request noted by the original author:
    ``![003026020912 30229!]`` for 2026-02-09 (Mon) 23:02:29.
    NOTE(review): the space inside that example looks like a typo for a
    13-character payload -- confirm against the controller specification.

    Args:
        ip: Controller address.
        port: Controller TCP port.
        dt: Time to send. When omitted, ``datetime.now()`` is used.

    Returns:
        ``(ok, message)``. ``ok`` is True only when the reply contains the
        success packet; ``message`` is a human-readable status string.
    """
    stamp = datetime_to_protocol(datetime.now() if dt is None else dt)
    packet = f"![0030{stamp}!]"

    # Any transport-level problem is reported to the caller as a failed sync
    # instead of propagating as an exception.
    try:
        reply = _tcp_send_recv(ip, port, packet)
    except Exception as exc:
        return False, f"연결 실패: {exc}"

    if "![00300!]" in reply:
        return True, "동기화 성공"
    if "![0030F!]" in reply:
        return False, "동기화 실패 (컨트롤러 응답 F)"
    # Neither the success nor the failure packet was found in the reply.
    return False, f"알 수 없는 응답: {reply}"
def read_time(ip: str, port: int) -> tuple[datetime | None, datetime, datetime, str]:
    """Read the controller's clock (command 31).

    Request packet: ``![0031!]``
    Reply packet:   ``![0031YYMMDDdHHMMSS!]`` (13 characters of time data).

    Args:
        ip: Controller address.
        port: Controller TCP port.

    Returns:
        ``(controller_time, pc_before, pc_after, error)``.
        ``controller_time`` is None when the exchange or the parsing failed,
        in which case ``error`` describes the problem; on success ``error``
        is an empty string. ``pc_before`` / ``pc_after`` are the PC clock
        readings taken immediately before and after the TCP exchange.
    """
    header = "![0031"
    payload_len = 13  # length of the YYMMDDdHHMMSS field

    sent_at = datetime.now()
    try:
        reply = _tcp_send_recv(ip, port, "![0031!]")
    except Exception as exc:
        # The "after" timestamp is still taken so callers always get both.
        return None, sent_at, datetime.now(), f"연결 실패: {exc}"
    received_at = datetime.now()

    # The time data is the 13 characters that follow the "![0031" header.
    try:
        offset = reply.index(header) + len(header)
        controller_time = protocol_to_datetime(reply[offset:offset + payload_len])
    except (ValueError, IndexError) as exc:
        return None, sent_at, received_at, f"응답 파싱 실패: {reply} ({exc})"

    return controller_time, sent_at, received_at, ""
def _tcp_send_recv(ip: str, port: int, cmd: str) -> str:
    """Open a TCP connection, send one command and return the decoded reply.

    TCP is a byte stream, so one ``recv()`` call is not guaranteed to return
    the whole reply. Chunks are therefore accumulated until the protocol's
    frame terminator ``!]`` has been seen, the peer closes the connection,
    or a receive times out.

    Args:
        ip: Controller address.
        port: Controller TCP port.
        cmd: Command packet; encoded with ``PROTOCOL_ENCODING`` before sending.

    Returns:
        Everything received, decoded with ``PROTOCOL_ENCODING``
        (undecodable bytes are replaced). May be an empty string when the
        peer closes the connection without sending anything.

    Raises:
        OSError: Connection, send or receive failure. This includes a
            timeout (``socket.timeout``) that occurs before any reply data
            has arrived.
    """
    # Frames end with "!]"; the module handles an ASCII protocol, so the
    # terminator is encoded the same way as the outgoing command.
    terminator = "!]".encode(PROTOCOL_ENCODING)
    received = bytearray()

    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(TCP_TIMEOUT)
        sock.connect((ip, port))
        sock.sendall(cmd.encode(PROTOCOL_ENCODING))

        while terminator not in received:
            try:
                chunk = sock.recv(TCP_RECV_BUFFER)
            except socket.timeout:
                if not received:
                    # Nothing arrived at all: report the timeout as before.
                    raise
                # Partial data without a terminator: hand back what we have
                # so the caller can report the unexpected reply.
                break
            if not chunk:
                # Peer closed the connection.
                break
            received += chunk

    return bytes(received).decode(PROTOCOL_ENCODING, errors="replace")