Files
baikal-sync/sync_calendar.py
T

323 lines
12 KiB
Python

import os
import signal
import time
import hashlib
import logging
import threading
import requests
import caldav
from datetime import datetime, timezone
from config import validate, HEADERS, Config
from state import SyncState
from diff import parse_ics_events, compute_diff, parse_ics_events_with_data
from apply import apply_adds, apply_updates, apply_deletes
from health import HealthServer, SyncSession
from dashboard import DashboardServer
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Shutdown flag: set by the signal handler installed in main(); main()'s loop
# checks it before each cycle and waits on it between cycles.
shutdown_event = threading.Event()
def setup_logging():
    """Configure root logging at INFO level with timestamped records."""
    options = {
        "level": logging.INFO,
        "format": "%(asctime)s [%(levelname)s] %(message)s",
        "datefmt": "%Y-%m-%d %H:%M:%S",
    }
    logging.basicConfig(**options)
def find_calendar(client, config):
    """Pick the calendar to sync into from the principal's calendars.

    Selection order:

    1. If the ``CALENDAR_ID`` environment variable is set, return the first
       calendar whose URL has a path segment exactly equal to it. If none
       does, fall back to the first calendar whose URL merely contains it
       as a substring. If nothing matches, log the available calendars and
       return ``None``.
    2. Otherwise return the first calendar whose URL contains
       ``config.baikal_url`` (trailing slashes ignored).
    3. Otherwise return the first calendar, with a warning.

    Args:
        client: object exposing ``principal().calendars()``; each calendar
            must expose a ``url`` attribute.
        config: object exposing ``baikal_url``.

    Returns:
        The selected calendar object, or ``None`` if none could be selected.
    """
    calendars = client.principal().calendars()
    calendar_id = os.environ.get("CALENDAR_ID")
    if calendar_id:
        wanted = calendar_id.strip("/")
        if wanted:
            # An exact path-segment match wins, so an ID such as "work"
            # cannot accidentally select ".../homework/".
            for cal in calendars:
                if wanted in str(cal.url).split("/"):
                    return cal
        # Backward-compatible fallback: plain substring match.
        for cal in calendars:
            if calendar_id in str(cal.url):
                return cal
        logger.error("Calendar with ID '%s' not found", calendar_id)
        for cal in calendars:
            logger.error(" Available: %s", cal.url)
        return None

    target = config.baikal_url.rstrip("/")
    for cal in calendars:
        if target in str(cal.url):
            return cal
    if calendars:
        # Nothing matched the configured URL; make the arbitrary pick visible.
        logger.warning(
            "No calendar URL matched %s; falling back to first calendar %s",
            target,
            calendars[0].url,
        )
        return calendars[0]
    return None
def _fetch_remote_etag(url):
    """Return the ETag reported by a HEAD request to *url*, or None.

    Best-effort: any failure is logged at debug level and treated as
    "no ETag available", so the caller falls back to a full download.
    """
    try:
        r = requests.head(url, headers=HEADERS, timeout=15, allow_redirects=True)
        if r.status_code < 400:
            return r.headers.get("ETag")
    except Exception as exc:
        logger.debug("ETag probe failed (%s); falling back to full download", exc)
    return None


def _report_outcome(
    health,
    session,
    dashboard,
    start_time,
    success,
    msg,
    *,
    added=0,
    updated=0,
    deleted=0,
    skipped=False,
    ics_latency_ms=0,
    ics_download_size=None,
    event_count=None,
):
    """Publish one cycle's result to the session, health server and dashboard.

    ``ics_download_size`` is only forwarded to ``session.record`` when given,
    and ``dashboard.set_event_count`` is only called when ``event_count`` is
    given (the health server receives 0 otherwise).

    Returns:
        The cycle duration in seconds, measured from *start_time*.
    """
    duration = time.time() - start_time
    record_args = [success, duration, added, updated, deleted, skipped, ics_latency_ms, msg]
    if ics_download_size is not None:
        record_args.append(ics_download_size)
    session.record(*record_args)
    finished_at = datetime.now(timezone.utc)
    health.update_status(finished_at, duration, success, event_count or 0)
    dashboard.set_last_sync(finished_at, duration, success, ics_latency_ms)
    if event_count is not None:
        dashboard.set_event_count(event_count)
    return duration


