import argparse
import requests
import os
import piexif
from datetime import datetime
from tqdm import tqdm
# Mapillary API access token, read from the MAPILLARY_TOKEN environment
# variable. Set it before running, e.g.:
#     export MAPILLARY_TOKEN='your_token_here'
# A placeholder string is used when the variable is not set.
ACCESS_TOKEN = os.environ.get("MAPILLARY_TOKEN", "YOUR_ACTUAL_TOKEN_HERE")
def to_deg_high_prec(value, loc):
    """Convert a decimal coordinate to degree/minute/second rationals.

    Seconds are expressed in millionths (denominator 1,000,000).

    Args:
        value: Signed coordinate in decimal degrees.
        loc: Two-item sequence of hemisphere references; ``loc[0]`` is
            returned for negative values and ``loc[1]`` otherwise
            (e.g. ``["S", "N"]`` or ``["W", "E"]``).

    Returns:
        A 4-tuple ``(degrees, minutes, seconds, ref)`` where the first three
        items are ``(numerator, denominator)`` pairs and ``ref`` is the
        selected element of ``loc``.
    """
    sec_scale = 1000000
    # Round once, in whole millionths of an arc-second, and split with
    # divmod so a rounding carry rolls over into minutes/degrees instead of
    # producing an out-of-range value of exactly 60 seconds.
    total = int(round(abs(value) * (3600 * sec_scale)))
    deg, remainder = divmod(total, 3600 * sec_scale)
    minutes, sec_scaled = divmod(remainder, 60 * sec_scale)
    ref = loc[0] if value < 0 else loc[1]
    return (deg, 1), (minutes, 1), (sec_scaled, sec_scale), ref
def write_full_metadata(file_path, lat, lon, alt, heading, captured_at, orient, make, model):
    """Write GPS, heading, orientation, date, and camera EXIF tags to an image.

    The file at ``file_path`` is modified in place. Any failure while
    building or inserting the EXIF data is reported on stdout and not
    raised, so one bad image does not abort a batch.

    Args:
        file_path: Path of the image file to tag.
        lat: Latitude in signed decimal degrees.
        lon: Longitude in signed decimal degrees.
        alt: Altitude (stored with 1/1000 precision), or ``None`` to omit
            the altitude tags.
        heading: Image direction in degrees, or ``None`` to omit the
            direction tags.
        captured_at: Capture time as epoch milliseconds, or ``None`` to omit
            the date tags.
        orient: EXIF orientation value; a falsy value is stored as 1.
        make: Camera make; a falsy value is stored as "Unknown".
        model: Camera model; a falsy value is stored as "Unknown".
    """
    try:
        zeroth_ifd = {
            piexif.ImageIFD.Make: str(make) if make else "Unknown",
            piexif.ImageIFD.Model: str(model) if model else "Unknown",
            piexif.ImageIFD.Orientation: orient if orient else 1,
        }
        exif_ifd = {}

        # A missing timestamp must not prevent the GPS/camera tags from
        # being written, so the date tags are simply left out in that case.
        if captured_at is not None:
            # NOTE: fromtimestamp() without a tz argument renders the
            # instant in the local timezone of the machine running this.
            dt_obj = datetime.fromtimestamp(captured_at / 1000.0)
            exif_date = dt_obj.strftime("%Y:%m:%d %H:%M:%S")
            zeroth_ifd[piexif.ImageIFD.DateTime] = exif_date
            exif_ifd[piexif.ExifIFD.DateTimeOriginal] = exif_date
            exif_ifd[piexif.ExifIFD.DateTimeDigitized] = exif_date

        lat_deg, lat_min, lat_sec, lat_ref = to_deg_high_prec(lat, ["S", "N"])
        lon_deg, lon_min, lon_sec, lon_ref = to_deg_high_prec(lon, ["W", "E"])
        gps_ifd = {
            piexif.GPSIFD.GPSLatitudeRef: lat_ref,
            piexif.GPSIFD.GPSLatitude: (lat_deg, lat_min, lat_sec),
            piexif.GPSIFD.GPSLongitudeRef: lon_ref,
            piexif.GPSIFD.GPSLongitude: (lon_deg, lon_min, lon_sec),
            piexif.GPSIFD.GPSVersionID: (2, 2, 0, 0),
        }

        if alt is not None:
            # The sign goes into the reference tag (0 for alt >= 0, 1 for
            # negative); the magnitude is stored as a rational in thousandths.
            gps_ifd[piexif.GPSIFD.GPSAltitudeRef] = 0 if alt >= 0 else 1
            gps_ifd[piexif.GPSIFD.GPSAltitude] = (int(round(abs(alt) * 1000)), 1000)

        if heading is not None:
            # The direction is stored as an unsigned rational in hundredths
            # of a degree; wrap negative or >= 360 inputs into [0, 360).
            heading_hundredths = int(round(heading * 100)) % 36000
            gps_ifd[piexif.GPSIFD.GPSImgDirectionRef] = "T"
            gps_ifd[piexif.GPSIFD.GPSImgDirection] = (heading_hundredths, 100)

        exif_dict = {"0th": zeroth_ifd, "Exif": exif_ifd, "GPS": gps_ifd}
        exif_bytes = piexif.dump(exif_dict)
        piexif.insert(exif_bytes, file_path)
    except Exception as e:
        # Deliberate best-effort boundary: report the failure and carry on.
        print(f"Error tagging {file_path}: {e}")
