266 lines
8.1 KiB
Python
266 lines
8.1 KiB
Python
import os
|
|
import signal
|
|
import time
|
|
import hashlib
|
|
import logging
|
|
import threading
|
|
import requests
|
|
import caldav
|
|
from datetime import datetime, timezone
|
|
|
|
from config import validate, HEADERS, Config
|
|
from state import SyncState
|
|
from diff import parse_ics_events, compute_diff, parse_ics_events_with_data
|
|
from apply import apply_adds, apply_updates, apply_deletes
|
|
from health import HealthServer
|
|
|
|
logger = logging.getLogger(__name__)
|
|
shutdown_event = threading.Event()
|
|
|
|
|
|
def setup_logging():
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(message)s",
|
|
datefmt="%Y-%m-%d %H:%M:%S",
|
|
)
|
|
|
|
|
|
def find_calendar(client, config):
|
|
principal = client.principal()
|
|
calendars = principal.calendars()
|
|
calendar_id = os.environ.get("CALENDAR_ID")
|
|
|
|
if calendar_id:
|
|
for cal in calendars:
|
|
if calendar_id in str(cal.url):
|
|
return cal
|
|
logger.error("Calendar with ID '%s' not found", calendar_id)
|
|
for c in calendars:
|
|
logger.error(" Available: %s", c.url)
|
|
return None
|
|
|
|
target = config.baikal_url.rstrip("/")
|
|
for cal in calendars:
|
|
if target in str(cal.url) or str(cal.url).rstrip("/") == target:
|
|
return cal
|
|
|
|
if calendars:
|
|
return calendars[0]
|
|
|
|
return None
|
|
|
|
|
|
def sync_once(state: SyncState, health: HealthServer, config: Config) -> bool:
|
|
start_time = time.time()
|
|
logger.info("Starting sync cycle...")
|
|
|
|
try:
|
|
remote_etag = None
|
|
try:
|
|
r = requests.head(config.ics_url, headers=HEADERS, timeout=15, allow_redirects=True)
|
|
if r.status_code < 400:
|
|
remote_etag = r.headers.get("ETag")
|
|
except Exception:
|
|
pass
|
|
|
|
cached_hash, cached_etag, _ = state.get_ics_cache()
|
|
|
|
if remote_etag and cached_etag == remote_etag:
|
|
logger.info("No changes detected (ETag match). Skipping sync.")
|
|
return True
|
|
|
|
response = requests.get(config.ics_url, headers=HEADERS, timeout=30)
|
|
response.raise_for_status()
|
|
ics_text = response.text
|
|
ics_hash = hashlib.sha256(ics_text.encode("utf-8")).hexdigest()
|
|
|
|
if cached_hash == ics_hash:
|
|
logger.info("No changes detected (hash match). Skipping sync.")
|
|
if remote_etag:
|
|
state.set_ics_cache(ics_hash, remote_etag)
|
|
return True
|
|
|
|
state.set_ics_cache(ics_hash, remote_etag)
|
|
logger.info("ICS changed. Downloaded %d bytes, hash %s", len(ics_text), ics_hash[:12])
|
|
|
|
ics_uids = parse_ics_events(ics_text)
|
|
known_uids = {}
|
|
for uid in state.get_event_uids():
|
|
h = state.get_event_hash(uid)
|
|
if h:
|
|
known_uids[uid] = h
|
|
|
|
deltas = compute_diff(ics_uids, known_uids)
|
|
to_add = deltas["to_add"]
|
|
to_update = deltas["to_update"]
|
|
to_delete = deltas["to_delete"]
|
|
|
|
is_first_run = not state.get_event_uids() and not cached_hash
|
|
|
|
if not to_add and not to_update and not to_delete:
|
|
logger.info("Calendar is already in sync.")
|
|
duration = time.time() - start_time
|
|
health.update_status(
|
|
datetime.now(timezone.utc),
|
|
duration,
|
|
True,
|
|
len(ics_uids),
|
|
)
|
|
for uid, h in ics_uids.items():
|
|
state.upsert_event(uid, h)
|
|
return True
|
|
|
|
if is_first_run and not to_delete:
|
|
logger.info(
|
|
"First run detected: %d events in ICS. Registering state without re-adding to calendar.",
|
|
len(to_add),
|
|
)
|
|
for uid, h in ics_uids.items():
|
|
state.upsert_event(uid, h)
|
|
duration = time.time() - start_time
|
|
health.update_status(
|
|
datetime.now(timezone.utc),
|
|
duration,
|
|
True,
|
|
len(ics_uids),
|
|
)
|
|
return True
|
|
|
|
logger.info(
|
|
"Delta: %d to add, %d to update, %d to delete",
|
|
len(to_add),
|
|
len(to_update),
|
|
len(to_delete),
|
|
)
|
|
|
|
client = caldav.DAVClient(
|
|
url=config.baikal_url,
|
|
username=config.baikal_user,
|
|
password=config.baikal_pass,
|
|
headers=HEADERS,
|
|
ssl_verify_cert=True,
|
|
)
|
|
calendar = find_calendar(client, config)
|
|
if not calendar:
|
|
logger.error("Failed to find calendar")
|
|
duration = time.time() - start_time
|
|
health.update_status(
|
|
datetime.now(timezone.utc),
|
|
duration,
|
|
False,
|
|
0,
|
|
)
|
|
return False
|
|
|
|
snapshot = state.snapshot()
|
|
events_data = parse_ics_events_with_data(ics_text)
|
|
|
|
add_events = {uid: events_data[uid] for uid, _ in to_add if uid in events_data}
|
|
update_events = {uid: events_data[uid] for uid, _ in to_update if uid in events_data}
|
|
delete_uids = to_delete
|
|
|
|
try:
|
|
logger.info("Phase 1: Adding %d events...", len(add_events))
|
|
if add_events:
|
|
s, e = apply_adds(calendar, add_events)
|
|
logger.info("Added %d/%d events (%d errors)", s, len(add_events), e)
|
|
|
|
logger.info("Phase 2: Updating %d events...", len(update_events))
|
|
if update_events:
|
|
s, e = apply_updates(calendar, update_events)
|
|
logger.info("Updated %d/%d events (%d errors)", s, len(update_events), e)
|
|
|
|
logger.info("Phase 3: Deleting %d events...", len(delete_uids))
|
|
if delete_uids:
|
|
s, e = apply_deletes(calendar, delete_uids)
|
|
logger.info("Deleted %d/%d events (%d errors)", s, len(delete_uids), e)
|
|
|
|
for uid, h in ics_uids.items():
|
|
state.upsert_event(uid, h)
|
|
|
|
for uid in delete_uids:
|
|
state.delete_event(uid)
|
|
|
|
total = len(ics_uids)
|
|
duration = time.time() - start_time
|
|
logger.info("Sync completed in %.1fs. Total events: %d", duration, total)
|
|
health.update_status(
|
|
datetime.now(timezone.utc),
|
|
duration,
|
|
True,
|
|
total,
|
|
)
|
|
return True
|
|
|
|
except Exception as exc:
|
|
logger.error("Sync failed: %s. Rolling back state.", exc)
|
|
state.restore_snapshot(snapshot)
|
|
duration = time.time() - start_time
|
|
health.update_status(
|
|
datetime.now(timezone.utc),
|
|
duration,
|
|
False,
|
|
0,
|
|
)
|
|
return False
|
|
|
|
except Exception as exc:
|
|
logger.error("Sync error: %s", exc)
|
|
duration = time.time() - start_time
|
|
health.update_status(
|
|
datetime.now(timezone.utc),
|
|
duration,
|
|
False,
|
|
0,
|
|
)
|
|
return False
|
|
|
|
|
|
def main():
|
|
setup_logging()
|
|
logger.info("Starting Baikal Sync service...")
|
|
|
|
try:
|
|
config = validate()
|
|
except ValueError as exc:
|
|
logger.error("Configuration error: %s", exc)
|
|
raise SystemExit(1)
|
|
|
|
logger.info("Sync frequency: %d minutes", config.sync_frequency)
|
|
state = SyncState("./sync.db")
|
|
health = HealthServer(8081)
|
|
health.start()
|
|
logger.info("Health endpoint on :8081")
|
|
|
|
backoff = 0
|
|
|
|
def handle_signal(signum, frame):
|
|
logger.info("Received signal %s. Shutting down...", signum)
|
|
shutdown_event.set()
|
|
|
|
signal.signal(signal.SIGTERM, handle_signal)
|
|
signal.signal(signal.SIGINT, handle_signal)
|
|
|
|
while not shutdown_event.is_set():
|
|
success = sync_once(state, health, config)
|
|
|
|
if success:
|
|
backoff = 0
|
|
sleep_time = config.sync_frequency * 60
|
|
else:
|
|
backoff = max(1, min(backoff * 2 if backoff > 0 else 1, 30))
|
|
sleep_time = backoff * 60
|
|
logger.info("Sync failed. Backing off %d minutes...", backoff)
|
|
|
|
logger.info("Next sync in %d seconds...", sleep_time)
|
|
shutdown_event.wait(sleep_time)
|
|
|
|
state.close()
|
|
health.stop()
|
|
logger.info("Shutdown complete.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|