feat: Implement CalDAV client and FastMCP server exposing calendar event management tools.

This commit is contained in:
Lago
2026-01-26 16:25:39 +01:00
parent 32dd756885
commit 1100396c79
6 changed files with 84 additions and 15 deletions

View File

@@ -1,7 +1,7 @@
import os import os
import caldav import caldav
from datetime import datetime from datetime import datetime, date
from typing import List, Optional, Dict, Any from typing import List, Optional, Dict, Any, Union
from dotenv import load_dotenv from dotenv import load_dotenv
import icalendar import icalendar
@@ -71,13 +71,14 @@ class CalDAVClient:
}) })
return results return results
def create_event(self, calendar_name: str, summary: str, start_time: datetime, end_time: datetime, description: str = "") -> str: def create_event(self, calendar_name: str, summary: str, start_time: Union[datetime, date], end_time: Union[datetime, date], description: str = "", recurrence: Optional[Dict] = None) -> str:
calendar = self._get_calendar(calendar_name) calendar = self._get_calendar(calendar_name)
event = calendar.save_event( event = calendar.save_event(
dtstart=start_time, dtstart=start_time,
dtend=end_time, dtend=end_time,
summary=summary, summary=summary,
description=description description=description,
rrule=recurrence
) )
# Re-parse to get the UID, though library might provide it # Re-parse to get the UID, though library might provide it
ical = icalendar.Calendar.from_ical(event.data) ical = icalendar.Calendar.from_ical(event.data)
@@ -86,7 +87,7 @@ class CalDAVClient:
return str(component.get("uid")) return str(component.get("uid"))
return "Event Created (UID Unknown)" return "Event Created (UID Unknown)"
def update_event(self, calendar_name: str, event_uid: str, summary: Optional[str] = None, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, description: Optional[str] = None) -> bool: def update_event(self, calendar_name: str, event_uid: str, summary: Optional[str] = None, start_time: Optional[Union[datetime, date]] = None, end_time: Optional[Union[datetime, date]] = None, description: Optional[str] = None, recurrence: Optional[Dict] = None) -> bool:
calendar = self._get_calendar(calendar_name) calendar = self._get_calendar(calendar_name)
event = calendar.event_by_uid(event_uid) event = calendar.event_by_uid(event_uid)
@@ -103,10 +104,19 @@ class CalDAVClient:
component["description"] = description component["description"] = description
changed = True changed = True
if start_time: if start_time:
component["dtstart"] = icalendar.vDatetime(start_time) if isinstance(start_time, datetime):
component["dtstart"] = icalendar.vDatetime(start_time)
else:
component["dtstart"] = icalendar.vDate(start_time)
changed = True changed = True
if end_time: if end_time:
component["dtend"] = icalendar.vDatetime(end_time) if isinstance(end_time, datetime):
component["dtend"] = icalendar.vDatetime(end_time)
else:
component["dtend"] = icalendar.vDate(end_time)
changed = True
if recurrence:
component["rrule"] = icalendar.vRecur(recurrence)
changed = True changed = True
if changed: if changed:

View File

