mirror of https://gitlab.com/bashrc2/epicyon
Move hashtag categories functions to their own module
parent
679c06b20e
commit
96e813181b
|
@ -0,0 +1,184 @@
|
|||
__filename__ = "categories.py"
|
||||
__author__ = "Bob Mottram"
|
||||
__license__ = "AGPL3+"
|
||||
__version__ = "1.1.0"
|
||||
__maintainer__ = "Bob Mottram"
|
||||
__email__ = "bob@freedombone.net"
|
||||
__status__ = "Production"
|
||||
|
||||
import os
|
||||
import datetime
|
||||
|
||||
|
||||
def getHashtagCategory(baseDir: str, hashtag: str) -> str:
    """Looks up the category which has been assigned to a hashtag

    The category is read from tags/<hashtag>.category beneath baseDir.
    The hashtag is tried as given, then title-cased, then upper-cased,
    and the first file which exists is used. An empty string is returned
    if none of those files exist or if the file found is empty.
    """
    tagsPrefix = baseDir + '/tags/'
    foundFilename = None
    for variant in (hashtag, hashtag.title(), hashtag.upper()):
        candidate = tagsPrefix + variant + '.category'
        if os.path.isfile(candidate):
            foundFilename = candidate
            break
    if foundFilename is None:
        return ''

    with open(foundFilename, 'r') as fp:
        contents = fp.read()
    return contents if contents else ''
|
||||
|
||||
|
||||
def getHashtagCategories(baseDir: str, recent=False, category=None) -> dict:
    """Returns a dictionary containing hashtag categories

    Each tags/<hashtag>.category file directly beneath baseDir is read
    and its contents are taken as the category name. Subdirectories of
    the tags directory are not searched and empty category files are
    skipped.

    baseDir: directory which contains the tags directory
    recent: if True then a hashtag is only included when its
        tags/<hashtag>.txt file exists and was last modified no
        earlier than yesterday, counted in UTC days
    category: if given then only hashtags having exactly this
        category are returned

    Returns a dictionary of the form {category: [hashtag, ...]},
    which is empty if nothing matched.
    """
    hashtagCategories = {}
    tagsDir = baseDir + '/tags'

    if recent:
        # The current time and the file modification times are both
        # measured in UTC, so that the day numbers compared below
        # refer to the same day boundaries
        epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
        currTime = datetime.datetime.now(datetime.timezone.utc)
        recently = (currTime - epoch).days - 1

    for _, _, files in os.walk(tagsDir):
        for f in files:
            if not f.endswith('.category'):
                continue
            categoryFilename = os.path.join(tagsDir, f)
            if not os.path.isfile(categoryFilename):
                continue
            hashtag = f.split('.')[0]
            with open(categoryFilename, 'r') as fp:
                categoryStr = fp.read()

            if not categoryStr:
                continue

            # only return a dictionary for a specific category
            if category and categoryStr != category:
                continue

            if recent:
                tagsFilename = tagsDir + '/' + hashtag + '.txt'
                if not os.path.isfile(tagsFilename):
                    continue
                lastModifiedDate = \
                    datetime.datetime.fromtimestamp(
                        os.path.getmtime(tagsFilename),
                        datetime.timezone.utc)
                fileDaysSinceEpoch = (lastModifiedDate - epoch).days
                if fileDaysSinceEpoch < recently:
                    continue

            hashtagList = hashtagCategories.setdefault(categoryStr, [])
            if hashtag not in hashtagList:
                hashtagList.append(hashtag)
        # only the top level of the tags directory is examined
        break
    return hashtagCategories
|
||||
|
||||
|
||||
def _updateHashtagCategories(baseDir: str) -> None:
    """Rebuilds accounts/categoryList.txt from the current categories

    The file contains one category name per line, in sorted order.
    If getHashtagCategories returns nothing then any existing list
    file is removed instead.
    """
    listFilename = baseDir + '/accounts/categoryList.txt'
    categories = getHashtagCategories(baseDir)

    if not categories:
        # there is nothing to list, so remove any stale file
        if os.path.isfile(listFilename):
            os.remove(listFilename)
        return

    # one category name per line, in sorted order
    lines = [name + '\n' for name in sorted(categories)]

    # save a list of available categories for quick lookup
    with open(listFilename, 'w+') as fp:
        fp.write(''.join(lines))
