Files

185 lines
6.8 KiB
Python

import json
import threading
import time
from collections import deque
from datetime import datetime, timezone
from http.server import HTTPServer, BaseHTTPRequestHandler
from io import StringIO
class SyncSession:
    """Thread-safe accumulator of sync statistics.

    Every public method takes ``self._lock`` before touching the counters, so
    one instance can be shared between the thread that runs syncs and any
    thread that reports status.

    Counters kept:
      * ``syncs_total`` / ``syncs_failed`` / ``syncs_skipped`` - call counts.
      * ``total_added`` / ``total_updated`` / ``total_deleted`` - summed
        per-sync change counts.
      * ``total_duration`` / ``total_latency_ms`` / ``non_skip_count`` - the
        numerators and denominator of the averages in ``get_status``.
      * ``bandwidth_bytes`` / ``bandwidth_saved_bytes`` - summed byte counts.
      * ``history`` - bounded deque of per-sync summary dicts (oldest entries
        are dropped once the bound is reached).
    """

    def __init__(self, history_size: int = 50):
        """Create an empty session.

        Args:
            history_size: Maximum number of per-sync entries retained in
                ``history``. Defaults to 50.
        """
        self._lock = threading.Lock()
        self.start_time = time.time()
        self.syncs_total = 0
        self.syncs_failed = 0
        self.syncs_skipped = 0
        self.total_added = 0
        self.total_updated = 0
        self.total_deleted = 0
        self.total_duration = 0.0
        self.total_latency_ms = 0.0
        self.non_skip_count = 0
        self.bandwidth_bytes = 0
        self.bandwidth_saved_bytes = 0
        self.history = deque(maxlen=history_size)

    def record(self, ok: bool, duration: float, added: int, updated: int, deleted: int, skipped: bool, ics_latency_ms: float, msg: str, ics_download_size: int = 0) -> None:
        """Fold the outcome of one sync attempt into the session totals.

        Args:
            ok: False increments ``syncs_failed``.
            duration: Sync duration; stored rounded to 2 decimals in history.
            added: Count added to ``total_added``.
            updated: Count added to ``total_updated``.
            deleted: Count added to ``total_deleted``.
            skipped: True increments ``syncs_skipped`` and keeps this sync out
                of the duration/latency averages.
            ics_latency_ms: Latency in milliseconds; stored rounded to an
                integer in history.
            msg: Free-form message stored verbatim in the history entry.
            ics_download_size: Bytes added to ``bandwidth_bytes`` when non-zero.
        """
        # History timestamps are UTC wall-clock, time of day only.
        stamp = time.strftime("%H:%M:%S", time.gmtime())
        entry = {
            "time": stamp,
            "ok": ok,
            "duration": round(duration, 2),
            "added": added,
            "updated": updated,
            "deleted": deleted,
            "skipped": skipped,
            "latency_ms": round(ics_latency_ms),
            "msg": msg,
        }
        with self._lock:
            self.syncs_total += 1
            if not ok:
                self.syncs_failed += 1
            if skipped:
                self.syncs_skipped += 1
            self.total_added += added
            self.total_updated += updated
            self.total_deleted += deleted
            if not skipped:
                # NOTE(review): duration and latency are accumulated only for
                # non-skipped syncs so that get_status() divides matching
                # totals by non_skip_count - confirm this nesting against the
                # upstream file, whose indentation was not available.
                self.non_skip_count += 1
                self.total_duration += duration
                self.total_latency_ms += ics_latency_ms
            if ics_download_size:
                self.bandwidth_bytes += ics_download_size
            self.history.append(entry)

    def get_status(self) -> dict:
        """Return a snapshot of all counters, averages and recent history.

        Returns:
            A dict with ``uptime_sec`` (seconds since construction, rounded),
            the raw counters, ``avg_duration`` / ``avg_latency_ms`` (0 when no
            non-skipped sync has been recorded) and ``history`` as a list of
            per-sync dicts, oldest first.
        """
        with self._lock:
            uptime = time.time() - self.start_time
            counted = self.non_skip_count
            avg_dur = round(self.total_duration / counted, 2) if counted > 0 else 0
            avg_lat = round(self.total_latency_ms / counted) if counted > 0 else 0
            return {
                "uptime_sec": round(uptime),
                "syncs_total": self.syncs_total,
                "syncs_failed": self.syncs_failed,
                "syncs_skipped": self.syncs_skipped,
                "total_added": self.total_added,
                "total_updated": self.total_updated,
                "total_deleted": self.total_deleted,
                "avg_duration": avg_dur,
                "avg_latency_ms": avg_lat,
                "bandwidth_bytes": self.bandwidth_bytes,
                "bandwidth_saved_bytes": self.bandwidth_saved_bytes,
                # Copy each entry: handing out the stored dicts would let a
                # caller mutate the internal history after the lock is released.
                "history": [dict(entry) for entry in self.history],
            }

    def add_saved_bandwidth(self, bytes_saved: int) -> None:
        """Add ``bytes_saved`` to the ``bandwidth_saved_bytes`` counter."""
        with self._lock:
            self.bandwidth_saved_bytes += bytes_saved
