mirror of https://gitlab.com/bashrc2/epicyon
Snake case
parent
38afcaad7b
commit
c1034b21d1
|
@ -1069,8 +1069,8 @@ def saveMediaInFormPOST(mediaBytes, debug: bool,
|
||||||
'zip': 'application/zip'
|
'zip': 'application/zip'
|
||||||
}
|
}
|
||||||
detectedExtension = None
|
detectedExtension = None
|
||||||
for extension, contentType in extensionList.items():
|
for extension, content_type in extensionList.items():
|
||||||
searchStr = b'Content-Type: ' + contentType.encode('utf8', 'ignore')
|
searchStr = b'Content-Type: ' + content_type.encode('utf8', 'ignore')
|
||||||
mediaLocation = mediaBytes.find(searchStr)
|
mediaLocation = mediaBytes.find(searchStr)
|
||||||
if mediaLocation > -1:
|
if mediaLocation > -1:
|
||||||
# image/video/audio binaries
|
# image/video/audio binaries
|
||||||
|
|
|
@ -18135,10 +18135,10 @@ class PubServer(BaseHTTPRequestHandler):
|
||||||
return
|
return
|
||||||
|
|
||||||
# refuse to receive non-json content
|
# refuse to receive non-json content
|
||||||
contentTypeStr = self.headers['Content-type']
|
content_typeStr = self.headers['Content-type']
|
||||||
if not contentTypeStr.startswith('application/json') and \
|
if not content_typeStr.startswith('application/json') and \
|
||||||
not contentTypeStr.startswith('application/activity+json') and \
|
not content_typeStr.startswith('application/activity+json') and \
|
||||||
not contentTypeStr.startswith('application/ld+json'):
|
not content_typeStr.startswith('application/ld+json'):
|
||||||
print("POST is not json: " + self.headers['Content-type'])
|
print("POST is not json: " + self.headers['Content-type'])
|
||||||
if self.server.debug:
|
if self.server.debug:
|
||||||
print(str(self.headers))
|
print(str(self.headers))
|
||||||
|
|
18
httpsig.py
18
httpsig.py
|
@ -70,7 +70,7 @@ def signPostHeaders(dateStr: str, privateKeyPem: str,
|
||||||
path: str,
|
path: str,
|
||||||
http_prefix: str,
|
http_prefix: str,
|
||||||
messageBodyJsonStr: str,
|
messageBodyJsonStr: str,
|
||||||
contentType: str,
|
content_type: str,
|
||||||
algorithm: str,
|
algorithm: str,
|
||||||
digestAlgorithm: str) -> str:
|
digestAlgorithm: str) -> str:
|
||||||
"""Returns a raw signature string that can be plugged into a header and
|
"""Returns a raw signature string that can be plugged into a header and
|
||||||
|
@ -93,7 +93,7 @@ def signPostHeaders(dateStr: str, privateKeyPem: str,
|
||||||
'(request-target)': f'get {path}',
|
'(request-target)': f'get {path}',
|
||||||
'host': toDomain,
|
'host': toDomain,
|
||||||
'date': dateStr,
|
'date': dateStr,
|
||||||
'accept': contentType
|
'accept': content_type
|
||||||
}
|
}
|
||||||
else:
|
else:
|
||||||
bodyDigest = \
|
bodyDigest = \
|
||||||
|
@ -242,7 +242,7 @@ def createSignedHeader(dateStr: str, privateKeyPem: str, nickname: str,
|
||||||
toDomain: str, toPort: int,
|
toDomain: str, toPort: int,
|
||||||
path: str, http_prefix: str, withDigest: bool,
|
path: str, http_prefix: str, withDigest: bool,
|
||||||
messageBodyJsonStr: str,
|
messageBodyJsonStr: str,
|
||||||
contentType: str) -> {}:
|
content_type: str) -> {}:
|
||||||
"""Note that the domain is the destination, not the sender
|
"""Note that the domain is the destination, not the sender
|
||||||
"""
|
"""
|
||||||
algorithm = 'rsa-sha256'
|
algorithm = 'rsa-sha256'
|
||||||
|
@ -254,20 +254,20 @@ def createSignedHeader(dateStr: str, privateKeyPem: str, nickname: str,
|
||||||
dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
|
dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
|
||||||
|
|
||||||
# Content-Type or Accept header
|
# Content-Type or Accept header
|
||||||
if not contentType:
|
if not content_type:
|
||||||
contentType = 'application/activity+json'
|
content_type = 'application/activity+json'
|
||||||
|
|
||||||
if not withDigest:
|
if not withDigest:
|
||||||
headers = {
|
headers = {
|
||||||
'(request-target)': f'get {path}',
|
'(request-target)': f'get {path}',
|
||||||
'host': headerDomain,
|
'host': headerDomain,
|
||||||
'date': dateStr,
|
'date': dateStr,
|
||||||
'accept': contentType
|
'accept': content_type
|
||||||
}
|
}
|
||||||
signatureHeader = \
|
signatureHeader = \
|
||||||
signPostHeaders(dateStr, privateKeyPem, nickname,
|
signPostHeaders(dateStr, privateKeyPem, nickname,
|
||||||
domain, port, toDomain, toPort,
|
domain, port, toDomain, toPort,
|
||||||
path, http_prefix, None, contentType,
|
path, http_prefix, None, content_type,
|
||||||
algorithm, None)
|
algorithm, None)
|
||||||
else:
|
else:
|
||||||
bodyDigest = messageContentDigest(messageBodyJsonStr, digestAlgorithm)
|
bodyDigest = messageContentDigest(messageBodyJsonStr, digestAlgorithm)
|
||||||
|
@ -279,14 +279,14 @@ def createSignedHeader(dateStr: str, privateKeyPem: str, nickname: str,
|
||||||
'date': dateStr,
|
'date': dateStr,
|
||||||
'digest': f'{digestPrefix}={bodyDigest}',
|
'digest': f'{digestPrefix}={bodyDigest}',
|
||||||
'content-length': str(contentLength),
|
'content-length': str(contentLength),
|
||||||
'content-type': contentType
|
'content-type': content_type
|
||||||
}
|
}
|
||||||
signatureHeader = \
|
signatureHeader = \
|
||||||
signPostHeaders(dateStr, privateKeyPem, nickname,
|
signPostHeaders(dateStr, privateKeyPem, nickname,
|
||||||
domain, port,
|
domain, port,
|
||||||
toDomain, toPort,
|
toDomain, toPort,
|
||||||
path, http_prefix, messageBodyJsonStr,
|
path, http_prefix, messageBodyJsonStr,
|
||||||
contentType, algorithm, digestAlgorithm)
|
content_type, algorithm, digestAlgorithm)
|
||||||
headers['signature'] = signatureHeader
|
headers['signature'] = signatureHeader
|
||||||
return headers
|
return headers
|
||||||
|
|
||||||
|
|
34
session.py
34
session.py
|
@ -184,13 +184,13 @@ def _getJsonSigned(session, url: str, domain_full: str, sessionHeaders: {},
|
||||||
path = '/' + url.split(toDomainFull + '/')[1]
|
path = '/' + url.split(toDomainFull + '/')[1]
|
||||||
else:
|
else:
|
||||||
path = '/actor'
|
path = '/actor'
|
||||||
contentType = 'application/activity+json'
|
content_type = 'application/activity+json'
|
||||||
if sessionHeaders.get('Accept'):
|
if sessionHeaders.get('Accept'):
|
||||||
contentType = sessionHeaders['Accept']
|
content_type = sessionHeaders['Accept']
|
||||||
signatureHeaderJson = \
|
signatureHeaderJson = \
|
||||||
createSignedHeader(None, signing_priv_key_pem, 'actor', domain, port,
|
createSignedHeader(None, signing_priv_key_pem, 'actor', domain, port,
|
||||||
toDomain, toPort, path, http_prefix, withDigest,
|
toDomain, toPort, path, http_prefix, withDigest,
|
||||||
messageStr, contentType)
|
messageStr, content_type)
|
||||||
if debug:
|
if debug:
|
||||||
print('Signed GET signatureHeaderJson ' + str(signatureHeaderJson))
|
print('Signed GET signatureHeaderJson ' + str(signatureHeaderJson))
|
||||||
# update the session headers from the signature headers
|
# update the session headers from the signature headers
|
||||||
|
@ -203,7 +203,7 @@ def _getJsonSigned(session, url: str, domain_full: str, sessionHeaders: {},
|
||||||
print('Signed GET sessionHeaders ' + str(sessionHeaders))
|
print('Signed GET sessionHeaders ' + str(sessionHeaders))
|
||||||
|
|
||||||
returnJson = True
|
returnJson = True
|
||||||
if 'json' not in contentType:
|
if 'json' not in content_type:
|
||||||
returnJson = False
|
returnJson = False
|
||||||
return _getJsonRequest(session, url, domain_full, sessionHeaders,
|
return _getJsonRequest(session, url, domain_full, sessionHeaders,
|
||||||
sessionParams, timeoutSec, None, quiet,
|
sessionParams, timeoutSec, None, quiet,
|
||||||
|
@ -404,18 +404,18 @@ def postImage(session, attachImageFilename: str, federation_list: [],
|
||||||
if not os.path.isfile(attachImageFilename):
|
if not os.path.isfile(attachImageFilename):
|
||||||
print('Image not found: ' + attachImageFilename)
|
print('Image not found: ' + attachImageFilename)
|
||||||
return None
|
return None
|
||||||
contentType = 'image/jpeg'
|
content_type = 'image/jpeg'
|
||||||
if attachImageFilename.endswith('.png'):
|
if attachImageFilename.endswith('.png'):
|
||||||
contentType = 'image/png'
|
content_type = 'image/png'
|
||||||
elif attachImageFilename.endswith('.gif'):
|
elif attachImageFilename.endswith('.gif'):
|
||||||
contentType = 'image/gif'
|
content_type = 'image/gif'
|
||||||
elif attachImageFilename.endswith('.webp'):
|
elif attachImageFilename.endswith('.webp'):
|
||||||
contentType = 'image/webp'
|
content_type = 'image/webp'
|
||||||
elif attachImageFilename.endswith('.avif'):
|
elif attachImageFilename.endswith('.avif'):
|
||||||
contentType = 'image/avif'
|
content_type = 'image/avif'
|
||||||
elif attachImageFilename.endswith('.svg'):
|
elif attachImageFilename.endswith('.svg'):
|
||||||
contentType = 'image/svg+xml'
|
content_type = 'image/svg+xml'
|
||||||
headers['Content-type'] = contentType
|
headers['Content-type'] = content_type
|
||||||
|
|
||||||
with open(attachImageFilename, 'rb') as avFile:
|
with open(attachImageFilename, 'rb') as avFile:
|
||||||
mediaBinary = avFile.read()
|
mediaBinary = avFile.read()
|
||||||
|
@ -507,7 +507,7 @@ def downloadImageAnyMimeType(session, url: str, timeoutSec: int, debug: bool):
|
||||||
"""http GET for an image with any mime type
|
"""http GET for an image with any mime type
|
||||||
"""
|
"""
|
||||||
mimeType = None
|
mimeType = None
|
||||||
contentType = None
|
content_type = None
|
||||||
result = None
|
result = None
|
||||||
sessionHeaders = {
|
sessionHeaders = {
|
||||||
'Accept': 'image/x-icon, image/png, image/webp, image/jpeg, image/gif'
|
'Accept': 'image/x-icon, image/png, image/webp, image/jpeg, image/gif'
|
||||||
|
@ -537,13 +537,13 @@ def downloadImageAnyMimeType(session, url: str, timeoutSec: int, debug: bool):
|
||||||
return None, None
|
return None, None
|
||||||
|
|
||||||
if result.headers.get('content-type'):
|
if result.headers.get('content-type'):
|
||||||
contentType = result.headers['content-type']
|
content_type = result.headers['content-type']
|
||||||
elif result.headers.get('Content-type'):
|
elif result.headers.get('Content-type'):
|
||||||
contentType = result.headers['Content-type']
|
content_type = result.headers['Content-type']
|
||||||
elif result.headers.get('Content-Type'):
|
elif result.headers.get('Content-Type'):
|
||||||
contentType = result.headers['Content-Type']
|
content_type = result.headers['Content-Type']
|
||||||
|
|
||||||
if not contentType:
|
if not content_type:
|
||||||
return None, None
|
return None, None
|
||||||
|
|
||||||
imageFormats = {
|
imageFormats = {
|
||||||
|
@ -557,6 +557,6 @@ def downloadImageAnyMimeType(session, url: str, timeoutSec: int, debug: bool):
|
||||||
'avif': 'avif'
|
'avif': 'avif'
|
||||||
}
|
}
|
||||||
for imFormat, mType in imageFormats.items():
|
for imFormat, mType in imageFormats.items():
|
||||||
if 'image/' + mType in contentType:
|
if 'image/' + mType in content_type:
|
||||||
mimeType = 'image/' + mType
|
mimeType = 'image/' + mType
|
||||||
return result.content, mimeType
|
return result.content, mimeType
|
||||||
|
|
28
tests.py
28
tests.py
|
@ -409,7 +409,7 @@ def _testHttpSigNew(algorithm: str, digestAlgorithm: str):
|
||||||
bodyDigest = messageContentDigest(messageBodyJsonStr, digestAlgorithm)
|
bodyDigest = messageContentDigest(messageBodyJsonStr, digestAlgorithm)
|
||||||
assert bodyDigest in digestStr
|
assert bodyDigest in digestStr
|
||||||
contentLength = 18
|
contentLength = 18
|
||||||
contentType = 'application/activity+json'
|
content_type = 'application/activity+json'
|
||||||
publicKeyPem = \
|
publicKeyPem = \
|
||||||
'-----BEGIN RSA PUBLIC KEY-----\n' + \
|
'-----BEGIN RSA PUBLIC KEY-----\n' + \
|
||||||
'MIIBCgKCAQEAhAKYdtoeoy8zcAcR874L8' + \
|
'MIIBCgKCAQEAhAKYdtoeoy8zcAcR874L8' + \
|
||||||
|
@ -482,7 +482,7 @@ def _testHttpSigNew(algorithm: str, digestAlgorithm: str):
|
||||||
"host": domain,
|
"host": domain,
|
||||||
"date": dateStr,
|
"date": dateStr,
|
||||||
"digest": f'{digestPrefix}={bodyDigest}',
|
"digest": f'{digestPrefix}={bodyDigest}',
|
||||||
"content-type": contentType,
|
"content-type": content_type,
|
||||||
"content-length": str(contentLength)
|
"content-length": str(contentLength)
|
||||||
}
|
}
|
||||||
signatureIndexHeader, signatureHeader = \
|
signatureIndexHeader, signatureHeader = \
|
||||||
|
@ -534,7 +534,7 @@ def _testHttpsigBase(withDigest: bool, base_dir: str):
|
||||||
|
|
||||||
algorithm = 'rsa-sha256'
|
algorithm = 'rsa-sha256'
|
||||||
digestAlgorithm = 'rsa-sha256'
|
digestAlgorithm = 'rsa-sha256'
|
||||||
contentType = 'application/activity+json'
|
content_type = 'application/activity+json'
|
||||||
nickname = 'socrates'
|
nickname = 'socrates'
|
||||||
hostDomain = 'someother.instance'
|
hostDomain = 'someother.instance'
|
||||||
domain = 'argumentative.social'
|
domain = 'argumentative.social'
|
||||||
|
@ -563,13 +563,13 @@ def _testHttpsigBase(withDigest: bool, base_dir: str):
|
||||||
headers = {
|
headers = {
|
||||||
'host': headersDomain,
|
'host': headersDomain,
|
||||||
'date': dateStr,
|
'date': dateStr,
|
||||||
'accept': contentType
|
'accept': content_type
|
||||||
}
|
}
|
||||||
signatureHeader = \
|
signatureHeader = \
|
||||||
signPostHeaders(dateStr, privateKeyPem, nickname,
|
signPostHeaders(dateStr, privateKeyPem, nickname,
|
||||||
domain, port,
|
domain, port,
|
||||||
hostDomain, port,
|
hostDomain, port,
|
||||||
boxpath, http_prefix, None, contentType,
|
boxpath, http_prefix, None, content_type,
|
||||||
algorithm, None)
|
algorithm, None)
|
||||||
else:
|
else:
|
||||||
digestPrefix = getDigestPrefix(digestAlgorithm)
|
digestPrefix = getDigestPrefix(digestAlgorithm)
|
||||||
|
@ -579,7 +579,7 @@ def _testHttpsigBase(withDigest: bool, base_dir: str):
|
||||||
'host': headersDomain,
|
'host': headersDomain,
|
||||||
'date': dateStr,
|
'date': dateStr,
|
||||||
'digest': f'{digestPrefix}={bodyDigest}',
|
'digest': f'{digestPrefix}={bodyDigest}',
|
||||||
'content-type': contentType,
|
'content-type': content_type,
|
||||||
'content-length': str(contentLength)
|
'content-length': str(contentLength)
|
||||||
}
|
}
|
||||||
assert getDigestAlgorithmFromHeaders(headers) == digestAlgorithm
|
assert getDigestAlgorithmFromHeaders(headers) == digestAlgorithm
|
||||||
|
@ -588,7 +588,7 @@ def _testHttpsigBase(withDigest: bool, base_dir: str):
|
||||||
domain, port,
|
domain, port,
|
||||||
hostDomain, port,
|
hostDomain, port,
|
||||||
boxpath, http_prefix, messageBodyJsonStr,
|
boxpath, http_prefix, messageBodyJsonStr,
|
||||||
contentType, algorithm, digestAlgorithm)
|
content_type, algorithm, digestAlgorithm)
|
||||||
|
|
||||||
headers['signature'] = signatureHeader
|
headers['signature'] = signatureHeader
|
||||||
GETmethod = not withDigest
|
GETmethod = not withDigest
|
||||||
|
@ -613,7 +613,7 @@ def _testHttpsigBase(withDigest: bool, base_dir: str):
|
||||||
headers = {
|
headers = {
|
||||||
'host': 'bogon.domain',
|
'host': 'bogon.domain',
|
||||||
'date': dateStr,
|
'date': dateStr,
|
||||||
'content-type': contentType
|
'content-type': content_type
|
||||||
}
|
}
|
||||||
else:
|
else:
|
||||||
# correct domain but fake message
|
# correct domain but fake message
|
||||||
|
@ -627,7 +627,7 @@ def _testHttpsigBase(withDigest: bool, base_dir: str):
|
||||||
'host': domain,
|
'host': domain,
|
||||||
'date': dateStr,
|
'date': dateStr,
|
||||||
'digest': f'{digestPrefix}={bodyDigest}',
|
'digest': f'{digestPrefix}={bodyDigest}',
|
||||||
'content-type': contentType,
|
'content-type': content_type,
|
||||||
'content-length': str(contentLength)
|
'content-length': str(contentLength)
|
||||||
}
|
}
|
||||||
assert getDigestAlgorithmFromHeaders(headers) == digestAlgorithm
|
assert getDigestAlgorithmFromHeaders(headers) == digestAlgorithm
|
||||||
|
@ -5918,7 +5918,7 @@ def _testHttpsigBaseNew(withDigest: bool, base_dir: str,
|
||||||
os.mkdir(path)
|
os.mkdir(path)
|
||||||
os.chdir(path)
|
os.chdir(path)
|
||||||
|
|
||||||
contentType = 'application/activity+json'
|
content_type = 'application/activity+json'
|
||||||
nickname = 'socrates'
|
nickname = 'socrates'
|
||||||
hostDomain = 'someother.instance'
|
hostDomain = 'someother.instance'
|
||||||
domain = 'argumentative.social'
|
domain = 'argumentative.social'
|
||||||
|
@ -5947,7 +5947,7 @@ def _testHttpsigBaseNew(withDigest: bool, base_dir: str,
|
||||||
headers = {
|
headers = {
|
||||||
'host': headersDomain,
|
'host': headersDomain,
|
||||||
'date': dateStr,
|
'date': dateStr,
|
||||||
'accept': contentType
|
'accept': content_type
|
||||||
}
|
}
|
||||||
signatureIndexHeader, signatureHeader = \
|
signatureIndexHeader, signatureHeader = \
|
||||||
signPostHeadersNew(dateStr, privateKeyPem, nickname,
|
signPostHeadersNew(dateStr, privateKeyPem, nickname,
|
||||||
|
@ -5963,7 +5963,7 @@ def _testHttpsigBaseNew(withDigest: bool, base_dir: str,
|
||||||
'host': headersDomain,
|
'host': headersDomain,
|
||||||
'date': dateStr,
|
'date': dateStr,
|
||||||
'digest': f'{digestPrefix}={bodyDigest}',
|
'digest': f'{digestPrefix}={bodyDigest}',
|
||||||
'content-type': contentType,
|
'content-type': content_type,
|
||||||
'content-length': str(contentLength)
|
'content-length': str(contentLength)
|
||||||
}
|
}
|
||||||
assert getDigestAlgorithmFromHeaders(headers) == digestAlgorithm
|
assert getDigestAlgorithmFromHeaders(headers) == digestAlgorithm
|
||||||
|
@ -6001,7 +6001,7 @@ def _testHttpsigBaseNew(withDigest: bool, base_dir: str,
|
||||||
headers = {
|
headers = {
|
||||||
'host': 'bogon.domain',
|
'host': 'bogon.domain',
|
||||||
'date': dateStr,
|
'date': dateStr,
|
||||||
'content-type': contentType
|
'content-type': content_type
|
||||||
}
|
}
|
||||||
else:
|
else:
|
||||||
# correct domain but fake message
|
# correct domain but fake message
|
||||||
|
@ -6015,7 +6015,7 @@ def _testHttpsigBaseNew(withDigest: bool, base_dir: str,
|
||||||
'host': domain,
|
'host': domain,
|
||||||
'date': dateStr,
|
'date': dateStr,
|
||||||
'digest': f'{digestPrefix}={bodyDigest}',
|
'digest': f'{digestPrefix}={bodyDigest}',
|
||||||
'content-type': contentType,
|
'content-type': content_type,
|
||||||
'content-length': str(contentLength)
|
'content-length': str(contentLength)
|
||||||
}
|
}
|
||||||
assert getDigestAlgorithmFromHeaders(headers) == digestAlgorithm
|
assert getDigestAlgorithmFromHeaders(headers) == digestAlgorithm
|
||||||
|
|
4
utils.py
4
utils.py
|
@ -378,7 +378,7 @@ def get_image_mime_type(image_filename: str) -> str:
|
||||||
return 'image/png'
|
return 'image/png'
|
||||||
|
|
||||||
|
|
||||||
def getImageExtensionFromMimeType(contentType: str) -> str:
|
def getImageExtensionFromMimeType(content_type: str) -> str:
|
||||||
"""Returns the image extension from a mime type, such as image/jpeg
|
"""Returns the image extension from a mime type, such as image/jpeg
|
||||||
"""
|
"""
|
||||||
image_media = {
|
image_media = {
|
||||||
|
@ -391,7 +391,7 @@ def getImageExtensionFromMimeType(contentType: str) -> str:
|
||||||
'x-icon': 'ico'
|
'x-icon': 'ico'
|
||||||
}
|
}
|
||||||
for mimeExt, ext in image_media.items():
|
for mimeExt, ext in image_media.items():
|
||||||
if contentType.endswith(mimeExt):
|
if content_type.endswith(mimeExt):
|
||||||
return ext
|
return ext
|
||||||
return 'png'
|
return 'png'
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue