feat: add core modules (config, state, diff, apply, health)
This commit is contained in:
@@ -0,0 +1,107 @@
|
|||||||
|
import logging
|
||||||
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_uid(ical_data: bytes) -> str:
|
||||||
|
for line in ical_data.decode("utf-8", errors="replace").split("\r\n"):
|
||||||
|
if line.upper().startswith("UID:"):
|
||||||
|
return line[4:].strip()
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def _find_event_by_uid(calendar: Any, uid: str) -> Any | None:
|
||||||
|
try:
|
||||||
|
for event in calendar.events():
|
||||||
|
content = event.data.decode("utf-8", errors="replace")
|
||||||
|
for line in content.split("\r\n"):
|
||||||
|
if line.upper().startswith("UID:"):
|
||||||
|
if line[4:].strip() == uid:
|
||||||
|
return event
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("Error scanning events for UID %s: %s", uid, exc)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _add_event(calendar: Any, uid: str, ical_data: bytes) -> bool:
|
||||||
|
try:
|
||||||
|
calendar.add_event(ical_data)
|
||||||
|
return True
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("Failed to add event %s: %s", uid, exc)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _delete_event(calendar: Any, uid: str) -> bool:
|
||||||
|
try:
|
||||||
|
event = _find_event_by_uid(calendar, uid)
|
||||||
|
if event:
|
||||||
|
event.delete()
|
||||||
|
return True
|
||||||
|
logger.error("Event with UID %s not found", uid)
|
||||||
|
return False
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("Failed to delete event %s: %s", uid, exc)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _update_event(calendar: Any, uid: str, ical_data: bytes) -> bool:
|
||||||
|
if not _delete_event(calendar, uid):
|
||||||
|
return False
|
||||||
|
return _add_event(calendar, uid, ical_data)
|
||||||
|
|
||||||
|
|
||||||
|
def apply_adds(calendar: Any, events: dict[str, bytes], max_workers: int = 10) -> tuple[int, int]:
|
||||||
|
success = 0
|
||||||
|
errors = 0
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||||
|
futures = {
|
||||||
|
executor.submit(_add_event, calendar, uid, data): uid
|
||||||
|
for uid, data in events.items()
|
||||||
|
}
|
||||||
|
for future in as_completed(futures):
|
||||||
|
if future.result():
|
||||||
|
success += 1
|
||||||
|
else:
|
||||||
|
errors += 1
|
||||||
|
|
||||||
|
return success, errors
|
||||||
|
|
||||||
|
|
||||||
|
def apply_updates(calendar: Any, events: dict[str, bytes], max_workers: int = 10) -> tuple[int, int]:
|
||||||
|
success = 0
|
||||||
|
errors = 0
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||||
|
futures = {
|
||||||
|
executor.submit(_update_event, calendar, uid, data): uid
|
||||||
|
for uid, data in events.items()
|
||||||
|
}
|
||||||
|
for future in as_completed(futures):
|
||||||
|
if future.result():
|
||||||
|
success += 1
|
||||||
|
else:
|
||||||
|
errors += 1
|
||||||
|
|
||||||
|
return success, errors
|
||||||
|
|
||||||
|
|
||||||
|
def apply_deletes(calendar: Any, uids: list[str], max_workers: int = 10) -> tuple[int, int]:
|
||||||
|
success = 0
|
||||||
|
errors = 0
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||||
|
futures = {
|
||||||
|
executor.submit(_delete_event, calendar, uid): uid
|
||||||
|
for uid in uids
|
||||||
|
}
|
||||||
|
for future in as_completed(futures):
|
||||||
|
if future.result():
|
||||||
|
success += 1
|
||||||
|
else:
|
||||||
|
errors += 1
|
||||||
|
|
||||||
|
return success, errors
|
||||||
@@ -0,0 +1,53 @@
|
|||||||
|
import os
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Unraid-Sync/1.0"}
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Config:
|
||||||
|
ics_url: str
|
||||||
|
baikal_url: str
|
||||||
|
baikal_user: str
|
||||||
|
baikal_pass: str
|
||||||
|
sync_frequency: int
|
||||||
|
|
||||||
|
|
||||||
|
def validate() -> Config:
|
||||||
|
errors = []
|
||||||
|
|
||||||
|
ics_url = os.environ.get("ICS_URL")
|
||||||
|
if not ics_url:
|
||||||
|
errors.append("ICS_URL is required")
|
||||||
|
|
||||||
|
baikal_url = os.environ.get("BAIKAL_URL")
|
||||||
|
if not baikal_url:
|
||||||
|
errors.append("BAIKAL_URL is required")
|
||||||
|
|
||||||
|
baikal_user = os.environ.get("BAIKAL_USER")
|
||||||
|
if not baikal_user:
|
||||||
|
errors.append("BAIKAL_USER is required")
|
||||||
|
|
||||||
|
baikal_pass = os.environ.get("BAIKAL_PASS")
|
||||||
|
if not baikal_pass:
|
||||||
|
errors.append("BAIKAL_PASS is required")
|
||||||
|
|
||||||
|
sync_freq_raw = os.environ.get("SYNC_FREQUENCY", "5")
|
||||||
|
try:
|
||||||
|
sync_frequency = int(sync_freq_raw)
|
||||||
|
if sync_frequency <= 0:
|
||||||
|
raise ValueError
|
||||||
|
except ValueError:
|
||||||
|
errors.append("SYNC_FREQUENCY must be a positive integer")
|
||||||
|
|
||||||
|
if errors:
|
||||||
|
raise ValueError("\n".join(errors))
|
||||||
|
|
||||||
|
return Config(
|
||||||
|
ics_url=ics_url,
|
||||||
|
baikal_url=baikal_url,
|
||||||
|
baikal_user=baikal_user,
|
||||||
|
baikal_pass=baikal_pass,
|
||||||
|
sync_frequency=sync_frequency,
|
||||||
|
)
|
||||||
@@ -0,0 +1,65 @@
|
|||||||
|
import hashlib
|
||||||
|
import logging
|
||||||
|
from icalendar import Calendar
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_ics_events(ics_text: str) -> dict[str, str]:
|
||||||
|
cal = Calendar.from_ical(ics_text.encode() if isinstance(ics_text, str) else ics_text)
|
||||||
|
result = {}
|
||||||
|
for component in cal.walk():
|
||||||
|
if component.name != "VEVENT":
|
||||||
|
continue
|
||||||
|
uid = component.get("UID")
|
||||||
|
if uid is None:
|
||||||
|
logger.warning("Skipping event with no UID")
|
||||||
|
continue
|
||||||
|
uid_str = str(uid)
|
||||||
|
try:
|
||||||
|
event_bytes = component.to_ical()
|
||||||
|
file_hash = hashlib.sha256(event_bytes).hexdigest()
|
||||||
|
result[uid_str] = file_hash
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Failed to process event %s: %s", uid_str, e)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def compute_diff(ics_uids: dict[str, str], known_uids: dict[str, str]) -> dict:
|
||||||
|
to_add = []
|
||||||
|
to_update = []
|
||||||
|
to_delete = []
|
||||||
|
|
||||||
|
for uid, file_hash in ics_uids.items():
|
||||||
|
if uid not in known_uids:
|
||||||
|
to_add.append((uid, file_hash))
|
||||||
|
elif known_uids[uid] != file_hash:
|
||||||
|
to_update.append((uid, file_hash))
|
||||||
|
|
||||||
|
for uid in known_uids:
|
||||||
|
if uid not in ics_uids:
|
||||||
|
to_delete.append(uid)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"to_add": to_add,
|
||||||
|
"to_update": to_update,
|
||||||
|
"to_delete": to_delete,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def parse_ics_events_with_data(ics_text: str) -> dict[str, bytes]:
|
||||||
|
cal = Calendar.from_ical(ics_text.encode() if isinstance(ics_text, str) else ics_text)
|
||||||
|
result = {}
|
||||||
|
for component in cal.walk():
|
||||||
|
if component.name != "VEVENT":
|
||||||
|
continue
|
||||||
|
uid = component.get("UID")
|
||||||
|
if uid is None:
|
||||||
|
logger.warning("Skipping event with no UID")
|
||||||
|
continue
|
||||||
|
uid_str = str(uid)
|
||||||
|
try:
|
||||||
|
result[uid_str] = component.to_ical()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("Failed to process event %s: %s", uid_str, e)
|
||||||
|
return result
|
||||||
@@ -0,0 +1,111 @@
|
|||||||
|
import json
|
||||||
|
import threading
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||||
|
from io import StringIO
|
||||||
|
|
||||||
|
|
||||||
|
class HealthServer:
|
||||||
|
def __init__(self, port: int = 8081):
|
||||||
|
self.port = port
|
||||||
|
self.lock = threading.Lock()
|
||||||
|
self.last_sync = None
|
||||||
|
self.last_sync_duration = 0.0
|
||||||
|
self.last_sync_success = None
|
||||||
|
self.event_count = 0
|
||||||
|
self.syncs_total = 0
|
||||||
|
self.syncs_failed = 0
|
||||||
|
self.server = None
|
||||||
|
self.thread = None
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
handler = self._make_handler()
|
||||||
|
|
||||||
|
self.server = HTTPServer(("0.0.0.0", self.port), handler)
|
||||||
|
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
|
||||||
|
self.thread.start()
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
if self.server:
|
||||||
|
self.server.shutdown()
|
||||||
|
self.server = None
|
||||||
|
self.thread = None
|
||||||
|
|
||||||
|
def update_status(self, last_sync: datetime, duration: float, success: bool, event_count: int) -> None:
|
||||||
|
with self.lock:
|
||||||
|
self.last_sync = last_sync
|
||||||
|
self.last_sync_duration = duration
|
||||||
|
self.last_sync_success = success
|
||||||
|
self.event_count = event_count
|
||||||
|
self.syncs_total += 1
|
||||||
|
if not success:
|
||||||
|
self.syncs_failed += 1
|
||||||
|
|
||||||
|
def _make_handler(self):
|
||||||
|
server_self = self
|
||||||
|
|
||||||
|
class Handler(BaseHTTPRequestHandler):
|
||||||
|
def do_GET(self):
|
||||||
|
if self.path == "/health":
|
||||||
|
self._handle_health(server_self)
|
||||||
|
elif self.path == "/metrics":
|
||||||
|
self._handle_metrics(server_self)
|
||||||
|
else:
|
||||||
|
self.send_response(404)
|
||||||
|
self.send_header("Content-Type", "application/json")
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(json.dumps({"error": "not found"}).encode())
|
||||||
|
|
||||||
|
def _handle_health(self, srv):
|
||||||
|
with srv.lock:
|
||||||
|
status = "ok"
|
||||||
|
result = "none"
|
||||||
|
|
||||||
|
if srv.last_sync_success is not None:
|
||||||
|
if srv.last_sync_success:
|
||||||
|
result = "success"
|
||||||
|
else:
|
||||||
|
result = "failure"
|
||||||
|
status = "error"
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
"status": status,
|
||||||
|
"last_sync": srv.last_sync.isoformat() if srv.last_sync else None,
|
||||||
|
"last_sync_duration_sec": srv.last_sync_duration,
|
||||||
|
"last_sync_result": result,
|
||||||
|
"event_count": srv.event_count,
|
||||||
|
"syncs_total": srv.syncs_total,
|
||||||
|
"syncs_failed": srv.syncs_failed,
|
||||||
|
}
|
||||||
|
|
||||||
|
body = json.dumps(payload).encode()
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "application/json")
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(body)
|
||||||
|
|
||||||
|
def _handle_metrics(self, srv):
|
||||||
|
with srv.lock:
|
||||||
|
last_success = 1 if srv.last_sync_success else 0
|
||||||
|
duration = srv.last_sync_duration
|
||||||
|
evt_count = srv.event_count
|
||||||
|
total = srv.syncs_total
|
||||||
|
failed = srv.syncs_failed
|
||||||
|
|
||||||
|
buf = StringIO()
|
||||||
|
buf.write(f"baikal_sync_last_duration_seconds {duration}\n")
|
||||||
|
buf.write(f"baikal_sync_events_total {evt_count}\n")
|
||||||
|
buf.write(f"baikal_sync_total {total}\n")
|
||||||
|
buf.write(f"baikal_sync_failures_total {failed}\n")
|
||||||
|
buf.write(f"baikal_sync_last_success {last_success}\n")
|
||||||
|
|
||||||
|
body = buf.getvalue().encode()
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(body)
|
||||||
|
|
||||||
|
def log_message(self, format, *args):
|
||||||
|
pass
|
||||||
|
|
||||||
|
return Handler
|
||||||
@@ -0,0 +1,106 @@
|
|||||||
|
import sqlite3
|
||||||
|
import threading
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
|
||||||
|
class SyncState:
|
||||||
|
def __init__(self, db_path="./sync.db"):
|
||||||
|
self.db_path = db_path
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._conn = sqlite3.connect(db_path, timeout=30)
|
||||||
|
self._conn.execute("PRAGMA journal_mode=WAL")
|
||||||
|
self._conn.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS events (
|
||||||
|
uid TEXT PRIMARY KEY,
|
||||||
|
content_hash TEXT NOT NULL,
|
||||||
|
synced_at TIMESTAMP NOT NULL
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
self._conn.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS ics_cache (
|
||||||
|
id INTEGER PRIMARY KEY CHECK(id=1),
|
||||||
|
ics_hash TEXT,
|
||||||
|
etag TEXT,
|
||||||
|
fetched_at TIMESTAMP
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
self._conn.commit()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def get_event_uids(self) -> set[str]:
|
||||||
|
with self._lock:
|
||||||
|
cur = self._conn.execute("SELECT uid FROM events")
|
||||||
|
return {row[0] for row in cur.fetchall()}
|
||||||
|
|
||||||
|
def get_event_hash(self, uid: str) -> str | None:
|
||||||
|
with self._lock:
|
||||||
|
cur = self._conn.execute("SELECT content_hash FROM events WHERE uid = ?", (uid,))
|
||||||
|
row = cur.fetchone()
|
||||||
|
return row[0] if row else None
|
||||||
|
|
||||||
|
def upsert_event(self, uid: str, content_hash: str) -> None:
|
||||||
|
with self._lock:
|
||||||
|
self._conn.execute("""
|
||||||
|
INSERT OR REPLACE INTO events (uid, content_hash, synced_at)
|
||||||
|
VALUES (?, ?, ?)
|
||||||
|
""", (uid, content_hash, datetime.now(timezone.utc).isoformat()))
|
||||||
|
self._conn.commit()
|
||||||
|
|
||||||
|
def delete_event(self, uid: str) -> None:
|
||||||
|
with self._lock:
|
||||||
|
self._conn.execute("DELETE FROM events WHERE uid = ?", (uid,))
|
||||||
|
self._conn.commit()
|
||||||
|
|
||||||
|
def clear_events(self) -> int:
|
||||||
|
with self._lock:
|
||||||
|
cur = self._conn.execute("SELECT COUNT(*) FROM events")
|
||||||
|
count = cur.fetchone()[0]
|
||||||
|
self._conn.execute("DELETE FROM events")
|
||||||
|
self._conn.commit()
|
||||||
|
return count
|
||||||
|
|
||||||
|
def get_ics_cache(self) -> tuple[str | None, str | None, str | None]:
|
||||||
|
with self._lock:
|
||||||
|
cur = self._conn.execute("SELECT ics_hash, etag, fetched_at FROM ics_cache WHERE id = 1")
|
||||||
|
row = cur.fetchone()
|
||||||
|
if row:
|
||||||
|
return (row[0], row[1], row[2])
|
||||||
|
return (None, None, None)
|
||||||
|
|
||||||
|
def set_ics_cache(self, ics_hash: str, etag: str | None) -> None:
|
||||||
|
with self._lock:
|
||||||
|
self._conn.execute("""
|
||||||
|
INSERT INTO ics_cache (id, ics_hash, etag, fetched_at)
|
||||||
|
VALUES (1, ?, ?, ?)
|
||||||
|
ON CONFLICT(id) DO UPDATE SET ics_hash = ?, etag = ?, fetched_at = ?
|
||||||
|
""", (ics_hash, etag, datetime.now(timezone.utc).isoformat(),
|
||||||
|
ics_hash, etag, datetime.now(timezone.utc).isoformat()))
|
||||||
|
self._conn.commit()
|
||||||
|
|
||||||
|
def clear_ics_cache(self) -> None:
|
||||||
|
with self._lock:
|
||||||
|
self._conn.execute("DELETE FROM ics_cache")
|
||||||
|
self._conn.commit()
|
||||||
|
|
||||||
|
def snapshot(self) -> dict:
|
||||||
|
with self._lock:
|
||||||
|
cur = self._conn.execute("SELECT uid, content_hash FROM events")
|
||||||
|
rows = cur.fetchall()
|
||||||
|
return {
|
||||||
|
"uids": [r[0] for r in rows],
|
||||||
|
"hashes": {r[0]: r[1] for r in rows}
|
||||||
|
}
|
||||||
|
|
||||||
|
def restore_snapshot(self, data: dict) -> None:
|
||||||
|
with self._lock:
|
||||||
|
self._conn.execute("DELETE FROM events")
|
||||||
|
for uid, content_hash in data.get("hashes", {}).items():
|
||||||
|
self._conn.execute("""
|
||||||
|
INSERT INTO events (uid, content_hash, synced_at)
|
||||||
|
VALUES (?, ?, ?)
|
||||||
|
""", (uid, content_hash, datetime.now(timezone.utc).isoformat()))
|
||||||
|
self._conn.commit()
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
with self._lock:
|
||||||
|
self._conn.close()
|
||||||
Reference in New Issue
Block a user