__filename__ = "like.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.3.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "ActivityPub"

import os
from pprint import pprint
from utils import has_object_string
from utils import has_object_string_object
from utils import has_object_stringType
from utils import remove_domain_port
from utils import has_object_dict
from utils import has_users_path
from utils import get_full_domain
from utils import remove_id_ending
from utils import url_permitted
from utils import get_nickname_from_actor
from utils import get_domain_from_actor
from utils import locate_post
from utils import undo_likes_collection_entry
from utils import has_group_type
from utils import local_actor_url
from utils import load_json
from utils import save_json
from utils import remove_post_from_cache
from utils import get_cached_post_filename
from posts import send_signed_json
from session import post_json
from webfinger import webfinger_handle
from auth import create_basic_auth_header
from posts import get_person_box


def no_of_likes(post_json_object: {}) -> int:
    """Returns the number of likes on a given post

    The likes are read from post_json_object['object'] when that is a
    dict, otherwise from post_json_object itself.
    Returns 0 if there is no 'likes' entry or if it is not a dict.
    Side effect: if the likes collection has no items then its 'items'
    is set to an empty list and its 'totalItems' is set to zero.
    """
    obj = post_json_object
    if has_object_dict(post_json_object):
        obj = post_json_object['object']
    if not obj.get('likes'):
        return 0
    if not isinstance(obj['likes'], dict):
        return 0
    if not obj['likes'].get('items'):
        # normalise an empty or missing items list in place
        obj['likes']['items'] = []
        obj['likes']['totalItems'] = 0
    return len(obj['likes']['items'])


def liked_by_person(post_json_object: {}, nickname: str, domain: str) -> bool:
    """Returns True if the given post is liked by the given person

    A like matches when its actor url ends with domain/users/nickname.
    Like items which are not dicts, or which have no string 'actor',
    are skipped rather than raising an exception.
    """
    if no_of_likes(post_json_object) == 0:
        return False
    actor_match = domain + '/users/' + nickname

    obj = post_json_object
    if has_object_dict(post_json_object):
        obj = post_json_object['object']

    for item in obj['likes']['items']:
        if not isinstance(item, dict):
            continue
        item_actor = item.get('actor')
        if isinstance(item_actor, str) and item_actor.endswith(actor_match):
            return True
    return False


def _create_like(recent_posts_cache: {},
                 session, base_dir: str, federation_list: [],
                 nickname: str, domain: str, port: int,
                 cc_list: [], http_prefix: str,
                 object_url: str, actor_liked: str,
                 client_to_server: bool,
                 send_threads: [], postLog: [],
                 person_cache: {}, cached_webfingers: {},
                 debug: bool, project_version: str,
                 signing_priv_key_pem: str) -> {}:
    """Creates a like
    actor is the person doing the liking
    'to' might be a specific person (actor) whose post was liked
    object is typically the url of the message which was liked

    Returns the Like activity json, or None if object_url is not
    permitted by federation_list or if the liked post could not be
    located. The like is only recorded and sent when a nickname for
    the liked account could be determined.
    """
    if not url_permitted(object_url, federation_list):
        return None

    full_domain = get_full_domain(domain, port)

    new_like_json = {
        "@context": "https://www.w3.org/ns/activitystreams",
        'type': 'Like',
        'actor': local_actor_url(http_prefix, nickname, full_domain),
        'object': object_url
    }
    if cc_list:
        new_like_json['cc'] = cc_list

    # Extract the domain and nickname from a statuses link
    liked_post_nickname = None
    liked_post_domain = None
    liked_post_port = None
    group_account = False
    if actor_liked:
        liked_post_nickname = get_nickname_from_actor(actor_liked)
        liked_post_domain, liked_post_port = get_domain_from_actor(actor_liked)
        group_account = has_group_type(base_dir, actor_liked, person_cache)
    else:
        if has_users_path(object_url):
            liked_post_nickname = get_nickname_from_actor(object_url)
            liked_post_domain, liked_post_port = \
                get_domain_from_actor(object_url)
            # only derive the actor url when a nickname was found,
            # otherwise the string concatenation below would fail
            if liked_post_nickname:
                nick_path = '/' + liked_post_nickname + '/'
                if nick_path in object_url:
                    actor_liked = \
                        object_url.split(nick_path)[0] + \
                        '/' + liked_post_nickname
                    group_account = \
                        has_group_type(base_dir, actor_liked, person_cache)

    if liked_post_nickname:
        post_filename = locate_post(base_dir, nickname, domain, object_url)
        if not post_filename:
            print('DEBUG: like base_dir: ' + base_dir)
            print('DEBUG: like nickname: ' + nickname)
            print('DEBUG: like domain: ' + domain)
            print('DEBUG: like object_url: ' + object_url)
            return None

        # record the like within the locally stored post
        update_likes_collection(recent_posts_cache,
                                base_dir, post_filename, object_url,
                                new_like_json['actor'],
                                nickname, domain, debug, None)

        # NOTE(review): the final integer looks like a call-site
        # identifier - confirm against send_signed_json
        send_signed_json(new_like_json, session, base_dir,
                         nickname, domain, port,
                         liked_post_nickname, liked_post_domain,
                         liked_post_port,
                         'https://www.w3.org/ns/activitystreams#Public',
                         http_prefix, True, client_to_server, federation_list,
                         send_threads, postLog, cached_webfingers,
                         person_cache,
                         debug, project_version, None, group_account,
                         signing_priv_key_pem, 7367374)

    return new_like_json


