Files
baikal-sync/diff.py
T

66 lines
1.9 KiB
Python

import hashlib
import logging
from icalendar import Calendar
logger = logging.getLogger(__name__)
def parse_ics_events(ics_text: str) -> dict[str, str]:
cal = Calendar.from_ical(ics_text.encode() if isinstance(ics_text, str) else ics_text)
result = {}
for component in cal.walk():
if component.name != "VEVENT":
continue
uid = component.get("UID")
if uid is None:
logger.warning("Skipping event with no UID")
continue
uid_str = str(uid)
try:
event_bytes = component.to_ical()
file_hash = hashlib.sha256(event_bytes).hexdigest()
result[uid_str] = file_hash
except Exception as e:
logger.warning("Failed to process event %s: %s", uid_str, e)
return result
def compute_diff(ics_uids: dict[str, str], known_uids: dict[str, str]) -> dict:
to_add = []
to_update = []
to_delete = []
for uid, file_hash in ics_uids.items():
if uid not in known_uids:
to_add.append((uid, file_hash))
elif known_uids[uid] != file_hash:
to_update.append((uid, file_hash))
for uid in known_uids:
if uid not in ics_uids:
to_delete.append(uid)
return {
"to_add": to_add,
"to_update": to_update,
"to_delete": to_delete,
}
def parse_ics_events_with_data(ics_text: str) -> dict[str, bytes]:
cal = Calendar.from_ical(ics_text.encode() if isinstance(ics_text, str) else ics_text)
result = {}
for component in cal.walk():
if component.name != "VEVENT":
continue
uid = component.get("UID")
if uid is None:
logger.warning("Skipping event with no UID")
continue
uid_str = str(uid)
try:
result[uid_str] = component.to_ical()
except Exception as e:
logger.warning("Failed to process event %s: %s", uid_str, e)
return result