mirror of https://gitlab.com/bashrc2/epicyon
				
				
				
			
		
			
				
	
	
		
			1223 lines
		
	
	
		
			42 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			1223 lines
		
	
	
		
			42 KiB
		
	
	
	
		
			Python
		
	
	
| __filename__ = "maps.py"
 | |
| __author__ = "Bob Mottram"
 | |
| __license__ = "AGPL3+"
 | |
| __version__ = "1.6.0"
 | |
| __maintainer__ = "Bob Mottram"
 | |
| __email__ = "bob@libreserver.org"
 | |
| __status__ = "Production"
 | |
| __module_group__ = "Core"
 | |
| 
 | |
| 
 | |
| import os
 | |
| from flags import is_float
 | |
| from utils import resembles_url
 | |
| from utils import browser_supports_download_filename
 | |
| from utils import get_url_from_post
 | |
| from utils import acct_dir
 | |
| from utils import load_json
 | |
| from utils import save_json
 | |
| from utils import locate_post
 | |
| from utils import remove_html
 | |
| from utils import has_object_dict
 | |
| from utils import date_utcnow
 | |
| from utils import date_epoch
 | |
| from utils import date_from_string_format
 | |
| from session import get_resolved_url
 | |
| 
 | |
| 
 | |
| def _geocoords_to_osm_link(osm_domain: str, zoom: int,
 | |
|                            latitude: float, longitude: float) -> str:
 | |
|     """Returns an OSM link for the given geocoordinates
 | |
|     """
 | |
|     return 'https://www.' + osm_domain + '/#map=' + \
 | |
|         str(zoom) + '/' + str(latitude) + '/' + str(longitude)
 | |
| 
 | |
| 
 | |
def get_location_dict_from_tags(tags: []) -> str:
    """Returns the first Place tag within the tags list which has
    a non-empty string name, or None if there isn't one
    """
    for candidate in tags:
        # tags without a type, or of some other type, are of no interest
        if candidate.get('type') != 'Place':
            continue
        place_name = candidate.get('name')
        if place_name and isinstance(place_name, str):
            return candidate
    return None
 | |
| 
 | |
| 
 | |
| def _get_event_dict_from_tags(tags: []) -> str:
 | |
|     """Returns the event from the tags list
 | |
|     """
 | |
|     for tag_item in tags:
 | |
|         if not tag_item.get('type'):
 | |
|             continue
 | |
|         if tag_item['type'] != 'Event':
 | |
|             continue
 | |
|         if not tag_item.get('startTime'):
 | |
|             continue
 | |
|         if not isinstance(tag_item['startTime'], str):
 | |
|             continue
 | |
|         return tag_item
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def _location_address_from_dict(location: {}) -> str:
 | |
|     """returns location address as a string
 | |
|     """
 | |
|     location_str = ''
 | |
|     address_section_name = (
 | |
|         'streetAddress',
 | |
|         'addressLocality',
 | |
|         'addressRegion',
 | |
|         'postalCode',
 | |
|         'addressCountry'
 | |
|     )
 | |
|     for addr_section in address_section_name:
 | |
|         if location.get(addr_section):
 | |
|             if isinstance(addr_section, str):
 | |
|                 if location_str:
 | |
|                     location_str += ', '
 | |
|                 location_str += location[addr_section]
 | |
|     return location_str
 | |
| 
 | |
| 
 | |
def _get_location_from_tags(tags: []) -> str:
    """Returns the location from the tags list as html
    The name of the first Place tag is used, with newlines replaced
    by spaces and any html removed. If the tag has a url which
    resembles a link then the name is turned into a link. If the tag
    has an address, either as a string or as a dict of address
    sections, then it is appended within an address element.
    Returns None if there is no Place tag
    """
    locn = get_location_dict_from_tags(tags)
    if not locn:
        return None

    location_str = locn['name'].replace('\n', ' ')
    location_str = remove_html(location_str)

    locn_url = locn.get('url')
    if locn_url and isinstance(locn_url, str):
        if resembles_url(locn_url):
            # location name and link
            location_str = \
                '<a href="' + locn_url + '" target="_blank" ' + \
                'rel="nofollow noopener noreferrer">' + \
                location_str + '</a>'

    locn_address = locn.get('address')
    if locn_address:
        # location name and address
        if isinstance(locn_address, str):
            address_html = remove_html(locn_address)
            address_html = address_html.replace(', ', '<br>')
            location_str += '<br><address>' + address_html + '</address>'
        elif isinstance(locn_address, dict):
            # an address supplied as a dict was previously converted
            # into text but then never added to the result
            address_text = _location_address_from_dict(locn_address)
            if address_text:
                # the sections arrive within the post, so don't allow
                # them to contain markup
                location_str += \
                    '<br><address>' + remove_html(address_text) + \
                    '</address>'
    return location_str
 | |
| 
 | |
| 
 | |
def _get_category_from_tags(tags: [], translate: {}) -> str:
    """Returns the translated event category from the tags list
    The category of the first Event tag may be a string or a list of
    strings. Only categories which have a translation are returned,
    with multiple categories separated by commas.
    Returns None if there is no event, no category or no translation
    for a single category
    """
    evnt = _get_event_dict_from_tags(tags)
    if not evnt:
        return None
    category = evnt.get('category')
    if not category:
        return None

    if isinstance(category, str):
        category_str = remove_html(category)
        if translate.get(category_str):
            return translate[category_str]
        return None

    if isinstance(category, list):
        # joining avoids the leading ', ' which resulted from testing
        # the item rather than the text accumulated so far
        translated = [
            translate[category_item] for category_item in category
            if isinstance(category_item, str) and
            translate.get(category_item)
        ]
        return ', '.join(translated)
    return None
 | |
| 
 | |
| 
 | |
