From cccad2fbdaf5271e7016ac2bb1d81339667b5f11 Mon Sep 17 00:00:00 2001 From: Bob Mottram Date: Mon, 2 Oct 2023 21:29:30 +0100 Subject: [PATCH] Limited support for mastodon api v2 --- daemon.py | 136 ++++++++++++++++++++++ mastoapiv1.py | 39 +++---- mastoapiv2.py | 307 ++++++++++++++++++++++++++++++++++++++++++++++++++ utils.py | 12 ++ 4 files changed, 469 insertions(+), 25 deletions(-) create mode 100644 mastoapiv2.py diff --git a/daemon.py b/daemon.py index c2ce4a285..60b6ac1cd 100644 --- a/daemon.py +++ b/daemon.py @@ -33,6 +33,7 @@ from webfinger import webfinger_lookup from webfinger import wellknown_protocol_handler from webfinger import webfinger_update from mastoapiv1 import masto_api_v1_response +from mastoapiv2 import masto_api_v2_response from metadata import meta_data_node_info from metadata import metadata_custom_emoji from enigma import get_enigma_pub_key @@ -1476,6 +1477,132 @@ class PubServer(BaseHTTPRequestHandler): self.server.masto_api_is_active = False return True + def _masto_api_v2(self, path: str, calling_domain: str, + ua_str: str, + authorized: bool, + http_prefix: str, + base_dir: str, nickname: str, domain: str, + domain_full: str, + onion_domain: str, i2p_domain: str, + translate: {}, + registration: bool, + system_language: str, + project_version: str, + custom_emoji: [], + show_node_info_accounts: bool, + referer_domain: str, + debug: bool, + calling_site_timeout: int, + known_crawlers: {}, + sites_unavailable: []) -> bool: + """This is a vestigil mastodon v2 API for the purpose + of returning an empty result to sites like + https://mastopeek.app-dist.eu + """ + if not path.startswith('/api/v2/'): + return False + + if not referer_domain: + if not (debug and self.server.unit_test): + print('mastodon api v2 request has no referer domain ' + + str(ua_str)) + self._400() + return True + if referer_domain == domain_full: + print('mastodon api v2 request from self') + self._400() + return True + if self.server.masto_api_is_active: + print('mastodon api v2 is busy during request from ' + + referer_domain) + self._503() + return True + self.server.masto_api_is_active = True + # is this a real website making the call ? + if not debug and not self.server.unit_test and referer_domain: + # Does calling_domain look like a domain? + if ' ' in referer_domain or \ + ';' in referer_domain or \ + '.' not in referer_domain: + print('mastodon api v2 ' + + 'referer does not look like a domain ' + + referer_domain) + self._400() + self.server.masto_api_is_active = False + return True + if not self.server.allow_local_network_access: + if local_network_host(referer_domain): + print('mastodon api v2 referer domain is from the ' + + 'local network ' + referer_domain) + self._400() + self.server.masto_api_is_active = False + return True + if not referer_is_active(http_prefix, + referer_domain, ua_str, + calling_site_timeout, + sites_unavailable): + print('mastodon api v2 referer url is not active ' + + referer_domain) + self._400() + self.server.masto_api_is_active = False + return True + + print('mastodon api v2: ' + path) + print('mastodon api v2: authorized ' + str(authorized)) + print('mastodon api v2: nickname ' + str(nickname)) + print('mastodon api v2: referer ' + str(referer_domain)) + crawl_time = \ + update_known_crawlers(ua_str, base_dir, + known_crawlers, + self.server.last_known_crawler) + if crawl_time is not None: + self.server.last_known_crawler = crawl_time + + broch_mode = broch_mode_is_active(base_dir) + send_json, send_json_str = \ + masto_api_v2_response(path, + calling_domain, + ua_str, + authorized, + http_prefix, + base_dir, + nickname, domain, + domain_full, + onion_domain, + i2p_domain, + translate, + registration, + system_language, + project_version, + custom_emoji, + show_node_info_accounts, + broch_mode) + + if send_json is not None: + msg_str = json.dumps(send_json) + msg_str = self._convert_domains(calling_domain, referer_domain, + msg_str) + msg = msg_str.encode('utf-8') + msglen = len(msg) + if self._has_accept(calling_domain): + protocol_str = \ + get_json_content_from_accept(self.headers.get('Accept')) + self._set_headers(protocol_str, msglen, + None, calling_domain, True) + else: + self._set_headers('application/ld+json', msglen, + None, calling_domain, True) + self._write(msg) + if send_json_str: + print(send_json_str) + self.server.masto_api_is_active = False + return True + + # no api v2 endpoints were matched + self._404() + self.server.masto_api_is_active = False + return True + def _masto_api(self, path: str, calling_domain: str, ua_str: str, authorized: bool, http_prefix: str, @@ -1491,6 +1618,15 @@ class PubServer(BaseHTTPRequestHandler): referer_domain: str, debug: bool, known_crawlers: {}, sites_unavailable: []) -> bool: + if self._masto_api_v2(path, calling_domain, ua_str, authorized, + http_prefix, base_dir, nickname, domain, + domain_full, onion_domain, i2p_domain, + translate, registration, system_language, + project_version, custom_emoji, + show_node_info_accounts, + referer_domain, debug, 5, + known_crawlers, sites_unavailable): + return True return self._masto_api_v1(path, calling_domain, ua_str, authorized, http_prefix, base_dir, nickname, domain, domain_full, onion_domain, i2p_domain, diff --git a/mastoapiv1.py b/mastoapiv1.py index 89c8555c8..e72c31e42 100644 --- a/mastoapiv1.py +++ b/mastoapiv1.py @@ -15,6 +15,7 @@ from utils import remove_html from utils import get_attachment_property_value from utils import no_of_accounts from utils import get_status_count +from utils import lines_in_file def _meta_data_instance_v1(show_accounts: bool, @@ -191,18 +192,6 @@ def get_nickname_from_masto_api_v1id(masto_id: int) -> str: return nickname[::-1] -def _lines_in_file(filename: str) -> int: - """Returns the number of lines in a file - """ - if os.path.isfile(filename): - try: - with open(filename, 'r', encoding='utf-8') as fp_lines: - return len(fp_lines.read().split('\n')) - except OSError: - print('EX: _lines_in_file error reading ' + filename) - return 0 - - def _get_masto_api_v1account(base_dir: str, nickname: str, domain: str, show_accounts: bool, broch_mode: bool) -> {}: """See https://github.com/McKael/mastodon-documentation/ @@ -241,8 +230,8 @@ def _get_masto_api_v1account(base_dir: str, nickname: str, domain: str, fields = [] published = None if show_accounts and not broch_mode: - no_of_followers = _lines_in_file(account_dir + '/followers.txt') - no_of_following = _lines_in_file(account_dir + '/following.txt') + no_of_followers = lines_in_file(account_dir + '/followers.txt') + no_of_following = lines_in_file(account_dir + '/following.txt') # count the number of posts for _, _, files2 in os.walk(account_dir + '/outbox'): no_of_statuses = len(files2) @@ -270,16 +259,16 @@ def _get_masto_api_v1account(base_dir: str, nickname: str, domain: str, "value": tag[prop_value_name], "verified_at": None }) - published_filename = \ - acct_dir(base_dir, nickname, domain) + '/.last_published' - if os.path.isfile(published_filename): - try: - with open(published_filename, 'r', - encoding='utf-8') as fp_pub: - published = fp_pub.read() - except OSError: - print('EX: unable to read last published time ' + - published_filename) + published_filename = \ + acct_dir(base_dir, nickname, domain) + '/.last_published' + if os.path.isfile(published_filename): + try: + with open(published_filename, 'r', + encoding='utf-8') as fp_pub: + published = fp_pub.read() + except OSError: + print('EX: unable to read last published time 1 ' + + published_filename) masto_account_json = { "id": get_masto_api_v1id_from_nickname(nickname), @@ -325,7 +314,7 @@ def masto_api_v1_response(path: str, calling_domain: str, show_node_info_accounts: bool, broch_mode: bool) -> ({}, str): """This is a vestigil mastodon API for the purpose - of returning an empty result to sites like + of returning a result to sites like https://mastopeek.app-dist.eu """ send_json = None diff --git a/mastoapiv2.py b/mastoapiv2.py new file mode 100644 index 000000000..07ee2663c --- /dev/null +++ b/mastoapiv2.py @@ -0,0 +1,307 @@ +__filename__ = "mastoapiv2.py" +__author__ = "Bob Mottram" +__license__ = "AGPL3+" +__version__ = "1.4.0" +__maintainer__ = "Bob Mottram" +__email__ = "bob@libreserver.org" +__status__ = "Production" +__module_group__ = "API" + +import os +from utils import load_json +from utils import get_config_param +from utils import acct_dir +from utils import remove_html +from utils import get_attachment_property_value +from utils import no_of_accounts +from utils import get_image_extensions +from utils import get_video_extensions +from utils import get_audio_extensions +from utils import get_image_mime_type +from utils import lines_in_file + + +def _get_masto_api_v2id_from_nickname(nickname: str) -> int: + """Given an account nickname return the corresponding mastodon id + """ + return int.from_bytes(nickname.encode('utf-8'), 'little') + + +def _meta_data_instance_v2(show_accounts: bool, + instance_title: str, + instance_description: str, + http_prefix: str, base_dir: str, + admin_nickname: str, domain: str, domain_full: str, + registration: bool, system_language: str, + version: str, translate: {}) -> {}: + """ /api/v2/instance endpoint + """ + account_dir = base_dir + '/accounts/' + admin_nickname + '@' + domain + admin_actor_filename = account_dir + '.json' + if not os.path.isfile(admin_actor_filename): + return {} + + admin_actor = load_json(admin_actor_filename, 0) + if not admin_actor: + print('WARN: json load exception _meta_data_instance_v1') + return {} + + rules_list = [] + rules_filename = \ + base_dir + '/accounts/tos.md' + if os.path.isfile(rules_filename): + with open(rules_filename, 'r', encoding='utf-8') as fp_rules: + rules_lines = fp_rules.readlines() + rule_ctr = 1 + for line in rules_lines: + line = line.strip() + if not line: + continue + if line.startswith('#'): + continue + rules_list.append({ + 'id': str(rule_ctr), + 'text': line + }) + rule_ctr += 1 + + is_bot = False + is_group = False + if admin_actor['type'] == 'Group': + is_group = True + elif admin_actor['type'] != 'Person': + is_bot = True + + url = \ + http_prefix + '://' + domain_full + '/@' + \ + admin_actor['preferredUsername'] + + if show_accounts: + active_accounts = no_of_accounts(base_dir) + else: + active_accounts = 1 + + created_at = '' + if admin_actor.get('published'): + created_at = admin_actor['published'] + + icon_url = remove_html(admin_actor['icon']['url']) + image_url = remove_html(admin_actor['image']['url']) + thumbnail_url = http_prefix + '://' + domain_full + '/login.png' + admin_email = None + noindex = True + if 'indexable' in admin_actor: + if admin_actor['indexable'] is True: + noindex = False + discoverable = True + if 'discoverable' in admin_actor: + if admin_actor['discoverable'] is False: + discoverable = False + no_of_statuses = 0 + no_of_followers = 0 + no_of_following = 0 + if show_accounts: + no_of_followers = lines_in_file(account_dir + '/followers.txt') + no_of_following = lines_in_file(account_dir + '/following.txt') + # count the number of posts + for _, _, files2 in os.walk(account_dir + '/outbox'): + no_of_statuses = len(files2) + break + published = None + published_filename = \ + acct_dir(base_dir, admin_nickname, domain) + '/.last_published' + if os.path.isfile(published_filename): + try: + with open(published_filename, 'r', + encoding='utf-8') as fp_pub: + published = fp_pub.read() + except OSError: + print('EX: unable to read last published time 2 ' + + published_filename) + + # get all supported mime types + supported_mime_types = [] + image_ext = get_image_extensions() + for ext in image_ext: + mime_str = get_image_mime_type('x.' + ext) + if mime_str not in supported_mime_types: + supported_mime_types.append(mime_str) + video_ext = get_video_extensions() + for ext in video_ext: + supported_mime_types.append('video/' + ext) + audio_ext = get_audio_extensions() + for ext in audio_ext: + supported_mime_types.append('audio/' + ext) + + fields = [] + # get account fields from attachments + if admin_actor.get('attachment'): + if isinstance(admin_actor['attachment'], list): + translated_email = translate['Email'].lower() + email_fields = ('email', 'e-mail', translated_email) + for tag in admin_actor['attachment']: + if not isinstance(tag, dict): + continue + if not tag.get('name'): + continue + if not isinstance(tag['name'], str): + continue + prop_value_name, _ = \ + get_attachment_property_value(tag) + if not prop_value_name: + continue + if not tag.get(prop_value_name): + continue + if not isinstance(tag[prop_value_name], str): + continue + tag_name = tag['name'] + tag_name_lower = tag_name.lower() + if tag_name_lower in email_fields and \ + '@' in tag[prop_value_name]: + admin_email = tag[prop_value_name] + fields.append({ + "name": tag_name, + "value": tag[prop_value_name], + "verified_at": None + }) + + instance = { + "domain": domain_full, + "title": instance_title, + "version": version, + "source_url": "https://gitlab.com/bashrc2/epicyon", + "description": instance_description, + "usage": { + "users": { + "active_month": active_accounts + } + }, + "thumbnail": { + "url": thumbnail_url, + "blurhash": "UeKUpFxuo~R%0nW;WCnhF6RjaJt757oJodS$", + "versions": { + "@1x": thumbnail_url, + "@2x": thumbnail_url + } + }, + "languages": [system_language], + "configuration": { + "urls": { + }, + "accounts": { + "max_featured_tags": 20 + }, + "statuses": { + "max_characters": 5000, + "max_media_attachments": 1, + "characters_reserved_per_url": 23 + }, + "media_attachments": { + "supported_mime_types": supported_mime_types, + "image_size_limit": 10485760, + "image_matrix_limit": 16777216, + "video_size_limit": 41943040, + "video_frame_rate_limit": 60, + "video_matrix_limit": 2304000 + }, + "polls": { + "max_options": 4, + "max_characters_per_option": 50, + "min_expiration": 300, + "max_expiration": 2629746 + }, + "translation": { + "enabled": False + } + }, + "registrations": { + "enabled": registration, + "approval_required": False, + "message": None + }, + "contact": { + "email": admin_email, + "account": { + "id": _get_masto_api_v2id_from_nickname(admin_nickname), + "username": admin_nickname, + "acct": admin_nickname, + "display_name": admin_actor['name'], + "locked": admin_actor['manuallyApprovesFollowers'], + "bot": is_bot, + "discoverable": discoverable, + "group": is_group, + "created_at": created_at, + "note": '