|
||||
|
||||
|
||||
def _validHashtagCategory(category: str) -> bool:
|
||||
"""Returns true if the category name is valid
|
||||
"""
|
||||
if not category:
|
||||
return False
|
||||
|
||||
invalidChars = (',', ' ', '<', ';', '\\')
|
||||
for ch in invalidChars:
|
||||
if ch in category:
|
||||
return False
|
||||
|
||||
# too long
|
||||
if len(category) > 40:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def setHashtagCategory(baseDir: str, hashtag: str, category: str,
                       force=False) -> bool:
    """Sets the category for the hashtag

    baseDir: directory which contains the tags directory
    hashtag: the hashtag to be categorised
    category: the category name, which must pass _validHashtagCategory
    force: if False then a tags/<hashtag>.txt file must already exist,
        with the hashtag tried as given, then title-cased, then
        upper-cased, and the variant found is the one categorised.
        If True then no such file is needed, but an existing category
        file is never overwritten.

    Returns True if the category was written, otherwise False.
    """
    if not _validHashtagCategory(category):
        return False

    if not force:
        hashtagFilename = baseDir + '/tags/' + hashtag + '.txt'
        if not os.path.isfile(hashtagFilename):
            hashtag = hashtag.title()
            hashtagFilename = baseDir + '/tags/' + hashtag + '.txt'
            if not os.path.isfile(hashtagFilename):
                hashtag = hashtag.upper()
                hashtagFilename = baseDir + '/tags/' + hashtag + '.txt'
                if not os.path.isfile(hashtagFilename):
                    return False

    if not os.path.isdir(baseDir + '/tags'):
        os.mkdir(baseDir + '/tags')
    categoryFilename = baseDir + '/tags/' + hashtag + '.category'
    if force:
        # don't overwrite any existing categories
        if os.path.isfile(categoryFilename):
            return False
    with open(categoryFilename, 'w+') as fp:
        fp.write(category)

    # Regenerate the list only after the file has been closed.
    # While it is still open the written text may be sitting in the
    # write buffer, so that the category file is read back as empty
    # and the new category is left out of the list.
    _updateHashtagCategories(baseDir)
    return True
|
||||
|
||||
|
||||
def guessHashtagCategory(tagName: str, hashtagCategories: {}) -> str:
    """Tries to guess a category for the given hashtag.
    This works by trying to find the longest similar hashtag

    tagName: the hashtag for which a category is wanted
    hashtagCategories: dictionary of the form {category: [hashtag, ...]}

    A known hashtag is considered similar if it contains tagName or is
    contained within it. Returns the category of the longest similar
    hashtag, with the earliest one winning a tie, or an empty string
    if there is no match.
    """
    categoryMatched = ''
    tagMatchedLen = 0

    for categoryStr, hashtagList in hashtagCategories.items():
        for hashtag in hashtagList:
            if len(hashtag) < 3:
                # avoid matching very small strings which often
                # lead to spurious categories
                continue
            if hashtag not in tagName and tagName not in hashtag:
                continue
            # match the longest tag. The best length so far is recorded
            # on every improvement, otherwise any later tag which is
            # merely longer than the first match would replace a
            # longer match found in between
            if len(hashtag) > tagMatchedLen:
                tagMatchedLen = len(hashtag)
                categoryMatched = categoryStr
    return categoryMatched
|
|
@ -174,7 +174,7 @@ from shares import removeShare
|
|||
from shares import expireShares
|
||||
from utils import getFullDomain
|
||||
from utils import removeHtml
|
||||
from utils import setHashtagCategory
|
||||
from categories import setHashtagCategory
|
||||
from utils import isEditor
|
||||
from utils import getImageExtensions
|
||||
from utils import mediaFileMimeType
|
||||
|
|
33
inbox.py
33
inbox.py
|
@ -32,8 +32,8 @@ from utils import loadJson
|
|||
from utils import saveJson
|
||||
from utils import updateLikesCollection
|
||||
from utils import undoLikesCollectionEntry
|
||||
from utils import getHashtagCategories
|
||||
from utils import setHashtagCategory
|
||||
from categories import getHashtagCategories
|
||||
from categories import setHashtagCategory
|
||||
from httpsig import verifyPostHeaders
|
||||
from session import createSession
|
||||
from session import getJson
|
||||
|
@ -70,34 +70,7 @@ from content import dangerousMarkup
|
|||
from happening import saveEventPost
|
||||
from delete import removeOldHashtags
|
||||
from follow import isFollowingActor
|
||||
|
||||
|
||||
def guessHashtagCategory(tagName: str, hashtagCategories: {}) -> str:
    """Tries to guess a category for the given hashtag.
    This works by trying to find the longest similar hashtag

    tagName: the hashtag for which a category is wanted
    hashtagCategories: dictionary of the form {category: [hashtag, ...]}

    A known hashtag is considered similar if it contains tagName or is
    contained within it. Returns the category of the longest similar
    hashtag, with the earliest one winning a tie, or an empty string
    if there is no match.
    """
    categoryMatched = ''
    tagMatchedLen = 0

    for categoryStr, hashtagList in hashtagCategories.items():
        for hashtag in hashtagList:
            if len(hashtag) < 3:
                # avoid matching very small strings which often
                # lead to spurious categories
                continue
            if hashtag not in tagName and tagName not in hashtag:
                continue
            # match the longest tag. The best length so far is recorded
            # on every improvement, otherwise any later tag which is
            # merely longer than the first match would replace a
            # longer match found in between
            if len(hashtag) > tagMatchedLen:
                tagMatchedLen = len(hashtag)
                categoryMatched = categoryStr
    return categoryMatched
