# baikal-sync/sync_calendar.py (Python, 266 lines, 8.1 KiB) -- header copied from the file viewer

import os
import signal
import time
import hashlib
import logging
import threading
import requests
import caldav
from datetime import datetime, timezone
from config import validate, HEADERS, Config
from state import SyncState
from diff import parse_ics_events, compute_diff, parse_ics_events_with_data
from apply import apply_adds, apply_updates, apply_deletes
from health import HealthServer
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Set by the signal handler installed in main(); the main loop checks it
# between cycles and waits on it instead of sleeping.
shutdown_event = threading.Event()
def setup_logging():
    """Configure root logging at INFO level with timestamped, level-tagged lines."""
    options = {
        "level": logging.INFO,
        "format": "%(asctime)s [%(levelname)s] %(message)s",
        "datefmt": "%Y-%m-%d %H:%M:%S",
    }
    logging.basicConfig(**options)
def find_calendar(client, config):
    """Pick the calendar to sync into from the principal's calendars.

    Selection order:
      1. If the CALENDAR_ID environment variable is set, the first calendar
         whose URL contains that value. When none matches, the available
         URLs are logged as errors and None is returned (no fallback).
      2. Otherwise, the first calendar whose URL contains (or equals, modulo
         a trailing slash) ``config.baikal_url``.
      3. Otherwise, the first calendar in the list, or None if there are none.
    """
    available = client.principal().calendars()

    wanted_id = os.environ.get("CALENDAR_ID")
    if wanted_id:
        by_id = next(
            (entry for entry in available if wanted_id in str(entry.url)),
            None,
        )
        if by_id is None:
            logger.error("Calendar with ID '%s' not found", wanted_id)
            for entry in available:
                logger.error(" Available: %s", entry.url)
        return by_id

    base_url = config.baikal_url.rstrip("/")
    for entry in available:
        entry_url = str(entry.url)
        if base_url in entry_url or entry_url.rstrip("/") == base_url:
            return entry

    # Nothing matched the configured URL: fall back to the first calendar.
    return available[0] if available else None
def _report_health(health: HealthServer, start_time: float, success: bool, event_count: int) -> None:
    """Publish the outcome of one sync cycle (timestamp, duration, result, count)."""
    health.update_status(
        datetime.now(timezone.utc),
        time.time() - start_time,
        success,
        event_count,
    )


