185 lines
6.8 KiB
Python
185 lines
6.8 KiB
Python
import json
|
|
import threading
|
|
import time
|
|
from collections import deque
|
|
from datetime import datetime, timezone
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
from io import StringIO
|
|
|
|
|
|
class SyncSession:
|
|
def __init__(self):
|
|
self._lock = threading.Lock()
|
|
self.start_time = time.time()
|
|
self.syncs_total = 0
|
|
self.syncs_failed = 0
|
|
self.syncs_skipped = 0
|
|
self.total_added = 0
|
|
self.total_updated = 0
|
|
self.total_deleted = 0
|
|
self.total_duration = 0.0
|
|
self.total_latency_ms = 0.0
|
|
self.non_skip_count = 0
|
|
self.bandwidth_bytes = 0
|
|
self.bandwidth_saved_bytes = 0
|
|
self.history = deque(maxlen=50)
|
|
|
|
def record(self, ok: bool, duration: float, added: int, updated: int, deleted: int, skipped: bool, ics_latency_ms: float, msg: str, ics_download_size: int = 0) -> None:
|
|
with self._lock:
|
|
self.syncs_total += 1
|
|
ts = time.strftime("%H:%M:%S", time.gmtime())
|
|
if not ok:
|
|
self.syncs_failed += 1
|
|
if skipped:
|
|
self.syncs_skipped += 1
|
|
self.total_added += added
|
|
self.total_updated += updated
|
|
self.total_deleted += deleted
|
|
if not skipped:
|
|
self.non_skip_count += 1
|
|
self.total_duration += duration
|
|
self.total_latency_ms += ics_latency_ms
|
|
if ics_download_size:
|
|
self.bandwidth_bytes += ics_download_size
|
|
self.history.append({
|
|
"time": ts,
|
|
"ok": ok,
|
|
"duration": round(duration, 2),
|
|
"added": added,
|
|
"updated": updated,
|
|
"deleted": deleted,
|
|
"skipped": skipped,
|
|
"latency_ms": round(ics_latency_ms),
|
|
"msg": msg,
|
|
})
|
|
|
|
def get_status(self) -> dict:
|
|
with self._lock:
|
|
uptime = time.time() - self.start_time
|
|
avg_dur = round(self.total_duration / self.non_skip_count, 2) if self.non_skip_count > 0 else 0
|
|
avg_lat = round(self.total_latency_ms / self.non_skip_count) if self.non_skip_count > 0 else 0
|
|
return {
|
|
"uptime_sec": round(uptime),
|
|
"syncs_total": self.syncs_total,
|
|
"syncs_failed": self.syncs_failed,
|
|
"syncs_skipped": self.syncs_skipped,
|
|
"total_added": self.total_added,
|
|
"total_updated": self.total_updated,
|
|
"total_deleted": self.total_deleted,
|
|
"avg_duration": avg_dur,
|
|
"avg_latency_ms": avg_lat,
|
|
"bandwidth_bytes": self.bandwidth_bytes,
|
|
"bandwidth_saved_bytes": self.bandwidth_saved_bytes,
|
|
"history": list(self.history),
|
|
}
|
|
|
|
def add_saved_bandwidth(self, bytes_saved: int) -> None:
|
|
with self._lock:
|
|
self.bandwidth_saved_bytes += bytes_saved
|
|
|
|
|
|
class HealthServer:
|
|
def __init__(self, port: int = 8081):
|
|
self.port = port
|
|
self.lock = threading.Lock()
|
|
self.last_sync = None
|
|
self.last_sync_duration = 0.0
|
|
self.last_sync_success = None
|
|
self.event_count = 0
|
|
self.syncs_total = 0
|
|
self.syncs_failed = 0
|
|
self.server = None
|
|
self.thread = None
|
|
|
|
def start(self) -> None:
|
|
handler = self._make_handler()
|
|
|
|
self.server = HTTPServer(("0.0.0.0", self.port), handler)
|
|
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
|
|
self.thread.start()
|
|
|
|
def stop(self) -> None:
|
|
if self.server:
|
|
self.server.shutdown()
|
|
self.server = None
|
|
self.thread = None
|
|
|
|
def update_status(self, last_sync: datetime, duration: float, success: bool, event_count: int) -> None:
|
|
with self.lock:
|
|
self.last_sync = last_sync
|
|
self.last_sync_duration = duration
|
|
self.last_sync_success = success
|
|
self.event_count = event_count
|
|
self.syncs_total += 1
|
|
if not success:
|
|
self.syncs_failed += 1
|
|
|
|
def _make_handler(self):
|
|
server_self = self
|
|
|
|
class Handler(BaseHTTPRequestHandler):
|
|
def do_GET(self):
|
|
if self.path == "/health":
|
|
self._handle_health(server_self)
|
|
elif self.path == "/metrics":
|
|
self._handle_metrics(server_self)
|
|
else:
|
|
self.send_response(404)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"error": "not found"}).encode())
|
|
|
|
def _handle_health(self, srv):
|
|
with srv.lock:
|
|
status = "ok"
|
|
result = "none"
|
|
|
|
if srv.last_sync_success is not None:
|
|
if srv.last_sync_success:
|
|
result = "success"
|
|
else:
|
|
result = "failure"
|
|
status = "error"
|
|
|
|
payload = {
|
|
"status": status,
|
|
"last_sync": srv.last_sync.isoformat() if srv.last_sync else None,
|
|
"last_sync_duration_sec": srv.last_sync_duration,
|
|
"last_sync_result": result,
|
|
"event_count": srv.event_count,
|
|
"syncs_total": srv.syncs_total,
|
|
"syncs_failed": srv.syncs_failed,
|
|
}
|
|
|
|
body = json.dumps(payload).encode()
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def _handle_metrics(self, srv):
|
|
with srv.lock:
|
|
last_success = 1 if srv.last_sync_success else 0
|
|
duration = srv.last_sync_duration
|
|
evt_count = srv.event_count
|
|
total = srv.syncs_total
|
|
failed = srv.syncs_failed
|
|
|
|
buf = StringIO()
|
|
buf.write(f"baikal_sync_last_duration_seconds {duration}\n")
|
|
buf.write(f"baikal_sync_events_total {evt_count}\n")
|
|
buf.write(f"baikal_sync_total {total}\n")
|
|
buf.write(f"baikal_sync_failures_total {failed}\n")
|
|
buf.write(f"baikal_sync_last_success {last_success}\n")
|
|
|
|
body = buf.getvalue().encode()
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def log_message(self, format, *args):
|
|
pass
|
|
|
|
return Handler
|