From ae1cbe27a478b1277f3b6407cfce9ef77aca0932 Mon Sep 17 00:00:00 2001 From: Jose Lago Date: Thu, 11 Jun 2026 22:50:58 +0200 Subject: [PATCH] feat: add core modules (config, state, diff, apply, health) --- apply.py | 107 ++++++++++++++++++++++++++++++++++++++++++++++++++++ config.py | 53 ++++++++++++++++++++++++++ diff.py | 65 ++++++++++++++++++++++++++++++++ health.py | 111 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ state.py | 106 +++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 442 insertions(+) create mode 100644 apply.py create mode 100644 config.py create mode 100644 diff.py create mode 100644 health.py create mode 100644 state.py diff --git a/apply.py b/apply.py new file mode 100644 index 0000000..294b0cb --- /dev/null +++ b/apply.py @@ -0,0 +1,107 @@ +import logging +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Any + +logger = logging.getLogger(__name__) + + +def _extract_uid(ical_data: bytes) -> str: + for line in ical_data.decode("utf-8", errors="replace").split("\r\n"): + if line.upper().startswith("UID:"): + return line[4:].strip() + return "" + + +def _find_event_by_uid(calendar: Any, uid: str) -> Any | None: + try: + for event in calendar.events(): + content = event.data.decode("utf-8", errors="replace") + for line in content.split("\r\n"): + if line.upper().startswith("UID:"): + if line[4:].strip() == uid: + return event + except Exception as exc: + logger.error("Error scanning events for UID %s: %s", uid, exc) + return None + + +def _add_event(calendar: Any, uid: str, ical_data: bytes) -> bool: + try: + calendar.add_event(ical_data) + return True + except Exception as exc: + logger.error("Failed to add event %s: %s", uid, exc) + return False + + +def _delete_event(calendar: Any, uid: str) -> bool: + try: + event = _find_event_by_uid(calendar, uid) + if event: + event.delete() + return True + logger.error("Event with UID %s not found", uid) + return False + except Exception as exc: + logger.error("Failed to delete event %s: %s", uid, exc) + return False + + +def _update_event(calendar: Any, uid: str, ical_data: bytes) -> bool: + if not _delete_event(calendar, uid): + return False + return _add_event(calendar, uid, ical_data) + + +def apply_adds(calendar: Any, events: dict[str, bytes], max_workers: int = 10) -> tuple[int, int]: + success = 0 + errors = 0 + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = { + executor.submit(_add_event, calendar, uid, data): uid + for uid, data in events.items() + } + for future in as_completed(futures): + if future.result(): + success += 1 + else: + errors += 1 + + return success, errors + + +def apply_updates(calendar: Any, events: dict[str, bytes], max_workers: int = 10) -> tuple[int, int]: + success = 0 + errors = 0 + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = { + executor.submit(_update_event, calendar, uid, data): uid + for uid, data in events.items() + } + for future in as_completed(futures): + if future.result(): + success += 1 + else: + errors += 1 + + return success, errors + + +def apply_deletes(calendar: Any, uids: list[str], max_workers: int = 10) -> tuple[int, int]: + success = 0 + errors = 0 + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = { + executor.submit(_delete_event, calendar, uid): uid + for uid in uids + } + for future in as_completed(futures): + if future.result(): + success += 1 + else: + errors += 1 + + return success, errors diff --git a/config.py b/config.py new file mode 100644 index 0000000..8263c05 --- /dev/null +++ b/config.py @@ -0,0 +1,53 @@ +import os +from dataclasses import dataclass + + +HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Unraid-Sync/1.0"} + + +@dataclass +class Config: + ics_url: str + baikal_url: str + baikal_user: str + baikal_pass: str + sync_frequency: int + + +def validate() -> Config: + errors = [] + + ics_url = os.environ.get("ICS_URL") + if not ics_url: + errors.append("ICS_URL is required") + + baikal_url = os.environ.get("BAIKAL_URL") + if not baikal_url: + errors.append("BAIKAL_URL is required") + + baikal_user = os.environ.get("BAIKAL_USER") + if not baikal_user: + errors.append("BAIKAL_USER is required") + + baikal_pass = os.environ.get("BAIKAL_PASS") + if not baikal_pass: + errors.append("BAIKAL_PASS is required") + + sync_freq_raw = os.environ.get("SYNC_FREQUENCY", "5") + try: + sync_frequency = int(sync_freq_raw) + if sync_frequency <= 0: + raise ValueError + except ValueError: + errors.append("SYNC_FREQUENCY must be a positive integer") + + if errors: + raise ValueError("\n".join(errors)) + + return Config( + ics_url=ics_url, + baikal_url=baikal_url, + baikal_user=baikal_user, + baikal_pass=baikal_pass, + sync_frequency=sync_frequency, + ) diff --git a/diff.py b/diff.py new file mode 100644 index 0000000..d0d4051 --- /dev/null +++ b/diff.py @@ -0,0 +1,65 @@ +import hashlib +import logging +from icalendar import Calendar + +logger = logging.getLogger(__name__) + + +def parse_ics_events(ics_text: str) -> dict[str, str]: + cal = Calendar.from_ical(ics_text.encode() if isinstance(ics_text, str) else ics_text) + result = {} + for component in cal.walk(): + if component.name != "VEVENT": + continue + uid = component.get("UID") + if uid is None: + logger.warning("Skipping event with no UID") + continue + uid_str = str(uid) + try: + event_bytes = component.to_ical() + file_hash = hashlib.sha256(event_bytes).hexdigest() + result[uid_str] = file_hash + except Exception as e: + logger.warning("Failed to process event %s: %s", uid_str, e) + return result + + +def compute_diff(ics_uids: dict[str, str], known_uids: dict[str, str]) -> dict: + to_add = [] + to_update = [] + to_delete = [] + + for uid, file_hash in ics_uids.items(): + if uid not in known_uids: + to_add.append((uid, file_hash)) + elif known_uids[uid] != file_hash: + to_update.append((uid, file_hash)) + + for uid in known_uids: + if uid not in ics_uids: + to_delete.append(uid) + + return { + "to_add": to_add, + "to_update": to_update, + "to_delete": to_delete, + } + + +def parse_ics_events_with_data(ics_text: str) -> dict[str, bytes]: + cal = Calendar.from_ical(ics_text.encode() if isinstance(ics_text, str) else ics_text) + result = {} + for component in cal.walk(): + if component.name != "VEVENT": + continue + uid = component.get("UID") + if uid is None: + logger.warning("Skipping event with no UID") + continue + uid_str = str(uid) + try: + result[uid_str] = component.to_ical() + except Exception as e: + logger.warning("Failed to process event %s: %s", uid_str, e) + return result diff --git a/health.py b/health.py new file mode 100644 index 0000000..d20cebc --- /dev/null +++ b/health.py @@ -0,0 +1,111 @@ +import json +import threading +from datetime import datetime, timezone +from http.server import HTTPServer, BaseHTTPRequestHandler +from io import StringIO + + +class HealthServer: + def __init__(self, port: int = 8081): + self.port = port + self.lock = threading.Lock() + self.last_sync = None + self.last_sync_duration = 0.0 + self.last_sync_success = None + self.event_count = 0 + self.syncs_total = 0 + self.syncs_failed = 0 + self.server = None + self.thread = None + + def start(self) -> None: + handler = self._make_handler() + + self.server = HTTPServer(("0.0.0.0", self.port), handler) + self.thread = threading.Thread(target=self.server.serve_forever, daemon=True) + self.thread.start() + + def stop(self) -> None: + if self.server: + self.server.shutdown() + self.server = None + self.thread = None + + def update_status(self, last_sync: datetime, duration: float, success: bool, event_count: int) -> None: + with self.lock: + self.last_sync = last_sync + self.last_sync_duration = duration + self.last_sync_success = success + self.event_count = event_count + self.syncs_total += 1 + if not success: + self.syncs_failed += 1 + + def _make_handler(self): + server_self = self + + class Handler(BaseHTTPRequestHandler): + def do_GET(self): + if self.path == "/health": + self._handle_health(server_self) + elif self.path == "/metrics": + self._handle_metrics(server_self) + else: + self.send_response(404) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"error": "not found"}).encode()) + + def _handle_health(self, srv): + with srv.lock: + status = "ok" + result = "none" + + if srv.last_sync_success is not None: + if srv.last_sync_success: + result = "success" + else: + result = "failure" + status = "error" + + payload = { + "status": status, + "last_sync": srv.last_sync.isoformat() if srv.last_sync else None, + "last_sync_duration_sec": srv.last_sync_duration, + "last_sync_result": result, + "event_count": srv.event_count, + "syncs_total": srv.syncs_total, + "syncs_failed": srv.syncs_failed, + } + + body = json.dumps(payload).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(body) + + def _handle_metrics(self, srv): + with srv.lock: + last_success = 1 if srv.last_sync_success else 0 + duration = srv.last_sync_duration + evt_count = srv.event_count + total = srv.syncs_total + failed = srv.syncs_failed + + buf = StringIO() + buf.write(f"baikal_sync_last_duration_seconds {duration}\n") + buf.write(f"baikal_sync_events_total {evt_count}\n") + buf.write(f"baikal_sync_total {total}\n") + buf.write(f"baikal_sync_failures_total {failed}\n") + buf.write(f"baikal_sync_last_success {last_success}\n") + + body = buf.getvalue().encode() + self.send_response(200) + self.send_header("Content-Type", "text/plain; version=0.0.4; charset=utf-8") + self.end_headers() + self.wfile.write(body) + + def log_message(self, format, *args): + pass + + return Handler diff --git a/state.py b/state.py new file mode 100644 index 0000000..5f70e2a --- /dev/null +++ b/state.py @@ -0,0 +1,106 @@ +import sqlite3 +import threading +from datetime import datetime, timezone + + +class SyncState: + def __init__(self, db_path="./sync.db"): + self.db_path = db_path + self._lock = threading.Lock() + self._conn = sqlite3.connect(db_path, timeout=30) + self._conn.execute("PRAGMA journal_mode=WAL") + self._conn.execute(""" + CREATE TABLE IF NOT EXISTS events ( + uid TEXT PRIMARY KEY, + content_hash TEXT NOT NULL, + synced_at TIMESTAMP NOT NULL + ) + """) + self._conn.execute(""" + CREATE TABLE IF NOT EXISTS ics_cache ( + id INTEGER PRIMARY KEY CHECK(id=1), + ics_hash TEXT, + etag TEXT, + fetched_at TIMESTAMP + ) + """) + self._conn.commit() + return self + + def get_event_uids(self) -> set[str]: + with self._lock: + cur = self._conn.execute("SELECT uid FROM events") + return {row[0] for row in cur.fetchall()} + + def get_event_hash(self, uid: str) -> str | None: + with self._lock: + cur = self._conn.execute("SELECT content_hash FROM events WHERE uid = ?", (uid,)) + row = cur.fetchone() + return row[0] if row else None + + def upsert_event(self, uid: str, content_hash: str) -> None: + with self._lock: + self._conn.execute(""" + INSERT OR REPLACE INTO events (uid, content_hash, synced_at) + VALUES (?, ?, ?) + """, (uid, content_hash, datetime.now(timezone.utc).isoformat())) + self._conn.commit() + + def delete_event(self, uid: str) -> None: + with self._lock: + self._conn.execute("DELETE FROM events WHERE uid = ?", (uid,)) + self._conn.commit() + + def clear_events(self) -> int: + with self._lock: + cur = self._conn.execute("SELECT COUNT(*) FROM events") + count = cur.fetchone()[0] + self._conn.execute("DELETE FROM events") + self._conn.commit() + return count + + def get_ics_cache(self) -> tuple[str | None, str | None, str | None]: + with self._lock: + cur = self._conn.execute("SELECT ics_hash, etag, fetched_at FROM ics_cache WHERE id = 1") + row = cur.fetchone() + if row: + return (row[0], row[1], row[2]) + return (None, None, None) + + def set_ics_cache(self, ics_hash: str, etag: str | None) -> None: + with self._lock: + self._conn.execute(""" + INSERT INTO ics_cache (id, ics_hash, etag, fetched_at) + VALUES (1, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET ics_hash = ?, etag = ?, fetched_at = ? + """, (ics_hash, etag, datetime.now(timezone.utc).isoformat(), + ics_hash, etag, datetime.now(timezone.utc).isoformat())) + self._conn.commit() + + def clear_ics_cache(self) -> None: + with self._lock: + self._conn.execute("DELETE FROM ics_cache") + self._conn.commit() + + def snapshot(self) -> dict: + with self._lock: + cur = self._conn.execute("SELECT uid, content_hash FROM events") + rows = cur.fetchall() + return { + "uids": [r[0] for r in rows], + "hashes": {r[0]: r[1] for r in rows} + } + + def restore_snapshot(self, data: dict) -> None: + with self._lock: + self._conn.execute("DELETE FROM events") + for uid, content_hash in data.get("hashes", {}).items(): + self._conn.execute(""" + INSERT INTO events (uid, content_hash, synced_at) + VALUES (?, ?, ?) + """, (uid, content_hash, datetime.now(timezone.utc).isoformat())) + self._conn.commit() + + def close(self) -> None: + with self._lock: + self._conn.close()