|
||||
from categories import guessHashtagCategory
|
||||
|
||||
|
||||
def storeHashTags(baseDir: str, nickname: str, postJsonObject: {}) -> None:
|
||||
|
|
|
@ -15,7 +15,7 @@ from datetime import timedelta
|
|||
from datetime import timezone
|
||||
from collections import OrderedDict
|
||||
from utils import validPostDate
|
||||
from utils import setHashtagCategory
|
||||
from categories import setHashtagCategory
|
||||
from utils import firstParagraphFromString
|
||||
from utils import isPublicPost
|
||||
from utils import locatePost
|
||||
|
|
146
utils.py
146
utils.py
|
@ -78,152 +78,6 @@ def isDormant(baseDir: str, nickname: str, domain: str, actor: str,
|
|||
return False
|
||||
|
||||
|
||||
def getHashtagCategory(baseDir: str, hashtag: str) -> str:
    """Looks up the category which has been assigned to a hashtag

    The category is read from tags/<hashtag>.category beneath baseDir.
    The hashtag is tried as given, then title-cased, then upper-cased,
    and the first file which exists is used. An empty string is returned
    if none of those files exist or if the file found is empty.
    """
    tagsPrefix = baseDir + '/tags/'
    foundFilename = None
    for variant in (hashtag, hashtag.title(), hashtag.upper()):
        candidate = tagsPrefix + variant + '.category'
        if os.path.isfile(candidate):
            foundFilename = candidate
            break
    if foundFilename is None:
        return ''

    with open(foundFilename, 'r') as fp:
        contents = fp.read()
    return contents if contents else ''
|
||||
|
||||
|
||||
def getHashtagCategories(baseDir: str, recent=False, category=None) -> dict:
    """Returns a dictionary containing hashtag categories

    Each tags/<hashtag>.category file directly beneath baseDir is read
    and its contents are taken as the category name. Subdirectories of
    the tags directory are not searched and empty category files are
    skipped.

    baseDir: directory which contains the tags directory
    recent: if True then a hashtag is only included when its
        tags/<hashtag>.txt file exists and was last modified no
        earlier than yesterday, counted in UTC days
    category: if given then only hashtags having exactly this
        category are returned

    Returns a dictionary of the form {category: [hashtag, ...]},
    which is empty if nothing matched.
    """
    hashtagCategories = {}
    tagsDir = baseDir + '/tags'

    if recent:
        # The current time and the file modification times are both
        # measured in UTC, so that the day numbers compared below
        # refer to the same day boundaries
        epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
        currTime = datetime.datetime.now(datetime.timezone.utc)
        recently = (currTime - epoch).days - 1

    for _, _, files in os.walk(tagsDir):
        for f in files:
            if not f.endswith('.category'):
                continue
            categoryFilename = os.path.join(tagsDir, f)
            if not os.path.isfile(categoryFilename):
                continue
            hashtag = f.split('.')[0]
            with open(categoryFilename, 'r') as fp:
                categoryStr = fp.read()

            if not categoryStr:
                continue

            # only return a dictionary for a specific category
            if category and categoryStr != category:
                continue

            if recent:
                tagsFilename = tagsDir + '/' + hashtag + '.txt'
                if not os.path.isfile(tagsFilename):
                    continue
                lastModifiedDate = \
                    datetime.datetime.fromtimestamp(
                        os.path.getmtime(tagsFilename),
                        datetime.timezone.utc)
                fileDaysSinceEpoch = (lastModifiedDate - epoch).days
                if fileDaysSinceEpoch < recently:
                    continue

            hashtagList = hashtagCategories.setdefault(categoryStr, [])
            if hashtag not in hashtagList:
                hashtagList.append(hashtag)
        # only the top level of the tags directory is examined
        break
    return hashtagCategories
|
||||
|
||||
|
||||
def updateHashtagCategories(baseDir: str) -> None:
    """Rebuilds accounts/categoryList.txt from the current categories

    The file contains one category name per line, in sorted order.
    If getHashtagCategories returns nothing then any existing list
    file is removed instead.
    """
    listFilename = baseDir + '/accounts/categoryList.txt'
    categories = getHashtagCategories(baseDir)

    if not categories:
        # there is nothing to list, so remove any stale file
        if os.path.isfile(listFilename):
            os.remove(listFilename)
        return

    # one category name per line, in sorted order
    lines = [name + '\n' for name in sorted(categories)]

    # save a list of available categories for quick lookup
    with open(listFilename, 'w+') as fp:
        fp.write(''.join(lines))
|
||||
|
||||
|
||||
def validHashtagCategory(category: str) -> bool:
    """Checks whether a category name is acceptable

    A name is rejected if it is empty, if it contains a comma, a space,
    '<', ';' or a backslash, or if it is longer than 40 characters.
    """
    if not category:
        return False

    forbidden = (',', ' ', '<', ';', '\\')
    if any(ch in category for ch in forbidden):
        return False

    # names longer than 40 characters are too long
    return len(category) <= 40
|
||||
|
||||
|
||||
def setHashtagCategory(baseDir: str, hashtag: str, category: str,
                       force=False) -> bool:
    """Sets the category for the hashtag

    baseDir: directory which contains the tags directory
    hashtag: the hashtag to be categorised
    category: the category name, which must pass validHashtagCategory
    force: if False then a tags/<hashtag>.txt file must already exist,
        with the hashtag tried as given, then title-cased, then
        upper-cased, and the variant found is the one categorised.
        If True then no such file is needed, but an existing category
        file is never overwritten.

    Returns True if the category was written, otherwise False.
    """
    if not validHashtagCategory(category):
        return False

    if not force:
        hashtagFilename = baseDir + '/tags/' + hashtag + '.txt'
        if not os.path.isfile(hashtagFilename):
            hashtag = hashtag.title()
            hashtagFilename = baseDir + '/tags/' + hashtag + '.txt'
            if not os.path.isfile(hashtagFilename):
                hashtag = hashtag.upper()
                hashtagFilename = baseDir + '/tags/' + hashtag + '.txt'
                if not os.path.isfile(hashtagFilename):
                    return False

    if not os.path.isdir(baseDir + '/tags'):
        os.mkdir(baseDir + '/tags')
    categoryFilename = baseDir + '/tags/' + hashtag + '.category'
    if force:
        # don't overwrite any existing categories
        if os.path.isfile(categoryFilename):
            return False
    with open(categoryFilename, 'w+') as fp:
        fp.write(category)

    # Regenerate the list only after the file has been closed.
    # While it is still open the written text may be sitting in the
    # write buffer, so that the category file is read back as empty
    # and the new category is left out of the list.
    updateHashtagCategories(baseDir)
    return True
|
||||
|
||||
|
||||
def isEditor(baseDir: str, nickname: str) -> bool:
|
||||
"""Returns true if the given nickname is an editor
|
||||
"""
|
||||
|
|
|
@ -10,8 +10,8 @@ import os
|
|||
from shutil import copyfile
|
||||
from datetime import datetime
|
||||
from utils import getNicknameFromActor
|
||||
from utils import getHashtagCategories
|
||||
from utils import getHashtagCategory
|
||||
from categories import getHashtagCategories
|
||||
from categories import getHashtagCategory
|
||||
from webapp_utils import getSearchBannerFile
|
||||
from webapp_utils import getContentWarningButton
|
||||
from webapp_utils import htmlHeaderWithExternalStyle
|
||||
|
|
|
@ -19,7 +19,7 @@ from utils import locatePost
|
|||
from utils import isPublicPost
|
||||
from utils import firstParagraphFromString
|
||||
from utils import searchBoxPosts
|
||||
from utils import getHashtagCategory
|
||||
from categories import getHashtagCategory
|
||||
from feeds import rss2TagHeader
|
||||
from feeds import rss2TagFooter
|
||||
from webapp_utils import getAltPath
|
||||
|
|
Loading…
Reference in New Issue