mirror of https://gitlab.com/bashrc2/epicyon
				
				
				
			
		
			
				
	
	
		
			148 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			148 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
| __filename__ = "session.py"
 | |
| __author__ = "Bob Mottram"
 | |
| __license__ = "AGPL3+"
 | |
| __version__ = "1.1.0"
 | |
| __maintainer__ = "Bob Mottram"
 | |
| __email__ = "bob@freedombone.net"
 | |
| __status__ = "Production"
 | |
| 
 | |
| import os
 | |
| import sys
 | |
| import requests
 | |
| from utils import urlPermitted
 | |
| import json
 | |
| 
 | |
| baseDirectory=None
 | |
| 
 | |
| def createSession(onionRoute: bool):
 | |
|     session = requests.session()
 | |
|     if onionRoute:
 | |
|         session.proxies = {}
 | |
|         session.proxies['http'] = 'socks5h://localhost:9050'
 | |
|         session.proxies['https'] = 'socks5h://localhost:9050'
 | |
|     return session
 | |
| 
 | |
| def getJson(session,url: str,headers: {},params: {}, \
 | |
|             version='1.0.0',httpPrefix='https',domain='testdomain') -> {}:
 | |
|     if not isinstance(url, str):
 | |
|         print('url: '+str(url))
 | |
|         print('ERROR: getJson url should be a string')
 | |
|         return None
 | |
|     sessionParams={}
 | |
|     sessionHeaders={}
 | |
|     if headers:
 | |
|         sessionHeaders=headers
 | |
|     if params:
 | |
|         sessionParams=params
 | |
|     sessionHeaders['User-Agent']='Epicyon/'+version
 | |
|     if domain:
 | |
|         sessionHeaders['User-Agent']+='; +'+httpPrefix+'://'+domain+'/'
 | |
|     #"Mozilla/5.0 (Linux; Android 5.1.1; Nexus 5)"
 | |
|     if not session:
 | |
|         print('WARN: no session specified for getJson')
 | |
|     #session.cookies.clear()
 | |
|     try:
 | |
|         result=session.get(url, headers=sessionHeaders, params=sessionParams)
 | |
|         return result.json()
 | |
|     except Exception as e:
 | |
|         print('ERROR: getJson failed')
 | |
|         print('url: '+str(url))
 | |
|         print('headers: '+str(sessionHeaders))
 | |
|         print('params: '+str(sessionParams))
 | |
|         print(e)
 | |
|     return None
 | |
| 
 | |
| def postJson(session,postJsonObject: {},federationList: [], \
 | |
|              inboxUrl: str,headers: {},capability: str) -> str:
 | |
|     """Post a json message to the inbox of another person
 | |
|     Supplying a capability, such as "inbox:write"
 | |
|     """
 | |
| 
 | |
|     # always allow capability requests
 | |
|     if not capability.startswith('cap'):    
 | |
|         # check that we are posting to a permitted domain
 | |
|         if not urlPermitted(inboxUrl,federationList,capability):
 | |
|             print('postJson: '+inboxUrl+' not permitted')
 | |
|             return None
 | |
| 
 | |
|     postResult= \
 | |
|         session.post(url=inboxUrl, \
 | |
|                      data=json.dumps(postJsonObject), \
 | |
|                      headers=headers)
 | |
|     if postResult:
 | |
|         return postResult.text
 | |
|     return None
 | |
| 
 | |
| def postJsonString(session,postJsonStr: str, \
 | |
|                    federationList: [], \
 | |
|                    inboxUrl: str, \
 | |
|                    headers: {}, \
 | |
|                    capability: str, \
 | |
|                    debug: bool) -> (bool,bool):
 | |
|     """Post a json message string to the inbox of another person
 | |
|     Supplying a capability, such as "inbox:write"
 | |
|     The second boolean returned is true if the send is unauthorized
 | |
|     NOTE: Here we post a string rather than the original json so that
 | |
|     conversions between string and json format don't invalidate
 | |
|     the message body digest of http signatures
 | |
|     """
 | |
| 
 | |
|     # always allow capability requests
 | |
|     if not capability.startswith('cap'):    
 | |
|         # check that we are posting to a permitted domain
 | |
|         if not urlPermitted(inboxUrl,federationList,capability):
 | |
|             print('postJson: '+inboxUrl+' not permitted by capabilities')
 | |
|             return None,None
 | |
| 
 | |
|     postResult = session.post(url = inboxUrl, data = postJsonStr, headers=headers)
 | |
|     if postResult.status_code<200 or postResult.status_code>202:
 | |
|         #if postResult.status_code==400:
 | |
|         #    headers['content-type']='application/ld+json'
 | |
|         #    postResult = session.post(url = inboxUrl, data = postJsonStr, headers=headers)
 | |
|         #    if not (postResult.status_code<200 or postResult.status_code>202):
 | |
|         #        return True
 | |
|         if postResult.status_code>=400 and postResult.status_code<=405 and \
 | |
|            postResult.status_code!=404:
 | |
|             print('WARN: >>> Post to '+inboxUrl+' is unauthorized. Code '+str(postResult.status_code)+' <<<')
 | |
|             return False,True
 | |
|         else:
 | |
|             print('WARN: Failed to post to '+inboxUrl+' with headers '+str(headers))
 | |
|             print('status code '+str(postResult.status_code))
 | |
|             return False,False
 | |
|     return True,False
 | |
| 
 | |
| def postImage(session,attachImageFilename: str,federationList: [], \
 | |
|               inboxUrl: str,headers: {},capability: str) -> str:
 | |
|     """Post an image to the inbox of another person or outbox via c2s
 | |
|     Supplying a capability, such as "inbox:write"
 | |
|     """
 | |
|     # always allow capability requests
 | |
|     if not capability.startswith('cap'):    
 | |
|         # check that we are posting to a permitted domain
 | |
|         if not urlPermitted(inboxUrl,federationList,capability):
 | |
|             print('postJson: '+inboxUrl+' not permitted')
 | |
|             return None
 | |
| 
 | |
|     if not (attachImageFilename.endswith('.jpg') or \
 | |
|             attachImageFilename.endswith('.jpeg') or \
 | |
|             attachImageFilename.endswith('.png') or \
 | |
|             attachImageFilename.endswith('.gif')):
 | |
|         print('Image must be png, jpg, or gif')
 | |
|         return None
 | |
|     if not os.path.isfile(attachImageFilename):
 | |
|         print('Image not found: '+attachImageFilename)
 | |
|         return None
 | |
|     contentType='image/jpeg'
 | |
|     if attachImageFilename.endswith('.png'):
 | |
|         contentType='image/png'
 | |
|     if attachImageFilename.endswith('.gif'):
 | |
|         contentType='image/gif'
 | |
|     headers['Content-type']=contentType
 | |
| 
 | |
|     with open(attachImageFilename, 'rb') as avFile:
 | |
|         mediaBinary = avFile.read()
 | |
|         postResult = session.post(url=inboxUrl, data=mediaBinary, headers=headers)
 | |
|         if postResult:
 | |
|             return postResult.text
 | |
|     return None
 |