mirror of https://gitlab.com/bashrc2/epicyon
				
				
				
			
		
			
				
	
	
		
			249 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			249 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Python
		
	
	
| __filename__ = "timeFunctions.py"
 | |
| __author__ = "Bob Mottram"
 | |
| __license__ = "AGPL3+"
 | |
| __version__ = "1.6.0"
 | |
| __maintainer__ = "Bob Mottram"
 | |
| __email__ = "bob@libreserver.org"
 | |
| __status__ = "Production"
 | |
| __module_group__ = "Core"
 | |
| 
 | |
| import os
 | |
| import time
 | |
| import datetime
 | |
| from dateutil.tz import tz
 | |
| from utils import acct_dir
 | |
| from utils import data_dir
 | |
| from utils import has_object_dict
 | |
| 
 | |
| 
 | |
| def convert_published_to_local_timezone(published, timezone: str) -> str:
 | |
|     """Converts a post published time into local time
 | |
|     """
 | |
|     to_zone = None
 | |
|     from_zone = tz.gettz('UTC')
 | |
|     if timezone:
 | |
|         try:
 | |
|             to_zone = tz.gettz(timezone)
 | |
|         except BaseException:
 | |
|             pass
 | |
|     if not timezone or not to_zone:
 | |
|         return published
 | |
| 
 | |
|     utc = published.replace(tzinfo=from_zone)
 | |
|     local_time = utc.astimezone(to_zone)
 | |
|     return local_time
 | |
| 
 | |
| 
 | |
| def _utc_mktime(utc_tuple):
 | |
|     """Returns number of seconds elapsed since epoch
 | |
|     Note that no timezone are taken into consideration.
 | |
|     utc tuple must be: (year, month, day, hour, minute, second)
 | |
|     """
 | |
| 
 | |
|     if len(utc_tuple) == 6:
 | |
|         utc_tuple += (0, 0, 0)
 | |
|     return time.mktime(utc_tuple) - time.mktime((1970, 1, 1, 0, 0, 0, 0, 0, 0))
 | |
| 
 | |
| 
 | |
| def _datetime_to_timestamp(dtime):
 | |
|     """Converts a datetime object to UTC timestamp"""
 | |
|     return int(_utc_mktime(dtime.timetuple()))
 | |
| 
 | |
| 
 | |
| def date_utcnow():
 | |
|     """returns the time now
 | |
|     """
 | |
|     return datetime.datetime.now(datetime.timezone.utc)
 | |
| 
 | |
| 
 | |
| def date_from_numbers(year: int, month: int, day: int,
 | |
|                       hour: int, mins: int):
 | |
|     """returns an offset-aware datetime
 | |
|     """
 | |
|     return datetime.datetime(year, month, day, hour, mins, 0,
 | |
|                              tzinfo=datetime.timezone.utc)
 | |
| 
 | |
| 
 | |
| def date_from_string_format(date_str: str, formats: []):
 | |
|     """returns an offset-aware datetime from a string date
 | |
|     """
 | |
|     if not formats:
 | |
|         formats = ("%a, %d %b %Y %H:%M:%S %Z",
 | |
|                    "%a, %d %b %Y %H:%M:%S %z",
 | |
|                    "%Y-%m-%dT%H:%M:%S%z")
 | |
|     dtime = None
 | |
|     for date_format in formats:
 | |
|         try:
 | |
|             dtime = \
 | |
|                 datetime.datetime.strptime(date_str, date_format)
 | |
|         except BaseException:
 | |
|             continue
 | |
|         break
 | |
|     if not dtime:
 | |
|         return None
 | |
|     if not dtime.tzinfo:
 | |
|         dtime = dtime.replace(tzinfo=datetime.timezone.utc)
 | |
|     return dtime
 | |
| 
 | |
| 
 | |
| def date_epoch():
 | |
|     """returns an offset-aware version of epoch
 | |
|     """
 | |
|     return date_from_numbers(1970, 1, 1, 0, 0)
 | |
| 
 | |
| 
 | |
| def date_string_to_seconds(date_str: str) -> int:
 | |
|     """Converts a date string (eg "published") into seconds since epoch
 | |
|     """
 | |
|     expiry_time = \
 | |
|         date_from_string_format(date_str, ['%Y-%m-%dT%H:%M:%S%z'])
 | |
|     if not expiry_time:
 | |
|         print('EX: date_string_to_seconds unable to parse date ' +
 | |
|               str(date_str))
 | |
|         return None
 | |
|     return _datetime_to_timestamp(expiry_time)
 | |
| 
 | |
| 
 | |
| def date_seconds_to_string(date_sec: int) -> str:
 | |
|     """Converts a date in seconds since epoch to a string
 | |
|     """
 | |
|     this_date = \
 | |
|         datetime.datetime.fromtimestamp(date_sec, datetime.timezone.utc)
 | |
|     if not this_date.tzinfo:
 | |
|         this_date = this_date.replace(tzinfo=datetime.timezone.utc)
 | |
|     this_date_tz = this_date.astimezone(datetime.timezone.utc)
 | |
|     return this_date_tz.strftime("%Y-%m-%dT%H:%M:%SZ")
 | |
| 
 | |
| 
 | |
| def get_account_timezone(base_dir: str, nickname: str, domain: str) -> str:
 | |
|     """Returns the timezone for the given account
 | |
|     """
 | |
|     tz_filename = \
 | |
