import json import threading import time from collections import deque from datetime import datetime, timezone from http.server import HTTPServer, BaseHTTPRequestHandler from io import StringIO class SyncSession: def __init__(self): self._lock = threading.Lock() self.start_time = time.time() self.syncs_total = 0 self.syncs_failed = 0 self.syncs_skipped = 0 self.total_added = 0 self.total_updated = 0 self.total_deleted = 0 self.total_duration = 0.0 self.total_latency_ms = 0.0 self.non_skip_count = 0 self.bandwidth_bytes = 0 self.bandwidth_saved_bytes = 0 self.history = deque(maxlen=50) def record(self, ok: bool, duration: float, added: int, updated: int, deleted: int, skipped: bool, ics_latency_ms: float, msg: str, ics_download_size: int = 0) -> None: with self._lock: self.syncs_total += 1 ts = time.strftime("%H:%M:%S", time.gmtime()) if not ok: self.syncs_failed += 1 if skipped: self.syncs_skipped += 1 self.total_added += added self.total_updated += updated self.total_deleted += deleted if not skipped: self.non_skip_count += 1 self.total_duration += duration self.total_latency_ms += ics_latency_ms if ics_download_size: self.bandwidth_bytes += ics_download_size self.history.append({ "time": ts, "ok": ok, "duration": round(duration, 2), "added": added, "updated": updated, "deleted": deleted, "skipped": skipped, "latency_ms": round(ics_latency_ms), "msg": msg, }) def get_status(self) -> dict: with self._lock: uptime = time.time() - self.start_time avg_dur = round(self.total_duration / self.non_skip_count, 2) if self.non_skip_count > 0 else 0 avg_lat = round(self.total_latency_ms / self.non_skip_count) if self.non_skip_count > 0 else 0 return { "uptime_sec": round(uptime), "syncs_total": self.syncs_total, "syncs_failed": self.syncs_failed, "syncs_skipped": self.syncs_skipped, "total_added": self.total_added, "total_updated": self.total_updated, "total_deleted": self.total_deleted, "avg_duration": avg_dur, "avg_latency_ms": avg_lat, "bandwidth_bytes": self.bandwidth_bytes, "bandwidth_saved_bytes": self.bandwidth_saved_bytes, "history": list(self.history), } def add_saved_bandwidth(self, bytes_saved: int) -> None: with self._lock: self.bandwidth_saved_bytes += bytes_saved class HealthServer: def __init__(self, port: int = 8081): self.port = port self.lock = threading.Lock() self.last_sync = None self.last_sync_duration = 0.0 self.last_sync_success = None self.event_count = 0 self.syncs_total = 0 self.syncs_failed = 0 self.server = None self.thread = None def start(self) -> None: handler = self._make_handler() self.server = HTTPServer(("0.0.0.0", self.port), handler) self.thread = threading.Thread(target=self.server.serve_forever, daemon=True) self.thread.start() def stop(self) -> None: if self.server: self.server.shutdown() self.server = None self.thread = None def update_status(self, last_sync: datetime, duration: float, success: bool, event_count: int) -> None: with self.lock: self.last_sync = last_sync self.last_sync_duration = duration self.last_sync_success = success self.event_count = event_count self.syncs_total += 1 if not success: self.syncs_failed += 1 def _make_handler(self): server_self = self class Handler(BaseHTTPRequestHandler): def do_GET(self): if self.path == "/health": self._handle_health(server_self) elif self.path == "/metrics": self._handle_metrics(server_self) else: self.send_response(404) self.send_header("Content-Type", "application/json") self.end_headers() self.wfile.write(json.dumps({"error": "not found"}).encode()) def _handle_health(self, srv): with srv.lock: status = "ok" result = "none" if srv.last_sync_success is not None: if srv.last_sync_success: result = "success" else: result = "failure" status = "error" payload = { "status": status, "last_sync": srv.last_sync.isoformat() if srv.last_sync else None, "last_sync_duration_sec": srv.last_sync_duration, "last_sync_result": result, "event_count": srv.event_count, "syncs_total": srv.syncs_total, "syncs_failed": srv.syncs_failed, } body = json.dumps(payload).encode() self.send_response(200) self.send_header("Content-Type", "application/json") self.end_headers() self.wfile.write(body) def _handle_metrics(self, srv): with srv.lock: last_success = 1 if srv.last_sync_success else 0 duration = srv.last_sync_duration evt_count = srv.event_count total = srv.syncs_total failed = srv.syncs_failed buf = StringIO() buf.write(f"baikal_sync_last_duration_seconds {duration}\n") buf.write(f"baikal_sync_events_total {evt_count}\n") buf.write(f"baikal_sync_total {total}\n") buf.write(f"baikal_sync_failures_total {failed}\n") buf.write(f"baikal_sync_last_success {last_success}\n") body = buf.getvalue().encode() self.send_response(200) self.send_header("Content-Type", "text/plain; version=0.0.4; charset=utf-8") self.end_headers() self.wfile.write(body) def log_message(self, format, *args): pass return Handler