commit 3c14e1e40150f4946f290351422fbd269d98e021 Author: insulee Date: Tue Feb 10 11:10:55 2026 +0900 Initial commit: Dabit Time Manager project Python-based time management application with UDP discovery, TCP protocol communication, time sync, and drift monitoring. Co-Authored-By: Claude Opus 4.6 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..05484df --- /dev/null +++ b/.gitignore @@ -0,0 +1,18 @@ +# Python cache +__pycache__/ +*.py[cod] +*$py.class + +# PyInstaller build artifacts +build/ +dist/ + +# Reference/documentation (large external files) +reference/ + +# IDE +.vscode/ +.idea/ + +# Logs +*.log diff --git a/Dabit Time Manager.spec b/Dabit Time Manager.spec new file mode 100644 index 0000000..c72a005 --- /dev/null +++ b/Dabit Time Manager.spec @@ -0,0 +1,39 @@ +# -*- mode: python ; coding: utf-8 -*- + + +a = Analysis( + ['main.py'], + pathex=[], + binaries=[], + datas=[('icon.ico', '.')], + hiddenimports=[], + hookspath=[], + hooksconfig={}, + runtime_hooks=[], + excludes=[], + noarchive=False, + optimize=0, +) +pyz = PYZ(a.pure) + +exe = EXE( + pyz, + a.scripts, + a.binaries, + a.datas, + [], + name='Dabit Time Manager', + debug=False, + bootloader_ignore_signals=False, + strip=False, + upx=True, + upx_exclude=[], + runtime_tmpdir=None, + console=False, + disable_windowed_traceback=False, + argv_emulation=False, + target_arch=None, + codesign_identity=None, + entitlements_file=None, + icon=['icon.ico'], +) diff --git a/RTC_Drift_Tester.spec b/RTC_Drift_Tester.spec new file mode 100644 index 0000000..b99ba44 --- /dev/null +++ b/RTC_Drift_Tester.spec @@ -0,0 +1,38 @@ +# -*- mode: python ; coding: utf-8 -*- + + +a = Analysis( + ['main.py'], + pathex=[], + binaries=[], + datas=[], + hiddenimports=[], + hookspath=[], + hooksconfig={}, + runtime_hooks=[], + excludes=[], + noarchive=False, + optimize=0, +) +pyz = PYZ(a.pure) + +exe = EXE( + pyz, + a.scripts, + a.binaries, + a.datas, + [], + name='RTC_Drift_Tester', + debug=False, + bootloader_ignore_signals=False, + strip=False, + upx=True, + upx_exclude=[], + runtime_tmpdir=None, + console=False, + disable_windowed_traceback=False, + argv_emulation=False, + target_arch=None, + codesign_identity=None, + entitlements_file=None, +) diff --git a/config.py b/config.py new file mode 100644 index 0000000..4216a4a --- /dev/null +++ b/config.py @@ -0,0 +1,37 @@ +"""상수 및 기본 설정""" + +# UDP 포트 +UDP_PORT_ETHERNET = 5108 +UDP_PORT_WIFI = 5107 +UDP_BROADCAST_IP = "255.255.255.255" + +# UDP 명령 +SEARCH_CMD = "SEARCHING DIBD B\r\n" +SETT_PREFIX = "SETT " +RESET_PREFIX = "RESET " + +# UDP 수신 버퍼 +UDP_RECV_BUFFER = 1024 +UDP_RECV_TIMEOUT = 3.0 # 초 + +# TCP 설정 +TCP_TIMEOUT = 5.0 # 초 +TCP_RECV_BUFFER = 256 + +# ASCII 프로토콜 +CMD_TIME_SYNC = "30" # PC 시간 동기화 +CMD_TIME_READ = "31" # 컨트롤러 시간 읽기 + +# DIBD 응답 패킷 크기 +DIBD_PACKET_SIZE = 222 + +# 모니터링 기본 주기 (초) +DEFAULT_MONITOR_INTERVAL = 3600 # 1시간 + +# 인코딩 +PROTOCOL_ENCODING = "euc-kr" # MS949 호환 + +# GUI +APP_TITLE = "Dabit Time Manager" +APP_WIDTH = 1050 +APP_HEIGHT = 720 diff --git a/gui/__init__.py b/gui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gui/app.py b/gui/app.py new file mode 100644 index 0000000..a6008e7 --- /dev/null +++ b/gui/app.py @@ -0,0 +1,599 @@ +"""RTC Time Drift Tester - 단일 화면 GUI""" + +import os +import sys +import tkinter as tk +from tkinter import ttk, messagebox, filedialog +import threading +from datetime import datetime + +from config import APP_TITLE, APP_WIDTH, APP_HEIGHT, DEFAULT_MONITOR_INTERVAL +from models.controller import Controller +from models.reading import TimeReading +from network.udp_discovery import get_network_interfaces, search_controllers +from services.time_sync_service import sync_all +from services.drift_monitor import DriftMonitor +from services.export_service import ( + export_readings_csv, export_per_controller_csv, export_summary_csv, +) +from gui.settings_dialog import SettingsDialog + + +class App: + + def __init__(self): + self.root = tk.Tk() + self.root.title(APP_TITLE) + self.root.geometry(f"{APP_WIDTH}x{APP_HEIGHT}") + self.root.minsize(900, 600) + + # 아이콘 설정 (윈도우 좌상단 + 작업표시줄) + if getattr(sys, 'frozen', False): + base_path = sys._MEIPASS + else: + base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + icon_path = os.path.join(base_path, "icon.ico") + if os.path.exists(icon_path): + self.root.iconbitmap(icon_path) + + self._controllers: list[Controller] = [] + self._monitor = DriftMonitor() + self._searching = False + self._syncing = False + self._log_queue: list[str] = [] + self._search_finished = False + self._search_results: list[Controller] = [] + + self._build_ui() + self._update_clock() + self.root.protocol("WM_DELETE_WINDOW", self._on_close) + + # ── UI 구성 ───────────────────────────────────────────── + + def _build_ui(self): + # 상단: NIC + 검색 + PC 시간 + self._build_top_bar() + # 컨트롤러 테이블 + self._build_ctrl_table() + # 로그 (2줄) + self._build_log() + # 액션 바 (동기화 + 모니터링) + self._build_action_bar() + # 하단: 기록 + 요약 (좌우 분할) + self._build_bottom_pane() + + def _build_top_bar(self): + top = ttk.Frame(self.root) + top.pack(fill=tk.X, padx=8, pady=(8, 2)) + + ttk.Label(top, text="네트워크:").pack(side=tk.LEFT) + self._nic_var = tk.StringVar() + self._nic_combo = ttk.Combobox( + top, textvariable=self._nic_var, state="readonly", width=22) + self._nic_combo.pack(side=tk.LEFT, padx=(4, 8)) + + self._btn_search = ttk.Button(top, text="검색", command=self._on_search) + self._btn_search.pack(side=tk.LEFT) + + self._search_status = ttk.Label(top, text="", foreground="gray") + self._search_status.pack(side=tk.LEFT, padx=8) + + # PC 시간 (오른쪽) + self._clock_label = ttk.Label(top, text="", font=("Consolas", 13, "bold")) + self._clock_label.pack(side=tk.RIGHT) + ttk.Label(top, text="PC 시간 ").pack(side=tk.RIGHT) + + self._refresh_nics() + + def _build_ctrl_table(self): + frame = ttk.LabelFrame(self.root, text="컨트롤러", padding=4) + frame.pack(fill=tk.BOTH, padx=8, pady=2, expand=False) + + # Treeview + cols = ("sel", "mac", "ip", "port", "name", "last_drift") + self._ctrl_tree = ttk.Treeview(frame, columns=cols, show="headings", height=5) + self._ctrl_tree.heading("sel", text="✓") + self._ctrl_tree.heading("mac", text="MAC") + self._ctrl_tree.heading("ip", text="IP") + self._ctrl_tree.heading("port", text="포트") + self._ctrl_tree.heading("name", text="이름") + self._ctrl_tree.heading("last_drift", text="마지막 오차") + self._ctrl_tree.column("sel", width=30, anchor=tk.CENTER, stretch=False) + self._ctrl_tree.column("mac", width=140) + self._ctrl_tree.column("ip", width=120) + self._ctrl_tree.column("port", width=50) + self._ctrl_tree.column("name", width=160) + self._ctrl_tree.column("last_drift", width=90, anchor=tk.CENTER) + + scr = ttk.Scrollbar(frame, orient=tk.VERTICAL, command=self._ctrl_tree.yview) + self._ctrl_tree.configure(yscrollcommand=scr.set) + self._ctrl_tree.pack(side=tk.TOP, fill=tk.BOTH, expand=True) + scr.place(relx=1.0, rely=0, relheight=1.0, anchor=tk.NE) + + self._ctrl_tree.bind("", self._on_ctrl_click) + + # 버튼 행 + btn_row = ttk.Frame(frame) + btn_row.pack(fill=tk.X, pady=(4, 0)) + ttk.Button(btn_row, text="전체 선택", command=self._select_all).pack(side=tk.LEFT, padx=2) + ttk.Button(btn_row, text="전체 해제", command=self._deselect_all).pack(side=tk.LEFT, padx=2) + ttk.Separator(btn_row, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=8) + ttk.Button(btn_row, text="시간 동기화", command=self._on_sync).pack(side=tk.LEFT, padx=2) + ttk.Button(btn_row, text="설정 변경...", command=self._on_settings).pack(side=tk.LEFT, padx=2) + + def _build_log(self): + frame = ttk.LabelFrame(self.root, text="로그", padding=2) + frame.pack(fill=tk.X, padx=8, pady=2) + self._log = tk.Text(frame, height=8, state=tk.DISABLED, wrap=tk.WORD, + font=("Consolas", 9)) + scr_log = ttk.Scrollbar(frame, orient=tk.VERTICAL, command=self._log.yview) + self._log.configure(yscrollcommand=scr_log.set) + scr_log.pack(side=tk.RIGHT, fill=tk.Y) + self._log.pack(fill=tk.X) + + def _build_action_bar(self): + bar = ttk.Frame(self.root) + bar.pack(fill=tk.X, padx=8, pady=2) + + ttk.Label(bar, text="모니터링", font=("", 9, "bold")).pack(side=tk.LEFT, padx=(0, 8)) + ttk.Label(bar, text="주기:").pack(side=tk.LEFT) + self._interval_var = tk.StringVar(value=str(DEFAULT_MONITOR_INTERVAL)) + ttk.Entry(bar, textvariable=self._interval_var, width=6).pack(side=tk.LEFT, padx=2) + ttk.Label(bar, text="초").pack(side=tk.LEFT, padx=(0, 6)) + + self._btn_start = ttk.Button(bar, text="시작", command=self._on_monitor_start) + self._btn_start.pack(side=tk.LEFT, padx=2) + self._btn_stop = ttk.Button(bar, text="중지", command=self._on_monitor_stop, state=tk.DISABLED) + self._btn_stop.pack(side=tk.LEFT, padx=2) + self._btn_read = ttk.Button(bar, text="즉시 읽기", command=self._on_read_now) + self._btn_read.pack(side=tk.LEFT, padx=2) + + ttk.Separator(bar, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=6) + self._btn_hourly = ttk.Button(bar, text="매시 정시 읽기", command=self._on_hourly_toggle) + self._btn_hourly.pack(side=tk.LEFT, padx=2) + + ttk.Separator(bar, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=6) + ttk.Button(bar, text="기록 초기화", command=self._on_clear).pack(side=tk.LEFT, padx=2) + + self._mon_status = ttk.Label(bar, text="대기중", foreground="gray") + self._mon_status.pack(side=tk.RIGHT) + + def _build_bottom_pane(self): + pane = ttk.PanedWindow(self.root, orient=tk.HORIZONTAL) + pane.pack(fill=tk.BOTH, expand=True, padx=8, pady=(2, 8)) + + # ── 왼쪽: 오차 기록 ── + left = ttk.LabelFrame(pane, text="오차 기록", padding=4) + + filt = ttk.Frame(left) + filt.pack(fill=tk.X, pady=(0, 4)) + ttk.Label(filt, text="컨트롤러:").pack(side=tk.LEFT) + self._filter_var = tk.StringVar(value="전체") + self._filter_combo = ttk.Combobox( + filt, textvariable=self._filter_var, state="readonly", width=28) + self._filter_combo["values"] = ["전체"] + self._filter_combo.pack(side=tk.LEFT, padx=4) + self._filter_combo.bind("<>", self._on_filter_changed) + + cols_r = ("ctrl", "time", "ctrl_time", "drift") + self._read_tree = ttk.Treeview(left, columns=cols_r, show="headings", height=10) + self._read_tree.heading("ctrl", text="컨트롤러") + self._read_tree.heading("time", text="PC 시간") + self._read_tree.heading("ctrl_time", text="컨트롤러 시간") + self._read_tree.heading("drift", text="오차") + self._read_tree.column("ctrl", width=130) + self._read_tree.column("time", width=135) + self._read_tree.column("ctrl_time", width=135) + self._read_tree.column("drift", width=70, anchor=tk.CENTER) + scr_r = ttk.Scrollbar(left, orient=tk.VERTICAL, command=self._read_tree.yview) + self._read_tree.configure(yscrollcommand=scr_r.set) + self._read_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + scr_r.pack(side=tk.RIGHT, fill=tk.Y) + + # ── 오른쪽: 요약 + 내보내기 ── + right = ttk.LabelFrame(pane, text="요약", padding=4) + + cols_s = ("ctrl", "count", "avg", "max_d") + self._sum_tree = ttk.Treeview(right, columns=cols_s, show="headings", height=10) + self._sum_tree.heading("ctrl", text="컨트롤러") + self._sum_tree.heading("count", text="읽기 횟수") + self._sum_tree.heading("avg", text="평균 오차(초)") + self._sum_tree.heading("max_d", text="최대 오차(초)") + self._sum_tree.column("ctrl", width=150) + self._sum_tree.column("count", width=70, anchor=tk.CENTER) + self._sum_tree.column("avg", width=90, anchor=tk.CENTER) + self._sum_tree.column("max_d", width=90, anchor=tk.CENTER) + scr_s = ttk.Scrollbar(right, orient=tk.VERTICAL, command=self._sum_tree.yview) + self._sum_tree.configure(yscrollcommand=scr_s.set) + self._sum_tree.pack(side=tk.TOP, fill=tk.BOTH, expand=True) + scr_s.place(relx=1.0, rely=0, relheight=0.85, anchor=tk.NE) + + # 내보내기 버튼 + exp = ttk.Frame(right) + exp.pack(fill=tk.X, pady=(6, 0)) + mb = ttk.Menubutton(exp, text="CSV 내보내기 ▾") + menu = tk.Menu(mb, tearoff=0) + menu.add_command(label="전체 내보내기 (단일 파일)", command=self._export_all) + menu.add_command(label="컨트롤러별 내보내기 (폴더)", command=self._export_per_ctrl) + menu.add_separator() + menu.add_command(label="요약 내보내기", command=self._export_summary) + mb["menu"] = menu + mb.pack(side=tk.LEFT) + + pane.add(left, weight=3) + pane.add(right, weight=2) + + # ── 유틸 ──────────────────────────────────────────────── + + def _log_msg(self, msg: str): + self._log.config(state=tk.NORMAL) + self._log.insert(tk.END, msg + "\n") + self._log.see(tk.END) + self._log.config(state=tk.DISABLED) + + def _update_clock(self): + self._clock_label.config(text=datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + self.root.after(1000, self._update_clock) + + def _refresh_nics(self): + nics = get_network_interfaces() + self._nic_list = nics + self._nic_combo["values"] = [f"{ip}" for ip, _ in nics] + if nics: + self._nic_combo.current(0) + + def _get_bind_ip(self) -> str: + idx = self._nic_combo.current() + if 0 <= idx < len(self._nic_list): + return self._nic_list[idx][0] + return "0.0.0.0" + + def _refresh_ctrl_tree(self): + self._ctrl_tree.delete(*self._ctrl_tree.get_children()) + for ctrl in self._controllers: + sel = "☑" if ctrl.selected else "☐" + self._ctrl_tree.insert("", tk.END, values=( + sel, ctrl.mac, ctrl.ip, ctrl.port, ctrl.name, "-" + )) + + def _update_ctrl_drift(self, mac: str, drift_text: str): + """컨트롤러 테이블의 '마지막 오차' 컬럼 갱신.""" + for item in self._ctrl_tree.get_children(): + vals = self._ctrl_tree.item(item, "values") + if vals[1] == mac: + new_vals = list(vals) + new_vals[5] = drift_text + self._ctrl_tree.item(item, values=new_vals) + break + + def _update_filter_options(self): + labels = ["전체"] + [c.display_label for c in self._controllers] + self._filter_combo["values"] = labels + + def _get_selected_ctrl(self) -> Controller | None: + sel = self._ctrl_tree.selection() + if not sel: + return None + vals = self._ctrl_tree.item(sel[0], "values") + mac = vals[1] + for c in self._controllers: + if c.mac == mac: + return c + return None + + # ── 컨트롤러 체크박스 ─────────────────────────────────── + + def _on_ctrl_click(self, event): + col = self._ctrl_tree.identify_column(event.x) + item = self._ctrl_tree.identify_row(event.y) + if col == "#1" and item: # sel 컬럼 + idx = self._ctrl_tree.index(item) + if idx < len(self._controllers): + ctrl = self._controllers[idx] + ctrl.selected = not ctrl.selected + vals = list(self._ctrl_tree.item(item, "values")) + vals[0] = "☑" if ctrl.selected else "☐" + self._ctrl_tree.item(item, values=vals) + + def _select_all(self): + for ctrl in self._controllers: + ctrl.selected = True + self._refresh_ctrl_tree_sel() + + def _deselect_all(self): + for ctrl in self._controllers: + ctrl.selected = False + self._refresh_ctrl_tree_sel() + + def _refresh_ctrl_tree_sel(self): + for item in self._ctrl_tree.get_children(): + idx = self._ctrl_tree.index(item) + if idx < len(self._controllers): + vals = list(self._ctrl_tree.item(item, "values")) + vals[0] = "☑" if self._controllers[idx].selected else "☐" + self._ctrl_tree.item(item, values=vals) + + # ── 검색 ──────────────────────────────────────────────── + + def _on_search(self): + if self._searching: + return + self._searching = True + self._btn_search.config(state=tk.DISABLED) + self._search_status.config(text="검색중...") + self._controllers.clear() + self._ctrl_tree.delete(*self._ctrl_tree.get_children()) + self._log_queue.clear() + self._search_finished = False + + bind_ip = self._get_bind_ip() + self._log_msg(f"검색 시작 (NIC: {bind_ip})") + threading.Thread(target=self._search_thread, args=(bind_ip,), daemon=True).start() + self._poll_search() + + def _search_thread(self, bind_ip: str): + results = search_controllers( + bind_ip=bind_ip, timeout=3.0, + on_log=lambda m: self._log_queue.append(m), + ) + self._search_results = results + self._search_finished = True + + def _poll_search(self): + while self._log_queue: + self._log_msg(self._log_queue.pop(0)) + if self._search_finished: + self._search_finished = False + self._searching = False + self._btn_search.config(state=tk.NORMAL) + self._controllers.clear() + self._controllers.extend(self._search_results) + self._refresh_ctrl_tree() + self._update_filter_options() + self._search_status.config(text=f"{len(self._controllers)}개 발견") + return + self.root.after(100, self._poll_search) + + # ── 설정 변경 ─────────────────────────────────────────── + + def _on_settings(self): + ctrl = self._get_selected_ctrl() + if not ctrl: + messagebox.showinfo("알림", "컨트롤러를 선택하세요.") + return + SettingsDialog(self.root, ctrl, self._get_bind_ip()) + # 다이얼로그 닫힌 후 테이블 갱신 + self._refresh_ctrl_tree() + + # ── 동기화 ────────────────────────────────────────────── + + def _on_sync(self): + if self._syncing: + return + selected = [c for c in self._controllers if c.selected] + if not selected: + self._log_msg("동기화할 컨트롤러가 없습니다.") + return + self._syncing = True + self._log_msg(f"동기화 시작 ({len(selected)}대)...") + threading.Thread(target=self._sync_thread, daemon=True).start() + + def _sync_thread(self): + results = sync_all(self._controllers) + self.root.after(0, self._sync_done, results) + + def _sync_done(self, results): + self._syncing = False + ok_count = 0 + for ctrl, ok, msg in results: + tag = "OK" if ok else "FAIL" + self._log_msg(f" [{tag}] {ctrl.display_label}: {msg}") + drift_text = "동기화OK" if ok else "동기화실패" + self._update_ctrl_drift(ctrl.mac, drift_text) + if ok: + ok_count += 1 + self._log_msg(f"동기화 완료: {ok_count}/{len(results)} 성공") + + # ── 모니터링 ──────────────────────────────────────────── + + def _on_monitor_start(self): + selected = [c for c in self._controllers if c.selected] + if not selected: + messagebox.showwarning("알림", "모니터링할 컨트롤러가 없습니다.") + return + try: + interval = int(self._interval_var.get().strip()) + if interval < 1: + raise ValueError + except ValueError: + messagebox.showerror("오류", "주기는 1 이상의 정수(초)여야 합니다.") + return + + self._monitor.configure( + controllers=self._controllers, + interval=interval, + on_readings=lambda r: self.root.after(0, self._on_new_readings, r), + on_error=lambda m: self.root.after(0, self._log_msg, m), + ) + self._monitor.start() + self._btn_start.config(state=tk.DISABLED) + self._btn_stop.config(state=tk.NORMAL) + self._mon_status.config(text=f"모니터링 중 (주기 {interval}초)", foreground="green") + + def _on_monitor_stop(self): + self._monitor.stop() + self._btn_start.config(state=tk.NORMAL) + self._btn_stop.config(state=tk.DISABLED) + self._mon_status.config(text="정지됨", foreground="gray") + + def _on_read_now(self): + if not self._controllers: + messagebox.showwarning("알림", "컨트롤러가 없습니다.") + return + if not self._monitor.is_running: + try: + interval = int(self._interval_var.get().strip()) + except ValueError: + interval = DEFAULT_MONITOR_INTERVAL + self._monitor.configure( + controllers=self._controllers, + interval=interval, + on_readings=lambda r: self.root.after(0, self._on_new_readings, r), + on_error=lambda m: self.root.after(0, self._log_msg, m), + ) + self._monitor.read_once() + self._mon_status.config(text="읽기 중...", foreground="orange") + + def _on_clear(self): + self._monitor.clear_readings() + self._read_tree.delete(*self._read_tree.get_children()) + self._sum_tree.delete(*self._sum_tree.get_children()) + for ctrl in self._controllers: + self._update_ctrl_drift(ctrl.mac, "-") + + def _on_new_readings(self, readings: list[TimeReading]): + for r in readings: + self._read_tree.insert("", tk.END, iid=None, values=( + r.controller_label, + r.pc_time.strftime("%Y-%m-%d %H:%M:%S"), + r.controller_time.strftime("%Y-%m-%d %H:%M:%S"), + r.drift_display, + ), tags=(r.controller_mac,)) + self._update_ctrl_drift(r.controller_mac, r.drift_display) + + # 필터 적용 + self._apply_filter() + + # 스크롤 끝 + children = self._read_tree.get_children() + if children: + self._read_tree.see(children[-1]) + + # 요약 갱신 + self._refresh_summary() + + now = datetime.now().strftime("%H:%M:%S") + if self._monitor.is_running: + self._mon_status.config( + text=f"모니터링 중 (마지막: {now})", foreground="green") + else: + self._mon_status.config(text=f"읽기 완료 ({now})", foreground="gray") + + # ── 필터 ──────────────────────────────────────────────── + + def _on_filter_changed(self, event=None): + self._apply_filter() + + def _apply_filter(self): + selected = self._filter_var.get() + # 모든 항목을 보이거나 숨기기 (detach/reattach) + # 간단하게: 트리 다시 빌드 + self._read_tree.delete(*self._read_tree.get_children()) + for r in self._monitor.readings: + if selected == "전체" or r.controller_label == selected: + self._read_tree.insert("", tk.END, values=( + r.controller_label, + r.pc_time.strftime("%Y-%m-%d %H:%M:%S"), + r.controller_time.strftime("%Y-%m-%d %H:%M:%S"), + r.drift_display, + ), tags=(r.controller_mac,)) + + def _refresh_summary(self): + self._sum_tree.delete(*self._sum_tree.get_children()) + summary = self._monitor.get_summary() + + selected = self._filter_var.get() + for mac, s in summary.items(): + if selected != "전체" and s["label"] != selected: + continue + self._sum_tree.insert("", tk.END, values=( + s["label"], + s["count"], + f"{s['avg_drift']:.2f}", + f"{s['max_drift']:.2f}", + )) + + # ── CSV 내보내기 ──────────────────────────────────────── + + def _export_all(self): + readings = self._monitor.readings + if not readings: + messagebox.showinfo("알림", "내보낼 기록이 없습니다.") + return + fp = filedialog.asksaveasfilename( + title="전체 기록 CSV 저장", + defaultextension=".csv", + filetypes=[("CSV", "*.csv")], + initialfile=f"drift_all_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv", + ) + if fp: + try: + export_readings_csv(readings, fp) + messagebox.showinfo("성공", f"저장 완료:\n{fp}") + except Exception as e: + messagebox.showerror("오류", str(e)) + + def _export_per_ctrl(self): + readings = self._monitor.readings + if not readings: + messagebox.showinfo("알림", "내보낼 기록이 없습니다.") + return + folder = filedialog.askdirectory(title="컨트롤러별 CSV 저장 폴더 선택") + if folder: + try: + files = export_per_controller_csv(readings, folder) + messagebox.showinfo("성공", f"{len(files)}개 파일 생성:\n" + "\n".join(files)) + except Exception as e: + messagebox.showerror("오류", str(e)) + + def _export_summary(self): + summary = self._monitor.get_summary() + if not summary: + messagebox.showinfo("알림", "내보낼 요약이 없습니다.") + return + fp = filedialog.asksaveasfilename( + title="요약 CSV 저장", + defaultextension=".csv", + filetypes=[("CSV", "*.csv")], + initialfile=f"drift_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv", + ) + if fp: + try: + export_summary_csv(summary, fp) + messagebox.showinfo("성공", f"저장 완료:\n{fp}") + except Exception as e: + messagebox.showerror("오류", str(e)) + + # ── 매시 정시 읽기 ──────────────────────────────────────── + + def _on_hourly_toggle(self): + if self._monitor.is_hourly_running: + # 중지 + self._monitor.stop_hourly() + self._btn_hourly.config(text="매시 정시 읽기") + self._log_msg("매시 정시 읽기 중지") + else: + # 시작 + selected = [c for c in self._controllers if c.selected] + if not selected: + messagebox.showwarning("알림", "컨트롤러를 선택하세요.") + return + self._monitor.configure( + controllers=self._controllers, + interval=self._monitor.interval, + on_readings=lambda r: self.root.after(0, self._on_new_readings, r), + on_error=lambda m: self.root.after(0, self._log_msg, m), + ) + self._monitor.start_hourly() + self._btn_hourly.config(text="정시 읽기 중지") + self._log_msg("매시 정시 읽기 시작 (매시 00분 00초에 읽기)") + + # ── 앱 생명주기 ───────────────────────────────────────── + + def _on_close(self): + self._monitor.stop() + self._monitor.stop_hourly() + self.root.destroy() + + def run(self): + self.root.mainloop() diff --git a/gui/discovery_panel.py b/gui/discovery_panel.py new file mode 100644 index 0000000..bfc0fb7 --- /dev/null +++ b/gui/discovery_panel.py @@ -0,0 +1,239 @@ +"""탭1: 컨트롤러 검색/IP변경 패널""" + +import tkinter as tk +from tkinter import ttk, messagebox +import threading + +from models.controller import Controller +from network.udp_discovery import get_network_interfaces, search_controllers, send_sett, send_reset + + +class DiscoveryPanel(ttk.Frame): + """컨트롤러 검색 및 설정 변경 패널.""" + + def __init__(self, parent, shared_controllers: list[Controller]): + super().__init__(parent) + self._controllers = shared_controllers + self._searching = False + self._build_ui() + + def _build_ui(self): + # --- 상단: NIC 선택 + 검색 --- + top = ttk.LabelFrame(self, text="컨트롤러 검색", padding=5) + top.pack(fill=tk.X, padx=5, pady=5) + + ttk.Label(top, text="네트워크:").grid(row=0, column=0, padx=(0, 5)) + self._nic_var = tk.StringVar() + self._nic_combo = ttk.Combobox(top, textvariable=self._nic_var, state="readonly", width=20) + self._nic_combo.grid(row=0, column=1, padx=(0, 10)) + + self._btn_search = ttk.Button(top, text="검색", command=self._on_search) + self._btn_search.grid(row=0, column=2, padx=(0, 5)) + + self._search_status = ttk.Label(top, text="") + self._search_status.grid(row=0, column=3, padx=5) + + self._refresh_nics() + + # --- 중간: 컨트롤러 목록 --- + mid = ttk.LabelFrame(self, text="검색된 컨트롤러", padding=5) + mid.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) + + cols = ("mac", "ip", "port", "name", "subnet", "gateway") + self._tree = ttk.Treeview(mid, columns=cols, show="headings", height=8) + self._tree.heading("mac", text="MAC") + self._tree.heading("ip", text="IP") + self._tree.heading("port", text="포트") + self._tree.heading("name", text="이름") + self._tree.heading("subnet", text="서브넷") + self._tree.heading("gateway", text="게이트웨이") + self._tree.column("mac", width=140) + self._tree.column("ip", width=120) + self._tree.column("port", width=60) + self._tree.column("name", width=120) + self._tree.column("subnet", width=120) + self._tree.column("gateway", width=120) + + scroll = ttk.Scrollbar(mid, orient=tk.VERTICAL, command=self._tree.yview) + self._tree.configure(yscrollcommand=scroll.set) + self._tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + scroll.pack(side=tk.RIGHT, fill=tk.Y) + + self._tree.bind("<>", self._on_select) + + # --- 로그 영역 --- + log_frame = ttk.LabelFrame(self, text="검색 로그", padding=5) + log_frame.pack(fill=tk.X, padx=5, pady=(0, 5)) + + self._log = tk.Text(log_frame, height=5, state=tk.DISABLED, wrap=tk.WORD, + font=("Consolas", 9)) + log_scroll = ttk.Scrollbar(log_frame, orient=tk.VERTICAL, command=self._log.yview) + self._log.configure(yscrollcommand=log_scroll.set) + self._log.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + log_scroll.pack(side=tk.RIGHT, fill=tk.Y) + + # --- 하단: 설정 변경 --- + bot = ttk.LabelFrame(self, text="설정 변경", padding=5) + bot.pack(fill=tk.X, padx=5, pady=5) + + fields = [ + ("IP:", "ip_var", 15), + ("포트:", "port_var", 6), + ("이름:", "name_var", 15), + ("서브넷:", "subnet_var", 15), + ("게이트웨이:", "gw_var", 15), + ] + self._edit_vars = {} + for i, (label, var_name, width) in enumerate(fields): + ttk.Label(bot, text=label).grid(row=i // 3, column=(i % 3) * 2, padx=2, sticky=tk.E) + var = tk.StringVar() + self._edit_vars[var_name] = var + ttk.Entry(bot, textvariable=var, width=width).grid( + row=i // 3, column=(i % 3) * 2 + 1, padx=(0, 10), pady=2 + ) + + btn_frame = ttk.Frame(bot) + btn_frame.grid(row=2, column=0, columnspan=6, pady=5) + ttk.Button(btn_frame, text="적용", command=self._on_apply).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_frame, text="리셋", command=self._on_reset).pack(side=tk.LEFT, padx=5) + + def _log_message(self, msg: str): + self._log.config(state=tk.NORMAL) + self._log.insert(tk.END, msg + "\n") + self._log.see(tk.END) + self._log.config(state=tk.DISABLED) + + def _refresh_nics(self): + nics = get_network_interfaces() + self._nic_list = nics + self._nic_combo["values"] = [f"{name} ({ip})" for ip, name in nics] + if nics: + self._nic_combo.current(0) + + def _get_bind_ip(self) -> str: + idx = self._nic_combo.current() + if idx >= 0 and idx < len(self._nic_list): + return self._nic_list[idx][0] + return "0.0.0.0" + + def _on_search(self): + if self._searching: + return + self._searching = True + self._btn_search.config(state=tk.DISABLED) + self._search_status.config(text="검색중...") + self._tree.delete(*self._tree.get_children()) + self._controllers.clear() + + # 로그 초기화 + self._log.config(state=tk.NORMAL) + self._log.delete("1.0", tk.END) + self._log.config(state=tk.DISABLED) + + bind_ip = self._get_bind_ip() + self._log_message(f"검색 시작 (NIC: {bind_ip})") + + # 검색 로그를 GUI로 전달하는 큐 + self._log_queue: list[str] = [] + + threading.Thread(target=self._search_thread, args=(bind_ip,), daemon=True).start() + self._poll_logs() + + def _search_thread(self, bind_ip: str): + def on_log(msg: str): + self._log_queue.append(msg) + + results = search_controllers(bind_ip=bind_ip, timeout=3.0, on_log=on_log) + self._search_results = results + self._search_finished = True + + def _poll_logs(self): + """백그라운드 스레드의 로그를 GUI에 표시.""" + while self._log_queue: + msg = self._log_queue.pop(0) + self._log_message(msg) + + if hasattr(self, "_search_finished") and self._search_finished: + self._search_finished = False + self._search_done(self._search_results) + return + + self.after(100, self._poll_logs) + + def _search_done(self, results: list[Controller]): + self._searching = False + self._btn_search.config(state=tk.NORMAL) + + self._controllers.clear() + self._controllers.extend(results) + + for ctrl in results: + self._tree.insert("", tk.END, values=( + ctrl.mac, ctrl.ip, ctrl.port, ctrl.name, ctrl.subnet, ctrl.gateway + )) + + self._search_status.config(text=f"{len(results)}개 발견") + + def _on_select(self, event): + sel = self._tree.selection() + if not sel: + return + values = self._tree.item(sel[0], "values") + # mac, ip, port, name, subnet, gateway + self._edit_vars["ip_var"].set(values[1]) + self._edit_vars["port_var"].set(values[2]) + self._edit_vars["name_var"].set(values[3]) + self._edit_vars["subnet_var"].set(values[4]) + self._edit_vars["gw_var"].set(values[5]) + + def _get_selected_controller(self) -> Controller | None: + sel = self._tree.selection() + if not sel: + messagebox.showwarning("선택 필요", "컨트롤러를 선택하세요.") + return None + values = self._tree.item(sel[0], "values") + mac = values[0] + for c in self._controllers: + if c.mac == mac: + return c + return None + + def _on_apply(self): + ctrl = self._get_selected_controller() + if not ctrl: + return + + ctrl.ip = self._edit_vars["ip_var"].get().strip() + try: + ctrl.port = int(self._edit_vars["port_var"].get().strip()) + except ValueError: + messagebox.showerror("오류", "포트는 숫자여야 합니다.") + return + ctrl.name = self._edit_vars["name_var"].get().strip() + ctrl.subnet = self._edit_vars["subnet_var"].get().strip() + ctrl.gateway = self._edit_vars["gw_var"].get().strip() + + bind_ip = self._get_bind_ip() + ok = send_sett(ctrl, bind_ip) + if ok: + messagebox.showinfo("성공", "설정이 전송되었습니다.\n적용을 위해 리셋하세요.") + sel = self._tree.selection() + if sel: + self._tree.item(sel[0], values=( + ctrl.mac, ctrl.ip, ctrl.port, ctrl.name, ctrl.subnet, ctrl.gateway + )) + else: + messagebox.showerror("실패", "설정 전송에 실패했습니다.") + + def _on_reset(self): + ctrl = self._get_selected_controller() + if not ctrl: + return + if not messagebox.askyesno("확인", f"{ctrl.mac} 컨트롤러를 리셋하시겠습니까?"): + return + bind_ip = self._get_bind_ip() + ok = send_reset(ctrl.mac, bind_ip) + if ok: + messagebox.showinfo("성공", "리셋 명령이 전송되었습니다.") + else: + messagebox.showerror("실패", "리셋 전송에 실패했습니다.") diff --git a/gui/monitor_panel.py b/gui/monitor_panel.py new file mode 100644 index 0000000..3e06cc4 --- /dev/null +++ b/gui/monitor_panel.py @@ -0,0 +1,241 @@ +"""탭3: 시간 모니터링/오차 추적 패널""" + +import tkinter as tk +from tkinter import ttk, filedialog, messagebox +from datetime import datetime + +from models.controller import Controller +from models.reading import TimeReading +from services.drift_monitor import DriftMonitor +from services.export_service import export_readings_csv, export_summary_csv +from config import DEFAULT_MONITOR_INTERVAL + + +class MonitorPanel(ttk.Frame): + """시간 모니터링 및 오차 추적 패널.""" + + def __init__(self, parent, shared_controllers: list[Controller]): + super().__init__(parent) + self._controllers = shared_controllers + self._monitor = DriftMonitor() + self._build_ui() + + def _build_ui(self): + # --- 상단: 설정 및 제어 --- + top = ttk.LabelFrame(self, text="모니터링 설정", padding=5) + top.pack(fill=tk.X, padx=5, pady=5) + + ttk.Label(top, text="읽기 주기(초):").pack(side=tk.LEFT, padx=(0, 5)) + self._interval_var = tk.StringVar(value=str(DEFAULT_MONITOR_INTERVAL)) + ttk.Entry(top, textvariable=self._interval_var, width=8).pack(side=tk.LEFT, padx=(0, 10)) + + self._btn_start = ttk.Button(top, text="시작", command=self._on_start) + self._btn_start.pack(side=tk.LEFT, padx=2) + self._btn_stop = ttk.Button(top, text="중지", command=self._on_stop, state=tk.DISABLED) + self._btn_stop.pack(side=tk.LEFT, padx=2) + self._btn_read_now = ttk.Button(top, text="즉시 읽기", command=self._on_read_now) + self._btn_read_now.pack(side=tk.LEFT, padx=2) + self._btn_clear = ttk.Button(top, text="기록 초기화", command=self._on_clear) + self._btn_clear.pack(side=tk.LEFT, padx=2) + + self._status_label = ttk.Label(top, text="정지됨") + self._status_label.pack(side=tk.RIGHT, padx=5) + + # --- 중간: 오차 읽기 기록 테이블 --- + mid = ttk.LabelFrame(self, text="오차 읽기 기록", padding=5) + mid.pack(fill=tk.BOTH, expand=True, padx=5, pady=(5, 2)) + + cols_read = ("controller", "pc_time", "ctrl_time", "drift") + self._read_tree = ttk.Treeview(mid, columns=cols_read, show="headings", height=8) + self._read_tree.heading("controller", text="컨트롤러") + self._read_tree.heading("pc_time", text="PC 시간") + self._read_tree.heading("ctrl_time", text="컨트롤러 시간") + self._read_tree.heading("drift", text="오차") + self._read_tree.column("controller", width=180) + self._read_tree.column("pc_time", width=160) + self._read_tree.column("ctrl_time", width=160) + self._read_tree.column("drift", width=100) + + scroll_r = ttk.Scrollbar(mid, orient=tk.VERTICAL, command=self._read_tree.yview) + self._read_tree.configure(yscrollcommand=scroll_r.set) + self._read_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + scroll_r.pack(side=tk.RIGHT, fill=tk.Y) + + # --- 하단: 요약 테이블 + CSV 내보내기 --- + bot = ttk.LabelFrame(self, text="오차 요약", padding=5) + bot.pack(fill=tk.BOTH, expand=True, padx=5, pady=(2, 5)) + + cols_sum = ("controller", "count", "avg", "max_d", "rate") + self._sum_tree = ttk.Treeview(bot, columns=cols_sum, show="headings", height=5) + self._sum_tree.heading("controller", text="컨트롤러") + self._sum_tree.heading("count", text="읽기 횟수") + self._sum_tree.heading("avg", text="평균 오차(초)") + self._sum_tree.heading("max_d", text="최대 오차(초)") + self._sum_tree.heading("rate", text="시간당 오차율(초/h)") + self._sum_tree.column("controller", width=180) + self._sum_tree.column("count", width=80) + self._sum_tree.column("avg", width=100) + self._sum_tree.column("max_d", width=100) + self._sum_tree.column("rate", width=120) + + scroll_s = ttk.Scrollbar(bot, orient=tk.VERTICAL, command=self._sum_tree.yview) + self._sum_tree.configure(yscrollcommand=scroll_s.set) + self._sum_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + scroll_s.pack(side=tk.RIGHT, fill=tk.Y) + + # CSV 내보내기 + export_frame = ttk.Frame(self) + export_frame.pack(fill=tk.X, padx=5, pady=5) + ttk.Button(export_frame, text="기록 CSV 내보내기", command=self._export_readings).pack(side=tk.LEFT, padx=5) + ttk.Button(export_frame, text="요약 CSV 내보내기", command=self._export_summary).pack(side=tk.LEFT, padx=5) + + def _on_start(self): + selected = [c for c in self._controllers if c.selected] + if not selected: + messagebox.showwarning("알림", "모니터링할 컨트롤러가 없습니다.\n검색 후 선택하세요.") + return + + try: + interval = int(self._interval_var.get().strip()) + if interval < 1: + raise ValueError + except ValueError: + messagebox.showerror("오류", "읽기 주기는 1 이상의 정수여야 합니다.") + return + + self._monitor.configure( + controllers=self._controllers, + interval=interval, + on_readings=lambda readings: self.after(0, self._on_new_readings, readings), + on_error=lambda msg: self.after(0, self._on_monitor_error, msg), + ) + self._monitor.start() + + self._btn_start.config(state=tk.DISABLED) + self._btn_stop.config(state=tk.NORMAL) + self._status_label.config(text=f"모니터링 중 (주기: {interval}초)") + + def _on_stop(self): + self._monitor.stop() + self._btn_start.config(state=tk.NORMAL) + self._btn_stop.config(state=tk.DISABLED) + self._status_label.config(text="정지됨") + + def _on_read_now(self): + if not self._controllers: + messagebox.showwarning("알림", "컨트롤러가 없습니다.") + return + + # 모니터가 구성되지 않은 경우 현재 설정으로 구성 + if not self._monitor.is_running: + try: + interval = int(self._interval_var.get().strip()) + except ValueError: + interval = DEFAULT_MONITOR_INTERVAL + + self._monitor.configure( + controllers=self._controllers, + interval=interval, + on_readings=lambda readings: self.after(0, self._on_new_readings, readings), + on_error=lambda msg: self.after(0, self._on_monitor_error, msg), + ) + + self._monitor.read_once() + self._status_label.config(text="읽기 중...") + + def _on_clear(self): + self._monitor.clear_readings() + self._read_tree.delete(*self._read_tree.get_children()) + self._sum_tree.delete(*self._sum_tree.get_children()) + + def _on_new_readings(self, readings: list[TimeReading]): + """새 읽기 결과가 도착했을 때 GUI 업데이트.""" + for r in readings: + self._read_tree.insert("", tk.END, values=( + r.controller_label, + r.pc_time.strftime("%Y-%m-%d %H:%M:%S"), + r.controller_time.strftime("%Y-%m-%d %H:%M:%S"), + r.drift_display, + )) + # 스크롤 끝으로 + children = self._read_tree.get_children() + if children: + self._read_tree.see(children[-1]) + + # 요약 갱신 + self._refresh_summary() + + if self._monitor.is_running: + self._status_label.config( + text=f"모니터링 중 (주기: {self._monitor.interval}초) - " + f"마지막 읽기: {datetime.now().strftime('%H:%M:%S')}" + ) + else: + self._status_label.config( + text=f"읽기 완료: {datetime.now().strftime('%H:%M:%S')}" + ) + + def _on_monitor_error(self, msg: str): + # 에러를 읽기 테이블에 표시 + self._read_tree.insert("", tk.END, values=(msg, "", "", "오류")) + children = self._read_tree.get_children() + if children: + self._read_tree.see(children[-1]) + + def _refresh_summary(self): + self._sum_tree.delete(*self._sum_tree.get_children()) + summary = self._monitor.get_summary() + for mac, s in summary.items(): + self._sum_tree.insert("", tk.END, values=( + s["label"], + s["count"], + f"{s['avg_drift']:.2f}", + f"{s['max_drift']:.2f}", + f"{s['drift_per_hour']:.3f}", + )) + + def _export_readings(self): + readings = self._monitor.readings + if not readings: + messagebox.showinfo("알림", "내보낼 기록이 없습니다.") + return + + filepath = filedialog.asksaveasfilename( + title="기록 CSV 저장", + defaultextension=".csv", + filetypes=[("CSV 파일", "*.csv")], + initialfile=f"drift_readings_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv", + ) + if not filepath: + return + + try: + saved = export_readings_csv(readings, filepath) + messagebox.showinfo("성공", f"저장 완료: {saved}") + except Exception as e: + messagebox.showerror("오류", f"저장 실패: {e}") + + def _export_summary(self): + summary = self._monitor.get_summary() + if not summary: + messagebox.showinfo("알림", "내보낼 요약이 없습니다.") + return + + filepath = filedialog.asksaveasfilename( + title="요약 CSV 저장", + defaultextension=".csv", + filetypes=[("CSV 파일", "*.csv")], + initialfile=f"drift_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv", + ) + if not filepath: + return + + try: + saved = export_summary_csv(summary, filepath) + messagebox.showinfo("성공", f"저장 완료: {saved}") + except Exception as e: + messagebox.showerror("오류", f"저장 실패: {e}") + + def stop_monitor(self): + """앱 종료 시 호출.""" + self._monitor.stop() diff --git a/gui/settings_dialog.py b/gui/settings_dialog.py new file mode 100644 index 0000000..728d7e4 --- /dev/null +++ b/gui/settings_dialog.py @@ -0,0 +1,68 @@ +"""컨트롤러 설정 변경 다이얼로그""" + +import tkinter as tk +from tkinter import ttk, messagebox + +from models.controller import Controller +from network.udp_discovery import send_sett + + +class SettingsDialog(tk.Toplevel): + + def __init__(self, parent, ctrl: Controller, bind_ip: str): + super().__init__(parent) + self.title(f"설정 변경 - {ctrl.mac}") + self.resizable(False, False) + self.grab_set() + self._ctrl = ctrl + self._bind_ip = bind_ip + self._build_ui() + self.transient(parent) + # 화면 중앙 + self.update_idletasks() + x = parent.winfo_rootx() + (parent.winfo_width() - self.winfo_width()) // 2 + y = parent.winfo_rooty() + (parent.winfo_height() - self.winfo_height()) // 2 + self.geometry(f"+{x}+{y}") + + def _build_ui(self): + f = ttk.Frame(self, padding=15) + f.pack(fill=tk.BOTH, expand=True) + + fields = [ + ("MAC:", self._ctrl.mac, False), + ("IP:", self._ctrl.ip, True), + ("포트:", str(self._ctrl.port), True), + ("이름:", self._ctrl.name, True), + ("서브넷:", self._ctrl.subnet, True), + ("게이트웨이:", self._ctrl.gateway, True), + ] + + self._vars = {} + for i, (label, value, editable) in enumerate(fields): + ttk.Label(f, text=label).grid(row=i, column=0, sticky=tk.E, padx=(0, 8), pady=3) + var = tk.StringVar(value=value) + state = "normal" if editable else "readonly" + ttk.Entry(f, textvariable=var, width=22, state=state).grid(row=i, column=1, pady=3) + self._vars[label] = var + + btn = ttk.Frame(f) + btn.grid(row=len(fields), column=0, columnspan=2, pady=(12, 0)) + ttk.Button(btn, text="설정", command=self._on_apply).pack(side=tk.LEFT, padx=5) + ttk.Button(btn, text="닫기", command=self.destroy).pack(side=tk.LEFT, padx=5) + + def _on_apply(self): + self._ctrl.ip = self._vars["IP:"].get().strip() + try: + self._ctrl.port = int(self._vars["포트:"].get().strip()) + except ValueError: + messagebox.showerror("오류", "포트는 숫자여야 합니다.", parent=self) + return + self._ctrl.name = self._vars["이름:"].get().strip() + self._ctrl.subnet = self._vars["서브넷:"].get().strip() + self._ctrl.gateway = self._vars["게이트웨이:"].get().strip() + + ok = send_sett(self._ctrl, self._bind_ip) + if ok: + messagebox.showinfo("성공", "설정 되었습니다.", parent=self) + else: + messagebox.showerror("실패", "설정 전송에 실패했습니다.", parent=self) diff --git a/gui/sync_panel.py b/gui/sync_panel.py new file mode 100644 index 0000000..f7f25af --- /dev/null +++ b/gui/sync_panel.py @@ -0,0 +1,124 @@ +"""탭2: PC시간 동기화 패널""" + +import tkinter as tk +from tkinter import ttk +import threading +from datetime import datetime + +from models.controller import Controller +from services.time_sync_service import sync_all + + +class SyncPanel(ttk.Frame): + """PC 시간 동기화 패널.""" + + def __init__(self, parent, shared_controllers: list[Controller]): + super().__init__(parent) + self._controllers = shared_controllers + self._check_vars: list[tk.BooleanVar] = [] + self._syncing = False + self._build_ui() + self._update_clock() + + def _build_ui(self): + # --- 상단: 현재 PC 시간 --- + top = ttk.LabelFrame(self, text="현재 PC 시간", padding=10) + top.pack(fill=tk.X, padx=5, pady=5) + + self._clock_label = ttk.Label(top, text="", font=("Consolas", 18)) + self._clock_label.pack() + + # --- 중간: 컨트롤러 체크리스트 --- + mid = ttk.LabelFrame(self, text="동기화 대상 컨트롤러", padding=5) + mid.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) + + btn_row = ttk.Frame(mid) + btn_row.pack(fill=tk.X) + ttk.Button(btn_row, text="전체 선택", command=self._select_all).pack(side=tk.LEFT, padx=2) + ttk.Button(btn_row, text="전체 해제", command=self._deselect_all).pack(side=tk.LEFT, padx=2) + ttk.Button(btn_row, text="목록 새로고침", command=self.refresh_list).pack(side=tk.LEFT, padx=2) + + self._check_frame = ttk.Frame(mid) + self._check_frame.pack(fill=tk.BOTH, expand=True, pady=5) + + # --- 하단: 동기화 실행 + 결과 --- + bot = ttk.LabelFrame(self, text="동기화", padding=5) + bot.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) + + self._btn_sync = ttk.Button(bot, text="동기화 실행", command=self._on_sync) + self._btn_sync.pack(pady=5) + + self._log = tk.Text(bot, height=8, state=tk.DISABLED, wrap=tk.WORD) + self._log.pack(fill=tk.BOTH, expand=True) + + def _update_clock(self): + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + self._clock_label.config(text=now) + self.after(1000, self._update_clock) + + def refresh_list(self): + """컨트롤러 체크리스트 갱신.""" + for w in self._check_frame.winfo_children(): + w.destroy() + self._check_vars.clear() + + for ctrl in self._controllers: + var = tk.BooleanVar(value=ctrl.selected) + self._check_vars.append(var) + cb = ttk.Checkbutton( + self._check_frame, + text=ctrl.display_label, + variable=var, + command=lambda c=ctrl, v=var: setattr(c, "selected", v.get()), + ) + cb.pack(anchor=tk.W) + + def _select_all(self): + for var in self._check_vars: + var.set(True) + for ctrl in self._controllers: + ctrl.selected = True + + def _deselect_all(self): + for var in self._check_vars: + var.set(False) + for ctrl in self._controllers: + ctrl.selected = False + + def _log_message(self, msg: str): + self._log.config(state=tk.NORMAL) + self._log.insert(tk.END, msg + "\n") + self._log.see(tk.END) + self._log.config(state=tk.DISABLED) + + def _on_sync(self): + if self._syncing: + return + selected = [c for c in self._controllers if c.selected] + if not selected: + self._log_message("동기화할 컨트롤러가 없습니다.") + return + + self._syncing = True + self._btn_sync.config(state=tk.DISABLED) + self._log_message(f"--- 동기화 시작: {datetime.now().strftime('%H:%M:%S')} ---") + + threading.Thread(target=self._sync_thread, daemon=True).start() + + def _sync_thread(self): + results = sync_all(self._controllers) + self.after(0, self._sync_done, results) + + def _sync_done(self, results): + self._syncing = False + self._btn_sync.config(state=tk.NORMAL) + + ok_count = 0 + for ctrl, ok, msg in results: + status = "OK" if ok else "FAIL" + self._log_message(f" [{status}] {ctrl.display_label}: {msg}") + if ok: + ok_count += 1 + + total = len(results) + self._log_message(f"--- 완료: {ok_count}/{total} 성공 ---\n") diff --git a/icon.ico b/icon.ico new file mode 100644 index 0000000..ef3dfaf Binary files /dev/null and b/icon.ico differ diff --git a/icon.png b/icon.png new file mode 100644 index 0000000..0a8e62b Binary files /dev/null and b/icon.png differ diff --git a/main.py b/main.py new file mode 100644 index 0000000..389c967 --- /dev/null +++ b/main.py @@ -0,0 +1,18 @@ +"""RTC Time Drift Tester - 앱 진입점""" + +import sys +import os + +# 프로젝트 루트를 모듈 검색 경로에 추가 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from gui.app import App + + +def main(): + app = App() + app.run() + + +if __name__ == "__main__": + main() diff --git a/models/__init__.py b/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/models/controller.py b/models/controller.py new file mode 100644 index 0000000..cb48d8a --- /dev/null +++ b/models/controller.py @@ -0,0 +1,33 @@ +"""Controller 데이터클래스""" + +from dataclasses import dataclass, field + + +@dataclass +class Controller: + """컨트롤러 네트워크 정보""" + mac: str = "" + ip: str = "" + port: int = 5000 + name: str = "" + subnet: str = "255.255.255.0" + gateway: str = "192.168.0.1" + dhcp_mode: int = 0 # 0: Static, 1: DHCP + server_ip: str = "0.0.0.0" + server_port: int = 0 + server_mode: int = 0 # 0: Client, 1: Server + keep_alive: int = 0 + dns_name: str = "" + ap_ssid_name: str = "" + ap_ssid_pw: str = "" + ap_mode: int = 0 # 0: Station, 1: AP + + # 시간 동기화/읽기용 상태 (GUI에서 사용) + selected: bool = field(default=True, repr=False) + + @property + def display_label(self) -> str: + """GUI 표시용 라벨: 이름(IP:포트)""" + if self.name: + return f"{self.name}({self.ip}:{self.port})" + return f"{self.ip}:{self.port}" diff --git a/models/reading.py b/models/reading.py new file mode 100644 index 0000000..d1087b7 --- /dev/null +++ b/models/reading.py @@ -0,0 +1,20 @@ +"""TimeReading 데이터클래스""" + +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class TimeReading: + """시간 읽기 결과""" + controller_mac: str + controller_label: str + pc_time: datetime + controller_time: datetime + drift_seconds: float + + @property + def drift_display(self) -> str: + """오차 표시용 문자열""" + sign = "+" if self.drift_seconds >= 0 else "" + return f"{sign}{self.drift_seconds:.1f}초" diff --git a/network/__init__.py b/network/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/network/packet_parser.py b/network/packet_parser.py new file mode 100644 index 0000000..3423400 --- /dev/null +++ b/network/packet_parser.py @@ -0,0 +1,88 @@ +"""DIBD 응답 패킷 파싱 (가변 길이 지원)""" + +from models.controller import Controller +from config import PROTOCOL_ENCODING + + +def parse_dibd_packet(data: bytes) -> Controller | None: + """DIBD 응답 패킷을 파싱하여 Controller 객체를 반환. + + 패킷 길이: + - 이더넷 전용 모델: 174바이트 (DNS Name까지) + - WiFi 지원 모델: 222바이트 (AP Mode까지) + + 패킷 구조 (0x88 응답, \r\n으로 필드 구분): + 0-5: 'DIBD\\r\\n' + 6-24: MAC (17자) + \\r\\n + 25-41: Local IP (15자) + \\r\\n + 42-58: Subnet Mask (15자) + \\r\\n + 59-75: Gateway IP (15자) + \\r\\n + 76-82: Port (5자) + \\r\\n + 83-86: DHCP Mode (2자) + \\r\\n + 87-103: Server IP (15자) + \\r\\n + 104-110: Server Port (5자) + \\r\\n + 111-114: Server Mode (2자) + \\r\\n + 115-136: Name (20자) + \\r\\n + 137-141: KeepAlive (3자) + \\r\\n + 142-173: DNS Name (30자) + \\r\\n ← 174바이트 여기까지 + 174-195: AP SSID Name (20자) + \\r\\n ← 선택 + 196-217: AP SSID PW (20자) + \\r\\n ← 선택 + 218-221: AP Mode (2자) + \\r\\n ← 선택 + """ + try: + text = data.decode(PROTOCOL_ENCODING, errors="replace") + except Exception: + return None + + # 최소 길이: KeepAlive 필드까지 = 142바이트 + if len(text) < 142: + return None + + if not text.startswith("DIBD"): + return None + + ctrl = Controller() + + ctrl.mac = text[6:23].strip() + ctrl.ip = _normalize_ip(text[25:40].strip()) + ctrl.subnet = _normalize_ip(text[42:57].strip()) + ctrl.gateway = _normalize_ip(text[59:74].strip()) + + ctrl.port = _safe_int(text[76:81].strip(), 5000) + ctrl.dhcp_mode = _safe_int(text[83:85].strip(), 30) % 30 + ctrl.server_ip = _normalize_ip(text[87:102].strip()) + ctrl.server_port = _safe_int(text[104:109].strip(), 0) + ctrl.server_mode = _safe_int(text[111:113].strip(), 30) % 30 + + ctrl.name = text[115:135].strip() + ctrl.keep_alive = _safe_int(text[137:140].strip(), 0) + + # DNS Name (선택 - 174바이트 이상) + if len(text) >= 172: + ctrl.dns_name = text[142:172].strip() + + # AP SSID (선택 - 222바이트 패킷) + if len(text) >= 194: + ctrl.ap_ssid_name = text[174:194].strip() + if len(text) >= 216: + ctrl.ap_ssid_pw = text[196:216].strip() + if len(text) >= 220: + ctrl.ap_mode = _safe_int(text[218:220].strip(), 30) % 30 + + return ctrl + + +def _normalize_ip(ip_str: str) -> str: + """패딩된 IP(예: '192.168.000.074')를 일반형(예: '192.168.0.74')으로 변환.""" + try: + parts = ip_str.split(".") + return ".".join(str(int(p)) for p in parts) + except (ValueError, IndexError): + return ip_str + + +def _safe_int(s: str, default: int = 0) -> int: + try: + return int(s) + except (ValueError, TypeError): + return default diff --git a/network/tcp_protocol.py b/network/tcp_protocol.py new file mode 100644 index 0000000..485f129 --- /dev/null +++ b/network/tcp_protocol.py @@ -0,0 +1,77 @@ +"""ASCII 프로토콜 (시간 동기화 cmd30, 시간 읽기 cmd31)""" + +import socket +from datetime import datetime + +from config import TCP_TIMEOUT, TCP_RECV_BUFFER, PROTOCOL_ENCODING +from utils.time_format import datetime_to_protocol, protocol_to_datetime + + +def sync_time(ip: str, port: int, dt: datetime | None = None) -> tuple[bool, str]: + """컨트롤러에 PC 시간 동기화 (명령 30). + + 패킷: ![0030YYMMDDdHHMMSS!] + 예시: ![003026020912 30229!] (2026-02-09 월 23:02:29) + 응답: ![00300!] (성공) / ![0030F!] (실패) + + Returns: (성공여부, 메시지) + """ + if dt is None: + dt = datetime.now() + + time_str = datetime_to_protocol(dt) + cmd = f"![0030{time_str}!]" + + try: + resp = _tcp_send_recv(ip, port, cmd) + except Exception as e: + return False, f"연결 실패: {e}" + + if "![00300!]" in resp: + return True, "동기화 성공" + elif "![0030F!]" in resp: + return False, "동기화 실패 (컨트롤러 응답 F)" + else: + return False, f"알 수 없는 응답: {resp}" + + +def read_time(ip: str, port: int) -> tuple[datetime | None, datetime, datetime, str]: + """컨트롤러 시간 읽기 (명령 31). + + 패킷: ![0031!] + 응답: ![0031YYMMDDdHHMMSS!] (13자 시간 데이터) + + Returns: (컨트롤러시간 | None, PC시간_before, PC시간_after, 에러메시지) + """ + cmd = "![0031!]" + + pc_before = datetime.now() + try: + resp = _tcp_send_recv(ip, port, cmd) + except Exception as e: + pc_after = datetime.now() + return None, pc_before, pc_after, f"연결 실패: {e}" + pc_after = datetime.now() + + # 응답 파싱: ![0031YYMMDDdHHMMSS!] + # "![0031" 뒤에 13자의 시간 데이터 + try: + start = resp.index("![0031") + 6 + time_data = resp[start:start + 13] + ctrl_time = protocol_to_datetime(time_data) + return ctrl_time, pc_before, pc_after, "" + except (ValueError, IndexError) as e: + return None, pc_before, pc_after, f"응답 파싱 실패: {resp} ({e})" + + +def _tcp_send_recv(ip: str, port: int, cmd: str) -> str: + """TCP 연결하여 명령 전송 후 응답 수신.""" + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(TCP_TIMEOUT) + try: + sock.connect((ip, port)) + sock.sendall(cmd.encode(PROTOCOL_ENCODING)) + resp = sock.recv(TCP_RECV_BUFFER) + return resp.decode(PROTOCOL_ENCODING, errors="replace") + finally: + sock.close() diff --git a/network/udp_discovery.py b/network/udp_discovery.py new file mode 100644 index 0000000..2afd282 --- /dev/null +++ b/network/udp_discovery.py @@ -0,0 +1,177 @@ +"""UDP 검색, SETT 설정, RESET""" + +import socket +import struct +import time +import threading +from typing import Callable + +from config import ( + UDP_PORT_ETHERNET, UDP_PORT_WIFI, UDP_BROADCAST_IP, + SEARCH_CMD, SETT_PREFIX, RESET_PREFIX, + UDP_RECV_BUFFER, UDP_RECV_TIMEOUT, PROTOCOL_ENCODING, +) +from models.controller import Controller +from network.packet_parser import parse_dibd_packet +from utils.ip_format import pad_ip, sized_string + + +def get_network_interfaces() -> list[tuple[str, str]]: + """사용 가능한 네트워크 인터페이스 목록 반환. + Returns: [(ip, name), ...] + """ + interfaces = [] + try: + hostname = socket.gethostname() + addrs = socket.getaddrinfo(hostname, None, socket.AF_INET) + seen = set() + for addr in addrs: + ip = addr[4][0] + if ip not in seen and not ip.startswith("127."): + seen.add(ip) + interfaces.append((ip, ip)) + except Exception: + pass + if not interfaces: + interfaces.append(("0.0.0.0", "기본")) + return interfaces + + +def search_controllers( + bind_ip: str = "0.0.0.0", + timeout: float = UDP_RECV_TIMEOUT, + on_found: Callable[[Controller], None] | None = None, + on_log: Callable[[str], None] | None = None, +) -> list[Controller]: + """UDP 브로드캐스트로 컨트롤러를 검색. + + Java 참조: + - 소켓을 특정 NIC IP + 고정 포트(5109)에 바인딩 + - 검색 패킷을 5108, 5107 순서로 100ms 간격 전송 + - 별도 리스닝 스레드에서 응답 수신 (블로킹) + """ + def log(msg: str): + if on_log: + on_log(msg) + + results: list[Controller] = [] + mac_set: set[str] = set() + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + # Java: 특정 NIC IP + 포트 5109에 바인딩 + bind_port = 5109 + try: + sock.bind((bind_ip, bind_port)) + log(f"소켓 바인딩: {bind_ip}:{bind_port}") + except OSError: + try: + sock.bind((bind_ip, 0)) + local_addr = sock.getsockname() + log(f"포트 5109 사용 불가, 대체 바인딩: {local_addr[0]}:{local_addr[1]}") + except OSError as e: + log(f"소켓 바인딩 실패: {e}") + sock.close() + return results + + cmd = SEARCH_CMD.encode(PROTOCOL_ENCODING) + log(f"검색 패킷: {SEARCH_CMD.strip()!r} ({len(cmd)}바이트)") + + # 이더넷 포트(5108)로 전송 + try: + sock.sendto(cmd, (UDP_BROADCAST_IP, UDP_PORT_ETHERNET)) + log(f"전송 → {UDP_BROADCAST_IP}:{UDP_PORT_ETHERNET}") + except Exception as e: + log(f"전송 실패 (5108): {e}") + + time.sleep(0.1) + + # WiFi 포트(5107)로 전송 + try: + sock.sendto(cmd, (UDP_BROADCAST_IP, UDP_PORT_WIFI)) + log(f"전송 → {UDP_BROADCAST_IP}:{UDP_PORT_WIFI}") + except Exception as e: + log(f"전송 실패 (5107): {e}") + + # 응답 수신: 짧은 per-recv 타임아웃(0.5초) + 긴 전체 대기시간 + sock.settimeout(0.5) + deadline = time.time() + timeout + recv_count = 0 + + while time.time() < deadline: + try: + data, addr = sock.recvfrom(UDP_RECV_BUFFER) + recv_count += 1 + log(f"수신 #{recv_count}: {addr[0]}:{addr[1]}, {len(data)}바이트") + except socket.timeout: + continue # 타임아웃이면 deadline까지 계속 시도 + except OSError as e: + log(f"수신 오류: {e}") + break + + ctrl = parse_dibd_packet(data) + if ctrl and ctrl.mac: + if ctrl.mac not in mac_set: + mac_set.add(ctrl.mac) + results.append(ctrl) + log(f"발견: {ctrl.mac} ({ctrl.ip}:{ctrl.port} {ctrl.name})") + if on_found: + on_found(ctrl) + elif data: + header = data[:4].decode(PROTOCOL_ENCODING, errors="replace") + log(f"무시됨: 헤더={header!r}, 길이={len(data)}") + + sock.close() + log(f"검색 완료: {len(results)}개 발견 (총 {recv_count}개 패킷 수신)") + return results + + +def send_sett(ctrl: Controller, bind_ip: str = "0.0.0.0") -> bool: + """SETT 명령으로 컨트롤러 설정 변경.""" + data = SETT_PREFIX + data += ctrl.mac + " " + data += pad_ip(ctrl.ip) + " " + data += pad_ip(ctrl.subnet) + " " + data += pad_ip(ctrl.gateway) + " " + data += f"{ctrl.port:05d}" + " " + data += f"3{ctrl.dhcp_mode}" + " " + data += pad_ip(ctrl.server_ip) + " " + data += f"{ctrl.server_port:05d}" + " " + data += f"3{ctrl.server_mode}" + " " + data += sized_string(ctrl.name, 20) + " " + data += f"{ctrl.keep_alive:03d}" + " " + data += sized_string(ctrl.dns_name, 30) + " " + data += sized_string(ctrl.ap_ssid_name, 20) + " " + data += sized_string(ctrl.ap_ssid_pw, 20) + " " + data += f"3{ctrl.ap_mode}" + data += "\r\n" + + return _send_udp(data, bind_ip) + + +def send_reset(mac: str, bind_ip: str = "0.0.0.0") -> bool: + """RESET 명령 전송.""" + data = RESET_PREFIX + mac + "\r\n" + return _send_udp(data, bind_ip) + + +def _send_udp(data: str, bind_ip: str) -> bool: + """UDP 패킷을 브로드캐스트로 전송.""" + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + try: + sock.bind((bind_ip, 0)) + except OSError: + sock.bind(("0.0.0.0", 0)) + + buf = data.encode(PROTOCOL_ENCODING) + sock.sendto(buf, (UDP_BROADCAST_IP, UDP_PORT_ETHERNET)) + time.sleep(0.1) + sock.sendto(buf, (UDP_BROADCAST_IP, UDP_PORT_WIFI)) + sock.close() + return True + except Exception: + return False diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/drift_monitor.py b/services/drift_monitor.py new file mode 100644 index 0000000..e75c37b --- /dev/null +++ b/services/drift_monitor.py @@ -0,0 +1,211 @@ +"""주기적 시간 읽기 및 오차 추적 서비스""" + +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime, timedelta +from typing import Callable + +from models.controller import Controller +from models.reading import TimeReading +from network.tcp_protocol import read_time +from config import DEFAULT_MONITOR_INTERVAL + + +class DriftMonitor: + """주기적으로 컨트롤러 시간을 읽어 오차를 추적.""" + + def __init__(self): + self._timer: threading.Timer | None = None + self._hourly_timer: threading.Timer | None = None + self._running = False + self._hourly_running = False + self._interval = DEFAULT_MONITOR_INTERVAL + self._controllers: list[Controller] = [] + self._readings: list[TimeReading] = [] + self._lock = threading.Lock() + self._on_readings: Callable[[list[TimeReading]], None] | None = None + self._on_error: Callable[[str], None] | None = None + + @property + def is_running(self) -> bool: + return self._running + + @property + def is_hourly_running(self) -> bool: + return self._hourly_running + + @property + def readings(self) -> list[TimeReading]: + with self._lock: + return list(self._readings) + + @property + def interval(self) -> int: + return self._interval + + def configure( + self, + controllers: list[Controller], + interval: int = DEFAULT_MONITOR_INTERVAL, + on_readings: Callable[[list[TimeReading]], None] | None = None, + on_error: Callable[[str], None] | None = None, + ): + self._controllers = controllers + self._interval = interval + self._on_readings = on_readings + self._on_error = on_error + + def start(self): + if self._running: + return + self._running = True + self._do_read() + + def stop(self): + self._running = False + if self._timer: + self._timer.cancel() + self._timer = None + + def start_hourly(self): + """매시 정시(XX:00:00)에 시간 읽기 시작.""" + if self._hourly_running: + return + self._hourly_running = True + self._schedule_hourly() + + def stop_hourly(self): + """매시 정시 읽기 중지.""" + self._hourly_running = False + if self._hourly_timer: + self._hourly_timer.cancel() + self._hourly_timer = None + + def _schedule_hourly(self): + """다음 정시까지 대기 후 읽기 예약.""" + if not self._hourly_running: + return + now = datetime.now() + # 다음 정시: 현재 시 + 1, 분/초 = 0 + next_hour = now.replace(minute=0, second=0, microsecond=0) + next_hour = next_hour + timedelta(hours=1) + delay = (next_hour - now).total_seconds() + if self._on_error: + self._on_error(f"정시 읽기 예약: {next_hour.strftime('%H:%M:%S')} ({delay:.0f}초 후)") + self._hourly_timer = threading.Timer(delay, self._hourly_tick) + self._hourly_timer.daemon = True + self._hourly_timer.start() + + def _hourly_tick(self): + """정시 도달 시 읽기 실행 후 다음 정시 예약.""" + if not self._hourly_running: + return + self._do_read(schedule_next=False) + self._schedule_hourly() + + def read_once(self): + """즉시 한 번 읽기 (별도 스레드).""" + threading.Thread(target=self._do_read, args=(False,), daemon=True).start() + + def clear_readings(self): + with self._lock: + self._readings.clear() + + def _do_read(self, schedule_next: bool = True): + """모든 컨트롤러에서 시간 읽기.""" + targets = [c for c in self._controllers if c.selected] + if not targets: + if self._running and schedule_next: + self._schedule_next() + return + + new_readings: list[TimeReading] = [] + + with ThreadPoolExecutor(max_workers=8) as executor: + future_map = { + executor.submit(read_time, c.ip, c.port): c + for c in targets + } + for future in as_completed(future_map): + ctrl = future_map[future] + try: + ctrl_time, pc_before, pc_after, error = future.result() + except Exception as e: + if self._on_error: + self._on_error(f"{ctrl.display_label}: {e}") + continue + + if ctrl_time is None: + if self._on_error: + self._on_error(f"{ctrl.display_label}: {error}") + continue + + # 네트워크 지연 보정: PC 시간 중간값 + pc_mid = pc_before + (pc_after - pc_before) / 2 + drift = (ctrl_time - pc_mid).total_seconds() + + reading = TimeReading( + controller_mac=ctrl.mac, + controller_label=ctrl.display_label, + pc_time=pc_mid, + controller_time=ctrl_time, + drift_seconds=drift, + ) + new_readings.append(reading) + + with self._lock: + self._readings.extend(new_readings) + + if self._on_readings and new_readings: + self._on_readings(new_readings) + + if self._running and schedule_next: + self._schedule_next() + + def _schedule_next(self): + self._timer = threading.Timer(self._interval, self._do_read) + self._timer.daemon = True + self._timer.start() + + def get_summary(self) -> dict[str, dict]: + """컨트롤러별 오차 요약 통계. + + Returns: + {mac: {label, count, avg_drift, max_drift, drift_per_hour, first_time, last_time}} + """ + with self._lock: + readings = list(self._readings) + + by_mac: dict[str, list[TimeReading]] = {} + for r in readings: + by_mac.setdefault(r.controller_mac, []).append(r) + + summary = {} + for mac, rlist in by_mac.items(): + rlist.sort(key=lambda r: r.pc_time) + drifts = [r.drift_seconds for r in rlist] + count = len(drifts) + avg_drift = sum(drifts) / count if count else 0 + max_drift = max(abs(d) for d in drifts) if drifts else 0 + + # 시간당 오차율 + drift_per_hour = 0.0 + if count >= 2: + elapsed_hours = ( + rlist[-1].pc_time - rlist[0].pc_time + ).total_seconds() / 3600 + if elapsed_hours > 0: + drift_change = rlist[-1].drift_seconds - rlist[0].drift_seconds + drift_per_hour = drift_change / elapsed_hours + + summary[mac] = { + "label": rlist[0].controller_label, + "count": count, + "avg_drift": avg_drift, + "max_drift": max_drift, + "drift_per_hour": drift_per_hour, + "first_time": rlist[0].pc_time, + "last_time": rlist[-1].pc_time, + } + + return summary diff --git a/services/export_service.py b/services/export_service.py new file mode 100644 index 0000000..3dd47b4 --- /dev/null +++ b/services/export_service.py @@ -0,0 +1,103 @@ +"""CSV export 서비스""" + +import csv +import os +from pathlib import Path + +from models.reading import TimeReading + + +def export_readings_csv( + readings: list[TimeReading], + filepath: str | Path, +) -> str: + """시간 읽기 기록을 CSV로 내보내기 (컨트롤러별 정렬). + + Returns: 저장된 파일 경로 + """ + filepath = Path(filepath) + # 컨트롤러별로 정렬 + sorted_readings = sorted(readings, key=lambda r: (r.controller_mac, r.pc_time)) + + with open(filepath, "w", newline="", encoding="utf-8-sig") as f: + writer = csv.writer(f) + writer.writerow([ + "컨트롤러", + "MAC", + "PC 시간", + "컨트롤러 시간", + "오차(초)", + ]) + for r in sorted_readings: + writer.writerow([ + r.controller_label, + r.controller_mac, + r.pc_time.strftime("%Y-%m-%d %H:%M:%S"), + r.controller_time.strftime("%Y-%m-%d %H:%M:%S"), + f"{r.drift_seconds:.1f}", + ]) + return str(filepath) + + +def export_per_controller_csv( + readings: list[TimeReading], + folder: str | Path, +) -> list[str]: + """컨트롤러별 개별 CSV 파일 생성. + + Returns: 생성된 파일 경로 목록 + """ + folder = Path(folder) + os.makedirs(folder, exist_ok=True) + + by_mac: dict[str, list[TimeReading]] = {} + for r in readings: + by_mac.setdefault(r.controller_mac, []).append(r) + + created = [] + for mac, rlist in by_mac.items(): + rlist.sort(key=lambda r: r.pc_time) + # 파일명: 컨트롤러 이름 기반 (특수문자 제거) + safe_name = rlist[0].controller_label + for ch in r'<>:"/\\|?*': + safe_name = safe_name.replace(ch, "_") + filepath = folder / f"{safe_name}_drift.csv" + export_readings_csv(rlist, filepath) + created.append(str(filepath)) + + return created + + +def export_summary_csv( + summary: dict[str, dict], + filepath: str | Path, +) -> str: + """오차 요약 통계를 CSV로 내보내기. + + Returns: 저장된 파일 경로 + """ + filepath = Path(filepath) + with open(filepath, "w", newline="", encoding="utf-8-sig") as f: + writer = csv.writer(f) + writer.writerow([ + "컨트롤러", + "MAC", + "읽기 횟수", + "평균 오차(초)", + "최대 오차(초)", + "시간당 오차율(초/시간)", + "첫 읽기", + "마지막 읽기", + ]) + for mac, s in summary.items(): + writer.writerow([ + s["label"], + mac, + s["count"], + f"{s['avg_drift']:.2f}", + f"{s['max_drift']:.2f}", + f"{s['drift_per_hour']:.3f}", + s["first_time"].strftime("%Y-%m-%d %H:%M:%S"), + s["last_time"].strftime("%Y-%m-%d %H:%M:%S"), + ]) + return str(filepath) diff --git a/services/time_sync_service.py b/services/time_sync_service.py new file mode 100644 index 0000000..8137a98 --- /dev/null +++ b/services/time_sync_service.py @@ -0,0 +1,38 @@ +"""전체 컨트롤러 시간 동기화 서비스""" + +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime + +from models.controller import Controller +from network.tcp_protocol import sync_time + + +def sync_all( + controllers: list[Controller], + max_workers: int = 8, +) -> list[tuple[Controller, bool, str]]: + """선택된 컨트롤러에 일괄 시간 동기화. + + Returns: [(Controller, 성공여부, 메시지), ...] + """ + targets = [c for c in controllers if c.selected] + if not targets: + return [] + + dt = datetime.now() + results: list[tuple[Controller, bool, str]] = [] + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + future_map = { + executor.submit(sync_time, c.ip, c.port, dt): c + for c in targets + } + for future in as_completed(future_map): + ctrl = future_map[future] + try: + ok, msg = future.result() + except Exception as e: + ok, msg = False, str(e) + results.append((ctrl, ok, msg)) + + return results diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/ip_format.py b/utils/ip_format.py new file mode 100644 index 0000000..b3236c8 --- /dev/null +++ b/utils/ip_format.py @@ -0,0 +1,35 @@ +"""IP 주소 패딩/정규화 유틸리티""" + + +def pad_ip(ip: str) -> str: + """IP 주소 각 옥텟을 3자리로 패딩. + 예: '192.168.0.74' -> '192.168.000.074' + """ + parts = ip.split(".") + padded = [] + for part in parts: + part = part.strip() + padded.append(part.zfill(3)[:3]) + return ".".join(padded) + + +def normalize_ip(ip: str) -> str: + """패딩된 IP를 일반 형태로 변환. + 예: '192.168.000.074' -> '192.168.0.74' + """ + parts = ip.split(".") + return ".".join(str(int(p)) for p in parts) + + +def sized_string(data: str, length: int, encoding: str = "euc-kr") -> str: + """문자열을 지정 바이트 길이로 맞춤 (공백 패딩). MS949/EUC-KR 한글 지원.""" + result = "" + byte_count = 0 + for ch in data: + ch_bytes = len(ch.encode(encoding, errors="replace")) + if byte_count + ch_bytes > length: + break + result += ch + byte_count += ch_bytes + result += " " * (length - byte_count) + return result diff --git a/utils/time_format.py b/utils/time_format.py new file mode 100644 index 0000000..272f7ee --- /dev/null +++ b/utils/time_format.py @@ -0,0 +1,37 @@ +"""날짜시간 <-> 프로토콜 포맷 변환 유틸리티""" + +from datetime import datetime + + +def datetime_to_protocol(dt: datetime) -> str: + """datetime을 프로토콜 포맷으로 변환. + 포맷: YYMMDDdHHMMSS (13자) + d: 요일 (0=일, 1=월, 2=화, 3=수, 4=목, 5=금, 6=토) + + 예: 2026-02-09 23:02:29 (월) → '2602091230229' + """ + yy = dt.strftime("%y") + mm = dt.strftime("%m") + dd = dt.strftime("%d") + dow = dt.isoweekday() % 7 # 7->0(일), 1->1(월), ..., 6->6(토) + hh = dt.strftime("%H") + mi = dt.strftime("%M") + ss = dt.strftime("%S") + return f"{yy}{mm}{dd}{dow}{hh}{mi}{ss}" + + +def protocol_to_datetime(data: str) -> datetime: + """프로토콜 포맷 문자열을 datetime으로 변환. + 입력: YYMMDDdHHMMSS (13자) + + 예: '2602091230229' → 2026-02-09 23:02:29 + """ + yy = int(data[0:2]) + mm = int(data[2:4]) + dd = int(data[4:6]) + # data[6] = 요일 (무시) + hh = int(data[7:9]) + mi = int(data[9:11]) + ss = int(data[11:13]) + year = 2000 + yy + return datetime(year, mm, dd, hh, mi, ss)