class HealthServer:
    """Small HTTP server exposing sync status on ``/health`` and ``/metrics``.

    ``/health`` returns a JSON document and ``/metrics`` returns plain-text
    ``name value`` lines. Both are rendered from the values last passed to
    ``update_status``. The server runs in a daemon thread; state shared with
    the request handlers is guarded by ``self.lock``.
    """

    def __init__(self, port: int = 8081):
        """Initialise state without opening any socket.

        Args:
            port: TCP port to listen on once ``start`` is called.
        """
        self.port = port
        self.lock = threading.Lock()
        self.last_sync = None
        self.last_sync_duration = 0.0
        self.last_sync_success = None  # None until the first update_status()
        self.event_count = 0
        self.syncs_total = 0
        self.syncs_failed = 0
        self.server = None
        self.thread = None

    def start(self) -> None:
        """Bind to all interfaces on ``self.port`` and serve in a daemon thread."""
        handler = self._make_handler()
        self.server = HTTPServer(("0.0.0.0", self.port), handler)
        self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
        self.thread.start()

    def stop(self) -> None:
        """Stop serving and release the listening socket.

        Safe to call when the server was never started or was already stopped.
        """
        server, thread = self.server, self.thread
        self.server = None
        self.thread = None
        if server is None:
            return
        try:
            server.shutdown()
        finally:
            # shutdown() only ends the serve_forever() loop; the listening
            # socket stays open (and the port stays bound) until
            # server_close() is called.
            server.server_close()
        if thread is not None and thread is not threading.current_thread():
            # serve_forever() has already returned at this point, so the
            # thread finishes promptly; the timeout is only a safety net.
            thread.join(timeout=5)

    def update_status(self, last_sync: datetime, duration: float, success: bool, event_count: int) -> None:
        """Record the outcome of the most recent sync.

        Args:
            last_sync: Timestamp reported as ``last_sync`` (ISO format) by
                ``/health``.
            duration: Reported as the last sync duration by both endpoints.
            success: False increments ``syncs_failed`` and makes ``/health``
                report ``status: error``.
            event_count: Reported as ``event_count`` / ``baikal_sync_events_total``.
        """
        with self.lock:
            self.last_sync = last_sync
            self.last_sync_duration = duration
            self.last_sync_success = success
            self.event_count = event_count
            self.syncs_total += 1
            if not success:
                self.syncs_failed += 1

    def _make_handler(self):
        """Build a request-handler class bound to this server instance.

        ``HTTPServer`` instantiates the handler class itself, so the owning
        ``HealthServer`` is captured through a closure rather than passed to
        a constructor.
        """
        server_self = self

        class Handler(BaseHTTPRequestHandler):
            def do_GET(self):
                """Dispatch GET requests; unknown paths get a JSON 404."""
                # Ignore any query string so "/health?probe=1" still matches.
                route = self.path.split("?", 1)[0]
                if route == "/health":
                    self._handle_health(server_self)
                elif route == "/metrics":
                    self._handle_metrics(server_self)
                else:
                    body = json.dumps({"error": "not found"}).encode()
                    self._send(404, "application/json", body)

            def _send(self, code, content_type, body):
                """Write a complete response with an explicit Content-Length."""
                self.send_response(code)
                self.send_header("Content-Type", content_type)
                self.send_header("Content-Length", str(len(body)))
                self.end_headers()
                self.wfile.write(body)

            def _handle_health(self, srv):
                """Reply with the JSON health document (always HTTP 200)."""
                # Snapshot under the lock; the socket write happens afterwards
                # so a slow client cannot block update_status().
                with srv.lock:
                    status = "ok"
                    result = "none"
                    if srv.last_sync_success is not None:
                        if srv.last_sync_success:
                            result = "success"
                        else:
                            result = "failure"
                            status = "error"
                    payload = {
                        "status": status,
                        "last_sync": srv.last_sync.isoformat() if srv.last_sync else None,
                        "last_sync_duration_sec": srv.last_sync_duration,
                        "last_sync_result": result,
                        "event_count": srv.event_count,
                        "syncs_total": srv.syncs_total,
                        "syncs_failed": srv.syncs_failed,
                    }
                self._send(200, "application/json", json.dumps(payload).encode())

            def _handle_metrics(self, srv):
                """Reply with the plain-text metric lines."""
                with srv.lock:
                    # A sync that has not run yet (None) is reported as 0.
                    last_success = 1 if srv.last_sync_success else 0
                    duration = srv.last_sync_duration
                    evt_count = srv.event_count
                    total = srv.syncs_total
                    failed = srv.syncs_failed
                buf = StringIO()
                buf.write(f"baikal_sync_last_duration_seconds {duration}\n")
                buf.write(f"baikal_sync_events_total {evt_count}\n")
                buf.write(f"baikal_sync_total {total}\n")
                buf.write(f"baikal_sync_failures_total {failed}\n")
                buf.write(f"baikal_sync_last_success {last_success}\n")
                self._send(
                    200,
                    "text/plain; version=0.0.4; charset=utf-8",
                    buf.getvalue().encode(),
                )

            def log_message(self, format, *args):
                """Suppress the default per-request logging to stderr."""
                pass

        return Handler