
497 lines
18 KiB
Raw Normal View History

2021-11-10 12:16:03 +00:00
__filename__ = "reaction.py"
__author__ = "Bob Mottram"
__license__ = "AGPL3+"
__version__ = "1.2.0"
__maintainer__ = "Bob Mottram"
__email__ = "bob@libreserver.org"
__status__ = "Production"
__module_group__ = "ActivityPub"
import os
from pprint import pprint
from utils import hasObjectString
from utils import hasObjectStringObject
from utils import hasObjectStringType
from utils import removeDomainPort
from utils import hasObjectDict
from utils import hasUsersPath
from utils import getFullDomain
from utils import removeIdEnding
from utils import urlPermitted
from utils import getNicknameFromActor
from utils import getDomainFromActor
from utils import locatePost
from utils import undoReactionCollectionEntry
from utils import hasGroupType
from utils import localActorUrl
from utils import loadJson
from utils import saveJson
from utils import removePostFromCache
from utils import getCachedPostFilename
from posts import sendSignedJson
from session import postJson
from webfinger import webfingerHandle
from auth import createBasicAuthHeader
from posts import getPersonBox
def noOfReactions(postJsonObject: {}, emojiContent: str) -> int:
"""Returns the number of emoji reactions of a given content type on a post
obj = postJsonObject
if hasObjectDict(postJsonObject):
obj = postJsonObject['object']
if not obj.get('reactions'):
return 0
if not isinstance(obj['reactions'], dict):
return 0
if not obj['reactions'].get('items'):
obj['reactions']['items'] = []
obj['reactions']['totalItems'] = 0
ctr = 0
for item in obj['reactions']['items']:
if not item.get('content'):
if item['content'] == emojiContent:
ctr += 1
return ctr
def _reaction(recentPostsCache: {},
session, baseDir: str, federationList: [],
nickname: str, domain: str, port: int,
ccList: [], httpPrefix: str,
objectUrl: str, emojiContent: str,
actorReaction: str,
clientToServer: bool,
sendThreads: [], postLog: [],
personCache: {}, cachedWebfingers: {},
debug: bool, projectVersion: str,
signingPrivateKeyPem: str) -> {}:
"""Creates an emoji reaction
actor is the person doing the reacting
'to' might be a specific person (actor) whose post was reaction
object is typically the url of the message which was reaction
if not urlPermitted(objectUrl, federationList):
return None
fullDomain = getFullDomain(domain, port)
newReactionJson = {
"@context": "https://www.w3.org/ns/activitystreams",
'type': 'EmojiReact',
'actor': localActorUrl(httpPrefix, nickname, fullDomain),
'object': objectUrl,
'content': emojiContent
if ccList:
if len(ccList) > 0:
newReactionJson['cc'] = ccList
# Extract the domain and nickname from a statuses link
reactionPostNickname = None
reactionPostDomain = None
reactionPostPort = None
groupAccount = False
if actorReaction:
reactionPostNickname = getNicknameFromActor(actorReaction)
reactionPostDomain, reactionPostPort = \
groupAccount = hasGroupType(baseDir, actorReaction, personCache)
if hasUsersPath(objectUrl):
reactionPostNickname = getNicknameFromActor(objectUrl)
reactionPostDomain, reactionPostPort = \
if '/' + str(reactionPostNickname) + '/' in objectUrl:
actorReaction = \
objectUrl.split('/' + reactionPostNickname + '/')[0] + \
'/' + reactionPostNickname
groupAccount = \
hasGroupType(baseDir, actorReaction, personCache)
if reactionPostNickname:
postFilename = locatePost(baseDir, nickname, domain, objectUrl)
if not postFilename:
print('DEBUG: reaction baseDir: ' + baseDir)
print('DEBUG: reaction nickname: ' + nickname)
print('DEBUG: reaction domain: ' + domain)
print('DEBUG: reaction objectUrl: ' + objectUrl)
return None
baseDir, postFilename, objectUrl,
nickname, domain, debug, None,
sendSignedJson(newReactionJson, session, baseDir,
nickname, domain, port,
reactionPostDomain, reactionPostPort,
httpPrefix, True, clientToServer, federationList,
sendThreads, postLog, cachedWebfingers, personCache,
debug, projectVersion, None, groupAccount,
signingPrivateKeyPem, 7165392)
return newReactionJson
def reactionPost(recentPostsCache: {},
session, baseDir: str, federationList: [],
nickname: str, domain: str, port: int, httpPrefix: str,
reactionNickname: str, reactionDomain: str, reactionPort: int,
ccList: [],
reactionStatusNumber: int, emojiContent: str,
clientToServer: bool,
sendThreads: [], postLog: [],
personCache: {}, cachedWebfingers: {},
debug: bool, projectVersion: str,
signingPrivateKeyPem: str) -> {}:
"""Adds a reaction to a given status post. This is only used by unit tests
reactionDomain = getFullDomain(reactionDomain, reactionPort)
actorReaction = localActorUrl(httpPrefix, reactionNickname, reactionDomain)
objectUrl = actorReaction + '/statuses/' + str(reactionStatusNumber)
return _reaction(recentPostsCache,
session, baseDir, federationList,
nickname, domain, port,
ccList, httpPrefix, objectUrl, emojiContent,
actorReaction, clientToServer,
sendThreads, postLog, personCache, cachedWebfingers,
debug, projectVersion, signingPrivateKeyPem)
def sendReactionViaServer(baseDir: str, session,
fromNickname: str, password: str,
fromDomain: str, fromPort: int,
httpPrefix: str, reactionUrl: str,
emojiContent: str,
cachedWebfingers: {}, personCache: {},
debug: bool, projectVersion: str,
signingPrivateKeyPem: str) -> {}:
"""Creates a reaction via c2s
if not session:
print('WARN: No session for sendReactionViaServer')
return 6
fromDomainFull = getFullDomain(fromDomain, fromPort)
actor = localActorUrl(httpPrefix, fromNickname, fromDomainFull)
newReactionJson = {
"@context": "https://www.w3.org/ns/activitystreams",
'type': 'EmojiReact',
'actor': actor,
'object': reactionUrl,
'content': emojiContent
handle = httpPrefix + '://' + fromDomainFull + '/@' + fromNickname
# lookup the inbox for the To handle
wfRequest = webfingerHandle(session, handle, httpPrefix,
fromDomain, projectVersion, debug, False,
if not wfRequest:
if debug:
print('DEBUG: reaction webfinger failed for ' + handle)
return 1
if not isinstance(wfRequest, dict):
print('WARN: reaction webfinger for ' + handle +
' did not return a dict. ' + str(wfRequest))
return 1
postToBox = 'outbox'
# get the actor inbox for the To handle
originDomain = fromDomain
(inboxUrl, pubKeyId, pubKey, fromPersonId, sharedInbox, avatarUrl,
displayName, _) = getPersonBox(signingPrivateKeyPem,
baseDir, session, wfRequest,
projectVersion, httpPrefix,
fromNickname, fromDomain,
postToBox, 72873)
if not inboxUrl:
if debug:
print('DEBUG: reaction no ' + postToBox +
' was found for ' + handle)
return 3
if not fromPersonId:
if debug:
print('DEBUG: reaction no actor was found for ' + handle)
return 4
authHeader = createBasicAuthHeader(fromNickname, password)
headers = {
'host': fromDomain,
'Content-type': 'application/json',
'Authorization': authHeader
postResult = postJson(httpPrefix, fromDomainFull,
session, newReactionJson, [], inboxUrl,
headers, 3, True)
if not postResult:
if debug:
print('WARN: POST reaction failed for c2s to ' + inboxUrl)
return 5
if debug:
print('DEBUG: c2s POST reaction success')
return newReactionJson
def sendUndoReactionViaServer(baseDir: str, session,
fromNickname: str, password: str,
fromDomain: str, fromPort: int,
httpPrefix: str, reactionUrl: str,
emojiContent: str,
cachedWebfingers: {}, personCache: {},
debug: bool, projectVersion: str,
signingPrivateKeyPem: str) -> {}:
"""Undo a reaction via c2s
if not session:
print('WARN: No session for sendUndoReactionViaServer')
return 6
fromDomainFull = getFullDomain(fromDomain, fromPort)
actor = localActorUrl(httpPrefix, fromNickname, fromDomainFull)
newUndoReactionJson = {
"@context": "https://www.w3.org/ns/activitystreams",
'type': 'Undo',
'actor': actor,
'object': {
'type': 'EmojiReact',
'actor': actor,
'object': reactionUrl,
'content': emojiContent
handle = httpPrefix + '://' + fromDomainFull + '/@' + fromNickname
# lookup the inbox for the To handle
wfRequest = webfingerHandle(session, handle, httpPrefix,
fromDomain, projectVersion, debug, False,
if not wfRequest:
if debug:
print('DEBUG: unreaction webfinger failed for ' + handle)
return 1
if not isinstance(wfRequest, dict):
if debug:
print('WARN: unreaction webfinger for ' + handle +
' did not return a dict. ' + str(wfRequest))
return 1
postToBox = 'outbox'
# get the actor inbox for the To handle
originDomain = fromDomain
(inboxUrl, pubKeyId, pubKey, fromPersonId, sharedInbox, avatarUrl,
displayName, _) = getPersonBox(signingPrivateKeyPem,
baseDir, session, wfRequest,
personCache, projectVersion,
httpPrefix, fromNickname,
fromDomain, postToBox,
if not inboxUrl:
if debug:
print('DEBUG: unreaction no ' + postToBox +
' was found for ' + handle)
return 3
if not fromPersonId:
if debug:
print('DEBUG: unreaction no actor was found for ' + handle)
return 4
authHeader = createBasicAuthHeader(fromNickname, password)
headers = {
'host': fromDomain,
'Content-type': 'application/json',
'Authorization': authHeader
postResult = postJson(httpPrefix, fromDomainFull,
session, newUndoReactionJson, [], inboxUrl,
headers, 3, True)
if not postResult:
if debug:
print('WARN: POST unreaction failed for c2s to ' + inboxUrl)
return 5
if debug:
print('DEBUG: c2s POST unreaction success')
return newUndoReactionJson
def outboxReaction(recentPostsCache: {},
baseDir: str, httpPrefix: str,
nickname: str, domain: str, port: int,
messageJson: {}, debug: bool) -> None:
""" When a reaction request is received by the outbox from c2s
if not messageJson.get('type'):
if debug:
print('DEBUG: reaction - no type')
if not messageJson['type'] == 'EmojiReact':
if debug:
print('DEBUG: not a reaction')
if not hasObjectString(messageJson, debug):
if not messageJson.get('content'):
if not isinstance(messageJson['content'], str):
if debug:
print('DEBUG: c2s reaction request arrived in outbox')
messageId = removeIdEnding(messageJson['object'])
domain = removeDomainPort(domain)
emojiContent = messageJson['content']
postFilename = locatePost(baseDir, nickname, domain, messageId)
if not postFilename:
if debug:
print('DEBUG: c2s reaction post not found in inbox or outbox')
return True
baseDir, postFilename, messageId,
nickname, domain, debug, None, emojiContent)
if debug:
print('DEBUG: post reaction via c2s - ' + postFilename)
def outboxUndoReaction(recentPostsCache: {},
baseDir: str, httpPrefix: str,
nickname: str, domain: str, port: int,
messageJson: {}, debug: bool) -> None:
""" When an undo reaction request is received by the outbox from c2s
if not messageJson.get('type'):
if not messageJson['type'] == 'Undo':
if not hasObjectStringType(messageJson, debug):
if not messageJson['object']['type'] == 'EmojiReact':
if debug:
print('DEBUG: not a undo reaction')
if not messageJson['object'].get('content'):
if not isinstance(messageJson['object']['content'], str):
if not hasObjectStringObject(messageJson, debug):
if debug:
print('DEBUG: c2s undo reaction request arrived in outbox')
messageId = removeIdEnding(messageJson['object']['object'])
emojiContent = messageJson['object']['content']
domain = removeDomainPort(domain)
postFilename = locatePost(baseDir, nickname, domain, messageId)
if not postFilename:
if debug:
print('DEBUG: c2s undo reaction post not found in inbox or outbox')
return True
undoReactionCollectionEntry(recentPostsCache, baseDir, postFilename,
messageId, messageJson['actor'],
domain, debug, None, emojiContent)
if debug:
print('DEBUG: post undo reaction via c2s - ' + postFilename)
def updateReactionCollection(recentPostsCache: {},
baseDir: str, postFilename: str,
objectUrl: str, actor: str,
nickname: str, domain: str, debug: bool,
postJsonObject: {},
emojiContent: str) -> None:
"""Updates the reactions collection within a post
if not postJsonObject:
postJsonObject = loadJson(postFilename)
if not postJsonObject:
# remove any cached version of this post so that the
# reaction icon is changed
removePostFromCache(postJsonObject, recentPostsCache)
cachedPostFilename = getCachedPostFilename(baseDir, nickname,
domain, postJsonObject)
if cachedPostFilename:
if os.path.isfile(cachedPostFilename):
except BaseException:
print('EX: updateReactionCollection unable to delete ' +
obj = postJsonObject
if hasObjectDict(postJsonObject):
obj = postJsonObject['object']
if not objectUrl.endswith('/reactions'):
objectUrl = objectUrl + '/reactions'
if not obj.get('reactions'):
if debug:
print('DEBUG: Adding initial emoji reaction to ' + objectUrl)
reactionsJson = {
"@context": "https://www.w3.org/ns/activitystreams",
'id': objectUrl,
'type': 'Collection',
"totalItems": 1,
'items': [{
'type': 'EmojiReact',
'actor': actor,
'content': emojiContent
obj['reactions'] = reactionsJson
if not obj['reactions'].get('items'):
obj['reactions']['items'] = []
for reactionItem in obj['reactions']['items']:
if reactionItem.get('actor') and reactionItem.get('content'):
if reactionItem['actor'] == actor and \
reactionItem['content'] == emojiContent:
# already reaction
newReaction = {
'type': 'EmojiReact',
'actor': actor,
'content': emojiContent
itlen = len(obj['reactions']['items'])
obj['reactions']['totalItems'] = itlen
if debug:
print('DEBUG: saving post with emoji reaction added')
saveJson(postJsonObject, postFilename)