2020-04-03 10:11:54 +00:00
|
|
|
__filename__ = "filters.py"
|
|
|
|
__author__ = "Bob Mottram"
|
|
|
|
__license__ = "AGPL3+"
|
2021-01-26 10:07:42 +00:00
|
|
|
__version__ = "1.2.0"
|
2020-04-03 10:11:54 +00:00
|
|
|
__maintainer__ = "Bob Mottram"
|
2021-09-10 16:14:50 +00:00
|
|
|
__email__ = "bob@libreserver.org"
|
2020-04-03 10:11:54 +00:00
|
|
|
__status__ = "Production"
|
2021-06-26 11:16:41 +00:00
|
|
|
__module_group__ = "Moderation"
|
2019-07-14 20:50:27 +00:00
|
|
|
|
|
|
|
import os
|
2021-07-13 21:59:53 +00:00
|
|
|
from utils import acctDir
|
2019-07-14 20:50:27 +00:00
|
|
|
|
2020-04-03 10:11:54 +00:00
|
|
|
|
|
|
|
def addFilter(baseDir: str, nickname: str, domain: str, words: str) -> bool:
|
2019-07-14 20:50:27 +00:00
|
|
|
"""Adds a filter for particular words within the content of a incoming posts
|
|
|
|
"""
|
2021-07-13 21:59:53 +00:00
|
|
|
filtersFilename = acctDir(baseDir, nickname, domain) + '/filters.txt'
|
2019-07-14 20:50:27 +00:00
|
|
|
if os.path.isfile(filtersFilename):
|
|
|
|
if words in open(filtersFilename).read():
|
|
|
|
return False
|
2021-11-26 14:35:26 +00:00
|
|
|
try:
|
|
|
|
with open(filtersFilename, 'a+') as filtersFile:
|
|
|
|
filtersFile.write(words + '\n')
|
|
|
|
except OSError:
|
|
|
|
print('EX: unable to append filters ' + filtersFilename)
|
2019-07-14 20:50:27 +00:00
|
|
|
return True
|
|
|
|
|
2020-04-03 10:11:54 +00:00
|
|
|
|
2020-12-19 11:29:55 +00:00
|
|
|
def addGlobalFilter(baseDir: str, words: str) -> bool:
|
|
|
|
"""Adds a global filter for particular words within
|
|
|
|
the content of a incoming posts
|
|
|
|
"""
|
2020-12-19 13:10:32 +00:00
|
|
|
if not words:
|
|
|
|
return False
|
|
|
|
if len(words) < 2:
|
|
|
|
return False
|
2020-12-19 11:29:55 +00:00
|
|
|
filtersFilename = baseDir + '/accounts/filters.txt'
|
|
|
|
if os.path.isfile(filtersFilename):
|
|
|
|
if words in open(filtersFilename).read():
|
|
|
|
return False
|
2021-11-26 14:35:26 +00:00
|
|
|
try:
|
|
|
|
with open(filtersFilename, 'a+') as filtersFile:
|
|
|
|
filtersFile.write(words + '\n')
|
|
|
|
except OSError:
|
|
|
|
print('EX: unable to append filters ' + filtersFilename)
|
2020-12-19 11:29:55 +00:00
|
|
|
return True
|
|
|
|
|
|
|
|
|
2020-04-03 10:11:54 +00:00
|
|
|
def removeFilter(baseDir: str, nickname: str, domain: str,
|
2019-07-14 20:50:27 +00:00
|
|
|
words: str) -> bool:
|
|
|
|
"""Removes a word filter
|
|
|
|
"""
|
2021-07-13 21:59:53 +00:00
|
|
|
filtersFilename = acctDir(baseDir, nickname, domain) + '/filters.txt'
|
2021-06-07 08:56:08 +00:00
|
|
|
if not os.path.isfile(filtersFilename):
|
|
|
|
return False
|
|
|
|
if words not in open(filtersFilename).read():
|
|
|
|
return False
|
|
|
|
newFiltersFilename = filtersFilename + '.new'
|
2021-11-26 14:35:26 +00:00
|
|
|
try:
|
|
|
|
with open(filtersFilename, 'r') as fp:
|
|
|
|
with open(newFiltersFilename, 'w+') as fpnew:
|
|
|
|
for line in fp:
|
|
|
|
line = line.replace('\n', '')
|
|
|
|
if line != words:
|
|
|
|
fpnew.write(line + '\n')
|
|
|
|
except OSError as e:
|
|
|
|
print('EX: unable to remove filter ' + filtersFilename + ' ' + str(e))
|
2021-06-07 08:56:08 +00:00
|
|
|
if os.path.isfile(newFiltersFilename):
|
|
|
|
os.rename(newFiltersFilename, filtersFilename)
|
|
|
|
return True
|
2019-07-14 20:50:27 +00:00
|
|
|
return False
|
2020-02-05 14:57:10 +00:00
|
|
|
|
2020-04-03 10:11:54 +00:00
|
|
|
|
2020-12-19 11:29:55 +00:00
|
|
|
def removeGlobalFilter(baseDir: str, words: str) -> bool:
|
|
|
|
"""Removes a global word filter
|
|
|
|
"""
|
|
|
|
filtersFilename = baseDir + '/accounts/filters.txt'
|
2021-06-07 08:56:08 +00:00
|
|
|
if not os.path.isfile(filtersFilename):
|
|
|
|
return False
|
|
|
|
if words not in open(filtersFilename).read():
|
|
|
|
return False
|
|
|
|
newFiltersFilename = filtersFilename + '.new'
|
2021-11-26 14:35:26 +00:00
|
|
|
try:
|
|
|
|
with open(filtersFilename, 'r') as fp:
|
|
|
|
with open(newFiltersFilename, 'w+') as fpnew:
|
|
|
|
for line in fp:
|
|
|
|
line = line.replace('\n', '')
|
|
|
|
if line != words:
|
|
|
|
fpnew.write(line + '\n')
|
|
|
|
except OSError as e:
|
|
|
|
print('EX: unable to remove global filter ' +
|
|
|
|
filtersFilename + ' ' + str(e))
|
2021-06-07 08:56:08 +00:00
|
|
|
if os.path.isfile(newFiltersFilename):
|
|
|
|
os.rename(newFiltersFilename, filtersFilename)
|
|
|
|
return True
|
2020-12-19 11:29:55 +00:00
|
|
|
return False
|
|
|
|
|
|
|
|
|
2020-12-22 18:06:23 +00:00
|
|
|
def _isTwitterPost(content: str) -> bool:
|
2020-02-05 14:57:10 +00:00
|
|
|
"""Returns true if the given post content is a retweet or twitter crosspost
|
|
|
|
"""
|
2020-02-05 16:56:45 +00:00
|
|
|
if '/twitter.' in content or '@twitter.' in content:
|
2020-02-05 14:57:10 +00:00
|
|
|
return True
|
2020-02-05 16:56:45 +00:00
|
|
|
elif '>RT <' in content:
|
2020-02-05 14:57:10 +00:00
|
|
|
return True
|
|
|
|
return False
|
|
|
|
|
2020-04-03 10:11:54 +00:00
|
|
|
|
2020-12-22 18:06:23 +00:00
|
|
|
def _isFilteredBase(filename: str, content: str) -> bool:
|
2020-12-19 13:21:06 +00:00
|
|
|
"""Uses the given file containing filtered words to check
|
|
|
|
the given content
|
|
|
|
"""
|
2020-12-19 13:23:30 +00:00
|
|
|
if not os.path.isfile(filename):
|
|
|
|
return False
|
|
|
|
|
2021-11-26 14:35:26 +00:00
|
|
|
try:
|
|
|
|
with open(filename, 'r') as fp:
|
|
|
|
for line in fp:
|
|
|
|
filterStr = line.replace('\n', '').replace('\r', '')
|
|
|
|
if not filterStr:
|
|
|
|
continue
|
|
|
|
if len(filterStr) < 2:
|
|
|
|
continue
|
|
|
|
if '+' not in filterStr:
|
|
|
|
if filterStr in content:
|
|
|
|
return True
|
|
|
|
else:
|
|
|
|
filterWords = filterStr.replace('"', '').split('+')
|
|
|
|
for word in filterWords:
|
|
|
|
if word not in content:
|
|
|
|
return False
|
2020-12-19 13:21:06 +00:00
|
|
|
return True
|
2021-11-26 14:35:26 +00:00
|
|
|
except OSError as e:
|
|
|
|
print('EX: _isFilteredBase ' + filename + ' ' + str(e))
|
2020-12-19 13:21:06 +00:00
|
|
|
return False
|
|
|
|
|
|
|
|
|
2021-07-28 21:28:41 +00:00
|
|
|
def isFilteredGlobally(baseDir: str, content: str) -> bool:
|
|
|
|
"""Is the given content globally filtered?
|
|
|
|
"""
|
|
|
|
globalFiltersFilename = baseDir + '/accounts/filters.txt'
|
|
|
|
if _isFilteredBase(globalFiltersFilename, content):
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
2020-04-03 10:11:54 +00:00
|
|
|
def isFiltered(baseDir: str, nickname: str, domain: str, content: str) -> bool:
|
2019-07-14 20:50:27 +00:00
|
|
|
"""Should the given content be filtered out?
|
|
|
|
This is a simple type of filter which just matches words, not a regex
|
|
|
|
You can add individual words or use word1+word2 to indicate that two
|
|
|
|
words must be present although not necessarily adjacent
|
|
|
|
"""
|
2021-07-28 21:28:41 +00:00
|
|
|
if isFilteredGlobally(baseDir, content):
|
2020-12-19 13:21:06 +00:00
|
|
|
return True
|
2020-12-19 11:41:40 +00:00
|
|
|
|
|
|
|
if not nickname or not domain:
|
|
|
|
return False
|
|
|
|
|
2020-02-05 14:57:10 +00:00
|
|
|
# optionally remove retweets
|
2021-07-13 21:59:53 +00:00
|
|
|
removeTwitter = acctDir(baseDir, nickname, domain) + '/.removeTwitter'
|
2020-02-05 14:57:10 +00:00
|
|
|
if os.path.isfile(removeTwitter):
|
2020-12-22 18:06:23 +00:00
|
|
|
if _isTwitterPost(content):
|
2020-02-05 14:57:10 +00:00
|
|
|
return True
|
|
|
|
|
2021-07-13 21:59:53 +00:00
|
|
|
accountFiltersFilename = \
|
|
|
|
acctDir(baseDir, nickname, domain) + '/filters.txt'
|
2020-12-22 18:06:23 +00:00
|
|
|
return _isFilteredBase(accountFiltersFilename, content)
|