def sync_once(state: SyncState, health: HealthServer, config: Config) -> bool:
    """Run one sync cycle from the remote ICS feed into the CalDAV calendar.

    Steps:
      1. Probe the feed with HEAD and skip the cycle if its ETag matches the
         cached one.
      2. Download the feed and skip the cycle if its SHA-256 matches the
         cached hash.
      3. Diff the feed's events against the event hashes stored in ``state``.
      4. Apply adds, updates and deletes to the calendar, then record the new
         event hashes in ``state``.

    The ICS hash/ETag cache is written only once a cycle has completed
    successfully. Writing it earlier would make the retry after a failed
    cycle look like "no changes" and the failed delta would never be applied.

    Args:
        state: Persistent sync state (ICS cache and per-event hashes).
        health: Health reporter updated with the outcome of the cycle.
        config: Provides ``ics_url`` and the Baikal URL/credentials.

    Returns:
        True when the cycle was skipped, found nothing to do, or completed;
        False when the calendar could not be found or any step raised.
    """
    start_time = time.time()
    logger.info("Starting sync cycle...")
    try:
        # Cheap change probe. Best effort only: any failure here simply means
        # we fall through to the full download below.
        remote_etag = None
        try:
            head = requests.head(config.ics_url, headers=HEADERS, timeout=15, allow_redirects=True)
            if head.status_code < 400:
                remote_etag = head.headers.get("ETag")
        except Exception as exc:
            logger.debug("ETag probe failed, falling back to full download: %s", exc)

        # NOTE(review): the two "no changes" exits below do not touch the
        # health status -- confirm the health check tolerates that.
        cached_hash, cached_etag, _ = state.get_ics_cache()
        if remote_etag and cached_etag == remote_etag:
            logger.info("No changes detected (ETag match). Skipping sync.")
            return True

        response = requests.get(config.ics_url, headers=HEADERS, timeout=30)
        response.raise_for_status()
        ics_text = response.text
        ics_hash = hashlib.sha256(ics_text.encode("utf-8")).hexdigest()
        if cached_hash == ics_hash:
            logger.info("No changes detected (hash match). Skipping sync.")
            if remote_etag:
                # Same content under a new ETag: remember it so the next
                # cycle can stop at the HEAD probe.
                state.set_ics_cache(ics_hash, remote_etag)
            return True

        # The cache is deliberately NOT updated yet; see the docstring.
        logger.info("ICS changed. Downloaded %d bytes, hash %s", len(ics_text), ics_hash[:12])

        ics_uids = parse_ics_events(ics_text)
        stored_uids = state.get_event_uids()
        known_uids = {}
        for uid in stored_uids:
            event_hash = state.get_event_hash(uid)
            if event_hash:
                known_uids[uid] = event_hash

        deltas = compute_diff(ics_uids, known_uids)
        to_add = deltas["to_add"]
        to_update = deltas["to_update"]
        to_delete = deltas["to_delete"]
        is_first_run = not stored_uids and not cached_hash

        if not to_add and not to_update and not to_delete:
            logger.info("Calendar is already in sync.")
            for uid, event_hash in ics_uids.items():
                state.upsert_event(uid, event_hash)
            state.set_ics_cache(ics_hash, remote_etag)
            _report_health(health, start_time, True, len(ics_uids))
            return True

        if is_first_run and not to_delete:
            logger.info(
                "First run detected: %d events in ICS. Registering state without re-adding to calendar.",
                len(to_add),
            )
            for uid, event_hash in ics_uids.items():
                state.upsert_event(uid, event_hash)
            state.set_ics_cache(ics_hash, remote_etag)
            _report_health(health, start_time, True, len(ics_uids))
            return True

        logger.info(
            "Delta: %d to add, %d to update, %d to delete",
            len(to_add),
            len(to_update),
            len(to_delete),
        )
        client = caldav.DAVClient(
            url=config.baikal_url,
            username=config.baikal_user,
            password=config.baikal_pass,
            headers=HEADERS,
            ssl_verify_cert=True,
        )
        calendar = find_calendar(client, config)
        if not calendar:
            logger.error("Failed to find calendar")
            _report_health(health, start_time, False, 0)
            return False

        snapshot = state.snapshot()
        events_data = parse_ics_events_with_data(ics_text)
        add_events = {uid: events_data[uid] for uid, _ in to_add if uid in events_data}
        update_events = {uid: events_data[uid] for uid, _ in to_update if uid in events_data}
        delete_uids = to_delete
        try:
            # NOTE(review): the per-event error counts returned by apply_* are
            # only logged; events that failed to apply are still recorded as
            # synced further down -- confirm that is intended.
            logger.info("Phase 1: Adding %d events...", len(add_events))
            if add_events:
                ok, failed = apply_adds(calendar, add_events)
                logger.info("Added %d/%d events (%d errors)", ok, len(add_events), failed)

            logger.info("Phase 2: Updating %d events...", len(update_events))
            if update_events:
                ok, failed = apply_updates(calendar, update_events)
                logger.info("Updated %d/%d events (%d errors)", ok, len(update_events), failed)

            logger.info("Phase 3: Deleting %d events...", len(delete_uids))
            if delete_uids:
                ok, failed = apply_deletes(calendar, delete_uids)
                logger.info("Deleted %d/%d events (%d errors)", ok, len(delete_uids), failed)

            for uid, event_hash in ics_uids.items():
                state.upsert_event(uid, event_hash)
            for uid in delete_uids:
                state.delete_event(uid)
            # Only now is it safe to remember this feed version as "synced".
            state.set_ics_cache(ics_hash, remote_etag)

            total = len(ics_uids)
            duration = time.time() - start_time
            logger.info("Sync completed in %.1fs. Total events: %d", duration, total)
            _report_health(health, start_time, True, total)
            return True
        except Exception as exc:
            logger.error("Sync failed: %s. Rolling back state.", exc)
            state.restore_snapshot(snapshot)
            _report_health(health, start_time, False, 0)
            return False
    except Exception as exc:
        # Top-level boundary for the cycle: report the failure and let the
        # caller decide when to retry.
        logger.error("Sync error: %s", exc)
        _report_health(health, start_time, False, 0)
        return False
def main():
    """Service entry point: validate config, start helpers, loop until signalled.

    Exits with status 1 if configuration validation raises ValueError.
    Otherwise opens the sync state at ./sync.db, starts the health server on
    port 8081, and runs sync cycles until SIGTERM or SIGINT sets the shutdown
    event. After a successful cycle the next one is scheduled
    ``config.sync_frequency`` minutes later; after a failure the wait starts
    at 1 minute and doubles per consecutive failure, capped at 30 minutes.
    """
    setup_logging()
    logger.info("Starting Baikal Sync service...")

    try:
        config = validate()
    except ValueError as exc:
        logger.error("Configuration error: %s", exc)
        raise SystemExit(1)
    logger.info("Sync frequency: %d minutes", config.sync_frequency)

    state = SyncState("./sync.db")
    health = HealthServer(8081)
    health.start()
    logger.info("Health endpoint on :8081")

    def _request_shutdown(signum, frame):
        logger.info("Received signal %s. Shutting down...", signum)
        shutdown_event.set()

    for signo in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signo, _request_shutdown)

    backoff_minutes = 0
    while not shutdown_event.is_set():
        if sync_once(state, health, config):
            backoff_minutes = 0
            delay_seconds = config.sync_frequency * 60
        else:
            # 1, 2, 4, 8, 16, 30, 30, ... minutes.
            backoff_minutes = min(backoff_minutes * 2, 30) if backoff_minutes > 0 else 1
            delay_seconds = backoff_minutes * 60
            logger.info("Sync failed. Backing off %d minutes...", backoff_minutes)
        logger.info("Next sync in %d seconds...", delay_seconds)
        # Waiting on the event (rather than sleeping) lets a signal end the
        # pause immediately.
        shutdown_event.wait(delay_seconds)

    state.close()
    health.stop()
    logger.info("Shutdown complete.")
# Run the service only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()