mirror of https://gitlab.com/bashrc2/epicyon
				
				
				
			
		
			
				
	
	
		
			456 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			456 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
| __filename__ = "happening.py"
 | |
| __author__ = "Bob Mottram"
 | |
| __license__ = "AGPL3+"
 | |
| __version__ = "1.2.0"
 | |
| __maintainer__ = "Bob Mottram"
 | |
| __email__ = "bob@libreserver.org"
 | |
| __status__ = "Production"
 | |
| __module_group__ = "Core"
 | |
| 
 | |
| import os
 | |
| from uuid import UUID
 | |
| from datetime import datetime
 | |
| from datetime import timedelta
 | |
| 
 | |
| from utils import isPublicPost
 | |
| from utils import loadJson
 | |
| from utils import saveJson
 | |
| from utils import locatePost
 | |
| from utils import hasObjectDict
 | |
| from utils import acctDir
 | |
| 
 | |
| 
 | |
| def _validUuid(testUuid: str, version: int):
 | |
|     """Check if uuid_to_test is a valid UUID
 | |
|     """
 | |
|     try:
 | |
|         uuid_obj = UUID(testUuid, version=version)
 | |
|     except ValueError:
 | |
|         return False
 | |
| 
 | |
|     return str(uuid_obj) == testUuid
 | |
| 
 | |
| 
 | |
| def _removeEventFromTimeline(eventId: str, tlEventsFilename: str) -> None:
 | |
|     """Removes the given event Id from the timeline
 | |
|     """
 | |
|     if eventId + '\n' not in open(tlEventsFilename).read():
 | |
|         return
 | |
|     with open(tlEventsFilename, 'r') as fp:
 | |
|         eventsTimeline = fp.read().replace(eventId + '\n', '')
 | |
|         try:
 | |
|             with open(tlEventsFilename, 'w+') as fp2:
 | |
|                 fp2.write(eventsTimeline)
 | |
|         except BaseException:
 | |
|             print('EX: ERROR: unable to save events timeline')
 | |
|             pass
 | |
| 
 | |
| 
 | |
| def saveEventPost(baseDir: str, handle: str, postId: str,
 | |
|                   eventJson: {}) -> bool:
 | |
|     """Saves an event to the calendar and/or the events timeline
 | |
|     If an event has extra fields, as per Mobilizon,
 | |
|     Then it is saved as a separate entity and added to the
 | |
|     events timeline
 | |
|     See https://framagit.org/framasoft/mobilizon/-/blob/
 | |
|     master/lib/federation/activity_stream/converter/event.ex
 | |
|     """
 | |
|     if not os.path.isdir(baseDir + '/accounts/' + handle):
 | |
|         print('WARN: Account does not exist at ' +
 | |
|               baseDir + '/accounts/' + handle)
 | |
|     calendarPath = baseDir + '/accounts/' + handle + '/calendar'
 | |
|     if not os.path.isdir(calendarPath):
 | |
|         os.mkdir(calendarPath)
 | |
| 
 | |
|     # get the year, month and day from the event
 | |
|     eventTime = datetime.strptime(eventJson['startTime'],
 | |
|                                   "%Y-%m-%dT%H:%M:%S%z")
 | |
|     eventYear = int(eventTime.strftime("%Y"))
 | |
|     if eventYear < 2020 or eventYear >= 2100:
 | |
|         return False
 | |
|     eventMonthNumber = int(eventTime.strftime("%m"))
 | |
|     if eventMonthNumber < 1 or eventMonthNumber > 12:
 | |
|         return False
 | |
|     eventDayOfMonth = int(eventTime.strftime("%d"))
 | |
|     if eventDayOfMonth < 1 or eventDayOfMonth > 31:
 | |
|         return False
 | |
| 
 | |
|     if eventJson.get('name') and eventJson.get('actor') and \
 | |
|        eventJson.get('uuid') and eventJson.get('content'):
 | |
|         if not _validUuid(eventJson['uuid'], 4):
 | |
|             return False
 | |
|         print('Mobilizon type event')
 | |
|         # if this is a full description of an event then save it
 | |
|         # as a separate json file
 | |
|         eventsPath = baseDir + '/accounts/' + handle + '/events'
 | |
|         if not os.path.isdir(eventsPath):
 | |
|             os.mkdir(eventsPath)
 | |
