Merge pull request 'correct text-array to download script' (#6) from matthias/mapillary_download:bugfix/download-array into tykayn/batch-get-mapillary-sequences

Reviewed-on: https://forge.chapril.org/tykayn/mapillary_download/pulls/6
This commit is contained in:
tykayn 2024-10-26 23:24:17 +02:00
commit 4258a8b84b
8 changed files with 398 additions and 189 deletions

7
batch_get_username.sh Normal file → Executable file
View File

@ -5,7 +5,12 @@
# Liste des usernames # Liste des usernames
# example: # example:
# usernames=( "riri" "fifi" "loulou") # usernames=( "riri" "fifi" "loulou")
usernames=( "someone_having_nice_pictures" "someone_else" "oh_look_a_these_usernames" ) usernames=( )
if test -z $usernames; then
read -p "Please enter a mapillary username: " ANS
usernames=$ANS
fi
# check env variables are valid # check env variables are valid
if [ -f "secrets_variables.sh" ]; then if [ -f "secrets_variables.sh" ]; then

View File

@ -12,9 +12,19 @@ import shutil
import exifread import exifread
# Définition du rectangle entourant la France métropolitaine et un peu autour # Définition du rectangle entourant la France métropolitaine et un peu autour
france_bbox: tuple[float, float, float, float] = (42.0, -5.0, 51.0, 10.0) # (lat_min, lon_min, lat_max, lon_max) france_bbox: tuple[float, float, float, float] = (
42.0,
-5.0,
51.0,
10.0,
) # (lat_min, lon_min, lat_max, lon_max)
# Définition du rectangle entourant la France métropolitaine et un peu autour # Définition du rectangle entourant la France métropolitaine et un peu autour
france_bbox: tuple[float, float, float, float] = (42.0, -5.0, 51.0, 10.0) # (lat_min, lon_min, lat_max, lon_max) france_bbox: tuple[float, float, float, float] = (
42.0,
-5.0,
51.0,
10.0,
) # (lat_min, lon_min, lat_max, lon_max)
# Définition du rectangle entourant la Guadeloupe # Définition du rectangle entourant la Guadeloupe
guadeloupe_bbox: tuple[float, float, float, float] = (15.8, -61.8, 17.3, -59.3) guadeloupe_bbox: tuple[float, float, float, float] = (15.8, -61.8, 17.3, -59.3)
@ -50,11 +60,11 @@ pf_bbox: tuple[float, float, float, float] = (-27.5, -140.0, -7.5, -134.0)
taaf_bbox: tuple[float, float, float, float] = (-49.5, 68.5, -37.5, 77.5) taaf_bbox: tuple[float, float, float, float] = (-49.5, 68.5, -37.5, 77.5)
# Chemin du répertoire source # Chemin du répertoire source
source_dir: str = '/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/' source_dir: str = "/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/"
# Chemin du répertoire destination # Chemin du répertoire destination
destination_dir: str = '/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/' destination_dir: str = "/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/"
sequence_folder: str = 'principale_sequence' sequence_folder: str = "principale_sequence"
count_files_all: int = 0 count_files_all: int = 0
count_files_moved: int = 0 count_files_moved: int = 0
# Crée le répertoire destination si il n'existe pas # Crée le répertoire destination si il n'existe pas
@ -71,18 +81,19 @@ def move_file_if_in_france(filepath, sequence_folder):
latitude, longitude = get_gps_info(filepath) latitude, longitude = get_gps_info(filepath)
if latitude and longitude: if latitude and longitude:
print(f'Latitude: {latitude}, Longitude: {longitude}') print(f"Latitude: {latitude}, Longitude: {longitude}")
if are_lat_lon_in_france(latitude, longitude): if are_lat_lon_in_france(latitude, longitude):
move_file_in_destination(filepath, sequence_folder) move_file_in_destination(filepath, sequence_folder)
else: else:
print('Informations GPS non trouvées') print("Informations GPS non trouvées")
def move_file_in_destination(filepath, sequence_folder): def move_file_in_destination(filepath, sequence_folder):
global count_files_moved global count_files_moved
# Déplace le fichier dans le sous-répertoire "photos_in_france" # Déplace le fichier dans le sous-répertoire "photos_in_france"
dest_subdir = os.path.join(destination_dir, sequence_folder, dest_subdir = os.path.join(
os.path.basename(os.path.dirname(filepath))) destination_dir, sequence_folder, os.path.basename(os.path.dirname(filepath))
)
if not os.path.exists(dest_subdir): if not os.path.exists(dest_subdir):
os.makedirs(dest_subdir) os.makedirs(dest_subdir)
shutil.move(filepath, os.path.join(dest_subdir, filepath)) shutil.move(filepath, os.path.join(dest_subdir, filepath))
@ -90,6 +101,7 @@ def move_file_in_destination(filepath, sequence_folder):
print(f"Moved {filepath} to {dest_subdir}") print(f"Moved {filepath} to {dest_subdir}")
return True return True
def are_lat_lon_in_france(gps_lat, gps_lon): def are_lat_lon_in_france(gps_lat, gps_lon):
""" """
recherche d'une zone du territoire français recherche d'une zone du territoire français
@ -117,37 +129,63 @@ def are_lat_lon_in_france(gps_lat, gps_lon):
print("lat lon :", gps_lat, gps_lon) print("lat lon :", gps_lat, gps_lon)
if (france_bbox[0] <= gps_lat <= france_bbox[2] and france_bbox[1] <= gps_lon <= france_bbox[3]): if (
france_bbox[0] <= gps_lat <= france_bbox[2]
and france_bbox[1] <= gps_lon <= france_bbox[3]
):
return "France métropolitaine" return "France métropolitaine"
elif (taaf_bbox[0] <= gps_lat <= taaf_bbox[2] and taaf_bbox[1] <= gps_lon <= taaf_bbox[3]): elif (
taaf_bbox[0] <= gps_lat <= taaf_bbox[2]
and taaf_bbox[1] <= gps_lon <= taaf_bbox[3]
):
return "Terres australes et antarctiques françaises" return "Terres australes et antarctiques françaises"
elif (guyane_bbox[0] <= gps_lat <= guyane_bbox[2] and guyane_bbox[1] <= gps_lon <= guyane_bbox[3]): elif (
guyane_bbox[0] <= gps_lat <= guyane_bbox[2]
and guyane_bbox[1] <= gps_lon <= guyane_bbox[3]
):
return "Guyane française" return "Guyane française"
elif (reunion_bbox[0] <= gps_lat <= reunion_bbox[2] and reunion_bbox[1] <= gps_lon <= reunion_bbox[3]): elif (
reunion_bbox[0] <= gps_lat <= reunion_bbox[2]
and reunion_bbox[1] <= gps_lon <= reunion_bbox[3]
):
return "La Réunion" return "La Réunion"
elif (wf_bbox[0] <= gps_lat <= wf_bbox[2] and wf_bbox[1] <= gps_lon <= wf_bbox[3]): elif wf_bbox[0] <= gps_lat <= wf_bbox[2] and wf_bbox[1] <= gps_lon <= wf_bbox[3]:
return "Wallis-et-Futuna" return "Wallis-et-Futuna"
elif (stm_sbh_bbox[0] <= gps_lat <= stm_sbh_bbox[2] and stm_sbh_bbox[1] <= gps_lon <= stm_sbh_bbox[3]): elif (
stm_sbh_bbox[0] <= gps_lat <= stm_sbh_bbox[2]
and stm_sbh_bbox[1] <= gps_lon <= stm_sbh_bbox[3]
):
return "Saint-Martin et Saint-Barthélemy" return "Saint-Martin et Saint-Barthélemy"
elif (spm_bbox[0] <= gps_lat <= spm_bbox[2] and spm_bbox[1] <= gps_lon <= spm_bbox[3]): elif (
spm_bbox[0] <= gps_lat <= spm_bbox[2] and spm_bbox[1] <= gps_lon <= spm_bbox[3]
):
return "Saint-Pierre-et-Miquelon" return "Saint-Pierre-et-Miquelon"
elif (mayotte_bbox[0] <= gps_lat <= mayotte_bbox[2] and mayotte_bbox[1] <= gps_lon <= mayotte_bbox[3]): elif (
mayotte_bbox[0] <= gps_lat <= mayotte_bbox[2]
and mayotte_bbox[1] <= gps_lon <= mayotte_bbox[3]
):
return "Mayotte" return "Mayotte"
elif (martinique_bbox[0] <= gps_lat <= martinique_bbox[2] and martinique_bbox[1] <= gps_lon <= martinique_bbox[3]): elif (
martinique_bbox[0] <= gps_lat <= martinique_bbox[2]
and martinique_bbox[1] <= gps_lon <= martinique_bbox[3]
):
return "Martinique" return "Martinique"
elif (guadeloupe_bbox[0] <= gps_lat <= guadeloupe_bbox[2] and guadeloupe_bbox[1] <= gps_lon <= guadeloupe_bbox[3]): elif (
guadeloupe_bbox[0] <= gps_lat <= guadeloupe_bbox[2]
and guadeloupe_bbox[1] <= gps_lon <= guadeloupe_bbox[3]
):
return "Guadeloupe" return "Guadeloupe"
elif (pf_bbox[0] <= gps_lat <= pf_bbox[2] and pf_bbox[1] <= gps_lon <= pf_bbox[3]): elif pf_bbox[0] <= gps_lat <= pf_bbox[2] and pf_bbox[1] <= gps_lon <= pf_bbox[3]:
return "Polynésie française" return "Polynésie française"
elif (nc_bbox[0] <= gps_lat <= nc_bbox[2] and nc_bbox[1] <= gps_lon <= nc_bbox[3]): elif nc_bbox[0] <= gps_lat <= nc_bbox[2] and nc_bbox[1] <= gps_lon <= nc_bbox[3]:
return "Nouvelle-Calédonie" return "Nouvelle-Calédonie"
else: else:
return None # "Hors de France" return None # "Hors de France"
def get_gps_info(filepath): def get_gps_info(filepath):
with open(filepath, 'rb') as f: with open(filepath, "rb") as f:
tags = exifread.process_file(f) tags = exifread.process_file(f)
gps_info = {} gps_info = {}
@ -155,12 +193,12 @@ def get_gps_info(filepath):
# print("clés exif ", tags.keys()) # print("clés exif ", tags.keys())
for tag in tags.keys(): for tag in tags.keys():
if tag.startswith('GPS'): if tag.startswith("GPS"):
gps_info[tag] = tags[tag] gps_info[tag] = tags[tag]
# Extraction des informations de latitude et de longitude # Extraction des informations de latitude et de longitude
gps_latitude = convert_rational_to_float(gps_info.get('GPS GPSLatitude')) gps_latitude = convert_rational_to_float(gps_info.get("GPS GPSLatitude"))
gps_longitude = convert_rational_to_float(gps_info.get('GPS GPSLongitude')) gps_longitude = convert_rational_to_float(gps_info.get("GPS GPSLongitude"))
if gps_latitude and gps_longitude: if gps_latitude and gps_longitude:
return gps_latitude, gps_longitude return gps_latitude, gps_longitude
@ -172,22 +210,38 @@ def convert_rational_to_float(rational):
return float(rational.values[0].num) / float(rational.values[0].den) return float(rational.values[0].num) / float(rational.values[0].den)
if __name__ == "__main__":
if __name__ == '__main__':
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--source_dir', default='/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/', help='Chemin du répertoire source') parser.add_argument(
parser.add_argument('--destination_dir', default='/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/', help='Chemin du répertoire destination') "--source_dir",
parser.add_argument('--sequence_folder', default='principale_sequence', help='Nom du dossier de séquence') default="/home/cipherbliss/Téléchargements/FIBRELAND/TEST_IN_FR/",
help="Chemin du répertoire source",
)
parser.add_argument(
"--destination_dir",
default="/home/cipherbliss/Téléchargements/FIBRELAND/IN_FRANCE/",
help="Chemin du répertoire destination",
)
parser.add_argument(
"--sequence_folder",
default="principale_sequence",
help="Nom du dossier de séquence",
)
args = parser.parse_args() args = parser.parse_args()
# Parcourt tous les fichiers dans le répertoire source et ses sous-répertoires # Parcourt tous les fichiers dans le répertoire source et ses sous-répertoires
for root, dirs, files in os.walk(args.source_dir): for root, dirs, files in os.walk(args.source_dir):
for filename in files: for filename in files:
# Vérifie si le fichier est une image # Vérifie si le fichier est une image
if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tif')): if filename.lower().endswith(
(".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tif")
):
filepath = os.path.join(root, filename) filepath = os.path.join(root, filename)
move_file_if_in_france(filepath, sequence_folder) move_file_if_in_france(filepath, sequence_folder)
print('fichiers se situant en france déplacés: ', count_files_moved, ' / ', count_files_all) print(
"fichiers se situant en france déplacés: ",
count_files_moved,
" / ",
count_files_all,
)