def _get_event_time_span_from_tags(tags: []) -> (str, str, str, str):
    """Returns the time span of the first Event tag in the tags list
    The result is start date, start time, end date, end time, where
    times are reduced to hours:minutes without any timezone offset.
    The end values are None if there is no usable endTime, and all
    values are None if there is no event or its startTime is not in
    the expected date T time format
    """
    no_time_span = (None, None, None, None)

    evnt = _get_event_dict_from_tags(tags)
    if not evnt:
        return no_time_span
    if not evnt.get('startTime'):
        return no_time_span
    if not isinstance(evnt['startTime'], str):
        return no_time_span

    start_time = remove_html(evnt['startTime'])
    if 'T' not in start_time:
        return no_time_span
    start_fields = start_time.split('T')
    start_date_str = start_fields[0]
    start_time_str = start_fields[1]
    if ':' not in start_time_str:
        return no_time_span
    # drop any timezone offset, which follows a + or - character
    start_time_str = start_time_str.split('+')[0].split('-')[0]
    start_clock = start_time_str.split(':')
    if len(start_clock) < 2:
        return no_time_span
    # only keep the hours and minutes
    start_time_str = start_clock[0] + ':' + start_clock[1]

    end_value = evnt.get('endTime')
    if end_value and isinstance(end_value, str):
        end_time = remove_html(end_value)
        if 'T' in end_time:
            end_fields = end_time.split('T')
            end_date_str = end_fields[0]
            # drop any timezone offset
            end_time_str = end_fields[1].split('+')[0].split('-')[0]
            if ':' in end_time_str:
                # only keep the hours and minutes
                end_time_str = ':'.join(end_time_str.split(':')[:2])
            return start_date_str, start_time_str, \
                end_date_str, end_time_str
    return start_date_str, start_time_str, None, None
 | |
| 
 | |
| 
 | |
def html_address_book_list(base_dir: str, nickname: str, domain: str) -> str:
    """Returns a html datalist containing the addresses within the
    address book of the given account, in sorted order. This provides
    suggestions when creating a new post with a location
    """
    address_book_filename = \
        acct_dir(base_dir, nickname, domain) + '/addresses.json'
    address_book_dict = {}
    if os.path.isfile(address_book_filename):
        loaded_dict = load_json(address_book_filename)
        if loaded_dict:
            address_book_dict = loaded_dict

    # one option for each non-empty address
    options = [
        '<option>' + addr + '</option>\n'
        for addr in sorted(address_book_dict.values())
        if addr
    ]
    return '<datalist id="addressbook">\n' + \
        ''.join(options) + '</datalist>\n'
 | |
| 
 | |
| 
 | |
def update_address_book(base_dir: str, nickname: str, domain: str,
                        location: str, address: str) -> None:
    """Stores the address for a location within the address book of
    the given account. Any html is removed from the address, and the
    address book is only saved if the entry has changed
    """
    address_book_filename = \
        acct_dir(base_dir, nickname, domain) + '/addresses.json'
    address_book_dict = {}
    if os.path.isfile(address_book_filename):
        loaded_dict = load_json(address_book_filename)
        if loaded_dict:
            address_book_dict = loaded_dict

    address = remove_html(address)
    existing_address = address_book_dict.get(location)
    if existing_address and existing_address == address:
        # already exists so we don't need to update
        return
    address_book_dict[location] = address
    save_json(address_book_dict, address_book_filename)
 | |
| 
 | |
| 
 | |
def get_location_from_post(post_json_object: {}) -> str:
    """Returns the location for the given post, or None
    The location can come from a Place tag or from a location dict
    on the post object. Geocoordinates within the location dict take
    priority and produce a map link, otherwise the result is the
    location name, linked to its url if there is one, followed by
    the address if there is one
    """
    post_obj = post_json_object
    if has_object_dict(post_json_object):
        post_obj = post_json_object['object']

    # location represented via a tag
    locn = None
    tags = post_obj.get('tag')
    if tags and isinstance(tags, list):
        locn = _get_location_from_tags(tags)

    # location representation used by pixelfed
    locn_url = None
    locn_address = None
    locn_exists = False
    locn2 = post_obj.get('location')
    if locn2 and isinstance(locn2, dict):
        longitude = locn2.get('longitude')
        latitude = locn2.get('latitude')
        if longitude and latitude and \
           isinstance(longitude, str) and isinstance(latitude, str) and \
           is_float(longitude) and is_float(latitude):
            locn_exists = True
        else:
            # no usable geocoordinates, so fall back to the name
            place_name = locn2.get('name')
            if place_name and isinstance(place_name, str):
                locn = place_name

        place_url = locn2.get('url')
        if place_url and isinstance(place_url, str):
            locn_url = place_url

        place_address = locn2.get('address')
        if place_address:
            if isinstance(place_address, str):
                locn_address = \
                    remove_html(place_address).replace(', ', '<br>')
            elif isinstance(place_address, dict):
                locn_address = _location_address_from_dict(place_address)

    if locn_exists:
        # location geocoordinate
        locn = _geocoords_to_osm_link('osm.org', 17,
                                      locn2['latitude'],
                                      locn2['longitude'])
    elif locn_url:
        if not locn:
            # only the link is known
            locn = locn_url
        elif '<a href=' not in locn:
            # location name and link
            locn = '<a href="' + locn_url + '" target="_blank" ' + \
                'rel="nofollow noopener noreferrer">' + locn + '</a>'

    if locn_address:
        if not locn:
            locn = '<address>' + locn_address + '</address>'
        elif '<address>' not in locn:
            # location name and address
            locn += '<br><address>' + locn_address + '</address>'

    return locn
 | |
| 
 | |
| 
 | |
def get_category_from_post(post_json_object: {}, translate: {}) -> str:
    """Returns the translated category for the given post
    An event category within the tags takes priority, otherwise the
    category field of the post object is used, which can be a string
    or a list of strings. Categories without a translation are left
    out, and multiple categories are separated by commas
    """
    post_obj = post_json_object
    if has_object_dict(post_json_object):
        post_obj = post_json_object['object']

    # category represented via a tag
    catstr = ''
    tags = post_obj.get('tag')
    if tags and isinstance(tags, list):
        catstr = _get_category_from_tags(tags, translate)
    if catstr:
        return catstr

    # category within the post object itself
    text = post_obj.get('category')
    if not text:
        return catstr
    if isinstance(text, str):
        if translate.get(text):
            catstr = translate[text]
    elif isinstance(text, list):
        catstr = ', '.join(
            [translate[cat_text] for cat_text in text
             if isinstance(cat_text, str) and translate.get(cat_text)])
    return catstr
 | |
| 
 | |
| 
 | |