def like_post(recent_posts_cache: {},
              session, base_dir: str, federation_list: [],
              nickname: str, domain: str, port: int, http_prefix: str,
              like_nickname: str, like_domain: str, like_port: int,
              cc_list: [],
              like_status_number: int, client_to_server: bool,
              send_threads: [], postLog: [],
              person_cache: {}, cached_webfingers: {},
              debug: bool, project_version: str,
              signing_priv_key_pem: str) -> {}:
    """Likes a given status post. This is only used by unit tests

    The liked object url is built as
    <actor of like_nickname>/statuses/<like_status_number>
    and the result of _create_like is returned.
    """
    like_domain = get_full_domain(like_domain, like_port)

    actor_liked = local_actor_url(http_prefix, like_nickname, like_domain)
    object_url = actor_liked + '/statuses/' + str(like_status_number)

    return _create_like(recent_posts_cache,
                        session, base_dir, federation_list,
                        nickname, domain, port,
                        cc_list, http_prefix, object_url, actor_liked,
                        client_to_server,
                        send_threads, postLog, person_cache,
                        cached_webfingers,
                        debug, project_version, signing_priv_key_pem)


def send_like_via_server(base_dir: str, session,
                         from_nickname: str, password: str,
                         from_domain: str, from_port: int,
                         http_prefix: str, like_url: str,
                         cached_webfingers: {}, person_cache: {},
                         debug: bool, project_version: str,
                         signing_priv_key_pem: str) -> {}:
    """Creates a like via c2s

    Returns the Like activity json on success. On failure an integer
    code is returned instead:
    6 = no session, 1 = webfinger failed or was not a dict,
    3 = no outbox url found, 4 = no actor id found, 5 = POST failed
    """
    if not session:
        print('WARN: No session for send_like_via_server')
        return 6

    from_domain_full = get_full_domain(from_domain, from_port)

    actor = local_actor_url(http_prefix, from_nickname, from_domain_full)

    new_like_json = {
        "@context": "https://www.w3.org/ns/activitystreams",
        'type': 'Like',
        'actor': actor,
        'object': like_url
    }

    handle = http_prefix + '://' + from_domain_full + '/@' + from_nickname

    # lookup the sending account via webfinger
    wf_request = webfinger_handle(session, handle, http_prefix,
                                  cached_webfingers,
                                  from_domain, project_version, debug, False,
                                  signing_priv_key_pem)
    if not wf_request:
        if debug:
            print('DEBUG: like webfinger failed for ' + handle)
        return 1
    if not isinstance(wf_request, dict):
        print('WARN: like webfinger for ' + handle +
              ' did not return a dict. ' + str(wf_request))
        return 1

    post_to_box = 'outbox'

    # get the url of the sending account's outbox, which is where
    # the activity gets posted
    origin_domain = from_domain
    (inbox_url, _, _, from_person_id, _, _,
     _, _) = get_person_box(signing_priv_key_pem,
                            origin_domain,
                            base_dir, session, wf_request,
                            person_cache,
                            project_version, http_prefix,
                            from_nickname, from_domain,
                            post_to_box, 72873)

    if not inbox_url:
        if debug:
            print('DEBUG: like no ' + post_to_box +
                  ' was found for ' + handle)
        return 3
    if not from_person_id:
        if debug:
            print('DEBUG: like no actor was found for ' + handle)
        return 4

    auth_header = create_basic_auth_header(from_nickname, password)

    headers = {
        'host': from_domain,
        'Content-type': 'application/json',
        'Authorization': auth_header
    }
    post_result = post_json(http_prefix, from_domain_full,
                            session, new_like_json, [], inbox_url,
                            headers, 3, True)
    if not post_result:
        if debug:
            print('WARN: POST like failed for c2s to ' + inbox_url)
        return 5

    if debug:
        print('DEBUG: c2s POST like success')

    return new_like_json


