Unit test for caldav report

main
Bob Mottram 2022-02-24 16:55:37 +00:00
parent 3aac49eb64
commit 44ba3bf35c
5 changed files with 60 additions and 21 deletions

29
auth.py
View File

@ -94,16 +94,25 @@ def authorize_basic(base_dir: str, path: str, auth_header: str,
'contain a space character')
return False
if not has_users_path(path):
if debug:
print('DEBUG: basic auth - ' +
'path for Authorization does not contain a user')
return False
path_users_section = path.split('/users/')[1]
if '/' not in path_users_section:
if debug:
print('DEBUG: basic auth - this is not a users endpoint')
return False
nickname_from_path = path_users_section.split('/')[0]
if not path.startswith('/calendars/'):
if debug:
print('DEBUG: basic auth - ' +
'path for Authorization does not contain a user')
return False
if path.startswith('/calendars/'):
path_users_section = path.split('/calendars/')[1]
nickname_from_path = path_users_section
if '/' in nickname_from_path:
nickname_from_path = nickname_from_path.split('/')[0]
if '?' in nickname_from_path:
nickname_from_path = nickname_from_path.split('?')[0]
else:
path_users_section = path.split('/users/')[1]
if '/' not in path_users_section:
if debug:
print('DEBUG: basic auth - this is not a users endpoint')
return False
nickname_from_path = path_users_section.split('/')[0]
if is_system_account(nickname_from_path):
print('basic auth - attempted login using system account ' +
nickname_from_path + ' in path')

View File

@ -16832,13 +16832,15 @@ class PubServer(BaseHTTPRequestHandler):
'_GET', 'end benchmarks',
self.server.debug)
def _dav_handler(self, endpoint_type: str):
def _dav_handler(self, endpoint_type: str, debug: bool):
calling_domain = self.server.domain_full
if not self._has_accept(calling_domain):
self._400()
return
accept_str = self.headers['Accept']
if 'application/xml' not in accept_str:
if debug:
print(endpoint_type.upper() + ' is not of xml type')
self._400()
return
if not self.headers.get('Content-length'):
@ -16855,6 +16857,8 @@ class PubServer(BaseHTTPRequestHandler):
print(endpoint_type.upper() + ' without /calendars ' + self.path)
self._404()
return
if debug:
print(endpoint_type.upper() + ' checking authorization')
if not self._is_authorized():
print(endpoint_type.upper() + ' not authorized')
self._403()
@ -16955,16 +16959,16 @@ class PubServer(BaseHTTPRequestHandler):
self._200()
def do_PROPFIND(self):
self._dav_handler('propfind')
self._dav_handler('propfind', self.server.debug)
def do_PUT(self):
self._dav_handler('put')
self._dav_handler('put', self.server.debug)
def do_REPORT(self):
self._dav_handler('report')
self._dav_handler('report', self.server.debug)
def do_DELETE(self):
self._dav_handler('delete')
self._dav_handler('delete', self.server.debug)
def do_HEAD(self):
calling_domain = self.server.domain_full

View File

@ -975,7 +975,7 @@ def dav_put_response(base_dir: str, nickname: str, domain: str,
'END:VEVENT' not in xml_str:
return None
etag = md5(xml_str).hexdigest()
etag = md5(xml_str.encode('utf-8')).hexdigest()
if recent_dav_etags.get(nickname):
if etag in recent_dav_etags[nickname]:
return 'Not modified'
@ -1091,7 +1091,8 @@ def dav_report_response(base_dir: str, nickname: str, domain: str,
str(search_date.month) + '?day=' + str(search_date.day)
if ical_events:
if 'VEVENT' in ical_events:
etag = md5(ical_events).hexdigest()
ical_events_encoded = ical_events.encode('utf-8')
etag = md5(ical_events_encoded).hexdigest()
responses = \
' <d:response>\n' + \
' <d:href>' + events_href + \
@ -1124,7 +1125,8 @@ def dav_report_response(base_dir: str, nickname: str, domain: str,
str(query_start_month)
if ical_events:
if 'VEVENT' in ical_events:
etag = md5(ical_events).hexdigest()
ical_events_encoded = ical_events.encode('utf-8')
etag = md5(ical_events_encoded).hexdigest()
responses = \
' <d:response>\n' + \
' <d:href>' + events_href + \
@ -1172,6 +1174,8 @@ def dav_report_response(base_dir: str, nickname: str, domain: str,
if ical_events:
if 'VEVENT' in ical_events:
all_events += ical_events
ical_events_encoded = ical_events.encode('utf-8')
local_etag = md5(ical_events_encoded).hexdigest()
responses += \
' <d:response>\n' + \
' <d:href>' + events_href + \
@ -1179,7 +1183,7 @@ def dav_report_response(base_dir: str, nickname: str, domain: str,
' <d:propstat>\n' + \
' <d:prop>\n' + \
' <d:getetag>"' + \
etag + '"</d:getetag>\n' + \
local_etag + '"</d:getetag>\n' + \
' <c:calendar-data>' + \
ical_events + \
' </c:calendar-data>\n' + \
@ -1188,7 +1192,8 @@ def dav_report_response(base_dir: str, nickname: str, domain: str,
'</d:status>\n' + \
' </d:propstat>\n' + \
' </d:response>\n'
etag = md5(all_events).hexdigest()
ical_events_encoded = all_events.encode('utf-8')
etag = md5(ical_events_encoded).hexdigest()
# today's calendar events
if not ical_events:
@ -1204,7 +1209,8 @@ def dav_report_response(base_dir: str, nickname: str, domain: str,
str(search_date.month) + '?day=' + str(search_date.day)
if ical_events:
if 'VEVENT' in ical_events:
etag = md5(ical_events).hexdigest()
ical_events_encoded = ical_events.encode('utf-8')
etag = md5(ical_events_encoded).hexdigest()
responses = \
' <d:response>\n' + \
' <d:href>' + events_href + '</d:href>\n' + \

View File

@ -689,6 +689,8 @@ def get_method(method_name: str, xml_str: str,
headers = {
'Accept': 'application/xml'
}
else:
headers['Accept'] = 'application/xml'
session_params = {}
session_headers = {}
if headers:

View File

@ -176,6 +176,8 @@ from shares import send_share_via_server
from shares import get_shared_items_catalog_via_server
from blocking import load_cw_lists
from blocking import add_cw_from_lists
from happening import dav_month_via_server
TEST_SERVER_GROUP_RUNNING = False
TEST_SERVER_ALICE_RUNNING = False
@ -3172,6 +3174,22 @@ def test_client_to_server(base_dir: str):
show_test_boxes('bob', bob_inbox_path, bob_outbox_path)
assert len([name for name in os.listdir(alice_inbox_path)
if os.path.isfile(os.path.join(alice_inbox_path, name))]) == 0
print('\n\nEVENT: Bob checks his calendar via caldav')
if os.path.isfile(bob_dir + '/basic_auth_fail.txt'):
os.remove(bob_dir + '/basic_auth_fail.txt')
result = \
dav_month_via_server(session_bob, http_prefix,
'bob', bob_domain, bob_port, True,
test_date.year, test_date.month,
'bobpass')
print('response: ' + str(result))
if os.path.isfile(bob_dir + '/basic_auth_fail.txt'):
with open(bob_dir + '/basic_auth_fail.txt', 'r') as fp_fail:
print(fp_fail.read())
assert 'VCALENDAR' in str(result)
assert 'VEVENT' in str(result)
print('\n\nEVENT: Bob likes the post')
send_like_via_server(bob_dir, session_bob,
'bob', 'bobpass',