@@ -60,8 +60,19 @@ def list_events(calendar_name: Optional[str] = None, start_date: Optional[str] =
except Exception as e: except Exception as e:
return f"Error listing events: {str(e)}" return f"Error listing events: {str(e)}"
def parse_rrule(rrule_str: str) -> dict:
parts = rrule_str.split(';')
rrule = {}
for part in parts:
if '=' in part:
key, value = part.split('=', 1)
if ',' in value:
value = value.split(',')
rrule[key] = value
return rrule
@mcp.tool() @mcp.tool()
def create_event(calendar_name: str, summary: str, start_time: str, end_time: str, description: str = "") -> str: def create_event(calendar_name: str, summary: str, start_time: str, end_time: str, description: str = "", all_day: bool = False, recurrence: Optional[str] = None) -> str:
""" """
Creates a new event. Creates a new event.
@@ -71,21 +82,29 @@ def create_event(calendar_name: str, summary: str, start_time: str, end_time: st
start_time: Start time (ISO format). start_time: Start time (ISO format).
end_time: End time (ISO format). end_time: End time (ISO format).
description: Event description. description: Event description.
all_day: Set to True for all-day events (start/end time will be treated as dates).
recurrence: RRULE string, e.g., "FREQ=DAILY;COUNT=10".
""" """
if not client: if not client:
return "Error: CalDAV client not initialized" return "Error: CalDAV client not initialized"
try: try:
dt_start = datetime.fromisoformat(start_time) if all_day:
dt_end = datetime.fromisoformat(end_time) dt_start = datetime.fromisoformat(start_time).date()
dt_end = datetime.fromisoformat(end_time).date()
else:
dt_start = datetime.fromisoformat(start_time)
dt_end = datetime.fromisoformat(end_time)
result = client.create_event(calendar_name, summary, dt_start, dt_end, description) rrule_dict = parse_rrule(recurrence) if recurrence else None
result = client.create_event(calendar_name, summary, dt_start, dt_end, description, rrule_dict)
return f"Event created: {result}" return f"Event created: {result}"
except Exception as e: except Exception as e:
return f"Error creating event: {str(e)}" return f"Error creating event: {str(e)}"
@mcp.tool() @mcp.tool()
def update_event(calendar_name: str, event_uid: str, summary: Optional[str] = None, start_time: Optional[str] = None, end_time: Optional[str] = None, description: Optional[str] = None) -> str: def update_event(calendar_name: str, event_uid: str, summary: Optional[str] = None, start_time: Optional[str] = None, end_time: Optional[str] = None, description: Optional[str] = None, all_day: Optional[bool] = None, recurrence: Optional[str] = None) -> str:
""" """
Updates an existing event. Updates an existing event.
@@ -96,15 +115,30 @@ def update_event(calendar_name: str, event_uid: str, summary: Optional[str] = No
start_time: New start time (ISO format, optional). start_time: New start time (ISO format, optional).
end_time: New end time (ISO format, optional). end_time: New end time (ISO format, optional).
description: New description (optional). description: New description (optional).
all_day: True/False to explicitly set type. If None, infers from input format (YYYY-MM-DD -> date).
recurrence: New RRULE string (optional).
""" """
if not client: if not client:
return "Error: CalDAV client not initialized" return "Error: CalDAV client not initialized"
try: try:
dt_start = datetime.fromisoformat(start_time) if start_time else None def parse_input_dt(s, is_all_day_flag):
dt_end = datetime.fromisoformat(end_time) if end_time else None dt = datetime.fromisoformat(s)
if is_all_day_flag is True:
return dt.date()
if is_all_day_flag is False:
return dt
# Inference: if length is 10 (YYYY-MM-DD), assume date
if len(s) == 10:
return dt.date()
return dt
dt_start = parse_input_dt(start_time, all_day) if start_time else None
dt_end = parse_input_dt(end_time, all_day) if end_time else None
success = client.update_event(calendar_name, event_uid, summary, dt_start, dt_end, description) rrule_dict = parse_rrule(recurrence) if recurrence else None
success = client.update_event(calendar_name, event_uid, summary, dt_start, dt_end, description, rrule_dict)
return "Event updated successfully" if success else "Event update failed (or no changes made)" return "Event updated successfully" if success else "Event update failed (or no changes made)"
except Exception as e: except Exception as e:
return f"Error updating event: {str(e)}" return f"Error updating event: {str(e)}"

7
test_rrule.py Normal file
View File

@@ -0,0 +1,7 @@
from icalendar import vRecur
rrule_dict_str = {'FREQ': 'WEEKLY', 'BYDAY': 'MO,WE'}
rrule_dict_list = {'FREQ': 'WEEKLY', 'BYDAY': ['MO', 'WE']}
print(f"String input result: {vRecur(rrule_dict_str).to_ical().decode('utf-8')}")
print(f"List input result: {vRecur(rrule_dict_list).to_ical().decode('utf-8')}")

18
test_rrule_fix.py Normal file
View File

@@ -0,0 +1,18 @@
from icalendar import vRecur
def parse_rrule(rrule_str: str) -> dict:
parts = rrule_str.split(';')
rrule = {}
for part in parts:
if '=' in part:
key, value = part.split('=', 1)
# Fix: split commas into list
if ',' in value:
value = value.split(',')
rrule[key] = value
return rrule
rrule_str = "FREQ=WEEKLY;BYDAY=MO,WE"
rrule_dict = parse_rrule(rrule_str)
print(f"Parsed dict: {rrule_dict}")
print(f"vRecur result: {vRecur(rrule_dict).to_ical().decode('utf-8')}")