Allow dict attachments and convert them to lists

merge-requests/30/head
Bob Mottram 2024-03-29 14:52:14 +00:00
parent de5d70c2a7
commit ccab03cc2a
11 changed files with 98 additions and 63 deletions

View File

@ -337,7 +337,17 @@ def update_inbox_queue(self, nickname: str, message_json: {},
# check that some fields are lists
if debug:
print('INBOX: checking object to and cc fields')
list_fields = ('to', 'cc', 'attachment')
# check attachment
if message_json['object'].get('attachment'):
if not isinstance(message_json['object']['attachment'], list) and \
not isinstance(message_json['object']['attachment'], dict):
print('INBOX: attachment should be a list or dict ' +
str(message_json['object']['attachment']))
http_400(self)
self.server.postreq_busy = False
return 3
# check to and cc fields
list_fields = ('to', 'cc')
for check_field in list_fields:
if not message_json['object'].get(check_field):
continue

View File

@ -16,6 +16,7 @@ import webbrowser
import urllib.parse
from pathlib import Path
from random import randint
from utils import get_post_attachments
from utils import get_url_from_post
from utils import get_actor_languages_list
from utils import get_attributed_to
@ -705,15 +706,12 @@ def _get_image_description(post_json_object: {}) -> str:
"""Returns a image description/s on a post
"""
image_description = ''
if not post_json_object['object'].get('attachment'):
return image_description
attach_list = post_json_object['object']['attachment']
if not isinstance(attach_list, list):
post_attachments = get_post_attachments(post_json_object)
if not post_attachments:
return image_description
# for each attachment
for img in attach_list:
for img in post_attachments:
if not isinstance(img, dict):
continue
if not img.get('name'):
@ -2326,9 +2324,7 @@ def run_desktop_client(base_dir: str, proxy_type: str, http_prefix: str,
post_summary = ''
if post_json_object['object'].get('summary'):
post_summary = post_json_object['object']['summary']
attachment = []
if post_json_object['object'].get('attachment'):
attachment = post_json_object['object']['attachment']
attachment = get_post_attachments(post_json_object)
capabilities = {}
if post_json_object['object'].get('capabilities'):
capabilities = \

View File

@ -18,6 +18,7 @@ from languages import understood_post_language
from like import update_likes_collection
from reaction import update_reaction_collection
from reaction import valid_emoji_content
from utils import get_post_attachments
from utils import lines_in_file
from utils import resembles_url
from utils import get_url_from_post
@ -183,18 +184,17 @@ def cache_svg_images(session, base_dir: str, http_prefix: str,
obj = post_json_object
if not obj.get('id'):
return False
if not obj.get('attachment'):
return False
if not isinstance(obj['attachment'], list):
post_attachments = get_post_attachments(obj)
if not post_attachments:
return False
cached = False
post_id = remove_id_ending(obj['id']).replace('/', '--')
actor = 'unknown'
if obj.get('attributedTo'):
if post_attachments:
actor = get_attributed_to(obj['attributedTo'])
log_filename = base_dir + '/accounts/svg_scripts_log.txt'
for index in range(len(obj['attachment'])):
attach = obj['attachment'][index]
for index in range(len(post_attachments)):
attach = post_attachments[index]
if not attach.get('mediaType'):
continue
if not attach.get('url'):
@ -249,6 +249,9 @@ def cache_svg_images(session, base_dir: str, http_prefix: str,
except OSError:
print('EX: unable to write cleaned up svg ' + url)
if svg_written:
# convert to list if needed
if isinstance(obj['attachment'], dict):
obj['attachment'] = [obj['attachment']]
# change the url to be the local version
obj['attachment'][index]['url'] = \
http_prefix + '://' + domain_full + '/media/' + \

View File

@ -15,6 +15,7 @@ from posts import outbox_message_create_wrap
from posts import save_post_to_box
from posts import send_to_followers_thread
from posts import send_to_named_addresses_thread
from utils import get_post_attachments
from utils import get_attributed_to
from utils import contains_invalid_actor_url_chars
from utils import get_attachment_property_value
@ -392,9 +393,10 @@ def post_message_to_outbox(session, translate: {},
system_language)
# https://www.w3.org/TR/activitypub/#create-activity-outbox
message_json['object']['attributedTo'] = actor_url
if message_json['object'].get('attachment'):
message_attachments = get_post_attachments(message_json['object'])
if message_attachments:
attachment_index = 0
attach = message_json['object']['attachment'][attachment_index]
attach = message_attachments[attachment_index]
if attach.get('mediaType'):
file_extension = 'png'
media_type_str = \
@ -442,6 +444,13 @@ def post_message_to_outbox(session, translate: {},
media_filename = base_dir + '/' + media_path
# move the uploaded image to its new path
os.rename(upload_media_filename, media_filename)
# convert dictionary to list if needed
if isinstance(message_json['object']['attachment'], dict):
message_json['object']['attachment'] = \
[message_json['object']['attachment']]
attach_idx = attachment_index
attach = \
message_json['object']['attachment'][attach_idx]
# change the url of the attachment
attach['url'] = \
http_prefix + '://' + domain_full + '/' + media_path

View File

@ -34,6 +34,7 @@ from webfinger import webfinger_handle
from httpsig import create_signed_header
from siteactive import site_is_active
from languages import understood_post_language
from utils import get_post_attachments
from utils import is_premium_account
from utils import contains_private_key
from utils import get_url_from_post
@ -707,13 +708,14 @@ def _get_posts(session, outbox_url: str, max_posts: int,
continue
in_reply_to = reply_id
if this_item.get('attachment'):
if len(this_item['attachment']) > max_attachments:
post_attachments = get_post_attachments(this_item)
if post_attachments:
if len(post_attachments) > max_attachments:
if debug:
print('max attachments reached')
continue
if this_item['attachment']:
for attach in this_item['attachment']:
if post_attachments:
for attach in post_attachments:
if attach.get('name') and attach.get('url'):
# no attachments from non-permitted domains
url_str = get_url_from_post(attach['url'])
@ -4426,11 +4428,10 @@ def is_image_media(session, base_dir: str, http_prefix: str,
if post_json_object['object']['type'] not in ('Note', 'Page', 'Event',
'ChatMessage', 'Article'):
return False
if not post_json_object['object'].get('attachment'):
post_attachments = get_post_attachments(post_json_object)
if not post_attachments:
return False
if not isinstance(post_json_object['object']['attachment'], list):
return False
for attach in post_json_object['object']['attachment']:
for attach in post_attachments:
if attach.get('mediaType') and attach.get('url'):
if attach['mediaType'].startswith('image/') or \
attach['mediaType'].startswith('audio/') or \

View File

@ -10,6 +10,7 @@ __module_group__ = "Core"
import os
from collections import OrderedDict
from utils import get_post_attachments
from utils import get_content_from_post
from utils import has_object_dict
from utils import remove_id_ending
@ -71,12 +72,11 @@ def get_book_from_post(post_json_object: {}, debug: bool) -> {}:
def _get_book_image_from_post(post_json_object: {}) -> str:
""" Returns a book image from the given post
"""
if 'attachment' not in post_json_object:
return ''
if not isinstance(post_json_object['attachment'], list):
post_attachments = get_post_attachments(post_json_object)
if not post_attachments:
return ''
extensions = get_image_extensions()
for attach_dict in post_json_object['attachment']:
for attach_dict in post_attachments:
if not isinstance(attach_dict, dict):
continue
if 'url' not in attach_dict:

View File

@ -11,6 +11,7 @@ import os
import html
import random
import urllib.parse
from utils import get_post_attachments
from utils import get_cached_post_filename
from utils import remove_id_ending
from utils import is_dm
@ -466,10 +467,10 @@ def _post_to_speaker_json(base_dir: str, http_prefix: str,
say_content = content
image_description = ''
if post_json_object['object'].get('attachment'):
attach_list = post_json_object['object']['attachment']
if isinstance(attach_list, list):
for img in attach_list:
post_attachments = get_post_attachments(post_json_object)
if post_attachments:
if isinstance(post_attachments, list):
for img in post_attachments:
if not isinstance(img, dict):
continue
if img.get('name'):

View File

@ -517,15 +517,11 @@ def get_media_descriptions_from_post(post_json_object: {}) -> str:
"""Returns all attached media descriptions as a single text.
This is used for filtering
"""
this_post_json = post_json_object
if has_object_dict(post_json_object):
this_post_json = post_json_object['object']
if not this_post_json.get('attachment'):
return ''
if not isinstance(this_post_json['attachment'], list):
post_attachments = get_post_attachments(post_json_object)
if not post_attachments:
return ''
descriptions = ''
for attach in this_post_json['attachment']:
for attach in post_attachments:
if not isinstance(attach, dict):
print('WARN: attachment is not a dict ' + str(attach))
continue
@ -2208,11 +2204,12 @@ def _remove_attachment(base_dir: str, http_prefix: str, domain: str,
post_json: {}):
"""Removes media files for an attachment
"""
if not post_json.get('attachment'):
post_attachments = get_post_attachments(post_json)
if not post_attachments:
return
if not post_json['attachment'][0].get('url'):
if not post_attachments[0].get('url'):
return
attachment_url = get_url_from_post(post_json['attachment'][0]['url'])
attachment_url = get_url_from_post(post_attachments[0]['url'])
if not attachment_url:
return
attachment_url = remove_html(attachment_url)
@ -5247,3 +5244,18 @@ def set_premium_account(base_dir: str, nickname: str, domain: str,
fp_premium.write('\n')
except OSError:
print('EX: unable to set premium flag ' + premium_filename)
def get_post_attachments(post_json_object: {}) -> []:
""" Returns the list of attachments for a post
"""
post_obj = post_json_object
if has_object_dict(post_json_object):
post_obj = post_json_object['object']
if not post_obj.get('attachment'):
return []
if isinstance(post_obj['attachment'], list):
return post_obj['attachment']
if isinstance(post_obj['attachment'], dict):
return [post_obj['attachment']]
return []

View File

@ -24,6 +24,7 @@ from posts import post_is_muted
from posts import get_person_box
from posts import download_announce
from posts import populate_replies_json
from utils import get_post_attachments
from utils import get_url_from_post
from utils import date_from_string_format
from utils import remove_markup_tag
@ -194,7 +195,8 @@ def _html_post_metadata_open_graph(domain: str, post_json_object: {},
metadata += \
" <meta content=\"" + obj_json['published'] + \
"\" property=\"og:published_time\" />\n"
if not obj_json.get('attachment') or obj_json.get('sensitive'):
post_attachments = get_post_attachments(obj_json)
if not post_attachments or obj_json.get('sensitive'):
if 'content' in obj_json and not obj_json.get('sensitive'):
obj_content = obj_json['content']
if 'contentMap' in obj_json:
@ -210,7 +212,7 @@ def _html_post_metadata_open_graph(domain: str, post_json_object: {},
return metadata
# metadata for attachment
for attach_json in obj_json['attachment']:
for attach_json in post_attachments:
if not isinstance(attach_json, dict):
continue
if not attach_json.get('mediaType'):
@ -1979,7 +1981,8 @@ def _get_content_license(post_json_object: {}) -> str:
value = license_link_from_name(value)
return value
for item in post_json_object['object']['attachment']:
post_attachments = get_post_attachments(post_json_object)
for item in post_attachments:
if not item.get('name'):
continue
name_lower = item['name'].lower()
@ -2765,9 +2768,7 @@ def individual_post_as_html(signing_priv_key_pem: str,
content_all_str = str(summary_str) + ' ' + content_str
# does an emoji or lack of alt text on an image indicate a
# no boost preference? if so then don't show the repeat/announce icon
attachment = []
if post_json_object['object'].get('attachment'):
attachment = post_json_object['object']['attachment']
attachment = get_post_attachments(post_json_object)
capabilities = {}
if post_json_object['object'].get('capabilities'):
capabilities = post_json_object['object']['capabilities']
@ -2784,7 +2785,8 @@ def individual_post_as_html(signing_priv_key_pem: str,
# html for the buy icon
buy_str = ''
if 'attachment' not in post_json_object['object']:
post_attachments = get_post_attachments(post_json_object['object'])
if not post_attachments:
post_json_object['object']['attachment'] = []
if not is_patch:
buy_links = get_buy_links(post_json_object, translate, buy_sites)

View File

@ -10,6 +10,7 @@ __module_group__ = "Web Interface"
import os
from shutil import copyfile
import urllib.parse
from utils import get_post_attachments
from utils import get_url_from_post
from utils import date_from_string_format
from utils import get_attributed_to
@ -1372,8 +1373,9 @@ def hashtag_search_rss(nickname: str, domain: str, port: int,
' <description>' + description + '</description>'
hashtag_feed += \
' <pubDate>' + rss_date_str + '</pubDate>'
if post_json_object['object'].get('attachment'):
for attach in post_json_object['object']['attachment']:
post_attachments = get_post_attachments(post_json_object)
if post_attachments:
for attach in post_attachments:
if not attach.get('url'):
continue
url_str = get_url_from_post(attach['url'])

View File

@ -12,6 +12,7 @@ from shutil import copyfile
from collections import OrderedDict
from session import get_json
from session import get_json_valid
from utils import get_post_attachments
from utils import image_mime_types_dict
from utils import get_url_from_post
from utils import get_media_url_from_video
@ -1249,17 +1250,16 @@ def get_post_attachments_as_html(base_dir: str,
'type': 'Document',
'url': media_url
}]
if not post_json_object['object'].get('attachment'):
post_attachments = get_post_attachments(post_json_object)
if not post_attachments:
post_json_object['object']['attachment'] = \
attachment_dict
if not post_json_object['object'].get('attachment'):
post_attachments = get_post_attachments(post_json_object)
if not post_attachments:
return attachment_str, gallery_str
if not isinstance(post_json_object['object']['attachment'], list):
return attachment_str, gallery_str
attachment_dict += post_json_object['object']['attachment']
attachment_dict += post_attachments
media_style_added = False
post_id = None
@ -2297,9 +2297,8 @@ def html_following_dropdown(base_dir: str, nickname: str,
def get_buy_links(post_json_object: str, translate: {}, buy_sites: {}) -> {}:
"""Returns any links to buy something from an external site
"""
if not post_json_object['object'].get('attachment'):
return {}
if not isinstance(post_json_object['object']['attachment'], list):
post_attachments = get_post_attachments(post_json_object)
if not post_attachments:
return {}
links = {}
buy_strings = []
@ -2308,7 +2307,7 @@ def get_buy_links(post_json_object: str, translate: {}, buy_sites: {}) -> {}:
buy_str = translate[buy_str]
buy_strings += buy_str.lower()
buy_strings += ('Paypal', 'Stripe', 'Cashapp', 'Venmo')
for item in post_json_object['object']['attachment']:
for item in post_attachments:
if not isinstance(item, dict):
continue
if not item.get('name'):