- services/drift_monitor.py
  - threading.Timer 체인 방식 → 폴링 루프 스레드로 교체
  - 30초 간격 폴링으로 절전 복귀 시 즉시 감지
  - _do_read()를 try/except로 감싸 예외 시에도 루프 유지
  - threading.Event로 깔끔한 중지 처리
  - _schedule_hourly(), _hourly_tick() 삭제 → _hourly_loop() 신규
- Dabit Time Manager.spec
  - 빌드 출력 파일명 v1.0 → v1.1 업데이트

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
226 lines
7.6 KiB
Python
226 lines
7.6 KiB
Python
"""주기적 시간 읽기 및 오차 추적 서비스"""
|
|
|
|
import threading
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from datetime import datetime, timedelta
|
|
from typing import Callable
|
|
|
|
from models.controller import Controller
|
|
from models.reading import TimeReading
|
|
from network.tcp_protocol import read_time
|
|
from config import DEFAULT_MONITOR_INTERVAL
|
|
|
|
|
|
class DriftMonitor:
    """Periodically read controller clocks and track drift against the PC clock.

    Two independent schedules are available:

    * an interval schedule (``start`` / ``stop``) driven by a chain of
      ``threading.Timer`` objects, and
    * an on-the-hour schedule (``start_hourly`` / ``stop_hourly``) driven by
      a polling thread that re-checks the wall clock at least every
      ``_HOURLY_POLL_SECONDS`` seconds, so a read that became due while the
      machine was asleep is noticed shortly after it resumes.

    Collected readings accumulate in memory until ``clear_readings`` is called.
    """

    # Upper bound on a single wait inside the hourly polling loop.
    _HOURLY_POLL_SECONDS = 30
    # Maximum number of controllers queried concurrently in one read pass.
    _MAX_WORKERS = 8

    def __init__(self):
        self._timer: threading.Timer | None = None
        # Serializes start/stop/reschedule so a timer is never created for a
        # run that has already been stopped.
        self._timer_lock = threading.Lock()
        # Identity of the current interval run; replaced on every start() and
        # cleared on stop() so a read still in flight from an earlier run can
        # tell that it is stale.
        self._run_token: object | None = None
        self._hourly_thread: threading.Thread | None = None
        self._hourly_stop_event: threading.Event | None = None
        self._running = False
        self._hourly_running = False
        self._interval = DEFAULT_MONITOR_INTERVAL
        self._controllers: list[Controller] = []
        self._readings: list[TimeReading] = []
        # Guards self._readings.
        self._lock = threading.Lock()
        self._on_readings: Callable[[list[TimeReading]], None] | None = None
        self._on_error: Callable[[str], None] | None = None

    @property
    def is_running(self) -> bool:
        """Whether the interval schedule is active."""
        return self._running

    @property
    def is_hourly_running(self) -> bool:
        """Whether the on-the-hour schedule is active."""
        return self._hourly_running

    @property
    def readings(self) -> list[TimeReading]:
        """A snapshot copy of all readings collected so far."""
        with self._lock:
            return list(self._readings)

    @property
    def interval(self) -> int:
        """Delay passed to ``threading.Timer`` between interval reads."""
        return self._interval

    def configure(
        self,
        controllers: list[Controller],
        interval: int = DEFAULT_MONITOR_INTERVAL,
        on_readings: Callable[[list[TimeReading]], None] | None = None,
        on_error: Callable[[str], None] | None = None,
    ):
        """Set the controllers to poll, the interval and the callbacks.

        Args:
            controllers: Candidate controllers; only those whose ``selected``
                attribute is truthy are read.
            interval: Delay between interval reads, as passed to
                ``threading.Timer``.
            on_readings: Called with the new readings after each read pass
                that produced at least one reading.
            on_error: Called with a message for each failed controller read;
                the hourly loop also uses it for its scheduling notice.
        """
        self._controllers = controllers
        self._interval = interval
        self._on_readings = on_readings
        self._on_error = on_error

    def start(self):
        """Start the interval schedule; the first read runs on the calling thread."""
        with self._timer_lock:
            if self._running:
                return
            self._running = True
            token = self._run_token = object()
        self._do_read(_run_token=token)

    def stop(self):
        """Stop the interval schedule and cancel any pending timer."""
        with self._timer_lock:
            self._running = False
            # Invalidate the run so an in-flight read cannot reschedule itself,
            # even if start() is called again before it finishes.
            self._run_token = None
            if self._timer:
                self._timer.cancel()
                self._timer = None

    def start_hourly(self):
        """Start reading at the top of every hour (XX:00:00)."""
        if self._hourly_running:
            return
        self._hourly_running = True
        stop_event = threading.Event()
        self._hourly_stop_event = stop_event
        # The thread receives its own event so that a later restart, which
        # installs a fresh event on self, cannot revive this thread.
        self._hourly_thread = threading.Thread(
            target=self._hourly_loop, args=(stop_event,), daemon=True
        )
        self._hourly_thread.start()

    def stop_hourly(self):
        """Stop the on-the-hour schedule."""
        self._hourly_running = False
        if self._hourly_stop_event:
            self._hourly_stop_event.set()
        self._hourly_thread = None

    def _report_safely(self, message: str) -> None:
        """Send ``message`` to the error callback without letting it raise.

        Used from the hourly thread, where an exception escaping the callback
        would otherwise terminate the loop.
        """
        callback = self._on_error
        if not callback:
            return
        try:
            callback(message)
        except Exception:
            # Best effort: reporting itself failed and there is nowhere else
            # to report to. SystemExit/KeyboardInterrupt still propagate.
            pass

    def _hourly_loop(self, stop_event: threading.Event | None = None):
        """Body of the hourly thread: wait for each full hour, then read.

        Args:
            stop_event: Event that ends this loop when set. Defaults to the
                event currently stored on the instance.
        """
        if stop_event is None:
            stop_event = self._hourly_stop_event
        if stop_event is None:
            return

        while not stop_event.is_set():
            now = datetime.now()
            next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)

            if self._on_error:
                delay = (next_hour - now).total_seconds()
                self._report_safely(
                    f"정시 읽기 예약: {next_hour.strftime('%H:%M:%S')} ({delay:.0f}초 후)"
                )

            # Wait in short slices and re-read the wall clock each time, so a
            # deadline that passed during system sleep is detected promptly.
            while True:
                remaining = (next_hour - datetime.now()).total_seconds()
                if remaining <= 0:
                    break
                if stop_event.wait(timeout=min(remaining, self._HOURLY_POLL_SECONDS)):
                    return  # stop requested

            if stop_event.is_set():
                return

            # Perform the on-the-hour read; a failure must not end the loop.
            try:
                self._do_read(schedule_next=False)
            except Exception as e:
                self._report_safely(f"정시 읽기 오류: {e}")

    def read_once(self):
        """Run a single read immediately on a separate daemon thread."""
        threading.Thread(target=self._do_read, args=(False,), daemon=True).start()

    def clear_readings(self):
        """Discard all collected readings."""
        with self._lock:
            self._readings.clear()

    def _do_read(self, schedule_next: bool = True, _run_token: object | None = None):
        """Read every selected controller, then optionally schedule the next read.

        Args:
            schedule_next: When true and the interval schedule is running,
                arm the timer for the next read afterwards.
            _run_token: Internal. Identifies the interval run this read
                belongs to; a read from a stopped run does not reschedule.

        Exceptions raised by the callbacks propagate to the caller, but the
        next read is scheduled regardless so the timer chain is not broken.
        """
        try:
            self._read_selected()
        finally:
            if schedule_next and self._running:
                self._schedule_next(_run_token)

    def _read_selected(self):
        """Query all selected controllers in parallel and publish the readings."""
        targets = [c for c in self._controllers if c.selected]
        if not targets:
            return

        new_readings: list[TimeReading] = []

        with ThreadPoolExecutor(max_workers=self._MAX_WORKERS) as executor:
            future_map = {
                executor.submit(read_time, c.ip, c.port): c
                for c in targets
            }
            for future in as_completed(future_map):
                ctrl = future_map[future]
                try:
                    ctrl_time, pc_before, pc_after, error = future.result()
                except Exception as e:
                    if self._on_error:
                        self._on_error(f"{ctrl.display_label}: {e}")
                    continue

                # A missing controller time means the read failed; `error`
                # carries the reason.
                if ctrl_time is None:
                    if self._on_error:
                        self._on_error(f"{ctrl.display_label}: {error}")
                    continue

                # Compensate for network latency: compare against the midpoint
                # of the PC timestamps taken before and after the request.
                pc_mid = pc_before + (pc_after - pc_before) / 2
                drift = (ctrl_time - pc_mid).total_seconds()

                new_readings.append(
                    TimeReading(
                        controller_mac=ctrl.mac,
                        controller_label=ctrl.display_label,
                        pc_time=pc_mid,
                        controller_time=ctrl_time,
                        drift_seconds=drift,
                    )
                )

        with self._lock:
            self._readings.extend(new_readings)

        if self._on_readings and new_readings:
            self._on_readings(new_readings)

    def _schedule_next(self, _run_token: object | None = None):
        """Arm a daemon timer for the next interval read.

        Does nothing if the schedule has been stopped, or if ``_run_token``
        belongs to an earlier run than the current one.
        """
        with self._timer_lock:
            if not self._running:
                return
            current = self._run_token
            if _run_token is not None and _run_token is not current:
                return  # stale chain from a run that was stopped
            timer = threading.Timer(
                self._interval, self._do_read, kwargs={"_run_token": current}
            )
            timer.daemon = True
            self._timer = timer
            timer.start()

    def get_summary(self) -> dict[str, dict]:
        """Per-controller drift statistics.

        Returns:
            {mac: {label, count, avg_drift, max_drift, drift_per_hour, first_time, last_time}}

            ``max_drift`` is the largest absolute drift. ``drift_per_hour`` is
            the change in drift between the earliest and latest reading
            divided by the hours elapsed between them, or 0.0 when there are
            fewer than two readings or no time has elapsed.
        """
        with self._lock:
            readings = list(self._readings)

        by_mac: dict[str, list[TimeReading]] = {}
        for r in readings:
            by_mac.setdefault(r.controller_mac, []).append(r)

        summary = {}
        for mac, rlist in by_mac.items():
            rlist.sort(key=lambda r: r.pc_time)
            first, last = rlist[0], rlist[-1]
            # Every group holds at least one reading, so count >= 1 here.
            drifts = [r.drift_seconds for r in rlist]
            count = len(drifts)

            # Drift rate per hour.
            drift_per_hour = 0.0
            if count >= 2:
                elapsed_hours = (last.pc_time - first.pc_time).total_seconds() / 3600
                if elapsed_hours > 0:
                    drift_per_hour = (last.drift_seconds - first.drift_seconds) / elapsed_hours

            summary[mac] = {
                "label": first.controller_label,
                "count": count,
                "avg_drift": sum(drifts) / count,
                "max_drift": max(abs(d) for d in drifts),
                "drift_per_hour": drift_per_hour,
                "first_time": first.pc_time,
                "last_time": last.pc_time,
            }

        return summary
|