' not in xml_str:
return None
response_str = \
'\n' + \
' \n' + \
' /calendars/' + nickname + '/\n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' HTTP/1.1 200 OK\n' + \
' \n' + \
' \n' + \
''
return response_str
def _dav_store_event(base_dir: str, nickname: str, domain: str,
event_list: [], http_prefix: str,
system_language: str) -> bool:
"""Stores a calendar event obtained via caldav PUT
"""
event_str = str(event_list)
if 'DTSTAMP:' not in event_str or \
'DTSTART:' not in event_str or \
'DTEND:' not in event_str:
return False
if 'STATUS:' not in event_str and 'DESCRIPTION:' not in event_str:
return False
timestamp = None
start_time = None
end_time = None
description = None
for line in event_list:
if line.startswith('DTSTAMP:'):
timestamp = line.split(':', 1)[1]
elif line.startswith('DTSTART:'):
start_time = line.split(':', 1)[1]
elif line.startswith('DTEND:'):
end_time = line.split(':', 1)[1]
elif line.startswith('SUMMARY:') or line.startswith('DESCRIPTION:'):
description = line.split(':', 1)[1]
elif line.startswith('LOCATION:'):
location = line.split(':', 1)[1]
if not timestamp or \
not start_time or \
not end_time or \
not description:
return False
if len(timestamp) < 15:
return False
if len(start_time) < 15:
return False
if len(end_time) < 15:
return False
# check that the description is valid
if is_filtered(base_dir, nickname, domain, description):
return False
# convert to the expected time format
published = _dav_date_from_string(timestamp)
if not published:
return False
start_time = _dav_date_from_string(start_time)
if not start_time:
return False
end_time = _dav_date_from_string(end_time)
if not end_time:
return False
post_id = ''
post_context = get_individual_post_context()
# create the status number from DTSTAMP
status_number, published = get_status_number(published)
# get the post id
actor = http_prefix + "://" + domain + "/users/" + nickname
actor2 = http_prefix + "://" + domain + "/@" + nickname
post_id = actor + "/statuses/" + status_number
next_str = post_id + "/replies?only_other_accounts=true&page=true"
content = \
'@' + nickname + \
'' + remove_html(description) + '
'
event_json = {
"@context": post_context,
"id": post_id + "/activity",
"type": "Create",
"actor": actor,
"published": published,
"to": [actor],
"cc": [],
"object": {
"id": post_id,
"conversation": post_id,
"type": "Note",
"summary": None,
"inReplyTo": None,
"published": published,
"url": actor + "/" + status_number,
"attributedTo": actor,
"to": [actor],
"cc": [],
"sensitive": False,
"atomUri": post_id,
"inReplyToAtomUri": None,
"commentsEnabled": False,
"rejectReplies": True,
"mediaType": "text/html",
"content": content,
"contentMap": {
system_language: content
},
"attachment": [],
"tag": [
{
"href": actor2,
"name": "@" + nickname + "@" + domain,
"type": "Mention"
},
{
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Event",
"name": content,
"startTime": start_time,
"endTime": end_time
}
],
"replies": {
"id": post_id + "/replies",
"type": "Collection",
"first": {
"type": "CollectionPage",
"next": next_str,
"partOf": post_id + "/replies",
"items": []
}
}
}
}
if location:
event_json['object']['tag'].append({
"@context": "https://www.w3.org/ns/activitystreams",
"type": "Place",
"name": location
})
handle = nickname + '@' + domain
outbox_dir = base_dir + '/accounts/' + handle + '/outbox'
if not os.path.isdir(outbox_dir):
return False
filename = outbox_dir + '/' + post_id.replace('/', '#') + '.json'
save_json(event_json, filename)
save_event_post(base_dir, handle, post_id, event_json)
return True
def _dav_update_recent_etags(etag: str, nickname: str,
recent_dav_etags: {}) -> None:
"""Updates the recent etags for each account
"""
# update the recent caldav etags for each account
if not recent_dav_etags.get(nickname):
recent_dav_etags[nickname] = [etag]
else:
# only keep a limited number of recent etags
while len(recent_dav_etags[nickname]) > 32:
recent_dav_etags[nickname].pop(0)
# append the etag to the recent list
if etag not in recent_dav_etags[nickname]:
recent_dav_etags[nickname].append(etag)
def dav_put_response(base_dir: str, nickname: str, domain: str,
depth: int, xml_str: str, http_prefix: str,
system_language: str,
recent_dav_etags: {}) -> str:
"""Returns the response to caldav PUT
"""
if '\n' not in xml_str:
return None
if 'BEGIN:VCALENDAR' not in xml_str or \
'END:VCALENDAR' not in xml_str:
return None
if 'BEGIN:VEVENT' not in xml_str or \
'END:VEVENT' not in xml_str:
return None
etag = md5(xml_str.encode('utf-8')).hexdigest()
if recent_dav_etags.get(nickname):
if etag in recent_dav_etags[nickname]:
return 'Not modified'
stored_count = 0
reading_event = False
lines_list = xml_str.split('\n')
event_list = []
for line in lines_list:
line = line.strip()
if not reading_event:
if line == 'BEGIN:VEVENT':
reading_event = True
event_list = []
else:
if line == 'END:VEVENT':
if event_list:
_dav_store_event(base_dir, nickname, domain,
event_list, http_prefix,
system_language)
stored_count += 1
reading_event = False
else:
event_list.append(line)
if stored_count == 0:
return None
_dav_update_recent_etags(etag, nickname, recent_dav_etags)
return 'ETag:' + etag
def dav_report_response(base_dir: str, nickname: str, domain: str,
depth: int, xml_str: str,
person_cache: {}, http_prefix: str,
curr_etag: str, recent_dav_etags: {},
domain_full: str, system_language: str) -> str:
"""Returns the response to caldav REPORT
"""
if '' not in xml_str:
if '' not in xml_str:
return None
if curr_etag:
if recent_dav_etags.get(nickname):
if curr_etag in recent_dav_etags[nickname]:
return "Not modified"
xml_str_lower = xml_str.lower()
query_start_time = None
query_end_time = None
if ':time-range' in xml_str_lower:
time_range_str = xml_str_lower.split(':time-range')[1]
if 'start=' in time_range_str and 'end=' in time_range_str:
start_time_str = time_range_str.split('start=')[1]
if start_time_str.startswith("'"):
query_start_time_str = start_time_str.split("'")[1]
query_start_time = _dav_date_from_string(query_start_time_str)
elif start_time_str.startswith('"'):
query_start_time_str = start_time_str.split('"')[1]
query_start_time = _dav_date_from_string(query_start_time_str)
end_time_str = time_range_str.split('end=')[1]
if end_time_str.startswith("'"):
query_end_time_str = end_time_str.split("'")[1]
query_end_time = _dav_date_from_string(query_end_time_str)
elif end_time_str.startswith('"'):
query_end_time_str = end_time_str.split('"')[1]
query_end_time = _dav_date_from_string(query_end_time_str)
text_match = ''
if ':text-match' in xml_str_lower:
match_str = xml_str_lower.split(':text-match')[1]
if '>' in match_str and '<' in match_str:
text_match = match_str.split('>')[1]
if '<' in text_match:
text_match = text_match.split('<')[0]
else:
text_match = ''
ical_events = None
etag = None
events_href = ''
responses = ''
search_date = datetime.now()
if query_start_time and query_end_time:
query_start_year = int(query_start_time.split('-')[0])
query_start_month = int(query_start_time.split('-')[1])
query_start_day = query_start_time.split('-')[2]
query_start_day = int(query_start_day.split('T')[0])
query_end_year = int(query_end_time.split('-')[0])
query_end_month = int(query_end_time.split('-')[1])
query_end_day = query_end_time.split('-')[2]
query_end_day = int(query_end_day.split('T')[0])
if query_start_year == query_end_year and \
query_start_month == query_end_month:
if query_start_day == query_end_day:
# calendar for one day
search_date = \
datetime(year=query_start_year,
month=query_start_month,
day=query_start_day)
ical_events = \
get_todays_events_icalendar(base_dir, nickname, domain,
search_date.year,
search_date.month,
search_date.day, person_cache,
http_prefix, text_match,
system_language)
events_href = \
http_prefix + '://' + domain_full + '/users/' + \
nickname + '/calendar?year=' + \
str(search_date.year) + '?month=' + \
str(search_date.month) + '?day=' + str(search_date.day)
if ical_events:
if 'VEVENT' in ical_events:
ical_events_encoded = ical_events.encode('utf-8')
etag = md5(ical_events_encoded).hexdigest()
responses = \
' \n' + \
' ' + events_href + \
'\n' + \
' \n' + \
' \n' + \
' "' + \
etag + '"\n' + \
' ' + \
ical_events + \
' \n' + \
' \n' + \
' HTTP/1.1 200 OK' + \
'\n' + \
' \n' + \
' \n'
elif query_start_day == 1 and query_start_day >= 28:
# calendar for a month
ical_events = \
get_month_events_icalendar(base_dir, nickname, domain,
query_start_year,
query_start_month,
person_cache,
http_prefix,
text_match)
events_href = \
http_prefix + '://' + domain_full + '/users/' + \
nickname + '/calendar?year=' + \
str(query_start_year) + '?month=' + \
str(query_start_month)
if ical_events:
if 'VEVENT' in ical_events:
ical_events_encoded = ical_events.encode('utf-8')
etag = md5(ical_events_encoded).hexdigest()
responses = \
' \n' + \
' ' + events_href + \
'\n' + \
' \n' + \
' \n' + \
' "' + \
etag + '"\n' + \
' ' + \
ical_events + \
' \n' + \
' \n' + \
' HTTP/1.1 200 OK' + \
'\n' + \
' \n' + \
' \n'
if not responses:
all_events = ''
for year in range(query_start_year, query_end_year+1):
if query_start_year == query_end_year:
start_month_number = query_start_month
end_month_number = query_end_month
elif year == query_end_year:
start_month_number = 1
end_month_number = query_end_month
elif year == query_start_year:
start_month_number = query_start_month
end_month_number = 12
else:
start_month_number = 1
end_month_number = 12
for month in range(start_month_number, end_month_number+1):
ical_events = \
get_month_events_icalendar(base_dir,
nickname, domain,
year, month,
person_cache,
http_prefix,
text_match)
events_href = \
http_prefix + '://' + domain_full + '/users/' + \
nickname + '/calendar?year=' + \
str(year) + '?month=' + \
str(month)
if ical_events:
if 'VEVENT' in ical_events:
all_events += ical_events
ical_events_encoded = ical_events.encode('utf-8')
local_etag = md5(ical_events_encoded).hexdigest()
responses += \
' \n' + \
' ' + events_href + \
'\n' + \
' \n' + \
' \n' + \
' "' + \
local_etag + '"\n' + \
' ' + \
ical_events + \
' \n' + \
' \n' + \
' HTTP/1.1 200 OK' + \
'\n' + \
' \n' + \
' \n'
ical_events_encoded = all_events.encode('utf-8')
etag = md5(ical_events_encoded).hexdigest()
# today's calendar events
if not ical_events:
ical_events = \
get_todays_events_icalendar(base_dir, nickname, domain,
search_date.year, search_date.month,
search_date.day, person_cache,
http_prefix, text_match,
system_language)
events_href = \
http_prefix + '://' + domain_full + '/users/' + \
nickname + '/calendar?year=' + \
str(search_date.year) + '?month=' + \
str(search_date.month) + '?day=' + str(search_date.day)
if ical_events:
if 'VEVENT' in ical_events:
ical_events_encoded = ical_events.encode('utf-8')
etag = md5(ical_events_encoded).hexdigest()
responses = \
' \n' + \
' ' + events_href + '\n' + \
' \n' + \
' \n' + \
' "' + etag + \
'"\n' + \
' ' + ical_events + \
' \n' + \
' \n' + \
' HTTP/1.1 200 OK\n' + \
' \n' + \
' \n'
if not ical_events or not etag:
return None
if 'VEVENT' not in ical_events:
return None
if etag == curr_etag:
return "Not modified"
response_str = \
'\n' + \
'\n' + \
responses + ''
_dav_update_recent_etags(etag, nickname, recent_dav_etags)
return response_str
def dav_delete_response(base_dir: str, nickname: str, domain: str,
depth: int, path: str,
http_prefix: str, debug: bool,
recent_posts_cache: {}) -> str:
"""Returns the response to caldav DELETE
"""
token = path.split('/calendars/' + nickname + '/')[1]
token_year, token_month_number, token_post_id = \
_dav_decode_token(token)
if not token_year:
return None
post_filename = locate_post(base_dir, nickname, domain, token_post_id)
if not post_filename:
print('Calendar post not found ' + token_post_id)
return None
post_json_object = load_json(post_filename)
if not _is_happening_post(post_json_object):
print(token_post_id + ' is not a calendar post')
return None
remove_calendar_event(base_dir, nickname, domain,
token_year, token_month_number,
token_post_id)
delete_post(base_dir, http_prefix,
nickname, domain, post_filename,
debug, recent_posts_cache, True)
return 'Ok'
def dav_month_via_server(session, http_prefix: str,
nickname: str, domain: str, port: int,
debug: bool,
year: int, month: int,
password: str) -> str:
"""Gets the icalendar for a month via caldav
"""
auth_header = create_basic_auth_header(nickname, password)
headers = {
'host': domain,
'Content-type': 'application/xml',
'Authorization': auth_header
}
domain_full = get_full_domain(domain, port)
params = {}
url = http_prefix + '://' + domain_full + '/calendars/' + nickname
month_str = str(month)
if month < 10:
month_str = '0' + month_str
xml_str = \
'\n' + \
'\n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
''
result = \
get_method("REPORT", xml_str, session, url, params, headers, debug)
return result
def dav_day_via_server(session, http_prefix: str,
nickname: str, domain: str, port: int,
debug: bool,
year: int, month: int, day: int,
password: str) -> str:
"""Gets the icalendar for a day via caldav
"""
auth_header = create_basic_auth_header(nickname, password)
headers = {
'host': domain,
'Content-type': 'application/xml',
'Authorization': auth_header
}
domain_full = get_full_domain(domain, port)
params = {}
url = http_prefix + '://' + domain_full + '/calendars/' + nickname
month_str = str(month)
if month < 10:
month_str = '0' + month_str
day_str = str(day)
if day < 10:
day_str = '0' + day_str
xml_str = \
'\n' + \
'\n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
' \n' + \
''
result = \
get_method("REPORT", xml_str, session, url, params, headers, debug)
return result