|         eventsYearPath = \
 | |
|             baseDir + '/accounts/' + handle + '/events/' + str(eventYear)
 | |
|         if not os.path.isdir(eventsYearPath):
 | |
|             os.mkdir(eventsYearPath)
 | |
|         eventId = str(eventYear) + '-' + eventTime.strftime("%m") + '-' + \
 | |
|             eventTime.strftime("%d") + '_' + eventJson['uuid']
 | |
|         eventFilename = eventsYearPath + '/' + eventId + '.json'
 | |
| 
 | |
|         saveJson(eventJson, eventFilename)
 | |
|         # save to the events timeline
 | |
|         tlEventsFilename = baseDir + '/accounts/' + handle + '/events.txt'
 | |
| 
 | |
|         if os.path.isfile(tlEventsFilename):
 | |
|             _removeEventFromTimeline(eventId, tlEventsFilename)
 | |
|             try:
 | |
|                 with open(tlEventsFilename, 'r+') as tlEventsFile:
 | |
|                     content = tlEventsFile.read()
 | |
|                     if eventId + '\n' not in content:
 | |
|                         tlEventsFile.seek(0, 0)
 | |
|                         tlEventsFile.write(eventId + '\n' + content)
 | |
|             except Exception as e:
 | |
|                 print('WARN: Failed to write entry to events file ' +
 | |
|                       tlEventsFilename + ' ' + str(e))
 | |
|                 return False
 | |
|         else:
 | |
|             with open(tlEventsFilename, 'w+') as tlEventsFile:
 | |
|                 tlEventsFile.write(eventId + '\n')
 | |
| 
 | |
|     # create a directory for the calendar year
 | |
|     if not os.path.isdir(calendarPath + '/' + str(eventYear)):
 | |
|         os.mkdir(calendarPath + '/' + str(eventYear))
 | |
| 
 | |
|     # calendar month file containing event post Ids
 | |
|     calendarFilename = calendarPath + '/' + str(eventYear) + \
 | |
|         '/' + str(eventMonthNumber) + '.txt'
 | |
| 
 | |
|     # Does this event post already exist within the calendar month?
 | |
|     if os.path.isfile(calendarFilename):
 | |
|         if postId in open(calendarFilename).read():
 | |
|             # Event post already exists
 | |
|             return False
 | |
| 
 | |
|     # append the post Id to the file for the calendar month
 | |
|     with open(calendarFilename, 'a+') as calendarFile:
 | |
|         calendarFile.write(postId + '\n')
 | |
| 
 | |
|     # create a file which will trigger a notification that
 | |
|     # a new event has been added
 | |
|     calendarNotificationFilename = \
 | |
|         baseDir + '/accounts/' + handle + '/.newCalendar'
 | |
|     with open(calendarNotificationFilename, 'w+') as calendarNotificationFile:
 | |
|         notifyStr = \
 | |
|             '/calendar?year=' + str(eventYear) + '?month=' + \
 | |
|             str(eventMonthNumber) + '?day=' + str(eventDayOfMonth)
 | |
|         calendarNotificationFile.write(notifyStr)
 | |
|     return True
 | |
| 
 | |
| 
 | |
| def _isHappeningEvent(tag: {}) -> bool:
 | |
|     """Is this tag an Event or Place ActivityStreams type?
 | |
|     """
 | |
|     if not tag.get('type'):
 | |
|         return False
 | |
|     if tag['type'] != 'Event' and tag['type'] != 'Place':
 | |
|         return False
 | |
|     return True
 | |
| 
 | |
| 
 | |
| def _isHappeningPost(postJsonObject: {}) -> bool:
 | |
|     """Is this a post with tags?
 | |
|     """
 | |
|     if not postJsonObject:
 | |
|         return False
 | |
|     if not hasObjectDict(postJsonObject):
 | |
|         return False
 | |
|     if not postJsonObject['object'].get('tag'):
 | |
|         return False
 | |
|     return True
 | |
| 
 | |
| 
 | |
| def getTodaysEvents(baseDir: str, nickname: str, domain: str,
 | |
|                     currYear: int, currMonthNumber: int,
 | |
|                     currDayOfMonth: int) -> {}:
 | |
|     """Retrieves calendar events for today
 | |
|     Returns a dictionary of lists containing Event and Place activities
 | |
|     """
 | |
