forked from indymedia/epicyon
				
			
		
			
				
	
	
		
			259 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			259 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Python
		
	
	
| __filename__ = "posts.py"
 | |
| __author__ = "Bob Mottram"
 | |
| __credits__ = ['lamia']
 | |
| __license__ = "AGPL3+"
 | |
| __version__ = "1.1.0"
 | |
| __maintainer__ = "Bob Mottram"
 | |
| __email__ = "bob@freedombone.net"
 | |
| __status__ = "Production"
 | |
| 
 | |
| # see https://tools.ietf.org/html/draft-cavage-http-signatures-06
 | |
| 
 | |
| try:
 | |
|     from Cryptodome.PublicKey import RSA
 | |
|     from Cryptodome.Hash import SHA256
 | |
|     from Cryptodome.Signature import pkcs1_15
 | |
| except ImportError:
 | |
|     from Crypto.PublicKey import RSA
 | |
|     from Crypto.Hash import SHA256
 | |
|     # from Crypto.Signature import PKCS1_v1_5
 | |
|     from Crypto.Signature import pkcs1_15
 | |
| 
 | |
| import base64
 | |
| from time import gmtime, strftime
 | |
| import datetime
 | |
| from utils import getFullDomain
 | |
| 
 | |
| 
 | |
def messageContentDigest(messageBodyJsonStr: str) -> str:
    """Returns the base64 encoded SHA-256 digest of the given message body

    messageBodyJsonStr - the message body, which is encoded as utf-8
                         before being hashed
    """
    bodyHash = SHA256.new(messageBodyJsonStr.encode('utf-8'))
    encodedDigest = base64.b64encode(bodyHash.digest())
    return str(encodedDigest, 'utf-8')
 | |
| 
 | |
| 
 | |
def signPostHeaders(dateStr: str, privateKeyPem: str,
                    nickname: str,
                    domain: str, port: int,
                    toDomain: str, toPort: int,
                    path: str,
                    httpPrefix: str,
                    messageBodyJsonStr: str) -> str:
    """Returns a raw signature string that can be plugged into a header and
    used to verify the authenticity of an HTTP transmission.
    dateStr - value of the signed date header. If this is empty then the
              current time is used
    privateKeyPem - the private key from an rsa key pair, used for signing
    nickname, domain, port - used to build the keyId of the signature
    toDomain, toPort - used to build the signed host header
    path - the path within the signed (request-target) header
    httpPrefix - prefix placed before '://' within the keyId
    messageBodyJsonStr - the message body. If this is empty or None then
                         no digest or content-length headers are signed
    """
    domain = getFullDomain(domain, port)
    toDomain = getFullDomain(toDomain, toPort)

    if not dateStr:
        dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
    keyID = httpPrefix + '://' + domain + '/users/' + nickname + '#main-key'

    # The insertion order of this dictionary is the order in which the
    # headers are signed and then listed within the signature
    headers = {
        '(request-target)': f'post {path}',
        'host': toDomain,
        'date': dateStr
    }
    if not messageBodyJsonStr:
        headers['content-type'] = 'application/json'
    else:
        bodyDigest = messageContentDigest(messageBodyJsonStr)
        # Content-Length is a number of octets rather than a number of
        # characters, so measure the utf-8 encoded body
        contentLength = len(messageBodyJsonStr.encode('utf-8'))
        headers['digest'] = f'SHA-256={bodyDigest}'
        headers['content-type'] = 'application/activity+json'
        headers['content-length'] = str(contentLength)

    privateKey = RSA.import_key(privateKeyPem)

    # build a digest for signing, with one "name: value" line per header
    signedHeaderText = \
        '\n'.join(f'{name}: {value}' for name, value in headers.items())
    headerDigest = SHA256.new(signedHeaderText.encode('ascii'))

    # Sign the digest
    rawSignature = pkcs1_15.new(privateKey).sign(headerDigest)
    signature = base64.b64encode(rawSignature).decode('ascii')

    # Put it into a valid HTTP signature format
    signatureDict = {
        'keyId': keyID,
        'algorithm': 'rsa-sha256',
        'headers': ' '.join(headers),
        'signature': signature
    }
    return ','.join(
        [f'{k}="{v}"' for k, v in signatureDict.items()])
 | |
