Snake case

merge-requests/30/head
Bob Mottram 2021-12-31 10:13:21 +00:00
parent 9dc9595853
commit 59c88dd2e0
1 changed files with 159 additions and 163 deletions

View File

@ -53,47 +53,45 @@ def create_session(proxy_type: str):
return session return session
def url_exists(session, url: str, timeoutSec: int = 3, def url_exists(session, url: str, timeout_sec: int = 3,
http_prefix: str = 'https', domain: str = 'testdomain') -> bool: http_prefix: str = 'https', domain: str = 'testdomain') -> bool:
if not isinstance(url, str): if not isinstance(url, str):
print('url: ' + str(url)) print('url: ' + str(url))
print('ERROR: url_exists failed, url should be a string') print('ERROR: url_exists failed, url should be a string')
return False return False
sessionParams = {} session_params = {}
sessionHeaders = {} session_headers = {}
sessionHeaders['User-Agent'] = 'Epicyon/' + __version__ session_headers['User-Agent'] = 'Epicyon/' + __version__
if domain: if domain:
sessionHeaders['User-Agent'] += \ session_headers['User-Agent'] += \
'; +' + http_prefix + '://' + domain + '/' '; +' + http_prefix + '://' + domain + '/'
if not session: if not session:
print('WARN: url_exists failed, no session specified') print('WARN: url_exists failed, no session specified')
return True return True
try: try:
result = session.get(url, headers=sessionHeaders, result = session.get(url, headers=session_headers,
params=sessionParams, params=session_params,
timeout=timeoutSec) timeout=timeout_sec)
if result: if result:
if result.status_code == 200 or \ if result.status_code == 200 or \
result.status_code == 304: result.status_code == 304:
return True return True
else: print('url_exists for ' + url + ' returned ' +
print('url_exists for ' + url + ' returned ' + str(result.status_code))
str(result.status_code))
except BaseException: except BaseException:
print('EX: url_exists GET failed ' + str(url)) print('EX: url_exists GET failed ' + str(url))
pass
return False return False
def _get_json_request(session, url: str, domain_full: str, sessionHeaders: {}, def _get_json_request(session, url: str, domain_full: str, session_headers: {},
sessionParams: {}, timeoutSec: int, session_params: {}, timeout_sec: int,
signing_priv_key_pem: str, quiet: bool, debug: bool, signing_priv_key_pem: str, quiet: bool, debug: bool,
returnJson: bool) -> {}: return_json: bool) -> {}:
"""http GET for json """http GET for json
""" """
try: try:
result = session.get(url, headers=sessionHeaders, result = session.get(url, headers=session_headers,
params=sessionParams, timeout=timeoutSec) params=session_params, timeout=timeout_sec)
if result.status_code != 200: if result.status_code != 200:
if result.status_code == 401: if result.status_code == 401:
print("WARN: get_json " + url + ' rejected by secure mode') print("WARN: get_json " + url + ' rejected by secure mode')
@ -107,26 +105,26 @@ def _get_json_request(session, url: str, domain_full: str, sessionHeaders: {},
print('WARN: get_json url: ' + url + print('WARN: get_json url: ' + url +
' failed with error code ' + ' failed with error code ' +
str(result.status_code) + str(result.status_code) +
' headers: ' + str(sessionHeaders)) ' headers: ' + str(session_headers))
if returnJson: if return_json:
return result.json() return result.json()
return result.content return result.content
except requests.exceptions.RequestException as ex: except requests.exceptions.RequestException as ex:
sessionHeaders2 = sessionHeaders.copy() session_headers2 = session_headers.copy()
if sessionHeaders2.get('Authorization'): if session_headers2.get('Authorization'):
sessionHeaders2['Authorization'] = 'REDACTED' session_headers2['Authorization'] = 'REDACTED'
if debug and not quiet: if debug and not quiet:
print('ERROR: get_json failed, url: ' + str(url) + ', ' + print('ERROR: get_json failed, url: ' + str(url) + ', ' +
'headers: ' + str(sessionHeaders2) + ', ' + 'headers: ' + str(session_headers2) + ', ' +
'params: ' + str(sessionParams) + ', ' + str(ex)) 'params: ' + str(session_params) + ', ' + str(ex))
except ValueError as ex: except ValueError as ex:
sessionHeaders2 = sessionHeaders.copy() session_headers2 = session_headers.copy()
if sessionHeaders2.get('Authorization'): if session_headers2.get('Authorization'):
sessionHeaders2['Authorization'] = 'REDACTED' session_headers2['Authorization'] = 'REDACTED'
if debug and not quiet: if debug and not quiet:
print('ERROR: get_json failed, url: ' + str(url) + ', ' + print('ERROR: get_json failed, url: ' + str(url) + ', ' +
'headers: ' + str(sessionHeaders2) + ', ' + 'headers: ' + str(session_headers2) + ', ' +
'params: ' + str(sessionParams) + ', ' + str(ex)) 'params: ' + str(session_params) + ', ' + str(ex))
except SocketError as ex: except SocketError as ex:
if not quiet: if not quiet:
if ex.errno == errno.ECONNRESET: if ex.errno == errno.ECONNRESET:
@ -135,8 +133,8 @@ def _get_json_request(session, url: str, domain_full: str, sessionHeaders: {},
return None return None
def _get_json_signed(session, url: str, domain_full: str, sessionHeaders: {}, def _get_json_signed(session, url: str, domain_full: str, session_headers: {},
sessionParams: {}, timeoutSec: int, session_params: {}, timeout_sec: int,
signing_priv_key_pem: str, quiet: bool, signing_priv_key_pem: str, quiet: bool,
debug: bool) -> {}: debug: bool) -> {}:
"""Authorized fetch - a signed version of GET """Authorized fetch - a signed version of GET
@ -149,9 +147,9 @@ def _get_json_signed(session, url: str, domain_full: str, sessionHeaders: {},
print('Invalid url: ' + url) print('Invalid url: ' + url)
return None return None
http_prefix = url.split('://')[0] http_prefix = url.split('://')[0]
toDomainFull = url.split('://')[1] to_domain_full = url.split('://')[1]
if '/' in toDomainFull: if '/' in to_domain_full:
toDomainFull = toDomainFull.split('/')[0] to_domain_full = to_domain_full.split('/')[0]
if ':' in domain_full: if ':' in domain_full:
domain = domain_full.split(':')[0] domain = domain_full.split(':')[0]
@ -163,72 +161,72 @@ def _get_json_signed(session, url: str, domain_full: str, sessionHeaders: {},
else: else:
port = 80 port = 80
if ':' in toDomainFull: if ':' in to_domain_full:
toDomain = toDomainFull.split(':')[0] to_domain = to_domain_full.split(':')[0]
toPort = toDomainFull.split(':')[1] to_port = to_domain_full.split(':')[1]
else: else:
toDomain = toDomainFull to_domain = to_domain_full
if http_prefix == 'https': if http_prefix == 'https':
toPort = 443 to_port = 443
else: else:
toPort = 80 to_port = 80
if debug: if debug:
print('Signed GET domain: ' + domain + ' ' + str(port)) print('Signed GET domain: ' + domain + ' ' + str(port))
print('Signed GET toDomain: ' + toDomain + ' ' + str(toPort)) print('Signed GET to_domain: ' + to_domain + ' ' + str(to_port))
print('Signed GET url: ' + url) print('Signed GET url: ' + url)
print('Signed GET http_prefix: ' + http_prefix) print('Signed GET http_prefix: ' + http_prefix)
messageStr = '' message_str = ''
withDigest = False with_digest = False
if toDomainFull + '/' in url: if to_domain_full + '/' in url:
path = '/' + url.split(toDomainFull + '/')[1] path = '/' + url.split(to_domain_full + '/')[1]
else: else:
path = '/actor' path = '/actor'
content_type = 'application/activity+json' content_type = 'application/activity+json'
if sessionHeaders.get('Accept'): if session_headers.get('Accept'):
content_type = sessionHeaders['Accept'] content_type = session_headers['Accept']
signatureHeaderJson = \ signature_header_json = \
create_signed_header(None, signing_priv_key_pem, 'actor', domain, port, create_signed_header(None, signing_priv_key_pem, 'actor', domain, port,
toDomain, toPort, path, http_prefix, withDigest, to_domain, to_port, path, http_prefix,
messageStr, content_type) with_digest, message_str, content_type)
if debug: if debug:
print('Signed GET signatureHeaderJson ' + str(signatureHeaderJson)) print('Signed GET signature_header_json ' + str(signature_header_json))
# update the session headers from the signature headers # update the session headers from the signature headers
sessionHeaders['Host'] = signatureHeaderJson['host'] session_headers['Host'] = signature_header_json['host']
sessionHeaders['Date'] = signatureHeaderJson['date'] session_headers['Date'] = signature_header_json['date']
sessionHeaders['Accept'] = signatureHeaderJson['accept'] session_headers['Accept'] = signature_header_json['accept']
sessionHeaders['Signature'] = signatureHeaderJson['signature'] session_headers['Signature'] = signature_header_json['signature']
sessionHeaders['Content-Length'] = '0' session_headers['Content-Length'] = '0'
if debug: if debug:
print('Signed GET sessionHeaders ' + str(sessionHeaders)) print('Signed GET session_headers ' + str(session_headers))
returnJson = True return_json = True
if 'json' not in content_type: if 'json' not in content_type:
returnJson = False return_json = False
return _get_json_request(session, url, domain_full, sessionHeaders, return _get_json_request(session, url, domain_full, session_headers,
sessionParams, timeoutSec, None, quiet, session_params, timeout_sec, None, quiet,
debug, returnJson) debug, return_json)
def get_json(signing_priv_key_pem: str, def get_json(signing_priv_key_pem: str,
session, url: str, headers: {}, params: {}, debug: bool, session, url: str, headers: {}, params: {}, debug: bool,
version: str = '1.2.0', http_prefix: str = 'https', version: str = '1.2.0', http_prefix: str = 'https',
domain: str = 'testdomain', domain: str = 'testdomain',
timeoutSec: int = 20, quiet: bool = False) -> {}: timeout_sec: int = 20, quiet: bool = False) -> {}:
if not isinstance(url, str): if not isinstance(url, str):
if debug and not quiet: if debug and not quiet:
print('url: ' + str(url)) print('url: ' + str(url))
print('ERROR: get_json failed, url should be a string') print('ERROR: get_json failed, url should be a string')
return None return None
sessionParams = {} session_params = {}
sessionHeaders = {} session_headers = {}
if headers: if headers:
sessionHeaders = headers session_headers = headers
if params: if params:
sessionParams = params session_params = params
sessionHeaders['User-Agent'] = 'Epicyon/' + version session_headers['User-Agent'] = 'Epicyon/' + version
if domain: if domain:
sessionHeaders['User-Agent'] += \ session_headers['User-Agent'] += \
'; +' + http_prefix + '://' + domain + '/' '; +' + http_prefix + '://' + domain + '/'
if not session: if not session:
if not quiet: if not quiet:
@ -240,35 +238,34 @@ def get_json(signing_priv_key_pem: str,
if signing_priv_key_pem: if signing_priv_key_pem:
return _get_json_signed(session, url, domain, return _get_json_signed(session, url, domain,
sessionHeaders, sessionParams, session_headers, session_params,
timeoutSec, signing_priv_key_pem, timeout_sec, signing_priv_key_pem,
quiet, debug) quiet, debug)
else: return _get_json_request(session, url, domain, session_headers,
return _get_json_request(session, url, domain, sessionHeaders, session_params, timeout_sec,
sessionParams, timeoutSec, None, quiet, debug, True)
None, quiet, debug, True)
def download_html(signing_priv_key_pem: str, def download_html(signing_priv_key_pem: str,
session, url: str, headers: {}, params: {}, debug: bool, session, url: str, headers: {}, params: {}, debug: bool,
version: str = '1.2.0', http_prefix: str = 'https', version: str = '1.2.0', http_prefix: str = 'https',
domain: str = 'testdomain', domain: str = 'testdomain',
timeoutSec: int = 20, quiet: bool = False) -> {}: timeout_sec: int = 20, quiet: bool = False) -> {}:
if not isinstance(url, str): if not isinstance(url, str):
if debug and not quiet: if debug and not quiet:
print('url: ' + str(url)) print('url: ' + str(url))
print('ERROR: download_html failed, url should be a string') print('ERROR: download_html failed, url should be a string')
return None return None
sessionParams = {} session_params = {}
sessionHeaders = {} session_headers = {}
if headers: if headers:
sessionHeaders = headers session_headers = headers
if params: if params:
sessionParams = params session_params = params
sessionHeaders['Accept'] = 'text/html' session_headers['Accept'] = 'text/html'
sessionHeaders['User-Agent'] = 'Epicyon/' + version session_headers['User-Agent'] = 'Epicyon/' + version
if domain: if domain:
sessionHeaders['User-Agent'] += \ session_headers['User-Agent'] += \
'; +' + http_prefix + '://' + domain + '/' '; +' + http_prefix + '://' + domain + '/'
if not session: if not session:
if not quiet: if not quiet:
@ -281,46 +278,45 @@ def download_html(signing_priv_key_pem: str,
if signing_priv_key_pem: if signing_priv_key_pem:
return _get_json_signed(session, url, domain, return _get_json_signed(session, url, domain,
sessionHeaders, sessionParams, session_headers, session_params,
timeoutSec, signing_priv_key_pem, timeout_sec, signing_priv_key_pem,
quiet, debug) quiet, debug)
else: return _get_json_request(session, url, domain, session_headers,
return _get_json_request(session, url, domain, sessionHeaders, session_params, timeout_sec,
sessionParams, timeoutSec, None, quiet, debug, False)
None, quiet, debug, False)
def post_json(http_prefix: str, domain_full: str, def post_json(http_prefix: str, domain_full: str,
session, post_json_object: {}, federation_list: [], session, post_json_object: {}, federation_list: [],
inboxUrl: str, headers: {}, timeoutSec: int = 60, inbox_url: str, headers: {}, timeout_sec: int = 60,
quiet: bool = False) -> str: quiet: bool = False) -> str:
"""Post a json message to the inbox of another person """Post a json message to the inbox of another person
""" """
# check that we are posting to a permitted domain # check that we are posting to a permitted domain
if not url_permitted(inboxUrl, federation_list): if not url_permitted(inbox_url, federation_list):
if not quiet: if not quiet:
print('post_json: ' + inboxUrl + ' not permitted') print('post_json: ' + inbox_url + ' not permitted')
return None return None
sessionHeaders = headers session_headers = headers
sessionHeaders['User-Agent'] = 'Epicyon/' + __version__ session_headers['User-Agent'] = 'Epicyon/' + __version__
sessionHeaders['User-Agent'] += \ session_headers['User-Agent'] += \
'; +' + http_prefix + '://' + domain_full + '/' '; +' + http_prefix + '://' + domain_full + '/'
try: try:
postResult = \ post_result = \
session.post(url=inboxUrl, session.post(url=inbox_url,
data=json.dumps(post_json_object), data=json.dumps(post_json_object),
headers=headers, timeout=timeoutSec) headers=headers, timeout=timeout_sec)
except requests.Timeout as ex: except requests.Timeout as ex:
if not quiet: if not quiet:
print('ERROR: post_json timeout ' + inboxUrl + ' ' + print('ERROR: post_json timeout ' + inbox_url + ' ' +
json.dumps(post_json_object) + ' ' + str(headers)) json.dumps(post_json_object) + ' ' + str(headers))
print(ex) print(ex)
return '' return ''
except requests.exceptions.RequestException as ex: except requests.exceptions.RequestException as ex:
if not quiet: if not quiet:
print('ERROR: post_json requests failed ' + inboxUrl + ' ' + print('ERROR: post_json requests failed ' + inbox_url + ' ' +
json.dumps(post_json_object) + ' ' + str(headers) + json.dumps(post_json_object) + ' ' + str(headers) +
' ' + str(ex)) ' ' + str(ex))
return None return None
@ -330,32 +326,32 @@ def post_json(http_prefix: str, domain_full: str,
return None return None
except ValueError as ex: except ValueError as ex:
if not quiet: if not quiet:
print('ERROR: post_json failed ' + inboxUrl + ' ' + print('ERROR: post_json failed ' + inbox_url + ' ' +
json.dumps(post_json_object) + ' ' + str(headers) + json.dumps(post_json_object) + ' ' + str(headers) +
' ' + str(ex)) ' ' + str(ex))
return None return None
if postResult: if post_result:
return postResult.text return post_result.text
return None return None
def post_json_string(session, post_jsonStr: str, def post_json_string(session, post_jsonStr: str,
federation_list: [], federation_list: [],
inboxUrl: str, inbox_url: str,
headers: {}, headers: {},
debug: bool, debug: bool,
timeoutSec: int = 30, timeout_sec: int = 30,
quiet: bool = False) -> (bool, bool, int): quiet: bool = False) -> (bool, bool, int):
"""Post a json message string to the inbox of another person """Post a json message string to the inbox of another person
The second boolean returned is true if the send is unauthorized The second boolean returned is true if the send if unauthorized
NOTE: Here we post a string rather than the original json so that NOTE: Here we post a string rather than the original json so that
conversions between string and json format don't invalidate conversions between string and json format don't invalidate
the message body digest of http signatures the message body digest of http signatures
""" """
try: try:
postResult = \ post_result = \
session.post(url=inboxUrl, data=post_jsonStr, session.post(url=inbox_url, data=post_jsonStr,
headers=headers, timeout=timeoutSec) headers=headers, timeout=timeout_sec)
except requests.exceptions.RequestException as ex: except requests.exceptions.RequestException as ex:
if not quiet: if not quiet:
print('WARN: error during post_json_string requests ' + str(ex)) print('WARN: error during post_json_string requests ' + str(ex))
@ -364,78 +360,78 @@ def post_json_string(session, post_jsonStr: str,
if not quiet and ex.errno == errno.ECONNRESET: if not quiet and ex.errno == errno.ECONNRESET:
print('WARN: connection was reset during post_json_string') print('WARN: connection was reset during post_json_string')
if not quiet: if not quiet:
print('ERROR: post_json_string failed ' + inboxUrl + ' ' + print('ERROR: post_json_string failed ' + inbox_url + ' ' +
post_jsonStr + ' ' + str(headers)) post_jsonStr + ' ' + str(headers))
return None, None, 0 return None, None, 0
except ValueError as ex: except ValueError as ex:
if not quiet: if not quiet:
print('WARN: error during post_json_string ' + str(ex)) print('WARN: error during post_json_string ' + str(ex))
return None, None, 0 return None, None, 0
if postResult.status_code < 200 or postResult.status_code > 202: if post_result.status_code < 200 or post_result.status_code > 202:
if postResult.status_code >= 400 and \ if post_result.status_code >= 400 and \
postResult.status_code <= 405 and \ post_result.status_code <= 405 and \
postResult.status_code != 404: post_result.status_code != 404:
if not quiet: if not quiet:
print('WARN: Post to ' + inboxUrl + print('WARN: Post to ' + inbox_url +
' is unauthorized. Code ' + ' is unauthorized. Code ' +
str(postResult.status_code)) str(post_result.status_code))
return False, True, postResult.status_code return False, True, post_result.status_code
else:
if not quiet: if not quiet:
print('WARN: Failed to post to ' + inboxUrl + print('WARN: Failed to post to ' + inbox_url +
' with headers ' + str(headers) + ' with headers ' + str(headers) +
' status code ' + str(postResult.status_code)) ' status code ' + str(post_result.status_code))
return False, False, postResult.status_code return False, False, post_result.status_code
return True, False, 0 return True, False, 0
def post_image(session, attachImageFilename: str, federation_list: [], def post_image(session, attach_image_filename: str, federation_list: [],
inboxUrl: str, headers: {}) -> str: inbox_url: str, headers: {}) -> str:
"""Post an image to the inbox of another person or outbox via c2s """Post an image to the inbox of another person or outbox via c2s
""" """
# check that we are posting to a permitted domain # check that we are posting to a permitted domain
if not url_permitted(inboxUrl, federation_list): if not url_permitted(inbox_url, federation_list):
print('post_json: ' + inboxUrl + ' not permitted') print('post_json: ' + inbox_url + ' not permitted')
return None return None
if not is_image_file(attachImageFilename): if not is_image_file(attach_image_filename):
print('Image must be png, jpg, webp, avif, gif or svg') print('Image must be png, jpg, webp, avif, gif or svg')
return None return None
if not os.path.isfile(attachImageFilename): if not os.path.isfile(attach_image_filename):
print('Image not found: ' + attachImageFilename) print('Image not found: ' + attach_image_filename)
return None return None
content_type = 'image/jpeg' content_type = 'image/jpeg'
if attachImageFilename.endswith('.png'): if attach_image_filename.endswith('.png'):
content_type = 'image/png' content_type = 'image/png'
elif attachImageFilename.endswith('.gif'): elif attach_image_filename.endswith('.gif'):
content_type = 'image/gif' content_type = 'image/gif'
elif attachImageFilename.endswith('.webp'): elif attach_image_filename.endswith('.webp'):
content_type = 'image/webp' content_type = 'image/webp'
elif attachImageFilename.endswith('.avif'): elif attach_image_filename.endswith('.avif'):
content_type = 'image/avif' content_type = 'image/avif'
elif attachImageFilename.endswith('.svg'): elif attach_image_filename.endswith('.svg'):
content_type = 'image/svg+xml' content_type = 'image/svg+xml'
headers['Content-type'] = content_type headers['Content-type'] = content_type
with open(attachImageFilename, 'rb') as avFile: with open(attach_image_filename, 'rb') as av_file:
mediaBinary = avFile.read() media_binary = av_file.read()
try: try:
postResult = session.post(url=inboxUrl, data=mediaBinary, post_result = session.post(url=inbox_url, data=media_binary,
headers=headers) headers=headers)
except requests.exceptions.RequestException as ex: except requests.exceptions.RequestException as ex:
print('WARN: error during post_image requests ' + str(ex)) print('WARN: error during post_image requests ' + str(ex))
return None return None
except SocketError as ex: except SocketError as ex:
if ex.errno == errno.ECONNRESET: if ex.errno == errno.ECONNRESET:
print('WARN: connection was reset during post_image') print('WARN: connection was reset during post_image')
print('ERROR: post_image failed ' + inboxUrl + ' ' + print('ERROR: post_image failed ' + inbox_url + ' ' +
str(headers) + ' ' + str(ex)) str(headers) + ' ' + str(ex))
return None return None
except ValueError as ex: except ValueError as ex:
print('WARN: error during post_image ' + str(ex)) print('WARN: error during post_image ' + str(ex))
return None return None
if postResult: if post_result:
return postResult.text return post_result.text
return None return None
@ -448,7 +444,7 @@ def download_image(session, base_dir: str, url: str,
return None return None
# try different image types # try different image types
imageFormats = { image_formats = {
'png': 'png', 'png': 'png',
'jpg': 'jpeg', 'jpg': 'jpeg',
'jpeg': 'jpeg', 'jpeg': 'jpeg',
@ -458,16 +454,16 @@ def download_image(session, base_dir: str, url: str,
'avif': 'avif', 'avif': 'avif',
'ico': 'x-icon' 'ico': 'x-icon'
} }
sessionHeaders = None session_headers = None
for imFormat, mimeType in imageFormats.items(): for im_format, mime_type in image_formats.items():
if url.endswith('.' + imFormat) or \ if url.endswith('.' + im_format) or \
'.' + imFormat + '?' in url: '.' + im_format + '?' in url:
sessionHeaders = { session_headers = {
'Accept': 'image/' + mimeType 'Accept': 'image/' + mime_type
} }
break break
if not sessionHeaders: if not session_headers:
if debug: if debug:
print('download_image: no session headers') print('download_image: no session headers')
return False return False
@ -477,7 +473,7 @@ def download_image(session, base_dir: str, url: str,
if debug: if debug:
print('Downloading image url: ' + url) print('Downloading image url: ' + url)
result = session.get(url, result = session.get(url,
headers=sessionHeaders, headers=session_headers,
params=None) params=None)
if result.status_code < 200 or \ if result.status_code < 200 or \
result.status_code > 202: result.status_code > 202:
@ -492,29 +488,29 @@ def download_image(session, base_dir: str, url: str,
print('EX: download_image unable to delete ' + print('EX: download_image unable to delete ' +
image_filename) image_filename)
else: else:
with open(image_filename, 'wb') as f: with open(image_filename, 'wb') as im_file:
f.write(result.content) im_file.write(result.content)
if debug: if debug:
print('Image downloaded from ' + url) print('Image downloaded from ' + url)
return True return True
except Exception as ex: except BaseException as ex:
print('EX: Failed to download image: ' + print('EX: Failed to download image: ' +
str(url) + ' ' + str(ex)) str(url) + ' ' + str(ex))
return False return False
def download_image_any_mime_type(session, url: str, def download_image_any_mime_type(session, url: str,
timeoutSec: int, debug: bool): timeout_sec: int, debug: bool):
"""http GET for an image with any mime type """http GET for an image with any mime type
""" """
mimeType = None mime_type = None
content_type = None content_type = None
result = None result = None
sessionHeaders = { session_headers = {
'Accept': 'image/x-icon, image/png, image/webp, image/jpeg, image/gif' 'Accept': 'image/x-icon, image/png, image/webp, image/jpeg, image/gif'
} }
try: try:
result = session.get(url, headers=sessionHeaders, timeout=timeoutSec) result = session.get(url, headers=session_headers, timeout=timeout_sec)
except requests.exceptions.RequestException as ex: except requests.exceptions.RequestException as ex:
print('ERROR: download_image_any_mime_type failed: ' + print('ERROR: download_image_any_mime_type failed: ' +
str(url) + ', ' + str(ex)) str(url) + ', ' + str(ex))
@ -547,7 +543,7 @@ def download_image_any_mime_type(session, url: str,
if not content_type: if not content_type:
return None, None return None, None
imageFormats = { image_formats = {
'ico': 'x-icon', 'ico': 'x-icon',
'png': 'png', 'png': 'png',
'jpg': 'jpeg', 'jpg': 'jpeg',
@ -557,7 +553,7 @@ def download_image_any_mime_type(session, url: str,
'webp': 'webp', 'webp': 'webp',
'avif': 'avif' 'avif': 'avif'
} }
for imFormat, mType in imageFormats.items(): for _, m_type in image_formats.items():
if 'image/' + mType in content_type: if 'image/' + m_type in content_type:
mimeType = 'image/' + mType mime_type = 'image/' + m_type
return result.content, mimeType return result.content, mime_type