def send_undo_like_via_server(base_dir: str, session,
                              from_nickname: str, password: str,
                              from_domain: str, from_port: int,
                              http_prefix: str, like_url: str,
                              cached_webfingers: {}, person_cache: {},
                              debug: bool, project_version: str,
                              signing_priv_key_pem: str) -> {}:
    """Undo a like via c2s

    Returns the Undo activity json on success. On failure an integer
    code is returned instead:
    6 = no session, 1 = webfinger failed or was not a dict,
    3 = no outbox url found, 4 = no actor id found, 5 = POST failed
    """
    if not session:
        print('WARN: No session for send_undo_like_via_server')
        return 6

    from_domain_full = get_full_domain(from_domain, from_port)

    actor = local_actor_url(http_prefix, from_nickname, from_domain_full)

    # the Undo wraps the original Like activity
    new_undo_like_json = {
        "@context": "https://www.w3.org/ns/activitystreams",
        'type': 'Undo',
        'actor': actor,
        'object': {
            'type': 'Like',
            'actor': actor,
            'object': like_url
        }
    }

    handle = http_prefix + '://' + from_domain_full + '/@' + from_nickname

    # lookup the sending account via webfinger
    wf_request = webfinger_handle(session, handle, http_prefix,
                                  cached_webfingers,
                                  from_domain, project_version, debug, False,
                                  signing_priv_key_pem)
    if not wf_request:
        if debug:
            print('DEBUG: unlike webfinger failed for ' + handle)
        return 1
    if not isinstance(wf_request, dict):
        if debug:
            print('WARN: unlike webfinger for ' + handle +
                  ' did not return a dict. ' + str(wf_request))
        return 1

    post_to_box = 'outbox'

    # get the url of the sending account's outbox, which is where
    # the activity gets posted
    origin_domain = from_domain
    (inbox_url, _, _, from_person_id, _, _,
     _, _) = get_person_box(signing_priv_key_pem,
                            origin_domain,
                            base_dir, session, wf_request,
                            person_cache, project_version,
                            http_prefix, from_nickname,
                            from_domain, post_to_box,
                            72625)

    if not inbox_url:
        if debug:
            print('DEBUG: unlike no ' + post_to_box +
                  ' was found for ' + handle)
        return 3
    if not from_person_id:
        if debug:
            print('DEBUG: unlike no actor was found for ' + handle)
        return 4

    auth_header = create_basic_auth_header(from_nickname, password)

    headers = {
        'host': from_domain,
        'Content-type': 'application/json',
        'Authorization': auth_header
    }
    post_result = post_json(http_prefix, from_domain_full,
                            session, new_undo_like_json, [], inbox_url,
                            headers, 3, True)
    if not post_result:
        if debug:
            print('WARN: POST unlike failed for c2s to ' + inbox_url)
        return 5

    if debug:
        print('DEBUG: c2s POST unlike success')

    return new_undo_like_json


def outbox_like(recent_posts_cache: {},
                base_dir: str, http_prefix: str,
                nickname: str, domain: str, port: int,
                message_json: {}, debug: bool) -> None:
    """ When a like request is received by the outbox from c2s

    Activities which are not a Like with a string object are ignored.
    If the liked post is found locally then the actor of the activity
    is added to its likes collection.
    """
    if not message_json.get('type'):
        if debug:
            print('DEBUG: like - no type')
        return
    if not message_json['type'] == 'Like':
        if debug:
            print('DEBUG: not a like')
        return
    if not has_object_string(message_json, debug):
        return
    if debug:
        print('DEBUG: c2s like request arrived in outbox')

    message_id = remove_id_ending(message_json['object'])
    domain = remove_domain_port(domain)
    post_filename = locate_post(base_dir, nickname, domain, message_id)
    if not post_filename:
        if debug:
            print('DEBUG: c2s like post not found in inbox or outbox')
            print(message_id)
        # NOTE(review): returns True here although the function is
        # annotated as returning None; kept so that any caller which
        # inspects the result is unaffected
        return True
    update_likes_collection(recent_posts_cache,
                            base_dir, post_filename, message_id,
                            message_json['actor'],
                            nickname, domain, debug, None)
    if debug:
        print('DEBUG: post liked via c2s - ' + post_filename)


