mirror of https://gitlab.com/bashrc2/epicyon
Comments
parent
0a5c65e9a2
commit
5a5dd1e016
|
@ -24,47 +24,65 @@ except ImportError:
|
||||||
from pyjsonld import normalize
|
from pyjsonld import normalize
|
||||||
|
|
||||||
|
|
||||||
def _options_hash(doc):
|
def _options_hash(doc: {}) -> str:
|
||||||
doc = dict(doc["signature"])
|
"""Returns a hash of the signature, with a few fields removed
|
||||||
|
"""
|
||||||
|
docSig = dict(doc["signature"])
|
||||||
|
|
||||||
|
# remove fields from signature
|
||||||
for k in ["type", "id", "signatureValue"]:
|
for k in ["type", "id", "signatureValue"]:
|
||||||
if k in doc:
|
if k in docSig:
|
||||||
del doc[k]
|
del docSig[k]
|
||||||
doc["@context"] = "https://w3id.org/identity/v1"
|
|
||||||
|
docSig["@context"] = "https://w3id.org/identity/v1"
|
||||||
options = {
|
options = {
|
||||||
"algorithm": "URDNA2015",
|
"algorithm": "URDNA2015",
|
||||||
"format": "application/nquads"
|
"format": "application/nquads"
|
||||||
}
|
}
|
||||||
normalized = normalize(doc, options)
|
|
||||||
|
normalized = normalize(docSig, options)
|
||||||
h = hashlib.new("sha256")
|
h = hashlib.new("sha256")
|
||||||
h.update(normalized.encode("utf-8"))
|
h.update(normalized.encode("utf-8"))
|
||||||
return h.hexdigest()
|
return h.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
def _doc_hash(doc):
|
def _doc_hash(doc: {}) -> str:
|
||||||
|
"""Returns a hash of the ActivityPub post
|
||||||
|
"""
|
||||||
doc = dict(doc)
|
doc = dict(doc)
|
||||||
|
|
||||||
|
# remove the signature
|
||||||
if "signature" in doc:
|
if "signature" in doc:
|
||||||
del doc["signature"]
|
del doc["signature"]
|
||||||
|
|
||||||
options = {
|
options = {
|
||||||
"algorithm": "URDNA2015",
|
"algorithm": "URDNA2015",
|
||||||
"format": "application/nquads"
|
"format": "application/nquads"
|
||||||
}
|
}
|
||||||
|
|
||||||
normalized = normalize(doc, options)
|
normalized = normalize(doc, options)
|
||||||
h = hashlib.new("sha256")
|
h = hashlib.new("sha256")
|
||||||
h.update(normalized.encode("utf-8"))
|
h.update(normalized.encode("utf-8"))
|
||||||
return h.hexdigest()
|
return h.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
def verifyJsonSignature(doc: {}, publicKeyPem: str):
|
def verifyJsonSignature(doc: {}, publicKeyPem: str) -> bool:
|
||||||
|
"""Returns True if the given ActivityPub post was sent
|
||||||
|
by an actor having the given public key
|
||||||
|
"""
|
||||||
key = RSA.importKey(publicKeyPem)
|
key = RSA.importKey(publicKeyPem)
|
||||||
to_be_signed = _options_hash(doc) + _doc_hash(doc)
|
to_be_signed = _options_hash(doc) + _doc_hash(doc)
|
||||||
signature = doc["signature"]["signatureValue"]
|
signature = doc["signature"]["signatureValue"]
|
||||||
signer = PKCS1_v1_5.new(key) # type: ignore
|
signer = PKCS1_v1_5.new(key) # type: ignore
|
||||||
digest = SHA256.new()
|
digest = SHA256.new()
|
||||||
digest.update(to_be_signed.encode("utf-8"))
|
digest.update(to_be_signed.encode("utf-8"))
|
||||||
return signer.verify(digest, base64.b64decode(signature)) # type: ignore
|
base64sig = base64.b64decode(signature)
|
||||||
|
return signer.verify(digest, base64sig) # type: ignore
|
||||||
|
|
||||||
|
|
||||||
def generateJsonSignature(doc: {}, privateKeyPem: str):
|
def generateJsonSignature(doc: {}, privateKeyPem: str) -> None:
|
||||||
|
"""Adds a json signature to the given ActivityPub post
|
||||||
|
"""
|
||||||
if not doc.get('actor'):
|
if not doc.get('actor'):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue