Initial commit: Dabit Time Manager project

Python-based time management application with UDP discovery,
TCP protocol communication, time sync, and drift monitoring.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
insulee
2026-02-10 11:10:55 +09:00
commit 3c14e1e401
27 changed files with 2240 additions and 0 deletions

0
services/__init__.py Normal file
View File

211
services/drift_monitor.py Normal file
View File

@@ -0,0 +1,211 @@
"""주기적 시간 읽기 및 오차 추적 서비스"""
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from typing import Callable
from models.controller import Controller
from models.reading import TimeReading
from network.tcp_protocol import read_time
from config import DEFAULT_MONITOR_INTERVAL
class DriftMonitor:
"""주기적으로 컨트롤러 시간을 읽어 오차를 추적."""
def __init__(self):
self._timer: threading.Timer | None = None
self._hourly_timer: threading.Timer | None = None
self._running = False
self._hourly_running = False
self._interval = DEFAULT_MONITOR_INTERVAL
self._controllers: list[Controller] = []
self._readings: list[TimeReading] = []
self._lock = threading.Lock()
self._on_readings: Callable[[list[TimeReading]], None] | None = None
self._on_error: Callable[[str], None] | None = None
@property
def is_running(self) -> bool:
return self._running
@property
def is_hourly_running(self) -> bool:
return self._hourly_running
@property
def readings(self) -> list[TimeReading]:
with self._lock:
return list(self._readings)
@property
def interval(self) -> int:
return self._interval
def configure(
self,
controllers: list[Controller],
interval: int = DEFAULT_MONITOR_INTERVAL,
on_readings: Callable[[list[TimeReading]], None] | None = None,
on_error: Callable[[str], None] | None = None,
):
self._controllers = controllers
self._interval = interval
self._on_readings = on_readings
self._on_error = on_error
def start(self):
if self._running:
return
self._running = True
self._do_read()
def stop(self):
self._running = False
if self._timer:
self._timer.cancel()
self._timer = None
def start_hourly(self):
"""매시 정시(XX:00:00)에 시간 읽기 시작."""
if self._hourly_running:
return
self._hourly_running = True
self._schedule_hourly()
def stop_hourly(self):
"""매시 정시 읽기 중지."""
self._hourly_running = False
if self._hourly_timer:
self._hourly_timer.cancel()
self._hourly_timer = None
def _schedule_hourly(self):
"""다음 정시까지 대기 후 읽기 예약."""
if not self._hourly_running:
return
now = datetime.now()
# 다음 정시: 현재 시 + 1, 분/초 = 0
next_hour = now.replace(minute=0, second=0, microsecond=0)
next_hour = next_hour + timedelta(hours=1)
delay = (next_hour - now).total_seconds()
if self._on_error:
self._on_error(f"정시 읽기 예약: {next_hour.strftime('%H:%M:%S')} ({delay:.0f}초 후)")
self._hourly_timer = threading.Timer(delay, self._hourly_tick)
self._hourly_timer.daemon = True
self._hourly_timer.start()
def _hourly_tick(self):
"""정시 도달 시 읽기 실행 후 다음 정시 예약."""
if not self._hourly_running:
return
self._do_read(schedule_next=False)
self._schedule_hourly()
def read_once(self):
"""즉시 한 번 읽기 (별도 스레드)."""
threading.Thread(target=self._do_read, args=(False,), daemon=True).start()
def clear_readings(self):
with self._lock:
self._readings.clear()
def _do_read(self, schedule_next: bool = True):
"""모든 컨트롤러에서 시간 읽기."""
targets = [c for c in self._controllers if c.selected]
if not targets:
if self._running and schedule_next:
self._schedule_next()
return
new_readings: list[TimeReading] = []
with ThreadPoolExecutor(max_workers=8) as executor:
future_map = {
executor.submit(read_time, c.ip, c.port): c
for c in targets
}
for future in as_completed(future_map):
ctrl = future_map[future]
try:
ctrl_time, pc_before, pc_after, error = future.result()
except Exception as e:
if self._on_error:
self._on_error(f"{ctrl.display_label}: {e}")
continue
if ctrl_time is None:
if self._on_error:
self._on_error(f"{ctrl.display_label}: {error}")
continue
# 네트워크 지연 보정: PC 시간 중간값
pc_mid = pc_before + (pc_after - pc_before) / 2
drift = (ctrl_time - pc_mid).total_seconds()
reading = TimeReading(
controller_mac=ctrl.mac,
controller_label=ctrl.display_label,
pc_time=pc_mid,
controller_time=ctrl_time,
drift_seconds=drift,
)
new_readings.append(reading)
with self._lock:
self._readings.extend(new_readings)
if self._on_readings and new_readings:
self._on_readings(new_readings)
if self._running and schedule_next:
self._schedule_next()
def _schedule_next(self):
self._timer = threading.Timer(self._interval, self._do_read)
self._timer.daemon = True
self._timer.start()
def get_summary(self) -> dict[str, dict]:
"""컨트롤러별 오차 요약 통계.
Returns:
{mac: {label, count, avg_drift, max_drift, drift_per_hour, first_time, last_time}}
"""
with self._lock:
readings = list(self._readings)
by_mac: dict[str, list[TimeReading]] = {}
for r in readings:
by_mac.setdefault(r.controller_mac, []).append(r)
summary = {}
for mac, rlist in by_mac.items():
rlist.sort(key=lambda r: r.pc_time)
drifts = [r.drift_seconds for r in rlist]
count = len(drifts)
avg_drift = sum(drifts) / count if count else 0
max_drift = max(abs(d) for d in drifts) if drifts else 0
# 시간당 오차율
drift_per_hour = 0.0
if count >= 2:
elapsed_hours = (
rlist[-1].pc_time - rlist[0].pc_time
).total_seconds() / 3600
if elapsed_hours > 0:
drift_change = rlist[-1].drift_seconds - rlist[0].drift_seconds
drift_per_hour = drift_change / elapsed_hours
summary[mac] = {
"label": rlist[0].controller_label,
"count": count,
"avg_drift": avg_drift,
"max_drift": max_drift,
"drift_per_hour": drift_per_hour,
"first_time": rlist[0].pc_time,
"last_time": rlist[-1].pc_time,
}
return summary

