Feat/incremental sync #1
+25
-14
@@ -1,15 +1,20 @@
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import threading
|
import threading
|
||||||
|
import time
|
||||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||||
|
from socketserver import ThreadingMixIn
|
||||||
|
|
||||||
from health import HealthServer, SyncSession
|
from health import SyncSession
|
||||||
|
|
||||||
|
|
||||||
|
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
|
||||||
|
daemon_threads = True
|
||||||
|
|
||||||
|
|
||||||
class DashboardServer:
|
class DashboardServer:
|
||||||
def __init__(self, port: int = 8082, health: HealthServer = None, session: SyncSession = None):
|
def __init__(self, port: int = 8082, session: SyncSession = None):
|
||||||
self.port = port
|
self.port = port
|
||||||
self.health = health
|
|
||||||
self.session = session
|
self.session = session
|
||||||
self.server = None
|
self.server = None
|
||||||
self.thread = None
|
self.thread = None
|
||||||
@@ -19,11 +24,15 @@ class DashboardServer:
|
|||||||
self._event_count = 0
|
self._event_count = 0
|
||||||
self._backoff_min = 0
|
self._backoff_min = 0
|
||||||
self._config = {}
|
self._config = {}
|
||||||
|
self._last_sync = None
|
||||||
|
self._last_duration = 0.0
|
||||||
|
self._last_success = None
|
||||||
|
self._last_latency_ms = 0
|
||||||
self._base_dir = os.path.dirname(os.path.abspath(__file__))
|
self._base_dir = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
handler = self._make_handler()
|
handler = self._make_handler()
|
||||||
self.server = HTTPServer(("0.0.0.0", self.port), handler)
|
self.server = ThreadingHTTPServer(("0.0.0.0", self.port), handler)
|
||||||
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
|
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
|
||||||
self.thread.start()
|
self.thread.start()
|
||||||
|
|
||||||
@@ -53,6 +62,13 @@ class DashboardServer:
|
|||||||
with self._lock:
|
with self._lock:
|
||||||
self._backoff_min = n
|
self._backoff_min = n
|
||||||
|
|
||||||
|
def set_last_sync(self, last_sync, duration: float, success: bool, latency_ms: int = 0):
|
||||||
|
with self._lock:
|
||||||
|
self._last_sync = last_sync
|
||||||
|
self._last_duration = duration
|
||||||
|
self._last_success = success
|
||||||
|
self._last_latency_ms = latency_ms
|
||||||
|
|
||||||
def _get_status(self):
|
def _get_status(self):
|
||||||
with self._lock:
|
with self._lock:
|
||||||
syncing = self._syncing
|
syncing = self._syncing
|
||||||
@@ -60,15 +76,10 @@ class DashboardServer:
|
|||||||
event_count = self._event_count
|
event_count = self._event_count
|
||||||
backoff_min = self._backoff_min
|
backoff_min = self._backoff_min
|
||||||
config = dict(self._config)
|
config = dict(self._config)
|
||||||
|
last_sync = self._last_sync
|
||||||
last_sync = None
|
duration = self._last_duration
|
||||||
duration = 0.0
|
last_success = self._last_success
|
||||||
last_success = None
|
latency_ms = self._last_latency_ms
|
||||||
if self.health:
|
|
||||||
with self.health.lock:
|
|
||||||
last_sync = self.health.last_sync
|
|
||||||
duration = self.health.last_sync_duration
|
|
||||||
last_success = self.health.last_sync_success
|
|
||||||
|
|
||||||
status = "idle"
|
status = "idle"
|
||||||
if syncing:
|
if syncing:
|
||||||
@@ -92,7 +103,7 @@ class DashboardServer:
|
|||||||
"status": status,
|
"status": status,
|
||||||
"last_sync": last_sync.isoformat() if last_sync else None,
|
"last_sync": last_sync.isoformat() if last_sync else None,
|
||||||
"duration": duration,
|
"duration": duration,
|
||||||
"ics_latency_ms": ics_latency,
|
"ics_latency_ms": ics_latency if ics_latency else latency_ms,
|
||||||
"event_count": event_count,
|
"event_count": event_count,
|
||||||
"next_sync_in": next_sync_in,
|
"next_sync_in": next_sync_in,
|
||||||
"session": session_data,
|
"session": session_data,
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ services:
|
|||||||
container_name: baikal-sync
|
container_name: baikal-sync
|
||||||
restart: always
|
restart: always
|
||||||
ports:
|
ports:
|
||||||
- "8081:8081"
|
|
||||||
- "8082:8082"
|
- "8082:8082"
|
||||||
environment:
|
environment:
|
||||||
- ICS_URL=${ICS_URL}
|
- ICS_URL=${ICS_URL}
|
||||||
|
|||||||
+12
-6
@@ -333,6 +333,8 @@ header {
|
|||||||
lastPoll: null
|
lastPoll: null
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let countdownRemaining = 0;
|
||||||
|
|
||||||
const $ = (sel) => document.querySelector(sel);
|
const $ = (sel) => document.querySelector(sel);
|
||||||
|
|
||||||
function fmtBytes(b) {
|
function fmtBytes(b) {
|
||||||
@@ -502,7 +504,7 @@ header {
|
|||||||
$("#stackedLegend").innerHTML =
|
$("#stackedLegend").innerHTML =
|
||||||
'<div class="legend-item"><span class="legend-dot" style="background:var(--green)"></span>Added: ' + added + "</div>" +
|
'<div class="legend-item"><span class="legend-dot" style="background:var(--green)"></span>Added: ' + added + "</div>" +
|
||||||
'<div class="legend-item"><span class="legend-dot" style="background:var(--blue)"></span>Updated: ' + updated + "</div>" +
|
'<div class="legend-item"><span class="legend-dot" style="background:var(--blue)"></span>Updated: ' + updated + "</div>" +
|
||||||
'<div class="legend-item"><span class="legend-dot" style="background:var(--purple)"></span>Deleted: " + deleted + "</div>" +
|
'<div class="legend-item"><span class="legend-dot" style="background:var(--purple)"></span>Deleted: ' + deleted + "</div>" +
|
||||||
'<div class="legend-item"><span class="legend-dot" style="background:var(--text-secondary)"></span>Skipped: ' + skipped + "</div>";
|
'<div class="legend-item"><span class="legend-dot" style="background:var(--text-secondary)"></span>Skipped: ' + skipped + "</div>";
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -673,14 +675,12 @@ header {
|
|||||||
const fill = $("#progressFill");
|
const fill = $("#progressFill");
|
||||||
if (!el || !fill) return;
|
if (!el || !fill) return;
|
||||||
|
|
||||||
let remaining = (state.data.next_sync_in || 0) - 1;
|
countdownRemaining = Math.max(0, countdownRemaining - 1);
|
||||||
if (remaining < 0) remaining = 0;
|
el.textContent = fmtCountdown(countdownRemaining);
|
||||||
el.textContent = fmtCountdown(remaining);
|
|
||||||
|
|
||||||
const freq = state.data.config ? state.data.config.sync_frequency * 60 : 600;
|
const freq = state.data.config ? state.data.config.sync_frequency * 60 : 600;
|
||||||
const pct = freq > 0 ? Math.max(0, Math.min(100, ((freq - remaining) / freq) * 100)) : 100;
|
const pct = freq > 0 ? Math.max(0, Math.min(100, ((freq - countdownRemaining) / freq) * 100)) : 100;
|
||||||
fill.style.width = pct + "%";
|
fill.style.width = pct + "%";
|
||||||
state.data.next_sync_in = remaining;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async function poll() {
|
async function poll() {
|
||||||
@@ -689,6 +689,12 @@ header {
|
|||||||
if (!res.ok) throw new Error("HTTP " + res.status);
|
if (!res.ok) throw new Error("HTTP " + res.status);
|
||||||
state.data = await res.json();
|
state.data = await res.json();
|
||||||
state.lastPoll = Date.now();
|
state.lastPoll = Date.now();
|
||||||
|
if (state.data.last_sync && state.data.config && state.data.config.sync_frequency) {
|
||||||
|
const syncEpoch = new Date(state.data.last_sync).getTime();
|
||||||
|
const freqSec = state.data.config.sync_frequency * 60;
|
||||||
|
const now = Date.now();
|
||||||
|
countdownRemaining = Math.max(0, Math.round(((syncEpoch + freqSec * 1000) - now) / 1000));
|
||||||
|
}
|
||||||
renderStatusBar(state.data);
|
renderStatusBar(state.data);
|
||||||
renderStats(state.data);
|
renderStats(state.data);
|
||||||
renderStackedBar(state.data);
|
renderStackedBar(state.data);
|
||||||
|
|||||||
+9
-1
@@ -89,6 +89,7 @@ def sync_once(
|
|||||||
msg = "no changes (ETag)"
|
msg = "no changes (ETag)"
|
||||||
session.record(True, duration, 0, 0, 0, True, 0, msg)
|
session.record(True, duration, 0, 0, 0, True, 0, msg)
|
||||||
health.update_status(datetime.now(timezone.utc), duration, True, 0)
|
health.update_status(datetime.now(timezone.utc), duration, True, 0)
|
||||||
|
dashboard.set_last_sync(datetime.now(timezone.utc), duration, True, 0)
|
||||||
dashboard.set_syncing(False)
|
dashboard.set_syncing(False)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@@ -112,6 +113,7 @@ def sync_once(
|
|||||||
if ics_download_size:
|
if ics_download_size:
|
||||||
pass
|
pass
|
||||||
health.update_status(datetime.now(timezone.utc), duration, True, 0)
|
health.update_status(datetime.now(timezone.utc), duration, True, 0)
|
||||||
|
dashboard.set_last_sync(datetime.now(timezone.utc), duration, True, ics_latency_ms)
|
||||||
dashboard.set_syncing(False)
|
dashboard.set_syncing(False)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@@ -141,6 +143,7 @@ def sync_once(
|
|||||||
for uid, h in ics_uids.items():
|
for uid, h in ics_uids.items():
|
||||||
state.upsert_event(uid, h)
|
state.upsert_event(uid, h)
|
||||||
health.update_status(datetime.now(timezone.utc), duration, True, len(ics_uids))
|
health.update_status(datetime.now(timezone.utc), duration, True, len(ics_uids))
|
||||||
|
dashboard.set_last_sync(datetime.now(timezone.utc), duration, True, ics_latency_ms)
|
||||||
dashboard.set_event_count(len(ics_uids))
|
dashboard.set_event_count(len(ics_uids))
|
||||||
dashboard.set_syncing(False)
|
dashboard.set_syncing(False)
|
||||||
return True
|
return True
|
||||||
@@ -156,6 +159,7 @@ def sync_once(
|
|||||||
msg = f"first run, registered {len(to_add)} events"
|
msg = f"first run, registered {len(to_add)} events"
|
||||||
session.record(True, duration, len(to_add), 0, 0, False, ics_latency_ms, msg, ics_download_size)
|
session.record(True, duration, len(to_add), 0, 0, False, ics_latency_ms, msg, ics_download_size)
|
||||||
health.update_status(datetime.now(timezone.utc), duration, True, len(ics_uids))
|
health.update_status(datetime.now(timezone.utc), duration, True, len(ics_uids))
|
||||||
|
dashboard.set_last_sync(datetime.now(timezone.utc), duration, True, ics_latency_ms)
|
||||||
dashboard.set_event_count(len(ics_uids))
|
dashboard.set_event_count(len(ics_uids))
|
||||||
dashboard.set_syncing(False)
|
dashboard.set_syncing(False)
|
||||||
return True
|
return True
|
||||||
@@ -181,6 +185,7 @@ def sync_once(
|
|||||||
msg = "calendar not found"
|
msg = "calendar not found"
|
||||||
session.record(False, duration, 0, 0, 0, False, ics_latency_ms, msg, ics_download_size)
|
session.record(False, duration, 0, 0, 0, False, ics_latency_ms, msg, ics_download_size)
|
||||||
health.update_status(datetime.now(timezone.utc), duration, False, 0)
|
health.update_status(datetime.now(timezone.utc), duration, False, 0)
|
||||||
|
dashboard.set_last_sync(datetime.now(timezone.utc), duration, False, ics_latency_ms)
|
||||||
dashboard.set_syncing(False)
|
dashboard.set_syncing(False)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@@ -226,6 +231,7 @@ def sync_once(
|
|||||||
session.record(True, duration, added, updated, deleted, False, ics_latency_ms, msg, ics_download_size)
|
session.record(True, duration, added, updated, deleted, False, ics_latency_ms, msg, ics_download_size)
|
||||||
logger.info("Sync completed in %.1fs. Total events: %d", duration, total)
|
logger.info("Sync completed in %.1fs. Total events: %d", duration, total)
|
||||||
health.update_status(datetime.now(timezone.utc), duration, True, total)
|
health.update_status(datetime.now(timezone.utc), duration, True, total)
|
||||||
|
dashboard.set_last_sync(datetime.now(timezone.utc), duration, True, ics_latency_ms)
|
||||||
dashboard.set_event_count(total)
|
dashboard.set_event_count(total)
|
||||||
dashboard.set_syncing(False)
|
dashboard.set_syncing(False)
|
||||||
return True
|
return True
|
||||||
@@ -237,6 +243,7 @@ def sync_once(
|
|||||||
msg = str(exc)[:80]
|
msg = str(exc)[:80]
|
||||||
session.record(False, duration, 0, 0, 0, False, ics_latency_ms, msg, ics_download_size)
|
session.record(False, duration, 0, 0, 0, False, ics_latency_ms, msg, ics_download_size)
|
||||||
health.update_status(datetime.now(timezone.utc), duration, False, 0)
|
health.update_status(datetime.now(timezone.utc), duration, False, 0)
|
||||||
|
dashboard.set_last_sync(datetime.now(timezone.utc), duration, False, ics_latency_ms)
|
||||||
dashboard.set_syncing(False)
|
dashboard.set_syncing(False)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@@ -246,6 +253,7 @@ def sync_once(
|
|||||||
msg = str(exc)[:80]
|
msg = str(exc)[:80]
|
||||||
session.record(False, duration, 0, 0, 0, False, ics_latency_ms, msg, ics_download_size)
|
session.record(False, duration, 0, 0, 0, False, ics_latency_ms, msg, ics_download_size)
|
||||||
health.update_status(datetime.now(timezone.utc), duration, False, 0)
|
health.update_status(datetime.now(timezone.utc), duration, False, 0)
|
||||||
|
dashboard.set_last_sync(datetime.now(timezone.utc), duration, False, ics_latency_ms)
|
||||||
dashboard.set_syncing(False)
|
dashboard.set_syncing(False)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@@ -267,7 +275,7 @@ def main():
|
|||||||
logger.info("Health endpoint on :8081")
|
logger.info("Health endpoint on :8081")
|
||||||
|
|
||||||
session = SyncSession()
|
session = SyncSession()
|
||||||
dashboard = DashboardServer(8082, health, session)
|
dashboard = DashboardServer(8082, session)
|
||||||
dashboard.update_config({
|
dashboard.update_config({
|
||||||
"ics_url": config.ics_url,
|
"ics_url": config.ics_url,
|
||||||
"baikal_url": config.baikal_url,
|
"baikal_url": config.baikal_url,
|
||||||
|
|||||||
Reference in New Issue
Block a user