Test for stranded functions which aren't called

merge-requests/8/head
Bob Mottram 2020-12-22 12:59:46 +00:00
parent 96e813181b
commit a7b094f84f
16 changed files with 97 additions and 883 deletions

View File

@ -183,135 +183,6 @@ def announcePublic(session, baseDir: str, federationList: [],
debug, projectVersion)
def repeatPost(session, baseDir: str, federationList: [],
nickname: str, domain: str, port: int, httpPrefix: str,
announceNickname: str, announceDomain: str,
announcePort: int, announceHttpsPrefix: str,
announceStatusNumber: int, clientToServer: bool,
sendThreads: [], postLog: [],
personCache: {}, cachedWebfingers: {},
debug: bool, projectVersion: str) -> {}:
"""Repeats a given status post
"""
announcedDomain = getFullDomain(announceDomain, announcePort)
objectUrl = announceHttpsPrefix + '://' + announcedDomain + '/users/' + \
announceNickname + '/statuses/' + str(announceStatusNumber)
return announcePublic(session, baseDir, federationList,
nickname, domain, port, httpPrefix,
objectUrl, clientToServer,
sendThreads, postLog,
personCache, cachedWebfingers,
debug, projectVersion)
def undoAnnounce(session, baseDir: str, federationList: [],
nickname: str, domain: str, port: int,
toUrl: str, ccUrl: str, httpPrefix: str,
objectUrl: str, saveToFile: bool,
clientToServer: bool,
sendThreads: [], postLog: [],
personCache: {}, cachedWebfingers: {},
debug: bool) -> {}:
"""Undoes an announce message
Typically toUrl will be https://www.w3.org/ns/activitystreams#Public
and ccUrl might be a specific person whose post was repeated and the
objectUrl is typically the url of the message which was repeated,
corresponding to url or atomUri in createPostBase
"""
if not urlPermitted(objectUrl, federationList):
return None
if ':' in domain:
domain = domain.split(':')[0]
fullDomain = getFullDomain(domain, port)
newUndoAnnounce = {
"@context": "https://www.w3.org/ns/activitystreams",
'actor': httpPrefix+'://'+fullDomain+'/users/'+nickname,
'type': 'Undo',
'cc': [],
'to': [toUrl],
'object': {
'actor': httpPrefix+'://'+fullDomain+'/users/'+nickname,
'cc': [],
'object': objectUrl,
'to': [toUrl],
'type': 'Announce'
}
}
if ccUrl:
if len(ccUrl) > 0:
newUndoAnnounce['object']['cc'] = [ccUrl]
announceNickname = None
announceDomain = None
announcePort = None
if '/users/' in objectUrl or \
'/accounts/' in objectUrl or \
'/channel/' in objectUrl or \
'/profile/' in objectUrl:
announceNickname = getNicknameFromActor(objectUrl)
announceDomain, announcePort = getDomainFromActor(objectUrl)
if announceNickname and announceDomain:
sendSignedJson(newUndoAnnounce, session, baseDir,
nickname, domain, port,
announceNickname, announceDomain, announcePort,
'https://www.w3.org/ns/activitystreams#Public',
httpPrefix, True, clientToServer, federationList,
sendThreads, postLog, cachedWebfingers,
personCache, debug)
return newUndoAnnounce
def undoAnnouncePublic(session, baseDir: str, federationList: [],
nickname: str, domain: str, port: int, httpPrefix: str,
objectUrl: str, clientToServer: bool,
sendThreads: [], postLog: [],
personCache: {}, cachedWebfingers: {},
debug: bool) -> {}:
"""Undoes a public announcement
"""
fromDomain = getFullDomain(domain, port)
toUrl = 'https://www.w3.org/ns/activitystreams#Public'
ccUrl = httpPrefix + '://' + fromDomain + '/users/' + nickname + \
'/followers'
return undoAnnounce(session, baseDir, federationList,
nickname, domain, port,
toUrl, ccUrl, httpPrefix,
objectUrl, True, clientToServer,
sendThreads, postLog,
personCache, cachedWebfingers,
debug)
def undoRepeatPost(session, baseDir: str, federationList: [],
nickname: str, domain: str, port: int, httpPrefix: str,
announceNickname: str, announceDomain: str,
announcePort: int, announceHttpsPrefix: str,
announceStatusNumber: int, clientToServer: bool,
sendThreads: [], postLog: [],
personCache: {}, cachedWebfingers: {},
debug: bool) -> {}:
"""Undoes a status post repeat
"""
announcedDomain = getFullDomain(announceDomain, announcePort)
objectUrl = announceHttpsPrefix + '://' + announcedDomain + '/users/' + \
announceNickname + '/statuses/' + str(announceStatusNumber)
return undoAnnouncePublic(session, baseDir, federationList,
nickname, domain, port, httpPrefix,
objectUrl, clientToServer,
sendThreads, postLog,
personCache, cachedWebfingers,
debug)
def sendAnnounceViaServer(baseDir: str, session,
fromNickname: str, password: str,
fromDomain: str, fromPort: int,

19
blog.py
View File

@ -599,25 +599,6 @@ def htmlBlogPageRSS3(authorized: bool, session,
return blogRSS3
def getBlogIndexesForAccounts(baseDir: str) -> {}:
""" Get the index files for blogs for each account
and add them to a dict
"""
blogIndexes = {}
for subdir, dirs, files in os.walk(baseDir + '/accounts'):
for acct in dirs:
if '@' not in acct:
continue
if 'inbox@' in acct:
continue
accountDir = os.path.join(baseDir + '/accounts', acct)
blogsIndex = accountDir + '/tlblogs.index'
if os.path.isfile(blogsIndex):
blogIndexes[acct] = blogsIndex
break
return blogIndexes
def noOfBlogAccounts(baseDir: str) -> int:
"""Returns the number of blog accounts
"""

View File

@ -39,16 +39,6 @@ alphabet = \
alphabet_values = dict(zip(alphabet, range(len(alphabet))))
def base83_decode(base83_str):
"""
Decodes a base83 string, as used in blurhash, to an integer.
"""
value = 0
for base83_char in base83_str:
value = value * 83 + alphabet_values[base83_char]
return value
def base83_encode(value, length):
"""
Decodes an integer to a base83 string, as used in blurhash.
@ -94,101 +84,6 @@ def linear_to_srgb(value):
return int((1.055 * math.pow(value, 1 / 2.4) - 0.055) * 255 + 0.5)
def blurhash_components(blurhash):
"""
Decodes and returns the number of x and y components in the given blurhash.
"""
if len(blurhash) < 6:
raise ValueError("BlurHash must be at least 6 characters long.")
# Decode metadata
size_info = base83_decode(blurhash[0])
size_y = int(size_info / 9) + 1
size_x = (size_info % 9) + 1
return size_x, size_y
def blurhash_decode(blurhash, width, height, punch=1.0, linear=False):
"""
Decodes the given blurhash to an image of the specified size.
Returns the resulting image a list of lists of 3-value sRGB 8 bit integer
lists. Set linear to True if you would prefer to get linear floating point
RGB back.
The punch parameter can be used to de- or increase the contrast of the
resulting image.
As per the original implementation it is suggested to only decode
to a relatively small size and then scale the result up, as it
basically looks the same anyways.
"""
if len(blurhash) < 6:
raise ValueError("BlurHash must be at least 6 characters long.")
# Decode metadata
size_info = base83_decode(blurhash[0])
size_y = int(size_info / 9) + 1
size_x = (size_info % 9) + 1
quant_max_value = base83_decode(blurhash[1])
real_max_value = (float(quant_max_value + 1) / 166.0) * punch
# Make sure we at least have the right number of characters
if len(blurhash) != 4 + 2 * size_x * size_y:
raise ValueError("Invalid BlurHash length.")
# Decode DC component
dc_value = base83_decode(blurhash[2:6])
colours = [(
srgb_to_linear(dc_value >> 16),
srgb_to_linear((dc_value >> 8) & 255),
srgb_to_linear(dc_value & 255)
)]
# Decode AC components
for component in range(1, size_x * size_y):
ac_value = base83_decode(blurhash[4+component*2:4+(component+1)*2])
colours.append((
sign_pow((float(int(ac_value / (19 * 19))) - 9.0)
/ 9.0, 2.0) * real_max_value,
sign_pow((float(int(ac_value / 19) % 19) - 9.0)
/ 9.0, 2.0) * real_max_value,
sign_pow((float(ac_value % 19) - 9.0)
/ 9.0, 2.0) * real_max_value
))
# Return image RGB values, as a list of lists of lists,
# consumable by something like numpy or PIL.
pixels = []
for y in range(height):
pixel_row = []
for x in range(width):
pixel = [0.0, 0.0, 0.0]
for j in range(size_y):
for i in range(size_x):
basis = \
math.cos(math.pi * float(x) * float(i) /
float(width)) * \
math.cos(math.pi * float(y) * float(j) / float(height))
colour = colours[i + j * size_x]
pixel[0] += colour[0] * basis
pixel[1] += colour[1] * basis
pixel[2] += colour[2] * basis
if linear is False:
pixel_row.append([
linear_to_srgb(pixel[0]),
linear_to_srgb(pixel[1]),
linear_to_srgb(pixel[2]),
])
else:
pixel_row.append(pixel)
pixels.append(pixel_row)
return pixels
def blurhash_encode(image, components_x=4, components_y=4, linear=False):
"""
Calculates the blurhash for an image using the given x and y

View File

@ -18,10 +18,6 @@ from utils import locatePost
from utils import getCachedPostFilename
from utils import loadJson
from utils import saveJson
from session import postJson
from webfinger import webfingerHandle
from auth import createBasicAuthHeader
from posts import getPersonBox
def undoBookmarksCollectionEntry(recentPostsCache: {},
@ -283,32 +279,6 @@ def bookmark(recentPostsCache: {},
return newBookmarkJson
def bookmarkPost(recentPostsCache: {},
session, baseDir: str, federationList: [],
nickname: str, domain: str, port: int, httpPrefix: str,
bookmarkNickname: str, bookmarkedomain: str,
bookmarkPort: int,
ccList: [],
bookmarkStatusNumber: int, clientToServer: bool,
sendThreads: [], postLog: [],
personCache: {}, cachedWebfingers: {},
debug: bool, projectVersion: str) -> {}:
"""Bookmarks a given status post. This is only used by unit tests
"""
bookmarkedomain = getFullDomain(bookmarkedomain, bookmarkPort)
actorBookmarked = httpPrefix + '://' + bookmarkedomain + \
'/users/' + bookmarkNickname
objectUrl = actorBookmarked + '/statuses/' + str(bookmarkStatusNumber)
return bookmark(recentPostsCache,
session, baseDir, federationList, nickname, domain, port,
ccList, httpPrefix, objectUrl, actorBookmarked,
clientToServer,
sendThreads, postLog, personCache, cachedWebfingers,
debug, projectVersion)
def undoBookmark(recentPostsCache: {},
session, baseDir: str, federationList: [],
nickname: str, domain: str, port: int,
@ -375,180 +345,6 @@ def undoBookmark(recentPostsCache: {},
return newUndoBookmarkJson
def undoBookmarkPost(session, baseDir: str, federationList: [],
nickname: str, domain: str, port: int, httpPrefix: str,
bookmarkNickname: str, bookmarkedomain: str,
bookmarkPort: int, ccList: [],
bookmarkStatusNumber: int, clientToServer: bool,
sendThreads: [], postLog: [],
personCache: {}, cachedWebfingers: {},
debug: bool) -> {}:
"""Removes a bookmarked post
"""
bookmarkedomain = getFullDomain(bookmarkedomain, bookmarkPort)
objectUrl = httpPrefix + '://' + bookmarkedomain + \
'/users/' + bookmarkNickname + \
'/statuses/' + str(bookmarkStatusNumber)
return undoBookmark(session, baseDir, federationList,
nickname, domain, port,
ccList, httpPrefix, objectUrl, clientToServer,
sendThreads, postLog, personCache,
cachedWebfingers, debug)
def sendBookmarkViaServer(baseDir: str, session,
fromNickname: str, password: str,
fromDomain: str, fromPort: int,
httpPrefix: str, bookmarkUrl: str,
cachedWebfingers: {}, personCache: {},
debug: bool, projectVersion: str) -> {}:
"""Creates a bookmark via c2s
"""
if not session:
print('WARN: No session for sendBookmarkViaServer')
return 6
fromDomainFull = getFullDomain(fromDomain, fromPort)
newBookmarkJson = {
"@context": "https://www.w3.org/ns/activitystreams",
'type': 'Bookmark',
'actor': httpPrefix+'://'+fromDomainFull+'/users/'+fromNickname,
'object': bookmarkUrl
}
handle = httpPrefix + '://' + fromDomainFull + '/@' + fromNickname
# lookup the inbox for the To handle
wfRequest = webfingerHandle(session, handle, httpPrefix,
cachedWebfingers,
fromDomain, projectVersion)
if not wfRequest:
if debug:
print('DEBUG: announce webfinger failed for ' + handle)
return 1
if not isinstance(wfRequest, dict):
print('WARN: Webfinger for ' + handle + ' did not return a dict. ' +
str(wfRequest))
return 1
postToBox = 'outbox'
# get the actor inbox for the To handle
(inboxUrl, pubKeyId, pubKey,
fromPersonId, sharedInbox, avatarUrl,
displayName) = getPersonBox(baseDir, session, wfRequest, personCache,
projectVersion, httpPrefix, fromNickname,
fromDomain, postToBox, 72483)
if not inboxUrl:
if debug:
print('DEBUG: No ' + postToBox + ' was found for ' + handle)
return 3
if not fromPersonId:
if debug:
print('DEBUG: No actor was found for ' + handle)
return 4
authHeader = createBasicAuthHeader(fromNickname, password)
headers = {
'host': fromDomain,
'Content-type': 'application/json',
'Authorization': authHeader
}
postResult = postJson(session, newBookmarkJson, [],
inboxUrl, headers)
if not postResult:
if debug:
print('DEBUG: POST announce failed for c2s to ' + inboxUrl)
return 5
if debug:
print('DEBUG: c2s POST bookmark success')
return newBookmarkJson
def sendUndoBookmarkViaServer(baseDir: str, session,
fromNickname: str, password: str,
fromDomain: str, fromPort: int,
httpPrefix: str, bookmarkUrl: str,
cachedWebfingers: {}, personCache: {},
debug: bool, projectVersion: str) -> {}:
"""Undo a bookmark via c2s
"""
if not session:
print('WARN: No session for sendUndoBookmarkViaServer')
return 6
fromDomainFull = getFullDomain(fromDomain, fromPort)
newUndoBookmarkJson = {
"@context": "https://www.w3.org/ns/activitystreams",
'type': 'Undo',
'actor': httpPrefix+'://'+fromDomainFull+'/users/'+fromNickname,
'object': {
'type': 'Bookmark',
'actor': httpPrefix+'://'+fromDomainFull+'/users/'+fromNickname,
'object': bookmarkUrl
}
}
handle = httpPrefix + '://' + fromDomainFull + '/@' + fromNickname
# lookup the inbox for the To handle
wfRequest = webfingerHandle(session, handle, httpPrefix, cachedWebfingers,
fromDomain, projectVersion)
if not wfRequest:
if debug:
print('DEBUG: announce webfinger failed for ' + handle)
return 1
if not isinstance(wfRequest, dict):
print('WARN: Webfinger for ' + handle + ' did not return a dict. ' +
str(wfRequest))
return 1
postToBox = 'outbox'
# get the actor inbox for the To handle
(inboxUrl, pubKeyId, pubKey,
fromPersonId, sharedInbox, avatarUrl,
displayName) = getPersonBox(baseDir, session, wfRequest, personCache,
projectVersion, httpPrefix, fromNickname,
fromDomain, postToBox, 72528)
if not inboxUrl:
if debug:
print('DEBUG: No ' + postToBox + ' was found for ' + handle)
return 3
if not fromPersonId:
if debug:
print('DEBUG: No actor was found for ' + handle)
return 4
authHeader = createBasicAuthHeader(fromNickname, password)
headers = {
'host': fromDomain,
'Content-type': 'application/json',
'Authorization': authHeader
}
postResult = postJson(session, newUndoBookmarkJson, [],
inboxUrl, headers)
if not postResult:
if debug:
print('DEBUG: POST announce failed for c2s to ' + inboxUrl)
return 5
if debug:
print('DEBUG: c2s POST undo bookmark success')
return newUndoBookmarkJson
def outboxBookmark(recentPostsCache: {},
baseDir: str, httpPrefix: str,
nickname: str, domain: str, port: int,

View File

@ -438,34 +438,6 @@ def addHashTags(wordStr: str, httpPrefix: str, domain: str,
return True
def loadEmojiDict(emojiDataFilename: str, emojiDict: {}) -> None:
"""Creates an emoji dictionary based on emoji/emoji-data.txt
"""
if not os.path.isfile(emojiDataFilename):
return
with open(emojiDataFilename, "r") as fileHandler:
for line in fileHandler:
if len(line) < 5:
continue
if line.startswith('#'):
continue
if '; Emoji' not in line:
continue
if ')' not in line:
continue
emojiUnicode = line.split(' ')[0]
if len(emojiUnicode) < 4:
continue
if '..' in emojiUnicode:
emojiUnicode = emojiUnicode.split('..')[0]
emojiName = line.split(')', 1)[1].strip()
emojiName = emojiName.replace('\n', '').replace('\r', '')
emojiName = emojiName.replace(' ', '').replace('-', '')
if '..' in emojiName:
emojiName = emojiName.split('..')[0]
emojiDict[emojiName.lower()] = emojiUnicode
def addEmoji(baseDir: str, wordStr: str,
httpPrefix: str, domain: str,
replaceEmoji: {}, postTags: {},

View File

@ -21,7 +21,6 @@ import pyqrcode
from hashlib import sha256
from hashlib import sha1
from session import createSession
from webfinger import parseHandle
from webfinger import webfingerMeta
from webfinger import webfingerNodeInfo
from webfinger import webfingerLookup
@ -275,22 +274,6 @@ def saveDomainQrcode(baseDir: str, httpPrefix: str,
url.png(qrcodeFilename, scale)
def readFollowList(filename: str) -> None:
"""Returns a list of ActivityPub addresses to follow
"""
followlist = []
if not os.path.isfile(filename):
return followlist
followUsers = open(filename, "r")
for u in followUsers:
if u not in followlist:
nickname, domain = parseHandle(u)
if nickname:
followlist.append(nickname + '@' + domain)
followUsers.close()
return followlist
class PubServer(BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1'

111
delete.py
View File

@ -10,82 +10,17 @@ import os
from datetime import datetime
from utils import getFullDomain
from utils import removeIdEnding
from utils import getStatusNumber
from utils import urlPermitted
from utils import getNicknameFromActor
from utils import getDomainFromActor
from utils import locatePost
from utils import deletePost
from utils import removeModerationPostFromIndex
from posts import sendSignedJson
from session import postJson
from webfinger import webfingerHandle
from auth import createBasicAuthHeader
from posts import getPersonBox
def createDelete(session, baseDir: str, federationList: [],
nickname: str, domain: str, port: int,
toUrl: str, ccUrl: str, httpPrefix: str,
objectUrl: str, clientToServer: bool,
sendThreads: [], postLog: [],
personCache: {}, cachedWebfingers: {},
debug: bool) -> {}:
"""Creates a delete message
Typically toUrl will be https://www.w3.org/ns/activitystreams#Public
and ccUrl might be a specific person whose post is to be deleted
objectUrl is typically the url of the message, corresponding to url
or atomUri in createPostBase
"""
if not urlPermitted(objectUrl, federationList):
return None
if ':' in domain:
domain = domain.split(':')[0]
fullDomain = domain
fullDomain = getFullDomain(domain, port)
statusNumber, published = getStatusNumber()
newDeleteId = \
httpPrefix + '://' + fullDomain + '/users/' + \
nickname + '/statuses/' + statusNumber
newDelete = {
"@context": "https://www.w3.org/ns/activitystreams",
'actor': httpPrefix+'://'+fullDomain+'/users/'+nickname,
'atomUri': newDeleteId,
'cc': [],
'id': newDeleteId + '/activity',
'object': objectUrl,
'published': published,
'to': [toUrl],
'type': 'Delete'
}
if ccUrl:
if len(ccUrl) > 0:
newDelete['cc'] = [ccUrl]
deleteNickname = None
deleteDomain = None
deletePort = None
if '/users/' in objectUrl or \
'/accounts/' in objectUrl or \
'/channel/' in objectUrl or \
'/profile/' in objectUrl:
deleteNickname = getNicknameFromActor(objectUrl)
deleteDomain, deletePort = getDomainFromActor(objectUrl)
if deleteNickname and deleteDomain:
sendSignedJson(newDelete, session, baseDir,
nickname, domain, port,
deleteNickname, deleteDomain, deletePort,
'https://www.w3.org/ns/activitystreams#Public',
httpPrefix, True, clientToServer, federationList,
sendThreads, postLog, cachedWebfingers,
personCache, debug)
return newDelete
def sendDeleteViaServer(baseDir: str, session,
fromNickname: str, password: str,
fromDomain: str, fromPort: int,
@ -167,52 +102,6 @@ def sendDeleteViaServer(baseDir: str, session,
return newDeleteJson
def deletePublic(session, baseDir: str, federationList: [],
nickname: str, domain: str, port: int, httpPrefix: str,
objectUrl: str, clientToServer: bool,
sendThreads: [], postLog: [],
personCache: {}, cachedWebfingers: {},
debug: bool) -> {}:
"""Makes a public delete activity
"""
fromDomain = getFullDomain(domain, port)
toUrl = 'https://www.w3.org/ns/activitystreams#Public'
ccUrl = httpPrefix + '://' + fromDomain + \
'/users/' + nickname + '/followers'
return createDelete(session, baseDir, federationList,
nickname, domain, port,
toUrl, ccUrl, httpPrefix,
objectUrl, clientToServer,
sendThreads, postLog,
personCache, cachedWebfingers,
debug)
def deletePostPub(session, baseDir: str, federationList: [],
nickname: str, domain: str, port: int, httpPrefix: str,
deleteNickname: str, deleteDomain: str,
deletePort: int, deleteHttpsPrefix: str,
deleteStatusNumber: int, clientToServer: bool,
sendThreads: [], postLog: [],
personCache: {}, cachedWebfingers: {},
debug: bool) -> {}:
"""Deletes a given status post
"""
deletedDomain = getFullDomain(deleteDomain, deletePort)
objectUrl = \
deleteHttpsPrefix + '://' + deletedDomain + '/users/' + \
deleteNickname + '/statuses/' + str(deleteStatusNumber)
return deletePublic(session, baseDir, federationList,
nickname, domain, port, httpPrefix,
objectUrl, clientToServer,
sendThreads, postLog,
personCache, cachedWebfingers,
debug)
def outboxDelete(baseDir: str, httpPrefix: str,
nickname: str, domain: str,
messageJson: {}, debug: bool,

View File

@ -313,15 +313,6 @@ def inboxPermittedMessage(domain: str, messageJson: {},
return True
def validPublishedDate(published: str) -> bool:
currTime = datetime.datetime.utcnow()
pubDate = datetime.datetime.strptime(published, "%Y-%m-%dT%H:%M:%SZ")
daysSincePublished = (currTime - pubDate).days
if daysSincePublished > 30:
return False
return True
def savePostToInboxQueue(baseDir: str, httpPrefix: str,
nickname: str, domain: str,
postJsonObject: {},

70
like.py
View File

@ -141,76 +141,6 @@ def likePost(recentPostsCache: {},
debug, projectVersion)
def undolike(recentPostsCache: {},
session, baseDir: str, federationList: [],
nickname: str, domain: str, port: int,
ccList: [], httpPrefix: str,
objectUrl: str, actorLiked: str,
clientToServer: bool,
sendThreads: [], postLog: [],
personCache: {}, cachedWebfingers: {},
debug: bool, projectVersion: str) -> {}:
"""Removes a like
actor is the person doing the liking
'to' might be a specific person (actor) whose post was liked
object is typically the url of the message which was liked
"""
if not urlPermitted(objectUrl, federationList):
return None
fullDomain = getFullDomain(domain, port)
newUndoLikeJson = {
"@context": "https://www.w3.org/ns/activitystreams",
'type': 'Undo',
'actor': httpPrefix + '://' + fullDomain + '/users/' + nickname,
'object': {
'type': 'Like',
'actor': httpPrefix + '://' + fullDomain + '/users/' + nickname,
'object': objectUrl
}
}
if ccList:
if len(ccList) > 0:
newUndoLikeJson['cc'] = ccList
newUndoLikeJson['object']['cc'] = ccList
# Extract the domain and nickname from a statuses link
likedPostNickname = None
likedPostDomain = None
likedPostPort = None
if actorLiked:
likedPostNickname = getNicknameFromActor(actorLiked)
likedPostDomain, likedPostPort = getDomainFromActor(actorLiked)
else:
if '/users/' in objectUrl or \
'/accounts/' in objectUrl or \
'/channel/' in objectUrl or \
'/profile/' in objectUrl:
likedPostNickname = getNicknameFromActor(objectUrl)
likedPostDomain, likedPostPort = getDomainFromActor(objectUrl)
if likedPostNickname:
postFilename = locatePost(baseDir, nickname, domain, objectUrl)
if not postFilename:
return None
undoLikesCollectionEntry(baseDir, postFilename, objectUrl,
newUndoLikeJson['actor'], domain, debug)
sendSignedJson(newUndoLikeJson, session, baseDir,
nickname, domain, port,
likedPostNickname, likedPostDomain, likedPostPort,
'https://www.w3.org/ns/activitystreams#Public',
httpPrefix, True, clientToServer, federationList,
sendThreads, postLog, cachedWebfingers, personCache,
debug, projectVersion)
else:
return None
return newUndoLikeJson
def sendLikeViaServer(baseDir: str, session,
fromNickname: str, password: str,
fromDomain: str, fromPort: int,

View File

@ -6,7 +6,7 @@ __maintainer__ = "Bob Mottram"
__email__ = "bob@freedombone.net"
__status__ = "Production"
from blurhash import blurhash_encode as blurencode
from blurhash import blurhash_encode
from PIL import Image
import numpy
import os
@ -57,7 +57,8 @@ def removeMetaData(imageFilename: str, outputFilename: str) -> None:
def getImageHash(imageFilename: str) -> str:
return blurencode(numpy.array(Image.open(imageFilename).convert("RGB")))
value = numpy.array(Image.open(imageFilename).convert("RGB"))
return blurhash_encode(value)
def isMedia(imageFilename: str) -> bool:

View File

@ -722,48 +722,6 @@ def personBoxJson(recentPostsCache: {},
return None
def personInboxJson(recentPostsCache: {},
baseDir: str, domain: str, port: int, path: str,
httpPrefix: str, noOfItems: int) -> []:
"""Obtain the inbox feed for the given person
Authentication is expected to have already happened
"""
if '/inbox' not in path:
return None
# Only show the header by default
headerOnly = True
# handle page numbers
pageNumber = None
if '?page=' in path:
pageNumber = path.split('?page=')[1]
if pageNumber == 'true':
pageNumber = 1
else:
try:
pageNumber = int(pageNumber)
except BaseException:
pass
path = path.split('?page=')[0]
headerOnly = False
if not path.endswith('/inbox'):
return None
nickname = None
if path.startswith('/users/'):
nickname = path.replace('/users/', '', 1).replace('/inbox', '')
if path.startswith('/@'):
nickname = path.replace('/@', '', 1).replace('/inbox', '')
if not nickname:
return None
if not validNickname(domain, nickname):
return None
return createInbox(recentPostsCache, baseDir, nickname,
domain, port, httpPrefix,
noOfItems, headerOnly, pageNumber)
def setDisplayNickname(baseDir: str, nickname: str, domain: str,
displayName: str) -> bool:
if len(displayName) > 32:

View File

@ -1215,28 +1215,6 @@ def postIsAddressedToFollowers(baseDir: str,
return addressedToFollowers
def postIsAddressedToPublic(baseDir: str, postJsonObject: {}) -> bool:
"""Returns true if the given post is addressed to public
"""
if not postJsonObject.get('object'):
return False
if not postJsonObject['object'].get('to'):
return False
publicUrl = 'https://www.w3.org/ns/activitystreams#Public'
# does the public url exist in 'to' or 'cc' lists?
addressedToPublic = False
if publicUrl in postJsonObject['object']['to']:
addressedToPublic = True
if not addressedToPublic:
if not postJsonObject['object'].get('cc'):
return False
if publicUrl in postJsonObject['object']['cc']:
addressedToPublic = True
return addressedToPublic
def createPublicPost(baseDir: str,
nickname: str, domain: str, port: int, httpPrefix: str,
content: str, followersOnly: bool, saveToFile: bool,
@ -2737,17 +2715,6 @@ def createModeration(baseDir: str, nickname: str, domain: str, port: int,
return boxItems
def getStatusNumberFromPostFilename(filename) -> int:
"""Gets the status number from a post filename
eg. https:##testdomain.com:8085#users#testuser567#
statuses#1562958506952068.json
returns 156295850695206
"""
if '#statuses#' not in filename:
return None
return int(filename.split('#')[-1].replace('.json', ''))
def isDM(postJsonObject: {}) -> bool:
"""Returns true if the given post is a DM
"""
@ -2849,66 +2816,6 @@ def isReply(postJsonObject: {}, actor: str) -> bool:
return False
def createBoxIndex(boxDir: str, postsInBoxDict: {}) -> int:
""" Creates an index for the given box
"""
postsCtr = 0
postsInPersonInbox = os.scandir(boxDir)
for postFilename in postsInPersonInbox:
postFilename = postFilename.name
if not postFilename.endswith('.json'):
continue
# extract the status number
statusNumber = getStatusNumberFromPostFilename(postFilename)
if statusNumber:
postsInBoxDict[statusNumber] = os.path.join(boxDir, postFilename)
postsCtr += 1
return postsCtr
def createSharedInboxIndex(baseDir: str, sharedBoxDir: str,
postsInBoxDict: {}, postsCtr: int,
nickname: str, domain: str) -> int:
""" Creates an index for the given shared inbox
"""
handle = nickname + '@' + domain
followingFilename = baseDir + '/accounts/' + handle + '/following.txt'
postsInSharedInbox = os.scandir(sharedBoxDir)
followingHandles = None
for postFilename in postsInSharedInbox:
postFilename = postFilename.name
if not postFilename.endswith('.json'):
continue
statusNumber = getStatusNumberFromPostFilename(postFilename)
if not statusNumber:
continue
sharedInboxFilename = os.path.join(sharedBoxDir, postFilename)
# get the actor from the shared post
postJsonObject = loadJson(sharedInboxFilename, 0)
if not postJsonObject:
print('WARN: json load exception createSharedInboxIndex')
continue
actorNickname = getNicknameFromActor(postJsonObject['actor'])
if not actorNickname:
continue
actorDomain, actorPort = getDomainFromActor(postJsonObject['actor'])
if not actorDomain:
continue
# is the actor followed by this account?
if not followingHandles:
with open(followingFilename, 'r') as followingFile:
followingHandles = followingFile.read()
if actorNickname + '@' + actorDomain not in followingHandles:
continue
postsInBoxDict[statusNumber] = sharedInboxFilename
postsCtr += 1
return postsCtr
def addPostStringToTimeline(postStr: str, boxname: str,
postsInBox: [], boxActor: str) -> bool:
""" is this a valid timeline post?

View File

@ -41,17 +41,6 @@ def setSkillLevel(baseDir: str, nickname: str, domain: str,
return True
def setSkills(baseDir: str, nickname: str, domain: str, skills: {}) -> None:
actorFilename = baseDir + '/accounts/' + nickname + '@' + domain + '.json'
if not os.path.isfile(actorFilename):
return False
actorJson = loadJson(actorFilename)
if actorJson:
actorJson['skills'] = skills
saveJson(actorJson, actorFilename)
def getSkills(baseDir: str, nickname: str, domain: str) -> []:
"""Returns the skills for a given person
"""

View File

@ -2533,8 +2533,102 @@ def testReplyToPublicPost() -> None:
httpPrefix + '://rat.site/users/ninjarodent'
def testFunctions():
print('testFunctions')
function = {}
functionProperties = {}
modules = []
for subdir, dirs, files in os.walk('.'):
for sourceFile in files:
if not sourceFile.endswith('.py'):
continue
modName = sourceFile.replace('.py', '')
modules.append(modName)
sourceStr = ''
with open(sourceFile, "r") as f:
sourceStr = f.read()
with open(sourceFile, "r") as f:
lines = f.readlines()
for line in lines:
if not line.startswith('def '):
continue
methodName = line.split('def ', 1)[1].split('(')[0]
methodArgs = \
sourceStr.split('def ' + methodName + '(')[1]
methodArgs = methodArgs.split(')')[0]
methodArgs = methodArgs.replace(' ', '').split(',')
if function.get(modName):
function[modName].append(methodName)
else:
function[modName] = [methodName]
functionProperties[methodName] = {
"args": methodArgs,
"module": modName,
"calledInModule": []
}
break
# which modules is each function used within?
for modName in modules:
with open(modName + '.py', "r") as f:
lines = f.readlines()
for name, properties in functionProperties.items():
for line in lines:
if line.startswith('def '):
continue
if name + '(' in line:
modList = \
functionProperties[name]['calledInModule']
if modName not in modList:
modList.append(modName)
# don't check these functions, because they are procedurally called
exclusions = [
'set_document_loader',
'normalize',
'get_document_loader',
'runInboxQueueWatchdog',
'runInboxQueue',
'runPostSchedule',
'runPostScheduleWatchdog',
'str2bool',
'runNewswireDaemon',
'runNewswireWatchdog',
'threadSendPost',
'sendToFollowers',
'expireCache',
'migrateAccount',
'getMutualsOfPerson',
'runPostsQueue',
'runSharesExpire',
'runPostsWatchdog',
'runSharesExpireWatchdog',
'getThisWeeksEvents',
'getAvailability',
'testThreadsFunction',
'createServerAlice',
'createServerBob',
'createServerEve',
'E2EEremoveDevice',
'setOrganizationScheme'
]
# check that functions are called somewhere
for name, properties in functionProperties.items():
if name in exclusions:
continue
if not properties['calledInModule']:
print('function ' + name +
' in module ' + properties['module'] +
' is not called anywhere')
assert properties['calledInModule']
# print(str(function))
# print(str(functionProperties))
def runAllTests():
print('Running tests...')
testFunctions()
testReplyToPublicPost()
testGetMentionedPeople()
testGuessHashtagCategory()

View File

@ -398,39 +398,6 @@ def setThemeDefault(baseDir: str, allowLocalNetworkAccess: bool):
allowLocalNetworkAccess)
def setThemeHighVis(baseDir: str, allowLocalNetworkAccess: bool):
name = 'highvis'
themeParams = {
"newswire-publish-icon": True,
"full-width-timeline-buttons": False,
"icons-as-buttons": False,
"rss-icon-at-top": True,
"publish-button-at-top": False,
"font-size-header": "22px",
"font-size-header-mobile": "32px",
"font-size": "45px",
"font-size2": "45px",
"font-size3": "45px",
"font-size4": "35px",
"font-size5": "29px",
"gallery-font-size": "35px",
"gallery-font-size-mobile": "55px",
"hashtag-vertical-spacing3": "100px",
"hashtag-vertical-spacing4": "150px",
"time-vertical-align": "-10px",
"*font-family": "'LinBiolinum_Rah'",
"*src": "url('./fonts/LinBiolinum_Rah.woff2') format('woff2')"
}
bgParams = {
"login": "jpg",
"follow": "jpg",
"options": "jpg",
"search": "jpg"
}
setThemeFromDict(baseDir, name, themeParams, bgParams,
allowLocalNetworkAccess)
def setThemeFonts(baseDir: str, themeName: str) -> None:
"""Adds custom theme fonts
"""

View File

@ -542,16 +542,6 @@ def htmlFooter() -> str:
return htmlStr
def getFontFromCss(css: str) -> (str, str):
"""Returns the font name and format
"""
if ' url(' not in css:
return None, None
fontName = css.split(" url(")[1].split(")")[0].replace("'", '')
fontFormat = css.split(" format('")[1].split("')")[0]
return fontName, fontFormat
def loadIndividualPostAsHtmlFromCache(baseDir: str,
nickname: str, domain: str,
postJsonObject: {}) -> str: