__filename__ = "categories.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.5.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "RSS Feeds"

import os
import datetime
from utils import date_utcnow
from utils import date_epoch

MAX_TAG_LENGTH = 42

INVALID_HASHTAG_CHARS = (',', ' ', '<', ';', '\\', '"', '&', '#')


def _read_category_file(category_filename: str, log_prefix: str) -> str:
    """Returns the text of a category file, or an empty string if the
    file could not be read or decoded.
    log_prefix is the text placed after 'EX: ' in any failure message
    """
    try:
        with open(category_filename, 'r',
                  encoding='utf-8') as fp_category:
            return fp_category.read()
    except OSError:
        print('EX: ' + log_prefix + ' ' + category_filename)
    except UnicodeError as ex:
        # reading raises UnicodeDecodeError, which is not an OSError.
        # UnicodeError is the parent of both the decode and encode errors
        print('EX: ' + log_prefix + ' unicode ' +
              category_filename + ' ' + str(ex))
    return ''


def get_hashtag_category(base_dir: str, hashtag: str) -> str:
    """Returns the category for the hashtag.
    The tag is looked up as given, then title-cased, then upper-cased,
    and the first existing .category file within base_dir/tags is read.
    Returns an empty string if no file exists or it cannot be read
    """
    for variant in (hashtag, hashtag.title(), hashtag.upper()):
        category_filename = base_dir + '/tags/' + variant + '.category'
        if os.path.isfile(category_filename):
            return _read_category_file(category_filename,
                                       'unable to read category')
    return ''


def _write_city_category(city_filename: str, category_str: str,
                         log_label: str) -> None:
    """Writes category_str to city_filename unless the file already exists.
    log_label distinguishes the call site within the failure message
    """
    if os.path.isfile(city_filename):
        # don't overwrite any existing category
        return
    try:
        with open(city_filename, 'w+', encoding='utf-8') as fp_city:
            fp_city.write(category_str)
    except OSError:
        print('EX: unable to write city ' + log_label + ' ' +
              city_filename)


def load_city_hashtags(base_dir: str, translate: dict) -> None:
    """create hashtag categories for cities.
    Each line of each .txt file within base_dir/data/cities is treated
    as a city name. For every name a .category file is created within
    base_dir/tags containing the 'places' category (translated if the
    translate dictionary has an entry for it). Names containing hyphens
    or spaces also get a CamelCase variant
    """
    category_str = 'places'
    if translate.get(category_str):
        category_str = translate[category_str]

    cities_dir = base_dir + '/data/cities'
    tags_dir = base_dir + '/tags/'
    for _, _, files in os.walk(cities_dir):
        for cities_file in files:
            if not cities_file.endswith('.txt'):
                continue
            cities_filename = cities_dir + '/' + cities_file
            if not os.path.isfile(cities_filename):
                continue
            cities = []
            try:
                with open(cities_filename, 'r',
                          encoding='utf-8') as fp_cities:
                    cities = fp_cities.read().split('\n')
            except (OSError, UnicodeError):
                # an undecodable file is skipped rather than aborting
                print('EX: unable to load cities file ' + cities_filename)
            for hashtag in cities:
                hashtag = hashtag.lower().strip()
                hashtag = hashtag.replace(' & ', ' and ')
                hashtag = hashtag.replace('/', '')

                compact_tag = hashtag.replace('-', '').replace(' ', '')
                if not compact_tag:
                    # blank lines would otherwise create a file
                    # called '.category' with no hashtag name
                    continue
                _write_city_category(tags_dir + compact_tag + '.category',
                                     category_str, 'category')

                # CamelCase variants, eg. "new york" becomes "NewYork".
                # hashtag is already lower case at this point
                for separator, log_label in (('-', 'category2'),
                                             (' ', 'category3')):
                    if separator not in hashtag:
                        continue
                    camel_tag = ''
                    for text in hashtag.split(separator):
                        camel_tag += text.title()
                    _write_city_category(tags_dir + camel_tag + '.category',
                                         category_str, log_label)
        # filenames are built relative to the top level directory,
        # so there is no need to descend into subdirectories
        break