def get_event_time_span_from_post(post_json_object: {}) -> str:
    """Returns the event start and end time for the given post as html
    The time span is taken from an Event tag if there is one, and
    otherwise from the startTime and endTime fields of the post object.
    The result is one or two time elements, with the end date only
    being shown if it differs from the start date.
    Returns an empty string if there is no time span, or if the
    startTime of the post object is not in the expected format
    """
    start_time = end_time = ''
    start_date_str = start_time_str = end_date_str = end_time_str = ''

    post_obj = post_json_object
    if has_object_dict(post_json_object):
        post_obj = post_json_object['object']

    # time span represented via a tag
    tags = post_obj.get('tag')
    if tags and isinstance(tags, list):
        start_date_str, start_time_str, end_date_str, end_time_str = \
            _get_event_time_span_from_tags(tags)
        # only the separate date and time are returned for tags, so
        # rebuild the values used for the datetime attributes, which
        # would otherwise be empty
        if start_date_str and start_time_str:
            start_time = start_date_str + 'T' + start_time_str
        if end_date_str:
            end_time = end_date_str
            if end_time_str:
                end_time += 'T' + end_time_str

    # check for time span within the post object
    if not start_date_str and post_obj.get('startTime'):
        # this function returns a string, so a malformed start time
        # results in an empty string rather than a tuple
        if not isinstance(post_obj['startTime'], str):
            return ''
        start_time = remove_html(post_obj['startTime'])
        if 'T' not in start_time:
            return ''
        start_fields = start_time.split('T')
        start_date_str = start_fields[0]
        start_time_str = start_fields[1]
        if ':' not in start_time_str:
            return ''
        # drop any timezone offset, which follows a + or - character
        start_time_str = start_time_str.split('+')[0].split('-')[0]
        start_time_sections = start_time_str.split(':')
        if len(start_time_sections) < 2:
            return ''
        # only show the hours and minutes
        start_time_str = \
            start_time_sections[0] + ':' + start_time_sections[1]

        end_time = ''
        end_date_str = end_time_str = ''
        end_value = post_obj.get('endTime')
        if end_value and isinstance(end_value, str):
            end_time = remove_html(end_value)
            if 'T' in end_time:
                end_fields = end_time.split('T')
                end_date_str = end_fields[0]
                # drop any timezone offset
                end_time_str = \
                    end_fields[1].split('+')[0].split('-')[0]
                if ':' in end_time_str:
                    # only show the hours and minutes
                    end_time_str = \
                        ':'.join(end_time_str.split(':')[:2])

    if not start_date_str:
        return ''

    start_html = '<time datetime="' + start_time + '">' + \
        start_date_str + ' ' + start_time_str + '</time>'
    if not end_date_str:
        return start_html
    if start_date_str == end_date_str:
        # same day, so only the end time needs to be shown
        return start_html + ' - ' + \
            '<time datetime="' + end_time + '">' + end_time_str + '</time>'
    return start_html + ' - ' + \
        '<time datetime="' + end_time + '">' + \
        end_date_str + ' ' + end_time_str + '</time>'
 | |
| 
 | |
| 
 | |
| def _geocoords_from_osm_link(url: str, osm_domain: str) -> (int, float, float):
 | |
|     """Returns geocoordinates from an OSM map link
 | |
|     """
 | |
|     if osm_domain not in url:
 | |
|         return None, None, None
 | |
|     if '#map=' not in url:
 | |
|         return None, None, None
 | |
| 
 | |
|     coords_str = url.split('#map=')[1]
 | |
|     if '/' not in coords_str:
 | |
|         return None, None, None
 | |
| 
 | |
|     coords = coords_str.split('/')
 | |
|     if len(coords) != 3:
 | |
|         return None, None, None
 | |
|     zoom = coords[0]
 | |
|     if not zoom.isdigit():
 | |
|         return None, None, None
 | |
|     latitude = coords[1]
 | |
|     if not is_float(latitude):
 | |
|         return None, None, None
 | |
|     longitude = coords[2]
 | |
|     if not is_float(longitude):
 | |
|         return None, None, None
 | |
|     zoom = int(zoom)
 | |
|     latitude = float(latitude)
 | |
|     longitude = float(longitude)
 | |
|     return zoom, latitude, longitude
 | |
| 
 | |
| 
 | |
| def _geocoords_from_osmorg_link(url: str) -> (int, float, float):
 | |
|     """Returns geocoordinates from an OSM map link
 | |
|     """
 | |
|     osm_domain = 'osm.org'
 | |
|     if osm_domain not in url:
 | |
|         return None, None, None
 | |
|     if 'mlat=' not in url:
 | |
|         return None, None, None
 | |
|     if 'mlon=' not in url:
 | |
|         return None, None, None
 | |
|     if 'zoom=' not in url and '#map=' not in url:
 | |
|         return None, None, None
 | |
| 
 | |
|     latitude = url.split('mlat=')[1]
 | |
|     if '&' in latitude:
 | |
|         latitude = latitude.split('&')[0]
 | |
|     if not is_float(latitude):
 | |
|         return None, None, None
 | |
| 
 | |
|     longitude = url.split('mlon=')[1]
 | |
|     if '&' in longitude:
 | |
|         longitude = longitude.split('&')[0]
 | |
|     if not is_float(longitude):
 | |
|         return None, None, None
 | |
| 
 | |
|     if 'zoom=' in url:
 | |
|         zoom = url.split('zoom=')[1]
 | |
|         if '&' in zoom:
 | |
|             zoom = zoom.split('&')[0]
 | |
|     else:
 | |
|         zoom = url.split('#map=')[1]
 | |
|         if '/' in zoom:
 | |
|             zoom = zoom.split('/')[0]
 | |
| 
 | |
|     if not zoom.isdigit():
 | |
|         return None, None, None
 | |
|     zoom = int(zoom)
 | |
|     latitude = float(latitude)
 | |
|     longitude = float(longitude)
 | |
|     return zoom, latitude, longitude
 | |
| 
 | |
| 
 | |
| def _geocoords_from_osmorg_go_link(url: str, session) -> (int, float, float):
 | |
|     """Returns geocoordinates from an OSM go map link
 | |
|     """
 | |
|     osm_domain = 'osm.org'
 | |
|     if osm_domain not in url:
 | |
|         return None, None, None
 | |
|     if 'mlat=' in url:
 | |
|         return None, None, None
 | |
|     if 'mlon=' in url:
 | |
|         return None, None, None
 | |
|     if '/go/' not in url:
 | |
|         return None, None, None
 | |
| 
 | |
|     # resolve url equivalent to
 | |
|     # curl -Ls -o /dev/null -w %{url_effective} [url]
 | |