def process_sequence(sequence_id, size_threshold):
    """Analyze a Mapillary sequence, report removable images, download the rest.

    The run has three phases:

    1. Fetch metadata for every image of the sequence and flag images whose
       width or height is at or below ``size_threshold``, or whose
       (longitude, latitude, compass angle) repeats an earlier kept image.
    2. Write ``cleanup_report_<sequence_id>.txt`` (ID plus reason) and
       ``delete_list_<sequence_id>.txt`` (IDs only) to the current directory.
    3. Download every non-flagged image into ``mapillary_<sequence_id>/``
       and tag it with ``write_full_metadata``.

    Images whose metadata cannot be fetched are neither flagged nor
    downloaded; they are listed on stdout after the analysis phase.

    Args:
        sequence_id: Mapillary sequence identifier.
        size_threshold: Pixel size; images with width or height less than or
            equal to this value are flagged.

    Returns:
        None. If the image list cannot be retrieved, an error is printed and
        nothing else is done.
    """
    request_timeout = 30  # seconds, applied to every HTTP request
    headers = {"Authorization": f"OAuth {ACCESS_TOKEN}"}

    try:
        # Pass the ID via ``params`` so it is URL-encoded correctly.
        res = requests.get(
            "https://graph.mapillary.com/image_ids",
            params={"sequence_id": sequence_id},
            headers=headers,
            timeout=request_timeout,
        )
        res.raise_for_status()
        image_ids = [item['id'] for item in res.json().get('data', [])]
    except Exception as e:
        # Top-level boundary: without the image list nothing can be done.
        print(f"API Error: {e}")
        return

    valid_data = []
    to_delete_report = []
    delete_ids_only = []
    seen_metadata = set()
    problems = []
    fields = "id,width,height,geometry,compass_angle,thumb_original_url,captured_at,exif_orientation,make,model"

    # 1. Analysis Phase
    for img_id in tqdm(image_ids, desc="Analyzing Sequence", unit="img"):
        try:
            meta_res = requests.get(
                f"https://graph.mapillary.com/{img_id}?fields={fields}",
                headers=headers,
                timeout=request_timeout,
            )
            # An error payload carries no width/height; without this check
            # it would be flagged as a 0x0 image and put on the delete list.
            meta_res.raise_for_status()
            data = meta_res.json()
        except (requests.RequestException, ValueError) as e:
            problems.append(f"ID: {img_id} | Metadata fetch failed: {e}")
            continue
        if not isinstance(data, dict):
            problems.append(f"ID: {img_id} | Unexpected metadata payload")
            continue

        # Missing or null dimensions count as 0.
        w = data.get("width") or 0
        h = data.get("height") or 0
        coords = (data.get("geometry") or {}).get("coordinates")
        heading = data.get("compass_angle")
        has_position = bool(coords) and len(coords) >= 2
        meta_key = (coords[0], coords[1], heading) if has_position else None

        reason = None
        if w <= size_threshold or h <= size_threshold:
            reason = f"Small Size ({w}x{h}px)"
        elif meta_key is not None and meta_key in seen_metadata:
            reason = "Duplicate Position/Heading"

        if reason:
            to_delete_report.append(f"ID: {img_id} | Reason: {reason}")
            delete_ids_only.append(img_id)
        else:
            if meta_key is not None:
                seen_metadata.add(meta_key)
            valid_data.append((img_id, data))

    # Reported after the loop so the messages do not break the progress bar.
    for line in problems:
        print(line)

    # 2. Output Deletion Reports
    with open(f"cleanup_report_{sequence_id}.txt", "w", encoding="utf-8") as f:
        f.write(f"Cleanup Report for {sequence_id}\n" + "\n".join(to_delete_report))
    with open(f"delete_list_{sequence_id}.txt", "w", encoding="utf-8") as f:
        f.write("\n".join(delete_ids_only))

    # 3. Download & Tag Phase
    downloaded = 0
    download_problems = []
    if valid_data:
        folder = f"mapillary_{sequence_id}"
        os.makedirs(folder, exist_ok=True)
        for img_id, item in tqdm(valid_data, desc="Downloading & Tagging", unit="img"):
            url = item.get("thumb_original_url")
            if not url:
                download_problems.append(f"ID: {img_id} | No download URL")
                continue
            try:
                img_res = requests.get(url, timeout=request_timeout)
            except requests.RequestException as e:
                # One failed download must not abort the rest of the batch.
                download_problems.append(f"ID: {img_id} | Download failed: {e}")
                continue
            if img_res.status_code != 200:
                download_problems.append(
                    f"ID: {img_id} | Download failed: HTTP {img_res.status_code}"
                )
                continue

            path = os.path.join(folder, f"{img_id}.jpg")
            with open(path, 'wb') as f:
                f.write(img_res.content)
            downloaded += 1

            c = (item.get("geometry") or {}).get("coordinates")
            if not c or len(c) < 2:
                # The image is kept, but without a position there is
                # nothing to geotag.
                download_problems.append(f"ID: {img_id} | Saved without tags (no coordinates)")
                continue
            # Coordinates are passed as (c[1], c[0]) = (lat, lon), with an
            # optional third element used as altitude.
            write_full_metadata(
                path, c[1], c[0], c[2] if len(c) > 2 else None,
                item.get("compass_angle"), item.get("captured_at"),
                item.get("exif_orientation"), item.get("make"), item.get("model")
            )

    for line in download_problems:
        print(line)

    # Report files actually saved, not merely attempted.
    print(f"\nDone. Flagged: {len(delete_ids_only)} | Downloaded: {downloaded}")
if __name__ == "__main__":
    # Command-line entry point: one required sequence ID plus an optional
    # pixel-size threshold (--size, integer, default 100).
    cli = argparse.ArgumentParser()
    cli.add_argument("sequence_id")
    cli.add_argument("--size", type=int, default=100)
    options = cli.parse_args()
    process_sequence(options.sequence_id, options.size)