diff --git a/Dockerfile b/Dockerfile
index db4d074..961f932 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -9,6 +9,8 @@ RUN pip install --no-cache-dir -r requirements.txt
COPY . .
+EXPOSE 8081 8082
+
HEALTHCHECK --interval=60s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8081/health')" || exit 1
diff --git a/dashboard.py b/dashboard.py
new file mode 100644
index 0000000..11520f0
--- /dev/null
+++ b/dashboard.py
@@ -0,0 +1,146 @@
+import json
+import os
+import threading
+from http.server import HTTPServer, BaseHTTPRequestHandler
+
+from health import HealthServer, SyncSession
+
+
+class DashboardServer:
+ def __init__(self, port: int = 8082, health: HealthServer = None, session: SyncSession = None):
+ self.port = port
+ self.health = health
+ self.session = session
+ self.server = None
+ self.thread = None
+ self._lock = threading.Lock()
+ self._syncing = False
+ self._next_sync_in = 0
+ self._event_count = 0
+ self._backoff_min = 0
+ self._config = {}
+ self._base_dir = os.path.dirname(os.path.abspath(__file__))
+
+ def start(self):
+ handler = self._make_handler()
+ self.server = HTTPServer(("0.0.0.0", self.port), handler)
+ self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
+ self.thread.start()
+
+ def stop(self):
+ if self.server:
+ self.server.shutdown()
+ self.server = None
+ self.thread = None
+
+ def update_config(self, config_dict: dict):
+ with self._lock:
+ self._config = dict(config_dict)
+
+ def set_syncing(self, syncing: bool):
+ with self._lock:
+ self._syncing = syncing
+
+ def set_next_sync_in(self, seconds: int):
+ with self._lock:
+ self._next_sync_in = seconds
+
+ def set_event_count(self, n: int):
+ with self._lock:
+ self._event_count = n
+
+ def set_backoff_min(self, n: int):
+ with self._lock:
+ self._backoff_min = n
+
+ def _get_status(self):
+ with self._lock:
+ syncing = self._syncing
+ next_sync_in = self._next_sync_in
+ event_count = self._event_count
+ backoff_min = self._backoff_min
+ config = dict(self._config)
+
+ last_sync = None
+ duration = 0.0
+ last_success = None
+ if self.health:
+ with self.health.lock:
+ last_sync = self.health.last_sync
+ duration = self.health.last_sync_duration
+ last_success = self.health.last_sync_success
+
+ status = "idle"
+ if syncing:
+ status = "syncing"
+ elif last_success is False:
+ status = "error"
+ elif backoff_min > 0:
+ status = "backoff"
+
+ session_data = {}
+ history = []
+ if self.session:
+ session_data = self.session.get_status()
+ history = session_data.get("history", [])
+
+ ics_latency = 0.0
+ if self.session and self.session.non_skip_count > 0:
+ ics_latency = round(self.session.total_latency_ms / self.session.non_skip_count)
+
+ return {
+ "status": status,
+ "last_sync": last_sync.isoformat() if last_sync else None,
+ "duration": duration,
+ "ics_latency_ms": ics_latency,
+ "event_count": event_count,
+ "next_sync_in": next_sync_in,
+ "session": session_data,
+ "history": history,
+ "config": config,
+ }
+
+ def _make_handler(self):
+ server_self = self
+ base_dir = server_self._base_dir
+
+ class Handler(BaseHTTPRequestHandler):
+ def do_GET(self):
+ if self.path == "/api/status":
+ self._handle_api(server_self)
+ elif self.path == "/":
+ self._handle_dashboard(base_dir)
+ else:
+ self.send_response(404)
+ self.send_header("Content-Type", "application/json")
+ self.end_headers()
+ self.wfile.write(json.dumps({"error": "not found"}).encode())
+
+ def _handle_api(self, srv):
+ data = srv._get_status()
+ body = json.dumps(data).encode()
+ self.send_response(200)
+ self.send_header("Content-Type", "application/json")
+ self.end_headers()
+ self.wfile.write(body)
+
+ def _handle_dashboard(self, base):
+ html_path = os.path.join(base, "static", "dashboard.html")
+ try:
+ with open(html_path, "r", encoding="utf-8") as f:
+ content = f.read()
+ body = content.encode("utf-8")
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html; charset=utf-8")
+ self.end_headers()
+ self.wfile.write(body)
+ except FileNotFoundError:
+ self.send_response(404)
+ self.send_header("Content-Type", "application/json")
+ self.end_headers()
+ self.wfile.write(json.dumps({"error": "dashboard not found"}).encode())
+
+ def log_message(self, format, *args):
+ pass
+
+ return Handler
diff --git a/docker-compose.yml b/docker-compose.yml
index 659d65a..0477a35 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -3,6 +3,9 @@ services:
image: lagortinez/baikal-sync:latest
container_name: baikal-sync
restart: always
+ ports:
+ - "8081:8081"
+ - "8082:8082"
environment:
- ICS_URL=${ICS_URL}
- BAIKAL_URL=${BAIKAL_URL}
diff --git a/health.py b/health.py
index d20cebc..b220e91 100644
--- a/health.py
+++ b/health.py
@@ -1,10 +1,83 @@
import json
import threading
+import time
+from collections import deque
from datetime import datetime, timezone
from http.server import HTTPServer, BaseHTTPRequestHandler
from io import StringIO
+class SyncSession:
+ def __init__(self):
+ self._lock = threading.Lock()
+ self.start_time = time.time()
+ self.syncs_total = 0
+ self.syncs_failed = 0
+ self.syncs_skipped = 0
+ self.total_added = 0
+ self.total_updated = 0
+ self.total_deleted = 0
+ self.total_duration = 0.0
+ self.total_latency_ms = 0.0
+ self.non_skip_count = 0
+ self.bandwidth_bytes = 0
+ self.bandwidth_saved_bytes = 0
+ self.history = deque(maxlen=50)
+
+ def record(self, ok: bool, duration: float, added: int, updated: int, deleted: int, skipped: bool, ics_latency_ms: float, msg: str, ics_download_size: int = 0) -> None:
+ with self._lock:
+ self.syncs_total += 1
+ ts = time.strftime("%H:%M:%S", time.gmtime())
+ if not ok:
+ self.syncs_failed += 1
+ if skipped:
+ self.syncs_skipped += 1
+ self.total_added += added
+ self.total_updated += updated
+ self.total_deleted += deleted
+ if not skipped:
+ self.non_skip_count += 1
+ self.total_duration += duration
+ self.total_latency_ms += ics_latency_ms
+ if ics_download_size:
+ self.bandwidth_bytes += ics_download_size
+ self.history.append({
+ "time": ts,
+ "ok": ok,
+ "duration": round(duration, 2),
+ "added": added,
+ "updated": updated,
+ "deleted": deleted,
+ "skipped": skipped,
+ "latency_ms": round(ics_latency_ms),
+ "msg": msg,
+ })
+
+ def get_status(self) -> dict:
+ with self._lock:
+ uptime = time.time() - self.start_time
+ avg_dur = round(self.total_duration / self.non_skip_count, 2) if self.non_skip_count > 0 else 0
+ avg_lat = round(self.total_latency_ms / self.non_skip_count) if self.non_skip_count > 0 else 0
+ return {
+ "uptime_sec": round(uptime),
+ "syncs_total": self.syncs_total,
+ "syncs_failed": self.syncs_failed,
+ "syncs_skipped": self.syncs_skipped,
+ "total_added": self.total_added,
+ "total_updated": self.total_updated,
+ "total_deleted": self.total_deleted,
+ "avg_duration": avg_dur,
+ "avg_latency_ms": avg_lat,
+ "bandwidth_bytes": self.bandwidth_bytes,
+ "bandwidth_saved_bytes": self.bandwidth_saved_bytes,
+ "history": list(self.history),
+ }
+
+ def add_saved_bandwidth(self, bytes_saved: int) -> None:
+ with self._lock:
+ self.bandwidth_saved_bytes += bytes_saved
+
+
class HealthServer:
def __init__(self, port: int = 8081):
self.port = port
diff --git a/static/dashboard.html b/static/dashboard.html
new file mode 100644
index 0000000..a5e34f4
--- /dev/null
+++ b/static/dashboard.html
@@ -0,0 +1,731 @@
+
+
+
+
+
+Baikal Sync Dashboard
+
+
+
+
+
+
+
+ Baikal Sync
+
+
+
+
+
+
+
+
+
+
Session Breakdown
+
+
+
+
+
+
+
+
+
+
+
+
Configuration
+
+
+
+
+
+
+
+
+
+
diff --git a/sync_calendar.py b/sync_calendar.py
index b265ab0..7ce64de 100644
--- a/sync_calendar.py
+++ b/sync_calendar.py
@@ -12,7 +12,8 @@ from config import validate, HEADERS, Config
from state import SyncState
from diff import parse_ics_events, compute_diff, parse_ics_events_with_data
from apply import apply_adds, apply_updates, apply_deletes
-from health import HealthServer
+from health import HealthServer, SyncSession
+from dashboard import DashboardServer
logger = logging.getLogger(__name__)
shutdown_event = threading.Event()
@@ -51,8 +52,23 @@ def find_calendar(client, config):
return None
-def sync_once(state: SyncState, health: HealthServer, config: Config) -> bool:
+def sync_once(
+ state: SyncState,
+ health: HealthServer,
+ session: SyncSession,
+ config: Config,
+ dashboard: DashboardServer,
+) -> bool:
start_time = time.time()
+ ics_latency_ms = 0
+ added = 0
+ updated = 0
+ deleted = 0
+ skipped = False
+ ics_download_size = 0
+ msg = ""
+
+ dashboard.set_syncing(True)
logger.info("Starting sync cycle...")
try:
@@ -68,17 +84,35 @@ def sync_once(state: SyncState, health: HealthServer, config: Config) -> bool:
if remote_etag and cached_etag == remote_etag:
logger.info("No changes detected (ETag match). Skipping sync.")
+ duration = time.time() - start_time
+ skipped = True
+ msg = "no changes (ETag)"
+ session.record(True, duration, 0, 0, 0, True, 0, msg)
+ health.update_status(datetime.now(timezone.utc), duration, True, 0)
+ dashboard.set_syncing(False)
return True
+ dl_start = time.time()
response = requests.get(config.ics_url, headers=HEADERS, timeout=30)
response.raise_for_status()
ics_text = response.text
+ dl_end = time.time()
+ ics_latency_ms = round((dl_end - dl_start) * 1000)
+ ics_download_size = len(ics_text)
ics_hash = hashlib.sha256(ics_text.encode("utf-8")).hexdigest()
if cached_hash == ics_hash:
logger.info("No changes detected (hash match). Skipping sync.")
if remote_etag:
state.set_ics_cache(ics_hash, remote_etag)
+ duration = time.time() - start_time
+ skipped = True
+ msg = "no changes (hash)"
+ session.record(True, duration, 0, 0, 0, True, ics_latency_ms, msg, ics_download_size)
+ if ics_download_size:
+ pass
+ health.update_status(datetime.now(timezone.utc), duration, True, 0)
+ dashboard.set_syncing(False)
return True
state.set_ics_cache(ics_hash, remote_etag)
@@ -101,30 +135,29 @@ def sync_once(state: SyncState, health: HealthServer, config: Config) -> bool:
if not to_add and not to_update and not to_delete:
logger.info("Calendar is already in sync.")
duration = time.time() - start_time
- health.update_status(
- datetime.now(timezone.utc),
- duration,
- True,
- len(ics_uids),
- )
+ skipped = True
+ msg = "already in sync"
+ session.record(True, duration, 0, 0, 0, True, ics_latency_ms, msg, ics_download_size)
for uid, h in ics_uids.items():
state.upsert_event(uid, h)
+ health.update_status(datetime.now(timezone.utc), duration, True, len(ics_uids))
+ dashboard.set_event_count(len(ics_uids))
+ dashboard.set_syncing(False)
return True
if is_first_run and not to_delete:
logger.info(
- "First run detected: %d events in ICS. Registering state without re-adding to calendar.",
+ "First run detected: %d events in ICS. Registering state without re-adding.",
len(to_add),
)
for uid, h in ics_uids.items():
state.upsert_event(uid, h)
duration = time.time() - start_time
- health.update_status(
- datetime.now(timezone.utc),
- duration,
- True,
- len(ics_uids),
- )
+ msg = f"first run, registered {len(to_add)} events"
+ session.record(True, duration, len(to_add), 0, 0, False, ics_latency_ms, msg, ics_download_size)
+ health.update_status(datetime.now(timezone.utc), duration, True, len(ics_uids))
+ dashboard.set_event_count(len(ics_uids))
+ dashboard.set_syncing(False)
return True
logger.info(
@@ -145,12 +178,10 @@ def sync_once(state: SyncState, health: HealthServer, config: Config) -> bool:
if not calendar:
logger.error("Failed to find calendar")
duration = time.time() - start_time
- health.update_status(
- datetime.now(timezone.utc),
- duration,
- False,
- 0,
- )
+ msg = "calendar not found"
+ session.record(False, duration, 0, 0, 0, False, ics_latency_ms, msg, ics_download_size)
+ health.update_status(datetime.now(timezone.utc), duration, False, 0)
+ dashboard.set_syncing(False)
return False
snapshot = state.snapshot()
@@ -162,19 +193,26 @@ def sync_once(state: SyncState, health: HealthServer, config: Config) -> bool:
try:
logger.info("Phase 1: Adding %d events...", len(add_events))
+ s_a, e_a = 0, 0
if add_events:
- s, e = apply_adds(calendar, add_events)
- logger.info("Added %d/%d events (%d errors)", s, len(add_events), e)
+ s_a, e_a = apply_adds(calendar, add_events)
+ logger.info("Added %d/%d events (%d errors)", s_a, len(add_events), e_a)
logger.info("Phase 2: Updating %d events...", len(update_events))
+ s_u, e_u = 0, 0
if update_events:
- s, e = apply_updates(calendar, update_events)
- logger.info("Updated %d/%d events (%d errors)", s, len(update_events), e)
+ s_u, e_u = apply_updates(calendar, update_events)
+ logger.info("Updated %d/%d events (%d errors)", s_u, len(update_events), e_u)
logger.info("Phase 3: Deleting %d events...", len(delete_uids))
+ s_d, e_d = 0, 0
if delete_uids:
- s, e = apply_deletes(calendar, delete_uids)
- logger.info("Deleted %d/%d events (%d errors)", s, len(delete_uids), e)
+ s_d, e_d = apply_deletes(calendar, delete_uids)
+ logger.info("Deleted %d/%d events (%d errors)", s_d, len(delete_uids), e_d)
+
+ added = s_a
+ updated = s_u
+ deleted = s_d
for uid, h in ics_uids.items():
state.upsert_event(uid, h)
@@ -184,36 +222,31 @@ def sync_once(state: SyncState, health: HealthServer, config: Config) -> bool:
total = len(ics_uids)
duration = time.time() - start_time
+ msg = f"+{added} / ~{updated} / -{deleted}"
+ session.record(True, duration, added, updated, deleted, False, ics_latency_ms, msg, ics_download_size)
logger.info("Sync completed in %.1fs. Total events: %d", duration, total)
- health.update_status(
- datetime.now(timezone.utc),
- duration,
- True,
- total,
- )
+ health.update_status(datetime.now(timezone.utc), duration, True, total)
+ dashboard.set_event_count(total)
+ dashboard.set_syncing(False)
return True
except Exception as exc:
logger.error("Sync failed: %s. Rolling back state.", exc)
state.restore_snapshot(snapshot)
duration = time.time() - start_time
- health.update_status(
- datetime.now(timezone.utc),
- duration,
- False,
- 0,
- )
+ msg = str(exc)[:80]
+ session.record(False, duration, 0, 0, 0, False, ics_latency_ms, msg, ics_download_size)
+ health.update_status(datetime.now(timezone.utc), duration, False, 0)
+ dashboard.set_syncing(False)
return False
except Exception as exc:
logger.error("Sync error: %s", exc)
duration = time.time() - start_time
- health.update_status(
- datetime.now(timezone.utc),
- duration,
- False,
- 0,
- )
+ msg = str(exc)[:80]
+ session.record(False, duration, 0, 0, 0, False, ics_latency_ms, msg, ics_download_size)
+ health.update_status(datetime.now(timezone.utc), duration, False, 0)
+ dashboard.set_syncing(False)
return False
@@ -233,6 +266,19 @@ def main():
health.start()
logger.info("Health endpoint on :8081")
+ session = SyncSession()
+ dashboard = DashboardServer(8082, health, session)
+ dashboard.update_config({
+ "ics_url": config.ics_url,
+ "baikal_url": config.baikal_url,
+ "baikal_user": config.baikal_user,
+ "baikal_pass": config.baikal_pass,
+ "sync_frequency": config.sync_frequency,
+ "calendar_id": os.environ.get("CALENDAR_ID", ""),
+ })
+ dashboard.start()
+ logger.info("Dashboard on :8082")
+
backoff = 0
def handle_signal(signum, frame):
@@ -243,7 +289,7 @@ def main():
signal.signal(signal.SIGINT, handle_signal)
while not shutdown_event.is_set():
- success = sync_once(state, health, config)
+ success = sync_once(state, health, session, config, dashboard)
if success:
backoff = 0
@@ -253,11 +299,14 @@ def main():
sleep_time = backoff * 60
logger.info("Sync failed. Backing off %d minutes...", backoff)
+ dashboard.set_next_sync_in(sleep_time)
+ dashboard.set_backoff_min(backoff)
logger.info("Next sync in %d seconds...", sleep_time)
shutdown_event.wait(sleep_time)
state.close()
health.stop()
+ dashboard.stop()
logger.info("Shutdown complete.")