103
services/export_service.py Normal file
View File

@@ -0,0 +1,103 @@
"""CSV export 서비스"""
import csv
import os
from pathlib import Path
from models.reading import TimeReading
def export_readings_csv(
readings: list[TimeReading],
filepath: str | Path,
) -> str:
"""시간 읽기 기록을 CSV로 내보내기 (컨트롤러별 정렬).
Returns: 저장된 파일 경로
"""
filepath = Path(filepath)
# 컨트롤러별로 정렬
sorted_readings = sorted(readings, key=lambda r: (r.controller_mac, r.pc_time))
with open(filepath, "w", newline="", encoding="utf-8-sig") as f:
writer = csv.writer(f)
writer.writerow([
"컨트롤러",
"MAC",
"PC 시간",
"컨트롤러 시간",
"오차(초)",
])
for r in sorted_readings:
writer.writerow([
r.controller_label,
r.controller_mac,
r.pc_time.strftime("%Y-%m-%d %H:%M:%S"),
r.controller_time.strftime("%Y-%m-%d %H:%M:%S"),
f"{r.drift_seconds:.1f}",
])
return str(filepath)
def export_per_controller_csv(
readings: list[TimeReading],
folder: str | Path,
) -> list[str]:
"""컨트롤러별 개별 CSV 파일 생성.
Returns: 생성된 파일 경로 목록
"""
folder = Path(folder)
os.makedirs(folder, exist_ok=True)
by_mac: dict[str, list[TimeReading]] = {}
for r in readings:
by_mac.setdefault(r.controller_mac, []).append(r)
created = []
for mac, rlist in by_mac.items():
rlist.sort(key=lambda r: r.pc_time)
# 파일명: 컨트롤러 이름 기반 (특수문자 제거)
safe_name = rlist[0].controller_label
for ch in r'<>:"/\\|?*':
safe_name = safe_name.replace(ch, "_")
filepath = folder / f"{safe_name}_drift.csv"
export_readings_csv(rlist, filepath)
created.append(str(filepath))
return created
def export_summary_csv(
summary: dict[str, dict],
filepath: str | Path,
) -> str:
"""오차 요약 통계를 CSV로 내보내기.
Returns: 저장된 파일 경로
"""
filepath = Path(filepath)
with open(filepath, "w", newline="", encoding="utf-8-sig") as f:
writer = csv.writer(f)
writer.writerow([
"컨트롤러",
"MAC",
"읽기 횟수",
"평균 오차(초)",
"최대 오차(초)",
"시간당 오차율(초/시간)",
"첫 읽기",
"마지막 읽기",
])
for mac, s in summary.items():
writer.writerow([
s["label"],
mac,
s["count"],
f"{s['avg_drift']:.2f}",
f"{s['max_drift']:.2f}",
f"{s['drift_per_hour']:.3f}",
s["first_time"].strftime("%Y-%m-%d %H:%M:%S"),
s["last_time"].strftime("%Y-%m-%d %H:%M:%S"),
])
return str(filepath)

View File

@@ -0,0 +1,38 @@
"""전체 컨트롤러 시간 동기화 서비스"""
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from models.controller import Controller
from network.tcp_protocol import sync_time
def sync_all(
controllers: list[Controller],
max_workers: int = 8,
) -> list[tuple[Controller, bool, str]]:
"""선택된 컨트롤러에 일괄 시간 동기화.
Returns: [(Controller, 성공여부, 메시지), ...]
"""
targets = [c for c in controllers if c.selected]
if not targets:
return []
dt = datetime.now()
results: list[tuple[Controller, bool, str]] = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_map = {
executor.submit(sync_time, c.ip, c.port, dt): c
for c in targets
}
for future in as_completed(future_map):
ctrl = future_map[future]
try:
ok, msg = future.result()
except Exception as e:
ok, msg = False, str(e)
results.append((ctrl, ok, msg))
return results