Function for checking text in a file

merge-requests/30/head
Bob Mottram 2022-06-10 10:24:11 +01:00
parent 3e96093c12
commit 9bab4c1684
3 changed files with 42 additions and 47 deletions

View File

@ -15,6 +15,7 @@ import secrets
import datetime
from utils import is_system_account
from utils import has_users_path
from utils import text_in_file
def _hash_password(password: str) -> str:
@ -177,7 +178,7 @@ def store_basic_credentials(base_dir: str,
password_file = base_dir + '/accounts/passwords'
store_str = nickname + ':' + _hash_password(password)
if os.path.isfile(password_file):
if nickname + ':' in open(password_file, encoding='utf-8').read():
if text_in_file(nickname + ':', password_file):
try:
with open(password_file, 'r', encoding='utf-8') as fin:
with open(password_file + '.new', 'w+',

View File

@ -33,6 +33,7 @@ from utils import get_nickname_from_actor
from utils import acct_dir
from utils import local_actor_url
from utils import has_actor
from utils import text_in_file
from conversation import mute_conversation
from conversation import unmute_conversation
@ -46,8 +47,7 @@ def add_global_block(base_dir: str,
# is the handle already blocked?
block_handle = block_nickname + '@' + block_domain
if os.path.isfile(blocking_filename):
if block_handle in open(blocking_filename,
encoding='utf-8').read():
if text_in_file(block_handle, blocking_filename):
return False
# block an account handle or domain
try:
@ -85,16 +85,14 @@ def add_block(base_dir: str, nickname: str, domain: str,
blocking_filename = acct_dir(base_dir, nickname, domain) + '/blocking.txt'
block_handle = block_nickname + '@' + block_domain
if os.path.isfile(blocking_filename):
if block_handle + '\n' in open(blocking_filename,
encoding='utf-8').read():
if text_in_file(block_handle + '\n', blocking_filename):
return False
# if we are following then unfollow
following_filename = \
acct_dir(base_dir, nickname, domain) + '/following.txt'
if os.path.isfile(following_filename):
if block_handle + '\n' in open(following_filename,
encoding='utf-8').read():
if text_in_file(block_handle + '\n', following_filename):
following_str = ''
try:
with open(following_filename, 'r',
@ -119,8 +117,7 @@ def add_block(base_dir: str, nickname: str, domain: str,
followers_filename = \
acct_dir(base_dir, nickname, domain) + '/followers.txt'
if os.path.isfile(followers_filename):
if block_handle + '\n' in open(followers_filename,
encoding='utf-8').read():
if text_in_file(block_handle + '\n', followers_filename):
followers_str = ''
try:
with open(followers_filename, 'r',
@ -159,8 +156,7 @@ def remove_global_block(base_dir: str,
if not unblock_nickname.startswith('#'):
unblock_handle = unblock_nickname + '@' + unblock_domain
if os.path.isfile(unblocking_filename):
if unblock_handle in open(unblocking_filename,
encoding='utf-8').read():
if text_in_file(unblock_handle, unblocking_filename):
try:
with open(unblocking_filename, 'r',
encoding='utf-8') as fp_unblock:
@ -187,8 +183,7 @@ def remove_global_block(base_dir: str,
else:
unblock_hashtag = unblock_nickname
if os.path.isfile(unblocking_filename):
if unblock_hashtag + '\n' in open(unblocking_filename,
encoding='utf-8').read():
if text_in_file(unblock_hashtag + '\n', unblocking_filename):
try:
with open(unblocking_filename, 'r',
encoding='utf-8') as fp_unblock:
@ -224,8 +219,7 @@ def remove_block(base_dir: str, nickname: str, domain: str,
acct_dir(base_dir, nickname, domain) + '/blocking.txt'
unblock_handle = unblock_nickname + '@' + unblock_domain
if os.path.isfile(unblocking_filename):
if unblock_handle in open(unblocking_filename,
encoding='utf-8').read():
if text_in_file(unblock_handle, unblocking_filename):
try:
with open(unblocking_filename, 'r',
encoding='utf-8') as fp_unblock:
@ -262,8 +256,7 @@ def is_blocked_hashtag(base_dir: str, hashtag: str) -> bool:
hashtag = hashtag.strip('\n').strip('\r')
if not hashtag.startswith('#'):
hashtag = '#' + hashtag
if hashtag + '\n' in open(global_blocking_filename,
encoding='utf-8').read():
if text_in_file(hashtag + '\n', global_blocking_filename):
return True
return False
@ -373,11 +366,10 @@ def is_blocked_domain(base_dir: str, domain: str,
allow_filename = base_dir + '/accounts/allowedinstances.txt'
# instance allow list
if not short_domain:
if domain not in open(allow_filename, encoding='utf-8').read():
if not text_in_file(domain, allow_filename):
return True
else:
if short_domain not in open(allow_filename,
encoding='utf-8').read():
if not text_in_file(short_domain, allow_filename):
return True
return False
@ -407,44 +399,37 @@ def is_blocked(base_dir: str, nickname: str, domain: str,
else:
global_blocks_filename = base_dir + '/accounts/blocking.txt'
if os.path.isfile(global_blocks_filename):
if '*@' + block_domain in open(global_blocks_filename,
encoding='utf-8').read():
if text_in_file('*@' + block_domain, global_blocks_filename):
return True
if block_handle:
block_str = block_handle + '\n'
if block_str in open(global_blocks_filename,
encoding='utf-8').read():
if text_in_file(block_str, global_blocks_filename):
return True
else:
# instance allow list
allow_filename = base_dir + '/accounts/allowedinstances.txt'
short_domain = _get_short_domain(block_domain)
if not short_domain:
if block_domain + '\n' not in open(allow_filename,
encoding='utf-8').read():
if not text_in_file(block_domain + '\n', allow_filename):
return True
else:
if short_domain + '\n' not in open(allow_filename,
encoding='utf-8').read():
if not text_in_file(short_domain + '\n', allow_filename):
return True
# account level allow list
account_dir = acct_dir(base_dir, nickname, domain)
allow_filename = account_dir + '/allowedinstances.txt'
if os.path.isfile(allow_filename):
if block_domain + '\n' not in open(allow_filename,
encoding='utf-8').read():
if not text_in_file(block_domain + '\n', allow_filename):
return True
# account level block list
blocking_filename = account_dir + '/blocking.txt'
if os.path.isfile(blocking_filename):
if '*@' + block_domain + '\n' in open(blocking_filename,
encoding='utf-8').read():
if text_in_file('*@' + block_domain + '\n', blocking_filename):
return True
if block_handle:
if block_handle + '\n' in open(blocking_filename,
encoding='utf-8').read():
if text_in_file(block_handle + '\n', blocking_filename):
return True
return False

View File

@ -40,6 +40,20 @@ INVALID_CHARACTERS = (
)
def text_in_file(text: str, filename: str) -> bool:
"""is the given text in the given file?
"""
try:
with open(filename, 'r', encoding='utf-8') as file:
content = file.read()
if content:
if text in content:
return True
except OSError:
pass
return False
def local_actor_url(http_prefix: str, nickname: str, domain_full: str) -> str:
"""Returns the url for an actor on this instance
"""
@ -1320,8 +1334,7 @@ def follow_person(base_dir: str, nickname: str, domain: str,
# was this person previously unfollowed?
unfollowed_filename = base_dir + '/accounts/' + handle + '/unfollowed.txt'
if os.path.isfile(unfollowed_filename):
if handle_to_follow in open(unfollowed_filename,
encoding='utf-8').read():
if text_in_file(handle_to_follow, unfollowed_filename):
# remove them from the unfollowed file
new_lines = ''
with open(unfollowed_filename, 'r',
@ -1341,7 +1354,7 @@ def follow_person(base_dir: str, nickname: str, domain: str,
handle_to_follow = '!' + handle_to_follow
filename = base_dir + '/accounts/' + handle + '/' + follow_file
if os.path.isfile(filename):
if handle_to_follow in open(filename, encoding='utf-8').read():
if text_in_file(handle_to_follow, filename):
if debug:
print('DEBUG: follow already exists')
return True
@ -1648,7 +1661,7 @@ def remove_moderation_post_from_index(base_dir: str, post_url: str,
if not os.path.isfile(moderation_index_file):
return
post_id = remove_id_ending(post_url)
if post_id in open(moderation_index_file, encoding='utf-8').read():
if text_in_file(post_id, moderation_index_file):
with open(moderation_index_file, 'r',
encoding='utf-8') as file1:
lines = file1.readlines()
@ -1679,7 +1692,7 @@ def _is_reply_to_blog_post(base_dir: str, nickname: str, domain: str,
return False
post_id = remove_id_ending(post_json_object['object']['inReplyTo'])
post_id = post_id.replace('/', '#')
if post_id in open(blogs_index_filename, encoding='utf-8').read():
if text_in_file(post_id, blogs_index_filename):
return True
return False
@ -1720,8 +1733,7 @@ def _is_bookmarked(base_dir: str, nickname: str, domain: str,
acct_dir(base_dir, nickname, domain) + '/bookmarks.index'
if os.path.isfile(bookmarks_index_filename):
bookmark_index = post_filename.split('/')[-1] + '\n'
if bookmark_index in open(bookmarks_index_filename,
encoding='utf-8').read():
if text_in_file(bookmark_index, bookmarks_index_filename):
return True
return False
@ -3024,8 +3036,7 @@ def dm_allowed_from_domain(base_dir: str,
acct_dir(base_dir, nickname, domain) + '/dmAllowedInstances.txt'
if not os.path.isfile(dm_allowed_instances_file):
return False
if sending_actor_domain + '\n' in open(dm_allowed_instances_file,
encoding='utf-8').read():
if text_in_file(sending_actor_domain + '\n', dm_allowed_instances_file):
return True
return False
@ -3339,8 +3350,7 @@ def is_group_actor(base_dir: str, actor: str, person_cache: {},
if debug:
print('Cached actor file not found ' + cached_actor_filename)
return False
if '"type": "Group"' in open(cached_actor_filename,
encoding='utf-8').read():
if text_in_file('"type": "Group"', cached_actor_filename):
if debug:
print('Group type found in ' + cached_actor_filename)
return True
@ -3353,8 +3363,7 @@ def is_group_account(base_dir: str, nickname: str, domain: str) -> bool:
account_filename = acct_dir(base_dir, nickname, domain) + '.json'
if not os.path.isfile(account_filename):
return False
if '"type": "Group"' in open(account_filename,
encoding='utf-8').read():
if text_in_file('"type": "Group"', account_filename):
return True
return False