|     resolved_url = get_resolved_url(session, url)
 | |
| 
 | |
|     if not resolved_url:
 | |
|         return None, None, None
 | |
| 
 | |
|     if 'osm.org' in resolved_url:
 | |
|         (zoom, latitude, longitude) = \
 | |
|             _geocoords_from_osmorg_link(resolved_url)
 | |
|     else:
 | |
|         (zoom, latitude, longitude) = \
 | |
|             _geocoords_from_osm_link(resolved_url, 'openstreetmap.org')
 | |
|     return zoom, latitude, longitude
 | |
| 
 | |
| 
 | |
| def _geocoords_from_osmand_link(url: str) -> (int, float, float):
 | |
|     """Returns geocoordinates from an OSM android map link
 | |
|     """
 | |
|     latitude = None
 | |
|     longitude = None
 | |
|     zoom = 10
 | |
| 
 | |
|     if 'pin=' in url:
 | |
|         pin_coords_str = url.split('pin=')[1]
 | |
|         if ',' in pin_coords_str:
 | |
|             latitude_str = pin_coords_str.split(',')[0]
 | |
|             longitude_str = pin_coords_str.split(',')[1]
 | |
|             if is_float(latitude_str) and is_float(longitude_str):
 | |
|                 latitude = float(latitude_str)
 | |
|                 longitude = float(longitude_str)
 | |
| 
 | |
|     if '#' in url:
 | |
|         coords_str = url.split('#')[1]
 | |
|         if '/' in coords_str:
 | |
|             sections = coords_str.split('/')
 | |
|             if len(sections) == 3:
 | |
|                 zoom_str = sections[0]
 | |
|                 latitude_str = sections[1]
 | |
|                 longitude_str = sections[2]
 | |
|                 if zoom_str.isnumeric() and \
 | |
|                    is_float(latitude_str) and \
 | |
|                    is_float(longitude_str):
 | |
|                     latitude = float(latitude_str)
 | |
|                     longitude = float(longitude_str)
 | |
|                     zoom = int(zoom_str)
 | |
| 
 | |
|     return zoom, latitude, longitude
 | |
| 
 | |
| 
 | |
| def _geocoords_from_geo_link(url: str) -> (int, float, float):
 | |
|     """Returns geocoordinates from an geo link
 | |
|     https://en.wikipedia.org/wiki/Geo_URI_scheme
 | |
|     """
 | |
|     latitude = None
 | |
|     longitude = None
 | |
|     zoom = 10
 | |
| 
 | |
|     coords_str = url.split('geo:')[1]
 | |
|     if ',' in coords_str:
 | |
|         coords_sections = coords_str.split(',')
 | |
|         if len(coords_sections) >= 2:
 | |
|             latitude_str = coords_sections[0]
 | |
|             longitude_str = coords_sections[1]
 | |
|             if ';' in longitude_str:
 | |
|                 longitude_str = longitude_str.split(';')[0]
 | |
|             if '?' in longitude_str:
 | |
|                 longitude_str = longitude_str.split('?')[0]
 | |
|             if ' ' in longitude_str:
 | |
|                 longitude_str = longitude_str.split(' ')[0]
 | |
|             if is_float(latitude_str) and is_float(longitude_str):
 | |
|                 latitude = float(latitude_str)
 | |
|                 longitude = float(longitude_str)
 | |
|     return zoom, latitude, longitude
 | |
| 
 | |
| 
 | |
| def _geocoords_from_gmaps_link(url: str) -> (int, float, float):
 | |
|     """Returns geocoordinates from a Gmaps link
 | |
|     """
 | |
|     if '/maps/' not in url:
 | |
|         return None, None, None
 | |
|     coords_str = url.split('/maps', 1)[1]
 | |
|     if '/@' not in coords_str and '/place/' not in coords_str:
 | |
|         return None, None, None
 | |
| 
 | |
|     if '/@' in coords_str:
 | |
|         coords_str = coords_str.split('/@', 1)[1]
 | |
|     else:
 | |
|         coords_str = coords_str.split('/place/', 1)[1]
 | |
| 
 | |
|     # NOTE: zoom may have been replaced by metres elevation
 | |
|     zoom_exists = False
 | |
|     if 'z' in coords_str:
 | |
|         coords_str = coords_str.split('z', 1)[0]
 | |
|         zoom_exists = True
 | |
| 
 | |
|     if ',' not in coords_str:
 | |
|         return None, None, None
 | |
| 
 | |
|     coords = coords_str.split(',')
 | |
|     if len(coords) not in (2, 3):
 | |
|         return None, None, None
 | |
|     zoom = '100'
 | |
|     if zoom_exists:
 | |
|         zoom = coords[2]
 | |
|         if not zoom.isdigit():
 | |
|             if is_float(str(zoom)):
 | |
|                 zoom = int(float(str(zoom)))
 | |
|             else:
 | |
|                 return None, None, None
 | |
|     latitude = coords[0]
 | |
|     if not is_float(latitude):
 | |
|         return None, None, None
 | |
|     longitude = coords[1]
 | |
|     if not is_float(longitude):
 | |
|         return None, None, None
 | |
|     zoom = int(zoom)
 | |
|     latitude = float(latitude)
 | |
|     longitude = float(longitude)
 | |
|     return zoom, latitude, longitude
 | |
| 
 | |
| 
 | |
| def _geocoords_from_bmaps_link(url: str) -> (int, float, float):
 | |
|     """Returns geocoordinates from a bing map link
 | |
|     """
 | |
|     prefixes = ('/maps?cp=', '/maps/directions?cp=')
 | |
|     map_prefix = None
 | |
|     for prefix in prefixes:
 | |
|         if prefix in url:
 | |
|             map_prefix = prefix
 | |
|             break
 | |
|     if not map_prefix:
 | |
|         return None, None, None
 | |
| 
 | |
|     coords_str = url.split(map_prefix)[1]
 | |
|     if '~' not in coords_str:
 | |
|         return None, None, None
 | |
|     orig_coords_str = coords_str
 | |
|     if '&' in coords_str:
 | |
|         coords_str = coords_str.split('&')[0]
 | |
|     if ';' in coords_str:
 | |
|         coords_str = coords_str.split(';')[0]
 | |
| 
 | |
|     coords = coords_str.split('~')
 | |
|     if len(coords) != 2:
 | |
|         return None, None, None
 | |
|     latitude = coords[0]
 | |
