mirror of https://gitlab.com/bashrc2/epicyon
Snake case
parent
47e648e516
commit
1c62dcd906
54
media.py
54
media.py
|
@ -90,13 +90,13 @@ def replaceTwitter(post_json_object: {}, replacementDomain: str,
|
|||
replacementDomain, system_language)
|
||||
|
||||
|
||||
def _removeMetaData(imageFilename: str, outputFilename: str) -> None:
|
||||
def _removeMetaData(image_filename: str, outputFilename: str) -> None:
|
||||
"""Attempts to do this with pure python didn't work well,
|
||||
so better to use a dedicated tool if one is installed
|
||||
"""
|
||||
copyfile(imageFilename, outputFilename)
|
||||
copyfile(image_filename, outputFilename)
|
||||
if not os.path.isfile(outputFilename):
|
||||
print('ERROR: unable to remove metadata from ' + imageFilename)
|
||||
print('ERROR: unable to remove metadata from ' + image_filename)
|
||||
return
|
||||
if os.path.isfile('/usr/bin/exiftool'):
|
||||
print('Removing metadata from ' + outputFilename + ' using exiftool')
|
||||
|
@ -160,10 +160,10 @@ def _spoofMetaData(base_dir: str, nickname: str, domain: str,
|
|||
return
|
||||
|
||||
|
||||
def convertImageToLowBandwidth(imageFilename: str) -> None:
|
||||
def convertImageToLowBandwidth(image_filename: str) -> None:
|
||||
"""Converts an image to a low bandwidth version
|
||||
"""
|
||||
low_bandwidthFilename = imageFilename + '.low'
|
||||
low_bandwidthFilename = image_filename + '.low'
|
||||
if os.path.isfile(low_bandwidthFilename):
|
||||
try:
|
||||
os.remove(low_bandwidthFilename)
|
||||
|
@ -174,7 +174,7 @@ def convertImageToLowBandwidth(imageFilename: str) -> None:
|
|||
cmd = \
|
||||
'/usr/bin/convert +noise Multiplicative ' + \
|
||||
'-evaluate median 10% -dither Floyd-Steinberg ' + \
|
||||
'-monochrome ' + imageFilename + ' ' + low_bandwidthFilename
|
||||
'-monochrome ' + image_filename + ' ' + low_bandwidthFilename
|
||||
print('Low bandwidth image conversion: ' + cmd)
|
||||
subprocess.call(cmd, shell=True)
|
||||
# wait for conversion to happen
|
||||
|
@ -188,43 +188,43 @@ def convertImageToLowBandwidth(imageFilename: str) -> None:
|
|||
break
|
||||
if os.path.isfile(low_bandwidthFilename):
|
||||
try:
|
||||
os.remove(imageFilename)
|
||||
os.remove(image_filename)
|
||||
except OSError:
|
||||
print('EX: convertImageToLowBandwidth unable to delete ' +
|
||||
imageFilename)
|
||||
os.rename(low_bandwidthFilename, imageFilename)
|
||||
if os.path.isfile(imageFilename):
|
||||
print('Image converted to low bandwidth ' + imageFilename)
|
||||
image_filename)
|
||||
os.rename(low_bandwidthFilename, image_filename)
|
||||
if os.path.isfile(image_filename):
|
||||
print('Image converted to low bandwidth ' + image_filename)
|
||||
else:
|
||||
print('Low bandwidth converted image not found: ' +
|
||||
low_bandwidthFilename)
|
||||
|
||||
|
||||
def processMetaData(base_dir: str, nickname: str, domain: str,
|
||||
imageFilename: str, outputFilename: str,
|
||||
image_filename: str, outputFilename: str,
|
||||
city: str, content_license_url: str) -> None:
|
||||
"""Handles image metadata. This tries to spoof the metadata
|
||||
if possible, but otherwise just removes it
|
||||
"""
|
||||
# first remove the metadata
|
||||
_removeMetaData(imageFilename, outputFilename)
|
||||
_removeMetaData(image_filename, outputFilename)
|
||||
|
||||
# now add some spoofed data to misdirect surveillance capitalists
|
||||
_spoofMetaData(base_dir, nickname, domain, outputFilename, city,
|
||||
content_license_url)
|
||||
|
||||
|
||||
def _isMedia(imageFilename: str) -> bool:
|
||||
def _isMedia(image_filename: str) -> bool:
|
||||
"""Is the given file a media file?
|
||||
"""
|
||||
if not os.path.isfile(imageFilename):
|
||||
print('WARN: Media file does not exist ' + imageFilename)
|
||||
if not os.path.isfile(image_filename):
|
||||
print('WARN: Media file does not exist ' + image_filename)
|
||||
return False
|
||||
permittedMedia = get_media_extensions()
|
||||
for m in permittedMedia:
|
||||
if imageFilename.endswith('.' + m):
|
||||
if image_filename.endswith('.' + m):
|
||||
return True
|
||||
print('WARN: ' + imageFilename + ' is not a permitted media type')
|
||||
print('WARN: ' + image_filename + ' is not a permitted media type')
|
||||
return False
|
||||
|
||||
|
||||
|
@ -295,20 +295,20 @@ def _updateEtag(mediaFilename: str) -> None:
|
|||
|
||||
def attachMedia(base_dir: str, http_prefix: str,
|
||||
nickname: str, domain: str, port: int,
|
||||
postJson: {}, imageFilename: str,
|
||||
postJson: {}, image_filename: str,
|
||||
mediaType: str, description: str,
|
||||
city: str, low_bandwidth: bool,
|
||||
content_license_url: str) -> {}:
|
||||
"""Attaches media to a json object post
|
||||
The description can be None
|
||||
"""
|
||||
if not _isMedia(imageFilename):
|
||||
if not _isMedia(image_filename):
|
||||
return postJson
|
||||
|
||||
fileExtension = None
|
||||
acceptedTypes = get_media_extensions()
|
||||
for mType in acceptedTypes:
|
||||
if imageFilename.endswith('.' + mType):
|
||||
if image_filename.endswith('.' + mType):
|
||||
if mType == 'jpg':
|
||||
mType = 'jpeg'
|
||||
if mType == 'mp3':
|
||||
|
@ -344,7 +344,7 @@ def attachMedia(base_dir: str, http_prefix: str,
|
|||
attachmentJson['blurhash'] = _getBlurHash()
|
||||
# find the dimensions of the image and add them as metadata
|
||||
attachImageWidth, attachImageHeight = \
|
||||
getImageDimensions(imageFilename)
|
||||
getImageDimensions(image_filename)
|
||||
if attachImageWidth and attachImageHeight:
|
||||
attachmentJson['width'] = attachImageWidth
|
||||
attachmentJson['height'] = attachImageHeight
|
||||
|
@ -354,12 +354,12 @@ def attachMedia(base_dir: str, http_prefix: str,
|
|||
if base_dir:
|
||||
if mediaType.startswith('image/'):
|
||||
if low_bandwidth:
|
||||
convertImageToLowBandwidth(imageFilename)
|
||||
convertImageToLowBandwidth(image_filename)
|
||||
processMetaData(base_dir, nickname, domain,
|
||||
imageFilename, mediaFilename, city,
|
||||
image_filename, mediaFilename, city,
|
||||
content_license_url)
|
||||
else:
|
||||
copyfile(imageFilename, mediaFilename)
|
||||
copyfile(image_filename, mediaFilename)
|
||||
_updateEtag(mediaFilename)
|
||||
|
||||
return postJson
|
||||
|
@ -408,12 +408,12 @@ def pathIsAudio(path: str) -> bool:
|
|||
return False
|
||||
|
||||
|
||||
def getImageDimensions(imageFilename: str) -> (int, int):
|
||||
def getImageDimensions(image_filename: str) -> (int, int):
|
||||
"""Returns the dimensions of an image file
|
||||
"""
|
||||
try:
|
||||
result = subprocess.run(['identify', '-format', '"%wx%h"',
|
||||
imageFilename], stdout=subprocess.PIPE)
|
||||
image_filename], stdout=subprocess.PIPE)
|
||||
except BaseException:
|
||||
print('EX: getImageDimensions unable to run identify command')
|
||||
return None, None
|
||||
|
|
24
person.py
24
person.py
|
@ -93,19 +93,19 @@ def generateRSAKey() -> (str, str):
|
|||
|
||||
def setProfileImage(base_dir: str, http_prefix: str,
|
||||
nickname: str, domain: str,
|
||||
port: int, imageFilename: str, imageType: str,
|
||||
port: int, image_filename: str, imageType: str,
|
||||
resolution: str, city: str,
|
||||
content_license_url: str) -> bool:
|
||||
"""Saves the given image file as an avatar or background
|
||||
image for the given person
|
||||
"""
|
||||
imageFilename = imageFilename.replace('\n', '').replace('\r', '')
|
||||
if not isImageFile(imageFilename):
|
||||
image_filename = image_filename.replace('\n', '').replace('\r', '')
|
||||
if not isImageFile(image_filename):
|
||||
print('Profile image must be png, jpg, gif or svg format')
|
||||
return False
|
||||
|
||||
if imageFilename.startswith('~/'):
|
||||
imageFilename = imageFilename.replace('~/', str(Path.home()) + '/')
|
||||
if image_filename.startswith('~/'):
|
||||
image_filename = image_filename.replace('~/', str(Path.home()) + '/')
|
||||
|
||||
domain = removeDomainPort(domain)
|
||||
fullDomain = get_full_domain(domain, port)
|
||||
|
@ -127,20 +127,20 @@ def setProfileImage(base_dir: str, http_prefix: str,
|
|||
|
||||
mediaType = 'image/png'
|
||||
iconFilename = iconFilenameBase + '.png'
|
||||
if imageFilename.endswith('.jpg') or \
|
||||
imageFilename.endswith('.jpeg'):
|
||||
if image_filename.endswith('.jpg') or \
|
||||
image_filename.endswith('.jpeg'):
|
||||
mediaType = 'image/jpeg'
|
||||
iconFilename = iconFilenameBase + '.jpg'
|
||||
elif imageFilename.endswith('.gif'):
|
||||
elif image_filename.endswith('.gif'):
|
||||
mediaType = 'image/gif'
|
||||
iconFilename = iconFilenameBase + '.gif'
|
||||
elif imageFilename.endswith('.webp'):
|
||||
elif image_filename.endswith('.webp'):
|
||||
mediaType = 'image/webp'
|
||||
iconFilename = iconFilenameBase + '.webp'
|
||||
elif imageFilename.endswith('.avif'):
|
||||
elif image_filename.endswith('.avif'):
|
||||
mediaType = 'image/avif'
|
||||
iconFilename = iconFilenameBase + '.avif'
|
||||
elif imageFilename.endswith('.svg'):
|
||||
elif image_filename.endswith('.svg'):
|
||||
mediaType = 'image/svg+xml'
|
||||
iconFilename = iconFilenameBase + '.svg'
|
||||
profileFilename = base_dir + '/accounts/' + handle + '/' + iconFilename
|
||||
|
@ -154,7 +154,7 @@ def setProfileImage(base_dir: str, http_prefix: str,
|
|||
saveJson(personJson, personFilename)
|
||||
|
||||
cmd = \
|
||||
'/usr/bin/convert ' + imageFilename + ' -size ' + \
|
||||
'/usr/bin/convert ' + image_filename + ' -size ' + \
|
||||
resolution + ' -quality 50 ' + profileFilename
|
||||
subprocess.call(cmd, shell=True)
|
||||
processMetaData(base_dir, nickname, domain,
|
||||
|
|
12
session.py
12
session.py
|
@ -440,7 +440,7 @@ def postImage(session, attachImageFilename: str, federation_list: [],
|
|||
|
||||
|
||||
def downloadImage(session, base_dir: str, url: str,
|
||||
imageFilename: str, debug: bool,
|
||||
image_filename: str, debug: bool,
|
||||
force: bool = False) -> bool:
|
||||
"""Downloads an image with an expected mime type
|
||||
"""
|
||||
|
@ -472,7 +472,7 @@ def downloadImage(session, base_dir: str, url: str,
|
|||
print('downloadImage: no session headers')
|
||||
return False
|
||||
|
||||
if not os.path.isfile(imageFilename) or force:
|
||||
if not os.path.isfile(image_filename) or force:
|
||||
try:
|
||||
if debug:
|
||||
print('Downloading image url: ' + url)
|
||||
|
@ -485,14 +485,14 @@ def downloadImage(session, base_dir: str, url: str,
|
|||
print('Image download failed with status ' +
|
||||
str(result.status_code))
|
||||
# remove partial download
|
||||
if os.path.isfile(imageFilename):
|
||||
if os.path.isfile(image_filename):
|
||||
try:
|
||||
os.remove(imageFilename)
|
||||
os.remove(image_filename)
|
||||
except OSError:
|
||||
print('EX: downloadImage unable to delete ' +
|
||||
imageFilename)
|
||||
image_filename)
|
||||
else:
|
||||
with open(imageFilename, 'wb') as f:
|
||||
with open(image_filename, 'wb') as f:
|
||||
f.write(result.content)
|
||||
if debug:
|
||||
print('Image downloaded from ' + url)
|
||||
|
|
40
shares.py
40
shares.py
|
@ -302,7 +302,7 @@ def _indicateNewShareAvailable(base_dir: str, http_prefix: str,
|
|||
|
||||
def addShare(base_dir: str,
|
||||
http_prefix: str, nickname: str, domain: str, port: int,
|
||||
displayName: str, summary: str, imageFilename: str,
|
||||
displayName: str, summary: str, image_filename: str,
|
||||
itemQty: float, itemType: str, itemCategory: str, location: str,
|
||||
duration: str, debug: bool, city: str,
|
||||
price: str, currency: str,
|
||||
|
@ -336,20 +336,20 @@ def addShare(base_dir: str,
|
|||
# has an image for this share been uploaded?
|
||||
imageUrl = None
|
||||
moveImage = False
|
||||
if not imageFilename:
|
||||
if not image_filename:
|
||||
sharesImageFilename = \
|
||||
acct_dir(base_dir, nickname, domain) + '/upload'
|
||||
formats = get_image_extensions()
|
||||
for ext in formats:
|
||||
if os.path.isfile(sharesImageFilename + '.' + ext):
|
||||
imageFilename = sharesImageFilename + '.' + ext
|
||||
image_filename = sharesImageFilename + '.' + ext
|
||||
moveImage = True
|
||||
|
||||
domain_full = get_full_domain(domain, port)
|
||||
|
||||
# copy or move the image for the shared item to its destination
|
||||
if imageFilename:
|
||||
if os.path.isfile(imageFilename):
|
||||
if image_filename:
|
||||
if os.path.isfile(image_filename):
|
||||
if not os.path.isdir(base_dir + '/sharefiles'):
|
||||
os.mkdir(base_dir + '/sharefiles')
|
||||
if not os.path.isdir(base_dir + '/sharefiles/' + nickname):
|
||||
|
@ -357,19 +357,19 @@ def addShare(base_dir: str,
|
|||
itemIDfile = base_dir + '/sharefiles/' + nickname + '/' + itemID
|
||||
formats = get_image_extensions()
|
||||
for ext in formats:
|
||||
if not imageFilename.endswith('.' + ext):
|
||||
if not image_filename.endswith('.' + ext):
|
||||
continue
|
||||
if low_bandwidth:
|
||||
convertImageToLowBandwidth(imageFilename)
|
||||
convertImageToLowBandwidth(image_filename)
|
||||
processMetaData(base_dir, nickname, domain,
|
||||
imageFilename, itemIDfile + '.' + ext,
|
||||
image_filename, itemIDfile + '.' + ext,
|
||||
city, content_license_url)
|
||||
if moveImage:
|
||||
try:
|
||||
os.remove(imageFilename)
|
||||
os.remove(image_filename)
|
||||
except OSError:
|
||||
print('EX: addShare unable to delete ' +
|
||||
str(imageFilename))
|
||||
str(image_filename))
|
||||
imageUrl = \
|
||||
http_prefix + '://' + domain_full + \
|
||||
'/sharefiles/' + nickname + '/' + itemID + '.' + ext
|
||||
|
@ -555,7 +555,7 @@ def sendShareViaServer(base_dir, session,
|
|||
fromNickname: str, password: str,
|
||||
fromDomain: str, fromPort: int,
|
||||
http_prefix: str, displayName: str,
|
||||
summary: str, imageFilename: str,
|
||||
summary: str, image_filename: str,
|
||||
itemQty: float, itemType: str, itemCategory: str,
|
||||
location: str, duration: str,
|
||||
cached_webfingers: {}, person_cache: {},
|
||||
|
@ -647,13 +647,13 @@ def sendShareViaServer(base_dir, session,
|
|||
|
||||
authHeader = createBasicAuthHeader(fromNickname, password)
|
||||
|
||||
if imageFilename:
|
||||
if image_filename:
|
||||
headers = {
|
||||
'host': fromDomain,
|
||||
'Authorization': authHeader
|
||||
}
|
||||
postResult = \
|
||||
postImage(session, imageFilename, [],
|
||||
postImage(session, image_filename, [],
|
||||
inboxUrl.replace('/' + postToBox, '/shares'),
|
||||
headers)
|
||||
|
||||
|
@ -775,7 +775,7 @@ def sendWantedViaServer(base_dir, session,
|
|||
fromNickname: str, password: str,
|
||||
fromDomain: str, fromPort: int,
|
||||
http_prefix: str, displayName: str,
|
||||
summary: str, imageFilename: str,
|
||||
summary: str, image_filename: str,
|
||||
itemQty: float, itemType: str, itemCategory: str,
|
||||
location: str, duration: str,
|
||||
cached_webfingers: {}, person_cache: {},
|
||||
|
@ -867,13 +867,13 @@ def sendWantedViaServer(base_dir, session,
|
|||
|
||||
authHeader = createBasicAuthHeader(fromNickname, password)
|
||||
|
||||
if imageFilename:
|
||||
if image_filename:
|
||||
headers = {
|
||||
'host': fromDomain,
|
||||
'Authorization': authHeader
|
||||
}
|
||||
postResult = \
|
||||
postImage(session, imageFilename, [],
|
||||
postImage(session, image_filename, [],
|
||||
inboxUrl.replace('/' + postToBox, '/wanted'),
|
||||
headers)
|
||||
|
||||
|
@ -1073,9 +1073,9 @@ def outboxShareUpload(base_dir: str, http_prefix: str,
|
|||
location = ''
|
||||
if message_json['object'].get('location'):
|
||||
location = message_json['object']['location']
|
||||
imageFilename = None
|
||||
if message_json['object'].get('imageFilename'):
|
||||
imageFilename = message_json['object']['imageFilename']
|
||||
image_filename = None
|
||||
if message_json['object'].get('image_filename'):
|
||||
image_filename = message_json['object']['image_filename']
|
||||
if debug:
|
||||
print('Adding shared item')
|
||||
pprint(message_json)
|
||||
|
@ -1084,7 +1084,7 @@ def outboxShareUpload(base_dir: str, http_prefix: str,
|
|||
http_prefix, nickname, domain, port,
|
||||
message_json['object']['displayName'],
|
||||
message_json['object']['summary'],
|
||||
imageFilename,
|
||||
image_filename,
|
||||
itemQty,
|
||||
message_json['object']['itemType'],
|
||||
message_json['object']['category'],
|
||||
|
|
4
utils.py
4
utils.py
|
@ -360,7 +360,7 @@ def get_image_extensions() -> []:
|
|||
return ('png', 'jpg', 'jpeg', 'gif', 'webp', 'avif', 'svg', 'ico')
|
||||
|
||||
|
||||
def get_image_mime_type(imageFilename: str) -> str:
|
||||
def get_image_mime_type(image_filename: str) -> str:
|
||||
"""Returns the mime type for the given image
|
||||
"""
|
||||
extensionsToMime = {
|
||||
|
@ -373,7 +373,7 @@ def get_image_mime_type(imageFilename: str) -> str:
|
|||
'ico': 'x-icon'
|
||||
}
|
||||
for ext, mimeExt in extensionsToMime.items():
|
||||
if imageFilename.endswith('.' + ext):
|
||||
if image_filename.endswith('.' + ext):
|
||||
return 'image/' + mimeExt
|
||||
return 'image/png'
|
||||
|
||||
|
|
Loading…
Reference in New Issue