epicyon/categories.py

316 lines
11 KiB
Python

__filename__ = "categories.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.5.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "RSS Feeds"
import os
import datetime
from utils import data_dir
from utils import date_utcnow
from utils import date_epoch
MAX_TAG_LENGTH = 42
INVALID_HASHTAG_CHARS = (',', ' ', '<', ';', '\\', '"', '&', '#')
def get_hashtag_category(base_dir: str, hashtag: str) -> str:
"""Returns the category for the hashtag
"""
category_filename = base_dir + '/tags/' + hashtag + '.category'
if not os.path.isfile(category_filename):
category_filename = base_dir + '/tags/' + hashtag.title() + '.category'
if not os.path.isfile(category_filename):
category_filename = \
base_dir + '/tags/' + hashtag.upper() + '.category'
if not os.path.isfile(category_filename):
return ''
category_str = None
try:
with open(category_filename, 'r', encoding='utf-8') as category_file:
category_str = category_file.read()
except OSError:
print('EX: unable to read category ' + category_filename)
except UnicodeEncodeError as ex:
print('EX: unable to read category unicode ' + category_filename +
' ' + str(ex))
if category_str:
return category_str
return ''
def load_city_hashtags(base_dir: str, translate: {}) -> None:
"""create hashtag categories for cities
"""
category_str = 'places'
if translate.get(category_str):
category_str = translate[category_str]
for _, _, files in os.walk(base_dir + '/data/cities'):
for cities_file in files:
if not cities_file.endswith('.txt'):
continue
cities_filename = base_dir + '/data/cities/' + cities_file
if not os.path.isfile(cities_filename):
continue
cities = []
try:
with open(cities_filename, 'r', encoding='utf-8') as fp_cities:
cities = fp_cities.read().split('\n')
except OSError:
print('EX: unable to load cities file ' + cities_filename)
if not cities:
continue
for hashtag in cities:
hashtag = hashtag.lower().strip()
hashtag = hashtag.replace(' & ', ' and ')
hashtag = hashtag.replace('/', '')
hashtag2 = hashtag.replace('-', '').replace(' ', '')
city_filename = base_dir + '/tags/' + hashtag2 + '.category'
if not os.path.isfile(city_filename):
try:
with open(city_filename, 'w+',
encoding='utf-8') as fp_city:
fp_city.write(category_str)
except OSError:
print('EX: unable to write city category ' +
city_filename)
if '-' in hashtag:
section = hashtag.split('-')
new_hashtag = ''
for text in section:
new_hashtag += text.lower().title()
hashtag2 = new_hashtag
city_filename = \
base_dir + '/tags/' + hashtag2 + '.category'
if not os.path.isfile(city_filename):
try:
with open(city_filename, 'w+',
encoding='utf-8') as fp_city:
fp_city.write(category_str)
except OSError:
print('EX: unable to write city category2 ' +
city_filename)
if ' ' in hashtag:
section = hashtag.split(' ')
new_hashtag = ''
for text in section:
new_hashtag += text.lower().title()
hashtag2 = new_hashtag
city_filename = \
base_dir + '/tags/' + hashtag2 + '.category'
if not os.path.isfile(city_filename):
try:
with open(city_filename, 'w+',
encoding='utf-8') as fp_city:
fp_city.write(category_str)
except OSError:
print('EX: unable to write city category3 ' +
city_filename)
def get_hashtag_categories(base_dir: str,
recent: bool, category: str) -> None:
"""Returns a dictionary containing hashtag categories
"""
hashtag_categories = {}
if recent:
curr_time = date_utcnow()
days_since_epoch = (curr_time - date_epoch()).days
recently = days_since_epoch - 1
for _, _, files in os.walk(base_dir + '/tags'):
for catfile in files:
if not catfile.endswith('.category'):
continue
category_filename = os.path.join(base_dir + '/tags', catfile)
if not os.path.isfile(category_filename):
continue
hashtag = catfile.split('.')[0]
if len(hashtag) > MAX_TAG_LENGTH:
continue
category_str = None
try:
with open(category_filename, 'r',
encoding='utf-8') as fp_category:
category_str = fp_category.read()
except OSError:
print('EX: get_hashtag_categories ' + category_filename)
except UnicodeEncodeError as ex:
print('EX: get_hashtag_categories unicode ' +
category_filename + ' ' + str(ex))
if not category_str:
continue
if category:
# only return a dictionary for a specific category
if category_str != category:
continue
if recent:
tags_filename = base_dir + '/tags/' + hashtag + '.txt'
if not os.path.isfile(tags_filename):
continue
mod_time_since_epoc = \
os.path.getmtime(tags_filename)
last_modified_date = \
datetime.datetime.fromtimestamp(mod_time_since_epoc,
datetime.timezone.utc)
file_days_since_epoch = \
(last_modified_date - date_epoch()).days
if file_days_since_epoch < recently:
continue
if not hashtag_categories.get(category_str):
hashtag_categories[category_str] = [hashtag]
else:
if hashtag not in hashtag_categories[category_str]:
hashtag_categories[category_str].append(hashtag)
break
return hashtag_categories
def update_hashtag_categories(base_dir: str) -> None:
"""Regenerates the list of hashtag categories
"""
category_list_filename = data_dir(base_dir) + '/categoryList.txt'
hashtag_categories = get_hashtag_categories(base_dir, False, None)
if not hashtag_categories:
if os.path.isfile(category_list_filename):
try:
os.remove(category_list_filename)
except OSError:
print('EX: update_hashtag_categories ' +
'unable to delete cached category list ' +
category_list_filename)
return
category_list = []
for category_str, _ in hashtag_categories.items():
category_list.append(category_str)
category_list.sort()
category_list_str = ''
for category_str in category_list:
category_list_str += category_str + '\n'
# save a list of available categories for quick lookup
try:
with open(category_list_filename, 'w+',
encoding='utf-8') as fp_category:
fp_category.write(category_list_str)
except OSError:
print('EX: unable to write category ' + category_list_filename)
def _valid_hashtag_category(category: str) -> bool:
"""Returns true if the category name is valid
"""
if not category:
return False
for char in INVALID_HASHTAG_CHARS:
if char in category:
return False
# too long
if len(category) > 40:
return False
return True
def set_hashtag_category(base_dir: str, hashtag: str, category: str,
update: bool, force: bool) -> bool:
"""Sets the category for the hashtag
"""
if not _valid_hashtag_category(category):
return False
if not force:
hashtag_filename = base_dir + '/tags/' + hashtag + '.txt'
if not os.path.isfile(hashtag_filename):
hashtag = hashtag.title()
hashtag_filename = base_dir + '/tags/' + hashtag + '.txt'
if not os.path.isfile(hashtag_filename):
hashtag = hashtag.upper()
hashtag_filename = base_dir + '/tags/' + hashtag + '.txt'
if not os.path.isfile(hashtag_filename):
return False
if not os.path.isdir(base_dir + '/tags'):
os.mkdir(base_dir + '/tags')
category_filename = base_dir + '/tags/' + hashtag + '.category'
if force:
# don't overwrite any existing categories
if os.path.isfile(category_filename):
return False
category_written = False
try:
with open(category_filename, 'w+', encoding='utf-8') as fp_category:
fp_category.write(category)
category_written = True
except OSError as ex:
print('EX: unable to write category ' + category_filename +
' ' + str(ex))
except UnicodeEncodeError as ex:
print('EX: unable to write category unicode ' + category_filename +
' ' + str(ex))
if category_written:
if update:
update_hashtag_categories(base_dir)
return True
return False
def guess_hashtag_category(tag_name: str, hashtag_categories: {},
min_tag_length: int) -> str:
"""Tries to guess a category for the given hashtag.
This works by trying to find the longest similar hashtag
"""
if len(tag_name) < min_tag_length:
return ''
category_matched = ''
tag_matched_len = 0
finished = False
for category_str, hashtag_list in hashtag_categories.items():
if finished:
break
for hashtag in hashtag_list:
if hashtag == tag_name:
# exact match
category_matched = category_str
finished = True
break
if len(hashtag) < min_tag_length:
# avoid matching very small strings which often
# lead to spurious categories
continue
if hashtag not in tag_name:
if tag_name not in hashtag:
continue
if not category_matched:
tag_matched_len = len(hashtag)
category_matched = category_str
else:
# match the longest tag
if len(hashtag) > tag_matched_len:
category_matched = category_str
if not category_matched:
return ''
return category_matched