|         acct_dir(base_dir, nickname, domain) + '/timezone.txt'
 | |
|     if not os.path.isfile(tz_filename):
 | |
|         return None
 | |
|     timezone = None
 | |
|     try:
 | |
|         with open(tz_filename, 'r', encoding='utf-8') as fp_timezone:
 | |
|             timezone = fp_timezone.read().strip()
 | |
|     except OSError:
 | |
|         print('EX: get_account_timezone unable to read ' + tz_filename)
 | |
|     return timezone
 | |
| 
 | |
| 
 | |
| def set_account_timezone(base_dir: str, nickname: str, domain: str,
 | |
|                          timezone: str) -> None:
 | |
|     """Sets the timezone for the given account
 | |
|     """
 | |
|     tz_filename = \
 | |
|         acct_dir(base_dir, nickname, domain) + '/timezone.txt'
 | |
|     timezone = timezone.strip()
 | |
|     try:
 | |
|         with open(tz_filename, 'w+', encoding='utf-8') as fp_timezone:
 | |
|             fp_timezone.write(timezone)
 | |
|     except OSError:
 | |
|         print('EX: set_account_timezone unable to write ' +
 | |
|               tz_filename)
 | |
| 
 | |
| 
 | |
| def load_account_timezones(base_dir: str) -> {}:
 | |
|     """Returns a dictionary containing the preferred timezone for each account
 | |
|     """
 | |
|     account_timezone = {}
 | |
|     dir_str = data_dir(base_dir)
 | |
|     for _, dirs, _ in os.walk(dir_str):
 | |
|         for acct in dirs:
 | |
|             if '@' not in acct:
 | |
|                 continue
 | |
|             if acct.startswith('inbox@') or acct.startswith('Actor@'):
 | |
|                 continue
 | |
|             acct_directory = os.path.join(dir_str, acct)
 | |
|             tz_filename = acct_directory + '/timezone.txt'
 | |
|             if not os.path.isfile(tz_filename):
 | |
|                 continue
 | |
|             timezone = None
 | |
|             try:
 | |
|                 with open(tz_filename, 'r', encoding='utf-8') as fp_timezone:
 | |
|                     timezone = fp_timezone.read().strip()
 | |
|             except OSError:
 | |
|                 print('EX: load_account_timezones unable to read ' +
 | |
|                       tz_filename)
 | |
|             if timezone:
 | |
|                 nickname = acct.split('@')[0]
 | |
|                 account_timezone[nickname] = timezone
 | |
|         break
 | |
|     return account_timezone
 | |
| 
 | |
| 
 | |
| def week_day_of_month_start(month_number: int, year: int) -> int:
 | |
|     """Gets the day number of the first day of the month
 | |
|     1=sun, 7=sat
 | |
|     """
 | |
|     first_day_of_month = date_from_numbers(year, month_number, 1, 0, 0)
 | |
|     return int(first_day_of_month.strftime("%w")) + 1
 | |
| 
 | |
| 
 | |
| def valid_post_date(published: str, max_age_days: int, debug: bool) -> bool:
 | |
|     """Returns true if the published date is recent and is not in the future
 | |
|     """
 | |
|     baseline_time = date_epoch()
 | |
| 
 | |
|     days_diff = date_utcnow() - baseline_time
 | |
|     now_days_since_epoch = days_diff.days
 | |
| 
 | |
|     post_time_object = \
 | |
|         date_from_string_format(published, ["%Y-%m-%dT%H:%M:%S%z"])
 | |
|     if not post_time_object:
 | |
|         if debug:
 | |
|             print('EX: valid_post_date invalid published date ' +
 | |
|                   str(published))
 | |
|         return False
 | |
| 
 | |
|     days_diff = post_time_object - baseline_time
 | |
|     post_days_since_epoch = days_diff.days
 | |
| 
 | |
|     if post_days_since_epoch > now_days_since_epoch:
 | |
|         if debug:
 | |
|             print("Inbox post has a published date in the future!")
 | |
|         return False
 | |
| 
 | |
|     if now_days_since_epoch - post_days_since_epoch >= max_age_days:
 | |
|         if debug:
 | |
|             print("Inbox post is not recent enough")
 | |
|         return False
 | |
|     return True
 | |
| 
 | |
| 
 | |
| def time_days_ago(datestr: str) -> int:
 | |
|     """returns the number of days ago for the given date
 | |
|     """
 | |
|     date1 = \
 | |
|         date_from_string_format(datestr,
 | |
|                                 ["%Y-%m-%dT%H:%M:%S%z"])
 | |
|     if not date1:
 | |
|         return 0
 | |
|     date_diff = date_utcnow() - date1
 | |
|     return date_diff.days
 | |
| 
 | |
| 
 | |
| def get_published_date(post_json_object: {}) -> str:
 | |
|     """Returns the published date on the given post
 | |
|     """
 | |
|     published = None
 | |
|     if post_json_object.get('published'):
 | |
|         published = post_json_object['published']
 | |
|     elif has_object_dict(post_json_object):
 | |
|         if post_json_object['object'].get('published'):
 | |
|             published = post_json_object['object']['published']
 | |
|     if not published:
 | |
|         return None
 | |
|     if not isinstance(published, str):
 | |
|         return None
 | |
|     return published
 | |
| 
 | |
| 
 | |
| def get_current_time_int() -> int:
 | |
|     """Returns the current time as an integer
 | |
|     """
 | |
|     return int(time.time())
 |