Snake case

main
Bob Mottram 2021-12-30 18:38:36 +00:00
parent fb7208938b
commit ec7384681a
4 changed files with 330 additions and 322 deletions

View File

@ -43,33 +43,33 @@ def get_briar_address(actor_json: {}) -> str:
return ''
def set_briar_address(actor_json: {}, briarAddress: str) -> None:
def set_briar_address(actor_json: {}, briar_address: str) -> None:
"""Sets an briar address for the given actor
"""
notBriarAddress = False
not_briar_address = False
if len(briarAddress) < 50:
notBriarAddress = True
if not briarAddress.startswith('briar://'):
notBriarAddress = True
if briarAddress.lower() != briarAddress:
notBriarAddress = True
if '"' in briarAddress:
notBriarAddress = True
if ' ' in briarAddress:
notBriarAddress = True
if '.' in briarAddress:
notBriarAddress = True
if ',' in briarAddress:
notBriarAddress = True
if '<' in briarAddress:
notBriarAddress = True
if len(briar_address) < 50:
not_briar_address = True
if not briar_address.startswith('briar://'):
not_briar_address = True
if briar_address.lower() != briar_address:
not_briar_address = True
if '"' in briar_address:
not_briar_address = True
if ' ' in briar_address:
not_briar_address = True
if '.' in briar_address:
not_briar_address = True
if ',' in briar_address:
not_briar_address = True
if '<' in briar_address:
not_briar_address = True
if not actor_json.get('attachment'):
actor_json['attachment'] = []
# remove any existing value
propertyFound = None
property_found = None
for property_value in actor_json['attachment']:
if not property_value.get('name'):
continue
@ -77,11 +77,11 @@ def set_briar_address(actor_json: {}, briarAddress: str) -> None:
continue
if not property_value['name'].lower().startswith('briar'):
continue
propertyFound = property_value
property_found = property_value
break
if propertyFound:
actor_json['attachment'].remove(propertyFound)
if notBriarAddress:
if property_found:
actor_json['attachment'].remove(property_found)
if not_briar_address:
return
for property_value in actor_json['attachment']:
@ -93,12 +93,12 @@ def set_briar_address(actor_json: {}, briarAddress: str) -> None:
continue
if property_value['type'] != 'PropertyValue':
continue
property_value['value'] = briarAddress
property_value['value'] = briar_address
return
newBriarAddress = {
new_briar_address = {
"name": "Briar",
"type": "PropertyValue",
"value": briarAddress
"value": briar_address
}
actor_json['attachment'].append(newBriarAddress)
actor_json['attachment'].append(new_briar_address)

163
cache.py
View File

@ -17,25 +17,25 @@ from utils import get_file_case_insensitive
from utils import get_user_paths
def _remove_person_from_cache(base_dir: str, personUrl: str,
def _remove_person_from_cache(base_dir: str, person_url: str,
person_cache: {}) -> bool:
"""Removes an actor from the cache
"""
cacheFilename = base_dir + '/cache/actors/' + \
personUrl.replace('/', '#') + '.json'
if os.path.isfile(cacheFilename):
cache_filename = base_dir + '/cache/actors/' + \
person_url.replace('/', '#') + '.json'
if os.path.isfile(cache_filename):
try:
os.remove(cacheFilename)
os.remove(cache_filename)
except OSError:
print('EX: unable to delete cached actor ' + str(cacheFilename))
if person_cache.get(personUrl):
del person_cache[personUrl]
print('EX: unable to delete cached actor ' + str(cache_filename))
if person_cache.get(person_url):
del person_cache[person_url]
def check_for_changed_actor(session, base_dir: str,
http_prefix: str, domain_full: str,
personUrl: str, avatarUrl: str, person_cache: {},
timeoutSec: int):
person_url: str, avatarUrl: str, person_cache: {},
timeout_sec: int):
"""Checks if the avatar url exists and if not then
the actor has probably changed without receiving an actor/Person Update.
So clear the actor from the cache and it will be refreshed when the next
@ -45,63 +45,63 @@ def check_for_changed_actor(session, base_dir: str,
return
if domain_full in avatarUrl:
return
if url_exists(session, avatarUrl, timeoutSec, http_prefix, domain_full):
if url_exists(session, avatarUrl, timeout_sec, http_prefix, domain_full):
return
_remove_person_from_cache(base_dir, personUrl, person_cache)
_remove_person_from_cache(base_dir, person_url, person_cache)
def store_person_in_cache(base_dir: str, personUrl: str,
personJson: {}, person_cache: {},
allowWriteToFile: bool) -> None:
def store_person_in_cache(base_dir: str, person_url: str,
person_json: {}, person_cache: {},
allow_write_to_file: bool) -> None:
"""Store an actor in the cache
"""
if 'statuses' in personUrl or personUrl.endswith('/actor'):
if 'statuses' in person_url or person_url.endswith('/actor'):
# This is not an actor or person account
return
curr_time = datetime.datetime.utcnow()
person_cache[personUrl] = {
"actor": personJson,
person_cache[person_url] = {
"actor": person_json,
"timestamp": curr_time.strftime("%Y-%m-%dT%H:%M:%SZ")
}
if not base_dir:
return
# store to file
if not allowWriteToFile:
if not allow_write_to_file:
return
if os.path.isdir(base_dir + '/cache/actors'):
cacheFilename = base_dir + '/cache/actors/' + \
personUrl.replace('/', '#') + '.json'
if not os.path.isfile(cacheFilename):
save_json(personJson, cacheFilename)
cache_filename = base_dir + '/cache/actors/' + \
person_url.replace('/', '#') + '.json'
if not os.path.isfile(cache_filename):
save_json(person_json, cache_filename)
def get_person_from_cache(base_dir: str, personUrl: str, person_cache: {},
allowWriteToFile: bool) -> {}:
def get_person_from_cache(base_dir: str, person_url: str, person_cache: {},
allow_write_to_file: bool) -> {}:
"""Get an actor from the cache
"""
# if the actor is not in memory then try to load it from file
loadedFromFile = False
if not person_cache.get(personUrl):
loaded_from_file = False
if not person_cache.get(person_url):
# does the person exist as a cached file?
cacheFilename = base_dir + '/cache/actors/' + \
personUrl.replace('/', '#') + '.json'
actorFilename = get_file_case_insensitive(cacheFilename)
if actorFilename:
personJson = load_json(actorFilename)
if personJson:
store_person_in_cache(base_dir, personUrl, personJson,
cache_filename = base_dir + '/cache/actors/' + \
person_url.replace('/', '#') + '.json'
actor_filename = get_file_case_insensitive(cache_filename)
if actor_filename:
person_json = load_json(actor_filename)
if person_json:
store_person_in_cache(base_dir, person_url, person_json,
person_cache, False)
loadedFromFile = True
loaded_from_file = True
if person_cache.get(personUrl):
if not loadedFromFile:
if person_cache.get(person_url):
if not loaded_from_file:
# update the timestamp for the last time the actor was retrieved
curr_time = datetime.datetime.utcnow()
curr_timeStr = curr_time.strftime("%Y-%m-%dT%H:%M:%SZ")
person_cache[personUrl]['timestamp'] = curr_timeStr
return person_cache[personUrl]['actor']
curr_time_str = curr_time.strftime("%Y-%m-%dT%H:%M:%SZ")
person_cache[person_url]['timestamp'] = curr_time_str
return person_cache[person_url]['actor']
return None
@ -110,15 +110,15 @@ def expire_person_cache(person_cache: {}):
"""
curr_time = datetime.datetime.utcnow()
removals = []
for personUrl, cacheJson in person_cache.items():
cacheTime = datetime.datetime.strptime(cacheJson['timestamp'],
"%Y-%m-%dT%H:%M:%SZ")
daysSinceCached = (curr_time - cacheTime).days
if daysSinceCached > 2:
removals.append(personUrl)
for person_url, cache_json in person_cache.items():
cache_time = datetime.datetime.strptime(cache_json['timestamp'],
"%Y-%m-%dT%H:%M:%SZ")
days_since_cached = (curr_time - cache_time).days
if days_since_cached > 2:
removals.append(person_url)
if len(removals) > 0:
for personUrl in removals:
del person_cache[personUrl]
for person_url in removals:
del person_cache[person_url]
print(str(len(removals)) + ' actors were expired from the cache')
@ -136,52 +136,55 @@ def get_webfinger_from_cache(handle: str, cached_webfingers: {}) -> {}:
return None
def get_person_pub_key(base_dir: str, session, personUrl: str,
def get_person_pub_key(base_dir: str, session, person_url: str,
person_cache: {}, debug: bool,
project_version: str, http_prefix: str,
domain: str, onion_domain: str,
signing_priv_key_pem: str) -> str:
if not personUrl:
if not person_url:
return None
personUrl = personUrl.replace('#main-key', '')
usersPaths = get_user_paths()
for possibleUsersPath in usersPaths:
if personUrl.endswith(possibleUsersPath + 'inbox'):
person_url = person_url.replace('#main-key', '')
users_paths = get_user_paths()
for possible_users_path in users_paths:
if person_url.endswith(possible_users_path + 'inbox'):
if debug:
print('DEBUG: Obtaining public key for shared inbox')
personUrl = \
personUrl.replace(possibleUsersPath + 'inbox', '/inbox')
person_url = \
person_url.replace(possible_users_path + 'inbox', '/inbox')
break
personJson = \
get_person_from_cache(base_dir, personUrl, person_cache, True)
if not personJson:
person_json = \
get_person_from_cache(base_dir, person_url, person_cache, True)
if not person_json:
if debug:
print('DEBUG: Obtaining public key for ' + personUrl)
personDomain = domain
print('DEBUG: Obtaining public key for ' + person_url)
person_domain = domain
if onion_domain:
if '.onion/' in personUrl:
personDomain = onion_domain
profileStr = 'https://www.w3.org/ns/activitystreams'
asHeader = {
'Accept': 'application/activity+json; profile="' + profileStr + '"'
if '.onion/' in person_url:
person_domain = onion_domain
profile_str = 'https://www.w3.org/ns/activitystreams'
accept_str = \
'application/activity+json; profile="' + profile_str + '"'
as_header = {
'Accept': accept_str
}
personJson = \
person_json = \
get_json(signing_priv_key_pem,
session, personUrl, asHeader, None, debug,
project_version, http_prefix, personDomain)
if not personJson:
session, person_url, as_header, None, debug,
project_version, http_prefix, person_domain)
if not person_json:
return None
pubKey = None
if personJson.get('publicKey'):
if personJson['publicKey'].get('publicKeyPem'):
pubKey = personJson['publicKey']['publicKeyPem']
pub_key = None
if person_json.get('publicKey'):
if person_json['publicKey'].get('publicKeyPem'):
pub_key = person_json['publicKey']['publicKeyPem']
else:
if personJson.get('publicKeyPem'):
pubKey = personJson['publicKeyPem']
if person_json.get('publicKeyPem'):
pub_key = person_json['publicKeyPem']
if not pubKey:
if not pub_key:
if debug:
print('DEBUG: Public key not found for ' + personUrl)
print('DEBUG: Public key not found for ' + person_url)
store_person_in_cache(base_dir, personUrl, personJson, person_cache, True)
return pubKey
store_person_in_cache(base_dir, person_url, person_json,
person_cache, True)
return pub_key

View File

@ -10,27 +10,31 @@ __module_group__ = "RSS Feeds"
import os
import datetime
MAX_TAG_LENGTH = 42
INVALID_HASHTAG_CHARS = (',', ' ', '<', ';', '\\', '"', '&', '#')
def get_hashtag_category(base_dir: str, hashtag: str) -> str:
"""Returns the category for the hashtag
"""
categoryFilename = base_dir + '/tags/' + hashtag + '.category'
if not os.path.isfile(categoryFilename):
categoryFilename = base_dir + '/tags/' + hashtag.title() + '.category'
if not os.path.isfile(categoryFilename):
categoryFilename = \
category_filename = base_dir + '/tags/' + hashtag + '.category'
if not os.path.isfile(category_filename):
category_filename = base_dir + '/tags/' + hashtag.title() + '.category'
if not os.path.isfile(category_filename):
category_filename = \
base_dir + '/tags/' + hashtag.upper() + '.category'
if not os.path.isfile(categoryFilename):
if not os.path.isfile(category_filename):
return ''
categoryStr = None
category_str = None
try:
with open(categoryFilename, 'r') as fp:
categoryStr = fp.read()
with open(category_filename, 'r') as category_file:
category_str = category_file.read()
except OSError:
print('EX: unable to read category ' + categoryFilename)
if categoryStr:
return categoryStr
print('EX: unable to read category ' + category_filename)
if category_str:
return category_str
return ''
@ -39,88 +43,87 @@ def get_hashtag_categories(base_dir: str,
category: str = None) -> None:
"""Returns a dictionary containing hashtag categories
"""
maxTagLength = 42
hashtagCategories = {}
hashtag_categories = {}
if recent:
curr_time = datetime.datetime.utcnow()
daysSinceEpoch = (curr_time - datetime.datetime(1970, 1, 1)).days
recently = daysSinceEpoch - 1
days_since_epoch = (curr_time - datetime.datetime(1970, 1, 1)).days
recently = days_since_epoch - 1
for subdir, dirs, files in os.walk(base_dir + '/tags'):
for f in files:
if not f.endswith('.category'):
for catfile in files:
if not catfile.endswith('.category'):
continue
categoryFilename = os.path.join(base_dir + '/tags', f)
if not os.path.isfile(categoryFilename):
category_filename = os.path.join(base_dir + '/tags', catfile)
if not os.path.isfile(category_filename):
continue
hashtag = f.split('.')[0]
if len(hashtag) > maxTagLength:
hashtag = catfile.split('.')[0]
if len(hashtag) > MAX_TAG_LENGTH:
continue
with open(categoryFilename, 'r') as fp:
categoryStr = fp.read()
with open(category_filename, 'r') as fp_category:
category_str = fp_category.read()
if not categoryStr:
if not category_str:
continue
if category:
# only return a dictionary for a specific category
if categoryStr != category:
if category_str != category:
continue
if recent:
tagsFilename = base_dir + '/tags/' + hashtag + '.txt'
if not os.path.isfile(tagsFilename):
tags_filename = base_dir + '/tags/' + hashtag + '.txt'
if not os.path.isfile(tags_filename):
continue
modTimesinceEpoc = \
os.path.getmtime(tagsFilename)
lastModifiedDate = \
datetime.datetime.fromtimestamp(modTimesinceEpoc)
fileDaysSinceEpoch = \
(lastModifiedDate -
mod_time_since_epoc = \
os.path.getmtime(tags_filename)
last_modified_date = \
datetime.datetime.fromtimestamp(mod_time_since_epoc)
file_days_since_epoch = \
(last_modified_date -
datetime.datetime(1970, 1, 1)).days
if fileDaysSinceEpoch < recently:
if file_days_since_epoch < recently:
continue
if not hashtagCategories.get(categoryStr):
hashtagCategories[categoryStr] = [hashtag]
if not hashtag_categories.get(category_str):
hashtag_categories[category_str] = [hashtag]
else:
if hashtag not in hashtagCategories[categoryStr]:
hashtagCategories[categoryStr].append(hashtag)
if hashtag not in hashtag_categories[category_str]:
hashtag_categories[category_str].append(hashtag)
break
return hashtagCategories
return hashtag_categories
def update_hashtag_categories(base_dir: str) -> None:
"""Regenerates the list of hashtag categories
"""
categoryListFilename = base_dir + '/accounts/categoryList.txt'
hashtagCategories = get_hashtag_categories(base_dir)
if not hashtagCategories:
if os.path.isfile(categoryListFilename):
category_list_filename = base_dir + '/accounts/categoryList.txt'
hashtag_categories = get_hashtag_categories(base_dir)
if not hashtag_categories:
if os.path.isfile(category_list_filename):
try:
os.remove(categoryListFilename)
os.remove(category_list_filename)
except OSError:
print('EX: update_hashtag_categories ' +
'unable to delete cached category list ' +
categoryListFilename)
category_list_filename)
return
categoryList = []
for categoryStr, hashtagList in hashtagCategories.items():
categoryList.append(categoryStr)
categoryList.sort()
category_list = []
for category_str, _ in hashtag_categories.items():
category_list.append(category_str)
category_list.sort()
categoryListStr = ''
for categoryStr in categoryList:
categoryListStr += categoryStr + '\n'
category_list_str = ''
for category_str in category_list:
category_list_str += category_str + '\n'
# save a list of available categories for quick lookup
try:
with open(categoryListFilename, 'w+') as fp:
fp.write(categoryListStr)
with open(category_list_filename, 'w+') as fp_category:
fp_category.write(category_list_str)
except OSError:
print('EX: unable to write category ' + categoryListFilename)
print('EX: unable to write category ' + category_list_filename)
def _valid_hashtag_category(category: str) -> bool:
@ -129,9 +132,8 @@ def _valid_hashtag_category(category: str) -> bool:
if not category:
return False
invalidChars = (',', ' ', '<', ';', '\\', '"', '&', '#')
for ch in invalidChars:
if ch in category:
for char in INVALID_HASHTAG_CHARS:
if char in category:
return False
# too long
@ -149,34 +151,34 @@ def set_hashtag_category(base_dir: str, hashtag: str, category: str,
return False
if not force:
hashtagFilename = base_dir + '/tags/' + hashtag + '.txt'
if not os.path.isfile(hashtagFilename):
hashtag_filename = base_dir + '/tags/' + hashtag + '.txt'
if not os.path.isfile(hashtag_filename):
hashtag = hashtag.title()
hashtagFilename = base_dir + '/tags/' + hashtag + '.txt'
if not os.path.isfile(hashtagFilename):
hashtag_filename = base_dir + '/tags/' + hashtag + '.txt'
if not os.path.isfile(hashtag_filename):
hashtag = hashtag.upper()
hashtagFilename = base_dir + '/tags/' + hashtag + '.txt'
if not os.path.isfile(hashtagFilename):
hashtag_filename = base_dir + '/tags/' + hashtag + '.txt'
if not os.path.isfile(hashtag_filename):
return False
if not os.path.isdir(base_dir + '/tags'):
os.mkdir(base_dir + '/tags')
categoryFilename = base_dir + '/tags/' + hashtag + '.category'
category_filename = base_dir + '/tags/' + hashtag + '.category'
if force:
# don't overwrite any existing categories
if os.path.isfile(categoryFilename):
if os.path.isfile(category_filename):
return False
categoryWritten = False
category_written = False
try:
with open(categoryFilename, 'w+') as fp:
fp.write(category)
categoryWritten = True
with open(category_filename, 'w+') as fp_category:
fp_category.write(category)
category_written = True
except OSError as ex:
print('EX: unable to write category ' + categoryFilename +
print('EX: unable to write category ' + category_filename +
' ' + str(ex))
if categoryWritten:
if category_written:
if update:
update_hashtag_categories(base_dir)
return True
@ -184,18 +186,18 @@ def set_hashtag_category(base_dir: str, hashtag: str, category: str,
return False
def guess_hashtag_category(tagName: str, hashtagCategories: {}) -> str:
def guess_hashtag_category(tagName: str, hashtag_categories: {}) -> str:
"""Tries to guess a category for the given hashtag.
This works by trying to find the longest similar hashtag
"""
if len(tagName) < 4:
return ''
categoryMatched = ''
tagMatchedLen = 0
category_matched = ''
tag_matched_len = 0
for categoryStr, hashtagList in hashtagCategories.items():
for hashtag in hashtagList:
for category_str, hashtag_list in hashtag_categories.items():
for hashtag in hashtag_list:
if len(hashtag) < 4:
# avoid matching very small strings which often
# lead to spurious categories
@ -203,13 +205,13 @@ def guess_hashtag_category(tagName: str, hashtagCategories: {}) -> str:
if hashtag not in tagName:
if tagName not in hashtag:
continue
if not categoryMatched:
tagMatchedLen = len(hashtag)
categoryMatched = categoryStr
if not category_matched:
tag_matched_len = len(hashtag)
category_matched = category_str
else:
# match the longest tag
if len(hashtag) > tagMatchedLen:
categoryMatched = categoryStr
if not categoryMatched:
if len(hashtag) > tag_matched_len:
category_matched = category_str
if not category_matched:
return ''
return categoryMatched
return category_matched

263
city.py
View File

@ -22,8 +22,10 @@ PERSON_SHOP = 3
PERSON_EVENING = 4
PERSON_PARTY = 5
BUSY_STATES = (PERSON_WORK, PERSON_SHOP, PERSON_PLAY, PERSON_PARTY)
def _get_decoy_camera(decoySeed: int) -> (str, str, int):
def _get_decoy_camera(decoy_seed: int) -> (str, str, int):
"""Returns a decoy camera make and model which took the photo
"""
cameras = [
@ -82,13 +84,13 @@ def _get_decoy_camera(decoySeed: int) -> (str, str, int):
["Google", "Pixel 3"],
["Google", "Pixel 3a"]
]
randgen = random.Random(decoySeed)
randgen = random.Random(decoy_seed)
index = randgen.randint(0, len(cameras) - 1)
serialNumber = randgen.randint(100000000000, 999999999999999999999999)
return cameras[index][0], cameras[index][1], serialNumber
serial_number = randgen.randint(100000000000, 999999999999999999999999)
return cameras[index][0], cameras[index][1], serial_number
def _get_city_pulse(curr_timeOfDay, decoySeed: int) -> (float, float):
def _get_city_pulse(curr_time_of_day, decoy_seed: int) -> (float, float):
"""This simulates expected average patterns of movement in a city.
Jane or Joe average lives and works in the city, commuting in
and out of the central district for work. They have a unique
@ -97,150 +99,149 @@ def _get_city_pulse(curr_timeOfDay, decoySeed: int) -> (float, float):
Distance from the city centre is in the range 0.0 - 1.0
Angle is in radians
"""
randgen = random.Random(decoySeed)
randgen = random.Random(decoy_seed)
variance = 3
busyStates = (PERSON_WORK, PERSON_SHOP, PERSON_PLAY, PERSON_PARTY)
dataDecoyState = PERSON_SLEEP
weekday = curr_timeOfDay.weekday()
minHour = 7 + randint(0, variance)
maxHour = 17 + randint(0, variance)
if curr_timeOfDay.hour > minHour:
if curr_timeOfDay.hour <= maxHour:
data_decoy_state = PERSON_SLEEP
weekday = curr_time_of_day.weekday()
min_hour = 7 + randint(0, variance)
max_hour = 17 + randint(0, variance)
if curr_time_of_day.hour > min_hour:
if curr_time_of_day.hour <= max_hour:
if weekday < 5:
dataDecoyState = PERSON_WORK
data_decoy_state = PERSON_WORK
elif weekday == 5:
dataDecoyState = PERSON_SHOP
data_decoy_state = PERSON_SHOP
else:
dataDecoyState = PERSON_PLAY
data_decoy_state = PERSON_PLAY
else:
if weekday < 5:
dataDecoyState = PERSON_EVENING
data_decoy_state = PERSON_EVENING
else:
dataDecoyState = PERSON_PARTY
randgen2 = random.Random(decoySeed + dataDecoyState)
angleRadians = \
data_decoy_state = PERSON_PARTY
randgen2 = random.Random(decoy_seed + data_decoy_state)
angle_radians = \
(randgen2.randint(0, 100000) / 100000) * 2 * math.pi
# some people are quite random, others have more predictable habits
decoyRandomness = randgen.randint(1, 3)
decoy_randomness = randgen.randint(1, 3)
# occasionally throw in a wildcard to keep the machine learning guessing
if randint(0, 100) < decoyRandomness:
distanceFromCityCenter = (randint(0, 100000) / 100000)
angleRadians = (randint(0, 100000) / 100000) * 2 * math.pi
if randint(0, 100) < decoy_randomness:
distance_from_city_center = (randint(0, 100000) / 100000)
angle_radians = (randint(0, 100000) / 100000) * 2 * math.pi
else:
# what consitutes the central district is fuzzy
centralDistrictFuzz = (randgen.randint(0, 100000) / 100000) * 0.1
busyRadius = 0.3 + centralDistrictFuzz
if dataDecoyState in busyStates:
central_district_fuzz = (randgen.randint(0, 100000) / 100000) * 0.1
busy_radius = 0.3 + central_district_fuzz
if data_decoy_state in BUSY_STATES:
# if we are busy then we're somewhere in the city center
distanceFromCityCenter = \
(randgen.randint(0, 100000) / 100000) * busyRadius
distance_from_city_center = \
(randgen.randint(0, 100000) / 100000) * busy_radius
else:
# otherwise we're in the burbs
distanceFromCityCenter = busyRadius + \
((1.0 - busyRadius) * (randgen.randint(0, 100000) / 100000))
return distanceFromCityCenter, angleRadians
distance_from_city_center = busy_radius + \
((1.0 - busy_radius) * (randgen.randint(0, 100000) / 100000))
return distance_from_city_center, angle_radians
def parse_nogo_string(nogoLine: str) -> []:
def parse_nogo_string(nogo_line: str) -> []:
"""Parses a line from locations_nogo.txt and returns the polygon
"""
nogoLine = nogoLine.replace('\n', '').replace('\r', '')
polygonStr = nogoLine.split(':', 1)[1]
if ';' in polygonStr:
pts = polygonStr.split(';')
nogo_line = nogo_line.replace('\n', '').replace('\r', '')
polygon_str = nogo_line.split(':', 1)[1]
if ';' in polygon_str:
pts = polygon_str.split(';')
else:
pts = polygonStr.split(',')
pts = polygon_str.split(',')
if len(pts) <= 4:
return []
polygon = []
for index in range(int(len(pts)/2)):
if index*2 + 1 >= len(pts):
break
longitudeStr = pts[index*2].strip()
latitudeStr = pts[index*2 + 1].strip()
if 'E' in latitudeStr or 'W' in latitudeStr:
longitudeStr = pts[index*2 + 1].strip()
latitudeStr = pts[index*2].strip()
if 'E' in longitudeStr:
longitudeStr = \
longitudeStr.replace('E', '')
longitude = float(longitudeStr)
elif 'W' in longitudeStr:
longitudeStr = \
longitudeStr.replace('W', '')
longitude = -float(longitudeStr)
longitude_str = pts[index*2].strip()
latitude_str = pts[index*2 + 1].strip()
if 'E' in latitude_str or 'W' in latitude_str:
longitude_str = pts[index*2 + 1].strip()
latitude_str = pts[index*2].strip()
if 'E' in longitude_str:
longitude_str = \
longitude_str.replace('E', '')
longitude = float(longitude_str)
elif 'W' in longitude_str:
longitude_str = \
longitude_str.replace('W', '')
longitude = -float(longitude_str)
else:
longitude = float(longitudeStr)
latitude = float(latitudeStr)
longitude = float(longitude_str)
latitude = float(latitude_str)
polygon.append([latitude, longitude])
return polygon
def spoof_geolocation(base_dir: str,
city: str, curr_time, decoySeed: int,
citiesList: [],
nogoList: []) -> (float, float, str, str,
str, str, int):
city: str, curr_time, decoy_seed: int,
cities_list: [],
nogo_list: []) -> (float, float, str, str,
str, str, int):
"""Given a city and the current time spoofs the location
for an image
returns latitude, longitude, N/S, E/W,
camera make, camera model, camera serial number
"""
locationsFilename = base_dir + '/custom_locations.txt'
if not os.path.isfile(locationsFilename):
locationsFilename = base_dir + '/locations.txt'
locations_filename = base_dir + '/custom_locations.txt'
if not os.path.isfile(locations_filename):
locations_filename = base_dir + '/locations.txt'
nogoFilename = base_dir + '/custom_locations_nogo.txt'
if not os.path.isfile(nogoFilename):
nogoFilename = base_dir + '/locations_nogo.txt'
nogo_filename = base_dir + '/custom_locations_nogo.txt'
if not os.path.isfile(nogo_filename):
nogo_filename = base_dir + '/locations_nogo.txt'
manCityRadius = 0.1
varianceAtLocation = 0.0004
man_city_radius = 0.1
variance_at_location = 0.0004
default_latitude = 51.8744
default_longitude = 0.368333
default_latdirection = 'N'
default_longdirection = 'W'
if citiesList:
cities = citiesList
if cities_list:
cities = cities_list
else:
if not os.path.isfile(locationsFilename):
if not os.path.isfile(locations_filename):
return (default_latitude, default_longitude,
default_latdirection, default_longdirection,
"", "", 0)
cities = []
try:
with open(locationsFilename, 'r') as f:
cities = f.readlines()
with open(locations_filename, 'r') as loc_file:
cities = loc_file.readlines()
except OSError:
print('EX: unable to read locations ' + locationsFilename)
print('EX: unable to read locations ' + locations_filename)
nogo = []
if nogoList:
nogo = nogoList
if nogo_list:
nogo = nogo_list
else:
if os.path.isfile(nogoFilename):
nogoList = []
if os.path.isfile(nogo_filename):
nogo_list = []
try:
with open(nogoFilename, 'r') as f:
nogoList = f.readlines()
with open(nogo_filename, 'r') as nogo_file:
nogo_list = nogo_file.readlines()
except OSError:
print('EX: unable to read ' + nogoFilename)
for line in nogoList:
print('EX: unable to read ' + nogo_filename)
for line in nogo_list:
if line.startswith(city + ':'):
polygon = parse_nogo_string(line)
if polygon:
nogo.append(polygon)
city = city.lower()
for cityName in cities:
if city in cityName.lower():
cityFields = cityName.split(':')
latitude = cityFields[1]
longitude = cityFields[2]
areaKm2 = 0
if len(cityFields) > 3:
areaKm2 = int(cityFields[3])
for city_name in cities:
if city in city_name.lower():
city_fields = city_name.split(':')
latitude = city_fields[1]
longitude = city_fields[2]
area_km2 = 0
if len(city_fields) > 3:
area_km2 = int(city_fields[3])
latdirection = 'N'
longdirection = 'E'
if 'S' in latitude:
@ -252,56 +253,57 @@ def spoof_geolocation(base_dir: str,
latitude = float(latitude)
longitude = float(longitude)
# get the time of day at the city
approxTimeZone = int(longitude / 15.0)
approx_time_zone = int(longitude / 15.0)
if longdirection == 'E':
approxTimeZone = -approxTimeZone
curr_timeAdjusted = curr_time - \
datetime.timedelta(hours=approxTimeZone)
camMake, camModel, camSerialNumber = \
_get_decoy_camera(decoySeed)
validCoord = False
seedOffset = 0
while not validCoord:
approx_time_zone = -approx_time_zone
curr_time_adjusted = curr_time - \
datetime.timedelta(hours=approx_time_zone)
cam_make, cam_model, cam_serial_number = \
_get_decoy_camera(decoy_seed)
valid_coord = False
seed_offset = 0
while not valid_coord:
# patterns of activity change in the city over time
(distanceFromCityCenter, angleRadians) = \
_get_city_pulse(curr_timeAdjusted, decoySeed + seedOffset)
(distance_from_city_center, angle_radians) = \
_get_city_pulse(curr_time_adjusted,
decoy_seed + seed_offset)
# The city radius value is in longitude and the reference
# is Manchester. Adjust for the radius of the chosen city.
if areaKm2 > 1:
manRadius = math.sqrt(1276 / math.pi)
radius = math.sqrt(areaKm2 / math.pi)
cityRadiusDeg = (radius / manRadius) * manCityRadius
if area_km2 > 1:
man_radius = math.sqrt(1276 / math.pi)
radius = math.sqrt(area_km2 / math.pi)
city_radius_deg = (radius / man_radius) * man_city_radius
else:
cityRadiusDeg = manCityRadius
city_radius_deg = man_city_radius
# Get the position within the city, with some randomness added
latitude += \
distanceFromCityCenter * cityRadiusDeg * \
math.cos(angleRadians)
distance_from_city_center * city_radius_deg * \
math.cos(angle_radians)
longitude += \
distanceFromCityCenter * cityRadiusDeg * \
math.sin(angleRadians)
distance_from_city_center * city_radius_deg * \
math.sin(angle_radians)
longval = longitude
if longdirection == 'W':
longval = -longitude
validCoord = not point_in_nogo(nogo, latitude, longval)
if not validCoord:
seedOffset += 1
if seedOffset > 100:
valid_coord = not point_in_nogo(nogo, latitude, longval)
if not valid_coord:
seed_offset += 1
if seed_offset > 100:
break
# add a small amount of variance around the location
fraction = randint(0, 100000) / 100000
distanceFromLocation = fraction * fraction * varianceAtLocation
distance_from_location = fraction * fraction * variance_at_location
fraction = randint(0, 100000) / 100000
angleFromLocation = fraction * 2 * math.pi
latitude += distanceFromLocation * math.cos(angleFromLocation)
longitude += distanceFromLocation * math.sin(angleFromLocation)
angle_from_location = fraction * 2 * math.pi
latitude += distance_from_location * math.cos(angle_from_location)
longitude += distance_from_location * math.sin(angle_from_location)
# gps locations aren't transcendental, so round to a fixed
# number of decimal places
latitude = int(latitude * 100000) / 100000.0
longitude = int(longitude * 100000) / 100000.0
return (latitude, longitude, latdirection, longdirection,
camMake, camModel, camSerialNumber)
cam_make, cam_model, cam_serial_number)
return (default_latitude, default_longitude,
default_latdirection, default_longdirection,
@ -314,33 +316,34 @@ def get_spoofed_city(city: str, base_dir: str,
image metadata
"""
city = ''
cityFilename = acct_dir(base_dir, nickname, domain) + '/city.txt'
if os.path.isfile(cityFilename):
city_filename = acct_dir(base_dir, nickname, domain) + '/city.txt'
if os.path.isfile(city_filename):
try:
with open(cityFilename, 'r') as fp:
city = fp.read().replace('\n', '')
with open(city_filename, 'r') as city_file:
city = city_file.read().replace('\n', '')
except OSError:
print('EX: unable to read ' + cityFilename)
print('EX: unable to read ' + city_filename)
return city
def _point_in_polygon(poly: [], x: float, y: float) -> bool:
def _point_in_polygon(poly: [], x_coord: float, y_coord: float) -> bool:
"""Returns true if the given point is inside the given polygon
"""
n = len(poly)
num = len(poly)
inside = False
p2x = 0.0
p2y = 0.0
xints = 0.0
p1x, p1y = poly[0]
for i in range(n + 1):
p2x, p2y = poly[i % n]
if y > min(p1y, p2y):
if y <= max(p1y, p2y):
if x <= max(p1x, p2x):
for i in range(num + 1):
p2x, p2y = poly[i % num]
if y_coord > min(p1y, p2y):
if y_coord <= max(p1y, p2y):
if x_coord <= max(p1x, p2x):
if p1y != p2y:
xints = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
if p1x == p2x or x <= xints:
xints = \
(y_coord - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
if p1x == p2x or x_coord <= xints:
inside = not inside
p1x, p1y = p2x, p2y