Function to get the current time as an integer

main
Bob Mottram 2025-05-28 16:01:47 +01:00
parent 5871155fe3
commit 1994896e2f
8 changed files with 24 additions and 13 deletions

View File

@ -10,7 +10,6 @@ __status__ = "Production"
__module_group__ = "ActivityPub"
import os
import time
from posts import send_signed_json
from flags import has_group_type
from flags import url_permitted
@ -30,6 +29,7 @@ from utils import local_actor_url
from utils import has_actor
from utils import has_object_string_type
from utils import get_actor_from_post
from timeFunctions import get_current_time_int
def _create_quote_accept_reject(receiving_actor: str,
@ -438,7 +438,7 @@ def receive_quote_request(message_json: {}, federation_list: [],
"""
if message_json['type'] != 'QuoteRequest':
return False
curr_time = int(time.time())
curr_time = get_current_time_int()
seconds_since_last_quote_request = curr_time - last_quote_request
if seconds_since_last_quote_request < 30:
# don't handle quote requests too often

View File

@ -18,6 +18,7 @@ from flags import is_quote_toot
from quote import get_quote_toot_url
from timeFunctions import date_utcnow
from timeFunctions import date_from_string_format
from timeFunctions import get_current_time_int
from utils import get_user_paths
from utils import contains_statuses
from utils import data_dir
@ -674,7 +675,7 @@ def update_blocked_cache(base_dir: str,
blocked_cache_update_secs: int) -> int:
"""Updates the cache of globally blocked domains held in memory
"""
curr_time = int(time.time())
curr_time = get_current_time_int()
if blocked_cache_last_updated > curr_time:
print('WARN: Cache updated in the future')
blocked_cache_last_updated = 0

View File

@ -8,11 +8,11 @@ __status__ = "Production"
__module_group__ = "Core"
import os
import time
from utils import data_dir
from utils import save_json
from utils import user_agent_domain
from utils import remove_eol
from timeFunctions import get_current_time_int
from blocking import get_mil_domains_list
from blocking import get_gov_domains_list
from blocking import get_bsky_domains_list
@ -34,7 +34,7 @@ def update_known_crawlers(ua_str: str,
if not ua_str:
return None
curr_time = int(time.time())
curr_time = get_current_time_int()
if known_crawlers.get(ua_str):
known_crawlers[ua_str]['hits'] += 1
known_crawlers[ua_str]['lastseen'] = curr_time

View File

@ -94,6 +94,7 @@ from flags import is_image_file
from flags import is_artist
from flags import is_blog_post
from timeFunctions import date_utcnow
from timeFunctions import get_current_time_int
from utils import replace_strings
from utils import contains_invalid_chars
from utils import save_json
@ -6449,7 +6450,7 @@ def _show_known_crawlers(self, calling_domain: str, path: str,
if not is_moderator(base_dir, nickname):
return False
crawlers_list: list[str] = []
curr_time = int(time.time())
curr_time = get_current_time_int()
recent_crawlers = 60 * 60 * 24 * 30
for ua_str, item in known_crawlers.items():
if item['lastseen'] - curr_time < recent_crawlers:

View File

@ -28,6 +28,7 @@ from mitm import save_mitm_servers
from timeFunctions import date_utcnow
from timeFunctions import date_epoch
from timeFunctions import get_account_timezone
from timeFunctions import get_current_time_int
from utils import harmless_markup
from utils import lines_in_file
from utils import contains_statuses
@ -1331,7 +1332,7 @@ def _bounce_dm(sender_post_id: str, session, http_prefix: str,
# Don't send out bounce messages too frequently.
# Otherwise an adversary could try to DoS your instance
# by continuously sending DMs to you
curr_time = int(time.time())
curr_time = get_current_time_int()
if curr_time - last_bounce_message[0] < 60:
return False
@ -3337,7 +3338,7 @@ def run_inbox_queue(server,
str(queue_filename))
continue
curr_time = int(time.time())
curr_time = get_current_time_int()
# clear the daily quotas for maximum numbers of received posts
if curr_time - quotas_last_update_daily > 60 * 60 * 24:

View File

@ -39,6 +39,7 @@ from roles import get_actor_roles_list
from media import process_meta_data
from flags import is_image_file
from timeFunctions import date_utcnow
from timeFunctions import get_current_time_int
from utils import get_person_icon
from utils import account_is_indexable
from utils import get_image_mime_type
@ -1656,7 +1657,7 @@ def is_person_snoozed(base_dir: str, nickname: str, domain: str,
# is there a time appended?
if snoozed_time_str.isdigit():
snoozed_time = int(snoozed_time_str)
curr_time = int(time.time())
curr_time = get_current_time_int()
# has the snooze timed out?
if int(curr_time - snoozed_time) > 60 * 60 * 24:
replace_str = line

View File

@ -27,6 +27,7 @@ from flags import is_float
from timeFunctions import date_utcnow
from timeFunctions import date_string_to_seconds
from timeFunctions import date_seconds_to_string
from timeFunctions import get_current_time_int
from utils import replace_strings
from utils import data_dir
from utils import resembles_url
@ -486,7 +487,7 @@ def _expire_shares_for_account(base_dir: str, nickname: str, domain: str,
shares_json = load_json(shares_filename)
if not shares_json:
return 0
curr_time = int(time.time())
curr_time = get_current_time_int()
delete_item_id: list[str] = []
for item_id, item in shares_json.items():
if curr_time > item['expire']:
@ -1829,7 +1830,7 @@ def _generate_next_shares_token_update(base_dir: str,
except OSError:
print('EX: _generate_next_shares_token_update unable to read ' +
token_update_filename)
curr_time = int(time.time())
curr_time = get_current_time_int()
updated = False
if next_update_sec:
if curr_time > next_update_sec:
@ -1883,7 +1884,7 @@ def _regenerate_shares_token(base_dir: str, domain_full: str,
token_update_filename)
if not next_update_sec:
return
curr_time = int(time.time())
curr_time = get_current_time_int()
if curr_time <= next_update_sec:
return
create_shared_item_federation_token(base_dir, domain_full, True, None)
@ -1970,7 +1971,7 @@ def _dfc_to_shares_format(catalog_json: {},
_load_dfc_ids(base_dir, system_language, product_type,
http_prefix, domain_full)
curr_time = int(time.time())
curr_time = get_current_time_int()
for item in catalog_json['DFC:supplies']:
if not item.get('@id') or \
not item.get('@type') or \

View File

@ -240,3 +240,9 @@ def get_published_date(post_json_object: {}) -> str:
if not isinstance(published, str):
return None
return published
def get_current_time_int() -> int:
"""Returns the current time as an integer
"""
return int(time.time())