Refactoring for signed GET

main
Bob Mottram 2021-08-31 10:10:49 +01:00
parent 746f58fd7f
commit 894a8ce3fc
1 changed files with 104 additions and 37 deletions

View File

@ -11,6 +11,7 @@ import os
import requests
from utils import urlPermitted
from utils import isImageFile
from httpsig import createSignedHeader
import json
from socket import error as SocketError
import errno
@ -84,49 +85,23 @@ def urlExists(session, url: str, timeoutSec: int = 3,
return False
def _getJsonSigned(url: str, sessionHeaders: {},
sessionParams: {}, timeoutSec: int) -> {}:
"""Authorized fetch
def _getJsonRequest(session, url: str, domainFull: str, sessionHeaders: {},
sessionParams: {}, timeoutSec: int,
privateKeyPem: str, quiet: bool, debug: bool) -> {}:
"""http GET for json
"""
# TODO
return None
def getJson(session, url: str, headers: {}, params: {}, debug: bool,
version: str = '1.2.0', httpPrefix: str = 'https',
domain: str = 'testdomain',
timeoutSec: int = 20, quiet: bool = False) -> {}:
if not isinstance(url, str):
if debug and not quiet:
print('url: ' + str(url))
print('ERROR: getJson failed, url should be a string')
return None
sessionParams = {}
sessionHeaders = {}
if headers:
sessionHeaders = headers
if params:
sessionParams = params
sessionHeaders['User-Agent'] = 'Epicyon/' + version
if domain:
sessionHeaders['User-Agent'] += \
'; +' + httpPrefix + '://' + domain + '/'
if not session:
if not quiet:
print('WARN: getJson failed, no session specified for getJson')
return None
if debug:
HTTPConnection.debuglevel = 1
try:
result = session.get(url, headers=sessionHeaders,
params=sessionParams, timeout=timeoutSec)
if result.status_code != 200:
if result.status_code == 401:
print("WARN: getJson requires secure fetch url: " + url)
return _getJsonSigned(url, sessionHeaders,
sessionParams, timeoutSec)
if not privateKeyPem:
print("WARN: getJson requires secure fetch url: " + url)
else:
return _getJsonSigned(session, url, domainFull,
sessionHeaders, sessionParams,
timeoutSec, privateKeyPem,
quiet, debug)
elif result.status_code == 403:
print('WARN: getJson Forbidden url: ' + url)
elif result.status_code == 404:
@ -161,6 +136,98 @@ def getJson(session, url: str, headers: {}, params: {}, debug: bool,
return None
def _getJsonSigned(session, url: str, domainFull: str, sessionHeaders: {},
sessionParams: {}, timeoutSec: int,
privateKeyPem: str, quiet: bool, debug: bool) -> {}:
"""Authorized fetch
"""
if not domainFull:
if debug:
print('No sending domain for signed GET')
return None
if '://' not in url:
return None
httpPrefix = url.split('://')[0]
toDomainFull = url.split('://')[1]
if '/' in toDomainFull:
toDomainFull = toDomainFull.split('/')[0]
if ':' in domainFull:
domain = domainFull.split(':')[0]
port = domainFull.split(':')[1]
else:
domain = domainFull
if httpPrefix == 'https':
port = 443
else:
port = 80
if ':' in toDomainFull:
toDomain = toDomainFull.split(':')[0]
toPort = toDomainFull.split(':')[1]
else:
toDomain = toDomainFull
if httpPrefix == 'https':
toPort = 443
else:
toPort = 80
# instance actor
nickname = domain
if debug:
print('Signed GET privateKeyPem: ' + privateKeyPem)
print('Signed GET nickname: ' + nickname)
print('Signed GET domain: ' + domain + ' ' + str(port))
print('Signed GET toDomain: ' + toDomain + ' ' + str(toPort))
print('Signed GET url: ' + url)
print('Signed GET httpPrefix: ' + httpPrefix)
signatureHeaderJson = \
createSignedHeader(privateKeyPem, nickname, domain, port,
toDomain, toPort,
url, httpPrefix, False, '')
for key, value in signatureHeaderJson.items():
if key == 'Accept' or key == 'User-Agent':
continue
sessionHeaders[key] = value
return _getJsonRequest(session, url, domainFull, sessionHeaders,
sessionParams, timeoutSec,
None, quiet, debug)
def getJson(session, url: str, headers: {}, params: {}, debug: bool,
version: str = '1.2.0', httpPrefix: str = 'https',
domain: str = 'testdomain',
timeoutSec: int = 20, quiet: bool = False) -> {}:
if not isinstance(url, str):
if debug and not quiet:
print('url: ' + str(url))
print('ERROR: getJson failed, url should be a string')
return None
sessionParams = {}
sessionHeaders = {}
if headers:
sessionHeaders = headers
if params:
sessionParams = params
sessionHeaders['User-Agent'] = 'Epicyon/' + version
if domain:
sessionHeaders['User-Agent'] += \
'; +' + httpPrefix + '://' + domain + '/'
if not session:
if not quiet:
print('WARN: getJson failed, no session specified for getJson')
return None
if debug:
HTTPConnection.debuglevel = 1
privateKeyPem = 'TODO instance actor private key'
return _getJsonRequest(session, url, domain, sessionHeaders,
sessionParams, timeoutSec,
privateKeyPem, quiet, debug)
def postJson(httpPrefix: str, domainFull: str,
session, postJsonObject: {}, federationList: [],
inboxUrl: str, headers: {}, timeoutSec: int = 60,