From d652485f8a7826d61f5008c4e450f7494cc56185 Mon Sep 17 00:00:00 2001 From: Bob Mottram Date: Sat, 4 Apr 2020 12:00:19 +0100 Subject: [PATCH] flake8 format --- session.py | 154 +++++++++++++++++++++++++++-------------------------- 1 file changed, 78 insertions(+), 76 deletions(-) diff --git a/session.py b/session.py index 9fb08e1e..49d20813 100644 --- a/session.py +++ b/session.py @@ -1,84 +1,86 @@ -__filename__="session.py" -__author__="Bob Mottram" -__license__="AGPL3+" -__version__="1.1.0" -__maintainer__="Bob Mottram" -__email__="bob@freedombone.net" -__status__="Production" +__filename__ = "session.py" +__author__ = "Bob Mottram" +__license__ = "AGPL3+" +__version__ = "1.1.0" +__maintainer__ = "Bob Mottram" +__email__ = "bob@freedombone.net" +__status__ = "Production" import os -import sys import requests from utils import urlPermitted import json -baseDirectory=None +baseDirectory = None + def createSession(onionRoute: bool): - session=requests.session() + session = requests.session() if onionRoute: - session.proxies={} - session.proxies['http']='socks5h://localhost:9050' - session.proxies['https']='socks5h://localhost:9050' + session.proxies = {} + session.proxies['http'] = 'socks5h://localhost:9050' + session.proxies['https'] = 'socks5h://localhost:9050' return session -def getJson(session,url: str,headers: {},params: {}, \ - version='1.0.0',httpPrefix='https',domain='testdomain') -> {}: + +def getJson(session, url: str, headers: {}, params: {}, + version='1.0.0', httpPrefix='https', + domain='testdomain') -> {}: if not isinstance(url, str): - print('url: '+str(url)) + print('url: ' + str(url)) print('ERROR: getJson url should be a string') return None - sessionParams={} - sessionHeaders={} + sessionParams = {} + sessionHeaders = {} if headers: - sessionHeaders=headers + sessionHeaders = headers if params: - sessionParams=params - sessionHeaders['User-Agent']='Epicyon/'+version + sessionParams = params + sessionHeaders['User-Agent'] = 'Epicyon/' + version if domain: - sessionHeaders['User-Agent']+='; +'+httpPrefix+'://'+domain+'/' - #"Mozilla/5.0 (Linux; Android 5.1.1; Nexus 5)" + sessionHeaders['User-Agent'] += \ + '; +' + httpPrefix + '://' + domain + '/' if not session: print('WARN: no session specified for getJson') - #session.cookies.clear() try: - result=session.get(url, headers=sessionHeaders, params=sessionParams) + result = session.get(url, headers=sessionHeaders, params=sessionParams) return result.json() except Exception as e: print('ERROR: getJson failed') - print('url: '+str(url)) - print('headers: '+str(sessionHeaders)) - print('params: '+str(sessionParams)) + print('url: ' + str(url)) + print('headers: ' + str(sessionHeaders)) + print('params: ' + str(sessionParams)) print(e) return None -def postJson(session,postJsonObject: {},federationList: [], \ - inboxUrl: str,headers: {},capability: str) -> str: + +def postJson(session, postJsonObject: {}, federationList: [], + inboxUrl: str, headers: {}, capability: str) -> str: """Post a json message to the inbox of another person Supplying a capability, such as "inbox:write" """ - # always allow capability requests if not capability.startswith('cap'): # check that we are posting to a permitted domain - if not urlPermitted(inboxUrl,federationList,capability): - print('postJson: '+inboxUrl+' not permitted') + if not urlPermitted(inboxUrl, federationList, capability): + print('postJson: ' + inboxUrl + ' not permitted') return None - postResult= \ - session.post(url=inboxUrl, \ - data=json.dumps(postJsonObject), \ + postResult = \ + session.post(url=inboxUrl, + data=json.dumps(postJsonObject), headers=headers) if postResult: return postResult.text return None -def postJsonString(session,postJsonStr: str, \ - federationList: [], \ - inboxUrl: str, \ - headers: {}, \ - capability: str, \ - debug: bool) -> (bool,bool): + +def postJsonString(session, postJsonStr: str, + federationList: [], + inboxUrl: str, + headers: {}, + capability: str, + debug: bool) -> (bool, bool): """Post a json message string to the inbox of another person Supplying a capability, such as "inbox:write" The second boolean returned is true if the send is unauthorized @@ -86,63 +88,63 @@ def postJsonString(session,postJsonStr: str, \ conversions between string and json format don't invalidate the message body digest of http signatures """ - # always allow capability requests if not capability.startswith('cap'): # check that we are posting to a permitted domain - if not urlPermitted(inboxUrl,federationList,capability): - print('postJson: '+inboxUrl+' not permitted by capabilities') - return None,None + if not urlPermitted(inboxUrl, federationList, capability): + print('postJson: ' + inboxUrl + ' not permitted by capabilities') + return None, None - postResult= \ - session.post(url=inboxUrl,data=postJsonStr,headers=headers) - if postResult.status_code<200 or postResult.status_code>202: - #if postResult.status_code==400: - # headers['content-type']='application/ld+json' - # postResult=session.post(url=inboxUrl,data=postJsonStr,headers=headers) - # if not (postResult.status_code<200 or postResult.status_code>202): - # return True - if postResult.status_code>=400 and postResult.status_code<=405 and \ - postResult.status_code!=404: - print('WARN: >>> Post to '+inboxUrl+' is unauthorized. Code '+str(postResult.status_code)+' <<<') - return False,True + postResult = \ + session.post(url=inboxUrl, data=postJsonStr, headers=headers) + if postResult.status_code < 200 or postResult.status_code > 202: + if postResult.status_code >= 400 and \ + postResult.status_code <= 405 and \ + postResult.status_code != 404: + print('WARN: >>> Post to ' + inboxUrl + + ' is unauthorized. Code ' + + str(postResult.status_code) + ' <<<') + return False, True else: - print('WARN: Failed to post to '+inboxUrl+' with headers '+str(headers)) - print('status code '+str(postResult.status_code)) - return False,False - return True,False + print('WARN: Failed to post to ' + inboxUrl + + ' with headers ' + str(headers)) + print('status code ' + str(postResult.status_code)) + return False, False + return True, False -def postImage(session,attachImageFilename: str,federationList: [], \ - inboxUrl: str,headers: {},capability: str) -> str: + +def postImage(session, attachImageFilename: str, federationList: [], + inboxUrl: str, headers: {}, capability: str) -> str: """Post an image to the inbox of another person or outbox via c2s Supplying a capability, such as "inbox:write" """ # always allow capability requests if not capability.startswith('cap'): # check that we are posting to a permitted domain - if not urlPermitted(inboxUrl,federationList,capability): - print('postJson: '+inboxUrl+' not permitted') + if not urlPermitted(inboxUrl, federationList, capability): + print('postJson: ' + inboxUrl + ' not permitted') return None - if not (attachImageFilename.endswith('.jpg') or \ - attachImageFilename.endswith('.jpeg') or \ - attachImageFilename.endswith('.png') or \ + if not (attachImageFilename.endswith('.jpg') or + attachImageFilename.endswith('.jpeg') or + attachImageFilename.endswith('.png') or attachImageFilename.endswith('.gif')): print('Image must be png, jpg, or gif') return None if not os.path.isfile(attachImageFilename): - print('Image not found: '+attachImageFilename) + print('Image not found: ' + attachImageFilename) return None - contentType='image/jpeg' + contentType = 'image/jpeg' if attachImageFilename.endswith('.png'): - contentType='image/png' + contentType = 'image/png' if attachImageFilename.endswith('.gif'): - contentType='image/gif' - headers['Content-type']=contentType + contentType = 'image/gif' + headers['Content-type'] = contentType with open(attachImageFilename, 'rb') as avFile: - mediaBinary=avFile.read() - postResult=session.post(url=inboxUrl,data=mediaBinary,headers=headers) + mediaBinary = avFile.read() + postResult = session.post(url=inboxUrl, data=mediaBinary, + headers=headers) if postResult: return postResult.text return None