| 
 | |
| 
 | |
def createSignedHeader(privateKeyPem: str, nickname: str,
                       domain: str, port: int,
                       toDomain: str, toPort: int,
                       path: str, httpPrefix: str, withDigest: bool,
                       messageBodyJsonStr: str) -> dict:
    """Returns a dictionary of headers for a POST to the given path,
    including a signature header created by signPostHeaders.
    privateKeyPem - the private key from an rsa key pair, used for signing
    nickname, domain, port - passed to signPostHeaders, which uses them
                             to build the keyId of the signature
    toDomain, toPort - the destination, used for the host header
    path - the path within the (request-target) header
    httpPrefix - passed to signPostHeaders for the keyId
    withDigest - if True then digest, content-length and content-type
                 headers for the message body are included
    messageBodyJsonStr - the message body, only used if withDigest is True
    """
    contentType = 'application/activity+json'
    headerDomain = getFullDomain(toDomain, toPort)

    dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
    headers = {
        '(request-target)': f'post {path}',
        'host': headerDomain,
        'date': dateStr
    }
    if withDigest:
        bodyDigest = messageContentDigest(messageBodyJsonStr)
        # Content-Length is a number of octets rather than a number of
        # characters, so measure the utf-8 encoded body
        contentLength = len(messageBodyJsonStr.encode('utf-8'))
        headers['digest'] = f'SHA-256={bodyDigest}'
        headers['content-length'] = str(contentLength)
        headers['content-type'] = contentType
        signedBody = messageBodyJsonStr
    else:
        # without a body signPostHeaders signs no digest
        signedBody = None

    headers['signature'] = \
        signPostHeaders(dateStr, privateKeyPem, nickname,
                        domain, port,
                        toDomain, toPort,
                        path, httpPrefix, signedBody)
    return headers
 | |
| 
 | |
| 
 | |
def _verifyRecentSignature(signedDateStr: str) -> bool:
    """Checks whether the given time taken from the header is no more
    than 12 hours before the current time, and is not in the future.
    signedDateStr - date header value in the format
                    "%a, %d %b %Y %H:%M:%S %Z", compared against the
                    current UTC time
    Returns False if the date cannot be parsed
    """
    dateFormat = "%a, %d %b %Y %H:%M:%S %Z"
    try:
        signedDate = datetime.datetime.strptime(signedDateStr, dateFormat)
    except (ValueError, TypeError):
        print('WARN: Header date could not be parsed: ' +
              str(signedDateStr))
        return False

    # naive UTC time, so that it can be compared with the parsed date
    currDate = \
        datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    # total_seconds is needed here, because the seconds attribute of a
    # timedelta excludes whole days and is never negative
    timeDiffSec = (currDate - signedDate).total_seconds()
    # 12 hours tolerance
    if timeDiffSec > 43200:
        print('WARN: Header signed too long ago: ' + signedDateStr)
        print(str(timeDiffSec / (60 * 60)) + ' hours')
        return False
    if timeDiffSec < 0:
        print('WARN: Header signed in the future! ' + signedDateStr)
        print(str(timeDiffSec / (60 * 60)) + ' hours')
        return False
    return True
 | |
| 
 | |
| 
 | |
def _parseSignatureHeader(signatureHeader: str) -> dict:
    """Splits a signature header of the form key1="value1",key2="value2"
    into a dictionary of values with their surrounding quotes removed.
    Returns an empty dictionary if any field has no '=' separator
    """
    signatureDict = {}
    for field in signatureHeader.split(','):
        name, separator, quotedValue = field.partition('=')
        if not separator:
            return {}
        # tolerate whitespace after the commas separating the fields
        signatureDict[name.strip()] = quotedValue.strip()[1:-1]
    return signatureDict


