"""주기적 시간 읽기 및 오차 추적 서비스""" import threading from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timedelta from typing import Callable from models.controller import Controller from models.reading import TimeReading from network.tcp_protocol import read_time from config import DEFAULT_MONITOR_INTERVAL class DriftMonitor: """주기적으로 컨트롤러 시간을 읽어 오차를 추적.""" def __init__(self): self._timer: threading.Timer | None = None self._hourly_thread: threading.Thread | None = None self._hourly_stop_event: threading.Event | None = None self._running = False self._hourly_running = False self._interval = DEFAULT_MONITOR_INTERVAL self._controllers: list[Controller] = [] self._readings: list[TimeReading] = [] self._lock = threading.Lock() self._on_readings: Callable[[list[TimeReading]], None] | None = None self._on_error: Callable[[str], None] | None = None @property def is_running(self) -> bool: return self._running @property def is_hourly_running(self) -> bool: return self._hourly_running @property def readings(self) -> list[TimeReading]: with self._lock: return list(self._readings) @property def interval(self) -> int: return self._interval def configure( self, controllers: list[Controller], interval: int = DEFAULT_MONITOR_INTERVAL, on_readings: Callable[[list[TimeReading]], None] | None = None, on_error: Callable[[str], None] | None = None, ): self._controllers = controllers self._interval = interval self._on_readings = on_readings self._on_error = on_error def start(self): if self._running: return self._running = True self._do_read() def stop(self): self._running = False if self._timer: self._timer.cancel() self._timer = None def start_hourly(self): """매시 정시(XX:00:00)에 시간 읽기 시작.""" if self._hourly_running: return self._hourly_running = True self._hourly_stop_event = threading.Event() self._hourly_thread = threading.Thread(target=self._hourly_loop, daemon=True) self._hourly_thread.start() def stop_hourly(self): """매시 정시 읽기 중지.""" self._hourly_running = False if self._hourly_stop_event: self._hourly_stop_event.set() self._hourly_thread = None def _hourly_loop(self): """매시 정시 읽기 루프 (절전 복귀 대응).""" while self._hourly_running: now = datetime.now() next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) if self._on_error: delay = (next_hour - now).total_seconds() self._on_error(f"정시 읽기 예약: {next_hour.strftime('%H:%M:%S')} ({delay:.0f}초 후)") # 30초 간격 폴링으로 정시 대기 (절전 복귀 시 즉시 감지) while self._hourly_running: now = datetime.now() if now >= next_hour: break remaining = (next_hour - now).total_seconds() wait = min(remaining, 30) if self._hourly_stop_event.wait(timeout=wait): return # 중지 요청 if not self._hourly_running: break # 정시 읽기 실행 try: self._do_read(schedule_next=False) except Exception as e: if self._on_error: try: self._on_error(f"정시 읽기 오류: {e}") except: pass def read_once(self): """즉시 한 번 읽기 (별도 스레드).""" threading.Thread(target=self._do_read, args=(False,), daemon=True).start() def clear_readings(self): with self._lock: self._readings.clear() def _do_read(self, schedule_next: bool = True): """모든 컨트롤러에서 시간 읽기.""" targets = [c for c in self._controllers if c.selected] if not targets: if self._running and schedule_next: self._schedule_next() return new_readings: list[TimeReading] = [] with ThreadPoolExecutor(max_workers=8) as executor: future_map = { executor.submit(read_time, c.ip, c.port): c for c in targets } for future in as_completed(future_map): ctrl = future_map[future] try: ctrl_time, pc_before, pc_after, error = future.result() except Exception as e: if self._on_error: self._on_error(f"{ctrl.display_label}: {e}") continue if ctrl_time is None: if self._on_error: self._on_error(f"{ctrl.display_label}: {error}") continue # 네트워크 지연 보정: PC 시간 중간값 pc_mid = pc_before + (pc_after - pc_before) / 2 drift = (ctrl_time - pc_mid).total_seconds() reading = TimeReading( controller_mac=ctrl.mac, controller_label=ctrl.display_label, pc_time=pc_mid, controller_time=ctrl_time, drift_seconds=drift, ) new_readings.append(reading) with self._lock: self._readings.extend(new_readings) if self._on_readings and new_readings: self._on_readings(new_readings) if self._running and schedule_next: self._schedule_next() def _schedule_next(self): self._timer = threading.Timer(self._interval, self._do_read) self._timer.daemon = True self._timer.start() def get_summary(self) -> dict[str, dict]: """컨트롤러별 오차 요약 통계. Returns: {mac: {label, count, avg_drift, max_drift, drift_per_hour, first_time, last_time}} """ with self._lock: readings = list(self._readings) by_mac: dict[str, list[TimeReading]] = {} for r in readings: by_mac.setdefault(r.controller_mac, []).append(r) summary = {} for mac, rlist in by_mac.items(): rlist.sort(key=lambda r: r.pc_time) drifts = [r.drift_seconds for r in rlist] count = len(drifts) avg_drift = sum(drifts) / count if count else 0 max_drift = max(abs(d) for d in drifts) if drifts else 0 # 시간당 오차율 drift_per_hour = 0.0 if count >= 2: elapsed_hours = ( rlist[-1].pc_time - rlist[0].pc_time ).total_seconds() / 3600 if elapsed_hours > 0: drift_change = rlist[-1].drift_seconds - rlist[0].drift_seconds drift_per_hour = drift_change / elapsed_hours summary[mac] = { "label": rlist[0].controller_label, "count": count, "avg_drift": avg_drift, "max_drift": max_drift, "drift_per_hour": drift_per_hour, "first_time": rlist[0].pc_time, "last_time": rlist[-1].pc_time, } return summary