mirror of https://gitlab.com/bashrc2/epicyon
Move announce function
parent
388ecf2b2f
commit
ef38b4a33f
78
announce.py
78
announce.py
|
@ -10,9 +10,13 @@ __status__ = "Production"
|
|||
__module_group__ = "ActivityPub"
|
||||
|
||||
import os
|
||||
from pprint import pprint
|
||||
from flags import has_group_type
|
||||
from flags import url_permitted
|
||||
from status import get_status_number
|
||||
from utils import remove_post_from_cache
|
||||
from utils import get_cached_post_filename
|
||||
from utils import load_json
|
||||
from utils import text_in_file
|
||||
from utils import get_user_paths
|
||||
from utils import has_object_string_object
|
||||
|
@ -26,7 +30,6 @@ from utils import get_nickname_from_actor
|
|||
from utils import get_domain_from_actor
|
||||
from utils import locate_post
|
||||
from utils import save_json
|
||||
from utils import undo_announce_collection_entry
|
||||
from utils import update_announce_collection
|
||||
from utils import local_actor_url
|
||||
from utils import replace_users_with_at
|
||||
|
@ -555,3 +558,76 @@ def mark_announce_as_seen(base_dir: str, nickname: str, domain: str,
|
|||
fp_seen.write(announce_id)
|
||||
except OSError:
|
||||
print('EX: mark_announce_as_seen unable to write ' + seen_filename)
|
||||
|
||||
|
||||
def undo_announce_collection_entry(recent_posts_cache: {},
|
||||
base_dir: str, post_filename: str,
|
||||
actor: str, domain: str,
|
||||
debug: bool) -> None:
|
||||
"""Undoes an announce for a particular actor by removing it from
|
||||
the "shares" collection within a post. Note that the "shares"
|
||||
collection has no relation to shared items in shares.py. It's
|
||||
shares of posts, not shares of physical objects.
|
||||
"""
|
||||
post_json_object = load_json(post_filename)
|
||||
if not post_json_object:
|
||||
return
|
||||
# remove any cached version of this announce so that the announce
|
||||
# icon is changed
|
||||
nickname = get_nickname_from_actor(actor)
|
||||
if not nickname:
|
||||
return
|
||||
cached_post_filename = \
|
||||
get_cached_post_filename(base_dir, nickname, domain,
|
||||
post_json_object)
|
||||
if cached_post_filename:
|
||||
if os.path.isfile(cached_post_filename):
|
||||
try:
|
||||
os.remove(cached_post_filename)
|
||||
except OSError:
|
||||
if debug:
|
||||
print('EX: undo_announce_collection_entry ' +
|
||||
'unable to delete cached post ' +
|
||||
str(cached_post_filename))
|
||||
remove_post_from_cache(post_json_object, recent_posts_cache)
|
||||
|
||||
if not post_json_object.get('type'):
|
||||
return
|
||||
if post_json_object['type'] != 'Create':
|
||||
return
|
||||
if not has_object_dict(post_json_object):
|
||||
if debug:
|
||||
pprint(post_json_object)
|
||||
print('DEBUG: post has no object')
|
||||
return
|
||||
if not post_json_object['object'].get('shares'):
|
||||
return
|
||||
if not post_json_object['object']['shares'].get('items'):
|
||||
return
|
||||
total_items = 0
|
||||
if post_json_object['object']['shares'].get('totalItems'):
|
||||
total_items = post_json_object['object']['shares']['totalItems']
|
||||
item_found = False
|
||||
for announce_item in post_json_object['object']['shares']['items']:
|
||||
if not announce_item.get('actor'):
|
||||
continue
|
||||
if announce_item['actor'] != actor:
|
||||
continue
|
||||
if debug:
|
||||
print('DEBUG: Announce was removed for ' + actor)
|
||||
an_it = announce_item
|
||||
post_json_object['object']['shares']['items'].remove(an_it)
|
||||
item_found = True
|
||||
break
|
||||
if not item_found:
|
||||
return
|
||||
if total_items == 1:
|
||||
if debug:
|
||||
print('DEBUG: shares (announcements) ' +
|
||||
'was removed from post')
|
||||
del post_json_object['object']['shares']
|
||||
else:
|
||||
itlen = len(post_json_object['object']['shares']['items'])
|
||||
post_json_object['object']['shares']['totalItems'] = itlen
|
||||
|
||||
save_json(post_json_object, post_filename)
|
||||
|
|
|
@ -10,7 +10,7 @@ __module_group__ = "Timeline"
|
|||
import os
|
||||
from flags import has_group_type
|
||||
from timeFunctions import get_account_timezone
|
||||
from utils import undo_announce_collection_entry
|
||||
from announce import undo_announce_collection_entry
|
||||
from utils import has_object_dict
|
||||
from utils import remove_domain_port
|
||||
from utils import remove_id_ending
|
||||
|
|
73
utils.py
73
utils.py
|
@ -2607,79 +2607,6 @@ def get_file_case_insensitive(path: str) -> str:
|
|||
return None
|
||||
|
||||
|
||||
def undo_announce_collection_entry(recent_posts_cache: {},
|
||||
base_dir: str, post_filename: str,
|
||||
actor: str, domain: str,
|
||||
debug: bool) -> None:
|
||||
"""Undoes an announce for a particular actor by removing it from
|
||||
the "shares" collection within a post. Note that the "shares"
|
||||
collection has no relation to shared items in shares.py. It's
|
||||
shares of posts, not shares of physical objects.
|
||||
"""
|
||||
post_json_object = load_json(post_filename)
|
||||
if not post_json_object:
|
||||
return
|
||||
# remove any cached version of this announce so that the announce
|
||||
# icon is changed
|
||||
nickname = get_nickname_from_actor(actor)
|
||||
if not nickname:
|
||||
return
|
||||
cached_post_filename = \
|
||||
get_cached_post_filename(base_dir, nickname, domain,
|
||||
post_json_object)
|
||||
if cached_post_filename:
|
||||
if os.path.isfile(cached_post_filename):
|
||||
try:
|
||||
os.remove(cached_post_filename)
|
||||
except OSError:
|
||||
if debug:
|
||||
print('EX: undo_announce_collection_entry ' +
|
||||
'unable to delete cached post ' +
|
||||
str(cached_post_filename))
|
||||
remove_post_from_cache(post_json_object, recent_posts_cache)
|
||||
|
||||
if not post_json_object.get('type'):
|
||||
return
|
||||
if post_json_object['type'] != 'Create':
|
||||
return
|
||||
if not has_object_dict(post_json_object):
|
||||
if debug:
|
||||
pprint(post_json_object)
|
||||
print('DEBUG: post has no object')
|
||||
return
|
||||
if not post_json_object['object'].get('shares'):
|
||||
return
|
||||
if not post_json_object['object']['shares'].get('items'):
|
||||
return
|
||||
total_items = 0
|
||||
if post_json_object['object']['shares'].get('totalItems'):
|
||||
total_items = post_json_object['object']['shares']['totalItems']
|
||||
item_found = False
|
||||
for announce_item in post_json_object['object']['shares']['items']:
|
||||
if not announce_item.get('actor'):
|
||||
continue
|
||||
if announce_item['actor'] != actor:
|
||||
continue
|
||||
if debug:
|
||||
print('DEBUG: Announce was removed for ' + actor)
|
||||
an_it = announce_item
|
||||
post_json_object['object']['shares']['items'].remove(an_it)
|
||||
item_found = True
|
||||
break
|
||||
if not item_found:
|
||||
return
|
||||
if total_items == 1:
|
||||
if debug:
|
||||
print('DEBUG: shares (announcements) ' +
|
||||
'was removed from post')
|
||||
del post_json_object['object']['shares']
|
||||
else:
|
||||
itlen = len(post_json_object['object']['shares']['items'])
|
||||
post_json_object['object']['shares']['totalItems'] = itlen
|
||||
|
||||
save_json(post_json_object, post_filename)
|
||||
|
||||
|
||||
def update_announce_collection(recent_posts_cache: {},
|
||||
base_dir: str, post_filename: str,
|
||||
actor: str, nickname: str, domain: str,
|
||||
|
|
Loading…
Reference in New Issue