epicyon/follow.py

1614 lines
62 KiB
Python

__filename__ = "follow.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.5.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "ActivityPub"
import os
from pprint import pprint
from utils import get_user_paths
from utils import acct_handle_dir
from utils import has_object_string_object
from utils import has_object_string_type
from utils import remove_domain_port
from utils import has_users_path
from utils import get_full_domain
from utils import get_followers_list
from utils import valid_nickname
from utils import domain_permitted
from utils import get_domain_from_actor
from utils import get_nickname_from_actor
from utils import get_status_number
from utils import follow_person
from posts import send_signed_json
from posts import get_person_box
from utils import load_json
from utils import save_json
from utils import is_account_dir
from utils import acct_dir
from utils import has_group_type
from utils import local_actor_url
from utils import text_in_file
from utils import remove_eol
from utils import get_actor_from_post
from acceptreject import create_accept
from acceptreject import create_reject
from webfinger import webfinger_handle
from auth import create_basic_auth_header
from session import get_json
from session import get_json_valid
from session import post_json
from followerSync import remove_followers_sync
def create_initial_last_seen(base_dir: str, http_prefix: str) -> None:
"""Creates initial lastseen files for all follows.
The lastseen files are used to generate the Zzz icons on
follows/following lists on the profile screen.
"""
for _, dirs, _ in os.walk(base_dir + '/accounts'):
for acct in dirs:
if not is_account_dir(acct):
continue
account_dir = os.path.join(base_dir + '/accounts', acct)
following_filename = account_dir + '/following.txt'
if not os.path.isfile(following_filename):
continue
last_seen_dir = account_dir + '/lastseen'
if not os.path.isdir(last_seen_dir):
os.mkdir(last_seen_dir)
following_handles = []
try:
with open(following_filename, 'r',
encoding='utf-8') as fp_foll:
following_handles = fp_foll.readlines()
except OSError:
print('EX: create_initial_last_seen ' + following_filename)
for handle in following_handles:
if '#' in handle:
continue
if '@' not in handle:
continue
handle = remove_eol(handle)
nickname = handle.split('@')[0]
domain = handle.split('@')[1]
if nickname.startswith('!'):
nickname = nickname[1:]
actor = local_actor_url(http_prefix, nickname, domain)
last_seen_filename = \
last_seen_dir + '/' + actor.replace('/', '#') + '.txt'
if not os.path.isfile(last_seen_filename):
try:
with open(last_seen_filename, 'w+',
encoding='utf-8') as fp_last:
fp_last.write(str(100))
except OSError:
print('EX: create_initial_last_seen 2 ' +
last_seen_filename)
break
def _pre_approved_follower(base_dir: str,
nickname: str, domain: str,
approve_handle: str) -> bool:
"""Is the given handle an already manually approved follower?
"""
account_dir = acct_dir(base_dir, nickname, domain)
approved_filename = account_dir + '/approved.txt'
if os.path.isfile(approved_filename):
if text_in_file(approve_handle, approved_filename):
return True
return False
def _remove_from_follow_base(base_dir: str,
nickname: str, domain: str,
accept_or_deny_handle: str, follow_file: str,
debug: bool) -> None:
"""Removes a handle/actor from follow requests or rejects file
"""
accounts_dir = acct_dir(base_dir, nickname, domain)
approve_follows_filename = accounts_dir + '/' + follow_file + '.txt'
if not os.path.isfile(approve_follows_filename):
if debug:
print('There is no ' + follow_file +
' to remove ' + nickname + '@' + domain + ' from')
return
accept_deny_actor = None
if not text_in_file(accept_or_deny_handle, approve_follows_filename):
# is this stored in the file as an actor rather than a handle?
accept_deny_nickname = accept_or_deny_handle.split('@')[0]
accept_deny_domain = accept_or_deny_handle.split('@')[1]
# for each possible users path construct an actor and
# check if it exists in the file
users_paths = get_user_paths()
actor_found = False
for users_name in users_paths:
accept_deny_actor = \
'://' + accept_deny_domain + users_name + accept_deny_nickname
if text_in_file(accept_deny_actor, approve_follows_filename):
actor_found = True
break
if not actor_found:
accept_deny_actor = \
'://' + accept_deny_domain + '/' + accept_deny_nickname
if text_in_file(accept_deny_actor, approve_follows_filename):
actor_found = True
if not actor_found:
return
try:
with open(approve_follows_filename + '.new', 'w+',
encoding='utf-8') as approvefilenew:
with open(approve_follows_filename, 'r',
encoding='utf-8') as approvefile:
if not accept_deny_actor:
for approve_handle in approvefile:
accept_deny_handle = accept_or_deny_handle
if not approve_handle.startswith(accept_deny_handle):
approvefilenew.write(approve_handle)
else:
for approve_handle in approvefile:
if accept_deny_actor not in approve_handle:
approvefilenew.write(approve_handle)
except OSError as ex:
print('EX: _remove_from_follow_base ' +
approve_follows_filename + ' ' + str(ex))
os.rename(approve_follows_filename + '.new', approve_follows_filename)
def remove_from_follow_requests(base_dir: str,
nickname: str, domain: str,
deny_handle: str, debug: bool) -> None:
"""Removes a handle from follow requests
"""
_remove_from_follow_base(base_dir, nickname, domain,
deny_handle, 'followrequests', debug)
def _remove_from_follow_rejects(base_dir: str,
nickname: str, domain: str,
accept_handle: str, debug: bool) -> None:
"""Removes a handle from follow rejects
"""
_remove_from_follow_base(base_dir, nickname, domain,
accept_handle, 'followrejects', debug)
def is_following_actor(base_dir: str,
nickname: str, domain: str, actor: str) -> bool:
"""Is the given nickname following the given actor?
The actor can also be a handle: nickname@domain
"""
domain = remove_domain_port(domain)
accounts_dir = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(accounts_dir):
return False
following_file = accounts_dir + '/following.txt'
if not os.path.isfile(following_file):
return False
if actor.startswith('@'):
actor = actor[1:]
if text_in_file(actor, following_file, False):
return True
following_nickname = get_nickname_from_actor(actor)
if not following_nickname:
print('WARN: unable to find nickname in ' + actor)
return False
following_domain, following_port = get_domain_from_actor(actor)
if not following_domain:
print('WARN: unable to find domain in ' + actor)
return False
following_handle = \
get_full_domain(following_nickname + '@' + following_domain,
following_port)
if text_in_file(following_handle, following_file, False):
return True
return False
def get_mutuals_of_person(base_dir: str,
nickname: str, domain: str) -> []:
"""Returns the mutuals of a person
i.e. accounts which they follow and which also follow back
"""
followers = \
get_followers_list(base_dir, nickname, domain, 'followers.txt')
following = \
get_followers_list(base_dir, nickname, domain, 'following.txt')
mutuals = []
for handle in following:
if handle in followers:
mutuals.append(handle)
return mutuals
def add_follower_of_person(base_dir: str, nickname: str, domain: str,
follower_nickname: str, follower_domain: str,
federation_list: [], debug: bool,
group_account: bool) -> bool:
"""Adds a follower of the given person
"""
return follow_person(base_dir, nickname, domain,
follower_nickname, follower_domain,
federation_list, debug, group_account,
'followers.txt')
def get_follower_domains(base_dir: str, nickname: str, domain: str) -> []:
"""Returns a list of domains for followers
"""
domain = remove_domain_port(domain)
followers_file = acct_dir(base_dir, nickname, domain) + '/followers.txt'
if not os.path.isfile(followers_file):
return []
lines = []
try:
with open(followers_file, 'r', encoding='utf-8') as fp_foll:
lines = fp_foll.readlines()
except OSError:
print('EX: get_follower_domains ' + followers_file)
domains_list = []
for handle in lines:
handle = remove_eol(handle)
follower_domain, _ = get_domain_from_actor(handle)
if not follower_domain:
continue
if follower_domain not in domains_list:
domains_list.append(follower_domain)
return domains_list
def is_follower_of_person(base_dir: str, nickname: str, domain: str,
follower_nickname: str,
follower_domain: str) -> bool:
"""is the given nickname a follower of follower_nickname?
"""
if not follower_domain:
print('No follower_domain')
return False
if not follower_nickname:
print('No follower_nickname for ' + follower_domain)
return False
domain = remove_domain_port(domain)
followers_file = acct_dir(base_dir, nickname, domain) + '/followers.txt'
if not os.path.isfile(followers_file):
return False
handle = follower_nickname + '@' + follower_domain
already_following = False
followers_str = ''
try:
with open(followers_file, 'r', encoding='utf-8') as fp_foll:
followers_str = fp_foll.read()
except OSError:
print('EX: is_follower_of_person ' + followers_file)
if handle in followers_str:
already_following = True
else:
paths = get_user_paths()
for user_path in paths:
url = '://' + follower_domain + user_path + follower_nickname
if url in followers_str:
already_following = True
break
if not already_following:
url = '://' + follower_domain + '/' + follower_nickname
if url in followers_str:
already_following = True
return already_following
def unfollow_account(base_dir: str, nickname: str, domain: str,
follow_nickname: str, follow_domain: str,
debug: bool, group_account: bool,
follow_file: str) -> bool:
"""Removes a person to the follow list
"""
domain = remove_domain_port(domain)
handle = nickname + '@' + domain
handle_to_unfollow = follow_nickname + '@' + follow_domain
if group_account:
handle_to_unfollow = '!' + handle_to_unfollow
if not os.path.isdir(base_dir + '/accounts'):
os.mkdir(base_dir + '/accounts')
handle_dir = acct_handle_dir(base_dir, handle)
if not os.path.isdir(handle_dir):
os.mkdir(handle_dir)
accounts_dir = acct_dir(base_dir, nickname, domain)
filename = accounts_dir + '/' + follow_file
if not os.path.isfile(filename):
if debug:
print('DEBUG: follow file ' + filename + ' was not found')
return False
handle_to_unfollow_lower = handle_to_unfollow.lower()
if not text_in_file(handle_to_unfollow_lower, filename, False):
if debug:
print('DEBUG: handle to unfollow ' + handle_to_unfollow +
' is not in ' + filename)
return
lines = []
try:
with open(filename, 'r', encoding='utf-8') as fp_unfoll:
lines = fp_unfoll.readlines()
except OSError:
print('EX: unfollow_account ' + filename)
if lines:
try:
with open(filename, 'w+', encoding='utf-8') as fp_unfoll:
for line in lines:
check_handle = line.strip("\n").strip("\r").lower()
if check_handle not in (handle_to_unfollow_lower,
'!' + handle_to_unfollow_lower):
fp_unfoll.write(line)
except OSError as ex:
print('EX: unable to write ' + filename + ' ' + str(ex))
# write to an unfollowed file so that if a follow accept
# later arrives then it can be ignored
unfollowed_filename = accounts_dir + '/unfollowed.txt'
if os.path.isfile(unfollowed_filename):
if not text_in_file(handle_to_unfollow_lower,
unfollowed_filename, False):
try:
with open(unfollowed_filename, 'a+',
encoding='utf-8') as fp_unfoll:
fp_unfoll.write(handle_to_unfollow + '\n')
except OSError:
print('EX: unable to append ' + unfollowed_filename)
else:
try:
with open(unfollowed_filename, 'w+',
encoding='utf-8') as fp_unfoll:
fp_unfoll.write(handle_to_unfollow + '\n')
except OSError:
print('EX: unable to write ' + unfollowed_filename)
return True
def unfollower_of_account(base_dir: str, nickname: str, domain: str,
follower_nickname: str, follower_domain: str,
debug: bool, group_account: bool) -> bool:
"""Remove a follower of a person
"""
return unfollow_account(base_dir, nickname, domain,
follower_nickname, follower_domain,
debug, group_account, 'followers.txt')
def clear_follows(base_dir: str, nickname: str, domain: str,
follow_file: str) -> None:
"""Removes all follows
"""
if not os.path.isdir(base_dir + '/accounts'):
os.mkdir(base_dir + '/accounts')
accounts_dir = acct_dir(base_dir, nickname, domain)
if not os.path.isdir(accounts_dir):
os.mkdir(accounts_dir)
filename = accounts_dir + '/' + follow_file
if os.path.isfile(filename):
try:
os.remove(filename)
except OSError:
print('EX: clear_follows unable to delete ' + filename)
def clear_followers(base_dir: str, nickname: str, domain: str) -> None:
"""Removes all followers
"""
clear_follows(base_dir, nickname, domain, 'followers.txt')
def _get_no_of_follows(base_dir: str, nickname: str, domain: str,
follow_file='following.txt') -> int:
"""Returns the number of follows or followers
"""
# only show number of followers to authenticated
# account holders
# if not authenticated:
# return 9999
accounts_dir = acct_dir(base_dir, nickname, domain)
filename = accounts_dir + '/' + follow_file
if not os.path.isfile(filename):
return 0
ctr = 0
lines = []
try:
with open(filename, 'r', encoding='utf-8') as fp_foll:
lines = fp_foll.readlines()
except OSError:
print('EX: _get_no_of_follows ' + filename)
if lines:
for line in lines:
if '#' in line:
continue
if '@' in line and \
'.' in line and \
not line.startswith('http'):
ctr += 1
elif ((line.startswith('http') or
line.startswith('ipfs') or
line.startswith('ipns') or
line.startswith('hyper')) and
has_users_path(line)):
ctr += 1
return ctr
def get_no_of_followers(base_dir: str, nickname: str, domain: str) -> int:
"""Returns the number of followers of the given person
"""
return _get_no_of_follows(base_dir, nickname, domain, 'followers.txt')
def get_following_feed(base_dir: str, domain: str, port: int, path: str,
http_prefix: str, authorized: bool,
follows_per_page=12,
follow_file='following') -> {}:
"""Returns the following and followers feeds from GET requests.
This accesses the following.txt or followers.txt and builds a collection.
"""
# Show a small number of follows to non-authorized viewers
if not authorized:
follows_per_page = 6
if '/' + follow_file not in path:
return None
# handle page numbers
header_only = True
page_number = None
if '?page=' in path:
page_number = path.split('?page=')[1]
if len(page_number) > 5:
page_number = "1"
if page_number == 'true' or not authorized:
page_number = 1
else:
try:
page_number = int(page_number)
except BaseException:
print('EX: get_following_feed unable to convert to int ' +
str(page_number))
path = path.split('?page=')[0]
header_only = False
if not path.endswith('/' + follow_file):
return None
nickname = None
if path.startswith('/users/'):
nickname = \
path.replace('/users/', '', 1).replace('/' + follow_file, '')
if path.startswith('/@'):
nickname = path.replace('/@', '', 1).replace('/' + follow_file, '')
if not nickname:
return None
if not valid_nickname(domain, nickname):
return None
domain = get_full_domain(domain, port)
if header_only:
first_str = \
local_actor_url(http_prefix, nickname, domain) + \
'/' + follow_file + '?page=1'
id_str = \
local_actor_url(http_prefix, nickname, domain) + '/' + follow_file
total_str = \
_get_no_of_follows(base_dir, nickname, domain)
following = {
'@context': 'https://www.w3.org/ns/activitystreams',
'first': first_str,
'id': id_str,
'totalItems': total_str,
'type': 'OrderedCollection'
}
return following
if not page_number:
page_number = 1
next_page_number = int(page_number + 1)
id_str = \
local_actor_url(http_prefix, nickname, domain) + \
'/' + follow_file + '?page=' + str(page_number)
part_of_str = \
local_actor_url(http_prefix, nickname, domain) + '/' + follow_file
following = {
'@context': 'https://www.w3.org/ns/activitystreams',
'id': id_str,
'orderedItems': [],
'partOf': part_of_str,
'totalItems': 0,
'type': 'OrderedCollectionPage'
}
handle_domain = domain
handle_domain = remove_domain_port(handle_domain)
accounts_dir = acct_dir(base_dir, nickname, handle_domain)
filename = accounts_dir + '/' + follow_file + '.txt'
if not os.path.isfile(filename):
return following
curr_page = 1
page_ctr = 0
total_ctr = 0
lines = []
try:
with open(filename, 'r', encoding='utf-8') as fp_foll:
lines = fp_foll.readlines()
except OSError:
print('EX: get_following_feed ' + filename)
for line in lines:
if '#' not in line:
if '@' in line and not line.startswith('http'):
# nickname@domain
page_ctr += 1
total_ctr += 1
if curr_page == page_number:
line2_lower = line.lower()
line2 = remove_eol(line2_lower)
nick = line2.split('@')[0]
dom = line2.split('@')[1]
if not nick.startswith('!'):
# person actor
url = local_actor_url(http_prefix, nick, dom)
else:
# group actor
url = http_prefix + '://' + dom + '/c/' + nick
following['orderedItems'].append(url)
elif ((line.startswith('http') or
line.startswith('ipfs') or
line.startswith('ipns') or
line.startswith('hyper')) and
has_users_path(line)):
# https://domain/users/nickname
page_ctr += 1
total_ctr += 1
if curr_page == page_number:
append_str1 = line.lower()
append_str = remove_eol(append_str1)
following['orderedItems'].append(append_str)
if page_ctr >= follows_per_page:
page_ctr = 0
curr_page += 1
following['totalItems'] = total_ctr
last_page = int(total_ctr / follows_per_page)
last_page = max(last_page, 1)
if next_page_number > last_page:
following['next'] = \
local_actor_url(http_prefix, nickname, domain) + \
'/' + follow_file + '?page=' + str(last_page)
return following
def follow_approval_required(base_dir: str, nickname_to_follow: str,
domain_to_follow: str, debug: bool,
follow_request_handle: str) -> bool:
""" Returns the policy for follower approvals
"""
# has this handle already been manually approved?
if _pre_approved_follower(base_dir, nickname_to_follow, domain_to_follow,
follow_request_handle):
return False
manually_approve_follows = False
domain_to_follow = remove_domain_port(domain_to_follow)
actor_filename = base_dir + '/accounts/' + \
nickname_to_follow + '@' + domain_to_follow + '.json'
if os.path.isfile(actor_filename):
actor = load_json(actor_filename)
if actor:
if 'manuallyApprovesFollowers' in actor:
manually_approve_follows = actor['manuallyApprovesFollowers']
else:
if debug:
print(nickname_to_follow + '@' + domain_to_follow +
' automatically approves followers')
else:
if debug:
print('DEBUG: Actor file not found: ' + actor_filename)
return manually_approve_follows
def no_of_follow_requests(base_dir: str,
nickname_to_follow: str, domain_to_follow: str,
follow_type: str) -> int:
"""Returns the current number of follow requests
"""
accounts_dir = acct_dir(base_dir, nickname_to_follow, domain_to_follow)
approve_follows_filename = accounts_dir + '/followrequests.txt'
if not os.path.isfile(approve_follows_filename):
return 0
ctr = 0
lines = []
try:
with open(approve_follows_filename, 'r',
encoding='utf-8') as fp_approve:
lines = fp_approve.readlines()
except OSError:
print('EX: no_of_follow_requests ' + approve_follows_filename)
if lines:
if follow_type == "onion":
for file_line in lines:
if '.onion' in file_line:
ctr += 1
elif follow_type == "i2p":
for file_line in lines:
if '.i2p' in file_line:
ctr += 1
else:
return len(lines)
return ctr
def store_follow_request(base_dir: str,
nickname_to_follow: str,
domain_to_follow: str, port: int,
nickname: str, domain: str, from_port: int,
follow_json: {},
debug: bool, person_url: str,
group_account: bool) -> bool:
"""Stores the follow request for later use
"""
accounts_dir = acct_dir(base_dir, nickname_to_follow, domain_to_follow)
if not os.path.isdir(accounts_dir):
return False
domain_full = get_full_domain(domain, from_port)
approve_handle = get_full_domain(nickname + '@' + domain, from_port)
if group_account:
approve_handle = '!' + approve_handle
followers_filename = accounts_dir + '/followers.txt'
if os.path.isfile(followers_filename):
already_following = False
followers_str = ''
try:
with open(followers_filename, 'r',
encoding='utf-8') as fp_foll:
followers_str = fp_foll.read()
except OSError:
print('EX: store_follow_request ' + followers_filename)
if approve_handle in followers_str:
already_following = True
else:
users_paths = get_user_paths()
for possible_users_path in users_paths:
url = '://' + domain_full + possible_users_path + nickname
if url in followers_str:
already_following = True
break
if not already_following:
url = '://' + domain_full + '/' + nickname
if url in followers_str:
already_following = True
if already_following:
if debug:
print('DEBUG: ' +
nickname_to_follow + '@' + domain_to_follow +
' already following ' + approve_handle)
return True
# should this follow be denied?
deny_follows_filename = accounts_dir + '/followrejects.txt'
if os.path.isfile(deny_follows_filename):
if text_in_file(approve_handle, deny_follows_filename):
remove_from_follow_requests(base_dir, nickname_to_follow,
domain_to_follow, approve_handle,
debug)
print(approve_handle + ' was already denied as a follower of ' +
nickname_to_follow)
return True
# add to a file which contains a list of requests
approve_follows_filename = accounts_dir + '/followrequests.txt'
# store either nick@domain or the full person/actor url
approve_handle_stored = approve_handle
if '/users/' not in person_url:
approve_handle_stored = person_url
if group_account:
approve_handle = '!' + approve_handle
if os.path.isfile(approve_follows_filename):
if not text_in_file(approve_handle, approve_follows_filename):
try:
with open(approve_follows_filename, 'a+',
encoding='utf-8') as fp_approve:
fp_approve.write(approve_handle_stored + '\n')
except OSError:
print('EX: store_follow_request 2 ' + approve_follows_filename)
else:
if debug:
print('DEBUG: ' + approve_handle_stored +
' is already awaiting approval')
else:
try:
with open(approve_follows_filename, 'w+',
encoding='utf-8') as fp_approve:
fp_approve.write(approve_handle_stored + '\n')
except OSError:
print('EX: store_follow_request 3 ' + approve_follows_filename)
# store the follow request in its own directory
# We don't rely upon the inbox because items in there could expire
requests_dir = accounts_dir + '/requests'
if not os.path.isdir(requests_dir):
os.mkdir(requests_dir)
follow_activity_filename = requests_dir + '/' + approve_handle + '.follow'
return save_json(follow_json, follow_activity_filename)
def followed_account_accepts(session, base_dir: str, http_prefix: str,
nickname_to_follow: str, domain_to_follow: str,
port: int,
nickname: str, domain: str, from_port: int,
person_url: str, federation_list: [],
follow_json: {}, send_threads: [], post_log: [],
cached_webfingers: {}, person_cache: {},
debug: bool, project_version: str,
remove_follow_activity: bool,
signing_priv_key_pem: str,
curr_domain: str,
onion_domain: str, i2p_domain: str,
followers_sync_cache: {},
sites_unavailable: [],
system_language: str):
"""The person receiving a follow request accepts the new follower
and sends back an Accept activity
"""
accept_handle = nickname + '@' + domain
# send accept back
print('Sending follow Accept activity for ' +
'follow request which arrived at ' +
nickname_to_follow + '@' + domain_to_follow +
' back to ' + accept_handle)
accept_json = create_accept(base_dir, federation_list,
nickname_to_follow, domain_to_follow, port,
person_url, '', http_prefix,
follow_json)
pprint(accept_json)
print('DEBUG: sending follow Accept from ' +
nickname_to_follow + '@' + domain_to_follow +
' port ' + str(port) + ' to ' +
accept_handle + ' port ' + str(from_port))
client_to_server = False
if remove_follow_activity:
# remove the follow request json
follow_activity_filename = \
acct_dir(base_dir, nickname_to_follow, domain_to_follow) + \
'/requests/' + nickname + '@' + domain + '.follow'
if os.path.isfile(follow_activity_filename):
try:
os.remove(follow_activity_filename)
except OSError:
print('EX: follow Accept ' +
'followed_account_accepts unable to delete ' +
follow_activity_filename)
group_account = False
if follow_json:
if follow_json.get('actor'):
actor_url = get_actor_from_post(follow_json)
if has_group_type(base_dir, actor_url, person_cache):
group_account = True
extra_headers = {}
domain_full = get_full_domain(domain, from_port)
remove_followers_sync(followers_sync_cache,
nickname_to_follow,
domain_full)
return send_signed_json(accept_json, session, base_dir,
nickname_to_follow, domain_to_follow, port,
nickname, domain, from_port,
http_prefix, client_to_server,
federation_list,
send_threads, post_log, cached_webfingers,
person_cache, debug, project_version, None,
group_account, signing_priv_key_pem,
7856837, curr_domain, onion_domain, i2p_domain,
extra_headers, sites_unavailable,
system_language)
def followed_account_rejects(session, session_onion, session_i2p,
onion_domain: str, i2p_domain: str,
base_dir: str, http_prefix: str,
nickname_to_follow: str, domain_to_follow: str,
port: int,
nickname: str, domain: str, from_port: int,
federation_list: [],
send_threads: [], post_log: [],
cached_webfingers: {}, person_cache: {},
debug: bool, project_version: str,
signing_priv_key_pem: str,
followers_sync_cache: {},
sites_unavailable: [],
system_language: str):
"""The person receiving a follow request rejects the new follower
and sends back a Reject activity
"""
# send reject back
if debug:
print('DEBUG: sending Reject activity for ' +
'follow request which arrived at ' +
nickname_to_follow + '@' + domain_to_follow +
' back to ' + nickname + '@' + domain)
# get the json for the original follow request
follow_activity_filename = \
acct_dir(base_dir, nickname_to_follow, domain_to_follow) + \
'/requests/' + nickname + '@' + domain + '.follow'
follow_json = load_json(follow_activity_filename)
if not follow_json:
print('No follow request json was found for ' +
follow_activity_filename)
return None
# actor who made the follow request
person_url = get_actor_from_post(follow_json)
# create the reject activity
reject_json = \
create_reject(base_dir, federation_list,
nickname_to_follow, domain_to_follow, port,
person_url, '', http_prefix, follow_json)
if debug:
pprint(reject_json)
print('DEBUG: sending follow Reject from ' +
nickname_to_follow + '@' + domain_to_follow +
' port ' + str(port) + ' to ' +
nickname + '@' + domain + ' port ' + str(from_port))
client_to_server = False
deny_handle = get_full_domain(nickname + '@' + domain, from_port)
group_account = False
if has_group_type(base_dir, person_url, person_cache):
group_account = True
# remove from the follow requests file
remove_from_follow_requests(base_dir, nickname_to_follow, domain_to_follow,
deny_handle, debug)
# remove the follow request json
try:
os.remove(follow_activity_filename)
except OSError:
print('EX: followed_account_rejects unable to delete ' +
follow_activity_filename)
curr_session = session
if domain.endswith('.onion') and session_onion:
curr_session = session_onion
elif domain.endswith('.i2p') and session_i2p:
curr_session = session_i2p
extra_headers = {}
domain_full = get_full_domain(domain, from_port)
remove_followers_sync(followers_sync_cache,
nickname_to_follow,
domain_full)
# send the reject activity
return send_signed_json(reject_json, curr_session, base_dir,
nickname_to_follow, domain_to_follow, port,
nickname, domain, from_port,
http_prefix, client_to_server,
federation_list,
send_threads, post_log, cached_webfingers,
person_cache, debug, project_version, None,
group_account, signing_priv_key_pem,
6393063,
domain, onion_domain, i2p_domain,
extra_headers, sites_unavailable,
system_language)
def send_follow_request(session, base_dir: str,
nickname: str, domain: str,
sender_domain: str, sender_port: int,
http_prefix: str,
follow_nickname: str, follow_domain: str,
followed_actor: str,
follow_port: int, follow_http_prefix: str,
client_to_server: bool, federation_list: [],
send_threads: [], post_log: [], cached_webfingers: {},
person_cache: {}, debug: bool,
project_version: str, signing_priv_key_pem: str,
curr_domain: str,
onion_domain: str, i2p_domain: str,
sites_unavailable: [],
system_language: str) -> {}:
"""Gets the json object for sending a follow request
"""
if not signing_priv_key_pem:
print('WARN: follow request without signing key')
if not domain_permitted(follow_domain, federation_list):
print('You are not permitted to follow the domain ' + follow_domain)
return None
full_domain = get_full_domain(sender_domain, sender_port)
follow_actor = local_actor_url(http_prefix, nickname, full_domain)
request_domain = get_full_domain(follow_domain, follow_port)
status_number, _ = get_status_number()
group_account = False
if follow_nickname:
followed_id = followed_actor
follow_handle = follow_nickname + '@' + request_domain
group_account = has_group_type(base_dir, followed_actor, person_cache)
if group_account:
follow_handle = '!' + follow_handle
print('Follow request being sent to group account')
else:
if debug:
print('DEBUG: send_follow_request - assuming single user instance')
followed_id = follow_http_prefix + '://' + request_domain
single_user_nickname = 'dev'
follow_handle = single_user_nickname + '@' + request_domain
# remove follow handle from unfollowed.txt
unfollowed_filename = \
acct_dir(base_dir, nickname, domain) + '/unfollowed.txt'
if os.path.isfile(unfollowed_filename):
if text_in_file(follow_handle, unfollowed_filename):
unfollowed_file = None
try:
with open(unfollowed_filename, 'r',
encoding='utf-8') as fp_unfoll:
unfollowed_file = fp_unfoll.read()
except OSError:
print('EX: send_follow_request ' + unfollowed_filename)
if unfollowed_file:
unfollowed_file = \
unfollowed_file.replace(follow_handle + '\n', '')
try:
with open(unfollowed_filename, 'w+',
encoding='utf-8') as fp_unfoll:
fp_unfoll.write(unfollowed_file)
except OSError:
print('EX: unable to write ' + unfollowed_filename)
new_follow_json = {
'@context': 'https://www.w3.org/ns/activitystreams',
'id': follow_actor + '/statuses/' + str(status_number),
'type': 'Follow',
'actor': follow_actor,
'object': followed_id
}
if group_account:
new_follow_json['to'] = [followed_id]
print('Follow request: ' + str(new_follow_json))
if follow_approval_required(base_dir, nickname, domain, debug,
follow_handle):
# Remove any follow requests rejected for the account being followed.
# It's assumed that if you are following someone then you are
# ok with them following back. If this isn't the case then a rejected
# follow request will block them again.
_remove_from_follow_rejects(base_dir,
nickname, domain,
follow_handle, debug)
extra_headers = {}
send_signed_json(new_follow_json, session, base_dir,
nickname, sender_domain, sender_port,
follow_nickname, follow_domain, follow_port,
http_prefix, client_to_server,
federation_list,
send_threads, post_log, cached_webfingers, person_cache,
debug, project_version, None, group_account,
signing_priv_key_pem, 8234389,
curr_domain, onion_domain, i2p_domain,
extra_headers, sites_unavailable,
system_language)
return new_follow_json
def send_follow_request_via_server(base_dir: str, session,
from_nickname: str, password: str,
from_domain: str, from_port: int,
follow_nickname: str, follow_domain: str,
follow_port: int,
http_prefix: str,
cached_webfingers: {}, person_cache: {},
debug: bool, project_version: str,
signing_priv_key_pem: str,
system_language: str) -> {}:
"""Creates a follow request via c2s
"""
if not session:
print('WARN: No session for send_follow_request_via_server')
return 6
from_domain_full = get_full_domain(from_domain, from_port)
follow_domain_full = get_full_domain(follow_domain, follow_port)
follow_actor = \
local_actor_url(http_prefix, from_nickname, from_domain_full)
followed_id = \
http_prefix + '://' + follow_domain_full + '/@' + follow_nickname
status_number, _ = get_status_number()
new_follow_json = {
'@context': 'https://www.w3.org/ns/activitystreams',
'id': follow_actor + '/statuses/' + str(status_number),
'type': 'Follow',
'actor': follow_actor,
'object': followed_id
}
handle = http_prefix + '://' + from_domain_full + '/@' + from_nickname
# lookup the inbox for the To handle
wf_request = \
webfinger_handle(session, handle, http_prefix, cached_webfingers,
from_domain, project_version, debug, False,
signing_priv_key_pem)
if not wf_request:
if debug:
print('DEBUG: follow request webfinger failed for ' + handle)
return 1
if not isinstance(wf_request, dict):
print('WARN: follow request Webfinger for ' + handle +
' did not return a dict. ' + str(wf_request))
return 1
post_to_box = 'outbox'
# get the actor inbox for the To handle
origin_domain = from_domain
(inbox_url, _, _, from_person_id, _, _,
_, _) = get_person_box(signing_priv_key_pem, origin_domain,
base_dir, session, wf_request,
person_cache,
project_version, http_prefix,
from_nickname,
from_domain, post_to_box, 52025,
system_language)
if not inbox_url:
if debug:
print('DEBUG: follow request no ' + post_to_box +
' was found for ' + handle)
return 3
if not from_person_id:
if debug:
print('DEBUG: follow request no actor was found for ' + handle)
return 4
auth_header = create_basic_auth_header(from_nickname, password)
headers = {
'host': from_domain,
'Content-type': 'application/json',
'Authorization': auth_header
}
post_result = \
post_json(http_prefix, from_domain_full,
session, new_follow_json, [], inbox_url, headers, 3, True)
if not post_result:
if debug:
print('DEBUG: POST follow request failed for c2s to ' + inbox_url)
return 5
if debug:
print('DEBUG: c2s POST follow request success')
return new_follow_json
def send_unfollow_request_via_server(base_dir: str, session,
from_nickname: str, password: str,
from_domain: str, from_port: int,
follow_nickname: str, follow_domain: str,
follow_port: int,
http_prefix: str,
cached_webfingers: {}, person_cache: {},
debug: bool, project_version: str,
signing_priv_key_pem: str,
system_language: str) -> {}:
"""Creates a unfollow request via c2s
"""
if not session:
print('WARN: No session for send_unfollow_request_via_server')
return 6
from_domain_full = get_full_domain(from_domain, from_port)
follow_domain_full = get_full_domain(follow_domain, follow_port)
follow_actor = \
local_actor_url(http_prefix, from_nickname, from_domain_full)
followed_id = \
http_prefix + '://' + follow_domain_full + '/@' + follow_nickname
status_number, _ = get_status_number()
unfollow_json = {
'@context': 'https://www.w3.org/ns/activitystreams',
'id': follow_actor + '/statuses/' + str(status_number) + '/undo',
'type': 'Undo',
'actor': follow_actor,
'object': {
'id': follow_actor + '/statuses/' + str(status_number),
'type': 'Follow',
'actor': follow_actor,
'object': followed_id
}
}
handle = http_prefix + '://' + from_domain_full + '/@' + from_nickname
# lookup the inbox for the To handle
wf_request = \
webfinger_handle(session, handle, http_prefix, cached_webfingers,
from_domain, project_version, debug, False,
signing_priv_key_pem)
if not wf_request:
if debug:
print('DEBUG: unfollow webfinger failed for ' + handle)
return 1
if not isinstance(wf_request, dict):
print('WARN: unfollow webfinger for ' + handle +
' did not return a dict. ' + str(wf_request))
return 1
post_to_box = 'outbox'
# get the actor inbox for the To handle
origin_domain = from_domain
(inbox_url, _, _, from_person_id, _, _,
_, _) = get_person_box(signing_priv_key_pem,
origin_domain,
base_dir, session,
wf_request, person_cache,
project_version, http_prefix,
from_nickname,
from_domain, post_to_box,
76536, system_language)
if not inbox_url:
if debug:
print('DEBUG: unfollow no ' + post_to_box +
' was found for ' + handle)
return 3
if not from_person_id:
if debug:
print('DEBUG: unfollow no actor was found for ' + handle)
return 4
auth_header = create_basic_auth_header(from_nickname, password)
headers = {
'host': from_domain,
'Content-type': 'application/json',
'Authorization': auth_header
}
post_result = \
post_json(http_prefix, from_domain_full,
session, unfollow_json, [], inbox_url, headers, 3, True)
if not post_result:
if debug:
print('DEBUG: POST unfollow failed for c2s to ' + inbox_url)
return 5
if debug:
print('DEBUG: c2s POST unfollow success')
return unfollow_json
def get_following_via_server(session, nickname: str, password: str,
domain: str, port: int,
http_prefix: str, page_number: int,
debug: bool, project_version: str,
signing_priv_key_pem: str) -> {}:
"""Gets a page from the following collection as json
"""
if not session:
print('WARN: No session for get_following_via_server')
return 6
domain_full = get_full_domain(domain, port)
follow_actor = local_actor_url(http_prefix, nickname, domain_full)
auth_header = create_basic_auth_header(nickname, password)
headers = {
'host': domain,
'Content-type': 'application/json',
'Authorization': auth_header
}
page_number = max(page_number, 1)
url = follow_actor + '/following?page=' + str(page_number)
following_json = \
get_json(signing_priv_key_pem, session, url, headers, {}, debug,
project_version, http_prefix, domain, 10, True)
if not get_json_valid(following_json):
if debug:
print('DEBUG: GET following list failed for c2s to ' + url)
return 5
if debug:
print('DEBUG: c2s GET following list request success')
return following_json
def get_followers_via_server(session, nickname: str, password: str,
domain: str, port: int,
http_prefix: str, page_number: int,
debug: bool, project_version: str,
signing_priv_key_pem: str) -> {}:
"""Gets a page from the followers collection as json
"""
if not session:
print('WARN: No session for get_followers_via_server')
return 6
domain_full = get_full_domain(domain, port)
follow_actor = local_actor_url(http_prefix, nickname, domain_full)
auth_header = create_basic_auth_header(nickname, password)
headers = {
'host': domain,
'Content-type': 'application/json',
'Authorization': auth_header
}
page_number = max(page_number, 1)
url = follow_actor + '/followers?page=' + str(page_number)
followers_json = \
get_json(signing_priv_key_pem, session, url, headers, {}, debug,
project_version, http_prefix, domain, 10, True)
if not get_json_valid(followers_json):
if debug:
print('DEBUG: GET followers list failed for c2s to ' + url)
return 5
if debug:
print('DEBUG: c2s GET followers list request success')
return followers_json
def get_follow_requests_via_server(session,
nickname: str, password: str,
domain: str, port: int,
http_prefix: str, page_number: int,
debug: bool, project_version: str,
signing_priv_key_pem: str) -> {}:
"""Gets a page from the follow requests collection as json
"""
if not session:
print('WARN: No session for get_follow_requests_via_server')
return 6
domain_full = get_full_domain(domain, port)
follow_actor = local_actor_url(http_prefix, nickname, domain_full)
auth_header = create_basic_auth_header(nickname, password)
headers = {
'host': domain,
'Content-type': 'application/json',
'Authorization': auth_header
}
page_number = max(page_number, 1)
url = follow_actor + '/followrequests?page=' + str(page_number)
followers_json = \
get_json(signing_priv_key_pem, session, url, headers, {}, debug,
project_version, http_prefix, domain, 10, True)
if not get_json_valid(followers_json):
if debug:
print('DEBUG: GET follow requests list failed for c2s to ' + url)
return 5
if debug:
print('DEBUG: c2s GET follow requests list request success')
return followers_json
def approve_follow_request_via_server(session,
nickname: str, password: str,
domain: str, port: int,
http_prefix: str, approve_handle: int,
debug: bool, project_version: str,
signing_priv_key_pem: str) -> str:
"""Approves a follow request
This is not exactly via c2s though. It simulates pressing the Approve
button on the web interface
"""
if not session:
print('WARN: No session for approve_follow_request_via_server')
return 6
domain_full = get_full_domain(domain, port)
actor = local_actor_url(http_prefix, nickname, domain_full)
auth_header = create_basic_auth_header(nickname, password)
headers = {
'host': domain,
'Content-type': 'text/html; charset=utf-8',
'Authorization': auth_header
}
url = actor + '/followapprove=' + approve_handle
approve_html = \
get_json(signing_priv_key_pem, session, url, headers, {}, debug,
project_version, http_prefix, domain, 10, True)
if not get_json_valid(approve_html):
if debug:
print('DEBUG: GET approve follow request failed for c2s to ' + url)
return 5
if debug:
print('DEBUG: c2s GET approve follow request request success')
return approve_html
def deny_follow_request_via_server(session,
nickname: str, password: str,
domain: str, port: int,
http_prefix: str, deny_handle: int,
debug: bool, project_version: str,
signing_priv_key_pem: str) -> str:
"""Denies a follow request
This is not exactly via c2s though. It simulates pressing the Deny
button on the web interface
"""
if not session:
print('WARN: No session for deny_follow_request_via_server')
return 6
domain_full = get_full_domain(domain, port)
actor = local_actor_url(http_prefix, nickname, domain_full)
auth_header = create_basic_auth_header(nickname, password)
headers = {
'host': domain,
'Content-type': 'text/html; charset=utf-8',
'Authorization': auth_header
}
url = actor + '/followdeny=' + deny_handle
deny_html = \
get_json(signing_priv_key_pem, session, url, headers, {}, debug,
project_version, http_prefix, domain, 10, True)
if not get_json_valid(deny_html):
if debug:
print('DEBUG: GET deny follow request failed for c2s to ' + url)
return 5
if debug:
print('DEBUG: c2s GET deny follow request request success')
return deny_html
def get_followers_of_actor(base_dir: str, actor: str, debug: bool) -> {}:
"""In a shared inbox if we receive a post we know who it's from
and if it's addressed to followers then we need to get a list of those.
This returns a list of account handles which follow the given actor
"""
if debug:
print('DEBUG: getting followers of ' + actor)
recipients_dict = {}
if ':' not in actor:
return recipients_dict
nickname = get_nickname_from_actor(actor)
if not nickname:
if debug:
print('DEBUG: no nickname found in ' + actor)
return recipients_dict
domain, _ = get_domain_from_actor(actor)
if not domain:
if debug:
print('DEBUG: no domain found in ' + actor)
return recipients_dict
actor_handle = nickname + '@' + domain
if debug:
print('DEBUG: searching for handle ' + actor_handle)
# for each of the accounts
for subdir, dirs, _ in os.walk(base_dir + '/accounts'):
for account in dirs:
if '@' not in account:
continue
if account.startswith('inbox@'):
continue
if account.startswith('Actor@'):
continue
following_filename = \
os.path.join(subdir, account) + '/following.txt'
if debug:
print('DEBUG: examining follows of ' + account)
print(following_filename)
if os.path.isfile(following_filename):
# does this account follow the given actor?
if debug:
print('DEBUG: checking if ' + actor_handle +
' in ' + following_filename)
if text_in_file(actor_handle, following_filename):
if debug:
print('DEBUG: ' + account +
' follows ' + actor_handle)
recipients_dict[account] = None
break
return recipients_dict
def outbox_undo_follow(base_dir: str, message_json: {}, debug: bool) -> None:
"""When an unfollow request is received by the outbox from c2s
This removes the followed handle from the following.txt file
of the relevant account
"""
if not message_json.get('type'):
return
if not message_json['type'] == 'Undo':
return
if not has_object_string_type(message_json, debug):
return
if not message_json['object']['type'] == 'Follow':
if not message_json['object']['type'] == 'Join':
return
if not has_object_string_object(message_json, debug):
return
if not message_json['object'].get('actor'):
return
if debug:
print('DEBUG: undo follow arrived in outbox')
actor_url = get_actor_from_post(message_json['object'])
nickname_follower = get_nickname_from_actor(actor_url)
if not nickname_follower:
print('WARN: unable to find nickname in ' +
actor_url)
return
domain_follower, port_follower = get_domain_from_actor(actor_url)
if not domain_follower:
print('WARN: unable to find domain in ' + actor_url)
return
domain_follower_full = get_full_domain(domain_follower, port_follower)
nickname_following = \
get_nickname_from_actor(message_json['object']['object'])
if not nickname_following:
print('WARN: unable to find nickname in ' +
message_json['object']['object'])
return
domain_following, port_following = \
get_domain_from_actor(message_json['object']['object'])
if not domain_following:
print('WARN: unable to find domain in ' +
message_json['object']['object'])
return
domain_following_full = get_full_domain(domain_following, port_following)
group_account = \
has_group_type(base_dir, message_json['object']['object'], None)
if unfollow_account(base_dir, nickname_follower, domain_follower_full,
nickname_following, domain_following_full,
debug, group_account, 'following.txt'):
if debug:
print('DEBUG: ' + nickname_follower + ' unfollowed ' +
nickname_following + '@' + domain_following_full)
else:
if debug:
print('WARN: ' + nickname_follower + ' could not unfollow ' +
nickname_following + '@' + domain_following_full)
def follower_approval_active(base_dir: str,
nickname: str, domain: str) -> bool:
"""Returns true if the given account requires follower approval
"""
manually_approves_followers = False
actor_filename = acct_dir(base_dir, nickname, domain) + '.json'
if os.path.isfile(actor_filename):
actor_json = load_json(actor_filename)
if actor_json:
if 'manuallyApprovesFollowers' in actor_json:
manually_approves_followers = \
actor_json['manuallyApprovesFollowers']
return manually_approves_followers
def remove_follower(base_dir: str,
nickname: str, domain: str,
remove_nickname: str, remove_domain: str) -> bool:
"""Removes a follower
"""
followers_filename = \
acct_dir(base_dir, nickname, domain) + '/followers.txt'
if not os.path.isfile(followers_filename):
return False
followers_str = ''
try:
with open(followers_filename, 'r', encoding='utf-8') as fp_foll:
followers_str = fp_foll.read()
except OSError:
print('EX: remove_follower unable to read followers ' +
followers_filename)
return False
followers_list = followers_str.split('\n')
handle = remove_nickname + '@' + remove_domain
handle = handle.lower()
new_followers_str = ''
found = False
for handle2 in followers_list:
if handle2.lower() != handle:
new_followers_str += handle2 + '\n'
else:
found = True
if not found:
return False
try:
with open(followers_filename, 'w+', encoding='utf-8') as fp_foll:
fp_foll.write(new_followers_str)
except OSError:
print('EX: remove_follower unable to write followers ' +
followers_filename)
return True
def pending_followers_timeline_json(actor: str, base_dir: str,
nickname: str, domain: str) -> {}:
"""Returns pending followers collection for an account
https://codeberg.org/fediverse/fep/src/branch/main/fep/4ccd/fep-4ccd.md
"""
result_json = {
"@context": [
"https://www.w3.org/ns/activitystreams"
],
"id": actor,
"type": "OrderedCollection",
"name": nickname + "'s Pending Followers",
"orderedItems": []
}
follow_requests_filename = \
acct_dir(base_dir, nickname, domain) + '/followrequests.txt'
if os.path.isfile(follow_requests_filename):
with open(follow_requests_filename, 'r',
encoding='utf-8') as req_file:
for follower_handle in req_file:
if len(follower_handle) == 0:
continue
follower_handle = remove_eol(follower_handle)
foll_domain, _ = get_domain_from_actor(follower_handle)
if not foll_domain:
continue
foll_nickname = get_nickname_from_actor(follower_handle)
if not foll_nickname:
continue
follow_activity_filename = \
acct_dir(base_dir, nickname, domain) + \
'/requests/' + \
foll_nickname + '@' + foll_domain + '.follow'
if not os.path.isfile(follow_activity_filename):
continue
follow_json = load_json(follow_activity_filename)
if not follow_json:
continue
result_json['orderedItems'].append(follow_json)
return result_json