flake8 format

main
Bob Mottram 2020-04-04 12:00:19 +01:00
parent bfe37d5ae7
commit d652485f8a
1 changed files with 78 additions and 76 deletions

View File

@ -1,84 +1,86 @@
__filename__="session.py" __filename__ = "session.py"
__author__="Bob Mottram" __author__ = "Bob Mottram"
__license__="AGPL3+" __license__ = "AGPL3+"
__version__="1.1.0" __version__ = "1.1.0"
__maintainer__="Bob Mottram" __maintainer__ = "Bob Mottram"
__email__="bob@freedombone.net" __email__ = "bob@freedombone.net"
__status__="Production" __status__ = "Production"
import os import os
import sys
import requests import requests
from utils import urlPermitted from utils import urlPermitted
import json import json
baseDirectory=None baseDirectory = None
def createSession(onionRoute: bool): def createSession(onionRoute: bool):
session=requests.session() session = requests.session()
if onionRoute: if onionRoute:
session.proxies={} session.proxies = {}
session.proxies['http']='socks5h://localhost:9050' session.proxies['http'] = 'socks5h://localhost:9050'
session.proxies['https']='socks5h://localhost:9050' session.proxies['https'] = 'socks5h://localhost:9050'
return session return session
def getJson(session,url: str,headers: {},params: {}, \
version='1.0.0',httpPrefix='https',domain='testdomain') -> {}: def getJson(session, url: str, headers: {}, params: {},
version='1.0.0', httpPrefix='https',
domain='testdomain') -> {}:
if not isinstance(url, str): if not isinstance(url, str):
print('url: '+str(url)) print('url: ' + str(url))
print('ERROR: getJson url should be a string') print('ERROR: getJson url should be a string')
return None return None
sessionParams={} sessionParams = {}
sessionHeaders={} sessionHeaders = {}
if headers: if headers:
sessionHeaders=headers sessionHeaders = headers
if params: if params:
sessionParams=params sessionParams = params
sessionHeaders['User-Agent']='Epicyon/'+version sessionHeaders['User-Agent'] = 'Epicyon/' + version
if domain: if domain:
sessionHeaders['User-Agent']+='; +'+httpPrefix+'://'+domain+'/' sessionHeaders['User-Agent'] += \
#"Mozilla/5.0 (Linux; Android 5.1.1; Nexus 5)" '; +' + httpPrefix + '://' + domain + '/'
if not session: if not session:
print('WARN: no session specified for getJson') print('WARN: no session specified for getJson')
#session.cookies.clear()
try: try:
result=session.get(url, headers=sessionHeaders, params=sessionParams) result = session.get(url, headers=sessionHeaders, params=sessionParams)
return result.json() return result.json()
except Exception as e: except Exception as e:
print('ERROR: getJson failed') print('ERROR: getJson failed')
print('url: '+str(url)) print('url: ' + str(url))
print('headers: '+str(sessionHeaders)) print('headers: ' + str(sessionHeaders))
print('params: '+str(sessionParams)) print('params: ' + str(sessionParams))
print(e) print(e)
return None return None
def postJson(session,postJsonObject: {},federationList: [], \
inboxUrl: str,headers: {},capability: str) -> str: def postJson(session, postJsonObject: {}, federationList: [],
inboxUrl: str, headers: {}, capability: str) -> str:
"""Post a json message to the inbox of another person """Post a json message to the inbox of another person
Supplying a capability, such as "inbox:write" Supplying a capability, such as "inbox:write"
""" """
# always allow capability requests # always allow capability requests
if not capability.startswith('cap'): if not capability.startswith('cap'):
# check that we are posting to a permitted domain # check that we are posting to a permitted domain
if not urlPermitted(inboxUrl,federationList,capability): if not urlPermitted(inboxUrl, federationList, capability):
print('postJson: '+inboxUrl+' not permitted') print('postJson: ' + inboxUrl + ' not permitted')
return None return None
postResult= \ postResult = \
session.post(url=inboxUrl, \ session.post(url=inboxUrl,
data=json.dumps(postJsonObject), \ data=json.dumps(postJsonObject),
headers=headers) headers=headers)
if postResult: if postResult:
return postResult.text return postResult.text
return None return None
def postJsonString(session,postJsonStr: str, \
federationList: [], \ def postJsonString(session, postJsonStr: str,
inboxUrl: str, \ federationList: [],
headers: {}, \ inboxUrl: str,
capability: str, \ headers: {},
debug: bool) -> (bool,bool): capability: str,
debug: bool) -> (bool, bool):
"""Post a json message string to the inbox of another person """Post a json message string to the inbox of another person
Supplying a capability, such as "inbox:write" Supplying a capability, such as "inbox:write"
The second boolean returned is true if the send is unauthorized The second boolean returned is true if the send is unauthorized
@ -86,63 +88,63 @@ def postJsonString(session,postJsonStr: str, \
conversions between string and json format don't invalidate conversions between string and json format don't invalidate
the message body digest of http signatures the message body digest of http signatures
""" """
# always allow capability requests # always allow capability requests
if not capability.startswith('cap'): if not capability.startswith('cap'):
# check that we are posting to a permitted domain # check that we are posting to a permitted domain
if not urlPermitted(inboxUrl,federationList,capability): if not urlPermitted(inboxUrl, federationList, capability):
print('postJson: '+inboxUrl+' not permitted by capabilities') print('postJson: ' + inboxUrl + ' not permitted by capabilities')
return None,None return None, None
postResult= \ postResult = \
session.post(url=inboxUrl,data=postJsonStr,headers=headers) session.post(url=inboxUrl, data=postJsonStr, headers=headers)
if postResult.status_code<200 or postResult.status_code>202: if postResult.status_code < 200 or postResult.status_code > 202:
#if postResult.status_code==400: if postResult.status_code >= 400 and \
# headers['content-type']='application/ld+json' postResult.status_code <= 405 and \
# postResult=session.post(url=inboxUrl,data=postJsonStr,headers=headers) postResult.status_code != 404:
# if not (postResult.status_code<200 or postResult.status_code>202): print('WARN: >>> Post to ' + inboxUrl +
# return True ' is unauthorized. Code ' +
if postResult.status_code>=400 and postResult.status_code<=405 and \ str(postResult.status_code) + ' <<<')
postResult.status_code!=404: return False, True
print('WARN: >>> Post to '+inboxUrl+' is unauthorized. Code '+str(postResult.status_code)+' <<<')
return False,True
else: else:
print('WARN: Failed to post to '+inboxUrl+' with headers '+str(headers)) print('WARN: Failed to post to ' + inboxUrl +
print('status code '+str(postResult.status_code)) ' with headers ' + str(headers))
return False,False print('status code ' + str(postResult.status_code))
return True,False return False, False
return True, False
def postImage(session,attachImageFilename: str,federationList: [], \
inboxUrl: str,headers: {},capability: str) -> str: def postImage(session, attachImageFilename: str, federationList: [],
inboxUrl: str, headers: {}, capability: str) -> str:
"""Post an image to the inbox of another person or outbox via c2s """Post an image to the inbox of another person or outbox via c2s
Supplying a capability, such as "inbox:write" Supplying a capability, such as "inbox:write"
""" """
# always allow capability requests # always allow capability requests
if not capability.startswith('cap'): if not capability.startswith('cap'):
# check that we are posting to a permitted domain # check that we are posting to a permitted domain
if not urlPermitted(inboxUrl,federationList,capability): if not urlPermitted(inboxUrl, federationList, capability):
print('postJson: '+inboxUrl+' not permitted') print('postJson: ' + inboxUrl + ' not permitted')
return None return None
if not (attachImageFilename.endswith('.jpg') or \ if not (attachImageFilename.endswith('.jpg') or
attachImageFilename.endswith('.jpeg') or \ attachImageFilename.endswith('.jpeg') or
attachImageFilename.endswith('.png') or \ attachImageFilename.endswith('.png') or
attachImageFilename.endswith('.gif')): attachImageFilename.endswith('.gif')):
print('Image must be png, jpg, or gif') print('Image must be png, jpg, or gif')
return None return None
if not os.path.isfile(attachImageFilename): if not os.path.isfile(attachImageFilename):
print('Image not found: '+attachImageFilename) print('Image not found: ' + attachImageFilename)
return None return None
contentType='image/jpeg' contentType = 'image/jpeg'
if attachImageFilename.endswith('.png'): if attachImageFilename.endswith('.png'):
contentType='image/png' contentType = 'image/png'
if attachImageFilename.endswith('.gif'): if attachImageFilename.endswith('.gif'):
contentType='image/gif' contentType = 'image/gif'
headers['Content-type']=contentType headers['Content-type'] = contentType
with open(attachImageFilename, 'rb') as avFile: with open(attachImageFilename, 'rb') as avFile:
mediaBinary=avFile.read() mediaBinary = avFile.read()
postResult=session.post(url=inboxUrl,data=mediaBinary,headers=headers) postResult = session.post(url=inboxUrl, data=mediaBinary,
headers=headers)
if postResult: if postResult:
return postResult.text return postResult.text
return None return None