def get_hashtag_categories(base_dir: str,
                           recent: bool, category: str) -> dict:
    """Returns a dictionary containing hashtag categories.
    Keys are category names and values are lists of hashtags, taken from
    the .category files within base_dir/tags.
    If recent is True then only hashtags whose .txt file was modified
    since yesterday are included.
    If category is set then only that category is returned
    """
    hashtag_categories = {}

    recently = 0
    if recent:
        curr_time = date_utcnow()
        days_since_epoch = (curr_time - date_epoch()).days
        recently = days_since_epoch - 1

    tags_dir = base_dir + '/tags'
    for _, _, files in os.walk(tags_dir):
        for catfile in files:
            if not catfile.endswith('.category'):
                continue
            category_filename = os.path.join(tags_dir, catfile)
            if not os.path.isfile(category_filename):
                continue
            hashtag = catfile.split('.')[0]
            if not hashtag:
                # a file called '.category' has no hashtag name
                continue
            if len(hashtag) > MAX_TAG_LENGTH:
                continue

            category_str = \
                _read_category_file(category_filename,
                                    'get_hashtag_categories')
            if not category_str:
                continue

            # only return a dictionary for a specific category
            if category and category_str != category:
                continue

            if recent:
                tags_filename = tags_dir + '/' + hashtag + '.txt'
                if not os.path.isfile(tags_filename):
                    continue
                mod_time_since_epoc = os.path.getmtime(tags_filename)
                last_modified_date = \
                    datetime.datetime.fromtimestamp(mod_time_since_epoc,
                                                    datetime.timezone.utc)
                file_days_since_epoch = \
                    (last_modified_date - date_epoch()).days
                if file_days_since_epoch < recently:
                    continue

            tags_list = hashtag_categories.setdefault(category_str, [])
            if hashtag not in tags_list:
                tags_list.append(hashtag)
        # only the top level of the tags directory is examined
        break
    return hashtag_categories


def update_hashtag_categories(base_dir: str) -> None:
    """Regenerates the list of hashtag categories.
    The sorted category names are saved one per line to
    base_dir/accounts/categoryList.txt. If there are no categories
    then any existing list file is removed
    """
    category_list_filename = base_dir + '/accounts/categoryList.txt'
    hashtag_categories = get_hashtag_categories(base_dir, False, None)
    if not hashtag_categories:
        if os.path.isfile(category_list_filename):
            try:
                os.remove(category_list_filename)
            except OSError:
                print('EX: update_hashtag_categories ' +
                      'unable to delete cached category list ' +
                      category_list_filename)
        return

    category_list_str = ''
    for category_str in sorted(hashtag_categories):
        category_list_str += category_str + '\n'

    # save a list of available categories for quick lookup
    try:
        with open(category_list_filename, 'w+',
                  encoding='utf-8') as fp_category:
            fp_category.write(category_list_str)
    except OSError:
        print('EX: unable to write category ' + category_list_filename)


def _valid_hashtag_category(category: str) -> bool:
    """Returns true if the category name is valid.
    It must be non-empty, contain none of INVALID_HASHTAG_CHARS and
    be no more than 40 characters long
    """
    if not category:
        return False

    if any(char in category for char in INVALID_HASHTAG_CHARS):
        return False

    # too long
    return len(category) <= 40


def set_hashtag_category(base_dir: str, hashtag: str, category: str,
                         update: bool, force: bool = False) -> bool:
    """Sets the category for the hashtag.
    Unless force is True the hashtag must already have a .txt file within
    base_dir/tags, matched as given, title-cased or upper-cased.
    With force set an existing category is never overwritten.
    If update is True the cached category list is regenerated after
    a successful write.
    Returns True if the category file was written
    """
    if not _valid_hashtag_category(category):
        return False

    tags_dir = base_dir + '/tags'

    if not force:
        # find the capitalisation under which the hashtag is stored
        title_tag = hashtag.title()
        for variant in (hashtag, title_tag, title_tag.upper()):
            if os.path.isfile(tags_dir + '/' + variant + '.txt'):
                hashtag = variant
                break
        else:
            return False

    if not os.path.isdir(tags_dir):
        os.mkdir(tags_dir)
    category_filename = tags_dir + '/' + hashtag + '.category'
    if force:
        # don't overwrite any existing categories
        if os.path.isfile(category_filename):
            return False

    try:
        with open(category_filename, 'w+',
                  encoding='utf-8') as fp_category:
            fp_category.write(category)
    except OSError as ex:
        print('EX: unable to write category ' + category_filename +
              ' ' + str(ex))
        return False
    except UnicodeEncodeError as ex:
        print('EX: unable to write category unicode ' +
              category_filename + ' ' + str(ex))
        return False

    if update:
        update_hashtag_categories(base_dir)
    return True


def guess_hashtag_category(tag_name: str, hashtag_categories: dict,
                           min_tag_length: int) -> str:
    """Tries to guess a category for the given hashtag.
    This works by trying to find the longest similar hashtag, where
    similar means that one tag contains the other. An exact match
    is returned immediately.
    Returns an empty string if the tag is shorter than min_tag_length
    or if nothing similar is found
    """
    if len(tag_name) < min_tag_length:
        return ''

    category_matched = ''
    tag_matched_len = 0
    for category_str, hashtag_list in hashtag_categories.items():
        for hashtag in hashtag_list:
            if hashtag == tag_name:
                # exact match
                return category_str
            if len(hashtag) < min_tag_length:
                # avoid matching very small strings which often
                # lead to spurious categories
                continue
            if hashtag not in tag_name and tag_name not in hashtag:
                continue
            # match the longest tag. The matched length is updated
            # together with the category so that a later, shorter tag
            # can't displace a longer one
            if not category_matched or len(hashtag) > tag_matched_len:
                tag_matched_len = len(hashtag)
                category_matched = category_str
    return category_matched