|     now = datetime.now()
 | |
|     if not currYear:
 | |
|         year = now.year
 | |
|     else:
 | |
|         year = currYear
 | |
|     if not currMonthNumber:
 | |
|         monthNumber = now.month
 | |
|     else:
 | |
|         monthNumber = currMonthNumber
 | |
|     if not currDayOfMonth:
 | |
|         dayNumber = now.day
 | |
|     else:
 | |
|         dayNumber = currDayOfMonth
 | |
| 
 | |
|     calendarFilename = \
 | |
|         acctDir(baseDir, nickname, domain) + \
 | |
|         '/calendar/' + str(year) + '/' + str(monthNumber) + '.txt'
 | |
|     events = {}
 | |
|     if not os.path.isfile(calendarFilename):
 | |
|         return events
 | |
| 
 | |
|     calendarPostIds = []
 | |
|     recreateEventsFile = False
 | |
|     with open(calendarFilename, 'r') as eventsFile:
 | |
|         for postId in eventsFile:
 | |
|             postId = postId.replace('\n', '').replace('\r', '')
 | |
|             postFilename = locatePost(baseDir, nickname, domain, postId)
 | |
|             if not postFilename:
 | |
|                 recreateEventsFile = True
 | |
|                 continue
 | |
| 
 | |
|             postJsonObject = loadJson(postFilename)
 | |
|             if not _isHappeningPost(postJsonObject):
 | |
|                 continue
 | |
| 
 | |
|             publicEvent = isPublicPost(postJsonObject)
 | |
| 
 | |
|             postEvent = []
 | |
|             dayOfMonth = None
 | |
|             for tag in postJsonObject['object']['tag']:
 | |
|                 if not _isHappeningEvent(tag):
 | |
|                     continue
 | |
|                 # this tag is an event or a place
 | |
|                 if tag['type'] == 'Event':
 | |
|                     # tag is an event
 | |
|                     if not tag.get('startTime'):
 | |
|                         continue
 | |
|                     eventTime = \
 | |
|                         datetime.strptime(tag['startTime'],
 | |
|                                           "%Y-%m-%dT%H:%M:%S%z")
 | |
|                     if int(eventTime.strftime("%Y")) == year and \
 | |
|                        int(eventTime.strftime("%m")) == monthNumber and \
 | |
|                        int(eventTime.strftime("%d")) == dayNumber:
 | |
|                         dayOfMonth = str(int(eventTime.strftime("%d")))
 | |
|                         if '#statuses#' in postId:
 | |
|                             # link to the id so that the event can be
 | |
|                             # easily deleted
 | |
|                             tag['postId'] = postId.split('#statuses#')[1]
 | |
|                             tag['sender'] = postId.split('#statuses#')[0]
 | |
|                             tag['sender'] = tag['sender'].replace('#', '/')
 | |
|                             tag['public'] = publicEvent
 | |
|                         postEvent.append(tag)
 | |
|                 else:
 | |
|                     # tag is a place
 | |
|                     postEvent.append(tag)
 | |
|             if postEvent and dayOfMonth:
 | |
|                 calendarPostIds.append(postId)
 | |
|                 if not events.get(dayOfMonth):
 | |
|                     events[dayOfMonth] = []
 | |
|                 events[dayOfMonth].append(postEvent)
 | |
| 
 | |
|     # if some posts have been deleted then regenerate the calendar file
 | |
|     if recreateEventsFile:
 | |
|         with open(calendarFilename, 'w+') as calendarFile:
 | |
|             for postId in calendarPostIds:
 | |
|                 calendarFile.write(postId + '\n')
 | |
| 
 | |
|     return events
 | |
| 
 | |
| 
 | |
| def dayEventsCheck(baseDir: str, nickname: str, domain: str, currDate) -> bool:
 | |
|     """Are there calendar events for the given date?
 | |
|     """
 | |
|     year = currDate.year
 | |
|     monthNumber = currDate.month
 | |
|     dayNumber = currDate.day
 | |
| 
 | |
|     calendarFilename = \
 | |
|         acctDir(baseDir, nickname, domain) + \
 | |
|         '/calendar/' + str(year) + '/' + str(monthNumber) + '.txt'
 | |
|     if not os.path.isfile(calendarFilename):
 | |
|         return False
 | |
