__filename__ = "content.py" __author__ = "Bob Mottram" __license__ = "AGPL3+" __version__ = "1.2.0" __maintainer__ = "Bob Mottram" __email__ = "bob@libreserver.org" __status__ = "Production" __module_group__ = "Core" import os import email.parser import urllib.parse from shutil import copyfile from utils import dangerousSVG from utils import removeDomainPort from utils import isValidLanguage from utils import getImageExtensions from utils import loadJson from utils import saveJson from utils import fileLastModified from utils import getLinkPrefixes from utils import dangerousMarkup from utils import isPGPEncrypted from utils import containsPGPPublicKey from utils import acctDir from utils import isfloat from utils import getCurrencies from utils import removeHtml from petnames import getPetName from session import downloadImage def removeHtmlTag(htmlStr: str, tag: str) -> str: """Removes a given tag from a html string """ tagFound = True while tagFound: matchStr = ' ' + tag + '="' if matchStr not in htmlStr: tagFound = False break sections = htmlStr.split(matchStr, 1) if '"' not in sections[1]: tagFound = False break htmlStr = sections[0] + sections[1].split('"', 1)[1] return htmlStr def _removeQuotesWithinQuotes(content: str) -> str: """Removes any blockquote inside blockquote """ if '


def _removeQuotesWithinQuotes(content: str) -> str:
    """Removes any blockquote inside blockquote
    """
    if '<blockquote>' not in content:
        return content
    if '</blockquote>' not in content:
        return content
    ctr = 1
    found = True
    while found:
        prefix = content.split('<blockquote>', ctr)[0] + '<blockquote>'
        quotedStr = content.split('<blockquote>', ctr)[1]
        if '</blockquote>' not in quotedStr:
            found = False
        else:
            endStr = quotedStr.split('</blockquote>')[1]
            quotedStr = quotedStr.split('</blockquote>')[0]
            if '</blockquote>' not in endStr:
                found = False
            if '<blockquote>' in quotedStr:
                quotedStr = quotedStr.replace('<blockquote>', '')
            content = prefix + quotedStr + '</blockquote>' + endStr
        ctr += 1
    return content


def htmlReplaceEmailQuote(content: str) -> str:
    """Replaces an email style quote "> Some quote" with html blockquote
    """
    if isPGPEncrypted(content) or containsPGPPublicKey(content):
        return content
    # replace quote paragraph
    if '<p>&quot;' in content:
        if '&quot;</p>' in content:
            if content.count('<p>&quot;') == content.count('&quot;</p>'):
                content = content.replace('<p>&quot;', '<p><blockquote>')
                content = content.replace('&quot;</p>', '</blockquote></p>')
    if '>\u201c' in content:
        if '\u201d<' in content:
            if content.count('>\u201c') == content.count('\u201d<'):
                content = content.replace('>\u201c', '><blockquote>')
                content = content.replace('\u201d<', '</blockquote><')
    # replace email style quote
    if '>> ' not in content:
        return content
    contentStr = content.replace('<p>', '')
    contentLines = contentStr.split('</p>')
    newContent = ''
    for lineStr in contentLines:
        if not lineStr:
            continue
        if '>> ' not in lineStr:
            if lineStr.startswith('> '):
                lineStr = lineStr.replace('> ', '<blockquote>')
                lineStr = lineStr.replace('>', '</blockquote>')
                newContent += '<p>' + lineStr + '</p>'
            else:
                newContent += '<p>' + lineStr + '</p>'
        else:
            lineStr = lineStr.replace('>> ', '><blockquote>')
            if lineStr.startswith('>'):
                lineStr = lineStr.replace('>', '<blockquote>', 1)
            else:
                lineStr = lineStr.replace('>', '</blockquote>')
            newContent += '<p>' + lineStr + '</p>'
    return _removeQuotesWithinQuotes(newContent)


def htmlReplaceQuoteMarks(content: str) -> str:
    """Replaces quotes with html formatting
    "hello" becomes “hello”
    """
    if isPGPEncrypted(content) or containsPGPPublicKey(content):
        return content
    if '"' not in content:
        if '&quot;' not in content:
            return content

    # only if there are a few quote marks
    if content.count('"') > 4:
        return content
    if content.count('&quot;') > 4:
        return content

    newContent = content
    if '"' in content:
        sections = content.split('"')
        if len(sections) > 1:
            newContent = ''
            openQuote = True
            markup = False
            for ch in content:
                currChar = ch
                if ch == '<':
                    markup = True
                elif ch == '>':
                    markup = False
                elif ch == '"' and not markup:
                    if openQuote:
                        currChar = '“'
                    else:
                        currChar = '”'
                    openQuote = not openQuote
                newContent += currChar

    if '&quot;' in newContent:
        openQuote = True
        content = newContent
        newContent = ''
        ctr = 0
        sections = content.split('&quot;')
        noOfSections = len(sections)
        for s in sections:
            newContent += s
            if ctr < noOfSections - 1:
                if openQuote:
                    newContent += '“'
                else:
                    newContent += '”'
                openQuote = not openQuote
            ctr += 1
    return newContent


def dangerousCSS(filename: str, allow_local_network_access: bool) -> bool:
    """Returns true if the css file contains code which
    can create security problems
    """
    if not os.path.isfile(filename):
        return False

    content = None
    try:
        with open(filename, 'r') as fp:
            content = fp.read().lower()
    except OSError:
        print('EX: unable to read css file ' + filename)

    if content:
        cssMatches = ('behavior:', ':expression', '?php', '.php',
                      'google', 'regexp', 'localhost',
                      '127.0.', '192.168', '10.0.', '@import')
        for cssmatch in cssMatches:
            if cssmatch in content:
                return True

        # search for non-local web links
        if 'url(' in content:
            urlList = content.split('url(')
            ctr = 0
            for urlStr in urlList:
                if ctr > 0:
                    if ')' in urlStr:
                        urlStr = urlStr.split(')')[0]
                        if 'http' in urlStr:
                            print('ERROR: non-local web link in CSS ' +
                                  filename)
                            return True
                ctr += 1

        # an attacker can include html inside of the css
        # file as a comment and this may then be run from the html
        if dangerousMarkup(content, allow_local_network_access):
            return True
    return False


def switchWords(base_dir: str, nickname: str, domain: str, content: str,
                rules: [] = []) -> str:
    """Performs word replacements. eg. Trump -> The Orange Menace
    """
    if isPGPEncrypted(content) or containsPGPPublicKey(content):
        return content

    if not rules:
        switchWordsFilename = \
            acctDir(base_dir, nickname, domain) + '/replacewords.txt'
        if not os.path.isfile(switchWordsFilename):
            return content
        try:
            with open(switchWordsFilename, 'r') as fp:
                rules = fp.readlines()
        except OSError:
            print('EX: unable to read switches ' + switchWordsFilename)

    for line in rules:
        replaceStr = line.replace('\n', '').replace('\r', '')
        splitters = ('->', ':', ',', ';', '-')
        wordTransform = None
        for splitStr in splitters:
            if splitStr in replaceStr:
                wordTransform = replaceStr.split(splitStr)
                break
        if not wordTransform:
            continue
        if len(wordTransform) == 2:
            replaceStr1 = wordTransform[0].strip().replace('"', '')
            replaceStr2 = wordTransform[1].strip().replace('"', '')
            content = content.replace(replaceStr1, replaceStr2)
    return content


def _saveCustomEmoji(session, base_dir: str, emojiName: str, url: str,
                     debug: bool) -> None:
    """Saves custom emoji to file
    """
    if not session:
        if debug:
            print('EX: _saveCustomEmoji no session')
        return
    if '.' not in url:
        return
    ext = url.split('.')[-1]
    if ext != 'png':
        if debug:
            print('EX: Custom emoji is wrong format ' + url)
        return
    emojiName = emojiName.replace(':', '').strip().lower()
    customEmojiDir = base_dir + '/emojicustom'
    if not os.path.isdir(customEmojiDir):
        os.mkdir(customEmojiDir)
    emojiImageFilename = customEmojiDir + '/' + emojiName + '.' + ext
    if not downloadImage(session, base_dir, url,
                         emojiImageFilename, debug, False):
        if debug:
            print('EX: custom emoji not downloaded ' + url)
        return
    emojiJsonFilename = customEmojiDir + '/emoji.json'
    emojiJson = {}
    if os.path.isfile(emojiJsonFilename):
        emojiJson = loadJson(emojiJsonFilename, 0, 1)
        if not emojiJson:
            emojiJson = {}
    if not emojiJson.get(emojiName):
        emojiJson[emojiName] = emojiName
        saveJson(emojiJson, emojiJsonFilename)
        if debug:
            print('EX: Saved custom emoji ' + emojiJsonFilename)
    elif debug:
        print('EX: custom emoji already saved')


def replaceEmojiFromTags(session, base_dir: str,
                         content: str, tag: [], messageType: str,
                         debug: bool) -> str:
    """Uses the tags to replace :emoji: with html image markup
    """
    for tagItem in tag:
        if not tagItem.get('type'):
            continue
        if tagItem['type'] != 'Emoji':
            continue
        if not tagItem.get('name'):
            continue
        if not tagItem.get('icon'):
            continue
        if not tagItem['icon'].get('url'):
            continue
        if '/' not in tagItem['icon']['url']:
            continue
        if tagItem['name'] not in content:
            continue
        iconName = tagItem['icon']['url'].split('/')[-1]
        if iconName:
            if len(iconName) > 1:
                if iconName[0].isdigit():
                    if '.' in iconName:
                        iconName = iconName.split('.')[0]
                        # see https://unicode.org/
                        # emoji/charts/full-emoji-list.html
                        if '-' not in iconName:
                            # a single code
                            replaced = False
                            try:
                                replaceChar = chr(int("0x" + iconName, 16))
                                content = \
                                    content.replace(tagItem['name'],
                                                    replaceChar)
                                replaced = True
                            except BaseException:
                                print('EX: replaceEmojiFromTags 1 ' +
                                      'no conversion of ' +
                                      str(iconName) + ' to chr ' +
                                      tagItem['name'] + ' ' +
                                      tagItem['icon']['url'])
                            if not replaced:
                                _saveCustomEmoji(session, base_dir,
                                                 tagItem['name'],
                                                 tagItem['icon']['url'],
                                                 debug)
                        else:
                            # sequence of codes
                            iconCodes = iconName.split('-')
                            iconCodeSequence = ''
                            for icode in iconCodes:
                                replaced = False
                                try:
                                    iconCodeSequence += \
                                        chr(int("0x" + icode, 16))
                                    replaced = True
                                except BaseException:
                                    iconCodeSequence = ''
                                    print('EX: replaceEmojiFromTags 2 ' +
                                          'no conversion of ' +
                                          str(icode) + ' to chr ' +
                                          tagItem['name'] + ' ' +
                                          tagItem['icon']['url'])
                                if not replaced:
                                    _saveCustomEmoji(session, base_dir,
                                                     tagItem['name'],
                                                     tagItem['icon']['url'],
                                                     debug)
                            if iconCodeSequence:
                                content = \
                                    content.replace(tagItem['name'],
                                                    iconCodeSequence)

        htmlClass = 'emoji'
        if messageType == 'post header':
            htmlClass = 'emojiheader'
        if messageType == 'profile':
            htmlClass = 'emojiprofile'
        emojiHtml = "<img src=\"" + tagItem['icon']['url'] + "\" alt=\"" + \
            tagItem['name'].replace(':', '') + \
            "\" align=\"middle\" class=\"" + htmlClass + "\"/>"
        content = content.replace(tagItem['name'], emojiHtml)
    return content


def _addMusicTag(content: str, tag: str) -> str:
    """If a music link is found then ensure that the post is tagged
    appropriately
    """
    if '#podcast' in content or '#documentary' in content:
        return content
    if '#' not in tag:
        tag = '#' + tag
    if tag in content:
        return content
    musicSites = ('soundcloud.com', 'bandcamp.com')
    musicSiteFound = False
    for site in musicSites:
        if site + '/' in content:
            musicSiteFound = True
            break
    if not musicSiteFound:
        return content
    return ':music: ' + content + ' ' + tag + ' '


def addWebLinks(content: str) -> str:
    """Adds markup for web links
    """
    if ':' not in content:
        return content

    prefixes = getLinkPrefixes()

    # do any of these prefixes exist within the content?
    prefixFound = False
    for prefix in prefixes:
        if prefix in content:
            prefixFound = True
            break

    # if there are no prefixes then just keep the content we have
    if not prefixFound:
        return content

    maxLinkLength = 40
    content = content.replace('\r', '')
    words = content.replace('\n', ' --linebreak-- ').split(' ')
    replaceDict = {}
    for w in words:
        if ':' not in w:
            continue
        # does the word begin with a prefix?
        prefixFound = False
        for prefix in prefixes:
            if w.startswith(prefix):
                prefixFound = True
                break
        if not prefixFound:
            continue
        # the word contains a prefix
        if w.endswith('.') or w.endswith(';'):
            w = w[:-1]
        markup = ''
        for prefix in prefixes:
            if w.startswith(prefix):
                markup += '<a href="' + w + '" rel="nofollow ' + \
                    'noopener noreferrer" target="_blank">'
                break
        linkText = w
        for prefix in prefixes:
            linkText = linkText.replace(prefix, '')
        # prevent links from becoming too long
        if len(linkText) > maxLinkLength:
            markup += '<span class="ellipsis">' + \
                linkText[:maxLinkLength] + '</span>'
            markup += '</a>'
        else:
            markup += '<span class="ellipsis">' + linkText + '</span></a>'
        replaceDict[w] = markup

    # do the replacements
    for url, markup in replaceDict.items():
        content = content.replace(url, markup)

    # replace any line breaks
    content = content.replace(' --linebreak-- ', '<br>')

    return content
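
# Illustrative sketch (not from the original source): a long url is
# wrapped in anchor markup and its visible text shortened; example.com
# is just a placeholder.
#
#   text = 'docs at https://example.com/some/very/long/path/to/a/page'
#   html = addWebLinks(text)
#   # the url becomes an <a href=...> link and the visible link text
#   # is truncated to at most 40 characters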


def validHashTag(hashtag: str) -> bool:
    """Returns true if the given hashtag contains valid characters
    """
    # long hashtags are not valid
    if len(hashtag) >= 32:
        return False
    validChars = set('0123456789' +
                     'abcdefghijklmnopqrstuvwxyz' +
                     'ABCDEFGHIJKLMNOPQRSTUVWXYZ' +
                     '¡¿ÄäÀàÁáÂâÃãÅåǍǎĄąĂăÆæĀā' +
                     'ÇçĆćĈĉČčĎđĐďðÈèÉéÊêËëĚěĘęĖėĒē' +
                     'ĜĝĢģĞğĤĥÌìÍíÎîÏïıĪīĮįĴĵĶķ' +
                     'ĹĺĻļŁłĽľĿŀÑñŃńŇňŅņÖöÒòÓóÔôÕõŐőØøŒœ' +
                     'ŔŕŘřẞߌśŜŝŞşŠšȘșŤťŢţÞþȚțÜüÙùÚúÛûŰűŨũŲųŮůŪū' +
                     'ŴŵÝýŸÿŶŷŹźŽžŻż')
    if set(hashtag).issubset(validChars):
        return True
    if isValidLanguage(hashtag):
        return True
    return False


def _addHashTags(wordStr: str, http_prefix: str, domain: str,
                 replaceHashTags: {}, postHashtags: {}) -> bool:
    """Detects hashtags and adds them to the replacements dict
    Also updates the hashtags list to be added to the post
    """
    if replaceHashTags.get(wordStr):
        return True
    hashtag = wordStr[1:]
    if not validHashTag(hashtag):
        return False
    hashtagUrl = http_prefix + "://" + domain + "/tags/" + hashtag
    postHashtags[hashtag] = {
        'href': hashtagUrl,
        'name': '#' + hashtag,
        'type': 'Hashtag'
    }
    replaceHashTags[wordStr] = "<a href=\"" + hashtagUrl + \
        "\" class=\"mention hashtag\" rel=\"tag\">#<span>" + \
        hashtag + "</span></a>"
    return True


def _addEmoji(base_dir: str, wordStr: str,
              http_prefix: str, domain: str,
              replaceEmoji: {}, postTags: {},
              emojiDict: {}) -> bool:
    """Detects Emoji and adds them to the replacements dict
    Also updates the tags list to be added to the post
    """
    if not wordStr.startswith(':'):
        return False
    if not wordStr.endswith(':'):
        return False
    if len(wordStr) < 3:
        return False
    if replaceEmoji.get(wordStr):
        return True
    # remove leading and trailing : characters
    emoji = wordStr[1:]
    emoji = emoji[:-1]
    # is the text of the emoji valid?
    if not validHashTag(emoji):
        return False
    if not emojiDict.get(emoji):
        return False
    emojiFilename = base_dir + '/emoji/' + emojiDict[emoji] + '.png'
    if not os.path.isfile(emojiFilename):
        return False
    emojiUrl = http_prefix + "://" + domain + \
        "/emoji/" + emojiDict[emoji] + '.png'
    postTags[emoji] = {
        'icon': {
            'mediaType': 'image/png',
            'type': 'Image',
            'url': emojiUrl
        },
        'name': ':' + emoji + ':',
        "updated": fileLastModified(emojiFilename),
        "id": emojiUrl.replace('.png', ''),
        'type': 'Emoji'
    }
    return True


def tagExists(tagType: str, tagName: str, tags: {}) -> bool:
    """Returns true if a tag exists in the given dict
    """
    for tag in tags:
        if tag['name'] == tagName and tag['type'] == tagType:
            return True
    return False


def _addMention(wordStr: str, http_prefix: str, following: str,
                petnames: str, replaceMentions: {},
                recipients: [], tags: {}) -> bool:
    """Detects mentions and adds them to the replacements dict and
    recipients list
    """
    possibleHandle = wordStr[1:]
    # @nick
    if following and '@' not in possibleHandle:
        # fall back to a best effort match against the following list
        # if no domain was specified. eg. @nick
        possibleNickname = possibleHandle
        for follow in following:
            if '@' not in follow:
                continue
            followNick = follow.split('@')[0]
            if possibleNickname == followNick:
                followStr = follow.replace('\n', '').replace('\r', '')
                replaceDomain = followStr.split('@')[1]
                recipientActor = http_prefix + "://" + \
                    replaceDomain + "/@" + possibleNickname
                if recipientActor not in recipients:
                    recipients.append(recipientActor)
                tags[wordStr] = {
                    'href': recipientActor,
                    'name': wordStr,
                    'type': 'Mention'
                }
                replaceMentions[wordStr] = \
                    "<span class=\"h-card\"><a href=\"" + http_prefix + \
                    "://" + replaceDomain + "/@" + possibleNickname + \
                    "\" class=\"u-url mention\">@<span>" + \
                    possibleNickname + "</span></a></span>"
                return True
        # try replacing petnames with mentions
        followCtr = 0
        for follow in following:
            if '@' not in follow:
                followCtr += 1
                continue
            pet = petnames[followCtr].replace('\n', '')
            if pet:
                if possibleNickname == pet:
                    followStr = follow.replace('\n', '').replace('\r', '')
                    replaceNickname = followStr.split('@')[0]
                    replaceDomain = followStr.split('@')[1]
                    recipientActor = http_prefix + "://" + \
                        replaceDomain + "/@" + replaceNickname
                    if recipientActor not in recipients:
                        recipients.append(recipientActor)
                    tags[wordStr] = {
                        'href': recipientActor,
                        'name': wordStr,
                        'type': 'Mention'
                    }
                    replaceMentions[wordStr] = \
                        "<span class=\"h-card\"><a href=\"" + \
                        http_prefix + "://" + replaceDomain + "/@" + \
                        replaceNickname + \
                        "\" class=\"u-url mention\">@<span>" + \
                        replaceNickname + "</span></a></span>"
                    return True
            followCtr += 1
        return False
    possibleNickname = None
    possibleDomain = None
    if '@' not in possibleHandle:
        return False
    possibleNickname = possibleHandle.split('@')[0]
    if not possibleNickname:
        return False
    possibleDomain = \
        possibleHandle.split('@')[1].strip('\n').strip('\r')
    if not possibleDomain:
        return False
    if following:
        for follow in following:
            if follow.replace('\n', '').replace('\r', '') != possibleHandle:
                continue
            recipientActor = http_prefix + "://" + \
                possibleDomain + "/@" + possibleNickname
            if recipientActor not in recipients:
                recipients.append(recipientActor)
            tags[wordStr] = {
                'href': recipientActor,
                'name': wordStr,
                'type': 'Mention'
            }
            replaceMentions[wordStr] = \
                "<span class=\"h-card\"><a href=\"" + http_prefix + \
                "://" + possibleDomain + "/@" + possibleNickname + \
                "\" class=\"u-url mention\">@<span>" + \
                possibleNickname + "</span></a></span>"
            return True
    # @nick@domain
    if not (possibleDomain == 'localhost' or '.' in possibleDomain):
        return False
    recipientActor = http_prefix + "://" + \
        possibleDomain + "/@" + possibleNickname
    if recipientActor not in recipients:
        recipients.append(recipientActor)
    tags[wordStr] = {
        'href': recipientActor,
        'name': wordStr,
        'type': 'Mention'
    }
    replaceMentions[wordStr] = \
        "<span class=\"h-card\"><a href=\"" + http_prefix + \
        "://" + possibleDomain + "/@" + possibleNickname + \
        "\" class=\"u-url mention\">@<span>" + \
        possibleNickname + "</span></a></span>"
    return True


def replaceContentDuplicates(content: str) -> str:
    """Replaces invalid duplicates within content
    """
    if isPGPEncrypted(content) or containsPGPPublicKey(content):
        return content
    while '<<' in content:
        content = content.replace('<<', '<')
    while '>>' in content:
        content = content.replace('>>', '>')
    content = content.replace('<\\p>', '')
    return content


def removeTextFormatting(content: str) -> str:
    """Removes markup for bold, italics, etc
    """
    if isPGPEncrypted(content) or containsPGPPublicKey(content):
        return content
    if '<' not in content:
        return content
    removeMarkup = ('b', 'i', 'ul', 'ol', 'li', 'em', 'strong',
                    'blockquote', 'h1', 'h2', 'h3', 'h4', 'h5')
    for markup in removeMarkup:
        content = content.replace('<' + markup + '>', '')
        content = content.replace('</' + markup + '>', '')
        content = content.replace('<' + markup.upper() + '>', '')
        content = content.replace('</' + markup.upper() + '>', '')
    return content


def removeLongWords(content: str, maxWordLength: int,
                    longWordsList: []) -> str:
    """Breaks up long words so that on mobile screens this doesn't
    disrupt the layout
    """
    if isPGPEncrypted(content) or containsPGPPublicKey(content):
        return content
    content = replaceContentDuplicates(content)
    if ' ' not in content:
        # handle a single very long string with no spaces

        contentStr = content.replace('<p>', '').replace(r'<\p>', '')
        if '://' not in contentStr:
            if len(contentStr) > maxWordLength:
                if '<p>' in content:
                    content = '<p>' + contentStr[:maxWordLength] + r'<\p>'
                else:
                    content = content[:maxWordLength]
        return content
    words = content.split(' ')
    if not longWordsList:
        longWordsList = []
        for wordStr in words:
            if len(wordStr) > maxWordLength:
                if wordStr not in longWordsList:
                    longWordsList.append(wordStr)
    for wordStr in longWordsList:
        if wordStr.startswith('<p>'):
            wordStr = wordStr.replace('<p>', '')
        if wordStr.startswith('<'):
            continue
        if len(wordStr) == 76:
            if wordStr.upper() == wordStr:
                # tox address
                continue
        if '=\"' in wordStr:
            continue
        if '@' in wordStr:
            if '@@' not in wordStr:
                continue
        if '=.ed25519' in wordStr:
            continue
        if '.onion' in wordStr:
            continue
        if '.i2p' in wordStr:
            continue
        if 'https:' in wordStr:
            continue
        elif 'http:' in wordStr:
            continue
        elif 'i2p:' in wordStr:
            continue
        elif 'gnunet:' in wordStr:
            continue
        elif 'dat:' in wordStr:
            continue
        elif 'rad:' in wordStr:
            continue
        elif 'hyper:' in wordStr:
            continue
        elif 'briar:' in wordStr:
            continue
        if '<' in wordStr:
            replaceWord = wordStr.split('<', 1)[0]
            # if len(replaceWord) > maxWordLength:
            #     replaceWord = replaceWord[:maxWordLength]
            content = content.replace(wordStr, replaceWord)
            wordStr = replaceWord
        if '/' in wordStr:
            continue
        if len(wordStr[maxWordLength:]) < maxWordLength:
            content = content.replace(wordStr,
                                      wordStr[:maxWordLength] + '\n' +
                                      wordStr[maxWordLength:])
        else:
            content = content.replace(wordStr,
                                      wordStr[:maxWordLength])
    if content.startswith('<p>'):
        if not content.endswith('</p>'):
            content = content.strip() + '</p>'
    return content
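
# Illustrative example (not part of the original module): breaking up
# a word longer than the maximum allowed length.
#
#   content = 'short ' + ('a' * 60) + ' words'
#   wrapped = removeLongWords(content, 40, [])
#   # the sixty character word gains a line break after 40 characters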


def _loadAutoTags(base_dir: str, nickname: str, domain: str) -> []:
    """Loads automatic tags file and returns a list containing
    the lines of the file
    """
    filename = acctDir(base_dir, nickname, domain) + '/autotags.txt'
    if not os.path.isfile(filename):
        return []
    try:
        with open(filename, 'r') as f:
            return f.readlines()
    except OSError:
        print('EX: unable to read auto tags ' + filename)
    return []


def _autoTag(base_dir: str, nickname: str, domain: str,
             wordStr: str, autoTagList: [],
             appendTags: []):
    """Generates a list of tags to be automatically appended to the content
    """
    for tagRule in autoTagList:
        if wordStr not in tagRule:
            continue
        if '->' not in tagRule:
            continue
        rulematch = tagRule.split('->')[0].strip()
        if rulematch != wordStr:
            continue
        tagName = tagRule.split('->')[1].strip()
        if tagName.startswith('#'):
            if tagName not in appendTags:
                appendTags.append(tagName)
        else:
            if '#' + tagName not in appendTags:
                appendTags.append('#' + tagName)


def addHtmlTags(base_dir: str, http_prefix: str,
                nickname: str, domain: str, content: str,
                recipients: [], hashtags: {},
                isJsonContent: bool = False) -> str:
    """ Replaces plaintext mentions such as @nick@domain into html
    by matching against known following accounts
    """

    if content.startswith('<p>'):
        content = htmlReplaceEmailQuote(content)
        return htmlReplaceQuoteMarks(content)
    maxWordLength = 40
    content = content.replace('\r', '')
    content = content.replace('\n', ' --linebreak-- ')
    content = _addMusicTag(content, 'nowplaying')
    contentSimplified = \
        content.replace(',', ' ').replace(';', ' ').replace('- ', ' ')
    contentSimplified = contentSimplified.replace('. ', ' ').strip()
    if contentSimplified.endswith('.'):
        contentSimplified = contentSimplified[:len(contentSimplified)-1]
    words = contentSimplified.split(' ')

    # remove . for words which are not mentions
    newWords = []
    for wordIndex in range(0, len(words)):
        wordStr = words[wordIndex]
        if wordStr.endswith('.'):
            if not wordStr.startswith('@'):
                wordStr = wordStr[:-1]
        if wordStr.startswith('.'):
            wordStr = wordStr[1:]
        newWords.append(wordStr)
    words = newWords

    replaceMentions = {}
    replaceHashTags = {}
    replaceEmoji = {}
    emojiDict = {}
    originalDomain = domain
    domain = removeDomainPort(domain)
    followingFilename = \
        acctDir(base_dir, nickname, domain) + '/following.txt'

    # read the following list so that we can detect just @nick
    # in addition to @nick@domain
    following = None
    petnames = None
    if '@' in words:
        if os.path.isfile(followingFilename):
            following = []
            petnames = []
            try:
                with open(followingFilename, 'r') as f:
                    following = f.readlines()
            except OSError:
                print('EX: unable to read ' + followingFilename)
            for handle in following:
                pet = getPetName(base_dir, nickname, domain, handle)
                if pet:
                    petnames.append(pet + '\n')

    # extract mentions and tags from words
    longWordsList = []
    prevWordStr = ''
    autoTagsList = _loadAutoTags(base_dir, nickname, domain)
    appendTags = []
    for wordStr in words:
        wordLen = len(wordStr)
        if wordLen > 2:
            if wordLen > maxWordLength:
                longWordsList.append(wordStr)
            firstChar = wordStr[0]
            if firstChar == '@':
                if _addMention(wordStr, http_prefix, following, petnames,
                               replaceMentions, recipients, hashtags):
                    prevWordStr = ''
                    continue
            elif firstChar == '#':
                # remove any endings from the hashtag
                hashTagEndings = ('.', ':', ';', '-', '\n')
                for ending in hashTagEndings:
                    if wordStr.endswith(ending):
                        wordStr = wordStr[:len(wordStr) - 1]
                        break
                if _addHashTags(wordStr, http_prefix, originalDomain,
                                replaceHashTags, hashtags):
                    prevWordStr = ''
                    continue
            elif ':' in wordStr:
                wordStr2 = wordStr.split(':')[1]
                # print('TAG: emoji located - ' + wordStr)
                if not emojiDict:
                    # emoji.json is generated so that it can be customized
                    # and the changes will be retained even if
                    # default_emoji.json is subsequently updated
                    if not os.path.isfile(base_dir + '/emoji/emoji.json'):
                        copyfile(base_dir + '/emoji/default_emoji.json',
                                 base_dir + '/emoji/emoji.json')
                    emojiDict = loadJson(base_dir + '/emoji/emoji.json')

                    # append custom emoji to the dict
                    if os.path.isfile(base_dir + '/emojicustom/emoji.json'):
                        customEmojiDict = \
                            loadJson(base_dir + '/emojicustom/emoji.json')
                        if customEmojiDict:
                            emojiDict = dict(emojiDict, **customEmojiDict)

                # print('TAG: looking up emoji for :' + wordStr2 + ':')
                _addEmoji(base_dir, ':' + wordStr2 + ':',
                          http_prefix, originalDomain,
                          replaceEmoji, hashtags, emojiDict)
            else:
                if _autoTag(base_dir, nickname, domain, wordStr,
                            autoTagsList, appendTags):
                    prevWordStr = ''
                    continue
                if prevWordStr:
                    if _autoTag(base_dir, nickname, domain,
                                prevWordStr + ' ' + wordStr,
                                autoTagsList, appendTags):
                        prevWordStr = ''
                        continue
            prevWordStr = wordStr

    # add any auto generated tags
    for appended in appendTags:
        content = content + ' ' + appended
        _addHashTags(appended, http_prefix, originalDomain,
                     replaceHashTags, hashtags)

    # replace words with their html versions
    for wordStr, replaceStr in replaceMentions.items():
        content = content.replace(wordStr, replaceStr)
    for wordStr, replaceStr in replaceHashTags.items():
        content = content.replace(wordStr, replaceStr)
    if not isJsonContent:
        for wordStr, replaceStr in replaceEmoji.items():
            content = content.replace(wordStr, replaceStr)

    content = addWebLinks(content)
    if longWordsList:
        content = removeLongWords(content, maxWordLength, longWordsList)
    content = limitRepeatedWords(content, 6)
    content = content.replace(' --linebreak-- ', '</p><p>')
    content = htmlReplaceEmailQuote(content)
    return '<p>' + htmlReplaceQuoteMarks(content) + '</p>'
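
# Illustrative sketch (not from the original source): the account and
# base directory below are hypothetical. Mentions, hashtags and emoji
# in the text are converted to html and the hashtags dict is populated.
#
#   recipients = []
#   hashtags = {}
#   html = addHtmlTags(base_dir, 'https', 'alice', 'example.com',
#                      'Hello #fediverse', recipients, hashtags)
#   # html is wrapped in <p>...</p> and hashtags now contains a
#   # 'Hashtag' entry pointing at https://example.com/tags/fediverse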
' def getMentionsFromHtml(htmlText: str, matchStr=" []: """Extracts mentioned actors from the given html content string """ mentions = [] if matchStr not in htmlText: return mentions mentionsList = htmlText.split(matchStr) for mentionStr in mentionsList: if '"' not in mentionStr: continue actorStr = mentionStr.split('"')[0] if actorStr.startswith('http') or \ actorStr.startswith('gnunet') or \ actorStr.startswith('i2p') or \ actorStr.startswith('hyper') or \ actorStr.startswith('dat:'): if actorStr not in mentions: mentions.append(actorStr) return mentions def extractMediaInFormPOST(postBytes, boundary, name: str): """Extracts the binary encoding for image/video/audio within a http form POST Returns the media bytes and the remaining bytes """ imageStartBoundary = b'Content-Disposition: form-data; name="' + \ name.encode('utf8', 'ignore') + b'";' imageStartLocation = postBytes.find(imageStartBoundary) if imageStartLocation == -1: return None, postBytes # bytes after the start boundary appears mediaBytes = postBytes[imageStartLocation:] # look for the next boundary imageEndBoundary = boundary.encode('utf8', 'ignore') imageEndLocation = mediaBytes.find(imageEndBoundary) if imageEndLocation == -1: # no ending boundary return mediaBytes, postBytes[:imageStartLocation] # remaining bytes after the end of the image remainder = mediaBytes[imageEndLocation:] # remove bytes after the end boundary mediaBytes = mediaBytes[:imageEndLocation] # return the media and the before+after bytes return mediaBytes, postBytes[:imageStartLocation] + remainder def saveMediaInFormPOST(mediaBytes, debug: bool, filenameBase: str = None) -> (str, str): """Saves the given media bytes extracted from http form POST Returns the filename and attachment type """ if not mediaBytes: if filenameBase: # remove any existing files extensionTypes = getImageExtensions() for ex in extensionTypes: possibleOtherFormat = filenameBase + '.' + ex if os.path.isfile(possibleOtherFormat): try: os.remove(possibleOtherFormat) except OSError: if debug: print('EX: saveMediaInFormPOST ' + 'unable to delete other ' + str(possibleOtherFormat)) if os.path.isfile(filenameBase): try: os.remove(filenameBase) except OSError: if debug: print('EX: saveMediaInFormPOST ' + 'unable to delete ' + str(filenameBase)) if debug: print('DEBUG: No media found within POST') return None, None mediaLocation = -1 searchStr = '' filename = None # directly search the binary array for the beginning # of an image extensionList = { 'png': 'image/png', 'jpeg': 'image/jpeg', 'gif': 'image/gif', 'svg': 'image/svg+xml', 'webp': 'image/webp', 'avif': 'image/avif', 'mp4': 'video/mp4', 'ogv': 'video/ogv', 'mp3': 'audio/mpeg', 'ogg': 'audio/ogg', 'flac': 'audio/flac', 'zip': 'application/zip' } detectedExtension = None for extension, contentType in extensionList.items(): searchStr = b'Content-Type: ' + contentType.encode('utf8', 'ignore') mediaLocation = mediaBytes.find(searchStr) if mediaLocation > -1: # image/video/audio binaries if extension == 'jpeg': extension = 'jpg' elif extension == 'mpeg': extension = 'mp3' if filenameBase: filename = filenameBase + '.' 
+ extension attachmentMediaType = \ searchStr.decode().split('/')[0].replace('Content-Type: ', '') detectedExtension = extension break if not filename: return None, None # locate the beginning of the image, after any # carriage returns startPos = mediaLocation + len(searchStr) for offset in range(1, 8): if mediaBytes[startPos+offset] != 10: if mediaBytes[startPos+offset] != 13: startPos += offset break # remove any existing image files with a different format if detectedExtension != 'zip': extensionTypes = getImageExtensions() for ex in extensionTypes: if ex == detectedExtension: continue possibleOtherFormat = \ filename.replace('.temp', '').replace('.' + detectedExtension, '.' + ex) if os.path.isfile(possibleOtherFormat): try: os.remove(possibleOtherFormat) except OSError: if debug: print('EX: saveMediaInFormPOST ' + 'unable to delete other 2 ' + str(possibleOtherFormat)) # don't allow scripts within svg files if detectedExtension == 'svg': svgStr = mediaBytes[startPos:] svgStr = svgStr.decode() if dangerousSVG(svgStr, False): return None, None try: with open(filename, 'wb') as fp: fp.write(mediaBytes[startPos:]) except OSError: print('EX: unable to write media') if not os.path.isfile(filename): print('WARN: Media file could not be written to file: ' + filename) return None, None print('Uploaded media file written: ' + filename) return filename, attachmentMediaType def extractTextFieldsInPOST(postBytes, boundary: str, debug: bool, unit_testData: str = None) -> {}: """Returns a dictionary containing the text fields of a http form POST The boundary argument comes from the http header """ if not unit_testData: msgBytes = email.parser.BytesParser().parsebytes(postBytes) messageFields = msgBytes.get_payload(decode=True).decode('utf-8') else: messageFields = unit_testData if debug: print('DEBUG: POST arriving ' + messageFields) messageFields = messageFields.split(boundary) fields = {} fieldsWithSemicolonAllowed = ( 'message', 'bio', 'autoCW', 'password', 'passwordconfirm', 'instanceDescription', 'instanceDescriptionShort', 'subject', 'location', 'imageDescription' ) # examine each section of the POST, separated by the boundary for f in messageFields: if f == '--': continue if ' name="' not in f: continue postStr = f.split(' name="', 1)[1] if '"' not in postStr: continue postKey = postStr.split('"', 1)[0] postValueStr = postStr.split('"', 1)[1] if ';' in postValueStr: if postKey not in fieldsWithSemicolonAllowed and \ not postKey.startswith('edited'): continue if '\r\n' not in postValueStr: continue postLines = postValueStr.split('\r\n') postValue = '' if len(postLines) > 2: for line in range(2, len(postLines)-1): if line > 2: postValue += '\n' postValue += postLines[line] fields[postKey] = urllib.parse.unquote(postValue) return fields def limitRepeatedWords(text: str, maxRepeats: int) -> str: """Removes words which are repeated many times """ words = text.replace('\n', ' ').split(' ') repeatCtr = 0 repeatedText = '' replacements = {} prevWord = '' for word in words: if word == prevWord: repeatCtr += 1 if repeatedText: repeatedText += ' ' + word else: repeatedText = word + ' ' + word else: if repeatCtr > maxRepeats: newText = ((prevWord + ' ') * maxRepeats).strip() replacements[prevWord] = [repeatedText, newText] repeatCtr = 0 repeatedText = '' prevWord = word if repeatCtr > maxRepeats: newText = ((prevWord + ' ') * maxRepeats).strip() replacements[prevWord] = [repeatedText, newText] for word, item in replacements.items(): text = text.replace(item[0], item[1]) return text def 
getPriceFromString(priceStr: str) -> (str, str): """Returns the item price and currency """ currencies = getCurrencies() for symbol, name in currencies.items(): if symbol in priceStr: price = priceStr.replace(symbol, '') if isfloat(price): return price, name elif name in priceStr: price = priceStr.replace(name, '') if isfloat(price): return price, name if isfloat(priceStr): return priceStr, "EUR" return "0.00", "EUR" def _wordsSimilarityHistogram(words: []) -> {}: """Returns a histogram for word combinations """ histogram = {} for index in range(1, len(words)): combinedWords = words[index - 1] + words[index] if histogram.get(combinedWords): histogram[combinedWords] += 1 else: histogram[combinedWords] = 1 return histogram def _wordsSimilarityWordsList(content: str) -> []: """Returns a list of words for the given content """ removePunctuation = ('.', ',', ';', '-', ':', '"') content = removeHtml(content).lower() for p in removePunctuation: content = content.replace(p, ' ') content = content.replace(' ', ' ') return content.split(' ') def wordsSimilarity(content1: str, content2: str, minWords: int) -> int: """Returns percentage similarity """ if content1 == content2: return 100 words1 = _wordsSimilarityWordsList(content1) if len(words1) < minWords: return 0 words2 = _wordsSimilarityWordsList(content2) if len(words2) < minWords: return 0 histogram1 = _wordsSimilarityHistogram(words1) histogram2 = _wordsSimilarityHistogram(words2) diff = 0 for combinedWords, hits in histogram1.items(): if not histogram2.get(combinedWords): diff += 1 else: diff += abs(histogram2[combinedWords] - histogram1[combinedWords]) return 100 - int(diff * 100 / len(histogram1.items())) def containsInvalidLocalLinks(content: str) -> bool: """Returns true if the given content has invalid links """ invalidStrings = ( 'mute', 'unmute', 'editeventpost', 'notifypost', 'delete', 'options', 'page', 'repeat', 'bm', 'tl', 'actor', 'unrepeat', 'eventid', 'unannounce', 'like', 'unlike', 'bookmark', 'unbookmark', 'likedBy', 'time', 'year', 'month', 'day', 'editnewpost', 'graph', 'showshare', 'category', 'showwanted', 'rmshare', 'rmwanted', 'repeatprivate', 'unrepeatprivate', 'replyto', 'replyfollowers', 'replydm', 'editblogpost', 'handle', 'blockdomain' ) for invStr in invalidStrings: if '?' + invStr + '=' in content: return True return False
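
# Illustrative examples (not part of the original module):
#
#   getPriceFromString('gratis')
#   # returns ("0.00", "EUR") because no currency symbol or amount
#   # was recognised
#
#   wordsSimilarity('the cat sat on the mat',
#                   'the cat sat on the mat', 3)
#   # returns 100 because the two texts are identical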