|     if not is_float(latitude):
 | |
|         return None, None, None
 | |
|     longitude = coords[1]
 | |
|     if not is_float(longitude):
 | |
|         return None, None, None
 | |
|     zoom = 17
 | |
|     if 'lvl=' in orig_coords_str:
 | |
|         zoom = orig_coords_str.split('lvl=')[1]
 | |
|         if '&' in zoom:
 | |
|             zoom = zoom.split('&')[0]
 | |
|         if ';' in zoom:
 | |
|             zoom = zoom.split(';')[0]
 | |
|     if not zoom.isdigit():
 | |
|         return None, None, None
 | |
|     zoom = int(zoom)
 | |
|     latitude = float(latitude)
 | |
|     longitude = float(longitude)
 | |
|     return zoom, latitude, longitude
 | |
| 
 | |
| 
 | |
| def _geocoords_from_waze_link(url: str) -> (int, float, float):
 | |
|     """Returns geocoordinates from a waze map link
 | |
|     """
 | |
|     prefixes = ['/ul?ll=']
 | |
|     map_prefix = None
 | |
|     for prefix in prefixes:
 | |
|         if prefix in url:
 | |
|             map_prefix = prefix
 | |
|             break
 | |
|     if not map_prefix:
 | |
|         return None, None, None
 | |
| 
 | |
|     coords_str = url.split(map_prefix)[1]
 | |
|     orig_coords_str = coords_str
 | |
|     if '&' in coords_str:
 | |
|         coords_str = coords_str.split('&')[0]
 | |
|     if '%2C' not in coords_str and ',' not in coords_str:
 | |
|         return None, None, None
 | |
| 
 | |
|     if '%2C' in coords_str:
 | |
|         coords = coords_str.split('%2C')
 | |
|     else:
 | |
|         coords = coords_str.split(',')
 | |
|     if len(coords) != 2:
 | |
|         return None, None, None
 | |
|     latitude = coords[0]
 | |
|     if not is_float(latitude):
 | |
|         return None, None, None
 | |
|     longitude = coords[1]
 | |
|     if not is_float(longitude):
 | |
|         return None, None, None
 | |
|     zoom = 17
 | |
|     if 'zoom=' in orig_coords_str:
 | |
|         zoom = orig_coords_str.split('zoom=')[1]
 | |
|         if '&' in zoom:
 | |
|             zoom = zoom.split('&')[0]
 | |
|     if not zoom.isdigit():
 | |
|         return None, None, None
 | |
|     zoom = int(zoom)
 | |
|     latitude = float(latitude)
 | |
|     longitude = float(longitude)
 | |
|     return zoom, latitude, longitude
 | |
| 
 | |
| 
 | |
| def _geocoords_from_wego_link(url: str) -> (int, float, float):
 | |
|     """Returns geocoordinates from a wego map link
 | |
|     """
 | |
|     prefixes = ['/?map=']
 | |
|     map_prefix = None
 | |
|     for prefix in prefixes:
 | |
|         if prefix in url:
 | |
|             map_prefix = prefix
 | |
|             break
 | |
|     if not map_prefix:
 | |
|         return None, None, None
 | |
| 
 | |
|     coords_str = url.split(map_prefix)[1]
 | |
|     if ',' not in coords_str:
 | |
|         return None, None, None
 | |
| 
 | |
|     coords = coords_str.split(',')
 | |
|     if len(coords) < 3:
 | |
|         return None, None, None
 | |
|     latitude = coords[0]
 | |
|     if not is_float(latitude):
 | |
|         return None, None, None
 | |
|     longitude = coords[1]
 | |
|     if not is_float(longitude):
 | |
|         return None, None, None
 | |
|     zoom = coords[2]
 | |
|     if not zoom.isdigit():
 | |
|         return None, None, None
 | |
|     zoom = int(zoom)
 | |
|     latitude = float(latitude)
 | |
|     longitude = float(longitude)
 | |
|     return zoom, latitude, longitude
 | |
| 
 | |
| 
 | |
def geocoords_from_map_link(url: str, osm_domain: str,
                            session) -> (int, float, float):
    """Returns zoom, latitude, longitude from a map link url
    The type of map is detected from the url, with the first match
    being used, and None, None, None is returned if the url isn't
    recognised as a map link. The session is only used to resolve
    osm.org short links
    """
    if osm_domain in url:
        return _geocoords_from_osm_link(url, osm_domain)

    if 'osm.org' in url:
        if 'mlat=' in url:
            return _geocoords_from_osmorg_link(url)
        if '/go/' in url:
            # short link which needs to be resolved
            return _geocoords_from_osmorg_go_link(url, session)

    if 'osmand.net' in url and '/map' in url:
        return _geocoords_from_osmand_link(url)
    if '.google.co' in url:
        return _geocoords_from_gmaps_link(url)
    if '.bing.co' in url:
        return _geocoords_from_bmaps_link(url)
    if '.waze.co' in url:
        return _geocoords_from_waze_link(url)
    if 'wego.here.co' in url:
        return _geocoords_from_wego_link(url)
    if 'geo:' in url and ',' in url:
        return _geocoords_from_geo_link(url)
    return None, None, None
 | |
| 
 | |
| 
 | |
def html_open_street_map(url: str,
                         bounding_box_degrees: float,
                         translate: {}, session,
                         session_onion, session_i2p,
                         width: str = "725",
                         height: str = "650") -> str:
    """Returns embed html for a map link, shown via OpenStreetMap.
    url is the map link to be embedded
    bounding_box_degrees is subtracted from and added to the
    coordinates to make the bounding box of the embedded map
    translate is the dict of translated strings
    session, session_onion, session_i2p are the sessions used when
    parsing the link, chosen by whether the url contains .onion/
    or .i2p/
    width and height are the iframe dimensions
    Returns an empty string if no geocoordinates could be obtained
    """
    osm_domain = 'openstreetmap.org'
    map_session = session
    if '.onion/' in url:
        map_session = session_onion
    elif '.i2p/' in url:
        map_session = session_i2p
    zoom, latitude, longitude = \
        geocoords_from_map_link(url, osm_domain, map_session)
    # 0.0 is a valid latitude (equator) and longitude (prime meridian),
    # so only a missing value is treated as a failure.
    # The zoom check is left as it was
    if latitude is None or longitude is None:
        return ''
    if not zoom:
        return ''
    osm_url = _geocoords_to_osm_link(osm_domain, zoom, latitude, longitude)
    # bounding box order is west, south, east, north
    bbox_str = '%2C'.join((
        str(longitude - bounding_box_degrees),
        str(latitude - bounding_box_degrees),
        str(longitude + bounding_box_degrees),
        str(latitude + bounding_box_degrees)
    ))
    # fall back to the English text if there is no translation,
    # in the same way as is done for hashtag map links
    link_text = 'View Larger Map'
    if translate.get(link_text):
        link_text = translate[link_text]
    html_str = \
        '<iframe width="' + width + '" height="' + height + \
        '" frameborder="0" ' + \
        'scrolling="no" marginheight="0" marginwidth="0" ' + \
        'src="https://www.' + osm_domain + '/export/embed.html?' + \
        'bbox=' + bbox_str + \
        '&layer=mapnik" style="border: 1px solid black" ' + \
        'sandbox="allow-scripts allow-same-origin">' + \
        '</iframe><br/><small><a href="' + osm_url + \
        '">' + link_text + '</a></small>\n'
    return html_str
 | |
