flake8 format

main
Bob Mottram 2020-04-03 16:55:55 +00:00
parent e2e2db91d4
commit 840e726753
1 changed files with 95 additions and 81 deletions

176
media.py
View File

@ -1,17 +1,15 @@
__filename__="media.py" __filename__ = "media.py"
__author__="Bob Mottram" __author__ = "Bob Mottram"
__license__="AGPL3+" __license__ = "AGPL3+"
__version__="1.1.0" __version__ = "1.1.0"
__maintainer__="Bob Mottram" __maintainer__ = "Bob Mottram"
__email__="bob@freedombone.net" __email__ = "bob@freedombone.net"
__status__="Production" __status__ = "Production"
from blurhash import blurhash_encode as blurencode from blurhash import blurhash_encode as blurencode
from PIL import Image from PIL import Image
import numpy import numpy
import os import os
import sys
import json
import datetime import datetime
from hashlib import sha1 from hashlib import sha1
from auth import createPassword from auth import createPassword
@ -19,6 +17,7 @@ from shutil import copyfile
from shutil import rmtree from shutil import rmtree
from shutil import move from shutil import move
def replaceYouTube(postJsonObject: {}) -> None: def replaceYouTube(postJsonObject: {}) -> None:
"""Replace YouTube with invidio.us """Replace YouTube with invidio.us
This denies Google some, but not all, tracking data This denies Google some, but not all, tracking data
@ -29,62 +28,72 @@ def replaceYouTube(postJsonObject: {}) -> None:
return return
if 'www.youtube.com' not in postJsonObject['object']['content']: if 'www.youtube.com' not in postJsonObject['object']['content']:
return return
postJsonObject['object']['content']= \ postJsonObject['object']['content'] = \
postJsonObject['object']['content'].replace('www.youtube.com','invidio.us') postJsonObject['object']['content'].replace('www.youtube.com',
'invidio.us')
def removeMetaData(imageFilename: str,outputFilename: str) -> None:
def removeMetaData(imageFilename: str, outputFilename: str) -> None:
"""Attempts to do this with pure python didn't work well, """Attempts to do this with pure python didn't work well,
so better to use a dedicated tool if one is installed so better to use a dedicated tool if one is installed
""" """
copyfile(imageFilename,outputFilename) copyfile(imageFilename, outputFilename)
if os.path.isfile('/usr/bin/exiftool'): if os.path.isfile('/usr/bin/exiftool'):
print('Removing metadata from '+outputFilename+' using exiftool') print('Removing metadata from ' + outputFilename + ' using exiftool')
os.system('exiftool -all= '+outputFilename) os.system('exiftool -all= ' + outputFilename)
elif os.path.isfile('/usr/bin/mogrify'): elif os.path.isfile('/usr/bin/mogrify'):
print('Removing metadata from '+outputFilename+' using mogrify') print('Removing metadata from ' + outputFilename + ' using mogrify')
os.system('/usr/bin/mogrify -strip '+outputFilename) os.system('/usr/bin/mogrify -strip '+outputFilename)
def getImageHash(imageFilename: str) -> str: def getImageHash(imageFilename: str) -> str:
return blurencode(numpy.array(Image.open(imageFilename).convert("RGB"))) return blurencode(numpy.array(Image.open(imageFilename).convert("RGB")))
def isMedia(imageFilename: str) -> bool: def isMedia(imageFilename: str) -> bool:
permittedMedia=['png','jpg','gif','webp','mp4','ogv','mp3','ogg'] permittedMedia = ('png', 'jpg', 'gif', 'webp',
'mp4', 'ogv', 'mp3', 'ogg')
for m in permittedMedia: for m in permittedMedia:
if imageFilename.endswith('.'+m): if imageFilename.endswith('.' + m):
return True return True
print('WARN: '+imageFilename+' is not a permitted media type') print('WARN: ' + imageFilename + ' is not a permitted media type')
return False return False
def createMediaDirs(baseDir: str,mediaPath: str) -> None:
if not os.path.isdir(baseDir+'/media'): def createMediaDirs(baseDir: str, mediaPath: str) -> None:
os.mkdir(baseDir+'/media') if not os.path.isdir(baseDir + '/media'):
if not os.path.isdir(baseDir+'/'+mediaPath): os.mkdir(baseDir + '/media')
os.mkdir(baseDir+'/'+mediaPath) if not os.path.isdir(baseDir + '/' + mediaPath):
os.mkdir(baseDir + '/' + mediaPath)
def getMediaPath() -> str: def getMediaPath() -> str:
currTime=datetime.datetime.utcnow() currTime = datetime.datetime.utcnow()
weeksSinceEpoch=int((currTime - datetime.datetime(1970,1,1)).days/7) weeksSinceEpoch = int((currTime - datetime.datetime(1970, 1, 1)).days / 7)
return 'media/'+str(weeksSinceEpoch) return 'media/' + str(weeksSinceEpoch)
def getAttachmentMediaType(filename: str) -> str: def getAttachmentMediaType(filename: str) -> str:
"""Returns the type of media for the given file """Returns the type of media for the given file
image, video or audio image, video or audio
""" """
mediaType=None mediaType = None
imageTypes=['png','jpg','jpeg','gif','webp'] imageTypes = ('png', 'jpg', 'jpeg',
'gif', 'webp')
for mType in imageTypes: for mType in imageTypes:
if filename.endswith('.'+mType): if filename.endswith('.' + mType):
return 'image' return 'image'
videoTypes=['mp4','webm','ogv'] videoTypes = ('mp4', 'webm', 'ogv')
for mType in videoTypes: for mType in videoTypes:
if filename.endswith('.'+mType): if filename.endswith('.' + mType):
return 'video' return 'video'
audioTypes=['mp3','ogg'] audioTypes = ('mp3', 'ogg')
for mType in audioTypes: for mType in audioTypes:
if filename.endswith('.'+mType): if filename.endswith('.' + mType):
return 'audio' return 'audio'
return mediaType return mediaType
def updateEtag(mediaFilename: str) -> None: def updateEtag(mediaFilename: str) -> None:
""" calculate the etag, which is a sha1 of the data """ calculate the etag, which is a sha1 of the data
""" """
@ -97,104 +106,109 @@ def updateEtag(mediaFilename: str) -> None:
return return
# read the binary data # read the binary data
data=None data = None
try: try:
with open(mediaFilename, 'rb') as mediaFile: with open(mediaFilename, 'rb') as mediaFile:
data=mediaFile.read() data = mediaFile.read()
except: except BaseException:
pass pass
if not data: if not data:
return return
# calculate hash # calculate hash
etag=sha1(data).hexdigest() etag = sha1(data).hexdigest()
# save the hash # save the hash
try: try:
with open(mediaFilename+'.etag', 'w') as etagFile: with open(mediaFilename + '.etag', 'w') as etagFile:
etagFile.write(etag) etagFile.write(etag)
except: except BaseException:
pass pass
def attachMedia(baseDir: str,httpPrefix: str,domain: str,port: int, \
postJson: {},imageFilename: str, \ def attachMedia(baseDir: str, httpPrefix: str, domain: str, port: int,
mediaType: str,description: str, \ postJson: {}, imageFilename: str,
mediaType: str, description: str,
useBlurhash: bool) -> {}: useBlurhash: bool) -> {}:
"""Attaches media to a json object post """Attaches media to a json object post
The description can be None The description can be None
Blurhash is optional, since low power systems may take a long time to calculate it Blurhash is optional, since low power systems may take a long
time to calculate it
""" """
if not isMedia(imageFilename): if not isMedia(imageFilename):
return postJson return postJson
fileExtension=None fileExtension = None
acceptedTypes=['png','jpg','gif','webp','mp4','webm','ogv','mp3','ogg'] acceptedTypes = ('png', 'jpg', 'gif', 'webp',
'mp4', 'webm', 'ogv', 'mp3', 'ogg')
for mType in acceptedTypes: for mType in acceptedTypes:
if imageFilename.endswith('.'+mType): if imageFilename.endswith('.' + mType):
if mType=='jpg': if mType == 'jpg':
mType='jpeg' mType = 'jpeg'
if mType=='mp3': if mType == 'mp3':
mType='mpeg' mType = 'mpeg'
fileExtension=mType fileExtension = mType
if not fileExtension: if not fileExtension:
return postJson return postJson
mediaType=mediaType+'/'+fileExtension mediaType = mediaType + '/' + fileExtension
print('Attached media type: '+mediaType) print('Attached media type: ' + mediaType)
if fileExtension=='jpeg': if fileExtension == 'jpeg':
fileExtension='jpg' fileExtension = 'jpg'
if mediaType=='audio/mpeg': if mediaType == 'audio/mpeg':
fileExtension='mp3' fileExtension = 'mp3'
if port: if port:
if port!=80 and port!=443: if port != 80 and port != 443:
if ':' not in domain: if ':' not in domain:
domain=domain+':'+str(port) domain = domain + ':' + str(port)
mPath=getMediaPath() mPath = getMediaPath()
mediaPath=mPath+'/'+createPassword(32)+'.'+fileExtension mediaPath = mPath + '/' + createPassword(32) + '.' + fileExtension
if baseDir: if baseDir:
createMediaDirs(baseDir,mPath) createMediaDirs(baseDir, mPath)
mediaFilename=baseDir+'/'+mediaPath mediaFilename = baseDir + '/' + mediaPath
attachmentJson={ attachmentJson = {
'mediaType': mediaType, 'mediaType': mediaType,
'name': description, 'name': description,
'type': 'Document', 'type': 'Document',
'url': httpPrefix+'://'+domain+'/'+mediaPath 'url': httpPrefix + '://' + domain + '/' + mediaPath
} }
if mediaType.startswith('image/'): if mediaType.startswith('image/'):
attachmentJson['focialPoint']=[0.0, 0.0] attachmentJson['focialPoint'] = [0.0, 0.0]
if useBlurhash: if useBlurhash:
attachmentJson['blurhash']=getImageHash(imageFilename) attachmentJson['blurhash'] = getImageHash(imageFilename)
postJson['attachment']=[attachmentJson] postJson['attachment'] = [attachmentJson]
if baseDir: if baseDir:
if mediaType.startswith('image/'): if mediaType.startswith('image/'):
removeMetaData(imageFilename,mediaFilename) removeMetaData(imageFilename, mediaFilename)
else: else:
copyfile(imageFilename,mediaFilename) copyfile(imageFilename, mediaFilename)
updateEtag(mediaFilename) updateEtag(mediaFilename)
return postJson return postJson
def archiveMedia(baseDir: str,archiveDirectory: str,maxWeeks=4) -> None:
def archiveMedia(baseDir: str, archiveDirectory: str, maxWeeks=4) -> None:
"""Any media older than the given number of weeks gets archived """Any media older than the given number of weeks gets archived
""" """
currTime=datetime.datetime.utcnow() currTime = datetime.datetime.utcnow()
weeksSinceEpoch=int((currTime - datetime.datetime(1970,1,1)).days/7) weeksSinceEpoch = int((currTime - datetime.datetime(1970, 1, 1)).days/7)
minWeek=weeksSinceEpoch-maxWeeks minWeek = weeksSinceEpoch-maxWeeks
if archiveDirectory: if archiveDirectory:
if not os.path.isdir(archiveDirectory): if not os.path.isdir(archiveDirectory):
os.mkdir(archiveDirectory) os.mkdir(archiveDirectory)
if not os.path.isdir(archiveDirectory+'/media'): if not os.path.isdir(archiveDirectory + '/media'):
os.mkdir(archiveDirectory+'/media') os.mkdir(archiveDirectory + '/media')
for subdir, dirs, files in os.walk(baseDir+'/media'): for subdir, dirs, files in os.walk(baseDir + '/media'):
for weekDir in dirs: for weekDir in dirs:
if int(weekDir)<minWeek: if int(weekDir) < minWeek:
if archiveDirectory: if archiveDirectory:
move(os.path.join(baseDir+'/media', weekDir),archiveDirectory+'/media') move(os.path.join(baseDir + '/media', weekDir),
archiveDirectory + '/media')
else: else:
# archive to /dev/null # archive to /dev/null
rmtree(os.path.join(baseDir+'/media', weekDir)) rmtree(os.path.join(baseDir + '/media', weekDir))