| 
 | |
|     eventsExist = False
 | |
|     with open(calendarFilename, 'r') as eventsFile:
 | |
|         for postId in eventsFile:
 | |
|             postId = postId.replace('\n', '').replace('\r', '')
 | |
|             postFilename = locatePost(baseDir, nickname, domain, postId)
 | |
|             if not postFilename:
 | |
|                 continue
 | |
| 
 | |
|             postJsonObject = loadJson(postFilename)
 | |
|             if not _isHappeningPost(postJsonObject):
 | |
|                 continue
 | |
| 
 | |
|             for tag in postJsonObject['object']['tag']:
 | |
|                 if not _isHappeningEvent(tag):
 | |
|                     continue
 | |
|                 # this tag is an event or a place
 | |
|                 if tag['type'] != 'Event':
 | |
|                     continue
 | |
|                 # tag is an event
 | |
|                 if not tag.get('startTime'):
 | |
|                     continue
 | |
|                 eventTime = \
 | |
|                     datetime.strptime(tag['startTime'],
 | |
|                                       "%Y-%m-%dT%H:%M:%S%z")
 | |
|                 if int(eventTime.strftime("%d")) != dayNumber:
 | |
|                     continue
 | |
|                 if int(eventTime.strftime("%m")) != monthNumber:
 | |
|                     continue
 | |
|                 if int(eventTime.strftime("%Y")) != year:
 | |
|                     continue
 | |
|                 eventsExist = True
 | |
|                 break
 | |
| 
 | |
|     return eventsExist
 | |
| 
 | |
| 
 | |
| def getThisWeeksEvents(baseDir: str, nickname: str, domain: str) -> {}:
 | |
|     """Retrieves calendar events for this week
 | |
|     Returns a dictionary indexed by day number of lists containing
 | |
|     Event and Place activities
 | |
|     Note: currently not used but could be with a weekly calendar screen
 | |
|     """
 | |
|     now = datetime.now()
 | |
|     endOfWeek = now + timedelta(7)
 | |
|     year = now.year
 | |
|     monthNumber = now.month
 | |
| 
 | |
|     calendarFilename = \
 | |
|         acctDir(baseDir, nickname, domain) + \
 | |
|         '/calendar/' + str(year) + '/' + str(monthNumber) + '.txt'
 | |
| 
 | |
|     events = {}
 | |
|     if not os.path.isfile(calendarFilename):
 | |
|         return events
 | |
| 
 | |
|     calendarPostIds = []
 | |
|     recreateEventsFile = False
 | |
|     with open(calendarFilename, 'r') as eventsFile:
 | |
|         for postId in eventsFile:
 | |
|             postId = postId.replace('\n', '').replace('\r', '')
 | |
|             postFilename = locatePost(baseDir, nickname, domain, postId)
 | |
|             if not postFilename:
 | |
|                 recreateEventsFile = True
 | |
|                 continue
 | |
| 
 | |
|             postJsonObject = loadJson(postFilename)
 | |
|             if not _isHappeningPost(postJsonObject):
 | |
|                 continue
 | |
| 
 | |
|             postEvent = []
 | |
|             weekDayIndex = None
 | |
|             for tag in postJsonObject['object']['tag']:
 | |
|                 if not _isHappeningEvent(tag):
 | |
|                     continue
 | |
|                 # this tag is an event or a place
 | |
|                 if tag['type'] == 'Event':
 | |
|                     # tag is an event
 | |
|                     if not tag.get('startTime'):
 | |
|                         continue
 | |
|                     eventTime = \
 | |
|                         datetime.strptime(tag['startTime'],
 | |
|                                           "%Y-%m-%dT%H:%M:%S%z")
 | |
|                     if eventTime >= now and eventTime <= endOfWeek:
 | |
|                         weekDayIndex = (eventTime - now).days()
 | |
|                         postEvent.append(tag)
 | |
|                 else:
 | |
|                     # tag is a place
 | |
|                     postEvent.append(tag)
 | |
|             if postEvent and weekDayIndex:
 | |
|                 calendarPostIds.append(postId)
 | |
|                 if not events.get(weekDayIndex):
 | |
|                     events[weekDayIndex] = []
 | |
|                 events[weekDayIndex].append(postEvent)
 | |
| 
 | |
|     # if some posts have been deleted then regenerate the calendar file
 | |
