Merge branch 'main' of ssh://code.freedombone.net:2222/bashrc/epicyon

main
Bob Mottram 2021-06-25 19:20:11 +01:00
commit 40e766f029
43 changed files with 519 additions and 563 deletions

View File

@ -7,6 +7,7 @@ __email__ = "bob@freedombone.net"
__status__ = "Production"
__module_group__ = "ActivityPub"
from domainhandler import removeDomainPort
from utils import hasObjectDict
from utils import removeIdEnding
from utils import hasUsersPath
@ -128,8 +129,7 @@ def createAnnounce(session, baseDir: str, federationList: [],
if not urlPermitted(objectUrl, federationList):
return None
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
fullDomain = getFullDomain(domain, port)
statusNumber, published = getStatusNumber()
@ -399,8 +399,7 @@ def outboxUndoAnnounce(recentPostsCache: {},
print('DEBUG: c2s undo announce request arrived in outbox')
messageId = removeIdEnding(messageJson['object']['object'])
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
postFilename = locatePost(baseDir, nickname, domain, messageId)
if not postFilename:
if debug:

View File

@ -5,12 +5,13 @@ __version__ = "1.2.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
__module_group__ = "ActivityPub"
__module_group__ = "Core"
import os
import json
import time
from datetime import datetime
from domainhandler import removeDomainPort
from utils import hasObjectDict
from utils import isAccountDir
from utils import getCachedPostFilename
@ -58,8 +59,7 @@ def addBlock(baseDir: str, nickname: str, domain: str,
blockNickname: str, blockDomain: str) -> bool:
"""Block the given account
"""
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
blockingFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/blocking.txt'
blockHandle = blockNickname + '@' + blockDomain
@ -111,8 +111,7 @@ def removeBlock(baseDir: str, nickname: str, domain: str,
unblockNickname: str, unblockDomain: str) -> bool:
"""Unblock the given account
"""
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
unblockingFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/blocking.txt'
unblockHandle = unblockNickname + '@' + unblockDomain
@ -338,8 +337,7 @@ def outboxBlock(baseDir: str, httpPrefix: str,
if debug:
print('DEBUG: c2s block object has no nickname')
return
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
postFilename = locatePost(baseDir, nickname, domain, messageId)
if not postFilename:
if debug:
@ -406,8 +404,7 @@ def outboxUndoBlock(baseDir: str, httpPrefix: str,
if debug:
print('DEBUG: c2s undo block object has no nickname')
return
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
postFilename = locatePost(baseDir, nickname, domain, messageId)
if not postFilename:
if debug:
@ -605,8 +602,7 @@ def outboxMute(baseDir: str, httpPrefix: str,
if debug:
print('DEBUG: c2s mute object has no nickname')
return
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
postFilename = locatePost(baseDir, nickname, domain, messageId)
if not postFilename:
if debug:
@ -663,8 +659,7 @@ def outboxUndoMute(baseDir: str, httpPrefix: str,
if debug:
print('DEBUG: c2s undo mute object has no nickname')
return
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
postFilename = locatePost(baseDir, nickname, domain, messageId)
if not postFilename:
if debug:

19
blog.py
View File

@ -16,6 +16,7 @@ from webapp_utils import htmlHeaderWithBlogMarkup
from webapp_utils import htmlFooter
from webapp_utils import getPostAttachmentsAsHtml
from webapp_media import addEmbeddedElements
from utils import isAccountDir
from utils import removeHtml
from utils import getConfigParam
from utils import getFullDomain
@ -643,11 +644,7 @@ def _noOfBlogAccounts(baseDir: str) -> int:
ctr = 0
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for acct in dirs:
if '@' not in acct:
continue
if acct.startswith('inbox@'):
continue
elif acct.startswith('news@'):
if not isAccountDir(acct):
continue
accountDir = os.path.join(baseDir + '/accounts', acct)
blogsIndex = accountDir + '/tlblogs.index'
@ -662,11 +659,7 @@ def _singleBlogAccountNickname(baseDir: str) -> str:
"""
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for acct in dirs:
if '@' not in acct:
continue
if acct.startswith('inbox@'):
continue
elif acct.startswith('news@'):
if not isAccountDir(acct):
continue
accountDir = os.path.join(baseDir + '/accounts', acct)
blogsIndex = accountDir + '/tlblogs.index'
@ -704,11 +697,7 @@ def htmlBlogView(authorized: bool,
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for acct in dirs:
if '@' not in acct:
continue
if acct.startswith('inbox@'):
continue
elif acct.startswith('news@'):
if not isAccountDir(acct):
continue
accountDir = os.path.join(baseDir + '/accounts', acct)
blogsIndex = accountDir + '/tlblogs.index'

View File

@ -11,6 +11,7 @@ import os
from pprint import pprint
from webfinger import webfingerHandle
from auth import createBasicAuthHeader
from domainhandler import removeDomainPort
from utils import hasUsersPath
from utils import getFullDomain
from utils import removeIdEnding
@ -560,8 +561,7 @@ def outboxBookmark(recentPostsCache: {},
print('DEBUG: c2s bookmark Add request arrived in outbox')
messageUrl = removeIdEnding(messageJson['object']['url'])
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
postFilename = locatePost(baseDir, nickname, domain, messageUrl)
if not postFilename:
if debug:
@ -625,8 +625,7 @@ def outboxUndoBookmark(recentPostsCache: {},
print('DEBUG: c2s unbookmark Remove request arrived in outbox')
messageUrl = removeIdEnding(messageJson['object']['url'])
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
postFilename = locatePost(baseDir, nickname, domain, messageUrl)
if not postFilename:
if debug:

View File

@ -5,11 +5,13 @@ __version__ = "1.2.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
__module_group__ = "Core"
import os
import email.parser
import urllib.parse
from shutil import copyfile
from domainhandler import removeDomainPort
from utils import isValidLanguage
from utils import getImageExtensions
from utils import loadJson
@ -772,8 +774,7 @@ def addHtmlTags(baseDir: str, httpPrefix: str,
replaceEmoji = {}
emojiDict = {}
originalDomain = domain
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
followingFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/following.txt'

View File

@ -5,6 +5,7 @@ __version__ = "1.2.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
__module_group__ = "Core"
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer, HTTPServer
import sys
@ -139,8 +140,8 @@ from blog import htmlBlogView
from blog import htmlBlogPage
from blog import htmlBlogPost
from blog import htmlEditBlog
from webapp_utils import setMinimal
from webapp_utils import isMinimal
from webapp_minimalbutton import setMinimal
from webapp_minimalbutton import isMinimal
from webapp_utils import getAvatarImageUrl
from webapp_utils import htmlHashtagBlocked
from webapp_utils import htmlFollowingList

View File

@ -9,6 +9,7 @@ __module_group__ = "ActivityPub"
import os
from datetime import datetime
from domainhandler import removeDomainPort
from utils import hasUsersPath
from utils import getFullDomain
from utils import removeIdEnding
@ -154,8 +155,7 @@ def outboxDelete(baseDir: str, httpPrefix: str,
"wasn't created by you (nickname does not match)")
return
deleteDomain, deletePort = getDomainFromActor(messageId)
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
if deleteDomain != domain:
if debug:
print("DEBUG: you can't delete a post which " +

32
domainhandler.py 100644
View File

@ -0,0 +1,32 @@
__filename__ = "domainhandler.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.2.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
__module_group__ = "Core"
def removeDomainPort(domain: str) -> str:
"""If the domain has a port appended then remove it
eg. mydomain.com:80 becomes mydomain.com
"""
if ':' in domain:
if domain.startswith('did:'):
return domain
domain = domain.split(':')[0]
return domain
def getPortFromDomain(domain: str) -> int:
"""If the domain has a port number appended then return it
eg. mydomain.com:80 returns 80
"""
if ':' in domain:
if domain.startswith('did:'):
return None
portStr = domain.split(':')[1]
if portStr.isdigit():
return int(portStr)
return None

View File

@ -61,6 +61,8 @@ from tests import testUpdateActor
from tests import runAllTests
from auth import storeBasicCredentials
from auth import createPassword
from domainhandler import removeDomainPort
from domainhandler import getPortFromDomain
from utils import hasUsersPath
from utils import getFullDomain
from utils import setConfigParam
@ -1080,8 +1082,8 @@ if args.message:
toDomain = toDomain.replace('\n', '').replace('\r', '')
toPort = 443
if ':' in toDomain:
toPort = toDomain.split(':')[1]
toDomain = toDomain.split(':')[0]
toPort = getPortFromDomain(toDomain)
toDomain = removeDomainPort(toDomain)
else:
if args.sendto.endswith('followers'):
toNickname = None

View File

@ -9,6 +9,7 @@ __module_group__ = "ActivityPub"
from pprint import pprint
import os
from domainhandler import removeDomainPort
from utils import hasObjectDict
from utils import hasUsersPath
from utils import getFullDomain
@ -153,8 +154,7 @@ def isFollowingActor(baseDir: str,
"""Is the given nickname following the given actor?
The actor can also be a handle: nickname@domain
"""
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
handle = nickname + '@' + domain
if not os.path.isdir(baseDir + '/accounts/' + handle):
return False
@ -205,8 +205,7 @@ def isFollowerOfPerson(baseDir: str, nickname: str, domain: str,
followerNickname: str, followerDomain: str) -> bool:
"""is the given nickname a follower of followerNickname?
"""
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
followersFile = baseDir + '/accounts/' + \
nickname + '@' + domain + '/followers.txt'
if not os.path.isfile(followersFile):
@ -243,8 +242,7 @@ def unfollowAccount(baseDir: str, nickname: str, domain: str,
debug: bool = False) -> bool:
"""Removes a person to the follow list
"""
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
handle = nickname + '@' + domain
handleToUnfollow = followNickname + '@' + followDomain
if not os.path.isdir(baseDir + '/accounts'):
@ -433,8 +431,7 @@ def getFollowingFeed(baseDir: str, domain: str, port: int, path: str,
}
handleDomain = domain
if ':' in handleDomain:
handleDomain = domain.split(':')[0]
handleDomain = removeDomainPort(handleDomain)
handle = nickname + '@' + handleDomain
filename = baseDir + '/accounts/' + handle + '/' + followFile + '.txt'
if not os.path.isfile(filename):
@ -493,8 +490,7 @@ def _followApprovalRequired(baseDir: str, nicknameToFollow: str,
return False
manuallyApproveFollows = False
if ':' in domainToFollow:
domainToFollow = domainToFollow.split(':')[0]
domainToFollow = removeDomainPort(domainToFollow)
actorFilename = baseDir + '/accounts/' + \
nicknameToFollow + '@' + domainToFollow + '.json'
if os.path.isfile(actorFilename):

View File

@ -8,6 +8,7 @@ __status__ = "Production"
__module_group__ = "Calendar"
import os
from domainhandler import removeDomainPort
def receivingCalendarEvents(baseDir: str, nickname: str, domain: str,
@ -43,8 +44,7 @@ def _receiveCalendarEvents(baseDir: str, nickname: str, domain: str,
indicating whether to receive calendar events from that account
"""
# check that a following file exists
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
followingFilename = baseDir + '/accounts/' + \
nickname + '@' + domain + '/following.txt'
if not os.path.isfile(followingFilename):

2
git.py
View File

@ -5,7 +5,7 @@ __version__ = "1.2.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
__module_group__ = "ActivityPub"
__module_group__ = "Core"
import os
import html

View File

@ -5,7 +5,7 @@ __version__ = "1.2.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
__module_group__ = "Calendar"
__module_group__ = "Core"
import os
from uuid import UUID

View File

@ -13,6 +13,8 @@ import datetime
import time
import random
from linked_data_sig import verifyJsonSignature
from domainhandler import removeDomainPort
from domainhandler import getPortFromDomain
from utils import hasObjectDict
from utils import dmAllowedFromDomain
from utils import isRecentPost
@ -186,8 +188,7 @@ def _inboxStorePostToHtmlCache(recentPostsCache: {}, maxRecentPosts: int,
def validInbox(baseDir: str, nickname: str, domain: str) -> bool:
"""Checks whether files were correctly saved to the inbox
"""
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
inboxDir = baseDir + '/accounts/' + nickname + '@' + domain + '/inbox'
if not os.path.isdir(inboxDir):
return True
@ -209,8 +210,7 @@ def validInboxFilenames(baseDir: str, nickname: str, domain: str,
"""Used by unit tests to check that the port number gets appended to
domain names within saved post filenames
"""
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
inboxDir = baseDir + '/accounts/' + nickname + '@' + domain + '/inbox'
if not os.path.isdir(inboxDir):
return True
@ -358,8 +358,7 @@ def savePostToInboxQueue(baseDir: str, httpPrefix: str,
str(len(messageBytes)) + ' bytes')
return None
originalDomain = domain
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
# block at the ealiest stage possible, which means the data
# isn't written to file
@ -536,8 +535,7 @@ def _inboxPostRecipients(baseDir: str, postJsonObject: {},
print('WARNING: inbox post has no actor')
return recipientsDict, recipientsDictFollowers
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
domainBase = domain
domain = getFullDomain(domain, port)
domainMatch = '/' + domain + '/users/'
@ -1124,8 +1122,7 @@ def _receiveBookmark(recentPostsCache: {},
print('DEBUG: c2s inbox bookmark Add request arrived in outbox')
messageUrl = removeIdEnding(messageJson['object']['url'])
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
postFilename = locatePost(baseDir, nickname, domain, messageUrl)
if not postFilename:
if debug:
@ -1200,8 +1197,7 @@ def _receiveUndoBookmark(recentPostsCache: {},
'request arrived in outbox')
messageUrl = removeIdEnding(messageJson['object']['url'])
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
postFilename = locatePost(baseDir, nickname, domain, messageUrl)
if not postFilename:
if debug:
@ -1521,11 +1517,15 @@ def jsonPostAllowsComments(postJsonObject: {}) -> bool:
"""
if 'commentsEnabled' in postJsonObject:
return postJsonObject['commentsEnabled']
if 'rejectReplies' in postJsonObject:
return not postJsonObject['rejectReplies']
if postJsonObject.get('object'):
if not hasObjectDict(postJsonObject):
return False
elif 'commentsEnabled' in postJsonObject['object']:
return postJsonObject['object']['commentsEnabled']
elif 'rejectReplies' in postJsonObject['object']:
return not postJsonObject['object']['rejectReplies']
return True
@ -1580,24 +1580,6 @@ def populateReplies(baseDir: str, httpPrefix: str, domain: str,
print('DEBUG: post may have expired - ' + replyTo)
return False
# TODO store replies collection
# replyItem = {
# "type": "Document",
# "url": replyTo
# }
# if not messageJson['object'].get('replies'):
# messageJson['object']['replies'] = {
# "items": [replyItem]
# }
# else:
# found = False
# for item in messageJson['object']['replies']['items']:
# if item['url'] == replyTo:
# found = True
# break
# if not found:
# messageJson['object']['replies']['items'].append(replyItem)
#
if not _postAllowsComments(postFilename):
if debug:
print('DEBUG: post does not allow comments - ' + replyTo)
@ -1975,8 +1957,7 @@ def _sendToGroupMembers(session, baseDir: str, handle: str, port: int,
# set subject
if not postJsonObject['object'].get('summary'):
postJsonObject['object']['summary'] = 'General Discussion'
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
with open(followersFile, 'r') as groupMembers:
for memberHandle in groupMembers:
if memberHandle != handle:
@ -1984,10 +1965,8 @@ def _sendToGroupMembers(session, baseDir: str, handle: str, port: int,
memberDomain = memberHandle.split('@')[1]
memberPort = port
if ':' in memberDomain:
memberPortStr = memberDomain.split(':')[1]
if memberPortStr.isdigit():
memberPort = int(memberPortStr)
memberDomain = memberDomain.split(':')[0]
memberPort = getPortFromDomain(memberDomain)
memberDomain = removeDomainPort(memberDomain)
sendSignedJson(postJsonObject, session, baseDir,
nickname, domain, port,
memberNickname, memberDomain, memberPort, cc,
@ -2078,8 +2057,7 @@ def _updateLastSeen(baseDir: str, handle: str, actor: str) -> None:
return
nickname = handle.split('@')[0]
domain = handle.split('@')[1]
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
accountPath = baseDir + '/accounts/' + nickname + '@' + domain
if not os.path.isdir(accountPath):
return
@ -2130,10 +2108,8 @@ def _bounceDM(senderPostId: str, session, httpPrefix: str,
senderDomain = sendingHandle.split('@')[1]
senderPort = port
if ':' in senderDomain:
senderPortStr = senderDomain.split(':')[1]
if senderPortStr.isdigit():
senderPort = int(senderPortStr)
senderDomain = senderDomain.split(':')[0]
senderPort = getPortFromDomain(senderDomain)
senderDomain = removeDomainPort(senderDomain)
cc = []
# create the bounce DM

View File

@ -7,6 +7,7 @@ __email__ = "bob@freedombone.net"
__status__ = "Production"
__module_group__ = "ActivityPub"
from domainhandler import removeDomainPort
from utils import hasObjectDict
from utils import hasUsersPath
from utils import getFullDomain
@ -327,8 +328,7 @@ def outboxLike(recentPostsCache: {},
print('DEBUG: c2s like request arrived in outbox')
messageId = removeIdEnding(messageJson['object'])
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
postFilename = locatePost(baseDir, nickname, domain, messageId)
if not postFilename:
if debug:
@ -377,8 +377,7 @@ def outboxUndoLike(recentPostsCache: {},
print('DEBUG: c2s undo like request arrived in outbox')
messageId = removeIdEnding(messageJson['object']['object'])
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
postFilename = locatePost(baseDir, nickname, domain, messageId)
if not postFilename:
if debug:

View File

@ -12,6 +12,8 @@ from follow import followedAccountAccepts
from follow import followedAccountRejects
from follow import removeFromFollowRequests
from utils import loadJson
from domainhandler import removeDomainPort
from domainhandler import getPortFromDomain
def manualDenyFollowRequest(session, baseDir: str,
@ -49,8 +51,8 @@ def manualDenyFollowRequest(session, baseDir: str,
denyHandle.split('@')[1].replace('\n', '').replace('\r', '')
denyPort = port
if ':' in denyDomain:
denyPort = denyDomain.split(':')[1]
denyDomain = denyDomain.split(':')[0]
denyPort = getPortFromDomain(denyDomain)
denyDomain = removeDomainPort(denyDomain)
followedAccountRejects(session, baseDir, httpPrefix,
nickname, domain, port,
denyNickname, denyDomain, denyPort,
@ -141,9 +143,7 @@ def manualApproveFollowRequest(session, baseDir: str,
handleOfFollowRequester.replace('\r', '')
port2 = port
if ':' in handleOfFollowRequester:
port2Str = handleOfFollowRequester.split(':')[1]
if port2Str.isdigit():
port2 = int(port2Str)
port2 = getPortFromDomain(handleOfFollowRequester)
requestsDir = accountDir + '/requests'
followActivityfilename = \
requestsDir + '/' + handleOfFollowRequester + '.follow'
@ -158,8 +158,8 @@ def manualApproveFollowRequest(session, baseDir: str,
approveDomain.replace('\r', '')
approvePort = port2
if ':' in approveDomain:
approvePort = approveDomain.split(':')[1]
approveDomain = approveDomain.split(':')[0]
approvePort = getPortFromDomain(approveDomain)
approveDomain = removeDomainPort(approveDomain)
print('Manual follow accept: Sending Accept for ' +
handle + ' follow request from ' +
approveNickname + '@' + approveDomain)

168
markdown.py 100644
View File

@ -0,0 +1,168 @@
__filename__ = "markdown.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.2.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
__module_group__ = "Web Interface"
def _markdownEmphasisHtml(markdown: str) -> str:
"""Add italics and bold html markup to the given markdown
"""
replacements = {
' **': ' <b>',
'** ': '</b> ',
'**.': '</b>.',
'**:': '</b>:',
'**;': '</b>;',
'**,': '</b>,',
'**\n': '</b>\n',
' *': ' <i>',
'* ': '</i> ',
'*.': '</i>.',
'*:': '</i>:',
'*;': '</i>;',
'*,': '</i>,',
'*\n': '</i>\n',
' _': ' <ul>',
'_ ': '</ul> ',
'_.': '</ul>.',
'_:': '</ul>:',
'_;': '</ul>;',
'_,': '</ul>,',
'_\n': '</ul>\n'
}
for md, html in replacements.items():
markdown = markdown.replace(md, html)
if markdown.startswith('**'):
markdown = markdown[2:] + '<b>'
elif markdown.startswith('*'):
markdown = markdown[1:] + '<i>'
elif markdown.startswith('_'):
markdown = markdown[1:] + '<ul>'
if markdown.endswith('**'):
markdown = markdown[:len(markdown) - 2] + '</b>'
elif markdown.endswith('*'):
markdown = markdown[:len(markdown) - 1] + '</i>'
elif markdown.endswith('_'):
markdown = markdown[:len(markdown) - 1] + '</ul>'
return markdown
def _markdownReplaceQuotes(markdown: str) -> str:
"""Replaces > quotes with html blockquote
"""
if '> ' not in markdown:
return markdown
lines = markdown.split('\n')
result = ''
prevQuoteLine = None
for line in lines:
if '> ' not in line:
result += line + '\n'
prevQuoteLine = None
continue
lineStr = line.strip()
if not lineStr.startswith('> '):
result += line + '\n'
prevQuoteLine = None
continue
lineStr = lineStr.replace('> ', '', 1).strip()
if prevQuoteLine:
newPrevLine = prevQuoteLine.replace('</i></blockquote>\n', '')
result = result.replace(prevQuoteLine, newPrevLine) + ' '
lineStr += '</i></blockquote>\n'
else:
lineStr = '<blockquote><i>' + lineStr + '</i></blockquote>\n'
result += lineStr
prevQuoteLine = lineStr
if '</blockquote>\n' in result:
result = result.replace('</blockquote>\n', '</blockquote>')
if result.endswith('\n') and \
not markdown.endswith('\n'):
result = result[:len(result) - 1]
return result
def _markdownReplaceLinks(markdown: str, images: bool = False) -> str:
"""Replaces markdown links with html
Optionally replace image links
"""
replaceLinks = {}
text = markdown
startChars = '['
if images:
startChars = '!['
while startChars in text:
if ')' not in text:
break
text = text.split(startChars, 1)[1]
markdownLink = startChars + text.split(')')[0] + ')'
if ']' not in markdownLink or \
'(' not in markdownLink:
text = text.split(')', 1)[1]
continue
if not images:
replaceLinks[markdownLink] = \
'<a href="' + \
markdownLink.split('(')[1].split(')')[0] + \
'" target="_blank" rel="nofollow noopener noreferrer">' + \
markdownLink.split(startChars)[1].split(']')[0] + \
'</a>'
else:
replaceLinks[markdownLink] = \
'<img class="markdownImage" src="' + \
markdownLink.split('(')[1].split(')')[0] + \
'" alt="' + \
markdownLink.split(startChars)[1].split(']')[0] + \
'" />'
text = text.split(')', 1)[1]
for mdLink, htmlLink in replaceLinks.items():
markdown = markdown.replace(mdLink, htmlLink)
return markdown
def markdownToHtml(markdown: str) -> str:
"""Converts markdown formatted text to html
"""
markdown = _markdownReplaceQuotes(markdown)
markdown = _markdownEmphasisHtml(markdown)
markdown = _markdownReplaceLinks(markdown, True)
markdown = _markdownReplaceLinks(markdown)
# replace headers
linesList = markdown.split('\n')
htmlStr = ''
ctr = 0
for line in linesList:
if ctr > 0:
htmlStr += '<br>'
if line.startswith('#####'):
line = line.replace('#####', '').strip()
line = '<h5>' + line + '</h5>'
ctr = -1
elif line.startswith('####'):
line = line.replace('####', '').strip()
line = '<h4>' + line + '</h4>'
ctr = -1
elif line.startswith('###'):
line = line.replace('###', '').strip()
line = '<h3>' + line + '</h3>'
ctr = -1
elif line.startswith('##'):
line = line.replace('##', '').strip()
line = '<h2>' + line + '</h2>'
ctr = -1
elif line.startswith('#'):
line = line.replace('#', '').strip()
line = '<h1>' + line + '</h1>'
ctr = -1
htmlStr += line
ctr += 1
return htmlStr

View File

@ -5,7 +5,7 @@ __version__ = "1.2.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
__module_group__ = "Metadata"
__module_group__ = "Core"
import os
from utils import loadJson

View File

@ -8,6 +8,7 @@ __status__ = "Production"
__module_group__ = "Core"
import os
from utils import isAccountDir
from utils import getNicknameFromActor
from utils import getDomainFromActor
from webfinger import webfingerHandle
@ -186,11 +187,7 @@ def migrateAccounts(baseDir: str, session,
ctr = 0
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for handle in dirs:
if '@' not in handle:
continue
if handle.startswith('inbox@'):
continue
if handle.startswith('news@'):
if not isAccountDir(handle):
continue
nickname = handle.split('@')[0]
domain = handle.split('@')[1]

View File

@ -38,6 +38,7 @@ from roles import setRole
from roles import setRolesFromList
from roles import getActorRolesList
from media import processMetaData
from domainhandler import removeDomainPort
from utils import getStatusNumber
from utils import getFullDomain
from utils import validNickname
@ -48,10 +49,12 @@ from utils import getConfigParam
from utils import refreshNewswire
from utils import getProtocolPrefixes
from utils import hasUsersPath
from utils import getImageExtensions
from session import createSession
from session import getJson
from webfinger import webfingerHandle
from pprint import pprint
from cache import getPersonFromCache
def generateRSAKey() -> (str, str):
@ -93,8 +96,7 @@ def setProfileImage(baseDir: str, httpPrefix: str, nickname: str, domain: str,
if imageFilename.startswith('~/'):
imageFilename = imageFilename.replace('~/', str(Path.home()) + '/')
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
fullDomain = getFullDomain(domain, port)
handle = nickname + '@' + domain
@ -147,8 +149,7 @@ def setProfileImage(baseDir: str, httpPrefix: str, nickname: str, domain: str,
def _accountExists(baseDir: str, nickname: str, domain: str) -> bool:
"""Returns true if the given account exists
"""
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
return os.path.isdir(baseDir + '/accounts/' + nickname + '@' + domain) or \
os.path.isdir(baseDir + '/deactivated/' + nickname + '@' + domain)
@ -720,8 +721,7 @@ def personLookup(domain: str, path: str, baseDir: str) -> {}:
return None
if not isSharedInbox and not validNickname(domain, nickname):
return None
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
handle = nickname + '@' + domain
filename = baseDir + '/accounts/' + handle + '.json'
if not os.path.isfile(filename):
@ -1349,3 +1349,31 @@ def getActorJson(hostDomain: str, handle: str, http: bool, gnunet: bool,
pprint(personJson)
return personJson, asHeader
return None, None
def getPersonAvatarUrl(baseDir: str, personUrl: str, personCache: {},
allowDownloads: bool) -> str:
"""Returns the avatar url for the person
"""
personJson = \
getPersonFromCache(baseDir, personUrl, personCache, allowDownloads)
if not personJson:
return None
# get from locally stored image
if not personJson.get('id'):
return None
actorStr = personJson['id'].replace('/', '-')
avatarImagePath = baseDir + '/cache/avatars/' + actorStr
imageExtension = getImageExtensions()
for ext in imageExtension:
if os.path.isfile(avatarImagePath + '.' + ext):
return '/avatars/' + actorStr + '.' + ext
elif os.path.isfile(avatarImagePath.lower() + '.' + ext):
return '/avatars/' + actorStr.lower() + '.' + ext
if personJson.get('icon'):
if personJson['icon'].get('url'):
return personJson['icon']['url']
return None

View File

@ -32,6 +32,8 @@ from session import postImage
from webfinger import webfingerHandle
from httpsig import createSignedHeader
from siteactive import siteIsActive
from domainhandler import removeDomainPort
from domainhandler import getPortFromDomain
from utils import hasObjectDict
from utils import rejectPostId
from utils import removeInvalidChars
@ -683,8 +685,7 @@ def savePostToBox(baseDir: str, httpPrefix: str, postId: str,
boxname != 'scheduled':
return None
originalDomain = domain
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
if not postId:
statusNumber, published = getStatusNumber()
@ -1077,6 +1078,7 @@ def _createPostBase(baseDir: str, nickname: str, domain: str, port: int,
'atomUri': newPostId,
'inReplyToAtomUri': inReplyToAtomUri,
'commentsEnabled': commentsEnabled,
'rejectReplies': not commentsEnabled,
'mediaType': 'text/html',
'content': content,
'contentMap': {
@ -1128,6 +1130,7 @@ def _createPostBase(baseDir: str, nickname: str, domain: str, port: int,
'atomUri': newPostId,
'inReplyToAtomUri': inReplyToAtomUri,
'commentsEnabled': commentsEnabled,
'rejectReplies': not commentsEnabled,
'mediaType': 'text/html',
'content': content,
'contentMap': {
@ -1689,7 +1692,7 @@ def getMentionedPeople(baseDir: str, httpPrefix: str,
mentionedNickname = handle.split('@')[0]
mentionedDomain = handle.split('@')[1].strip('\n').strip('\r')
if ':' in mentionedDomain:
mentionedDomain = mentionedDomain.split(':')[0]
mentionedDomain = removeDomainPort(mentionedDomain)
if not validNickname(mentionedDomain, mentionedNickname):
continue
actor = \
@ -2665,8 +2668,8 @@ def sendToFollowers(session, baseDir: str,
index = 0
toDomain = followerHandles[index].split('@')[1]
if ':' in toDomain:
toPort = toDomain.split(':')[1]
toDomain = toDomain.split(':')[0]
toPort = getPortFromDomain(toDomain)
toDomain = removeDomainPort(toDomain)
cc = ''

View File

@ -15,6 +15,7 @@ JSON-LD.
__copyright__ = 'Copyright (c) 2011-2014 Digital Bazaar, Inc.'
__license__ = 'New BSD license'
__version__ = '0.6.8'
__module_group__ = "ActivityPub"
__all__ = [
'compact', 'expand', 'flatten', 'frame', 'link', 'from_rdf', 'to_rdf',

View File

@ -10,6 +10,7 @@ import os
from utils import loadJson
from utils import saveJson
from utils import getStatusNumber
from domainhandler import removeDomainPort
def _clearRoleStatus(baseDir: str, role: str) -> None:
@ -75,8 +76,7 @@ def _addRole(baseDir: str, nickname: str, domain: str,
"""Adds a role nickname to the file.
This is a file containing the nicknames of accounts having this role
"""
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
roleFile = baseDir + '/accounts/' + roleFilename
if os.path.isfile(roleFile):
# is this nickname already in the file?

View File

@ -20,6 +20,7 @@ from utils import loadJson
from utils import saveJson
from utils import getImageExtensions
from utils import hasObjectDict
from domainhandler import removeDomainPort
from media import processMetaData
@ -187,9 +188,7 @@ def expireShares(baseDir: str) -> None:
def _expireSharesForAccount(baseDir: str, nickname: str, domain: str) -> None:
"""Removes expired items from shares for a particular account
"""
handleDomain = domain
if ':' in handleDomain:
handleDomain = domain.split(':')[0]
handleDomain = removeDomainPort(domain)
handle = nickname + '@' + handleDomain
sharesFilename = baseDir + '/accounts/' + handle + '/shares.json'
if os.path.isfile(sharesFilename):
@ -250,9 +249,7 @@ def getSharesFeedForPerson(baseDir: str,
domain = getFullDomain(domain, port)
handleDomain = domain
if ':' in handleDomain:
handleDomain = domain.split(':')[0]
handleDomain = removeDomainPort(domain)
handle = nickname + '@' + handleDomain
sharesFilename = baseDir + '/accounts/' + handle + '/shares.json'

View File

@ -5,6 +5,7 @@ __version__ = "1.2.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
__module_group__ = "Testing"
import time
import os
@ -115,8 +116,8 @@ from newswire import parseFeedDate
from mastoapiv1 import getMastoApiV1IdFromNickname
from mastoapiv1 import getNicknameFromMastoApiV1Id
from webapp_post import prepareHtmlPostNickname
from webapp_utils import markdownToHtml
from speaker import speakerReplaceLinks
from markdown import markdownToHtml
testServerAliceRunning = False
testServerBobRunning = False
@ -2359,6 +2360,16 @@ def _testJsonPostAllowsComments():
"commentsEnabled": False
}
assert not jsonPostAllowsComments(postJsonObject)
postJsonObject = {
"id": "123",
"rejectReplies": False
}
assert jsonPostAllowsComments(postJsonObject)
postJsonObject = {
"id": "123",
"rejectReplies": True
}
assert not jsonPostAllowsComments(postJsonObject)
postJsonObject = {
"id": "123",
"commentsEnabled": True

View File

@ -7,6 +7,7 @@ __email__ = "bob@freedombone.net"
__status__ = "Production"
import os
from utils import isAccountDir
from utils import loadJson
from utils import saveJson
from utils import getImageExtensions
@ -623,11 +624,7 @@ def _setThemeImages(baseDir: str, name: str) -> None:
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for acct in dirs:
if '@' not in acct:
continue
if acct.startswith('inbox@'):
continue
elif acct.startswith('news@'):
if not isAccountDir(acct):
continue
accountDir = \
os.path.join(baseDir + '/accounts', acct)

278
utils.py
View File

@ -5,6 +5,7 @@ __version__ = "1.2.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
__module_group__ = "ActivityPu"
import os
import re
@ -15,6 +16,8 @@ import json
import idna
import locale
from pprint import pprint
from domainhandler import removeDomainPort
from domainhandler import getPortFromDomain
from followingCalendar import addPersonToCalendar
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
@ -417,8 +420,7 @@ def getFollowersOfPerson(baseDir: str,
Used by the shared inbox to know who to send incoming mail to
"""
followers = []
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
handle = nickname + '@' + domain
if not os.path.isdir(baseDir + '/accounts/' + handle):
return followers
@ -645,8 +647,7 @@ def createInboxQueueDir(nickname: str, domain: str, baseDir: str) -> str:
def domainPermitted(domain: str, federationList: []):
if len(federationList) == 0:
return True
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
if domain in federationList:
return True
return False
@ -742,83 +743,90 @@ def getDisplayName(baseDir: str, actor: str, personCache: {}) -> str:
return nameFound
def _genderFromString(translate: {}, text: str) -> str:
"""Given some text, does it contain a gender description?
"""
gender = None
textOrig = text
text = text.lower()
if translate['He/Him'].lower() in text or \
translate['boy'].lower() in text:
gender = 'He/Him'
elif (translate['She/Her'].lower() in text or
translate['girl'].lower() in text):
gender = 'She/Her'
elif 'him' in text or 'male' in text:
gender = 'He/Him'
elif 'her' in text or 'she' in text or \
'fem' in text or 'woman' in text:
gender = 'She/Her'
elif 'man' in text or 'He' in textOrig:
gender = 'He/Him'
return gender
def getGenderFromBio(baseDir: str, actor: str, personCache: {},
translate: {}) -> str:
"""Tries to ascertain gender from bio description
This is for use by text-to-speech for pitch setting
"""
defaultGender = 'They/Them'
if '/statuses/' in actor:
actor = actor.split('/statuses/')[0]
if not personCache.get(actor):
return None
return defaultGender
bioFound = None
if translate:
pronounStr = translate['pronoun'].lower()
else:
pronounStr = 'pronoun'
actorJson = None
if personCache[actor].get('actor'):
# is gender defined as a profile tag?
if personCache[actor]['actor'].get('attachment'):
tagsList = personCache[actor]['actor']['attachment']
if isinstance(tagsList, list):
for tag in tagsList:
if not isinstance(tag, dict):
continue
if not tag.get('name') or not tag.get('value'):
continue
if tag['name'].lower() == \
translate['gender'].lower():
bioFound = tag['value']
break
elif tag['name'].lower().startswith(pronounStr):
bioFound = tag['value']
break
# if not then use the bio
if not bioFound and personCache[actor]['actor'].get('summary'):
bioFound = personCache[actor]['actor']['summary']
actorJson = personCache[actor]['actor']
else:
# Try to obtain from the cached actors
cachedActorFilename = \
baseDir + '/cache/actors/' + (actor.replace('/', '#')) + '.json'
if os.path.isfile(cachedActorFilename):
actorJson = loadJson(cachedActorFilename, 1)
if actorJson:
# is gender defined as a profile tag?
if actorJson.get('attachment'):
tagsList = actorJson['attachment']
if isinstance(tagsList, list):
for tag in tagsList:
if not isinstance(tag, dict):
continue
if not tag.get('name') or not tag.get('value'):
continue
if tag['name'].lower() == \
translate['gender'].lower():
bioFound = tag['value']
break
elif tag['name'].lower().startswith(pronounStr):
bioFound = tag['value']
break
# if not then use the bio
if not bioFound and actorJson.get('summary'):
bioFound = actorJson['summary']
if not actorJson:
return defaultGender
# is gender defined as a profile tag?
if actorJson.get('attachment'):
tagsList = actorJson['attachment']
if isinstance(tagsList, list):
# look for a gender field name
for tag in tagsList:
if not isinstance(tag, dict):
continue
if not tag.get('name') or not tag.get('value'):
continue
if tag['name'].lower() == \
translate['gender'].lower():
bioFound = tag['value']
break
elif tag['name'].lower().startswith(pronounStr):
bioFound = tag['value']
break
# the field name could be anything,
# just look at the value
if not bioFound:
for tag in tagsList:
if not isinstance(tag, dict):
continue
if not tag.get('name') or not tag.get('value'):
continue
gender = _genderFromString(translate, tag['value'])
if gender:
return gender
# if not then use the bio
if not bioFound and actorJson.get('summary'):
bioFound = actorJson['summary']
if not bioFound:
return None
gender = 'They/Them'
bioFoundOrig = bioFound
bioFound = bioFound.lower()
if translate['He/Him'].lower() in bioFound or \
translate['boy'].lower() in bioFound:
gender = 'He/Him'
elif (translate['She/Her'].lower() in bioFound or
translate['girl'].lower() in bioFound):
gender = 'She/Her'
elif 'him' in bioFound or 'male' in bioFound:
gender = 'He/Him'
elif 'her' in bioFound or 'she' in bioFound or \
'fem' in bioFound or 'woman' in bioFound:
gender = 'She/Her'
elif 'man' in bioFound or 'He' in bioFoundOrig:
gender = 'He/Him'
return defaultGender
gender = _genderFromString(translate, bioFound)
if not gender:
gender = defaultGender
return gender
@ -827,56 +835,34 @@ def getNicknameFromActor(actor: str) -> str:
"""
if actor.startswith('@'):
actor = actor[1:]
if '/users/' not in actor:
if '/profile/' in actor:
nickStr = actor.split('/profile/')[1].replace('@', '')
usersPaths = ('/users/', '/profile/', '/channel/', '/accounts/', '/u/')
for possiblePath in usersPaths:
if possiblePath in actor:
nickStr = actor.split(possiblePath)[1].replace('@', '')
if '/' not in nickStr:
return nickStr
else:
return nickStr.split('/')[0]
elif '/channel/' in actor:
nickStr = actor.split('/channel/')[1].replace('@', '')
if '/' not in nickStr:
return nickStr
else:
return nickStr.split('/')[0]
elif '/accounts/' in actor:
nickStr = actor.split('/accounts/')[1].replace('@', '')
if '/' not in nickStr:
return nickStr
else:
return nickStr.split('/')[0]
elif '/u/' in actor:
nickStr = actor.split('/u/')[1].replace('@', '')
if '/' not in nickStr:
return nickStr
else:
return nickStr.split('/')[0]
elif '/@' in actor:
# https://domain/@nick
nickStr = actor.split('/@')[1]
if '/' in nickStr:
nickStr = nickStr.split('/')[0]
return nickStr
elif '@' in actor:
nickStr = actor.split('@')[0]
return nickStr
elif '://' in actor:
domain = actor.split('://')[1]
if '/' in domain:
domain = domain.split('/')[0]
if '://' + domain + '/' not in actor:
return None
nickStr = actor.split('://' + domain + '/')[1]
if '/' in nickStr or '.' in nickStr:
return None
return nickStr
return None
nickStr = actor.split('/users/')[1].replace('@', '')
if '/' not in nickStr:
if '/@' in actor:
# https://domain/@nick
nickStr = actor.split('/@')[1]
if '/' in nickStr:
nickStr = nickStr.split('/')[0]
return nickStr
else:
return nickStr.split('/')[0]
elif '@' in actor:
nickStr = actor.split('@')[0]
return nickStr
elif '://' in actor:
domain = actor.split('://')[1]
if '/' in domain:
domain = domain.split('/')[0]
if '://' + domain + '/' not in actor:
return None
nickStr = actor.split('://' + domain + '/')[1]
if '/' in nickStr or '.' in nickStr:
return None
return nickStr
return None
def getDomainFromActor(actor: str) -> (str, int):
@ -886,27 +872,14 @@ def getDomainFromActor(actor: str) -> (str, int):
actor = actor[1:]
port = None
prefixes = getProtocolPrefixes()
if '/profile/' in actor:
domain = actor.split('/profile/')[0]
for prefix in prefixes:
domain = domain.replace(prefix, '')
elif '/accounts/' in actor:
domain = actor.split('/accounts/')[0]
for prefix in prefixes:
domain = domain.replace(prefix, '')
elif '/channel/' in actor:
domain = actor.split('/channel/')[0]
for prefix in prefixes:
domain = domain.replace(prefix, '')
elif '/users/' in actor:
domain = actor.split('/users/')[0]
for prefix in prefixes:
domain = domain.replace(prefix, '')
elif '/u/' in actor:
domain = actor.split('/u/')[0]
for prefix in prefixes:
domain = domain.replace(prefix, '')
elif '/@' in actor:
usersPaths = ('/users/', '/profile/', '/accounts/', '/channel/', '/u/')
for possiblePath in usersPaths:
if possiblePath in actor:
domain = actor.split(possiblePath)[0]
for prefix in prefixes:
domain = domain.replace(prefix, '')
break
if '/@' in actor:
domain = actor.split('/@')[0]
for prefix in prefixes:
domain = domain.replace(prefix, '')
@ -919,11 +892,8 @@ def getDomainFromActor(actor: str) -> (str, int):
if '/' in actor:
domain = domain.split('/')[0]
if ':' in domain:
portStr = domain.split(':')[1]
if not portStr.isdigit():
return None, None
port = int(portStr)
domain = domain.split(':')[0]
port = getPortFromDomain(domain)
domain = removeDomainPort(domain)
return domain, port
@ -932,8 +902,7 @@ def _setDefaultPetName(baseDir: str, nickname: str, domain: str,
"""Sets a default petname
This helps especially when using onion or i2p address
"""
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domain)
userPath = baseDir + '/accounts/' + nickname + '@' + domain
petnamesFilename = userPath + '/petnames.txt'
@ -975,7 +944,8 @@ def followPerson(baseDir: str, nickname: str, domain: str,
print('DEBUG: follow of domain ' + followDomain)
if ':' in domain:
handle = nickname + '@' + domain.split(':')[0]
domainOnly = removeDomainPort(domain)
handle = nickname + '@' + domainOnly
else:
handle = nickname + '@' + domain
@ -984,7 +954,8 @@ def followPerson(baseDir: str, nickname: str, domain: str,
return False
if ':' in followDomain:
handleToFollow = followNickname + '@' + followDomain.split(':')[0]
followDomainOnly = removeDomainPort(followDomain)
handleToFollow = followNickname + '@' + followDomainOnly
else:
handleToFollow = followNickname + '@' + followDomain
@ -1189,10 +1160,6 @@ def _removeAttachment(baseDir: str, httpPrefix: str, domain: str,
return
if not postJson['attachment'][0].get('url'):
return
# if port:
# if port != 80 and port != 443:
# if ':' not in domain:
# domain = domain + ':' + str(port)
attachmentUrl = postJson['attachment'][0]['url']
if not attachmentUrl:
return
@ -1487,18 +1454,18 @@ def noOfActiveAccountsMonthly(baseDir: str, months: int) -> bool:
monthSeconds = int(60*60*24*30*months)
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for account in dirs:
if '@' in account:
if not account.startswith('inbox@') and \
not account.startswith('news@'):
lastUsedFilename = \
baseDir + '/accounts/' + account + '/.lastUsed'
if os.path.isfile(lastUsedFilename):
with open(lastUsedFilename, 'r') as lastUsedFile:
lastUsed = lastUsedFile.read()
if lastUsed.isdigit():
timeDiff = (currTime - int(lastUsed))
if timeDiff < monthSeconds:
accountCtr += 1
if not isAccountDir(account):
continue
lastUsedFilename = \
baseDir + '/accounts/' + account + '/.lastUsed'
if not os.path.isfile(lastUsedFilename):
continue
with open(lastUsedFilename, 'r') as lastUsedFile:
lastUsed = lastUsedFile.read()
if lastUsed.isdigit():
timeDiff = (currTime - int(lastUsed))
if timeDiff < monthSeconds:
accountCtr += 1
break
return accountCtr
@ -1824,13 +1791,6 @@ def getFileCaseInsensitive(path: str) -> str:
if path != path.lower():
if os.path.isfile(path.lower()):
return path.lower()
# directory, filename = os.path.split(path)
# directory, filename = (directory or '.'), filename.lower()
# for f in os.listdir(directory):
# if f.lower() == filename:
# newpath = os.path.join(directory, f)
# if os.path.isfile(newpath):
# return newpath
return None

View File

@ -12,7 +12,7 @@ from shutil import copyfile
from utils import getConfigParam
from webapp_utils import htmlHeaderWithWebsiteMarkup
from webapp_utils import htmlFooter
from webapp_utils import markdownToHtml
from markdown import markdownToHtml
def htmlAbout(cssCache: {}, baseDir: str, httpPrefix: str,

View File

@ -18,6 +18,7 @@ from utils import getDomainFromActor
from utils import locatePost
from utils import loadJson
from utils import weekDayOfMonthStart
from domainhandler import removeDomainPort
from happening import getTodaysEvents
from happening import getCalendarEvents
from webapp_utils import htmlHeaderWithExternalStyle
@ -247,9 +248,7 @@ def htmlCalendar(personCache: {}, cssCache: {}, translate: {},
textModeBanner: str, accessKeys: {}) -> str:
"""Show the calendar for a person
"""
domain = domainFull
if ':' in domainFull:
domain = domainFull.split(':')[0]
domain = removeDomainPort(domainFull)
monthNumber = 0
dayNumber = None
@ -460,7 +459,6 @@ def htmlCalendar(personCache: {}, cssCache: {}, translate: {},
htmlHideFromScreenReader('') + ' ' + translate['Previous month']
navLinks[prevMonthStr] = calActor + '/calendar?year=' + str(prevYear) + \
'?month=' + str(prevMonthNumber)
# TODO
navAccessKeys = {
}
screenReaderCal = \

View File

@ -11,6 +11,7 @@ import os
from utils import getConfigParam
from utils import getNicknameFromActor
from utils import isEditor
from domainhandler import removeDomainPort
from webapp_utils import sharesTimelineJson
from webapp_utils import htmlPostSeparator
from webapp_utils import getLeftImageFile
@ -76,9 +77,7 @@ def getLeftColumnContent(baseDir: str, nickname: str, domainFull: str,
htmlStr = ''
separatorStr = htmlPostSeparator(baseDir, 'left')
domain = domainFull
if ':' in domain:
domain = domain.split(':')
domain = removeDomainPort(domainFull)
editImageClass = ''
if showHeaderImage:
@ -298,9 +297,7 @@ def htmlLinksMobile(cssCache: {}, baseDir: str,
else:
editor = isEditor(baseDir, nickname)
domain = domainFull
if ':' in domain:
domain = domain.split(':')[0]
domain = removeDomainPort(domainFull)
instanceTitle = \
getConfigParam(baseDir, 'instanceTitle')

View File

@ -17,6 +17,7 @@ from utils import votesOnNewswireItem
from utils import getNicknameFromActor
from utils import isEditor
from utils import getConfigParam
from domainhandler import removeDomainPort
from posts import isModerator
from webapp_utils import getRightImageFile
from webapp_utils import htmlHeaderWithExternalStyle
@ -58,9 +59,7 @@ def getRightColumnContent(baseDir: str, nickname: str, domainFull: str,
"""
htmlStr = ''
domain = domainFull
if ':' in domain:
domain = domain.split(':')
domain = removeDomainPort(domainFull)
if authorized:
# only show the publish button if logged in, otherwise replace it with

View File

@ -0,0 +1,40 @@
__filename__ = "webapp_minimalbutton.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.2.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
__module_group__ = "Web Interface"
import os
def isMinimal(baseDir: str, domain: str, nickname: str) -> bool:
"""Returns true if minimal buttons should be shown
for the given account
"""
accountDir = baseDir + '/accounts/' + \
nickname + '@' + domain
if not os.path.isdir(accountDir):
return True
minimalFilename = accountDir + '/.notminimal'
if os.path.isfile(minimalFilename):
return False
return True
def setMinimal(baseDir: str, domain: str, nickname: str,
minimal: bool) -> None:
"""Sets whether an account should display minimal buttons
"""
accountDir = baseDir + '/accounts/' + nickname + '@' + domain
if not os.path.isdir(accountDir):
return
minimalFilename = accountDir + '/.notminimal'
minimalFileExists = os.path.isfile(minimalFilename)
if minimal and minimalFileExists:
os.remove(minimalFilename)
elif not minimal and not minimalFileExists:
with open(minimalFilename, 'w+') as fp:
fp.write('\n')

View File

@ -8,6 +8,7 @@ __status__ = "Production"
__module_group__ = "Web Interface"
import os
from utils import isAccountDir
from utils import getFullDomain
from utils import isEditor
from utils import loadJson
@ -270,11 +271,7 @@ def htmlModerationInfo(cssCache: {}, translate: {},
accounts = []
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for acct in dirs:
if '@' not in acct:
continue
if acct.startswith('inbox@'):
continue
elif acct.startswith('news@'):
if not isAccountDir(acct):
continue
accounts.append(acct)
break

View File

@ -52,9 +52,9 @@ from content import removeLongWords
from content import getMentionsFromHtml
from content import switchWords
from person import isPersonSnoozed
from person import getPersonAvatarUrl
from announce import announcedByPerson
from webapp_utils import getAvatarImageUrl
from webapp_utils import getPersonAvatarUrl
from webapp_utils import updateAvatarImageCache
from webapp_utils import loadIndividualPostAsHtmlFromCache
from webapp_utils import addEmojiToDisplayName
@ -1401,6 +1401,9 @@ def individualPostAsHtml(allowDownloads: bool,
if 'commentsEnabled' in postJsonObject['object']:
if postJsonObject['object']['commentsEnabled'] is False:
commentsEnabled = False
elif 'rejectReplies' in postJsonObject['object']:
if postJsonObject['object']['rejectReplies']:
commentsEnabled = False
replyStr = _getReplyIconHtml(nickname, isPublicRepeat,
showIcons, commentsEnabled,

View File

@ -26,6 +26,7 @@ from skills import getSkills
from theme import getThemesList
from person import personBoxJson
from person import getActorJson
from person import getPersonAvatarUrl
from webfinger import webfingerHandle
from posts import parseUserFeed
from posts import getPersonBox
@ -45,7 +46,6 @@ from webapp_frontscreen import htmlFrontScreen
from webapp_utils import htmlKeyboardNavigation
from webapp_utils import htmlHideFromScreenReader
from webapp_utils import scheduledPostsExist
from webapp_utils import getPersonAvatarUrl
from webapp_utils import htmlHeaderWithExternalStyle
from webapp_utils import htmlHeaderWithPersonMarkup
from webapp_utils import htmlFooter

View File

@ -11,6 +11,7 @@ import os
from shutil import copyfile
import urllib.parse
from datetime import datetime
from utils import isAccountDir
from utils import getConfigParam
from utils import getFullDomain
from utils import isEditor
@ -407,11 +408,7 @@ def htmlSkillsSearch(actor: str,
for f in files:
if not f.endswith('.json'):
continue
if '@' not in f:
continue
if f.startswith('inbox@'):
continue
elif f.startswith('news@'):
if not isAccountDir(f):
continue
actorFilename = os.path.join(subdir, f)
actorJson = loadJson(actorFilename)
@ -446,11 +443,7 @@ def htmlSkillsSearch(actor: str,
for f in files:
if not f.endswith('.json'):
continue
if '@' not in f:
continue
if f.startswith('inbox@'):
continue
elif f.startswith('news@'):
if not isAccountDir(f):
continue
actorFilename = os.path.join(subdir, f)
cachedActorJson = loadJson(actorFilename)

View File

@ -17,7 +17,7 @@ from utils import isEditor
from utils import removeIdEnding
from follow import followerApprovalActive
from person import isPersonSnoozed
from webapp_utils import markdownToHtml
from markdown import markdownToHtml
from webapp_utils import htmlKeyboardNavigation
from webapp_utils import htmlHideFromScreenReader
from webapp_utils import htmlPostSeparator

View File

@ -12,7 +12,7 @@ from shutil import copyfile
from utils import getConfigParam
from webapp_utils import htmlHeaderWithExternalStyle
from webapp_utils import htmlFooter
from webapp_utils import markdownToHtml
from markdown import markdownToHtml
def htmlTermsOfService(cssCache: {}, baseDir: str,

View File

@ -16,170 +16,10 @@ from utils import getProtocolPrefixes
from utils import loadJson
from utils import getCachedPostFilename
from utils import getConfigParam
from cache import getPersonFromCache
from cache import storePersonInCache
from content import addHtmlTags
from content import replaceEmojiFromTags
def _markdownEmphasisHtml(markdown: str) -> str:
"""Add italics and bold html markup to the given markdown
"""
replacements = {
' **': ' <b>',
'** ': '</b> ',
'**.': '</b>.',
'**:': '</b>:',
'**;': '</b>;',
'**,': '</b>,',
'**\n': '</b>\n',
' *': ' <i>',
'* ': '</i> ',
'*.': '</i>.',
'*:': '</i>:',
'*;': '</i>;',
'*,': '</i>,',
'*\n': '</i>\n',
' _': ' <ul>',
'_ ': '</ul> ',
'_.': '</ul>.',
'_:': '</ul>:',
'_;': '</ul>;',
'_,': '</ul>,',
'_\n': '</ul>\n'
}
for md, html in replacements.items():
markdown = markdown.replace(md, html)
if markdown.startswith('**'):
markdown = markdown[2:] + '<b>'
elif markdown.startswith('*'):
markdown = markdown[1:] + '<i>'
elif markdown.startswith('_'):
markdown = markdown[1:] + '<ul>'
if markdown.endswith('**'):
markdown = markdown[:len(markdown) - 2] + '</b>'
elif markdown.endswith('*'):
markdown = markdown[:len(markdown) - 1] + '</i>'
elif markdown.endswith('_'):
markdown = markdown[:len(markdown) - 1] + '</ul>'
return markdown
def _markdownReplaceQuotes(markdown: str) -> str:
"""Replaces > quotes with html blockquote
"""
if '> ' not in markdown:
return markdown
lines = markdown.split('\n')
result = ''
prevQuoteLine = None
for line in lines:
if '> ' not in line:
result += line + '\n'
prevQuoteLine = None
continue
lineStr = line.strip()
if not lineStr.startswith('> '):
result += line + '\n'
prevQuoteLine = None
continue
lineStr = lineStr.replace('> ', '', 1).strip()
if prevQuoteLine:
newPrevLine = prevQuoteLine.replace('</i></blockquote>\n', '')
result = result.replace(prevQuoteLine, newPrevLine) + ' '
lineStr += '</i></blockquote>\n'
else:
lineStr = '<blockquote><i>' + lineStr + '</i></blockquote>\n'
result += lineStr
prevQuoteLine = lineStr
if '</blockquote>\n' in result:
result = result.replace('</blockquote>\n', '</blockquote>')
if result.endswith('\n') and \
not markdown.endswith('\n'):
result = result[:len(result) - 1]
return result
def _markdownReplaceLinks(markdown: str, images: bool = False) -> str:
"""Replaces markdown links with html
Optionally replace image links
"""
replaceLinks = {}
text = markdown
startChars = '['
if images:
startChars = '!['
while startChars in text:
if ')' not in text:
break
text = text.split(startChars, 1)[1]
markdownLink = startChars + text.split(')')[0] + ')'
if ']' not in markdownLink or \
'(' not in markdownLink:
text = text.split(')', 1)[1]
continue
if not images:
replaceLinks[markdownLink] = \
'<a href="' + \
markdownLink.split('(')[1].split(')')[0] + \
'" target="_blank" rel="nofollow noopener noreferrer">' + \
markdownLink.split(startChars)[1].split(']')[0] + \
'</a>'
else:
replaceLinks[markdownLink] = \
'<img class="markdownImage" src="' + \
markdownLink.split('(')[1].split(')')[0] + \
'" alt="' + \
markdownLink.split(startChars)[1].split(']')[0] + \
'" />'
text = text.split(')', 1)[1]
for mdLink, htmlLink in replaceLinks.items():
markdown = markdown.replace(mdLink, htmlLink)
return markdown
def markdownToHtml(markdown: str) -> str:
"""Converts markdown formatted text to html
"""
markdown = _markdownReplaceQuotes(markdown)
markdown = _markdownEmphasisHtml(markdown)
markdown = _markdownReplaceLinks(markdown, True)
markdown = _markdownReplaceLinks(markdown)
# replace headers
linesList = markdown.split('\n')
htmlStr = ''
ctr = 0
for line in linesList:
if ctr > 0:
htmlStr += '<br>'
if line.startswith('#####'):
line = line.replace('#####', '').strip()
line = '<h5>' + line + '</h5>'
ctr = -1
elif line.startswith('####'):
line = line.replace('####', '').strip()
line = '<h4>' + line + '</h4>'
ctr = -1
elif line.startswith('###'):
line = line.replace('###', '').strip()
line = '<h3>' + line + '</h3>'
ctr = -1
elif line.startswith('##'):
line = line.replace('##', '').strip()
line = '<h2>' + line + '</h2>'
ctr = -1
elif line.startswith('#'):
line = line.replace('#', '').strip()
line = '<h1>' + line + '</h1>'
ctr = -1
htmlStr += line
ctr += 1
return htmlStr
from person import getPersonAvatarUrl
def getBrokenLinkSubstitute() -> str:
@ -533,34 +373,6 @@ def updateAvatarImageCache(session, baseDir: str, httpPrefix: str,
return avatarImageFilename.replace(baseDir + '/cache', '')
def getPersonAvatarUrl(baseDir: str, personUrl: str, personCache: {},
allowDownloads: bool) -> str:
"""Returns the avatar url for the person
"""
personJson = \
getPersonFromCache(baseDir, personUrl, personCache, allowDownloads)
if not personJson:
return None
# get from locally stored image
if not personJson.get('id'):
return None
actorStr = personJson['id'].replace('/', '-')
avatarImagePath = baseDir + '/cache/avatars/' + actorStr
imageExtension = getImageExtensions()
for ext in imageExtension:
if os.path.isfile(avatarImagePath + '.' + ext):
return '/avatars/' + actorStr + '.' + ext
elif os.path.isfile(avatarImagePath.lower() + '.' + ext):
return '/avatars/' + actorStr.lower() + '.' + ext
if personJson.get('icon'):
if personJson['icon'].get('url'):
return personJson['icon']['url']
return None
def scheduledPostsExist(baseDir: str, nickname: str, domain: str) -> bool:
"""Returns true if there are posts scheduled to be delivered
"""
@ -1359,33 +1171,3 @@ def htmlKeyboardNavigation(banner: str, links: {}, accessKeys: {},
str(title) + '</a></label></li>\n'
htmlStr += '</ul></div>\n'
return htmlStr
def isMinimal(baseDir: str, domain: str, nickname: str) -> bool:
"""Returns true if minimal buttons should be shown
for the given account
"""
accountDir = baseDir + '/accounts/' + \
nickname + '@' + domain
if not os.path.isdir(accountDir):
return True
minimalFilename = accountDir + '/.notminimal'
if os.path.isfile(minimalFilename):
return False
return True
def setMinimal(baseDir: str, domain: str, nickname: str,
minimal: bool) -> None:
"""Sets whether an account should display minimal buttons
"""
accountDir = baseDir + '/accounts/' + nickname + '@' + domain
if not os.path.isdir(accountDir):
return
minimalFilename = accountDir + '/.notminimal'
minimalFileExists = os.path.isfile(minimalFilename)
if minimal and minimalFileExists:
os.remove(minimalFilename)
elif not minimal and not minimalFileExists:
with open(minimalFilename, 'w+') as fp:
fp.write('\n')

View File

@ -13,7 +13,7 @@ from utils import getConfigParam
from utils import removeHtml
from webapp_utils import htmlHeaderWithExternalStyle
from webapp_utils import htmlFooter
from webapp_utils import markdownToHtml
from markdown import markdownToHtml
def isWelcomeScreenComplete(baseDir: str, nickname: str, domain: str) -> bool:

View File

@ -13,7 +13,7 @@ from utils import removeHtml
from utils import getConfigParam
from webapp_utils import htmlHeaderWithExternalStyle
from webapp_utils import htmlFooter
from webapp_utils import markdownToHtml
from markdown import markdownToHtml
def htmlWelcomeFinal(baseDir: str, nickname: str, domain: str,

View File

@ -16,7 +16,7 @@ from utils import getImageExtensions
from utils import getImageFormats
from webapp_utils import htmlHeaderWithExternalStyle
from webapp_utils import htmlFooter
from webapp_utils import markdownToHtml
from markdown import markdownToHtml
def htmlWelcomeProfile(baseDir: str, nickname: str, domain: str,

View File

@ -17,6 +17,7 @@ from utils import loadJson
from utils import loadJsonOnionify
from utils import saveJson
from utils import getProtocolPrefixes
from domainhandler import removeDomainPort
def _parseHandle(handle: str) -> (str, str):
@ -53,13 +54,8 @@ def webfingerHandle(session, handle: str, httpPrefix: str,
nickname, domain = _parseHandle(handle)
if not nickname:
return None
wfDomain = domain
if ':' in wfDomain:
# wfPortStr=wfDomain.split(':')[1]
# if wfPortStr.isdigit():
# wfPort=int(wfPortStr)
# if wfPort==80 or wfPort==443:
wfDomain = wfDomain.split(':')[0]
wfDomain = removeDomainPort(domain)
wf = getWebfingerFromCache(nickname + '@' + wfDomain,
cachedWebfingers)
if wf: