mirror of https://gitlab.com/bashrc2/epicyon
				
				
				
			
		
			
				
	
	
		
			116 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			116 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
| __filename__ = "linked_data_sig.py"
 | |
| __author__ = "Bob Mottram"
 | |
| __credits__ = ['Based on ' +
 | |
|                'https://github.com/tsileo/little-boxes']
 | |
| __license__ = "AGPL3+"
 | |
| __version__ = "1.2.0"
 | |
| __maintainer__ = "Bob Mottram"
 | |
| __email__ = "bob@freedombone.net"
 | |
| __status__ = "Production"
 | |
| __module_group__ = "Security"
 | |
| 
 | |
| import base64
 | |
| import hashlib
 | |
| from datetime import datetime
 | |
| from cryptography.hazmat.backends import default_backend
 | |
| from cryptography.hazmat.primitives.serialization import load_pem_private_key
 | |
| from cryptography.hazmat.primitives.serialization import load_pem_public_key
 | |
| from cryptography.hazmat.primitives.asymmetric import padding
 | |
| from cryptography.hazmat.primitives import hashes
 | |
| from cryptography.hazmat.primitives.asymmetric import utils as hazutils
 | |
| from pyjsonld import normalize
 | |
| from context import hasValidContext
 | |
| from utils import getSHA256
 | |
| 
 | |
| 
 | |
| def _options_hash(doc: {}) -> str:
 | |
|     """Returns a hash of the signature, with a few fields removed
 | |
|     """
 | |
|     docSig = dict(doc["signature"])
 | |
| 
 | |
|     # remove fields from signature
 | |
|     for k in ["type", "id", "signatureValue"]:
 | |
|         if k in docSig:
 | |
|             del docSig[k]
 | |
| 
 | |
|     docSig["@context"] = "https://w3id.org/identity/v1"
 | |
|     options = {
 | |
|         "algorithm": "URDNA2015",
 | |
|         "format": "application/nquads"
 | |
|     }
 | |
| 
 | |
|     normalized = normalize(docSig, options)
 | |
|     h = hashlib.new("sha256")
 | |
|     h.update(normalized.encode("utf-8"))
 | |
|     return h.hexdigest()
 | |
| 
 | |
| 
 | |
| def _doc_hash(doc: {}) -> str:
 | |
|     """Returns a hash of the ActivityPub post
 | |
|     """
 | |
|     doc = dict(doc)
 | |
| 
 | |
|     # remove the signature
 | |
|     if "signature" in doc:
 | |
|         del doc["signature"]
 | |
| 
 | |
|     options = {
 | |
|         "algorithm": "URDNA2015",
 | |
|         "format": "application/nquads"
 | |
|     }
 | |
| 
 | |
|     normalized = normalize(doc, options)
 | |
|     h = hashlib.new("sha256")
 | |
|     h.update(normalized.encode("utf-8"))
 | |
|     return h.hexdigest()
 | |
| 
 | |
| 
 | |
| def verifyJsonSignature(doc: {}, publicKeyPem: str) -> bool:
 | |
|     """Returns True if the given ActivityPub post was sent
 | |
|     by an actor having the given public key
 | |
|     """
 | |
|     if not hasValidContext(doc):
 | |
|         return False
 | |
|     pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'),
 | |
|                                  backend=default_backend())
 | |
|     to_be_signed = _options_hash(doc) + _doc_hash(doc)
 | |
|     signature = doc["signature"]["signatureValue"]
 | |
| 
 | |
|     digest = getSHA256(to_be_signed.encode("utf-8"))
 | |
|     base64sig = base64.b64decode(signature)
 | |
| 
 | |
|     try:
 | |
|         pubkey.verify(
 | |
|             base64sig,
 | |
|             digest,
 | |
|             padding.PKCS1v15(),
 | |
|             hazutils.Prehashed(hashes.SHA256()))
 | |
|         return True
 | |
|     except BaseException:
 | |
|         return False
 | |
| 
 | |
| 
 | |
| def generateJsonSignature(doc: {}, privateKeyPem: str) -> None:
 | |
|     """Adds a json signature to the given ActivityPub post
 | |
|     """
 | |
|     if not doc.get('actor'):
 | |
|         return
 | |
|     if not hasValidContext(doc):
 | |
|         return
 | |
|     options = {
 | |
|         "type": "RsaSignature2017",
 | |
|         "creator": doc["actor"] + "#main-key",
 | |
|         "created": datetime.utcnow().replace(microsecond=0).isoformat() + "Z",
 | |
|     }
 | |
|     doc["signature"] = options
 | |
|     to_be_signed = _options_hash(doc) + _doc_hash(doc)
 | |
| 
 | |
|     key = load_pem_private_key(privateKeyPem.encode('utf-8'),
 | |
|                                None, backend=default_backend())
 | |
|     digest = getSHA256(to_be_signed.encode("utf-8"))
 | |
|     signature = key.sign(digest,
 | |
|                          padding.PKCS1v15(),
 | |
|                          hazutils.Prehashed(hashes.SHA256()))
 | |
|     sig = base64.b64encode(signature)
 | |
|     options["signatureValue"] = sig.decode("utf-8")
 |