|     if recreateEventsFile:
 | |
|         with open(calendarFilename, 'w+') as calendarFile:
 | |
|             for postId in calendarPostIds:
 | |
|                 calendarFile.write(postId + '\n')
 | |
| 
 | |
|     return events
 | |
| 
 | |
| 
 | |
| def getCalendarEvents(baseDir: str, nickname: str, domain: str,
 | |
|                       year: int, monthNumber: int) -> {}:
 | |
|     """Retrieves calendar events
 | |
|     Returns a dictionary indexed by day number of lists containing
 | |
|     Event and Place activities
 | |
|     """
 | |
|     calendarFilename = \
 | |
|         acctDir(baseDir, nickname, domain) + \
 | |
|         '/calendar/' + str(year) + '/' + str(monthNumber) + '.txt'
 | |
| 
 | |
|     events = {}
 | |
|     if not os.path.isfile(calendarFilename):
 | |
|         return events
 | |
| 
 | |
|     calendarPostIds = []
 | |
|     recreateEventsFile = False
 | |
|     with open(calendarFilename, 'r') as eventsFile:
 | |
|         for postId in eventsFile:
 | |
|             postId = postId.replace('\n', '').replace('\r', '')
 | |
|             postFilename = locatePost(baseDir, nickname, domain, postId)
 | |
|             if not postFilename:
 | |
|                 recreateEventsFile = True
 | |
|                 continue
 | |
| 
 | |
|             postJsonObject = loadJson(postFilename)
 | |
|             if not _isHappeningPost(postJsonObject):
 | |
|                 continue
 | |
| 
 | |
|             postEvent = []
 | |
|             dayOfMonth = None
 | |
|             for tag in postJsonObject['object']['tag']:
 | |
|                 if not _isHappeningEvent(tag):
 | |
|                     continue
 | |
|                 # this tag is an event or a place
 | |
|                 if tag['type'] == 'Event':
 | |
|                     # tag is an event
 | |
|                     if not tag.get('startTime'):
 | |
|                         continue
 | |
|                     eventTime = \
 | |
|                         datetime.strptime(tag['startTime'],
 | |
|                                           "%Y-%m-%dT%H:%M:%S%z")
 | |
|                     if int(eventTime.strftime("%Y")) == year and \
 | |
|                        int(eventTime.strftime("%m")) == monthNumber:
 | |
|                         dayOfMonth = str(int(eventTime.strftime("%d")))
 | |
|                         postEvent.append(tag)
 | |
|                 else:
 | |
|                     # tag is a place
 | |
|                     postEvent.append(tag)
 | |
| 
 | |
|             if postEvent and dayOfMonth:
 | |
|                 calendarPostIds.append(postId)
 | |
|                 if not events.get(dayOfMonth):
 | |
|                     events[dayOfMonth] = []
 | |
|                 events[dayOfMonth].append(postEvent)
 | |
| 
 | |
|     # if some posts have been deleted then regenerate the calendar file
 | |
|     if recreateEventsFile:
 | |
|         with open(calendarFilename, 'w+') as calendarFile:
 | |
|             for postId in calendarPostIds:
 | |
|                 calendarFile.write(postId + '\n')
 | |
| 
 | |
|     return events
 | |
| 
 | |
| 
 | |
| def removeCalendarEvent(baseDir: str, nickname: str, domain: str,
 | |
|                         year: int, monthNumber: int, messageId: str) -> None:
 | |
|     """Removes a calendar event
 | |
|     """
 | |
|     calendarFilename = \
 | |
|         acctDir(baseDir, nickname, domain) + \
 | |
|         '/calendar/' + str(year) + '/' + str(monthNumber) + '.txt'
 | |
|     if not os.path.isfile(calendarFilename):
 | |
|         return
 | |
|     if '/' in messageId:
 | |
|         messageId = messageId.replace('/', '#')
 | |
|     if messageId not in open(calendarFilename).read():
 | |
|         return
 | |
|     lines = None
 | |
|     with open(calendarFilename, 'r') as f:
 | |
|         lines = f.readlines()
 | |
|     if not lines:
 | |
|         return
 | |
|     with open(calendarFilename, 'w+') as f:
 | |
|         for line in lines:
 | |
|             if messageId not in line:
 | |
|                 f.write(line)
 |