315 lines
11 KiB
Python
315 lines
11 KiB
Python
import os
|
|
import signal
|
|
import time
|
|
import hashlib
|
|
import logging
|
|
import threading
|
|
import requests
|
|
import caldav
|
|
from datetime import datetime, timezone
|
|
|
|
from config import validate, HEADERS, Config
|
|
from state import SyncState
|
|
from diff import parse_ics_events, compute_diff, parse_ics_events_with_data
|
|
from apply import apply_adds, apply_updates, apply_deletes
|
|
from health import HealthServer, SyncSession
|
|
from dashboard import DashboardServer
|
|
|
|
logger = logging.getLogger(__name__)
|
|
shutdown_event = threading.Event()
|
|
|
|
|
|
def setup_logging():
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(message)s",
|
|
datefmt="%Y-%m-%d %H:%M:%S",
|
|
)
|
|
|
|
|
|
def find_calendar(client, config):
|
|
principal = client.principal()
|
|
calendars = principal.calendars()
|
|
calendar_id = os.environ.get("CALENDAR_ID")
|
|
|
|
if calendar_id:
|
|
for cal in calendars:
|
|
if calendar_id in str(cal.url):
|
|
return cal
|
|
logger.error("Calendar with ID '%s' not found", calendar_id)
|
|
for c in calendars:
|
|
logger.error(" Available: %s", c.url)
|
|
return None
|
|
|
|
target = config.baikal_url.rstrip("/")
|
|
for cal in calendars:
|
|
if target in str(cal.url) or str(cal.url).rstrip("/") == target:
|
|
return cal
|
|
|
|
if calendars:
|
|
return calendars[0]
|
|
|
|
return None
|
|
|
|
|
|
def sync_once(
|
|
state: SyncState,
|
|
health: HealthServer,
|
|
session: SyncSession,
|
|
config: Config,
|
|
dashboard: DashboardServer,
|
|
) -> bool:
|
|
start_time = time.time()
|
|
ics_latency_ms = 0
|
|
added = 0
|
|
updated = 0
|
|
deleted = 0
|
|
skipped = False
|
|
ics_download_size = 0
|
|
msg = ""
|
|
|
|
dashboard.set_syncing(True)
|
|
logger.info("Starting sync cycle...")
|
|
|
|
try:
|
|
remote_etag = None
|
|
try:
|
|
r = requests.head(config.ics_url, headers=HEADERS, timeout=15, allow_redirects=True)
|
|
if r.status_code < 400:
|
|
remote_etag = r.headers.get("ETag")
|
|
except Exception:
|
|
pass
|
|
|
|
cached_hash, cached_etag, _ = state.get_ics_cache()
|
|
|
|
if remote_etag and cached_etag == remote_etag:
|
|
logger.info("No changes detected (ETag match). Skipping sync.")
|
|
duration = time.time() - start_time
|
|
skipped = True
|
|
msg = "no changes (ETag)"
|
|
session.record(True, duration, 0, 0, 0, True, 0, msg)
|
|
health.update_status(datetime.now(timezone.utc), duration, True, 0)
|
|
dashboard.set_syncing(False)
|
|
return True
|
|
|
|
dl_start = time.time()
|
|
response = requests.get(config.ics_url, headers=HEADERS, timeout=30)
|
|
response.raise_for_status()
|
|
ics_text = response.text
|
|
dl_end = time.time()
|
|
ics_latency_ms = round((dl_end - dl_start) * 1000)
|
|
ics_download_size = len(ics_text)
|
|
ics_hash = hashlib.sha256(ics_text.encode("utf-8")).hexdigest()
|
|
|
|
if cached_hash == ics_hash:
|
|
logger.info("No changes detected (hash match). Skipping sync.")
|
|
if remote_etag:
|
|
state.set_ics_cache(ics_hash, remote_etag)
|
|
duration = time.time() - start_time
|
|
skipped = True
|
|
msg = "no changes (hash)"
|
|
session.record(True, duration, 0, 0, 0, True, ics_latency_ms, msg, ics_download_size)
|
|
if ics_download_size:
|
|
pass
|
|
health.update_status(datetime.now(timezone.utc), duration, True, 0)
|
|
dashboard.set_syncing(False)
|
|
return True
|
|
|
|
state.set_ics_cache(ics_hash, remote_etag)
|
|
logger.info("ICS changed. Downloaded %d bytes, hash %s", len(ics_text), ics_hash[:12])
|
|
|
|
ics_uids = parse_ics_events(ics_text)
|
|
known_uids = {}
|
|
for uid in state.get_event_uids():
|
|
h = state.get_event_hash(uid)
|
|
if h:
|
|
known_uids[uid] = h
|
|
|
|
deltas = compute_diff(ics_uids, known_uids)
|
|
to_add = deltas["to_add"]
|
|
to_update = deltas["to_update"]
|
|
to_delete = deltas["to_delete"]
|
|
|
|
is_first_run = not state.get_event_uids() and not cached_hash
|
|
|
|
if not to_add and not to_update and not to_delete:
|
|
logger.info("Calendar is already in sync.")
|
|
duration = time.time() - start_time
|
|
skipped = True
|
|
msg = "already in sync"
|
|
session.record(True, duration, 0, 0, 0, True, ics_latency_ms, msg, ics_download_size)
|
|
for uid, h in ics_uids.items():
|
|
state.upsert_event(uid, h)
|
|
health.update_status(datetime.now(timezone.utc), duration, True, len(ics_uids))
|
|
dashboard.set_event_count(len(ics_uids))
|
|
dashboard.set_syncing(False)
|
|
return True
|
|
|
|
if is_first_run and not to_delete:
|
|
logger.info(
|
|
"First run detected: %d events in ICS. Registering state without re-adding.",
|
|
len(to_add),
|
|
)
|
|
for uid, h in ics_uids.items():
|
|
state.upsert_event(uid, h)
|
|
duration = time.time() - start_time
|
|
msg = f"first run, registered {len(to_add)} events"
|
|
session.record(True, duration, len(to_add), 0, 0, False, ics_latency_ms, msg, ics_download_size)
|
|
health.update_status(datetime.now(timezone.utc), duration, True, len(ics_uids))
|
|
dashboard.set_event_count(len(ics_uids))
|
|
dashboard.set_syncing(False)
|
|
return True
|
|
|
|
logger.info(
|
|
"Delta: %d to add, %d to update, %d to delete",
|
|
len(to_add),
|
|
len(to_update),
|
|
len(to_delete),
|
|
)
|
|
|
|
client = caldav.DAVClient(
|
|
url=config.baikal_url,
|
|
username=config.baikal_user,
|
|
password=config.baikal_pass,
|
|
headers=HEADERS,
|
|
ssl_verify_cert=True,
|
|
)
|
|
calendar = find_calendar(client, config)
|
|
if not calendar:
|
|
logger.error("Failed to find calendar")
|
|
duration = time.time() - start_time
|
|
msg = "calendar not found"
|
|
session.record(False, duration, 0, 0, 0, False, ics_latency_ms, msg, ics_download_size)
|
|
health.update_status(datetime.now(timezone.utc), duration, False, 0)
|
|
dashboard.set_syncing(False)
|
|
return False
|
|
|
|
snapshot = state.snapshot()
|
|
events_data = parse_ics_events_with_data(ics_text)
|
|
|
|
add_events = {uid: events_data[uid] for uid, _ in to_add if uid in events_data}
|
|
update_events = {uid: events_data[uid] for uid, _ in to_update if uid in events_data}
|
|
delete_uids = to_delete
|
|
|
|
try:
|
|
logger.info("Phase 1: Adding %d events...", len(add_events))
|
|
s_a, e_a = 0, 0
|
|
if add_events:
|
|
s_a, e_a = apply_adds(calendar, add_events)
|
|
logger.info("Added %d/%d events (%d errors)", s_a, len(add_events), e_a)
|
|
|
|
logger.info("Phase 2: Updating %d events...", len(update_events))
|
|
s_u, e_u = 0, 0
|
|
if update_events:
|
|
s_u, e_u = apply_updates(calendar, update_events)
|
|
logger.info("Updated %d/%d events (%d errors)", s_u, len(update_events), e_u)
|
|
|
|
logger.info("Phase 3: Deleting %d events...", len(delete_uids))
|
|
s_d, e_d = 0, 0
|
|
if delete_uids:
|
|
s_d, e_d = apply_deletes(calendar, delete_uids)
|
|
logger.info("Deleted %d/%d events (%d errors)", s_d, len(delete_uids), e_d)
|
|
|
|
added = s_a
|
|
updated = s_u
|
|
deleted = s_d
|
|
|
|
for uid, h in ics_uids.items():
|
|
state.upsert_event(uid, h)
|
|
|
|
for uid in delete_uids:
|
|
state.delete_event(uid)
|
|
|
|
total = len(ics_uids)
|
|
duration = time.time() - start_time
|
|
msg = f"+{added} / ~{updated} / -{deleted}"
|
|
session.record(True, duration, added, updated, deleted, False, ics_latency_ms, msg, ics_download_size)
|
|
logger.info("Sync completed in %.1fs. Total events: %d", duration, total)
|
|
health.update_status(datetime.now(timezone.utc), duration, True, total)
|
|
dashboard.set_event_count(total)
|
|
dashboard.set_syncing(False)
|
|
return True
|
|
|
|
except Exception as exc:
|
|
logger.error("Sync failed: %s. Rolling back state.", exc)
|
|
state.restore_snapshot(snapshot)
|
|
duration = time.time() - start_time
|
|
msg = str(exc)[:80]
|
|
session.record(False, duration, 0, 0, 0, False, ics_latency_ms, msg, ics_download_size)
|
|
health.update_status(datetime.now(timezone.utc), duration, False, 0)
|
|
dashboard.set_syncing(False)
|
|
return False
|
|
|
|
except Exception as exc:
|
|
logger.error("Sync error: %s", exc)
|
|
duration = time.time() - start_time
|
|
msg = str(exc)[:80]
|
|
session.record(False, duration, 0, 0, 0, False, ics_latency_ms, msg, ics_download_size)
|
|
health.update_status(datetime.now(timezone.utc), duration, False, 0)
|
|
dashboard.set_syncing(False)
|
|
return False
|
|
|
|
|
|
def main():
|
|
setup_logging()
|
|
logger.info("Starting Baikal Sync service...")
|
|
|
|
try:
|
|
config = validate()
|
|
except ValueError as exc:
|
|
logger.error("Configuration error: %s", exc)
|
|
raise SystemExit(1)
|
|
|
|
logger.info("Sync frequency: %d minutes", config.sync_frequency)
|
|
state = SyncState("./sync.db")
|
|
health = HealthServer(8081)
|
|
health.start()
|
|
logger.info("Health endpoint on :8081")
|
|
|
|
session = SyncSession()
|
|
dashboard = DashboardServer(8082, health, session)
|
|
dashboard.update_config({
|
|
"ics_url": config.ics_url,
|
|
"baikal_url": config.baikal_url,
|
|
"baikal_user": config.baikal_user,
|
|
"baikal_pass": config.baikal_pass,
|
|
"sync_frequency": config.sync_frequency,
|
|
"calendar_id": os.environ.get("CALENDAR_ID", ""),
|
|
})
|
|
dashboard.start()
|
|
logger.info("Dashboard on :8082")
|
|
|
|
backoff = 0
|
|
|
|
def handle_signal(signum, frame):
|
|
logger.info("Received signal %s. Shutting down...", signum)
|
|
shutdown_event.set()
|
|
|
|
signal.signal(signal.SIGTERM, handle_signal)
|
|
signal.signal(signal.SIGINT, handle_signal)
|
|
|
|
while not shutdown_event.is_set():
|
|
success = sync_once(state, health, session, config, dashboard)
|
|
|
|
if success:
|
|
backoff = 0
|
|
sleep_time = config.sync_frequency * 60
|
|
else:
|
|
backoff = max(1, min(backoff * 2 if backoff > 0 else 1, 30))
|
|
sleep_time = backoff * 60
|
|
logger.info("Sync failed. Backing off %d minutes...", backoff)
|
|
|
|
dashboard.set_next_sync_in(sleep_time)
|
|
dashboard.set_backoff_min(backoff)
|
|
logger.info("Next sync in %d seconds...", sleep_time)
|
|
shutdown_event.wait(sleep_time)
|
|
|
|
state.close()
|
|
health.stop()
|
|
dashboard.stop()
|
|
logger.info("Shutdown complete.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|