View File

@ -4,12 +4,12 @@ echo "Prenez un token oauth sur https://www.mapillary.com/app/user/$1"
USERNAME=$1 USERNAME=$1
response=$(curl "https://graph.mapillary.com/graphql?doc=query%20getNewSequences(%24username%3A%20String!)%20%7B%0A%20%20%20%20%20%20user_by_username(username%3A%20%24username)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20new_sequences%20%7B%0A%20%20%20%20%20%20%20%20%20%20sequence_keys%0A%20%20%20%20%20%20%20%20%20%20geojson%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getNewSequences(%24username%3A%20String!)%20%7B%0A%20%20user_by_username(username%3A%20%24username)%20%7B%0A%20%20%20%20id%0A%20%20%20%20new_sequences%20%7B%0A%20%20%20%20%20%20sequence_keys%0A%20%20%20%20%20%20geojson%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getNewSequences&variables=%7B%22username%22%3A%22${USERNAME}%22%7D" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H "authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD" -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers') response=$(curl "https://graph.mapillary.com/graphql?doc=query%20getNewSequences(%24username%3A%20String!)%20%7B%0A%20%20%20%20%20%20user_by_username(username%3A%20%24username)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20new_sequences%20%7B%0A%20%20%20%20%20%20%20%20%20%20sequence_keys%0A%20%20%20%20%20%20%20%20%20%20geojson%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getNewSequences(%24username%3A%20String!)%20%7B%0A%20%20user_by_username(username%3A%20%24username)%20%7B%0A%20%20%20%20id%0A%20%20%20%20new_sequences%20%7B%0A%20%20%20%20%20%20sequence_keys%0A%20%20%20%20%20%20geojson%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getNewSequences&variables=%7B%22username%22%3A%22${USERNAME}%22%7D" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H "authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD" -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers' -sS)
ID=$(echo "$response" | jq -r '.data.user_by_username.id') ID=$(echo "$response" | jq -r '.data.user_by_username.id')
echo "ID: $ID" echo "ID: $ID"
curl "https://graph.mapillary.com/graphql?doc=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20%20%20%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20feed(first%3A%20%24first%2C%20after%3A%20%24after%2C%20hide_failed_sequences_after_days%3A%20%24hide_after)%20%7B%0A%20%20%20%20%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20cluster_id%20type%20created_at_seconds%20captured_at_seconds%20thumb_url%20item_count%20image_id%20status%20initial_processing_status%20anonymization_status%20tiler_status%20error_code%20timezone%0A%20%20%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20id%0A%20%20%20%20feed(%0A%20%20%20%20%20%20first%3A%20%24first%0A%20%20%20%20%20%20after%3A%20%24after%0A%20%20%20%20%20%20hide_failed_sequences_after_days%3A%20%24hide_after%0A%20%20%20%20)%20%7B%0A%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20cluster_id%0A%20%20%20%20%20%20%20%20type%0A%20%20%20%20%20%20%20%20created_at_seconds%0A%20%20%20%20%20%20%20%20captured_at_seconds%0A%20%20%20%20%20%20%20%20thumb_url%0A%20%20%20%20%20%20%20%20item_count%0A%20%20%20%20%20%20%20%20image_id%0A%20%20%20%20%20%20%20%20status%0A%20%20%20%20%20%20%20%20initial_processing_status%0A%20%20%20%20%20%20%20%20anonymization_status%0A%20%20%20%20%20%20%20%20tiler_status%0A%20%20%20%20%20%20%20%20error_code%0A%20%20%20%20%20%20%20%20timezone%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getLatestActivity&variables=%7B%22id%22%3A%22${ID}%22%2C%22first%22%3A10000%2C%22after%22%3Anull%2C%22hide_after%22%3A14%7D" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H 'authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD' -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers' > "out_${1}.json" curl "https://graph.mapillary.com/graphql?doc=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20%20%20%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20%20%20%20%20id%0A%20%20%20%20%20%20%20%20feed(first%3A%20%24first%2C%20after%3A%20%24after%2C%20hide_failed_sequences_after_days%3A%20%24hide_after)%20%7B%0A%20%20%20%20%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20cluster_id%20type%20created_at_seconds%20captured_at_seconds%20thumb_url%20item_count%20image_id%20status%20initial_processing_status%20anonymization_status%20tiler_status%20error_code%20timezone%0A%20%20%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D&query=query%20getLatestActivity(%24id%3A%20ID!%2C%20%24first%3A%20Int%2C%20%24after%3A%20ID%2C%20%24hide_after%3A%20Int)%20%7B%0A%20%20fetch__User(id%3A%20%24id)%20%7B%0A%20%20%20%20id%0A%20%20%20%20feed(%0A%20%20%20%20%20%20first%3A%20%24first%0A%20%20%20%20%20%20after%3A%20%24after%0A%20%20%20%20%20%20hide_failed_sequences_after_days%3A%20%24hide_after%0A%20%20%20%20)%20%7B%0A%20%20%20%20%20%20page_info%20%7B%0A%20%20%20%20%20%20%20%20start_cursor%0A%20%20%20%20%20%20%20%20end_cursor%0A%20%20%20%20%20%20%20%20has_next_page%0A%20%20%20%20%20%20%20%20has_previous_page%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20nodes%20%7B%0A%20%20%20%20%20%20%20%20cluster_id%0A%20%20%20%20%20%20%20%20type%0A%20%20%20%20%20%20%20%20created_at_seconds%0A%20%20%20%20%20%20%20%20captured_at_seconds%0A%20%20%20%20%20%20%20%20thumb_url%0A%20%20%20%20%20%20%20%20item_count%0A%20%20%20%20%20%20%20%20image_id%0A%20%20%20%20%20%20%20%20status%0A%20%20%20%20%20%20%20%20initial_processing_status%0A%20%20%20%20%20%20%20%20anonymization_status%0A%20%20%20%20%20%20%20%20tiler_status%0A%20%20%20%20%20%20%20%20error_code%0A%20%20%20%20%20%20%20%20timezone%0A%20%20%20%20%20%20%20%20__typename%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20__typename%0A%20%20%20%20%7D%0A%20%20%20%20__typename%0A%20%20%7D%0A%20%20__typename%0A%7D&operationName=getLatestActivity&variables=%7B%22id%22%3A%22${ID}%22%2C%22first%22%3A10000%2C%22after%22%3Anull%2C%22hide_after%22%3A14%7D" --compressed -H 'User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:127.0) Gecko/20100101 Firefox/127.0' -H 'Accept: */*' -H 'Accept-Language: fr,en-US;q=0.7,en;q=0.3' -H 'Accept-Encoding: gzip, deflate, br, zstd' -H 'Referer: https://www.mapillary.com/' -H 'content-type: application/json' -H 'authorization: OAuth MLYARA3tSkHGXL0kEKYPx49q2BjzoZCfpZAl9HO7R8YdUKT99yMZB2pJxPzkSd3khd6C1ZBPgrUYZCE3wlsCG3ZC4UNn4RuJZChbIzmRfiE10ZA7eX06KGEhUb9yIA8HZBzyKg2PRlflG3h0pwZDZD' -H 'Origin: https://www.mapillary.com' -H 'Connection: keep-alive' -H 'Sec-Fetch-Dest: empty' -H 'Sec-Fetch-Mode: cors' -H 'Sec-Fetch-Site: same-site' -H 'Sec-GPC: 1' -H 'Priority: u=4' -H 'TE: trailers' -sS > "out_${1}.json"
echo " lancez: python3 get_sequences_of_username.py --username=\"$1\" --dev_token='$MAPILLARY_DEV_TOKEN' --max_sequence=99999; bash text_array_to_download_script.py --username=$1 --dev_token='$MAPILLARY_DEV_TOKEN'" echo " lancez: python3 get_sequences_of_username.py --username=\"$1\" --dev_token='$MAPILLARY_DEV_TOKEN' --max_sequence=99999; bash text_array_to_download_script.py --username=$1 --dev_token='$MAPILLARY_DEV_TOKEN'"

View File

@ -1,30 +1,38 @@
import json import json
import requests import requests
# lit un json listant les id de photo de chaque séquence et va # lit un json listant les id de photo de chaque séquence et va
# chercher la séquence par API. # chercher la séquence par API.
import argparse import argparse
def parse_args(argv =None):
def parse_args(argv=None):
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--username', type=str, help='Username to get the sequences id of', required=True) parser.add_argument(
parser.add_argument('--dev_token', type=str, help='Your mapillary developer token') "--username",
parser.add_argument('--max_sequence', type=str, help='Limit the amount of retrieved sequence ids') type=str,
help="Username to get the sequences id of",
required=True,
)
parser.add_argument("--dev_token", type=str, help="Your mapillary developer token")
parser.add_argument(
"--max_sequence", type=str, help="Limit the amount of retrieved sequence ids"
)
global args global args
args = parser.parse_args(argv) args = parser.parse_args(argv)
print(args) print(args)
# Initialisation de la liste pour stocker les réponses # Initialisation de la liste pour stocker les réponses
responses = [] responses = []
sequences = [] sequences = []
def get_image_data_from_sequences(): def get_image_data_from_sequences():
username = args.username username = args.username
input_file = "out_"+username+".json" input_file = "out_" + username + ".json"
# Chargement du fichier JSON d'entrée # Chargement du fichier JSON d'entrée
with open(input_file, "r") as file: with open(input_file, "r") as file:
@ -32,7 +40,7 @@ def get_image_data_from_sequences():
# Itération sur les noeuds pour collectionner les image_ids # Itération sur les noeuds pour collectionner les image_ids
nodelist = input_data["data"]["fetch__User"]["feed"]["nodes"] nodelist = input_data["data"]["fetch__User"]["feed"]["nodes"]
print( 'séquences : ', len(nodelist)) print("séquences : ", len(nodelist))
image_ids = [node["image_id"] for node in nodelist] image_ids = [node["image_id"] for node in nodelist]
print(image_ids) print(image_ids)
@ -41,15 +49,21 @@ def get_image_data_from_sequences():
# Préparation de la tête d'autorisation pour toutes les futures requêtes # Préparation de la tête d'autorisation pour toutes les futures requêtes
header = {"Access-Token": dev_token} header = {"Access-Token": dev_token}
ii=0 ii = 0
limit_requests = 1000000000 limit_requests = 1000000000
# limit_requests = 5 # pour tester # limit_requests = 5 # pour tester
# Boucle sur chaque image_id pour interroger l'API Mapillary # Boucle sur chaque image_id pour interroger l'API Mapillary
for image_id in image_ids: for image_id in image_ids:
ii+=1 ii += 1
if limit_requests >= ii and image_id: if limit_requests >= ii and image_id:
params = {"id": image_id, "fields": "id,sequence"} params = {"id": image_id, "fields": "id,sequence"}
request_url = "https://graph.mapillary.com/" + str(image_id)+"?access_token="+dev_token+"&fields=id,sequence" request_url = (
"https://graph.mapillary.com/"
+ str(image_id)
+ "?access_token="
+ dev_token
+ "&fields=id,sequence"
)
# print("requete: "+request_url) # print("requete: "+request_url)
response = requests.get(request_url) response = requests.get(request_url)
@ -63,23 +77,31 @@ def get_image_data_from_sequences():
parsed_response["sequence"] = raw_response["sequence"] parsed_response["sequence"] = raw_response["sequence"]
sequences.append(parsed_response["sequence"]) sequences.append(parsed_response["sequence"])
print("séquence trouvée: "+str(ii)+"/"+args.max_sequence+" : "+raw_response["sequence"]) print(
"séquence trouvée: "
+ str(ii)
+ "/"
+ args.max_sequence
+ " : "
+ raw_response["sequence"]
)
else: else:
print(response) print(response)
responses.append(parsed_response) responses.append(parsed_response)
def persist_files(): def persist_files():
# Sauvegarde des nouveaux résultats dans le fichier output.json # Sauvegarde des nouveaux résultats dans le fichier output.json
output_file = "sequences_"+args.username+".json" output_file = "sequences_" + args.username + ".json"
with open(output_file, "w") as file: with open(output_file, "w") as file:
json.dump(responses, file) json.dump(responses, file)
sequence_filename = "sequences_"+args.username+".txt" sequence_filename = "sequences_" + args.username + ".txt"
with open(sequence_filename, "w") as file: with open(sequence_filename, "w") as file:
json.dump(sequences, file) json.dump(sequences, file)
print('fichier sauvegardé: '+sequence_filename) print("fichier sauvegardé: " + sequence_filename)
parse_args() parse_args()

View File

@ -1,29 +1,42 @@
import requests, json import os, requests, json
import argparse import argparse
from urllib.parse import quote from urllib.parse import quote
def parse_args(argv =None):
def parse_args(argv=None):
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--access_token', type=str, help='Your mapillary access token') parser.add_argument(
parser.add_argument('--username', type=str, help='Username to get the sequences id of') "--access_token",
parser.add_argument('--pictures', type=str, help='Limit of pictures to fetch') type=str,
default=os.environ["MAPILLARY_DEV_TOKEN"],
help="Your mapillary access token",
)
parser.add_argument(
"--username",
type=str,
required=True,
help="Username to get the sequences id of",
)
parser.add_argument(
"--pictures",
type=str,
default=500,
help="Limit of pictures to fetch, max=5000",
)
global args global args
args = parser.parse_args(argv) args = parser.parse_args(argv)
if __name__ == '__main__': if __name__ == "__main__":
parse_args() parse_args()
if args.access_token == None:
print('please provide the access_token')
exit()
mly_key = args.access_token mly_key = args.access_token
creator_username = args.username creator_username = args.username
max_img= args.pictures max_img = args.pictures
url = f'https://graph.mapillary.com/images?access_token={mly_key}&creator_username={creator_username}&limit={max_img}&fields=id,sequence' url = f"https://graph.mapillary.com/images?access_token={mly_key}&creator_username={creator_username}&limit={max_img}&fields=id,sequence"
print(url)
response = requests.get(url) response = requests.get(url)
@ -31,7 +44,7 @@ if __name__ == '__main__':
json = response.json() json = response.json()
# tri des séquences uniques # tri des séquences uniques
sequences_ids = [obj['sequence'] for obj in json['data']] sequences_ids = [obj["sequence"] for obj in json["data"]]
unique_ids = list(set(sequences_ids)) unique_ids = list(set(sequences_ids))
print(unique_ids) print(unique_ids)
else: else:

View File

@ -14,35 +14,67 @@ session = requests.Session()
retries_strategies = Retry( retries_strategies = Retry(
total=5, total=5,
backoff_factor=1, backoff_factor=1,
status_forcelist=[429,502, 503, 504], status_forcelist=[429, 502, 503, 504],
) )
session.mount('https://', HTTPAdapter(max_retries=retries_strategies)) session.mount("https://", HTTPAdapter(max_retries=retries_strategies))
def parse_args(argv =None):
def parse_args(argv=None):
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('access_token', type=str, help='Your mapillary access token') parser.add_argument("access_token", type=str, help="Your mapillary access token")
parser.add_argument('--sequence_ids', type=str, nargs='*', help='The mapillary sequence id(s) to download') parser.add_argument(
parser.add_argument('--image_ids', type=int, nargs='*', help='The mapillary image id(s) to get their sequence id(s)') "--sequence_ids",
parser.add_argument('--destination', type=str, default='data', help='Path destination for the images') type=str,
parser.add_argument('--image_limit', type=int, default=None, help='How many images you want to download') nargs="*",
parser.add_argument('--overwrite', default=False, action='store_true', help='overwrite existing images') help="The mapillary sequence id(s) to download",
)
parser.add_argument(
"--image_ids",
type=int,
nargs="*",
help="The mapillary image id(s) to get their sequence id(s)",
)
parser.add_argument(
"--destination",
type=str,
default="data",
help="Path destination for the images",
)
parser.add_argument(
"--image_limit",
type=int,
default=None,
help="How many images you want to download",
)
parser.add_argument(
"--overwrite",
default=False,
action="store_true",
help="overwrite existing images",
)
parser.add_argument("-v", "--version", action="version", version="release 1.6") parser.add_argument("-v", "--version", action="version", version="release 1.6")
args = parser.parse_args(argv) args = parser.parse_args(argv)
if args.sequence_ids is None and args.image_ids is None: if args.sequence_ids is None and args.image_ids is None:
parser.error("Please enter at least one sequence id or image id") parser.error("Please enter at least one sequence id or image id")
return args return args
def download(url, filepath, metadata=None): def download(url, filepath, metadata=None):
#print(asizeof.asizeof(image)/1024, "MB") # print(asizeof.asizeof(image)/1024, "MB")
with open(str(filepath), "wb") as f: with open(str(filepath), "wb") as f:
r = session.get(url, stream=True, timeout=6) r = session.get(url, stream=True, timeout=6)
try:
image = write_exif(r.content, metadata) image = write_exif(r.content, metadata)
except Exception as e:
print(f"FAILED to write exif data for {filepath}. Error: {e}")
f.write(image) f.write(image)
print("{} downloaded".format(filepath)) print("{} downloaded {}".format(filepath, r))
def get_single_image_data(image_id, mly_header): def get_single_image_data(image_id, mly_header):
req_url = 'https://graph.mapillary.com/{}?fields=creator,thumb_original_url,altitude,make,model,camera_type,captured_at,compass_angle,geometry,exif_orientation,sequence'.format(image_id) req_url = "https://graph.mapillary.com/{}?fields=creator,thumb_original_url,altitude,make,model,camera_type,captured_at,compass_angle,geometry,exif_orientation,sequence".format(
image_id
)
r = session.get(req_url, headers=mly_header) r = session.get(req_url, headers=mly_header)
data = r.json() data = r.json()
print(data) print(data)
@ -50,52 +82,66 @@ def get_single_image_data(image_id, mly_header):
def get_image_data_from_sequences(sequences_id, mly_header): def get_image_data_from_sequences(sequences_id, mly_header):
for i,sequence_id in enumerate(sequences_id): for i, sequence_id in enumerate(sequences_id):
url = 'https://graph.mapillary.com/image_ids?sequence_id={}'.format(sequence_id) url = "https://graph.mapillary.com/image_ids?sequence_id={}".format(sequence_id)
r = requests.get(url, headers=header) r = requests.get(url, headers=header)
data = r.json() data = r.json()
image_ids = data['data'] image_ids = data["data"]
total_image = len(image_ids) total_image = len(image_ids)
print("{} images in sequence {} of {} - id : {}".format(total_image, i+1, len(sequences_id), sequence_id)) print(
print('getting images data') "{} images in sequence {} of {} - id : {}".format(
total_image, i + 1, len(sequences_id), sequence_id
)
)
print("getting images data")
for x in range(0, total_image): for x in range(0, total_image):
image_id = image_ids[x]['id'] image_id = image_ids[x]["id"]
image_data = get_single_image_data(image_id, mly_header) image_data = get_single_image_data(image_id, mly_header)
image_data['sequence_id'] = sequence_id image_data["sequence_id"] = sequence_id
yield image_data yield image_data
def get_image_data_from_sequences__future(sequences_id, mly_header): def get_image_data_from_sequences__future(sequences_id, mly_header):
for i,sequence_id in enumerate(sequences_id): for i, sequence_id in enumerate(sequences_id):
url = 'https://graph.mapillary.com/image_ids?sequence_id={}'.format(sequence_id) url = "https://graph.mapillary.com/image_ids?sequence_id={}".format(sequence_id)
r = requests.get(url, headers=header) r = requests.get(url, headers=header)
data = r.json() data = r.json()
if data.get('data') == []: if data.get("data") == []:
print("Empty or wrong sequence {} of {} - id : {}".format(i+1, len(sequences_id), sequence_id)) print(
"Empty or wrong sequence {} of {} - id : {}".format(
i + 1, len(sequences_id), sequence_id
)
)
continue continue
image_ids = data['data'] image_ids = data["data"]
total_image = len(image_ids) total_image = len(image_ids)
print("{} images in sequence {} of {} - id : {}".format(total_image, i+1, len(sequences_id), sequence_id)) print(
print('getting images data') "{} images in sequence {} of {} - id : {}".format(
total_image, i + 1, len(sequences_id), sequence_id
)
)
print("getting images data")
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
future_to_url = {} future_to_url = {}
for x in range(0, total_image): for x in range(0, total_image):
image_id = image_ids[x]['id'] image_id = image_ids[x]["id"]
future_to_url[executor.submit(get_single_image_data, image_id, mly_header)] = image_id future_to_url[
executor.submit(get_single_image_data, image_id, mly_header)
] = image_id
for future in concurrent.futures.as_completed(future_to_url): for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future] url = future_to_url[future]
image_data = future.result() image_data = future.result()
image_data['sequence_id'] = sequence_id image_data["sequence_id"] = sequence_id
#print(image_data) # print(image_data)
yield image_data yield image_data
def write_exif(picture, img_metadata): def write_exif(picture, img_metadata):
''' """
Write exif metadata Write exif metadata
''' """
#{'thumb_original_url': 'https://scontent-cdg4-2.xx.fbcdn.net/m1/v/t6/An9Zy2SrH9vXJIF01QkBODyUbg7XSKfwL48UwHyvihSwvECGjVbG0vSw9uhxe2-Dq-k2eUcigb83buO6zo-7eVbykfp5aQIe1kgd-MJr66nU_H-o_mwBLZXgVbj5I_5WX-C9c6FxJruHkV962F228O0?ccb=10-5&oh=00_AfDOKD869DxL-4ZNCbVo8Rn29vsc0JyjMAU2ctx4aAFVMQ&oe=65256C25&_nc_sid=201bca', # {'thumb_original_url': 'https://scontent-cdg4-2.xx.fbcdn.net/m1/v/t6/An9Zy2SrH9vXJIF01QkBODyUbg7XSKfwL48UwHyvihSwvECGjVbG0vSw9uhxe2-Dq-k2eUcigb83buO6zo-7eVbykfp5aQIe1kgd-MJr66nU_H-o_mwBLZXgVbj5I_5WX-C9c6FxJruHkV962F228O0?ccb=10-5&oh=00_AfDOKD869DxL-4ZNCbVo8Rn29vsc0JyjMAU2ctx4aAFVMQ&oe=65256C25&_nc_sid=201bca',
# 'captured_at': 1603459736644, 'geometry': {'type': 'Point', 'coordinates': [2.5174596904057, 48.777089857534]}, 'id': '485924785946693'} # 'captured_at': 1603459736644, 'geometry': {'type': 'Point', 'coordinates': [2.5174596904057, 48.777089857534]}, 'id': '485924785946693'}
with writer.Writer(picture) as image: with writer.Writer(picture) as image:
@ -113,61 +159,84 @@ def write_exif(picture, img_metadata):
return updated_image return updated_image
if __name__ == '__main__': if __name__ == "__main__":
args = parse_args() args = parse_args()
sequence_ids= args.sequence_ids if args.sequence_ids is not None else [] sequence_ids = args.sequence_ids if args.sequence_ids is not None else []
images_ids = args.image_ids images_ids = args.image_ids
access_token = args.access_token access_token = args.access_token
images_data = [] images_data = []
header = {'Authorization' : 'OAuth {}'.format(access_token)} header = {"Authorization": "OAuth {}".format(access_token)}
if images_ids: if images_ids:
for image_id in images_ids: for image_id in images_ids:
image_data = get_single_image_data(image_id, header) image_data = get_single_image_data(image_id, header)
if 'error' in image_data: if "error" in image_data:
print("data : ", image_data) print("data : ", image_data)
print("something wrong happened ! Please check your image id and/or your connection") print(
"something wrong happened ! Please check your image id and/or your connection"
)
sys.exit() sys.exit()
else: else:
sequence_ids.append(image_data.get('sequence')) sequence_ids.append(image_data.get("sequence"))
#for i,image_data in enumerate(get_image_data_from_sequences(sequence_ids, header)): # for i,image_data in enumerate(get_image_data_from_sequences(sequence_ids, header)):
for i,image_data in enumerate(get_image_data_from_sequences__future(sequence_ids, header)): for i, image_data in enumerate(
get_image_data_from_sequences__future(sequence_ids, header)
):
if args.image_limit is not None and i >= args.image_limit: if args.image_limit is not None and i >= args.image_limit:
break break
if 'error' in image_data: if "error" in image_data:
print("data : ", image_data) print("data : ", image_data)
print("something wrong happened ! Please check your token and/or your connection") print(
"something wrong happened ! Please check your token and/or your connection"
)
sys.exit() sys.exit()
images_data.append(image_data) images_data.append(image_data)
#sys.exit() # sys.exit()
print('downloading.. this process will take a while. please wait') print("downloading.. this process will take a while. please wait")
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
for i,image_data in enumerate(images_data): for i, image_data in enumerate(images_data):
# create a folder for each unique sequence ID to group images by sequence # create a folder for each unique sequence ID to group images by sequence
path_destination = os.path.join(args.destination, image_data['sequence_id']) path_destination = os.path.join(args.destination, image_data["sequence_id"])
if not os.path.exists(path_destination): if not os.path.exists(path_destination):
os.makedirs(path_destination) os.makedirs(path_destination)
date_time_image_filename = datetime.utcfromtimestamp(int(image_data['captured_at'])/1000).strftime('%Y-%m-%d_%HH%Mmn%Ss%f')[:-3] + '.jpg' date_time_image_filename = (
datetime.utcfromtimestamp(
int(image_data["captured_at"]) / 1000
).strftime("%Y-%m-%d_%HH%Mmn%Ss%f")[:-3]
+ ".jpg"
)
path = os.path.join(path_destination, date_time_image_filename) path = os.path.join(path_destination, date_time_image_filename)
img_metadata = writer.PictureMetadata( img_metadata = writer.PictureMetadata(
capture_time = datetime.utcfromtimestamp(int(image_data['captured_at'])/1000), capture_time=datetime.utcfromtimestamp(
artist = image_data['creator']['username'], int(image_data["captured_at"]) / 1000
camera_make = image_data['make'], ),
camera_model = image_data['model'], artist=image_data["creator"]["username"],
longitude = image_data['geometry']['coordinates'][0], camera_make=image_data["make"],
latitude = image_data['geometry']['coordinates'][1], camera_model=image_data["model"],
picture_type = PictureType("equirectangular") if image_data.get('camera_type') == 'spherical' or image_data.get('camera_type') == 'equirectangular' else PictureType("flat"), longitude=image_data["geometry"]["coordinates"][0],
direction = image_data['compass_angle'], latitude=image_data["geometry"]["coordinates"][1],
altitude = image_data['altitude'], picture_type=(
PictureType("equirectangular")
if image_data.get("camera_type") == "spherical"
or image_data.get("camera_type") == "equirectangular"
else PictureType("flat")
),
direction=image_data["compass_angle"],
altitude=image_data["altitude"],
) )
#print("metadata: ", img_metadata) # print("metadata: ", img_metadata)
#print("path: ", image_data) # print("path: ", image_data)
image_exists = os.path.exists(path) image_exists = os.path.exists(path)
if not args.overwrite and image_exists: if not args.overwrite and image_exists:
print("{} already exists. Skipping ".format(path)) print("{} already exists. Skipping ".format(path))
continue continue
executor.submit(download, url=image_data['thumb_original_url'], filepath=path, metadata=img_metadata) executor.submit(
#download(url=image_data['thumb_original_url'], filepath=path, metadata=img_metadata) download,
url=image_data["thumb_original_url"],
filepath=path,
metadata=img_metadata,
)
# download(url=image_data['thumb_original_url'], filepath=path, metadata=img_metadata)

View File

@ -1,53 +1,61 @@
import os import os
input_file = 'input_file'
import argparse import argparse
def parse_args(argv =None):
input_file = "input_file"
def parse_args(argv=None):
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--dev_token', type=str, help='Your mapillary access token') parser.add_argument(
parser.add_argument('--username', type=str, help='Username to get the sequences id of') "--dev_token",
type=str,
default=os.environ["MAPILLARY_DEV_TOKEN"],
help="Your mapillary access token",
)
parser.add_argument(
"--username",
type=str,
required=True,
help="Username to get the sequences id of",
)
global args global args
args = parser.parse_args(argv) args = parser.parse_args(argv)
if __name__ == "__main__":
if __name__ == '__main__': print(
print("Construction du script bash de récupération des images de chaque séquences pour Mapillary_download (https://github.com/Stefal/mapillary_download.git)") "Construction du script bash de récupération des images de chaque séquences pour Mapillary_download (https://github.com/Stefal/mapillary_download.git)"
)
parse_args() parse_args()
username=args.username username = args.username
input_file = f"sequences_{username}.txt" input_file = f"sequences_{username}.txt"
if not args.dev_token:
print(f"Erreur : Le token de développeur de mapillary manque, vérifiez le fichier de variables secretes. Arrêt du script.")
exit(1)
if not os.path.exists(input_file) or not os.path.isfile(input_file): if not os.path.exists(input_file) or not os.path.isfile(input_file):
print(f"Erreur : Le fichier '{input_file}' n'a pas été trouvé. Arrêt du script.") print(
f"Erreur : Le fichier '{input_file}' n'a pas été trouvé. Arrêt du script."
)
exit(1) exit(1)
else: else:
print(f"Fichier '{input_file}' trouvé.") print(f"Fichier '{input_file}' trouvé.")
output_file = f"script_bash_get_sequences_for_user_{username}.sh" output_file = f"script_bash_get_sequences_for_user_{username}.sh"
access_token = "--access_token='"+args.dev_token+"' " access_token = "$MAPILLARY_DEV_TOKEN" # or, if you want to use the password in clear text: "'"+args.dev_token+"' "
format_string = "/usr/bin/python3 mapillary_download.py {} --sequence_id={}\n"
with open(output_file, "w") as output: with open(output_file, "w") as output:
with open(input_file, "r") as input_handle: with open(input_file, "r") as input_handle:
content = input_handle.read() content = input_handle.read()
sequences = eval(content) sequences = eval(content)
for seq in sequences: for seq in sequences:
full_cmd = f"/usr/bin/python3 mapillary_download.py {access_token} --sequence_id='{seq}' --username={username}\n" full_cmd = f"python mapillary_download.py {access_token} --sequence_ids={seq}\n"
output.write(full_cmd) output.write(full_cmd)
print(output_file) print(output_file)
print(f"\n Script Bash généré avec succès.") print(f"\n Script Bash généré avec succès.")
print(f"Lancez le pour récupérer les photos de l'utilisateur {username}: \n bash {output_file}") print(
f"Lancez le pour récupérer les photos de l'utilisateur {username}: \n bash {output_file}"
)

View File

@ -1,4 +1,4 @@
#source : https://gitlab.com/geovisio/geo-picture-tag-reader/-/blob/main/geopic_tag_reader/writer.py # source : https://gitlab.com/geovisio/geo-picture-tag-reader/-/blob/main/geopic_tag_reader/writer.py
from typing import Optional, Tuple from typing import Optional, Tuple
from datetime import datetime, timedelta from datetime import datetime, timedelta
from dataclasses import dataclass from dataclasses import dataclass
@ -31,7 +31,8 @@ class PictureMetadata:
direction: Optional[float] = None direction: Optional[float] = None
orientation: Optional[int] = 1 orientation: Optional[int] = 1
class Writer():
class Writer:
def __init__(self, picture: bytes) -> None: def __init__(self, picture: bytes) -> None:
self.content = picture self.content = picture
self.image = pyexiv2.ImageData(picture) self.image = pyexiv2.ImageData(picture)
@ -53,7 +54,11 @@ class Writer():
if self.updated_xmp: if self.updated_xmp:
self.image.modify_xmp(self.updated_xmp) self.image.modify_xmp(self.updated_xmp)
except Exception as e: except Exception as e:
print("exception \nexif: {}\nxmp: {}".format(self.updated_exif, self.updated_xmp)) print(
"exception \nexif: {}\nxmp: {}".format(
self.updated_exif, self.updated_xmp
)
)
def close(self) -> None: def close(self) -> None:
self.image.close() self.image.close()
@ -65,7 +70,12 @@ class Writer():
""" """
Override exif metadata on raw picture and return updated bytes Override exif metadata on raw picture and return updated bytes
""" """
if not metadata.capture_time and not metadata.longitude and not metadata.latitude and not metadata.picture_type: if (
not metadata.capture_time
and not metadata.longitude
and not metadata.latitude
and not metadata.picture_type
):
return return
if metadata.capture_time: if metadata.capture_time:
@ -83,12 +93,20 @@ class Writer():
Add latitude and longitude values in GPSLatitude + GPSLAtitudeRef and GPSLongitude + GPSLongitudeRef Add latitude and longitude values in GPSLatitude + GPSLAtitudeRef and GPSLongitude + GPSLongitudeRef
""" """
if metadata.latitude is not None: if metadata.latitude is not None:
self.updated_exif["Exif.GPSInfo.GPSLatitudeRef"] = "N" if metadata.latitude > 0 else "S" self.updated_exif["Exif.GPSInfo.GPSLatitudeRef"] = (
self.updated_exif["Exif.GPSInfo.GPSLatitude"] = self._to_exif_dms(metadata.latitude) "N" if metadata.latitude > 0 else "S"
)
self.updated_exif["Exif.GPSInfo.GPSLatitude"] = self._to_exif_dms(
metadata.latitude
)
if metadata.longitude is not None: if metadata.longitude is not None:
self.updated_exif["Exif.GPSInfo.GPSLongitudeRef"] = "E" if metadata.longitude > 0 else "W" self.updated_exif["Exif.GPSInfo.GPSLongitudeRef"] = (
self.updated_exif["Exif.GPSInfo.GPSLongitude"] = self._to_exif_dms(metadata.longitude) "E" if metadata.longitude > 0 else "W"
)
self.updated_exif["Exif.GPSInfo.GPSLongitude"] = self._to_exif_dms(
metadata.longitude
)
def add_altitude(self, metadata: PictureMetadata, precision: int = 1000) -> None: def add_altitude(self, metadata: PictureMetadata, precision: int = 1000) -> None:
""" """
@ -98,18 +116,24 @@ class Writer():
if altitude is not None: if altitude is not None:
negative_altitude = 0 if altitude >= 0 else 1 negative_altitude = 0 if altitude >= 0 else 1
self.updated_exif['Exif.GPSInfo.GPSAltitude'] = f"{int(abs(altitude * precision))} / {precision}" self.updated_exif["Exif.GPSInfo.GPSAltitude"] = (
self.updated_exif['Exif.GPSInfo.GPSAltitudeRef'] = negative_altitude f"{int(abs(altitude * precision))} / {precision}"
)
self.updated_exif["Exif.GPSInfo.GPSAltitudeRef"] = negative_altitude
def add_direction(self, metadata: PictureMetadata, ref: str = 'T', precision: int = 1000) -> None: def add_direction(
self, metadata: PictureMetadata, ref: str = "T", precision: int = 1000
) -> None:
""" """
Add direction value in GPSImgDirection and GPSImgDirectionRef Add direction value in GPSImgDirection and GPSImgDirectionRef
""" """
direction = metadata.direction direction = metadata.direction
if metadata.direction is not None: if metadata.direction is not None:
self.updated_exif['Exif.GPSInfo.GPSImgDirection'] = f"{int(abs(direction % 360.0 * precision))} / {precision}" self.updated_exif["Exif.GPSInfo.GPSImgDirection"] = (
self.updated_exif['Exif.GPSInfo.GPSImgDirectionRef'] = ref f"{int(abs(direction % 360.0 * precision))} / {precision}"
)
self.updated_exif["Exif.GPSInfo.GPSImgDirectionRef"] = ref
def add_gps_datetime(self, metadata: PictureMetadata) -> None: def add_gps_datetime(self, metadata: PictureMetadata) -> None:
""" """
@ -120,14 +144,20 @@ class Writer():
metadata.capture_time = self.localize(metadata.capture_time, metadata) metadata.capture_time = self.localize(metadata.capture_time, metadata)
# for capture time, override GPSInfo time and DatetimeOriginal # for capture time, override GPSInfo time and DatetimeOriginal
self.updated_exif["Exif.Photo.DateTimeOriginal"] = metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S") self.updated_exif["Exif.Photo.DateTimeOriginal"] = (
metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S")
)
offset = metadata.capture_time.utcoffset() offset = metadata.capture_time.utcoffset()
if offset is not None: if offset is not None:
self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(offset) self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(
offset
)
utc_dt = metadata.capture_time.astimezone(tz=pytz.UTC) utc_dt = metadata.capture_time.astimezone(tz=pytz.UTC)
self.updated_exif["Exif.GPSInfo.GPSDateStamp"] = utc_dt.strftime("%Y:%m:%d") self.updated_exif["Exif.GPSInfo.GPSDateStamp"] = utc_dt.strftime("%Y:%m:%d")
self.updated_exif["Exif.GPSInfo.GPSTimeStamp"] = utc_dt.strftime("%H/1 %M/1 %S/1") self.updated_exif["Exif.GPSInfo.GPSTimeStamp"] = utc_dt.strftime(
"%H/1 %M/1 %S/1"
)
def add_datetimeoriginal(self, metadata: PictureMetadata) -> None: def add_datetimeoriginal(self, metadata: PictureMetadata) -> None:
""" """
@ -138,12 +168,18 @@ class Writer():
metadata.capture_time = self.localize(metadata.capture_time, metadata) metadata.capture_time = self.localize(metadata.capture_time, metadata)
# for capture time, override DatetimeOriginal and SubSecTimeOriginal # for capture time, override DatetimeOriginal and SubSecTimeOriginal
self.updated_exif["Exif.Photo.DateTimeOriginal"] = metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S") self.updated_exif["Exif.Photo.DateTimeOriginal"] = (
metadata.capture_time.strftime("%Y:%m:%d %H:%M:%S")
)
offset = metadata.capture_time.utcoffset() offset = metadata.capture_time.utcoffset()
if offset is not None: if offset is not None:
self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(offset) self.updated_exif["Exif.Photo.OffsetTimeOriginal"] = self.format_offset(
offset
)
if metadata.capture_time.microsecond != 0: if metadata.capture_time.microsecond != 0:
self.updated_exif["Exif.Photo.SubSecTimeOriginal"] = metadata.capture_time.strftime("%f") self.updated_exif["Exif.Photo.SubSecTimeOriginal"] = (
metadata.capture_time.strftime("%f")
)
def add_img_projection(self, metadata: PictureMetadata) -> None: def add_img_projection(self, metadata: PictureMetadata) -> None:
""" """
@ -162,15 +198,15 @@ class Writer():
if metadata.artist is not None: if metadata.artist is not None:
self.updated_exif["Exif.Image.Artist"] = ascii(metadata.artist).strip("'") self.updated_exif["Exif.Image.Artist"] = ascii(metadata.artist).strip("'")
def add_camera_make(self, metadata: PictureMetadata) -> None: def add_camera_make(self, metadata: PictureMetadata) -> None:
""" """
Add camera manufacture in Exif Make tag Add camera manufacture in Exif Make tag
""" """
if metadata.camera_make is not None: if metadata.camera_make is not None:
self.updated_exif["Exif.Image.Make"] = ascii(metadata.camera_make).strip("'") self.updated_exif["Exif.Image.Make"] = ascii(metadata.camera_make).strip(
"'"
)
def add_camera_model(self, metadata: PictureMetadata) -> None: def add_camera_model(self, metadata: PictureMetadata) -> None:
""" """
@ -178,7 +214,9 @@ class Writer():
""" """
if metadata.camera_model is not None: if metadata.camera_model is not None:
self.updated_exif["Exif.Image.Model"] = ascii(metadata.camera_model).strip("'") self.updated_exif["Exif.Image.Model"] = ascii(metadata.camera_model).strip(
"'"
)
def format_offset(self, offset: timedelta) -> str: def format_offset(self, offset: timedelta) -> str:
"""Format offset for OffsetTimeOriginal. Format is like "+02:00" for paris offset """Format offset for OffsetTimeOriginal. Format is like "+02:00" for paris offset
@ -197,7 +235,7 @@ class Writer():
""" """
new_lat_lon = metadata.longitude is not None and metadata.latitude is not None new_lat_lon = metadata.longitude is not None and metadata.latitude is not None
if new_lat_lon : if new_lat_lon:
lon = metadata.longitude lon = metadata.longitude
lat = metadata.latitude lat = metadata.latitude