def outbox_undo_like(recent_posts_cache: {},
                     base_dir: str, http_prefix: str,
                     nickname: str, domain: str, port: int,
                     message_json: {}, debug: bool) -> None:
    """ When an undo like request is received by the outbox from c2s

    Activities which are not an Undo wrapping a Like with a string
    object are ignored. If the liked post is found locally then the
    actor of the activity is removed from its likes collection.
    """
    if not message_json.get('type'):
        return
    if not message_json['type'] == 'Undo':
        return
    if not has_object_stringType(message_json, debug):
        return
    if not message_json['object']['type'] == 'Like':
        if debug:
            print('DEBUG: not a undo like')
        return
    if not has_object_string_object(message_json, debug):
        return
    if debug:
        print('DEBUG: c2s undo like request arrived in outbox')

    message_id = remove_id_ending(message_json['object']['object'])
    domain = remove_domain_port(domain)
    post_filename = locate_post(base_dir, nickname, domain, message_id)
    if not post_filename:
        if debug:
            print('DEBUG: c2s undo like post not found in inbox or outbox')
            print(message_id)
        # NOTE(review): returns True here although the function is
        # annotated as returning None; kept so that any caller which
        # inspects the result is unaffected
        return True
    undo_likes_collection_entry(recent_posts_cache, base_dir, post_filename,
                                message_id, message_json['actor'],
                                domain, debug, None)
    if debug:
        print('DEBUG: post undo liked via c2s - ' + post_filename)


def update_likes_collection(recent_posts_cache: {},
                            base_dir: str, post_filename: str,
                            object_url: str, actor: str,
                            nickname: str, domain: str, debug: bool,
                            post_json_object: {}) -> None:
    """Updates the likes collection within a post

    If post_json_object is not supplied then it is loaded from
    post_filename. Any cached version of the post is removed, the
    actor is added to the likes collection unless already present,
    and the post is saved back to post_filename.
    A 'likes' value which is missing, empty or not a dict is replaced
    by a new collection, which matches no_of_likes treating a
    non-dict value as zero likes.
    """
    if not post_json_object:
        post_json_object = load_json(post_filename)
    if not post_json_object:
        return

    # remove any cached version of this post so that the
    # like icon is changed
    remove_post_from_cache(post_json_object, recent_posts_cache)
    cached_post_filename = \
        get_cached_post_filename(base_dir, nickname, domain, post_json_object)
    if cached_post_filename:
        if os.path.isfile(cached_post_filename):
            try:
                os.remove(cached_post_filename)
            except OSError:
                print('EX: update_likes_collection unable to delete ' +
                      cached_post_filename)

    obj = post_json_object
    if has_object_dict(post_json_object):
        obj = post_json_object['object']

    if not object_url.endswith('/likes'):
        object_url = object_url + '/likes'

    existing_likes = obj.get('likes')
    if not existing_likes or not isinstance(existing_likes, dict):
        if debug:
            print('DEBUG: Adding initial like to ' + object_url)
        likes_json = {
            "@context": "https://www.w3.org/ns/activitystreams",
            'id': object_url,
            'type': 'Collection',
            "totalItems": 1,
            'items': [{
                'type': 'Like',
                'actor': actor
            }]
        }
        obj['likes'] = likes_json
    else:
        if not existing_likes.get('items'):
            existing_likes['items'] = []
        for like_item in existing_likes['items']:
            # ignore malformed entries rather than failing on them
            if not isinstance(like_item, dict):
                continue
            if like_item.get('actor'):
                if like_item['actor'] == actor:
                    # already liked
                    return
        new_like = {
            'type': 'Like',
            'actor': actor
        }
        existing_likes['items'].append(new_like)
        existing_likes['totalItems'] = len(existing_likes['items'])

    if debug:
        print('DEBUG: saving post with likes added')
        pprint(post_json_object)
    save_json(post_json_object, post_filename)