| 
 | |
| 
 | |
def set_map_preferences_url(base_dir: str, nickname: str, domain: str,
                            maps_website_url: str) -> None:
    """Sets the preferred maps website for an account.
    The url is stored within map_preferences.json in the account
    directory, keeping any other preferences already in that file
    """
    maps_filename = \
        acct_dir(base_dir, nickname, domain) + '/map_preferences.json'
    maps_json = None
    if os.path.isfile(maps_filename):
        maps_json = load_json(maps_filename)
    # NOTE(review): load_json is defined elsewhere. It is assumed that
    # it can return something other than a dict (eg. None) when the
    # file is unreadable or corrupt, in which case start afresh
    # rather than failing on the assignment below
    if not isinstance(maps_json, dict):
        maps_json = {}
    maps_json['url'] = maps_website_url
    save_json(maps_json, maps_filename)
 | |
| 
 | |
| 
 | |
def get_map_preferences_url(base_dir: str, nickname: str, domain: str) -> str:
    """Gets the preferred maps website for an account.
    Returns None if the account has no map_preferences.json, if it
    could not be loaded as a dict or if it contains no url
    """
    maps_filename = \
        acct_dir(base_dir, nickname, domain) + '/map_preferences.json'
    if not os.path.isfile(maps_filename):
        return None
    maps_json = load_json(maps_filename)
    # NOTE(review): load_json is defined elsewhere. It is assumed that
    # it can return something other than a dict (eg. None) when the
    # file is unreadable or corrupt
    if not isinstance(maps_json, dict):
        return None
    if not maps_json.get('url'):
        return None
    url_str = get_url_from_post(maps_json['url'])
    return remove_html(url_str)
 | |
| 
 | |
| 
 | |
def set_map_preferences_coords(base_dir: str, nickname: str, domain: str,
                               latitude: float, longitude: float,
                               zoom: int) -> None:
    """Sets the preferred maps website coordinates for an account.
    The latitude, longitude and zoom are stored within
    map_preferences.json in the account directory, keeping any other
    preferences already in that file
    """
    maps_filename = \
        acct_dir(base_dir, nickname, domain) + '/map_preferences.json'
    maps_json = None
    if os.path.isfile(maps_filename):
        maps_json = load_json(maps_filename)
    # NOTE(review): load_json is defined elsewhere. It is assumed that
    # it can return something other than a dict (eg. None) when the
    # file is unreadable or corrupt, in which case start afresh
    # rather than failing on the assignments below
    if not isinstance(maps_json, dict):
        maps_json = {}
    maps_json['latitude'] = latitude
    maps_json['longitude'] = longitude
    maps_json['zoom'] = zoom
    save_json(maps_json, maps_filename)
 | |
| 
 | |
| 
 | |
def get_map_preferences_coords(base_dir: str, nickname: str,
                               domain: str) -> (float, float, int):
    """Gets the preferred maps website coordinates for an account.
    Returns latitude, longitude, zoom as stored within
    map_preferences.json, or None, None, None if the file does not
    exist, could not be loaded as a dict or lacks any of the values
    """
    maps_filename = \
        acct_dir(base_dir, nickname, domain) + '/map_preferences.json'
    if not os.path.isfile(maps_filename):
        return None, None, None
    maps_json = load_json(maps_filename)
    # NOTE(review): load_json is defined elsewhere. It is assumed that
    # it can return something other than a dict (eg. None) when the
    # file is unreadable or corrupt
    if not isinstance(maps_json, dict):
        return None, None, None
    latitude = maps_json.get('latitude')
    longitude = maps_json.get('longitude')
    zoom = maps_json.get('zoom')
    # 0.0 is a valid latitude (equator) and longitude (prime meridian),
    # so only a missing value is treated as absent.
    # The zoom check is left as it was
    if latitude is None or longitude is None or not zoom:
        return None, None, None
    return latitude, longitude, zoom
 | |
| 
 | |
| 
 | |
def get_map_links_from_post_content(content: str, session) -> []:
    """Returns a list of map links found within the given post content.
    Web links are included if geocoordinates can be obtained from
    them, and geo: URIs are included if they begin with a numeric
    latitude,longitude pair.
    Each link appears in the returned list only once
    """
    osm_domain = 'openstreetmap.org'
    map_links: list[str] = []

    # the text before the first :// cannot be the body of a link
    for link_str in content.split('://')[1:]:
        # the link ends at a closing quote or at the next html tag
        url = link_str.split('"')[0].split('<')[0]
        if not url:
            continue

        # complete the url
        if 'http://' + url in content:
            url = 'http://' + url
        elif 'https://' + url in content:
            url = 'https://' + url

        zoom, latitude, longitude = \
            geocoords_from_map_link(url, osm_domain, session)
        # 0.0 is a valid latitude (equator) and longitude (prime
        # meridian), so only a missing value is treated as a failure.
        # The zoom check is left as it was
        if latitude is None or longitude is None:
            continue
        if not zoom:
            continue
        if url not in map_links:
            map_links.append(url)

    # https://en.wikipedia.org/wiki/Geo_URI_scheme
    for link_str in content.split('geo:')[1:]:
        if ',' not in link_str:
            continue
        # keep the leading run of characters which can be
        # part of a coordinate pair
        coords_str = ''
        for char in link_str:
            if not char.isnumeric() and char not in (',', '-', '.'):
                break
            coords_str += char
        if ',' not in coords_str:
            continue
        coord_sections = coords_str.split(',')
        if len(coord_sections) < 2:
            continue
        if not is_float(coord_sections[0]) or \
           not is_float(coord_sections[1]):
            continue
        url = 'geo:' + coords_str
        # avoid duplicates, in the same way as for web links
        if url not in map_links:
            map_links.append(url)

    return map_links
 | |