def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
                      path: str, GETmethod: bool,
                      messageBodyDigest: str,
                      messageBodyJsonStr: str, debug: bool) -> bool:
    """Returns true or false depending on if the key that we plugged in here
    validates against the headers, method, and path.
    httpPrefix - not used by this function
    publicKeyPem - the public key from an rsa key pair
    headers - should be a dictionary of request headers
    path - the relative url that was requested from this site
    GETmethod - GET or POST
    messageBodyDigest - digest of the request body. If this is empty then
                        the digest is calculated from messageBodyJsonStr
    messageBodyJsonStr - the received request body (used for digest)
    debug - if True then diagnostic messages are printed
    Returns False, rather than raising an exception, if the signature
    header is missing or malformed, or if a signed date header is missing
    """
    method = 'GET' if GETmethod else 'POST'

    if debug:
        print('DEBUG: verifyPostHeaders ' + method)

    # Build a dictionary of the signature values. The header is supplied
    # by the sender, so it is validated before it is used
    signatureHeader = headers.get('signature')
    if not signatureHeader:
        if debug:
            print('DEBUG: verifyPostHeaders no signature header')
        return False
    signatureDict = _parseSignatureHeader(signatureHeader)
    if not signatureDict.get('headers') or \
       not signatureDict.get('signature'):
        if debug:
            print('DEBUG: verifyPostHeaders malformed signature header ' +
                  str(signatureHeader))
        return False

    publicKey = RSA.import_key(publicKeyPem)

    # Unpack the signed headers and set values based on current headers and
    # body (if a digest was included)
    signedHeaderList = []
    for signedHeader in signatureDict['headers'].split(' '):
        if debug:
            print('DEBUG: verifyPostHeaders signedHeader=' + signedHeader)
        if signedHeader == '(request-target)':
            signedHeaderList.append(
                f'(request-target): {method.lower()} {path}')
        elif signedHeader == 'digest':
            if messageBodyDigest:
                bodyDigest = messageBodyDigest
            else:
                bodyDigest = messageContentDigest(messageBodyJsonStr)
            signedHeaderList.append(f'digest: SHA-256={bodyDigest}')
        elif signedHeader == 'content-length':
            # the header may have been received with differing capitals
            contentLength = \
                headers.get(signedHeader) or \
                headers.get('Content-Length') or \
                headers.get('Content-length')
            if contentLength:
                signedHeaderList.append(f'content-length: {contentLength}')
            elif debug:
                print('DEBUG: verifyPostHeaders ' + signedHeader +
                      ' not found in ' + str(headers))
        else:
            # fall back to the capitalized form of the header name
            headerValue = \
                headers.get(signedHeader) or \
                headers.get(signedHeader.capitalize())
            if signedHeader.lower() == 'date':
                # a signed date must be present and recent
                if not headerValue or \
                   not _verifyRecentSignature(headerValue):
                    if debug:
                        print('DEBUG: ' +
                              'verifyPostHeaders date is not recent ' +
                              str(headerValue))
                    return False
            if headerValue:
                signedHeaderList.append(f'{signedHeader}: {headerValue}')

    if debug:
        print('DEBUG: signedHeaderList: ' + str(signedHeaderList))
    signedHeaderText = '\n'.join(signedHeaderList)

    # Get the signature, verify with public key, return result.
    # Failures to encode the header text or to decode the base64
    # signature are subclasses of ValueError, so they are also treated
    # as a failed verification
    try:
        headerDigest = SHA256.new(signedHeaderText.encode('ascii'))
        signature = base64.b64decode(signatureDict['signature'])
        pkcs1_15.new(publicKey).verify(headerDigest, signature)
        return True
    except (ValueError, TypeError):
        if debug:
            print('DEBUG: verifyPostHeaders pkcs1_15 verify failure')
        return False
 |