Limited support for mastodon api v2

main
Bob Mottram 2023-10-02 21:29:30 +01:00
parent 7a5f06f28c
commit cccad2fbda
4 changed files with 469 additions and 25 deletions

136
daemon.py
View File

@ -33,6 +33,7 @@ from webfinger import webfinger_lookup
from webfinger import wellknown_protocol_handler
from webfinger import webfinger_update
from mastoapiv1 import masto_api_v1_response
from mastoapiv2 import masto_api_v2_response
from metadata import meta_data_node_info
from metadata import metadata_custom_emoji
from enigma import get_enigma_pub_key
@ -1476,6 +1477,132 @@ class PubServer(BaseHTTPRequestHandler):
self.server.masto_api_is_active = False
return True
def _masto_api_v2(self, path: str, calling_domain: str,
ua_str: str,
authorized: bool,
http_prefix: str,
base_dir: str, nickname: str, domain: str,
domain_full: str,
onion_domain: str, i2p_domain: str,
translate: {},
registration: bool,
system_language: str,
project_version: str,
custom_emoji: [],
show_node_info_accounts: bool,
referer_domain: str,
debug: bool,
calling_site_timeout: int,
known_crawlers: {},
sites_unavailable: []) -> bool:
"""This is a vestigil mastodon v2 API for the purpose
of returning an empty result to sites like
https://mastopeek.app-dist.eu
"""
if not path.startswith('/api/v2/'):
return False
if not referer_domain:
if not (debug and self.server.unit_test):
print('mastodon api v2 request has no referer domain ' +
str(ua_str))
self._400()
return True
if referer_domain == domain_full:
print('mastodon api v2 request from self')
self._400()
return True
if self.server.masto_api_is_active:
print('mastodon api v2 is busy during request from ' +
referer_domain)
self._503()
return True
self.server.masto_api_is_active = True
# is this a real website making the call ?
if not debug and not self.server.unit_test and referer_domain:
# Does calling_domain look like a domain?
if ' ' in referer_domain or \
';' in referer_domain or \
'.' not in referer_domain:
print('mastodon api v2 ' +
'referer does not look like a domain ' +
referer_domain)
self._400()
self.server.masto_api_is_active = False
return True
if not self.server.allow_local_network_access:
if local_network_host(referer_domain):
print('mastodon api v2 referer domain is from the ' +
'local network ' + referer_domain)
self._400()
self.server.masto_api_is_active = False
return True
if not referer_is_active(http_prefix,
referer_domain, ua_str,
calling_site_timeout,
sites_unavailable):
print('mastodon api v2 referer url is not active ' +
referer_domain)
self._400()
self.server.masto_api_is_active = False
return True
print('mastodon api v2: ' + path)
print('mastodon api v2: authorized ' + str(authorized))
print('mastodon api v2: nickname ' + str(nickname))
print('mastodon api v2: referer ' + str(referer_domain))
crawl_time = \
update_known_crawlers(ua_str, base_dir,
known_crawlers,
self.server.last_known_crawler)
if crawl_time is not None:
self.server.last_known_crawler = crawl_time
broch_mode = broch_mode_is_active(base_dir)
send_json, send_json_str = \
masto_api_v2_response(path,
calling_domain,
ua_str,
authorized,
http_prefix,
base_dir,
nickname, domain,
domain_full,
onion_domain,
i2p_domain,
translate,
registration,
system_language,
project_version,
custom_emoji,
show_node_info_accounts,
broch_mode)
if send_json is not None:
msg_str = json.dumps(send_json)
msg_str = self._convert_domains(calling_domain, referer_domain,
msg_str)
msg = msg_str.encode('utf-8')
msglen = len(msg)
if self._has_accept(calling_domain):
protocol_str = \
get_json_content_from_accept(self.headers.get('Accept'))
self._set_headers(protocol_str, msglen,
None, calling_domain, True)
else:
self._set_headers('application/ld+json', msglen,
None, calling_domain, True)
self._write(msg)
if send_json_str:
print(send_json_str)
self.server.masto_api_is_active = False
return True
# no api v2 endpoints were matched
self._404()
self.server.masto_api_is_active = False
return True
def _masto_api(self, path: str, calling_domain: str,
ua_str: str,
authorized: bool, http_prefix: str,
@ -1491,6 +1618,15 @@ class PubServer(BaseHTTPRequestHandler):
referer_domain: str, debug: bool,
known_crawlers: {},
sites_unavailable: []) -> bool:
if self._masto_api_v2(path, calling_domain, ua_str, authorized,
http_prefix, base_dir, nickname, domain,
domain_full, onion_domain, i2p_domain,
translate, registration, system_language,
project_version, custom_emoji,
show_node_info_accounts,
referer_domain, debug, 5,
known_crawlers, sites_unavailable):
return True
return self._masto_api_v1(path, calling_domain, ua_str, authorized,
http_prefix, base_dir, nickname, domain,
domain_full, onion_domain, i2p_domain,

View File

@ -15,6 +15,7 @@ from utils import remove_html
from utils import get_attachment_property_value
from utils import no_of_accounts
from utils import get_status_count
from utils import lines_in_file
def _meta_data_instance_v1(show_accounts: bool,
@ -191,18 +192,6 @@ def get_nickname_from_masto_api_v1id(masto_id: int) -> str:
return nickname[::-1]
def _lines_in_file(filename: str) -> int:
"""Returns the number of lines in a file
"""
if os.path.isfile(filename):
try:
with open(filename, 'r', encoding='utf-8') as fp_lines:
return len(fp_lines.read().split('\n'))
except OSError:
print('EX: _lines_in_file error reading ' + filename)
return 0
def _get_masto_api_v1account(base_dir: str, nickname: str, domain: str,
show_accounts: bool, broch_mode: bool) -> {}:
"""See https://github.com/McKael/mastodon-documentation/
@ -241,8 +230,8 @@ def _get_masto_api_v1account(base_dir: str, nickname: str, domain: str,
fields = []
published = None
if show_accounts and not broch_mode:
no_of_followers = _lines_in_file(account_dir + '/followers.txt')
no_of_following = _lines_in_file(account_dir + '/following.txt')
no_of_followers = lines_in_file(account_dir + '/followers.txt')
no_of_following = lines_in_file(account_dir + '/following.txt')
# count the number of posts
for _, _, files2 in os.walk(account_dir + '/outbox'):
no_of_statuses = len(files2)
@ -278,7 +267,7 @@ def _get_masto_api_v1account(base_dir: str, nickname: str, domain: str,
encoding='utf-8') as fp_pub:
published = fp_pub.read()
except OSError:
print('EX: unable to read last published time ' +
print('EX: unable to read last published time 1 ' +
published_filename)
masto_account_json = {
@ -325,7 +314,7 @@ def masto_api_v1_response(path: str, calling_domain: str,
show_node_info_accounts: bool,
broch_mode: bool) -> ({}, str):
"""This is a vestigil mastodon API for the purpose
of returning an empty result to sites like
of returning a result to sites like
https://mastopeek.app-dist.eu
"""
send_json = None

307
mastoapiv2.py 100644
View File

@ -0,0 +1,307 @@
__filename__ = "mastoapiv2.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.4.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "API"
import os
from utils import load_json
from utils import get_config_param
from utils import acct_dir
from utils import remove_html
from utils import get_attachment_property_value
from utils import no_of_accounts
from utils import get_image_extensions
from utils import get_video_extensions
from utils import get_audio_extensions
from utils import get_image_mime_type
from utils import lines_in_file
def _get_masto_api_v2id_from_nickname(nickname: str) -> int:
"""Given an account nickname return the corresponding mastodon id
"""
return int.from_bytes(nickname.encode('utf-8'), 'little')
def _meta_data_instance_v2(show_accounts: bool,
instance_title: str,
instance_description: str,
http_prefix: str, base_dir: str,
admin_nickname: str, domain: str, domain_full: str,
registration: bool, system_language: str,
version: str, translate: {}) -> {}:
""" /api/v2/instance endpoint
"""
account_dir = base_dir + '/accounts/' + admin_nickname + '@' + domain
admin_actor_filename = account_dir + '.json'
if not os.path.isfile(admin_actor_filename):
return {}
admin_actor = load_json(admin_actor_filename, 0)
if not admin_actor:
print('WARN: json load exception _meta_data_instance_v1')
return {}
rules_list = []
rules_filename = \
base_dir + '/accounts/tos.md'
if os.path.isfile(rules_filename):
with open(rules_filename, 'r', encoding='utf-8') as fp_rules:
rules_lines = fp_rules.readlines()
rule_ctr = 1
for line in rules_lines:
line = line.strip()
if not line:
continue
if line.startswith('#'):
continue
rules_list.append({
'id': str(rule_ctr),
'text': line
})
rule_ctr += 1
is_bot = False
is_group = False
if admin_actor['type'] == 'Group':
is_group = True
elif admin_actor['type'] != 'Person':
is_bot = True
url = \
http_prefix + '://' + domain_full + '/@' + \
admin_actor['preferredUsername']
if show_accounts:
active_accounts = no_of_accounts(base_dir)
else:
active_accounts = 1
created_at = ''
if admin_actor.get('published'):
created_at = admin_actor['published']
icon_url = remove_html(admin_actor['icon']['url'])
image_url = remove_html(admin_actor['image']['url'])
thumbnail_url = http_prefix + '://' + domain_full + '/login.png'
admin_email = None
noindex = True
if 'indexable' in admin_actor:
if admin_actor['indexable'] is True:
noindex = False
discoverable = True
if 'discoverable' in admin_actor:
if admin_actor['discoverable'] is False:
discoverable = False
no_of_statuses = 0
no_of_followers = 0
no_of_following = 0
if show_accounts:
no_of_followers = lines_in_file(account_dir + '/followers.txt')
no_of_following = lines_in_file(account_dir + '/following.txt')
# count the number of posts
for _, _, files2 in os.walk(account_dir + '/outbox'):
no_of_statuses = len(files2)
break
published = None
published_filename = \
acct_dir(base_dir, admin_nickname, domain) + '/.last_published'
if os.path.isfile(published_filename):
try:
with open(published_filename, 'r',
encoding='utf-8') as fp_pub:
published = fp_pub.read()
except OSError:
print('EX: unable to read last published time 2 ' +
published_filename)
# get all supported mime types
supported_mime_types = []
image_ext = get_image_extensions()
for ext in image_ext:
mime_str = get_image_mime_type('x.' + ext)
if mime_str not in supported_mime_types:
supported_mime_types.append(mime_str)
video_ext = get_video_extensions()
for ext in video_ext:
supported_mime_types.append('video/' + ext)
audio_ext = get_audio_extensions()
for ext in audio_ext:
supported_mime_types.append('audio/' + ext)
fields = []
# get account fields from attachments
if admin_actor.get('attachment'):
if isinstance(admin_actor['attachment'], list):
translated_email = translate['Email'].lower()
email_fields = ('email', 'e-mail', translated_email)
for tag in admin_actor['attachment']:
if not isinstance(tag, dict):
continue
if not tag.get('name'):
continue
if not isinstance(tag['name'], str):
continue
prop_value_name, _ = \
get_attachment_property_value(tag)
if not prop_value_name:
continue
if not tag.get(prop_value_name):
continue
if not isinstance(tag[prop_value_name], str):
continue
tag_name = tag['name']
tag_name_lower = tag_name.lower()
if tag_name_lower in email_fields and \
'@' in tag[prop_value_name]:
admin_email = tag[prop_value_name]
fields.append({
"name": tag_name,
"value": tag[prop_value_name],
"verified_at": None
})
instance = {
"domain": domain_full,
"title": instance_title,
"version": version,
"source_url": "https://gitlab.com/bashrc2/epicyon",
"description": instance_description,
"usage": {
"users": {
"active_month": active_accounts
}
},
"thumbnail": {
"url": thumbnail_url,
"blurhash": "UeKUpFxuo~R%0nW;WCnhF6RjaJt757oJodS$",
"versions": {
"@1x": thumbnail_url,
"@2x": thumbnail_url
}
},
"languages": [system_language],
"configuration": {
"urls": {
},
"accounts": {
"max_featured_tags": 20
},
"statuses": {
"max_characters": 5000,
"max_media_attachments": 1,
"characters_reserved_per_url": 23
},
"media_attachments": {
"supported_mime_types": supported_mime_types,
"image_size_limit": 10485760,
"image_matrix_limit": 16777216,
"video_size_limit": 41943040,
"video_frame_rate_limit": 60,
"video_matrix_limit": 2304000
},
"polls": {
"max_options": 4,
"max_characters_per_option": 50,
"min_expiration": 300,
"max_expiration": 2629746
},
"translation": {
"enabled": False
}
},
"registrations": {
"enabled": registration,
"approval_required": False,
"message": None
},
"contact": {
"email": admin_email,
"account": {
"id": _get_masto_api_v2id_from_nickname(admin_nickname),
"username": admin_nickname,
"acct": admin_nickname,
"display_name": admin_actor['name'],
"locked": admin_actor['manuallyApprovesFollowers'],
"bot": is_bot,
"discoverable": discoverable,
"group": is_group,
"created_at": created_at,
"note": '<p>Admin of ' + domain + '</p>',
"url": url,
"avatar": icon_url,
"avatar_static": icon_url,
"header": image_url,
"header_static": image_url,
"followers_count": no_of_followers,
"following_count": no_of_following,
"statuses_count": no_of_statuses,
"last_status_at": published,
"noindex": noindex,
"emojis": [],
"fields": fields
}
},
"rules": rules_list
}
return instance
def masto_api_v2_response(path: str, calling_domain: str,
ua_str: str,
authorized: bool,
http_prefix: str,
base_dir: str, nickname: str, domain: str,
domain_full: str,
onion_domain: str, i2p_domain: str,
translate: {},
registration: bool,
system_language: str,
project_version: str,
custom_emoji: [],
show_node_info_accounts: bool,
broch_mode: bool) -> ({}, str):
"""This is a vestigil mastodon API for the purpose
of returning a result
"""
send_json = None
send_json_str = ''
if not ua_str:
ua_str = ''
admin_nickname = get_config_param(base_dir, 'admin')
if admin_nickname and path == '/api/v2/instance':
instance_description = \
get_config_param(base_dir, 'instanceDescription')
instance_title = get_config_param(base_dir, 'instanceTitle')
if calling_domain.endswith('.onion') and onion_domain:
domain_full = onion_domain
http_prefix = 'http'
elif (calling_domain.endswith('.i2p') and i2p_domain):
domain_full = i2p_domain
http_prefix = 'http'
if broch_mode:
show_node_info_accounts = False
send_json = \
_meta_data_instance_v2(show_node_info_accounts,
instance_title,
instance_description,
http_prefix,
base_dir,
admin_nickname,
domain,
domain_full,
registration,
system_language,
project_version,
translate)
send_json_str = 'masto API instance metadata sent ' + ua_str
return send_json, send_json_str

View File

@ -4641,3 +4641,15 @@ def get_status_count(base_dir: str) -> int:
break
break
return status_ctr
def lines_in_file(filename: str) -> int:
"""Returns the number of lines in a file
"""
if os.path.isfile(filename):
try:
with open(filename, 'r', encoding='utf-8') as fp_lines:
return len(fp_lines.read().split('\n'))
except OSError:
print('EX: lines_in_file error reading ' + filename)
return 0