| 
 | |
| 
 | |
def add_tag_map_links(tag_maps_dir: str, tag_name: str,
                      map_links: [], published: str, post_url: str) -> None:
    """Adds map links to the file for a hashtag, which is used
    to show a map for that hashtag.
    Each line of the file has the format:
        seconds_since_epoch map_link post_url
    Lines are kept in reverse sorted order and at most 2000
    of them are saved
    """
    filename = tag_maps_dir + '/' + tag_name + '.txt'
    post_url = post_url.replace('#', '/')

    # load the lines which were previously saved
    stored_lines: list[str] = []
    if os.path.isfile(filename):
        try:
            with open(filename, 'r', encoding='utf-8') as fp_tag:
                stored_lines = fp_tag.read().split('\n')
        except OSError:
            print('EX: error reading tag map ' + filename)

    # every new line begins with the time that the post was published
    published_time = \
        date_from_string_format(published, ['%Y-%m-%dT%H:%M:%S%z'])
    timestamp_str = \
        str(int((published_time - date_epoch()).total_seconds()))

    # add any lines which are not already stored
    lines_added = False
    for link in map_links:
        entry = timestamp_str + ' ' + link + ' ' + post_url
        if entry not in stored_lines:
            stored_lines.insert(0, entry)
            lines_added = True
    if not lines_added:
        # nothing new, so the file does not need to be rewritten
        return

    # reverse sort, then keep a bounded number of non-empty lines
    # so that the file doesn't grow indefinitely
    stored_lines.sort(reverse=True)
    kept_lines = [entry for entry in stored_lines if entry][:2000]
    file_text = ''.join(entry + '\n' for entry in kept_lines)

    # save the lines for the tag
    try:
        with open(filename, 'w+', encoding='utf-8') as fp_tag:
            fp_tag.write(file_text)
    except OSError:
        print('EX: error writing tag map ' + filename)
 | |
| 
 | |
| 
 | |
def _gpx_location(latitude: float, longitude: float, post_id: str) -> str:
    """Returns a gpx waypoint for the given geocoordinates.
    post_id is used both as the waypoint name and as its link.
    It is escaped so that characters such as & < > or a double
    quote within it cannot produce malformed xml
    """
    # function scope import, because only this block needs it
    from xml.sax.saxutils import escape

    name_text = escape(post_id)
    # within an attribute value double quotes also need escaping
    href_value = escape(post_id, {'"': '&quot;'})
    map_str = '<wpt lat="' + str(latitude) + \
        '" lon="' + str(longitude) + '">\n'
    map_str += '  <name>' + name_text + '</name>\n'
    map_str += '  <link href="' + href_value + '"/>\n'
    map_str += '</wpt>\n'
    return map_str
 | |
| 
 | |
| 
 | |
def _kml_location(place_ctr: int,
                  latitude: float, longitude: float, post_id: str) -> str:
    """Returns a kml placemark for the given geocoordinates.
    place_ctr is used as the placemark id and name.
    post_id is shown as a html link within the description. It is
    html escaped so that a double quote cannot end the href early
    and so that ]]> cannot terminate the CDATA section
    """
    # function scope import, because only this block needs it
    from html import escape

    post_id_html = escape(post_id, quote=True)
    map_str = '<Placemark id="' + str(place_ctr) + '">\n'
    map_str += '  <name>' + str(place_ctr) + '</name>\n'
    map_str += '  <description><![CDATA[\n'
    map_str += '<a href="' + post_id_html + '">' + \
        post_id_html + '</a>\n]]>\n'
    map_str += '  </description>\n'
    map_str += '  <Point>\n'
    # kml coordinates are in the order longitude,latitude,altitude
    map_str += '    <coordinates>' + str(longitude) + ',' + \
        str(latitude) + ',0</coordinates>\n'
    map_str += '  </Point>\n'
    map_str += '</Placemark>\n'
    return map_str
 | |
| 
 | |
| 
 | |
def _hashtag_map_to_format(base_dir: str, tag_name: str,
                           start_hours_since_epoch: int,
                           end_hours_since_epoch: int,
                           nickname: str, domain: str,
                           map_format: str, session) -> str:
    """Returns the KML/GPX for a given hashtag between the given times.
    The map links are read from tagmaps/tag_name.txt within base_dir,
    where each line has the format:
        seconds_since_epoch map_link post_id
    map_format of 'gpx' produces GPX and anything else produces KML.
    If a nickname is given then posts which that account has muted
    are excluded.
    Returns None if there are no locations within the time range
    """
    tag_map_filename = base_dir + '/tagmaps/' + tag_name + '.txt'
    if not os.path.isfile(tag_map_filename):
        return None

    map_links: list[str] = []
    try:
        with open(tag_map_filename, 'r', encoding='utf-8') as fp_tag:
            map_links = fp_tag.read().split('\n')
    except OSError:
        print('EX: unable to read tag map links ' + tag_map_filename)
    if not map_links:
        return None

    osm_domain = 'openstreetmap.org'
    start_secs_since_epoch = int(start_hours_since_epoch * 60 * 60)
    end_secs_since_epoch = int(end_hours_since_epoch * 60 * 60)
    place_ctr = 0
    places_str = ''
    for link_line in map_links:
        fields = link_line.strip().split(' ')
        if len(fields) < 3:
            continue
        # skip a line with a malformed timestamp, rather than
        # allowing it to abort the whole export
        try:
            secs_since_epoch = int(fields[0])
        except ValueError:
            continue
        # is this geocoordinate within the time range?
        if secs_since_epoch < start_secs_since_epoch or \
           secs_since_epoch > end_secs_since_epoch:
            continue
        # get the geocoordinates from the map link
        zoom, latitude, longitude = \
            geocoords_from_map_link(fields[1], osm_domain, session)
        if not zoom:
            continue
        # 0.0 is a valid latitude (equator) and longitude (prime
        # meridian), so only a missing value is treated as a failure
        if latitude is None or longitude is None:
            continue
        post_id = fields[2]
        # check if the post is muted, and exclude the
        # geolocation if it is
        if nickname:
            post_filename = \
                locate_post(base_dir, nickname, domain, post_id)
            if post_filename:
                if os.path.isfile(post_filename + '.muted'):
                    continue
        place_ctr += 1
        if map_format == 'gpx':
            places_str += _gpx_location(latitude, longitude, post_id)
        else:
            places_str += \
                _kml_location(place_ctr, latitude, longitude, post_id)

    if place_ctr == 0:
        return None

    # wrap the locations within the document header and footer
    map_str = '<?xml version="1.0" encoding="UTF-8"?>\n'
    if map_format == 'gpx':
        map_str += '<gpx version="1.0">\n'
        map_str += places_str
        map_str += '</gpx>'
    else:
        map_str += '<kml xmlns="http://www.opengis.net/kml/2.2">\n'
        map_str += '<Document>\n'
        map_str += places_str
        map_str += '</Document>\n'
        map_str += '</kml>'
    return map_str
 | |