Admin of ' + domain + '

', + "url": url, + "avatar": icon_url, + "avatar_static": icon_url, + "header": image_url, + "header_static": image_url, + "followers_count": no_of_followers, + "following_count": no_of_following, + "statuses_count": no_of_statuses, + "last_status_at": published, + "noindex": noindex, + "emojis": [], + "fields": fields + } + }, + "rules": rules_list + } + + return instance + + +def masto_api_v2_response(path: str, calling_domain: str, + ua_str: str, + authorized: bool, + http_prefix: str, + base_dir: str, nickname: str, domain: str, + domain_full: str, + onion_domain: str, i2p_domain: str, + translate: {}, + registration: bool, + system_language: str, + project_version: str, + custom_emoji: [], + show_node_info_accounts: bool, + broch_mode: bool) -> ({}, str): + """This is a vestigil mastodon API for the purpose + of returning a result + """ + send_json = None + send_json_str = '' + if not ua_str: + ua_str = '' + + admin_nickname = get_config_param(base_dir, 'admin') + if admin_nickname and path == '/api/v2/instance': + instance_description = \ + get_config_param(base_dir, 'instanceDescription') + instance_title = get_config_param(base_dir, 'instanceTitle') + + if calling_domain.endswith('.onion') and onion_domain: + domain_full = onion_domain + http_prefix = 'http' + elif (calling_domain.endswith('.i2p') and i2p_domain): + domain_full = i2p_domain + http_prefix = 'http' + + if broch_mode: + show_node_info_accounts = False + + send_json = \ + _meta_data_instance_v2(show_node_info_accounts, + instance_title, + instance_description, + http_prefix, + base_dir, + admin_nickname, + domain, + domain_full, + registration, + system_language, + project_version, + translate) + send_json_str = 'masto API instance metadata sent ' + ua_str + return send_json, send_json_str diff --git a/utils.py b/utils.py index 05f7d8f54..302bc7485 100644 --- a/utils.py +++ b/utils.py @@ -4641,3 +4641,15 @@ def get_status_count(base_dir: str) -> int: break break return status_ctr + + +def lines_in_file(filename: str) -> int: + """Returns the number of lines in a file + """ + if os.path.isfile(filename): + try: + with open(filename, 'r', encoding='utf-8') as fp_lines: + return len(fp_lines.read().split('\n')) + except OSError: + print('EX: lines_in_file error reading ' + filename) + return 0