def sync_once(
    state: SyncState,
    health: HealthServer,
    session: SyncSession,
    config: Config,
    dashboard: DashboardServer,
) -> bool:
    """Run a single sync cycle from the ICS feed to the CalDAV calendar.

    Steps:
      1. Probe the feed's ETag; skip the cycle if it equals the cached ETag.
      2. Download the feed; skip if its SHA-256 equals the cached hash.
      3. Diff the feed's events against the events recorded in *state*.
      4. On a first run (no recorded events, no cached hash) only register
         the events in *state* without writing to the calendar.
      5. Otherwise apply adds, updates and deletes, then record the new
         event hashes in *state*.

    The ICS hash/ETag cache is committed only after a cycle succeeds, so a
    failed cycle is retried on the next call instead of being skipped as
    "no changes". The dashboard "syncing" flag is always cleared on exit.

    Args:
        state: persistent store for event hashes and the ICS cache.
        health: health server, updated with the cycle outcome.
        session: session log, receives one record per cycle.
        config: provides ``ics_url``, ``baikal_url``, ``baikal_user`` and
            ``baikal_pass``.
        dashboard: dashboard server, updated with progress and outcome.

    Returns:
        True if the cycle succeeded (including "nothing to do"), else False.
    """
    start_time = time.time()
    ics_latency_ms = 0
    ics_download_size = 0
    dashboard.set_syncing(True)
    logger.info("Starting sync cycle...")
    try:
        remote_etag = _fetch_remote_etag(config.ics_url)
        cached_hash, cached_etag, _ = state.get_ics_cache()
        if remote_etag and cached_etag == remote_etag:
            logger.info("No changes detected (ETag match). Skipping sync.")
            _report_outcome(
                health, session, dashboard, start_time, True,
                "no changes (ETag)", skipped=True,
            )
            return True

        dl_start = time.time()
        response = requests.get(config.ics_url, headers=HEADERS, timeout=30)
        response.raise_for_status()
        ics_text = response.text
        ics_latency_ms = round((time.time() - dl_start) * 1000)
        ics_download_size = len(ics_text)
        ics_hash = hashlib.sha256(ics_text.encode("utf-8")).hexdigest()

        if cached_hash == ics_hash:
            logger.info("No changes detected (hash match). Skipping sync.")
            if remote_etag:
                # Same content under a new ETag: remember it so the cheap
                # HEAD check can short-circuit next time.
                state.set_ics_cache(ics_hash, remote_etag)
            _report_outcome(
                health, session, dashboard, start_time, True,
                "no changes (hash)", skipped=True,
                ics_latency_ms=ics_latency_ms,
                ics_download_size=ics_download_size,
            )
            return True

        logger.info("ICS changed. Downloaded %d bytes, hash %s", len(ics_text), ics_hash[:12])
        ics_uids = parse_ics_events(ics_text)
        stored_uids = state.get_event_uids()
        known_uids = {}
        for uid in stored_uids:
            h = state.get_event_hash(uid)
            if h:
                known_uids[uid] = h
        deltas = compute_diff(ics_uids, known_uids)
        to_add = deltas["to_add"]
        to_update = deltas["to_update"]
        to_delete = deltas["to_delete"]
        is_first_run = not stored_uids and not cached_hash
        event_total = len(ics_uids)

        if not to_add and not to_update and not to_delete:
            logger.info("Calendar is already in sync.")
            for uid, h in ics_uids.items():
                state.upsert_event(uid, h)
            state.set_ics_cache(ics_hash, remote_etag)
            _report_outcome(
                health, session, dashboard, start_time, True,
                "already in sync", skipped=True,
                ics_latency_ms=ics_latency_ms,
                ics_download_size=ics_download_size,
                event_count=event_total,
            )
            return True

        if is_first_run and not to_delete:
            logger.info(
                "First run detected: %d events in ICS. Registering state without re-adding.",
                len(to_add),
            )
            for uid, h in ics_uids.items():
                state.upsert_event(uid, h)
            state.set_ics_cache(ics_hash, remote_etag)
            _report_outcome(
                health, session, dashboard, start_time, True,
                f"first run, registered {len(to_add)} events",
                added=len(to_add),
                ics_latency_ms=ics_latency_ms,
                ics_download_size=ics_download_size,
                event_count=event_total,
            )
            return True

        logger.info(
            "Delta: %d to add, %d to update, %d to delete",
            len(to_add),
            len(to_update),
            len(to_delete),
        )
        client = caldav.DAVClient(
            url=config.baikal_url,
            username=config.baikal_user,
            password=config.baikal_pass,
            headers=HEADERS,
            ssl_verify_cert=True,
        )
        calendar = find_calendar(client, config)
        if not calendar:
            logger.error("Failed to find calendar")
            # The ICS cache is deliberately left untouched so the next cycle
            # retries instead of skipping on an ETag/hash match.
            _report_outcome(
                health, session, dashboard, start_time, False,
                "calendar not found",
                ics_latency_ms=ics_latency_ms,
                ics_download_size=ics_download_size,
            )
            return False

        snapshot = state.snapshot()
        events_data = parse_ics_events_with_data(ics_text)
        add_events = {uid: events_data[uid] for uid, _ in to_add if uid in events_data}
        update_events = {uid: events_data[uid] for uid, _ in to_update if uid in events_data}
        delete_uids = to_delete
        try:
            logger.info("Phase 1: Adding %d events...", len(add_events))
            s_a, e_a = 0, 0
            if add_events:
                s_a, e_a = apply_adds(calendar, add_events)
            logger.info("Added %d/%d events (%d errors)", s_a, len(add_events), e_a)

            logger.info("Phase 2: Updating %d events...", len(update_events))
            s_u, e_u = 0, 0
            if update_events:
                s_u, e_u = apply_updates(calendar, update_events)
            logger.info("Updated %d/%d events (%d errors)", s_u, len(update_events), e_u)

            logger.info("Phase 3: Deleting %d events...", len(delete_uids))
            s_d, e_d = 0, 0
            if delete_uids:
                s_d, e_d = apply_deletes(calendar, delete_uids)
            logger.info("Deleted %d/%d events (%d errors)", s_d, len(delete_uids), e_d)

            failed_ops = e_a + e_u + e_d
            if failed_ops:
                # NOTE(review): the apply_* helpers only return counts, so the
                # events that failed are still recorded below as synced and
                # will not be retried -- confirm whether that is intended.
                logger.warning("%d event operation(s) reported errors", failed_ops)

            for uid, h in ics_uids.items():
                state.upsert_event(uid, h)
            for uid in delete_uids:
                state.delete_event(uid)
            # Commit the feed cache only now that the changes were applied.
            state.set_ics_cache(ics_hash, remote_etag)

            duration = _report_outcome(
                health, session, dashboard, start_time, True,
                f"+{s_a} / ~{s_u} / -{s_d}",
                added=s_a, updated=s_u, deleted=s_d,
                ics_latency_ms=ics_latency_ms,
                ics_download_size=ics_download_size,
                event_count=event_total,
            )
            logger.info("Sync completed in %.1fs. Total events: %d", duration, event_total)
            return True
        except Exception as exc:
            logger.error("Sync failed: %s. Rolling back state.", exc)
            state.restore_snapshot(snapshot)
            _report_outcome(
                health, session, dashboard, start_time, False,
                str(exc)[:80],
                ics_latency_ms=ics_latency_ms,
                ics_download_size=ics_download_size,
            )
            return False
    except Exception as exc:
        # Top-level boundary for the cycle: report the failure and let the
        # caller back off; the traceback is kept at debug level.
        logger.error("Sync error: %s", exc)
        logger.debug("Sync error traceback", exc_info=True)
        _report_outcome(
            health, session, dashboard, start_time, False,
            str(exc)[:80],
            ics_latency_ms=ics_latency_ms,
            ics_download_size=ics_download_size,
        )
        return False
    finally:
        # Runs on every exit path, so the dashboard never stays "syncing".
        dashboard.set_syncing(False)
