Snake case

merge-requests/26/head
Bob Mottram 2022-01-03 10:27:55 +00:00
parent 551f24706b
commit d778105091
9 changed files with 259 additions and 250 deletions

View File

@ -43,7 +43,7 @@ def set_matrix_address(actor_json: {}, matrix_address: str) -> None:
actor_json['attachment'] = []
# remove any existing value
propertyFound = None
property_found = None
for property_value in actor_json['attachment']:
if not property_value.get('name'):
continue
@ -51,10 +51,10 @@ def set_matrix_address(actor_json: {}, matrix_address: str) -> None:
continue
if not property_value['name'].lower().startswith('matrix'):
continue
propertyFound = property_value
property_found = property_value
break
if propertyFound:
actor_json['attachment'].remove(propertyFound)
if property_found:
actor_json['attachment'].remove(property_found)
if '@' not in matrix_address:
return
@ -81,9 +81,9 @@ def set_matrix_address(actor_json: {}, matrix_address: str) -> None:
property_value['value'] = matrix_address
return
newMatrixAddress = {
new_matrix_address = {
"name": "Matrix",
"type": "PropertyValue",
"value": matrix_address
}
actor_json['attachment'].append(newMatrixAddress)
actor_json['attachment'].append(new_matrix_address)

281
media.py
View File

@ -53,107 +53,107 @@ def _get_blur_hash() -> str:
def _replace_silo_domain(post_json_object: {},
siloDomain: str, replacementDomain: str,
silo_domain: str, replacement_domain: str,
system_language: str) -> None:
"""Replace a silo domain with a replacement domain
"""
if not replacementDomain:
if not replacement_domain:
return
if not has_object_dict(post_json_object):
return
if not post_json_object['object'].get('content'):
return
contentStr = get_base_content_from_post(post_json_object, system_language)
if siloDomain not in contentStr:
content_str = get_base_content_from_post(post_json_object, system_language)
if silo_domain not in content_str:
return
contentStr = contentStr.replace(siloDomain, replacementDomain)
post_json_object['object']['content'] = contentStr
content_str = content_str.replace(silo_domain, replacement_domain)
post_json_object['object']['content'] = content_str
if post_json_object['object'].get('contentMap'):
post_json_object['object']['contentMap'][system_language] = contentStr
post_json_object['object']['contentMap'][system_language] = content_str
def replace_you_tube(post_json_object: {}, replacementDomain: str,
def replace_you_tube(post_json_object: {}, replacement_domain: str,
system_language: str) -> None:
"""Replace YouTube with a replacement domain
This denies Google some, but not all, tracking data
"""
_replace_silo_domain(post_json_object, 'www.youtube.com',
replacementDomain, system_language)
replacement_domain, system_language)
def replace_twitter(post_json_object: {}, replacementDomain: str,
def replace_twitter(post_json_object: {}, replacement_domain: str,
system_language: str) -> None:
"""Replace Twitter with a replacement domain
This allows you to view twitter posts without having a twitter account
"""
_replace_silo_domain(post_json_object, 'twitter.com',
replacementDomain, system_language)
replacement_domain, system_language)
def _remove_meta_data(image_filename: str, outputFilename: str) -> None:
def _remove_meta_data(image_filename: str, output_filename: str) -> None:
"""Attempts to do this with pure python didn't work well,
so better to use a dedicated tool if one is installed
"""
copyfile(image_filename, outputFilename)
if not os.path.isfile(outputFilename):
copyfile(image_filename, output_filename)
if not os.path.isfile(output_filename):
print('ERROR: unable to remove metadata from ' + image_filename)
return
if os.path.isfile('/usr/bin/exiftool'):
print('Removing metadata from ' + outputFilename + ' using exiftool')
os.system('exiftool -all= ' + outputFilename) # nosec
print('Removing metadata from ' + output_filename + ' using exiftool')
os.system('exiftool -all= ' + output_filename) # nosec
elif os.path.isfile('/usr/bin/mogrify'):
print('Removing metadata from ' + outputFilename + ' using mogrify')
os.system('/usr/bin/mogrify -strip ' + outputFilename) # nosec
print('Removing metadata from ' + output_filename + ' using mogrify')
os.system('/usr/bin/mogrify -strip ' + output_filename) # nosec
def _spoof_meta_data(base_dir: str, nickname: str, domain: str,
outputFilename: str, spoofCity: str,
output_filename: str, spoof_city: str,
content_license_url: str) -> None:
"""Spoof image metadata using a decoy model for a given city
"""
if not os.path.isfile(outputFilename):
print('ERROR: unable to spoof metadata within ' + outputFilename)
if not os.path.isfile(output_filename):
print('ERROR: unable to spoof metadata within ' + output_filename)
return
# get the random seed used to generate a unique pattern for this account
decoySeedFilename = acct_dir(base_dir, nickname, domain) + '/decoyseed'
decoySeed = 63725
if os.path.isfile(decoySeedFilename):
with open(decoySeedFilename, 'r') as fp:
decoySeed = int(fp.read())
decoy_seed_filename = acct_dir(base_dir, nickname, domain) + '/decoyseed'
decoy_seed = 63725
if os.path.isfile(decoy_seed_filename):
with open(decoy_seed_filename, 'r') as fp_seed:
decoy_seed = int(fp_seed.read())
else:
decoySeed = randint(10000, 10000000000000000)
decoy_seed = randint(10000, 10000000000000000)
try:
with open(decoySeedFilename, 'w+') as fp:
fp.write(str(decoySeed))
with open(decoy_seed_filename, 'w+') as fp_seed:
fp_seed.write(str(decoy_seed))
except OSError:
print('EX: unable to write ' + decoySeedFilename)
print('EX: unable to write ' + decoy_seed_filename)
if os.path.isfile('/usr/bin/exiftool'):
print('Spoofing metadata in ' + outputFilename + ' using exiftool')
curr_timeAdjusted = \
print('Spoofing metadata in ' + output_filename + ' using exiftool')
curr_time_adjusted = \
datetime.datetime.utcnow() - \
datetime.timedelta(minutes=randint(2, 120))
published = curr_timeAdjusted.strftime("%Y:%m:%d %H:%M:%S+00:00")
(latitude, longitude, latitudeRef, longitudeRef,
camMake, camModel, camSerialNumber) = \
spoof_geolocation(base_dir, spoofCity, curr_timeAdjusted,
decoySeed, None, None)
published = curr_time_adjusted.strftime("%Y:%m:%d %H:%M:%S+00:00")
(latitude, longitude, latitude_ref, longitude_ref,
cam_make, cam_model, cam_serial_number) = \
spoof_geolocation(base_dir, spoof_city, curr_time_adjusted,
decoy_seed, None, None)
if os.system('exiftool -artist=@"' + nickname + '@' + domain + '" ' +
'-Make="' + camMake + '" ' +
'-Model="' + camModel + '" ' +
'-Comment="' + str(camSerialNumber) + '" ' +
'-Make="' + cam_make + '" ' +
'-Model="' + cam_model + '" ' +
'-Comment="' + str(cam_serial_number) + '" ' +
'-DateTimeOriginal="' + published + '" ' +
'-FileModifyDate="' + published + '" ' +
'-CreateDate="' + published + '" ' +
'-GPSLongitudeRef=' + longitudeRef + ' ' +
'-GPSLongitudeRef=' + longitude_ref + ' ' +
'-GPSAltitude=0 ' +
'-GPSLongitude=' + str(longitude) + ' ' +
'-GPSLatitudeRef=' + latitudeRef + ' ' +
'-GPSLatitudeRef=' + latitude_ref + ' ' +
'-GPSLatitude=' + str(latitude) + ' ' +
'-copyright="' + content_license_url + '" ' +
'-Comment="" ' +
outputFilename) != 0: # nosec
output_filename) != 0: # nosec
print('ERROR: exiftool failed to run')
else:
print('ERROR: exiftool is not installed')
@ -163,54 +163,54 @@ def _spoof_meta_data(base_dir: str, nickname: str, domain: str,
def convert_image_to_low_bandwidth(image_filename: str) -> None:
"""Converts an image to a low bandwidth version
"""
low_bandwidthFilename = image_filename + '.low'
if os.path.isfile(low_bandwidthFilename):
low_bandwidth_filename = image_filename + '.low'
if os.path.isfile(low_bandwidth_filename):
try:
os.remove(low_bandwidthFilename)
os.remove(low_bandwidth_filename)
except OSError:
print('EX: convert_image_to_low_bandwidth unable to delete ' +
low_bandwidthFilename)
low_bandwidth_filename)
cmd = \
'/usr/bin/convert +noise Multiplicative ' + \
'-evaluate median 10% -dither Floyd-Steinberg ' + \
'-monochrome ' + image_filename + ' ' + low_bandwidthFilename
'-monochrome ' + image_filename + ' ' + low_bandwidth_filename
print('Low bandwidth image conversion: ' + cmd)
subprocess.call(cmd, shell=True)
# wait for conversion to happen
ctr = 0
while not os.path.isfile(low_bandwidthFilename):
while not os.path.isfile(low_bandwidth_filename):
print('Waiting for low bandwidth image conversion ' + str(ctr))
time.sleep(0.2)
ctr += 1
if ctr > 100:
print('WARN: timed out waiting for low bandwidth image conversion')
break
if os.path.isfile(low_bandwidthFilename):
if os.path.isfile(low_bandwidth_filename):
try:
os.remove(image_filename)
except OSError:
print('EX: convert_image_to_low_bandwidth unable to delete ' +
image_filename)
os.rename(low_bandwidthFilename, image_filename)
os.rename(low_bandwidth_filename, image_filename)
if os.path.isfile(image_filename):
print('Image converted to low bandwidth ' + image_filename)
else:
print('Low bandwidth converted image not found: ' +
low_bandwidthFilename)
low_bandwidth_filename)
def process_meta_data(base_dir: str, nickname: str, domain: str,
image_filename: str, outputFilename: str,
image_filename: str, output_filename: str,
city: str, content_license_url: str) -> None:
"""Handles image metadata. This tries to spoof the metadata
if possible, but otherwise just removes it
"""
# first remove the metadata
_remove_meta_data(image_filename, outputFilename)
_remove_meta_data(image_filename, output_filename)
# now add some spoofed data to misdirect surveillance capitalists
_spoof_meta_data(base_dir, nickname, domain, outputFilename, city,
_spoof_meta_data(base_dir, nickname, domain, output_filename, city,
content_license_url)
@ -220,65 +220,70 @@ def _is_media(image_filename: str) -> bool:
if not os.path.isfile(image_filename):
print('WARN: Media file does not exist ' + image_filename)
return False
permittedMedia = get_media_extensions()
for m in permittedMedia:
if image_filename.endswith('.' + m):
permitted_media = get_media_extensions()
for permit in permitted_media:
if image_filename.endswith('.' + permit):
return True
print('WARN: ' + image_filename + ' is not a permitted media type')
return False
def create_media_dirs(base_dir: str, mediaPath: str) -> None:
def create_media_dirs(base_dir: str, media_path: str) -> None:
"""Creates stored media directories
"""
if not os.path.isdir(base_dir + '/media'):
os.mkdir(base_dir + '/media')
if not os.path.isdir(base_dir + '/' + mediaPath):
os.mkdir(base_dir + '/' + mediaPath)
if not os.path.isdir(base_dir + '/' + media_path):
os.mkdir(base_dir + '/' + media_path)
def get_media_path() -> str:
"""Returns the path for stored media
"""
curr_time = datetime.datetime.utcnow()
weeksSinceEpoch = int((curr_time - datetime.datetime(1970, 1, 1)).days / 7)
return 'media/' + str(weeksSinceEpoch)
weeks_since_epoch = \
int((curr_time - datetime.datetime(1970, 1, 1)).days / 7)
return 'media/' + str(weeks_since_epoch)
def get_attachment_media_type(filename: str) -> str:
"""Returns the type of media for the given file
image, video or audio
"""
mediaType = None
imageTypes = get_image_extensions()
for mType in imageTypes:
if filename.endswith('.' + mType):
media_type = None
image_types = get_image_extensions()
for mtype in image_types:
if filename.endswith('.' + mtype):
return 'image'
videoTypes = get_video_extensions()
for mType in videoTypes:
if filename.endswith('.' + mType):
video_types = get_video_extensions()
for mtype in video_types:
if filename.endswith('.' + mtype):
return 'video'
audioTypes = get_audio_extensions()
for mType in audioTypes:
if filename.endswith('.' + mType):
audio_types = get_audio_extensions()
for mtype in audio_types:
if filename.endswith('.' + mtype):
return 'audio'
return mediaType
return media_type
def _update_etag(mediaFilename: str) -> None:
def _update_etag(media_filename: str) -> None:
""" calculate the etag, which is a sha1 of the data
"""
# only create etags for media
if '/media/' not in mediaFilename:
if '/media/' not in media_filename:
return
# check that the media exists
if not os.path.isfile(mediaFilename):
if not os.path.isfile(media_filename):
return
# read the binary data
data = None
try:
with open(mediaFilename, 'rb') as mediaFile:
data = mediaFile.read()
with open(media_filename, 'rb') as media_file:
data = media_file.read()
except OSError:
print('EX: _update_etag unable to read ' + str(mediaFilename))
print('EX: _update_etag unable to read ' + str(media_filename))
if not data:
return
@ -286,17 +291,17 @@ def _update_etag(mediaFilename: str) -> None:
etag = sha1(data).hexdigest() # nosec
# save the hash
try:
with open(mediaFilename + '.etag', 'w+') as etagFile:
etagFile.write(etag)
with open(media_filename + '.etag', 'w+') as efile:
efile.write(etag)
except OSError:
print('EX: _update_etag unable to write ' +
str(mediaFilename) + '.etag')
str(media_filename) + '.etag')
def attach_media(base_dir: str, http_prefix: str,
nickname: str, domain: str, port: int,
post_json: {}, image_filename: str,
mediaType: str, description: str,
media_type: str, description: str,
city: str, low_bandwidth: bool,
content_license_url: str) -> {}:
"""Attaches media to a json object post
@ -305,76 +310,76 @@ def attach_media(base_dir: str, http_prefix: str,
if not _is_media(image_filename):
return post_json
fileExtension = None
acceptedTypes = get_media_extensions()
for mType in acceptedTypes:
if image_filename.endswith('.' + mType):
if mType == 'jpg':
mType = 'jpeg'
if mType == 'mp3':
mType = 'mpeg'
fileExtension = mType
if not fileExtension:
file_extension = None
accepted_types = get_media_extensions()
for mtype in accepted_types:
if image_filename.endswith('.' + mtype):
if mtype == 'jpg':
mtype = 'jpeg'
if mtype == 'mp3':
mtype = 'mpeg'
file_extension = mtype
if not file_extension:
return post_json
mediaType = mediaType + '/' + fileExtension
print('Attached media type: ' + mediaType)
media_type = media_type + '/' + file_extension
print('Attached media type: ' + media_type)
if fileExtension == 'jpeg':
fileExtension = 'jpg'
if mediaType == 'audio/mpeg':
fileExtension = 'mp3'
if file_extension == 'jpeg':
file_extension = 'jpg'
if media_type == 'audio/mpeg':
file_extension = 'mp3'
domain = get_full_domain(domain, port)
mPath = get_media_path()
mediaPath = mPath + '/' + create_password(32) + '.' + fileExtension
mpath = get_media_path()
media_path = mpath + '/' + create_password(32) + '.' + file_extension
if base_dir:
create_media_dirs(base_dir, mPath)
mediaFilename = base_dir + '/' + mediaPath
create_media_dirs(base_dir, mpath)
media_filename = base_dir + '/' + media_path
mediaPath = \
mediaPath.replace('media/', 'system/media_attachments/files/', 1)
attachmentJson = {
'mediaType': mediaType,
media_path = \
media_path.replace('media/', 'system/media_attachments/files/', 1)
attachment_json = {
'mediaType': media_type,
'name': description,
'type': 'Document',
'url': http_prefix + '://' + domain + '/' + mediaPath
'url': http_prefix + '://' + domain + '/' + media_path
}
if mediaType.startswith('image/'):
attachmentJson['blurhash'] = _get_blur_hash()
if media_type.startswith('image/'):
attachment_json['blurhash'] = _get_blur_hash()
# find the dimensions of the image and add them as metadata
attachImageWidth, attachImageHeight = \
attach_image_width, attach_image_height = \
get_image_dimensions(image_filename)
if attachImageWidth and attachImageHeight:
attachmentJson['width'] = attachImageWidth
attachmentJson['height'] = attachImageHeight
if attach_image_width and attach_image_height:
attachment_json['width'] = attach_image_width
attachment_json['height'] = attach_image_height
post_json['attachment'] = [attachmentJson]
post_json['attachment'] = [attachment_json]
if base_dir:
if mediaType.startswith('image/'):
if media_type.startswith('image/'):
if low_bandwidth:
convert_image_to_low_bandwidth(image_filename)
process_meta_data(base_dir, nickname, domain,
image_filename, mediaFilename, city,
image_filename, media_filename, city,
content_license_url)
else:
copyfile(image_filename, mediaFilename)
_update_etag(mediaFilename)
copyfile(image_filename, media_filename)
_update_etag(media_filename)
return post_json
def archive_media(base_dir: str, archive_directory: str,
maxWeeks: int) -> None:
max_weeks: int) -> None:
"""Any media older than the given number of weeks gets archived
"""
if maxWeeks == 0:
if max_weeks == 0:
return
curr_time = datetime.datetime.utcnow()
weeksSinceEpoch = int((curr_time - datetime.datetime(1970, 1, 1)).days/7)
minWeek = weeksSinceEpoch - maxWeeks
weeks_since_epoch = int((curr_time - datetime.datetime(1970, 1, 1)).days/7)
min_week = weeks_since_epoch - max_weeks
if archive_directory:
if not os.path.isdir(archive_directory):
@ -382,20 +387,22 @@ def archive_media(base_dir: str, archive_directory: str,
if not os.path.isdir(archive_directory + '/media'):
os.mkdir(archive_directory + '/media')
for subdir, dirs, files in os.walk(base_dir + '/media'):
for weekDir in dirs:
if int(weekDir) < minWeek:
for _, dirs, _ in os.walk(base_dir + '/media'):
for week_dir in dirs:
if int(week_dir) < min_week:
if archive_directory:
move(os.path.join(base_dir + '/media', weekDir),
move(os.path.join(base_dir + '/media', week_dir),
archive_directory + '/media')
else:
# archive to /dev/null
rmtree(os.path.join(base_dir + '/media', weekDir),
rmtree(os.path.join(base_dir + '/media', week_dir),
ignore_errors=False, onerror=None)
break
def path_is_video(path: str) -> bool:
"""Is the given path a video file?
"""
if path.endswith('.ogv') or \
path.endswith('.mp4'):
return True
@ -403,6 +410,8 @@ def path_is_video(path: str) -> bool:
def path_is_audio(path: str) -> bool:
"""Is the given path an audio file?
"""
if path.endswith('.ogg') or \
path.endswith('.mp3'):
return True
@ -420,13 +429,13 @@ def get_image_dimensions(image_filename: str) -> (int, int):
return None, None
if not result:
return None, None
dimensionsStr = result.stdout.decode('utf-8').replace('"', '')
if 'x' not in dimensionsStr:
dimensions_str = result.stdout.decode('utf-8').replace('"', '')
if 'x' not in dimensions_str:
return None, None
widthStr = dimensionsStr.split('x')[0]
if not widthStr.isdigit():
width_str = dimensions_str.split('x')[0]
if not width_str.isdigit():
return None, None
heightStr = dimensionsStr.split('x')[1]
if not heightStr.isdigit():
height_str = dimensions_str.split('x')[1]
if not height_str.isdigit():
return None, None
return int(widthStr), int(heightStr)
return int(width_str), int(height_str)

View File

@ -17,23 +17,23 @@ from utils import no_of_active_accounts_monthly
def _get_status_count(base_dir: str) -> int:
"""Get the total number of posts
"""
statusCtr = 0
accountsDir = base_dir + '/accounts'
for subdir, dirs, files in os.walk(accountsDir):
status_ctr = 0
accounts_dir = base_dir + '/accounts'
for _, dirs, _ in os.walk(accounts_dir):
for acct in dirs:
if not is_account_dir(acct):
continue
acct_dir = os.path.join(accountsDir, acct + '/outbox')
for subdir2, dirs2, files2 in os.walk(acct_dir):
statusCtr += len(files2)
acct_dir = os.path.join(accounts_dir, acct + '/outbox')
for _, _, files2 in os.walk(acct_dir):
status_ctr += len(files2)
break
break
return statusCtr
return status_ctr
def meta_data_node_info(base_dir: str,
aboutUrl: str,
termsOfServiceUrl: str,
about_url: str,
terms_of_service_url: str,
registration: bool, version: str,
showAccounts: bool) -> {}:
""" /nodeinfo/2.0 endpoint
@ -47,15 +47,15 @@ def meta_data_node_info(base_dir: str,
sensitive
"""
if showAccounts:
activeAccounts = no_of_accounts(base_dir)
activeAccountsMonthly = no_of_active_accounts_monthly(base_dir, 1)
activeAccountsHalfYear = no_of_active_accounts_monthly(base_dir, 6)
localPosts = _get_status_count(base_dir)
active_accounts = no_of_accounts(base_dir)
active_accounts_monthly = no_of_active_accounts_monthly(base_dir, 1)
active_accounts_half_year = no_of_active_accounts_monthly(base_dir, 6)
local_posts = _get_status_count(base_dir)
else:
activeAccounts = 1
activeAccountsMonthly = 1
activeAccountsHalfYear = 1
localPosts = 1
active_accounts = 1
active_accounts_monthly = 1
active_accounts_half_year = 1
local_posts = 1
nodeinfo = {
'openRegistrations': registration,
@ -65,15 +65,15 @@ def meta_data_node_info(base_dir: str,
'version': version
},
'documents': {
'about': aboutUrl,
'terms': termsOfServiceUrl
'about': about_url,
'terms': terms_of_service_url
},
'usage': {
'localPosts': localPosts,
'localPosts': local_posts,
'users': {
'activeHalfyear': activeAccountsHalfYear,
'activeMonth': activeAccountsMonthly,
'total': activeAccounts
'activeHalfyear': active_accounts_half_year,
'activeMonth': active_accounts_monthly,
'total': active_accounts
}
},
'version': '2.0'
@ -82,100 +82,100 @@ def meta_data_node_info(base_dir: str,
def meta_data_instance(showAccounts: bool,
instanceTitle: str,
instanceDescriptionShort: str,
instanceDescription: str,
instance_title: str,
instance_description_short: str,
instance_description: str,
http_prefix: str, base_dir: str,
admin_nickname: str, domain: str, domain_full: str,
registration: bool, system_language: str,
version: str) -> {}:
""" /api/v1/instance endpoint
"""
adminActorFilename = \
admin_actor_filename = \
base_dir + '/accounts/' + admin_nickname + '@' + domain + '.json'
if not os.path.isfile(adminActorFilename):
if not os.path.isfile(admin_actor_filename):
return {}
adminActor = load_json(adminActorFilename, 0)
if not adminActor:
admin_actor = load_json(admin_actor_filename, 0)
if not admin_actor:
print('WARN: json load exception meta_data_instance')
return {}
rulesList = []
rulesFilename = \
rules_list = []
rules_filename = \
base_dir + '/accounts/tos.md'
if os.path.isfile(rulesFilename):
with open(rulesFilename, 'r') as fp:
rulesLines = fp.readlines()
ruleCtr = 1
for line in rulesLines:
if os.path.isfile(rules_filename):
with open(rules_filename, 'r') as fp_rules:
rules_lines = fp_rules.readlines()
rule_ctr = 1
for line in rules_lines:
line = line.strip()
if not line:
continue
if line.startswith('#'):
continue
rulesList.append({
'id': str(ruleCtr),
rules_list.append({
'id': str(rule_ctr),
'text': line
})
ruleCtr += 1
rule_ctr += 1
isBot = False
isGroup = False
if adminActor['type'] == 'Group':
isGroup = True
elif adminActor['type'] != 'Person':
isBot = True
is_bot = False
is_group = False
if admin_actor['type'] == 'Group':
is_group = True
elif admin_actor['type'] != 'Person':
is_bot = True
url = \
http_prefix + '://' + domain_full + '/@' + \
adminActor['preferredUsername']
admin_actor['preferredUsername']
if showAccounts:
activeAccounts = no_of_accounts(base_dir)
localPosts = _get_status_count(base_dir)
active_accounts = no_of_accounts(base_dir)
local_posts = _get_status_count(base_dir)
else:
activeAccounts = 1
localPosts = 1
active_accounts = 1
local_posts = 1
createdAt = ''
if adminActor.get('published'):
createdAt = adminActor['published']
created_at = ''
if admin_actor.get('published'):
created_at = admin_actor['published']
instance = {
'approval_required': False,
'invites_enabled': False,
'registrations': registration,
'contact_account': {
'acct': adminActor['preferredUsername'],
'created_at': createdAt,
'avatar': adminActor['icon']['url'],
'avatar_static': adminActor['icon']['url'],
'header': adminActor['image']['url'],
'header_static': adminActor['image']['url'],
'bot': isBot,
'acct': admin_actor['preferredUsername'],
'created_at': created_at,
'avatar': admin_actor['icon']['url'],
'avatar_static': admin_actor['icon']['url'],
'header': admin_actor['image']['url'],
'header_static': admin_actor['image']['url'],
'bot': is_bot,
'discoverable': True,
'group': isGroup,
'display_name': adminActor['name'],
'locked': adminActor['manuallyApprovesFollowers'],
'group': is_group,
'display_name': admin_actor['name'],
'locked': admin_actor['manuallyApprovesFollowers'],
'note': '<p>Admin of ' + domain + '</p>',
'url': url,
'username': adminActor['preferredUsername']
'username': admin_actor['preferredUsername']
},
'description': instanceDescription,
'description': instance_description,
'languages': [system_language],
'short_description': instanceDescriptionShort,
'short_description': instance_description_short,
'stats': {
'domain_count': 2,
'status_count': localPosts,
'user_count': activeAccounts
'status_count': local_posts,
'user_count': active_accounts
},
'thumbnail': http_prefix + '://' + domain_full + '/login.png',
'title': instanceTitle,
'title': instance_title,
'uri': domain_full,
'urls': {},
'version': version,
'rules': rulesList,
'rules': rules_list,
'configuration': {
'statuses': {
'max_media_attachments': 1
@ -214,18 +214,18 @@ def metadata_custom_emoji(base_dir: str,
See https://docs.joinmastodon.org/methods/instance/custom_emojis
"""
result = []
emojisUrl = http_prefix + '://' + domain_full + '/emoji'
for subdir, dirs, files in os.walk(base_dir + '/emoji'):
for f in files:
if len(f) < 3:
emojis_url = http_prefix + '://' + domain_full + '/emoji'
for _, _, files in os.walk(base_dir + '/emoji'):
for fname in files:
if len(fname) < 3:
continue
if f[0].isdigit() or f[1].isdigit():
if fname[0].isdigit() or fname[1].isdigit():
continue
if not f.endswith('.png'):
if not fname.endswith('.png'):
continue
url = os.path.join(emojisUrl, f)
url = os.path.join(emojis_url, fname)
result.append({
"shortcode": f.replace('.png', ''),
"shortcode": fname.replace('.png', ''),
"url": url,
"static_url": url,
"visible_in_picker": True

20
pgp.py
View File

@ -108,7 +108,7 @@ def set_email_address(actor_json: {}, email_address: str) -> None:
actor_json['attachment'] = []
# remove any existing value
propertyFound = None
property_found = None
for property_value in actor_json['attachment']:
if not property_value.get('name'):
continue
@ -116,10 +116,10 @@ def set_email_address(actor_json: {}, email_address: str) -> None:
continue
if not property_value['name'].lower().startswith('email'):
continue
propertyFound = property_value
property_found = property_value
break
if propertyFound:
actor_json['attachment'].remove(propertyFound)
if property_found:
actor_json['attachment'].remove(property_found)
if notEmailAddress:
return
@ -159,7 +159,7 @@ def set_pgp_pub_key(actor_json: {}, pgp_pub_key: str) -> None:
actor_json['attachment'] = []
# remove any existing value
propertyFound = None
property_found = None
for property_value in actor_json['attachment']:
if not property_value.get('name'):
continue
@ -167,9 +167,9 @@ def set_pgp_pub_key(actor_json: {}, pgp_pub_key: str) -> None:
continue
if not property_value['name'].lower().startswith('pgp'):
continue
propertyFound = property_value
property_found = property_value
break
if propertyFound:
if property_found:
actor_json['attachment'].remove(property_value)
if removeKey:
return
@ -208,7 +208,7 @@ def set_pgp_fingerprint(actor_json: {}, fingerprint: str) -> None:
actor_json['attachment'] = []
# remove any existing value
propertyFound = None
property_found = None
for property_value in actor_json['attachment']:
if not property_value.get('name'):
continue
@ -216,9 +216,9 @@ def set_pgp_fingerprint(actor_json: {}, fingerprint: str) -> None:
continue
if not property_value['name'].lower().startswith('openpgp'):
continue
propertyFound = property_value
property_found = property_value
break
if propertyFound:
if property_found:
actor_json['attachment'].remove(property_value)
if removeFingerprint:
return

8
ssb.py
View File

@ -60,7 +60,7 @@ def set_ssb_address(actor_json: {}, ssb_address: str) -> None:
actor_json['attachment'] = []
# remove any existing value
propertyFound = None
property_found = None
for property_value in actor_json['attachment']:
if not property_value.get('name'):
continue
@ -68,10 +68,10 @@ def set_ssb_address(actor_json: {}, ssb_address: str) -> None:
continue
if not property_value['name'].lower().startswith('ssb'):
continue
propertyFound = property_value
property_found = property_value
break
if propertyFound:
actor_json['attachment'].remove(propertyFound)
if property_found:
actor_json['attachment'].remove(property_found)
if notSSBAddress:
return

View File

@ -5432,13 +5432,13 @@ def test_update_actor(base_dir: str):
if len(actor_json['attachment']) == 0:
print("actor_json['attachment'] has no contents")
assert len(actor_json['attachment']) > 0
propertyFound = False
property_found = False
for property_value in actor_json['attachment']:
if property_value['name'] == 'PGP':
print('PGP property set within attachment')
assert pubKey in property_value['value']
propertyFound = True
assert propertyFound
property_found = True
assert property_found
# stop the server
thrAlice.kill()

8
tox.py
View File

@ -65,7 +65,7 @@ def set_tox_address(actor_json: {}, tox_address: str) -> None:
actor_json['attachment'] = []
# remove any existing value
propertyFound = None
property_found = None
for property_value in actor_json['attachment']:
if not property_value.get('name'):
continue
@ -73,10 +73,10 @@ def set_tox_address(actor_json: {}, tox_address: str) -> None:
continue
if not property_value['name'].lower().startswith('tox'):
continue
propertyFound = property_value
property_found = property_value
break
if propertyFound:
actor_json['attachment'].remove(propertyFound)
if property_found:
actor_json['attachment'].remove(property_found)
if notToxAddress:
return

View File

@ -181,7 +181,7 @@ def _set_actor_property_url(actor_json: {},
property_nameLower = property_name.lower()
# remove any existing value
propertyFound = None
property_found = None
for property_value in actor_json['attachment']:
if not property_value.get('name'):
continue
@ -189,10 +189,10 @@ def _set_actor_property_url(actor_json: {},
continue
if not property_value['name'].lower().startswith(property_nameLower):
continue
propertyFound = property_value
property_found = property_value
break
if propertyFound:
actor_json['attachment'].remove(propertyFound)
if property_found:
actor_json['attachment'].remove(property_found)
prefixes = get_protocol_prefixes()
prefixFound = False

View File

@ -51,7 +51,7 @@ def set_xmpp_address(actor_json: {}, xmpp_address: str) -> None:
actor_json['attachment'] = []
# remove any existing value
propertyFound = None
property_found = None
for property_value in actor_json['attachment']:
if not property_value.get('name'):
continue
@ -60,10 +60,10 @@ def set_xmpp_address(actor_json: {}, xmpp_address: str) -> None:
if not (property_value['name'].lower().startswith('xmpp') or
property_value['name'].lower().startswith('jabber')):
continue
propertyFound = property_value
property_found = property_value
break
if propertyFound:
actor_json['attachment'].remove(propertyFound)
if property_found:
actor_json['attachment'].remove(property_found)
if notXmppAddress:
return