| 
 | |
| 
 | |
def _hashtag_map_within_hours(base_dir: str, tag_name: str,
                              hours: int, map_format: str,
                              nickname: str, domain: str,
                              session) -> str:
    """Returns gpx/kml containing the map locations for a hashtag
    during the last number of hours.
    The sign of hours is ignored. The time range begins that many
    hours before the current hour and ends two hours after it
    """
    time_since_epoch = date_utcnow() - date_epoch()
    curr_hour = int(int(time_since_epoch.total_seconds()) / (60 * 60))
    return _hashtag_map_to_format(base_dir, tag_name,
                                  curr_hour - abs(hours),
                                  curr_hour + 2,
                                  nickname, domain, map_format,
                                  session)
 | |
| 
 | |
| 
 | |
def _get_tagmaps_time_periods() -> {}:
    """Returns the time periods for which hashtag maps can be shown.
    Each key is the name of the period and each value is its length
    as a negative number of hours, where a month is taken to be
    four weeks
    """
    day = 24
    week = day * 7
    month = week * 4
    return {
        "Last hour": -1,
        "Last 3 hours": -3,
        "Last 6 hours": -6,
        "Last 12 hours": -12,
        "Last day": -day,
        "Last 2 days": -(day * 2),
        "Last week": -week,
        "Last 2 weeks": -(week * 2),
        "Last month": -month,
        "Last 6 months": -(month * 6),
        "Last year": -(month * 12)
    }
 | |
| 
 | |
| 
 | |
def map_format_from_tagmaps_path(base_dir: str, path: str,
                                 map_format: str,
                                 domain: str, session) -> str:
    """Returns gpx/kml for a given tagmaps path
    /tagmaps/tagname-time_period
    The path may be preceded by /users/nickname, in which case the
    nickname is passed on so that posts muted by that account can
    be excluded.
    Returns None if the path is not a tagmaps path, if the tag name
    is not acceptable or if the time period is not recognised
    """
    if '/tagmaps/' not in path:
        return None
    path_prefix, endpoint = path.split('/tagmaps/', 1)
    # the time period names contain no hyphens, so the first
    # hyphen separates the tag name from the time period
    tag_name, _, period_name = endpoint.partition('-')
    if not tag_name or not period_name:
        return None
    # the tag name becomes part of a filename, so don't allow
    # it to refer to anywhere outside of the tagmaps directory
    if '/' in tag_name or '\\' in tag_name or '..' in tag_name:
        return None

    hours = None
    for period_str, period_hours in _get_tagmaps_time_periods().items():
        period_str2 = period_str.replace('Last ', '').lower()
        if period_name == period_str2.replace(' ', '_'):
            hours = period_hours
            break
    if hours is None:
        return None

    # NOTE(review): previously the whole path had to be equal to
    # /tagmaps/tagname-time_period, so that a /users/nickname prefix
    # could never match. Confirm that callers expect prefixed paths
    nickname = None
    if '/users/' in path_prefix:
        nickname = path_prefix.split('/users/')[1]
        if '/' in nickname:
            nickname = nickname.split('/')[0]
        if not nickname:
            nickname = None
    return _hashtag_map_within_hours(base_dir, tag_name,
                                     hours, map_format,
                                     nickname, domain,
                                     session)
 | |
| 
 | |
| 
 | |
def html_hashtag_maps(base_dir: str, tag_name: str,
                      translate: {}, map_format: str,
                      nickname: str, domain: str,
                      session, ua_str: str) -> str:
    """Returns html for maps associated with a hashtag.
    This is a series of download links, one for each time period
    whose map differs from that of the previous listed period.
    ua_str is the browser user agent, which decides whether a
    download filename is suggested.
    Returns an empty string if there is no map file for the hashtag
    or if no time period contains any locations
    """
    tag_map_filename = base_dir + '/tagmaps/' + tag_name + '.txt'
    if not os.path.isfile(tag_map_filename):
        return ''

    time_period = _get_tagmaps_time_periods()
    # the suggested download filename should match the map format,
    # using the same gpx/otherwise kml rule as the map generation
    file_extension = '.kml'
    if map_format == 'gpx':
        file_extension = '.gpx'

    html_str = ''
    map_str = None
    ua_str_lower = ua_str.lower()
    for period_str, hours in time_period.items():
        new_map_str = \
            _hashtag_map_within_hours(base_dir, tag_name, hours,
                                      map_format, nickname, domain,
                                      session)
        if not new_map_str:
            continue
        # a longer period which adds no further locations
        # doesn't need its own link
        if new_map_str == map_str:
            continue
        map_str = new_map_str
        period_str2 = period_str.replace('Last ', '').lower()
        tag_name_str = tag_name + '-' + period_str2.replace(' ', '_')
        endpoint_str = '/tagmaps/' + tag_name_str
        if html_str:
            html_str += ' '
        description = period_str
        if translate.get(period_str):
            description = translate[period_str]
        if browser_supports_download_filename(ua_str_lower):
            html_str += '<a href="' + endpoint_str + \
                '" download="' + tag_name_str + file_extension + '">' + \
                description + '</a>'
        else:
            # NOTE: don't use download="preferredfilename" which is
            # unsupported by some browsers
            html_str += '<a href="' + endpoint_str + '" download>' + \
                description + '</a>'
    if html_str:
        html_str = '📌 ' + html_str
    return html_str
 |