__filename__ = "follow.py" __author__ = "Bob Mottram" __license__ = "AGPL3+" __version__ = "1.5.0" __maintainer__ = "Bob Mottram" __email__ = "bob@libreserver.org" __status__ = "Production" __module_group__ = "ActivityPub" import os from pprint import pprint from utils import get_user_paths from utils import acct_handle_dir from utils import has_object_string_object from utils import has_object_string_type from utils import remove_domain_port from utils import has_users_path from utils import get_full_domain from utils import get_followers_list from utils import valid_nickname from utils import domain_permitted from utils import get_domain_from_actor from utils import get_nickname_from_actor from utils import get_status_number from utils import follow_person from posts import send_signed_json from posts import get_person_box from utils import load_json from utils import save_json from utils import is_account_dir from utils import acct_dir from utils import has_group_type from utils import local_actor_url from utils import text_in_file from utils import remove_eol from utils import get_actor_from_post from utils import data_dir from acceptreject import create_accept from acceptreject import create_reject from webfinger import webfinger_handle from auth import create_basic_auth_header from session import get_json from session import get_json_valid from session import post_json from followerSync import remove_followers_sync def create_initial_last_seen(base_dir: str, http_prefix: str) -> None: """Creates initial lastseen files for all follows. The lastseen files are used to generate the Zzz icons on follows/following lists on the profile screen. 
""" dir_str = data_dir(base_dir) for _, dirs, _ in os.walk(dir_str): for acct in dirs: if not is_account_dir(acct): continue account_dir = os.path.join(dir_str, acct) following_filename = account_dir + '/following.txt' if not os.path.isfile(following_filename): continue last_seen_dir = account_dir + '/lastseen' if not os.path.isdir(last_seen_dir): os.mkdir(last_seen_dir) following_handles = [] try: with open(following_filename, 'r', encoding='utf-8') as fp_foll: following_handles = fp_foll.readlines() except OSError: print('EX: create_initial_last_seen ' + following_filename) for handle in following_handles: if '#' in handle: continue if '@' not in handle: continue handle = remove_eol(handle) nickname = handle.split('@')[0] domain = handle.split('@')[1] if nickname.startswith('!'): nickname = nickname[1:] actor = local_actor_url(http_prefix, nickname, domain) last_seen_filename = \ last_seen_dir + '/' + actor.replace('/', '#') + '.txt' if os.path.isfile(last_seen_filename): continue try: with open(last_seen_filename, 'w+', encoding='utf-8') as fp_last: fp_last.write(str(100)) except OSError: print('EX: create_initial_last_seen 2 ' + last_seen_filename) break def _pre_approved_follower(base_dir: str, nickname: str, domain: str, approve_handle: str) -> bool: """Is the given handle an already manually approved follower? 
""" account_dir = acct_dir(base_dir, nickname, domain) approved_filename = account_dir + '/approved.txt' if os.path.isfile(approved_filename): if text_in_file(approve_handle, approved_filename): return True return False def _remove_from_follow_base(base_dir: str, nickname: str, domain: str, accept_or_deny_handle: str, follow_file: str, debug: bool) -> None: """Removes a handle/actor from follow requests or rejects file """ accounts_dir = acct_dir(base_dir, nickname, domain) approve_follows_filename = accounts_dir + '/' + follow_file + '.txt' if not os.path.isfile(approve_follows_filename): if debug: print('There is no ' + follow_file + ' to remove ' + nickname + '@' + domain + ' from') return accept_deny_actor = None if not text_in_file(accept_or_deny_handle, approve_follows_filename): # is this stored in the file as an actor rather than a handle? accept_deny_nickname = accept_or_deny_handle.split('@')[0] accept_deny_domain = accept_or_deny_handle.split('@')[1] # for each possible users path construct an actor and # check if it exists in the file users_paths = get_user_paths() actor_found = False for users_name in users_paths: accept_deny_actor = \ '://' + accept_deny_domain + users_name + accept_deny_nickname if text_in_file(accept_deny_actor, approve_follows_filename): actor_found = True break if not actor_found: accept_deny_actor = \ '://' + accept_deny_domain + '/' + accept_deny_nickname if text_in_file(accept_deny_actor, approve_follows_filename): actor_found = True if not actor_found: return try: with open(approve_follows_filename + '.new', 'w+', encoding='utf-8') as fp_approve_new: with open(approve_follows_filename, 'r', encoding='utf-8') as fp_approve: if not accept_deny_actor: for approve_handle in fp_approve: accept_deny_handle = accept_or_deny_handle if not approve_handle.startswith(accept_deny_handle): fp_approve_new.write(approve_handle) else: for approve_handle in fp_approve: if accept_deny_actor not in approve_handle: 
fp_approve_new.write(approve_handle) except OSError as ex: print('EX: _remove_from_follow_base ' + approve_follows_filename + ' ' + str(ex)) os.rename(approve_follows_filename + '.new', approve_follows_filename) def remove_from_follow_requests(base_dir: str, nickname: str, domain: str, deny_handle: str, debug: bool) -> None: """Removes a handle from follow requests """ _remove_from_follow_base(base_dir, nickname, domain, deny_handle, 'followrequests', debug) def _remove_from_follow_rejects(base_dir: str, nickname: str, domain: str, accept_handle: str, debug: bool) -> None: """Removes a handle from follow rejects """ _remove_from_follow_base(base_dir, nickname, domain, accept_handle, 'followrejects', debug) def is_following_actor(base_dir: str, nickname: str, domain: str, actor: str) -> bool: """Is the given nickname following the given actor? The actor can also be a handle: nickname@domain """ domain = remove_domain_port(domain) accounts_dir = acct_dir(base_dir, nickname, domain) if not os.path.isdir(accounts_dir): return False following_file = accounts_dir + '/following.txt' if not os.path.isfile(following_file): return False if actor.startswith('@'): actor = actor[1:] if text_in_file(actor, following_file, False): return True following_nickname = get_nickname_from_actor(actor) if not following_nickname: print('WARN: unable to find nickname in ' + actor) return False following_domain, following_port = get_domain_from_actor(actor) if not following_domain: print('WARN: unable to find domain in ' + actor) return False following_handle = \ get_full_domain(following_nickname + '@' + following_domain, following_port) if text_in_file(following_handle, following_file, False): return True return False def get_mutuals_of_person(base_dir: str, nickname: str, domain: str) -> []: """Returns the mutuals of a person i.e. 
accounts which they follow and which also follow back """ followers = \ get_followers_list(base_dir, nickname, domain, 'followers.txt') following = \ get_followers_list(base_dir, nickname, domain, 'following.txt') mutuals = [] for handle in following: if handle in followers: mutuals.append(handle) return mutuals def add_follower_of_person(base_dir: str, nickname: str, domain: str, follower_nickname: str, follower_domain: str, federation_list: [], debug: bool, group_account: bool) -> bool: """Adds a follower of the given person """ return follow_person(base_dir, nickname, domain, follower_nickname, follower_domain, federation_list, debug, group_account, 'followers.txt') def get_follower_domains(base_dir: str, nickname: str, domain: str) -> []: """Returns a list of domains for followers """ domain = remove_domain_port(domain) followers_file = acct_dir(base_dir, nickname, domain) + '/followers.txt' if not os.path.isfile(followers_file): return [] lines = [] try: with open(followers_file, 'r', encoding='utf-8') as fp_foll: lines = fp_foll.readlines() except OSError: print('EX: get_follower_domains ' + followers_file) domains_list = [] for handle in lines: handle = remove_eol(handle) follower_domain, _ = get_domain_from_actor(handle) if not follower_domain: continue if follower_domain not in domains_list: domains_list.append(follower_domain) return domains_list def is_follower_of_person(base_dir: str, nickname: str, domain: str, follower_nickname: str, follower_domain: str) -> bool: """is the given nickname a follower of follower_nickname? 
""" if not follower_domain: print('No follower_domain') return False if not follower_nickname: print('No follower_nickname for ' + follower_domain) return False domain = remove_domain_port(domain) followers_file = acct_dir(base_dir, nickname, domain) + '/followers.txt' if not os.path.isfile(followers_file): return False handle = follower_nickname + '@' + follower_domain already_following = False followers_str = '' try: with open(followers_file, 'r', encoding='utf-8') as fp_foll: followers_str = fp_foll.read() except OSError: print('EX: is_follower_of_person ' + followers_file) if handle in followers_str: already_following = True else: paths = get_user_paths() for user_path in paths: url = '://' + follower_domain + user_path + follower_nickname if url in followers_str: already_following = True break if not already_following: url = '://' + follower_domain + '/' + follower_nickname if url in followers_str: already_following = True return already_following def unfollow_account(base_dir: str, nickname: str, domain: str, follow_nickname: str, follow_domain: str, debug: bool, group_account: bool, follow_file: str) -> bool: """Removes a person to the follow list """ domain = remove_domain_port(domain) handle = nickname + '@' + domain handle_to_unfollow = follow_nickname + '@' + follow_domain if group_account: handle_to_unfollow = '!' 
+ handle_to_unfollow dir_str = data_dir(base_dir) if not os.path.isdir(dir_str): os.mkdir(dir_str) handle_dir = acct_handle_dir(base_dir, handle) if not os.path.isdir(handle_dir): os.mkdir(handle_dir) accounts_dir = acct_dir(base_dir, nickname, domain) filename = accounts_dir + '/' + follow_file if not os.path.isfile(filename): if debug: print('DEBUG: follow file ' + filename + ' was not found') return False handle_to_unfollow_lower = handle_to_unfollow.lower() if not text_in_file(handle_to_unfollow_lower, filename, False): if debug: print('DEBUG: handle to unfollow ' + handle_to_unfollow + ' is not in ' + filename) return False lines = [] try: with open(filename, 'r', encoding='utf-8') as fp_unfoll: lines = fp_unfoll.readlines() except OSError: print('EX: unfollow_account ' + filename) if lines: try: with open(filename, 'w+', encoding='utf-8') as fp_unfoll: for line in lines: check_handle = line.strip("\n").strip("\r").lower() if check_handle not in (handle_to_unfollow_lower, '!' + handle_to_unfollow_lower): fp_unfoll.write(line) except OSError as ex: print('EX: unfollow_account unable to write ' + filename + ' ' + str(ex)) # write to an unfollowed file so that if a follow accept # later arrives then it can be ignored unfollowed_filename = accounts_dir + '/unfollowed.txt' if os.path.isfile(unfollowed_filename): if not text_in_file(handle_to_unfollow_lower, unfollowed_filename, False): try: with open(unfollowed_filename, 'a+', encoding='utf-8') as fp_unfoll: fp_unfoll.write(handle_to_unfollow + '\n') except OSError: print('EX: unfollow_account unable to append ' + unfollowed_filename) else: try: with open(unfollowed_filename, 'w+', encoding='utf-8') as fp_unfoll: fp_unfoll.write(handle_to_unfollow + '\n') except OSError: print('EX: unfollow_account unable to write ' + unfollowed_filename) return True def unfollower_of_account(base_dir: str, nickname: str, domain: str, follower_nickname: str, follower_domain: str, debug: bool, group_account: bool) -> bool: """Remove 
a follower of a person """ return unfollow_account(base_dir, nickname, domain, follower_nickname, follower_domain, debug, group_account, 'followers.txt') def clear_follows(base_dir: str, nickname: str, domain: str, follow_file: str) -> None: """Removes all follows """ dir_str = data_dir(base_dir) if not os.path.isdir(dir_str): os.mkdir(dir_str) accounts_dir = acct_dir(base_dir, nickname, domain) if not os.path.isdir(accounts_dir): os.mkdir(accounts_dir) filename = accounts_dir + '/' + follow_file if os.path.isfile(filename): try: os.remove(filename) except OSError: print('EX: clear_follows unable to delete ' + filename) def clear_followers(base_dir: str, nickname: str, domain: str) -> None: """Removes all followers """ clear_follows(base_dir, nickname, domain, 'followers.txt') def _get_no_of_follows(base_dir: str, nickname: str, domain: str, follow_file='following.txt') -> int: """Returns the number of follows or followers """ # only show number of followers to authenticated # account holders # if not authenticated: # return 9999 accounts_dir = acct_dir(base_dir, nickname, domain) filename = accounts_dir + '/' + follow_file if not os.path.isfile(filename): return 0 ctr = 0 lines = [] try: with open(filename, 'r', encoding='utf-8') as fp_foll: lines = fp_foll.readlines() except OSError: print('EX: _get_no_of_follows ' + filename) if lines: for line in lines: if '#' in line: continue if '@' in line and \ '.' 
in line and \ not line.startswith('http'): ctr += 1 elif ((line.startswith('http') or line.startswith('ipfs') or line.startswith('ipns') or line.startswith('hyper')) and has_users_path(line)): ctr += 1 return ctr def get_no_of_followers(base_dir: str, nickname: str, domain: str) -> int: """Returns the number of followers of the given person """ return _get_no_of_follows(base_dir, nickname, domain, 'followers.txt') def get_following_feed(base_dir: str, domain: str, port: int, path: str, http_prefix: str, authorized: bool, follows_per_page=12, follow_file='following') -> {}: """Returns the following and followers feeds from GET requests. This accesses the following.txt or followers.txt and builds a collection. """ # Show a small number of follows to non-authorized viewers if not authorized: follows_per_page = 6 if '/' + follow_file not in path: return None # handle page numbers header_only = True page_number = None if '?page=' in path: page_number = path.split('?page=')[1] if len(page_number) > 5: page_number = "1" if page_number == 'true' or not authorized: page_number = 1 else: try: page_number = int(page_number) except BaseException: print('EX: get_following_feed unable to convert to int ' + str(page_number)) path = path.split('?page=')[0] header_only = False if not path.endswith('/' + follow_file): return None nickname = None if path.startswith('/users/'): nickname = \ path.replace('/users/', '', 1).replace('/' + follow_file, '') if path.startswith('/@'): nickname = path.replace('/@', '', 1).replace('/' + follow_file, '') if not nickname: return None if not valid_nickname(domain, nickname): return None domain = get_full_domain(domain, port) if header_only: first_str = \ local_actor_url(http_prefix, nickname, domain) + \ '/' + follow_file + '?page=1' id_str = \ local_actor_url(http_prefix, nickname, domain) + '/' + follow_file total_str = \ _get_no_of_follows(base_dir, nickname, domain) following = { '@context': 'https://www.w3.org/ns/activitystreams', 'first': 
first_str, 'id': id_str, 'totalItems': total_str, 'type': 'OrderedCollection' } return following if not page_number: page_number = 1 next_page_number = int(page_number + 1) id_str = \ local_actor_url(http_prefix, nickname, domain) + \ '/' + follow_file + '?page=' + str(page_number) part_of_str = \ local_actor_url(http_prefix, nickname, domain) + '/' + follow_file following = { '@context': 'https://www.w3.org/ns/activitystreams', 'id': id_str, 'orderedItems': [], 'partOf': part_of_str, 'totalItems': 0, 'type': 'OrderedCollectionPage' } handle_domain = domain handle_domain = remove_domain_port(handle_domain) accounts_dir = acct_dir(base_dir, nickname, handle_domain) filename = accounts_dir + '/' + follow_file + '.txt' if not os.path.isfile(filename): return following curr_page = 1 page_ctr = 0 total_ctr = 0 lines = [] try: with open(filename, 'r', encoding='utf-8') as fp_foll: lines = fp_foll.readlines() except OSError: print('EX: get_following_feed ' + filename) for line in lines: if '#' not in line: if '@' in line and not line.startswith('http'): # nickname@domain page_ctr += 1 total_ctr += 1 if curr_page == page_number: line2_lower = line.lower() line2 = remove_eol(line2_lower) nick = line2.split('@')[0] dom = line2.split('@')[1] if not nick.startswith('!'): # person actor url = local_actor_url(http_prefix, nick, dom) else: # group actor url = http_prefix + '://' + dom + '/c/' + nick following['orderedItems'].append(url) elif ((line.startswith('http') or line.startswith('ipfs') or line.startswith('ipns') or line.startswith('hyper')) and has_users_path(line)): # https://domain/users/nickname page_ctr += 1 total_ctr += 1 if curr_page == page_number: append_str1 = line.lower() append_str = remove_eol(append_str1) following['orderedItems'].append(append_str) if page_ctr >= follows_per_page: page_ctr = 0 curr_page += 1 following['totalItems'] = total_ctr last_page = int(total_ctr / follows_per_page) last_page = max(last_page, 1) if next_page_number > last_page: 
following['next'] = \ local_actor_url(http_prefix, nickname, domain) + \ '/' + follow_file + '?page=' + str(last_page) return following def follow_approval_required(base_dir: str, nickname_to_follow: str, domain_to_follow: str, debug: bool, follow_request_handle: str) -> bool: """ Returns the policy for follower approvals """ # has this handle already been manually approved? if _pre_approved_follower(base_dir, nickname_to_follow, domain_to_follow, follow_request_handle): return False manually_approve_follows = False domain_to_follow = remove_domain_port(domain_to_follow) actor_filename = data_dir(base_dir) + '/' + \ nickname_to_follow + '@' + domain_to_follow + '.json' if os.path.isfile(actor_filename): actor = load_json(actor_filename) if actor: if 'manuallyApprovesFollowers' in actor: manually_approve_follows = actor['manuallyApprovesFollowers'] else: if debug: print(nickname_to_follow + '@' + domain_to_follow + ' automatically approves followers') else: if debug: print('DEBUG: Actor file not found: ' + actor_filename) return manually_approve_follows def no_of_follow_requests(base_dir: str, nickname_to_follow: str, domain_to_follow: str, follow_type: str) -> int: """Returns the current number of follow requests """ accounts_dir = acct_dir(base_dir, nickname_to_follow, domain_to_follow) approve_follows_filename = accounts_dir + '/followrequests.txt' if not os.path.isfile(approve_follows_filename): return 0 ctr = 0 lines = [] try: with open(approve_follows_filename, 'r', encoding='utf-8') as fp_approve: lines = fp_approve.readlines() except OSError: print('EX: no_of_follow_requests ' + approve_follows_filename) if lines: if follow_type == "onion": for file_line in lines: if '.onion' in file_line: ctr += 1 elif follow_type == "i2p": for file_line in lines: if '.i2p' in file_line: ctr += 1 else: return len(lines) return ctr def store_follow_request(base_dir: str, nickname_to_follow: str, domain_to_follow: str, port: int, nickname: str, domain: str, from_port: int, 
follow_json: {}, debug: bool, person_url: str, group_account: bool) -> bool: """Stores the follow request for later use """ accounts_dir = acct_dir(base_dir, nickname_to_follow, domain_to_follow) if not os.path.isdir(accounts_dir): return False domain_full = get_full_domain(domain, from_port) approve_handle = get_full_domain(nickname + '@' + domain, from_port) if group_account: approve_handle = '!' + approve_handle followers_filename = accounts_dir + '/followers.txt' if os.path.isfile(followers_filename): already_following = False followers_str = '' try: with open(followers_filename, 'r', encoding='utf-8') as fp_foll: followers_str = fp_foll.read() except OSError: print('EX: store_follow_request ' + followers_filename) if approve_handle in followers_str: already_following = True else: users_paths = get_user_paths() for possible_users_path in users_paths: url = '://' + domain_full + possible_users_path + nickname if url in followers_str: already_following = True break if not already_following: url = '://' + domain_full + '/' + nickname if url in followers_str: already_following = True if already_following: if debug: print('DEBUG: ' + nickname_to_follow + '@' + domain_to_follow + ' already following ' + approve_handle) return True # should this follow be denied? deny_follows_filename = accounts_dir + '/followrejects.txt' if os.path.isfile(deny_follows_filename): if text_in_file(approve_handle, deny_follows_filename): remove_from_follow_requests(base_dir, nickname_to_follow, domain_to_follow, approve_handle, debug) print(approve_handle + ' was already denied as a follower of ' + nickname_to_follow) return True # add to a file which contains a list of requests approve_follows_filename = accounts_dir + '/followrequests.txt' # store either nick@domain or the full person/actor url approve_handle_stored = approve_handle if '/users/' not in person_url: approve_handle_stored = person_url if group_account: approve_handle = '!' 
+ approve_handle if os.path.isfile(approve_follows_filename): if not text_in_file(approve_handle, approve_follows_filename): try: with open(approve_follows_filename, 'a+', encoding='utf-8') as fp_approve: fp_approve.write(approve_handle_stored + '\n') except OSError: print('EX: store_follow_request 2 ' + approve_follows_filename) else: if debug: print('DEBUG: ' + approve_handle_stored + ' is already awaiting approval') else: try: with open(approve_follows_filename, 'w+', encoding='utf-8') as fp_approve: fp_approve.write(approve_handle_stored + '\n') except OSError: print('EX: store_follow_request 3 ' + approve_follows_filename) # store the follow request in its own directory # We don't rely upon the inbox because items in there could expire requests_dir = accounts_dir + '/requests' if not os.path.isdir(requests_dir): os.mkdir(requests_dir) follow_activity_filename = requests_dir + '/' + approve_handle + '.follow' return save_json(follow_json, follow_activity_filename) def followed_account_accepts(session, base_dir: str, http_prefix: str, nickname_to_follow: str, domain_to_follow: str, port: int, nickname: str, domain: str, from_port: int, person_url: str, federation_list: [], follow_json: {}, send_threads: [], post_log: [], cached_webfingers: {}, person_cache: {}, debug: bool, project_version: str, remove_follow_activity: bool, signing_priv_key_pem: str, curr_domain: str, onion_domain: str, i2p_domain: str, followers_sync_cache: {}, sites_unavailable: [], system_language: str): """The person receiving a follow request accepts the new follower and sends back an Accept activity """ accept_handle = nickname + '@' + domain # send accept back print('Sending follow Accept activity for ' + 'follow request which arrived at ' + nickname_to_follow + '@' + domain_to_follow + ' back to ' + accept_handle) accept_json = create_accept(federation_list, nickname_to_follow, domain_to_follow, port, person_url, '', http_prefix, follow_json) pprint(accept_json) print('DEBUG: sending 
follow Accept from ' + nickname_to_follow + '@' + domain_to_follow + ' port ' + str(port) + ' to ' + accept_handle + ' port ' + str(from_port)) client_to_server = False if remove_follow_activity: # remove the follow request json follow_activity_filename = \ acct_dir(base_dir, nickname_to_follow, domain_to_follow) + \ '/requests/' + nickname + '@' + domain + '.follow' if os.path.isfile(follow_activity_filename): try: os.remove(follow_activity_filename) except OSError: print('EX: follow Accept ' + 'followed_account_accepts unable to delete ' + follow_activity_filename) group_account = False if follow_json: if follow_json.get('actor'): actor_url = get_actor_from_post(follow_json) if has_group_type(base_dir, actor_url, person_cache): group_account = True extra_headers = {} domain_full = get_full_domain(domain, from_port) remove_followers_sync(followers_sync_cache, nickname_to_follow, domain_full) return send_signed_json(accept_json, session, base_dir, nickname_to_follow, domain_to_follow, port, nickname, domain, from_port, http_prefix, client_to_server, federation_list, send_threads, post_log, cached_webfingers, person_cache, debug, project_version, None, group_account, signing_priv_key_pem, 7856837, curr_domain, onion_domain, i2p_domain, extra_headers, sites_unavailable, system_language) def followed_account_rejects(session, session_onion, session_i2p, onion_domain: str, i2p_domain: str, base_dir: str, http_prefix: str, nickname_to_follow: str, domain_to_follow: str, port: int, nickname: str, domain: str, from_port: int, federation_list: [], send_threads: [], post_log: [], cached_webfingers: {}, person_cache: {}, debug: bool, project_version: str, signing_priv_key_pem: str, followers_sync_cache: {}, sites_unavailable: [], system_language: str): """The person receiving a follow request rejects the new follower and sends back a Reject activity """ # send reject back if debug: print('DEBUG: sending Reject activity for ' + 'follow request which arrived at ' + 
nickname_to_follow + '@' + domain_to_follow + ' back to ' + nickname + '@' + domain) # get the json for the original follow request follow_activity_filename = \ acct_dir(base_dir, nickname_to_follow, domain_to_follow) + \ '/requests/' + nickname + '@' + domain + '.follow' follow_json = load_json(follow_activity_filename) if not follow_json: print('No follow request json was found for ' + follow_activity_filename) return None # actor who made the follow request person_url = get_actor_from_post(follow_json) # create the reject activity reject_json = \ create_reject(federation_list, nickname_to_follow, domain_to_follow, port, person_url, '', http_prefix, follow_json) if debug: pprint(reject_json) print('DEBUG: sending follow Reject from ' + nickname_to_follow + '@' + domain_to_follow + ' port ' + str(port) + ' to ' + nickname + '@' + domain + ' port ' + str(from_port)) client_to_server = False deny_handle = get_full_domain(nickname + '@' + domain, from_port) group_account = False if has_group_type(base_dir, person_url, person_cache): group_account = True # remove from the follow requests file remove_from_follow_requests(base_dir, nickname_to_follow, domain_to_follow, deny_handle, debug) # remove the follow request json try: os.remove(follow_activity_filename) except OSError: print('EX: followed_account_rejects unable to delete ' + follow_activity_filename) curr_session = session if domain.endswith('.onion') and session_onion: curr_session = session_onion elif domain.endswith('.i2p') and session_i2p: curr_session = session_i2p extra_headers = {} domain_full = get_full_domain(domain, from_port) remove_followers_sync(followers_sync_cache, nickname_to_follow, domain_full) # send the reject activity return send_signed_json(reject_json, curr_session, base_dir, nickname_to_follow, domain_to_follow, port, nickname, domain, from_port, http_prefix, client_to_server, federation_list, send_threads, post_log, cached_webfingers, person_cache, debug, project_version, None, 
group_account, signing_priv_key_pem, 6393063, domain, onion_domain, i2p_domain, extra_headers, sites_unavailable, system_language) def send_follow_request(session, base_dir: str, nickname: str, domain: str, sender_domain: str, sender_port: int, http_prefix: str, follow_nickname: str, follow_domain: str, followed_actor: str, follow_port: int, follow_http_prefix: str, client_to_server: bool, federation_list: [], send_threads: [], post_log: [], cached_webfingers: {}, person_cache: {}, debug: bool, project_version: str, signing_priv_key_pem: str, curr_domain: str, onion_domain: str, i2p_domain: str, sites_unavailable: [], system_language: str) -> {}: """Gets the json object for sending a follow request """ if not signing_priv_key_pem: print('WARN: follow request without signing key') if not domain_permitted(follow_domain, federation_list): print('You are not permitted to follow the domain ' + follow_domain) return None full_domain = get_full_domain(sender_domain, sender_port) follow_actor = local_actor_url(http_prefix, nickname, full_domain) request_domain = get_full_domain(follow_domain, follow_port) status_number, _ = get_status_number() group_account = False if follow_nickname: followed_id = followed_actor follow_handle = follow_nickname + '@' + request_domain group_account = has_group_type(base_dir, followed_actor, person_cache) if group_account: follow_handle = '!' 
+ follow_handle print('Follow request being sent to group account') else: if debug: print('DEBUG: send_follow_request - assuming single user instance') followed_id = follow_http_prefix + '://' + request_domain single_user_nickname = 'dev' follow_handle = single_user_nickname + '@' + request_domain # remove follow handle from unfollowed.txt unfollowed_filename = \ acct_dir(base_dir, nickname, domain) + '/unfollowed.txt' if os.path.isfile(unfollowed_filename): if text_in_file(follow_handle, unfollowed_filename): unfollowed_file = None try: with open(unfollowed_filename, 'r', encoding='utf-8') as fp_unfoll: unfollowed_file = fp_unfoll.read() except OSError: print('EX: send_follow_request ' + unfollowed_filename) if unfollowed_file: unfollowed_file = \ unfollowed_file.replace(follow_handle + '\n', '') try: with open(unfollowed_filename, 'w+', encoding='utf-8') as fp_unfoll: fp_unfoll.write(unfollowed_file) except OSError: print('EX: send_follow_request unable to write ' + unfollowed_filename) new_follow_json = { '@context': 'https://www.w3.org/ns/activitystreams', 'id': follow_actor + '/statuses/' + str(status_number), 'type': 'Follow', 'actor': follow_actor, 'object': followed_id } if group_account: new_follow_json['to'] = [followed_id] print('Follow request: ' + str(new_follow_json)) if follow_approval_required(base_dir, nickname, domain, debug, follow_handle): # Remove any follow requests rejected for the account being followed. # It's assumed that if you are following someone then you are # ok with them following back. If this isn't the case then a rejected # follow request will block them again. 
def send_follow_request_via_server(base_dir: str, session,
                                   from_nickname: str, password: str,
                                   from_domain: str, from_port: int,
                                   follow_nickname: str, follow_domain: str,
                                   follow_port: int,
                                   http_prefix: str,
                                   cached_webfingers: {}, person_cache: {},
                                   debug: bool, project_version: str,
                                   signing_priv_key_pem: str,
                                   system_language: str) -> {}:
    """Creates a follow request via c2s

    A Follow activity is built with the sending account as the actor and
    the followed account as the object. It is then POSTed, using basic
    auth, to the url which get_person_box returns for the 'outbox' of
    the sending account.

    Returns the Follow activity on success, otherwise an integer:
    6 = no session, 1 = webfinger failed or did not return a dict,
    3 = no box url was found, 4 = no actor id was found,
    5 = the POST failed.
    """
    if not session:
        print('WARN: No session for send_follow_request_via_server')
        return 6

    from_domain_full = get_full_domain(from_domain, from_port)

    follow_domain_full = get_full_domain(follow_domain, follow_port)

    follow_actor = \
        local_actor_url(http_prefix, from_nickname, from_domain_full)
    followed_id = \
        http_prefix + '://' + follow_domain_full + '/@' + follow_nickname

    status_number, _ = get_status_number()
    new_follow_json = {
        '@context': 'https://www.w3.org/ns/activitystreams',
        'id': follow_actor + '/statuses/' + str(status_number),
        'type': 'Follow',
        'actor': follow_actor,
        'object': followed_id
    }

    # the handle which is looked up is that of the sending account
    handle = http_prefix + '://' + from_domain_full + '/@' + from_nickname

    # webfinger lookup for the sending account
    wf_request = \
        webfinger_handle(session, handle, http_prefix, cached_webfingers,
                         from_domain, project_version, debug, False,
                         signing_priv_key_pem)
    if not wf_request:
        if debug:
            print('DEBUG: follow request webfinger failed for ' + handle)
        return 1
    if not isinstance(wf_request, dict):
        print('WARN: follow request Webfinger for ' + handle +
              ' did not return a dict. ' + str(wf_request))
        return 1

    post_to_box = 'outbox'

    # get the url of the sender's outbox. The variable is called
    # inbox_url only because that is the first field returned by
    # get_person_box
    # NOTE(review): 52025 looks like a call site identifier - confirm
    origin_domain = from_domain
    (inbox_url, _, _, from_person_id, _, _,
     _, _) = get_person_box(signing_priv_key_pem, origin_domain,
                            base_dir, session, wf_request, person_cache,
                            project_version, http_prefix, from_nickname,
                            from_domain, post_to_box, 52025,
                            system_language)

    if not inbox_url:
        if debug:
            print('DEBUG: follow request no ' + post_to_box +
                  ' was found for ' + handle)
        return 3

    if not from_person_id:
        if debug:
            print('DEBUG: follow request no actor was found for ' + handle)
        return 4

    auth_header = create_basic_auth_header(from_nickname, password)

    headers = {
        'host': from_domain,
        'Content-type': 'application/json',
        'Authorization': auth_header
    }
    post_result = \
        post_json(http_prefix, from_domain_full,
                  session, new_follow_json, [], inbox_url, headers, 3, True)
    if not post_result:
        if debug:
            print('DEBUG: POST follow request failed for c2s to ' + inbox_url)
        return 5

    if debug:
        print('DEBUG: c2s POST follow request success')

    return new_follow_json


def send_unfollow_request_via_server(base_dir: str, session,
                                     from_nickname: str, password: str,
                                     from_domain: str, from_port: int,
                                     follow_nickname: str, follow_domain: str,
                                     follow_port: int,
                                     http_prefix: str,
                                     cached_webfingers: {}, person_cache: {},
                                     debug: bool, project_version: str,
                                     signing_priv_key_pem: str,
                                     system_language: str) -> {}:
    """Creates a unfollow request via c2s

    An Undo activity wrapping a Follow is built, with the sending
    account as the actor and the followed account as the object of the
    inner Follow. It is then POSTed, using basic auth, to the url which
    get_person_box returns for the 'outbox' of the sending account.

    Returns the Undo activity on success, otherwise an integer:
    6 = no session, 1 = webfinger failed or did not return a dict,
    3 = no box url was found, 4 = no actor id was found,
    5 = the POST failed.
    """
    if not session:
        print('WARN: No session for send_unfollow_request_via_server')
        return 6

    from_domain_full = get_full_domain(from_domain, from_port)
    follow_domain_full = get_full_domain(follow_domain, follow_port)

    follow_actor = \
        local_actor_url(http_prefix, from_nickname, from_domain_full)
    followed_id = \
        http_prefix + '://' + follow_domain_full + '/@' + follow_nickname
    status_number, _ = get_status_number()

    unfollow_json = {
        '@context': 'https://www.w3.org/ns/activitystreams',
        'id': follow_actor + '/statuses/' + str(status_number) + '/undo',
        'type': 'Undo',
        'actor': follow_actor,
        'object': {
            'id': follow_actor + '/statuses/' + str(status_number),
            'type': 'Follow',
            'actor': follow_actor,
            'object': followed_id
        }
    }

    # the handle which is looked up is that of the sending account
    handle = http_prefix + '://' + from_domain_full + '/@' + from_nickname

    # webfinger lookup for the sending account
    wf_request = \
        webfinger_handle(session, handle, http_prefix, cached_webfingers,
                         from_domain, project_version, debug, False,
                         signing_priv_key_pem)
    if not wf_request:
        if debug:
            print('DEBUG: unfollow webfinger failed for ' + handle)
        return 1
    if not isinstance(wf_request, dict):
        print('WARN: unfollow webfinger for ' + handle +
              ' did not return a dict. ' + str(wf_request))
        return 1

    post_to_box = 'outbox'

    # get the url of the sender's outbox. The variable is called
    # inbox_url only because that is the first field returned by
    # get_person_box
    # NOTE(review): 76536 looks like a call site identifier - confirm
    origin_domain = from_domain
    (inbox_url, _, _, from_person_id, _, _,
     _, _) = get_person_box(signing_priv_key_pem, origin_domain,
                            base_dir, session, wf_request, person_cache,
                            project_version, http_prefix, from_nickname,
                            from_domain, post_to_box, 76536,
                            system_language)

    if not inbox_url:
        if debug:
            print('DEBUG: unfollow no ' + post_to_box +
                  ' was found for ' + handle)
        return 3

    if not from_person_id:
        if debug:
            print('DEBUG: unfollow no actor was found for ' + handle)
        return 4

    auth_header = create_basic_auth_header(from_nickname, password)

    headers = {
        'host': from_domain,
        'Content-type': 'application/json',
        'Authorization': auth_header
    }
    post_result = \
        post_json(http_prefix, from_domain_full,
                  session, unfollow_json, [], inbox_url, headers, 3, True)
    if not post_result:
        if debug:
            print('DEBUG: POST unfollow failed for c2s to ' + inbox_url)
        return 5

    if debug:
        print('DEBUG: c2s POST unfollow success')

    return unfollow_json


def _authorized_get_via_server(session, nickname: str, password: str,
                               domain: str, port: int, http_prefix: str,
                               url_suffix: str, content_type: str,
                               description: str, debug: bool,
                               project_version: str,
                               signing_priv_key_pem: str):
    """Performs a GET with basic auth on a url beneath the actor of the
    given local account, and returns whatever get_json returned.

    url_suffix is appended to the actor url, content_type is sent as the
    Content-type header and description only appears in debug messages.
    The session is expected to have been checked by the caller.
    Returns 5 if get_json_valid rejects the result.
    """
    domain_full = get_full_domain(domain, port)
    actor = local_actor_url(http_prefix, nickname, domain_full)
    auth_header = create_basic_auth_header(nickname, password)
    headers = {
        'host': domain,
        'Content-type': content_type,
        'Authorization': auth_header
    }
    url = actor + url_suffix
    # NOTE(review): the trailing 10 and True are passed exactly as the
    # public functions always passed them - presumably a timeout and a
    # quiet flag, confirm against session.get_json
    result_json = \
        get_json(signing_priv_key_pem, session, url, headers, {}, debug,
                 project_version, http_prefix, domain, 10, True)
    if not get_json_valid(result_json):
        if debug:
            print('DEBUG: GET ' + description + ' failed for c2s to ' + url)
        return 5

    if debug:
        print('DEBUG: c2s GET ' + description + ' request success')

    return result_json


def get_following_via_server(session, nickname: str, password: str,
                             domain: str, port: int,
                             http_prefix: str, page_number: int, debug: bool,
                             project_version: str,
                             signing_priv_key_pem: str) -> {}:
    """Gets a page from the following collection as json

    Page numbers below 1 are treated as page 1.
    Returns 6 if there is no session and 5 if the GET failed.
    """
    if not session:
        print('WARN: No session for get_following_via_server')
        return 6

    url_suffix = '/following?page=' + str(max(page_number, 1))
    return _authorized_get_via_server(session, nickname, password,
                                      domain, port, http_prefix,
                                      url_suffix, 'application/json',
                                      'following list', debug,
                                      project_version,
                                      signing_priv_key_pem)


def get_followers_via_server(session, nickname: str, password: str,
                             domain: str, port: int,
                             http_prefix: str, page_number: int, debug: bool,
                             project_version: str,
                             signing_priv_key_pem: str) -> {}:
    """Gets a page from the followers collection as json

    Page numbers below 1 are treated as page 1.
    Returns 6 if there is no session and 5 if the GET failed.
    """
    if not session:
        print('WARN: No session for get_followers_via_server')
        return 6

    url_suffix = '/followers?page=' + str(max(page_number, 1))
    return _authorized_get_via_server(session, nickname, password,
                                      domain, port, http_prefix,
                                      url_suffix, 'application/json',
                                      'followers list', debug,
                                      project_version,
                                      signing_priv_key_pem)


def get_follow_requests_via_server(session, nickname: str, password: str,
                                   domain: str, port: int,
                                   http_prefix: str, page_number: int,
                                   debug: bool, project_version: str,
                                   signing_priv_key_pem: str) -> {}:
    """Gets a page from the follow requests collection as json

    Page numbers below 1 are treated as page 1.
    Returns 6 if there is no session and 5 if the GET failed.
    """
    if not session:
        print('WARN: No session for get_follow_requests_via_server')
        return 6

    url_suffix = '/followrequests?page=' + str(max(page_number, 1))
    return _authorized_get_via_server(session, nickname, password,
                                      domain, port, http_prefix,
                                      url_suffix, 'application/json',
                                      'follow requests list', debug,
                                      project_version,
                                      signing_priv_key_pem)


def approve_follow_request_via_server(session, nickname: str, password: str,
                                      domain: str, port: int,
                                      http_prefix: str, approve_handle: str,
                                      debug: bool, project_version: str,
                                      signing_priv_key_pem: str) -> str:
    """Approves a follow request
    This is not exactly via c2s though. It simulates pressing the Approve
    button on the web interface

    approve_handle is appended to the url as text, so it is a string
    (it was previously annotated as an int).
    Returns 6 if there is no session and 5 if the GET failed.
    """
    if not session:
        print('WARN: No session for approve_follow_request_via_server')
        return 6

    url_suffix = '/followapprove=' + approve_handle
    return _authorized_get_via_server(session, nickname, password,
                                      domain, port, http_prefix,
                                      url_suffix,
                                      'text/html; charset=utf-8',
                                      'approve follow request', debug,
                                      project_version,
                                      signing_priv_key_pem)


def deny_follow_request_via_server(session, nickname: str, password: str,
                                   domain: str, port: int,
                                   http_prefix: str, deny_handle: str,
                                   debug: bool, project_version: str,
                                   signing_priv_key_pem: str) -> str:
    """Denies a follow request
    This is not exactly via c2s though. It simulates pressing the Deny
    button on the web interface

    deny_handle is appended to the url as text, so it is a string
    (it was previously annotated as an int).
    Returns 6 if there is no session and 5 if the GET failed.
    """
    if not session:
        print('WARN: No session for deny_follow_request_via_server')
        return 6

    url_suffix = '/followdeny=' + deny_handle
    return _authorized_get_via_server(session, nickname, password,
                                      domain, port, http_prefix,
                                      url_suffix,
                                      'text/html; charset=utf-8',
                                      'deny follow request', debug,
                                      project_version,
                                      signing_priv_key_pem)


def get_followers_of_actor(base_dir: str, actor: str, debug: bool) -> {}:
    """In a shared inbox if we receive a post we know who it's from
    and if it's addressed to followers then we need to get a list of those.
    This returns a dict whose keys are the account directory names
    (nickname@domain) which follow the given actor. Every value is None.
    An empty dict is returned if the actor contains no ':' or if no
    nickname or domain can be obtained from it.
    """
    if debug:
        print('DEBUG: getting followers of ' + actor)
    recipients_dict = {}
    if ':' not in actor:
        return recipients_dict
    nickname = get_nickname_from_actor(actor)
    if not nickname:
        if debug:
            print('DEBUG: no nickname found in ' + actor)
        return recipients_dict
    domain, _ = get_domain_from_actor(actor)
    if not domain:
        if debug:
            print('DEBUG: no domain found in ' + actor)
        return recipients_dict
    actor_handle = nickname + '@' + domain
    if debug:
        print('DEBUG: searching for handle ' + actor_handle)
    # for each of the accounts
    dir_str = data_dir(base_dir)
    for subdir, dirs, _ in os.walk(dir_str):
        for account in dirs:
            # only account directories, excluding the two special ones
            if '@' not in account:
                continue
            if account.startswith(('inbox@', 'Actor@')):
                continue
            following_filename = \
                os.path.join(subdir, account) + '/following.txt'
            if debug:
                print('DEBUG: examining follows of ' + account)
                print(following_filename)
            if not os.path.isfile(following_filename):
                continue
            # does this account follow the given actor?
            if debug:
                print('DEBUG: checking if ' + actor_handle +
                      ' in ' + following_filename)
            if text_in_file(actor_handle, following_filename):
                if debug:
                    print('DEBUG: ' + account +
                          ' follows ' + actor_handle)
                recipients_dict[account] = None
        # only the top level of the data directory is examined
        break
    return recipients_dict


def outbox_undo_follow(base_dir: str, message_json: {}, debug: bool) -> None:
    """When an unfollow request is received by the outbox from c2s
    This removes the followed handle from the following.txt file
    of the relevant account

    Only an Undo whose object is a Follow or a Join, and whose object
    has an actor, is acted upon. Anything else is silently ignored.
    """
    if message_json.get('type') != 'Undo':
        return
    if not has_object_string_type(message_json, debug):
        return
    object_json = message_json['object']
    if object_json['type'] not in ('Follow', 'Join'):
        return
    if not has_object_string_object(message_json, debug):
        return
    if not object_json.get('actor'):
        return
    if debug:
        print('DEBUG: undo follow arrived in outbox')

    # the account which is doing the unfollowing
    actor_url = get_actor_from_post(object_json)
    nickname_follower = get_nickname_from_actor(actor_url)
    if not nickname_follower:
        print('WARN: unable to find nickname in ' + actor_url)
        return
    domain_follower, port_follower = get_domain_from_actor(actor_url)
    if not domain_follower:
        print('WARN: unable to find domain in ' + actor_url)
        return
    domain_follower_full = get_full_domain(domain_follower, port_follower)

    # the account which is being unfollowed
    followed_actor = object_json['object']
    nickname_following = get_nickname_from_actor(followed_actor)
    if not nickname_following:
        print('WARN: unable to find nickname in ' + followed_actor)
        return
    domain_following, port_following = \
        get_domain_from_actor(followed_actor)
    if not domain_following:
        print('WARN: unable to find domain in ' + followed_actor)
        return
    domain_following_full = get_full_domain(domain_following, port_following)

    group_account = has_group_type(base_dir, followed_actor, None)
    unfollowed = \
        unfollow_account(base_dir, nickname_follower, domain_follower_full,
                         nickname_following, domain_following_full,
                         debug, group_account, 'following.txt')
    if not debug:
        return
    if unfollowed:
        print('DEBUG: ' + nickname_follower + ' unfollowed ' +
              nickname_following + '@' + domain_following_full)
    else:
        print('WARN: ' + nickname_follower + ' could not unfollow ' +
              nickname_following + '@' + domain_following_full)


def follower_approval_active(base_dir: str,
                             nickname: str, domain: str) -> bool:
    """Returns true if the given account requires follower approval

    The value is read from the manuallyApprovesFollowers field of the
    account's actor json file. False is returned if the file does not
    exist, could not be loaded or has no such field.
    """
    actor_filename = acct_dir(base_dir, nickname, domain) + '.json'
    if not os.path.isfile(actor_filename):
        return False
    actor_json = load_json(actor_filename)
    if not actor_json:
        return False
    if 'manuallyApprovesFollowers' not in actor_json:
        return False
    return actor_json['manuallyApprovesFollowers']


def remove_follower(base_dir: str, nickname: str, domain: str,
                    remove_nickname: str, remove_domain: str) -> bool:
    """Removes a follower

    The handle remove_nickname@remove_domain is removed from the
    followers.txt file of the account, compared without regard to case.
    Returns True only if the handle was found and the file was
    rewritten. False is returned if the file does not exist, cannot be
    read or written, or does not contain the handle.
    """
    followers_filename = \
        acct_dir(base_dir, nickname, domain) + '/followers.txt'
    if not os.path.isfile(followers_filename):
        return False
    followers_str = ''
    try:
        with open(followers_filename, 'r',
                  encoding='utf-8') as fp_foll:
            followers_str = fp_foll.read()
    except OSError:
        print('EX: remove_follower unable to read followers ' +
              followers_filename)
        return False

    handle = (remove_nickname + '@' + remove_domain).lower()
    remaining_followers = []
    found = False
    for existing_handle in followers_str.split('\n'):
        # splitting newline terminated text leaves an empty last entry.
        # Writing that back would add a blank line on every removal
        if not existing_handle:
            continue
        if existing_handle.lower() == handle:
            found = True
        else:
            remaining_followers.append(existing_handle)
    if not found:
        return False

    new_followers_str = \
        ''.join(entry + '\n' for entry in remaining_followers)
    try:
        with open(followers_filename, 'w+',
                  encoding='utf-8') as fp_foll:
            fp_foll.write(new_followers_str)
    except OSError:
        print('EX: remove_follower unable to write followers ' +
              followers_filename)
        # nothing was removed, so don't report success
        return False
    return True


def pending_followers_timeline_json(actor: str, base_dir: str,
                                    nickname: str, domain: str) -> {}:
    """Returns pending followers collection for an account
    https://codeberg.org/fediverse/fep/src/branch/main/fep/4ccd/fep-4ccd.md

    Each line of the account's followrequests.txt is resolved to a
    stored .follow file within the account's requests directory, and
    the json loaded from that file is appended to orderedItems.
    Entries which can't be resolved or loaded are skipped.
    """
    result_json = {
        "@context": [
            "https://www.w3.org/ns/activitystreams"
        ],
        "id": actor,
        "type": "OrderedCollection",
        "name": nickname + "'s Pending Followers",
        "orderedItems": []
    }

    # the account directory is the same for every request
    account_dir = acct_dir(base_dir, nickname, domain)
    follow_requests_filename = account_dir + '/followrequests.txt'
    if not os.path.isfile(follow_requests_filename):
        return result_json
    try:
        with open(follow_requests_filename, 'r',
                  encoding='utf-8') as fp_req:
            for follower_handle in fp_req:
                # a line read from a file still has its line ending,
                # so test for a blank line after removing it
                follower_handle = remove_eol(follower_handle)
                if not follower_handle:
                    continue
                foll_domain, _ = get_domain_from_actor(follower_handle)
                if not foll_domain:
                    continue
                foll_nickname = get_nickname_from_actor(follower_handle)
                if not foll_nickname:
                    continue
                follow_activity_filename = \
                    account_dir + '/requests/' + \
                    foll_nickname + '@' + foll_domain + '.follow'
                if not os.path.isfile(follow_activity_filename):
                    continue
                follow_json = load_json(follow_activity_filename)
                if not follow_json:
                    continue
                result_json['orderedItems'].append(follow_json)
    except OSError as exc:
        print('EX: unable to read follow requests ' +
              follow_requests_filename + ' ' + str(exc))
    return result_json