def main():
    """Service entry point: validate config, start servers, loop until signalled.

    Runs ``sync_once`` repeatedly. After a success the next cycle starts in
    ``config.sync_frequency`` minutes; after a failure the wait backs off
    exponentially (1, 2, 4, ... minutes, capped at 30). SIGTERM and SIGINT
    set ``shutdown_event``, which ends the loop and interrupts the wait.

    The state store and both servers are shut down in a ``finally`` block so
    they are released even if a sync cycle raises unexpectedly.

    Raises:
        SystemExit: with status 1 if configuration validation fails.
    """
    setup_logging()
    logger.info("Starting Baikal Sync service...")
    try:
        config = validate()
    except ValueError as exc:
        logger.error("Configuration error: %s", exc)
        raise SystemExit(1)
    logger.info("Sync frequency: %d minutes", config.sync_frequency)

    state = SyncState("./sync.db")
    health = HealthServer(8081)
    health.start()
    logger.info("Health endpoint on :8081")

    session = SyncSession()
    dashboard = DashboardServer(8082, session)
    # NOTE(review): the CalDAV password is handed to the dashboard here --
    # confirm the dashboard never exposes it.
    dashboard.update_config({
        "ics_url": config.ics_url,
        "baikal_url": config.baikal_url,
        "baikal_user": config.baikal_user,
        "baikal_pass": config.baikal_pass,
        "sync_frequency": config.sync_frequency,
        "calendar_id": os.environ.get("CALENDAR_ID", ""),
    })
    dashboard.start()
    logger.info("Dashboard on :8082")

    def handle_signal(signum, frame):
        logger.info("Received signal %s. Shutting down...", signum)
        shutdown_event.set()

    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)

    backoff = 0  # minutes; 0 means the previous cycle succeeded
    try:
        while not shutdown_event.is_set():
            success = sync_once(state, health, session, config, dashboard)
            if success:
                backoff = 0
                sleep_time = config.sync_frequency * 60
            else:
                # Double the delay on each consecutive failure, capped at 30.
                backoff = min(backoff * 2, 30) if backoff > 0 else 1
                sleep_time = backoff * 60
                logger.info("Sync failed. Backing off %d minutes...", backoff)
            dashboard.set_next_sync_in(sleep_time)
            dashboard.set_backoff_min(backoff)
            logger.info("Next sync in %d seconds...", sleep_time)
            # Returns early as soon as a shutdown signal arrives.
            shutdown_event.wait(sleep_time)
    finally:
        state.close()
        health.stop()
        dashboard.stop()
        logger.info("Shutdown complete.")
# Start the service only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()