Using the 70mai A810 Dashcam for Mapillary

I use the 70mai A810 dashcam. It is not supported by the Mapillary app. I am not a programmer, but with the help of ChatGPT, I managed to create two scripts for uploading videos to the Mapillary service.

The first script uses the Mapillary desktop application, and I described its functionality on a technical forum dedicated to the 70mai A810 dashcam here: 70mai Dash Cam 4K A810 - 4PDA. Here is an example of how this script works: Mapillary

The second script uses mapillary_tools and creates tracks with higher point density. Its functionality is described here: 70mai Dash Cam 4K A810 - 4PDA . Additionally, it allows you to specify the device used to capture the images:

I am not a programmer and do not know English (this text was created using a translator). However, there is very little information on the internet about 70mai dashcams and their integration with Mapillary. That’s why I decided to share what I’ve found and use myself.

2 Likes

Hi Doriel,

This is very interesting, because previously we have not been able to extract GPS data from the 70mai A810 dashcam.

However, I have two questions:

  1. In the forum you link to A810_GPX_Track_Creator_by_File.exe - however that link is broken, return a 404.

  2. Could you publish the source code for this application, we may be able to include support natively in Mapillary desktop uploader so that other users can benefit as well.


Привет, Дориэль,

Это очень интересно, потому что ранее нам не удавалось извлечь данные GPS из видеорегистратора 70mai A810.

Однако у меня есть два вопроса:

  1. На форуме вы ссылаетесь на A810_GPX_Track_Creator_by_File.exe - однако эта ссылка не работает, возвращается 404.

  2. Не могли бы вы опубликовать исходный код этого приложения, возможно, мы сможем включить поддержку изначально в загрузчик Mapillary для настольных компьютеров, чтобы другие пользователи также могли воспользоваться.

Boris, hi. The file A810_GPX_Track_Creator_by_File.exe was created for me by one of the forum users. I don’t have the source code for this file. I’m not exactly sure how to extract it from the exe file, but I have uploaded the file to Google Drive, and you can download it from there.

Thanks so much! Maybe you can ask that forum user for the source code? It would also be good for us to incorporate into Mapillary natively so that users who don’t have Windows can also run it (I have Mac OS for example)

Ah, I see the instructions for how the .exe works in the forum post you linked to. So basically the video metadata is located in a separate .txt file and this .exe converts them to .gpx files for upload to Mapillary. Thanks again!

1 Like

Maybe that’s all it does, maybe not. Guys, please be extremely careful downloading and executing code from a dubious source. The modus oprandi for building trust in code on the internet is through open source or by checking code signing signatures (although signatures by themselves will not stop malicious intent or code). And, I am really :worried: worried that Mapillary employees openly condone this kind of conduct.

1 Like

@GITNE - that’s a good callout - I agree that everyone should be careful about downloading and executing .exe files. I think that can be left to individual the discretion (using a virus scan together with a sandbox or VM for example). My request above was for the OP to share the source code. I was also happy to see the description in the forum for how it works so that folks could re-create it. Here’s what it says for reference:

Inside GPSData000001.txt looks like:
1735534427,A,46.835285,29.454746,0.27,-86.5.56,NO20241230-145333-000000F.MP4,0,0,0
Sometimes such lines are separated by lines “$V02” and lines containing "nan”

  • 1735534427 — Time in seconds since the start of the Unix era (January 1, 1970). This is a timestamp indicating when GPS data was recorded

  • A — Satellite signal status indicator: “A” (Active) - data is valid, the device has successfully established communication with satellites and determined coordinates.

  • 46.835285 — Geographic latitude, expressed in decimal degrees.

  • 29.454746 — Geographic longitude, expressed in decimal degrees.

  • 0 — Orientation (17000 / 100 = 170.00 degrees… )

  • 27 — Speed ​​in centimeters per second (cm/s). In this case, the device was moving at this speed at the time of recording

  • -86 — unknown

  • 5 — unknown

  • 56 — unknown

  • NO20241230-145333-000000F.MP4 — The name of the video file that is currently being recorded. It states that this video was taken on December 30, 2024 at 14:53:33 (device time), with a unique identifier of “000000F”

  • 0, 0, 0 — These are likely additional parameters such as recording system status or additional labels

Line $V02 shares data between DVR starts. Lines containing “nan” , indicate that it was not possible to contact the satellites.

Ideally someone could take this description and contribute an implementation to mapillary_tools to handle this natively to benefit the whole community.

1 Like

Boris, hi. I think you might be a bit confused. The .exe file you asked about (I requested its code to be shared with me) indeed splits the original file GPSData000001.txt into a set of .txt files and converts them to GPX. After that, the user manually uploads the videos using the Mapillary desktop application. This .exe file is not related to mapillary_tools in any way.

However, in my first message, I mentioned that I also managed to obtain a script for Windows that fully automates the upload of videos with denser point recording via mapillary_tools. This is my own script, and I have its code, which I will attach below.

I want to clarify that I encountered an issue where, when calling mapillary_tools commands, the tool exclusively uses the computer’s CPU power and ignores the GPU capabilities. But I have made progress and managed to modify the script to utilize the full power of the PC (CPU + NVIDIA GPU). If you’re interested, I can share that version as well.

Here is the code for the mapillary_tools script. The main script, map.py, consists of three separate scripts, each executed sequentially. Since the entire code was developed with the help of ChatGPT, I was unable to merge all actions into a single large script. However, this approach makes it easier to track errors at each stage.

map.py

import subprocess
import sys
import os
from colorama import Fore, Style, init

# Инициализация colorama для работы на Windows
init(autoreset=True)

def run_script(script_path):  
    """
    Функция для последовательного запуска Python-скриптов.
    :param script_path: Путь к исполняемому Python-скрипту.
    """
    try:
        print(f"{Fore.YELLOW}Запуск скрипта: {script_path}")
        subprocess.run([sys.executable, script_path], check=True)
        print(f"{Fore.GREEN}Скрипт выполнен: {script_path}\n")
    except subprocess.CalledProcessError as e:
        print(f"{Fore.RED}Ошибка при выполнении {script_path}: {e}")
    except Exception as e:
        print(f"{Fore.RED}Непредвиденная ошибка: {e}")

def ask_to_upload():
    """
    Функция для запроса у пользователя о загрузке файлов в Mapillary.
    Если пользователь выбирает "да", запускает `three_script.py`.
    """
    third_script = os.path.join(os.path.dirname(__file__), "three_script.py")
    while True:
        response = input("Локальная обработка файлов завершена. Желаете загрузить файлы в Mapillary? (да/нет): ").strip().lower()
        if response in ["да", "д", "yes", "y"]:
            print(f"{Fore.YELLOW}Запуск процесса загрузки в Mapillary...")
            run_script(third_script)
            break
        elif response in ["нет", "н", "no", "n"]:
            print(f"{Fore.CYAN}Загрузка в Mapillary отменена.")
            break
        else:
            print(f"{Fore.RED}Пожалуйста, введите 'да' или 'нет'.")

def main():
    # Пути к двум основным скриптам
    first_script = "first_script.py"
    second_script = "second_script.py"
    
    # Последовательный запуск
    run_script(first_script)
    run_script(second_script)
    
    # Вопрос о загрузке в Mapillary
    ask_to_upload()

if __name__ == "__main__":
    main()

first_script.py

import csv
import math
from datetime import datetime
import time
import xml.etree.ElementTree as ET
import os
from xml.dom.minidom import parseString

# Константы
R = 6371000  # Радиус Земли в метрах

# Функция для вычисления расстояния по формуле Хаверсина
def haversine(lat1, lon1, lat2, lon2):
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

# Функция для преобразования времени в миллисекунды
def convert_to_ms(time_str):
    """Преобразуем строку времени в формате 'YYYY-MM-DD HH:MM:SS' в миллисекунды от эпохи."""
    time_obj = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
    epoch = datetime(1970, 1, 1)
    return int((time_obj - epoch).total_seconds() * 1000)

# Функция для вычисления промежуточных точек через каждые step_size метра
def interpolate_points(start, end, step_size):
    distance = haversine(start[0], start[1], end[0], end[1])
    num_points = int(distance // step_size)  # Количество точек, которые нужно добавить
    points = [start]
    for i in range(1, num_points):
        lat = start[0] + (end[0] - start[0]) * (i / num_points)
        lon = start[1] + (end[1] - start[1]) * (i / num_points)
        # Добавляем точку только если она уникальна
        if (lat, lon) != points[-1]:  # Если координаты отличаются от последней добавленной точки
            points.append((lat, lon))
    points.append(end)
    return points

# Функция для обработки данных с интерполяцией и разбивкой по видео
def process_and_split_gps_data(input_file, step_size=1.0):  # Используем шаг 1 метр
    video_times = {}
    video_points = {}

    # Чтение исходных данных для сбора временных интервалов для каждого видео
    with open(input_file, 'r') as infile:
        for line in infile:
            if "$V02" in line or "nan" in line:
                continue

            parts = line.strip().split(',')
            if len(parts) > 10:
                video_name = parts[9]
                timestamp_ms = int(float(parts[0]) * 1000)

                if video_name not in video_times:
                    video_times[video_name] = {"start": timestamp_ms, "end": timestamp_ms}
                else:
                    video_times[video_name]["end"] = timestamp_ms

                lat = float(parts[2])
                lon = float(parts[3])

                if video_name not in video_points:
                    video_points[video_name] = []

                video_points[video_name].append((lat, lon, timestamp_ms))

    # Обработка данных с распределением времени по интервалам
    all_lines = []
    for video_name, points in video_points.items():
        video_time_range = video_times[video_name]
        start_time = video_time_range["start"]
        end_time = video_time_range["end"]
        time_diff = end_time - start_time

        # Равномерное распределение времени между точками
        num_points = len(points)
        time_step = time_diff / (num_points - 1)  # Время между точками

        for i in range(num_points - 1):
            start_point = points[i]
            end_point = points[i + 1]

            # Интерполируем точки между start_point и end_point с шагом 1 метр
            interpolated_points = interpolate_points(start_point[:2], end_point[:2], step_size)

            # Присваиваем времени каждой точке
            for j, (lat, lon) in enumerate(interpolated_points):
                timestamp_ms = start_time + int(i * time_step) + int(j * time_step / len(interpolated_points))
                # Только добавляем точку, если она отличается от последней добавленной
                if len(all_lines) == 0 or (all_lines[-1][1], all_lines[-1][2]) != (round(lat, 6), round(lon, 6)):
                    all_lines.append([timestamp_ms, round(lat, 6), round(lon, 6), video_name])

    # Разбивка на файлы по имени видео
    video_files = {}

    for line in all_lines:
        video_name = line[3]

        # Убираем ".MP4" из имени видео
        video_name = video_name.replace(".MP4", "")

        # Если файл для этого видео еще не открыт, открываем новый файл
        if video_name not in video_files:
            video_files[video_name] = open(f"{video_name}.txt", 'w', newline='')

        # Записываем строку в соответствующий файл
        writer = csv.writer(video_files[video_name])
        writer.writerow(line)

    # Закрытие всех файлов
    for video_file in video_files.values():
        video_file.close()

# Функция для форматирования XML
def prettify_xml(elem):
    """Функция для форматирования XML с отступами."""
    rough_string = ET.tostring(elem, encoding='utf-8')
    parsed = parseString(rough_string)
    return parsed.toprettyxml(indent="  ")

# Функция для преобразования txt в gpx
def convert_to_gpx(input_file):
    """Преобразует данные из txt файла в формат gpx."""
    with open(input_file, 'r') as file:
        lines = file.readlines()

    if not lines:
        return

    video_tracks = {}

    for line in lines:
        if not line.strip():  # Пропускаем пустые строки
            continue
        try:
            timestamp, latitude, longitude, filename = line.strip().split(',')
        except ValueError:
            continue

        # Преобразование временной метки в читаемое время с миллисекундами
        timestamp = int(timestamp)
        time_seconds = timestamp // 1000  # Целые секунды
        time_millis = timestamp % 1000  # Миллисекунды

        # Преобразование секунд в формат времени
        time_str = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(time_seconds))

        # Преобразуем миллисекунды
        time_with_millis = f"{time_str[:-1]}.{time_millis:03d}Z"  # Вставляем миллисекунды

        if filename not in video_tracks:
            video_tracks[filename] = []

        video_tracks[filename].append({
            'lat': latitude,
            'lon': longitude,
            'time': time_with_millis
        })

    # Генерация gpx файлов
    for filename, track_points in video_tracks.items():
        output_file = os.path.splitext(filename)[0] + '.gpx'

        try:
            gpx = ET.Element('gpx', version='1.1', xmlns='http://www.topografix.com/GPX/1/1')
            metadata = ET.SubElement(gpx, 'metadata')
            metadata_name = ET.SubElement(metadata, 'name')
            metadata_name.text = f'Converted GPS Data for {filename}'

            trk = ET.SubElement(gpx, 'trk')
            trkseg = ET.SubElement(trk, 'trkseg')

            for point in track_points:
                trkpt = ET.SubElement(trkseg, 'trkpt', lat=point['lat'], lon=point['lon'])
                time_elem = ET.SubElement(trkpt, 'time')
                time_elem.text = point['time']

            # Форматирование и запись в файл
            pretty_xml_as_string = prettify_xml(gpx)
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(pretty_xml_as_string)

        except Exception as e:
            pass

# Функция для обработки всех txt файлов в директории и конвертации их в gpx
def process_all_txt_files():
    """Обрабатывает все .txt файлы в директории и конвертирует их в .gpx."""
    files = [f for f in os.listdir() if f.endswith('.txt') and f != 'gps_to_gpx.py']  # Исключаем скрипт
    for file in files:
        if file != 'GPSData000001.txt':  # Исключаем исходный файл
            convert_to_gpx(file)

# Функция для удаления всех .txt файлов после обработки, кроме GPSData000001.txt
def delete_txt_files():
    """Удаляет все .txt файлы в текущей директории, кроме GPSData000001.txt."""
    files = [f for f in os.listdir() if f.endswith('.txt') and f != 'gps_to_gpx.py' and f != 'GPSData000001.txt']  # Исключаем скрипт и исходный файл
    for file in files:
        os.remove(file)

def process_and_convert(input_file):
    """Обрабатывает исходный txt файл, делит его и конвертирует в gpx."""
    process_and_split_gps_data(input_file, step_size=1.0)
    process_all_txt_files()
    delete_txt_files()  # Удаление .txt файлов после обработки

# Запуск обработки
input_file = 'GPSData000001.txt'
process_and_convert(input_file)

second_script.py

import os
import shutil
import subprocess
import xml.etree.ElementTree as ET

def create_temp_folder():
    temp_folder = os.path.join(os.getcwd(), 'temp')
    if not os.path.exists(temp_folder):
        os.makedirs(temp_folder)
    return temp_folder

def get_start_time_from_gpx(gpx_file):
    tree = ET.parse(gpx_file)
    root = tree.getroot()
    namespaces = {'': 'http://www.topografix.com/GPX/1/1'}
    time_element = root.find('.//trkpt/time', namespaces)

    if time_element is not None:
        time_str = time_element.text.strip()
        time_parts = time_str.split("T")
        date = time_parts[0]
        time = time_parts[1].split(".")[0]
        year, month, day = date.split("-")
        hour, minute, second = time.split(":")
        formatted_time = f"{year}_{month}_{day}_{hour}_{minute}_{second}_000000"
        return formatted_time
    return None

def process_video_and_move_json(video_file, gpx_file, video_folder):
    temp_folder = create_temp_folder()
    video_start_time = get_start_time_from_gpx(gpx_file)
    if not video_start_time:
        return

    video_file_path = os.path.join(os.getcwd(), video_file)
    command = [
        "mapillary_tools", "video_process", 
        video_file_path,
        "--geotag_source", "gpx", 
        "--geotag_source_path", gpx_file, 
        "--video_sample_distance", "-1", 
        "--video_sample_interval", "0.1", 
        "--video_start_time", video_start_time, 
        "--skip_process_errors"
    ]

    subprocess.run(command, check=True)

    sampled_folder = os.path.join(os.getcwd(), 'mapillary_sampled_video_frames')

    json_file = None
    for file in os.listdir(sampled_folder):
        if file.endswith(".json"):
            json_file = os.path.join(sampled_folder, file)
            break

    if json_file:
        video_folder_path = os.path.join(sampled_folder, video_folder + ".MP4")
        if not os.path.exists(video_folder_path):
            os.makedirs(video_folder_path)

        shutil.move(json_file, os.path.join(video_folder_path, f"{video_folder}.MP4.json"))
        shutil.move(video_folder_path, temp_folder)

def move_files_from_temp_to_mapillary():
    temp_folder = os.path.join(os.getcwd(), 'temp')
    sampled_folder = os.path.join(os.getcwd(), 'mapillary_sampled_video_frames')

    if not os.path.exists(sampled_folder):
        os.makedirs(sampled_folder)

    for item in os.listdir(temp_folder):
        temp_item_path = os.path.join(temp_folder, item)
        if os.path.isdir(temp_item_path):
            shutil.move(temp_item_path, sampled_folder)

    if os.path.exists(temp_folder) and not os.listdir(temp_folder):
        os.rmdir(temp_folder)

def main():
    video_files = [f for f in os.listdir(os.getcwd()) if f.lower().endswith(".mp4")]
    for video_file in video_files:
        gpx_file = video_file.replace(".MP4", ".gpx").replace(".mp4", ".gpx")
        if os.path.exists(gpx_file):
            video_folder = video_file.replace(".MP4", "").replace(".mp4", "")
            process_video_and_move_json(video_file, gpx_file, video_folder)
        else:
            print(f"Не найден gpx файл для {video_file}. Пропускаем.")

    move_files_from_temp_to_mapillary()

    sampled_folder = os.path.join(os.getcwd(), 'mapillary_sampled_video_frames')
    if os.path.exists(sampled_folder) and not os.listdir(sampled_folder):
        os.rmdir(sampled_folder)

if __name__ == "__main__":
    main()

three_script.py

import os
import subprocess

def list_folders(directory):
    """Возвращает словарь {номер папки: имя папки}."""
    try:
        folders = [
            item for item in os.listdir(directory)
            if os.path.isdir(os.path.join(directory, item))
        ]
        folder_dict = {
            int(folder.split('-')[-1].split('F')[0]): folder
            for folder in folders if 'F' in folder
        }
        return folder_dict
    except Exception as e:
        print(f"Ошибка при получении списка папок: {e}")
        return {}

def authenticate_user(user_name):
    """Выполняет команду авторизации Mapillary."""
    try:
        print(f"Выполняется авторизация для пользователя: {user_name}...")
        command = ["mapillary_tools", "authenticate", "--user_name", user_name]
        subprocess.run(command, check=True)
        print("Авторизация успешно завершена.")
    except subprocess.CalledProcessError as e:
        print(f"Ошибка при авторизации: {e}")
        exit(1)

def process_selected_folders(directory, selected_numbers, folder_dict, user_name):
    """Обрабатывает выбранные папки и выполняет команды."""
    try:
        invalid_numbers = [num for num in selected_numbers if num not in folder_dict]
        if invalid_numbers:
            print(f"Ошибка: следующие номера папок отсутствуют: {invalid_numbers}")
        else:
            for number in selected_numbers:
                folder = folder_dict[number]
                folder_path = os.path.join(directory, folder)
                json_filename = f"{folder}.json"  # Имя файла JSON
                json_path = os.path.join(folder_path, json_filename)
                
                # Выводим пути для проверки
                print(f"Проверяю путь папки: {folder_path}")
                print(f"Проверяю путь файла: {json_path}")
                
                if not os.path.exists(folder_path):
                    print(f"Ошибка: папка {folder_path} не найдена. Пропускаю.")
                    continue

                if not os.path.exists(json_path):
                    print(f"Ошибка: файл {json_path} не найден. Пропускаю папку {folder}.")
                    continue

                command = [
                    "mapillary_tools",
                    "upload",
                    folder_path,
                    "--desc_path",
                    json_path,
                    "--user_name",
                    user_name  # Логин пользователя
                ]
                print(f"Выполняю команду: {' '.join(command)}")

                try:
                    subprocess.run(command, check=True)
                    print("\n==============================")
                    print(f"Папка {folder} успешно обработана.")
                    print("==============================\n")
                except subprocess.CalledProcessError as e:
                    print(f"Ошибка при выполнении команды для папки {folder}: {e}")
    except Exception as e:
        print(f"Ошибка при обработке папок: {e}")

if __name__ == "__main__":
    base_directory = "mapillary_sampled_video_frames"

    # Спрашиваем пользователя, авторизирован ли он
    authorized = input("Вы авторизированы в Mapillary? (да/нет): ").strip().lower()
    user_name = ""
    if authorized in ("да", "yes"):
        # Запрашиваем логин пользователя
        user_name = input("Введите ваш логин Mapillary: ").strip()
        if not user_name:
            print("Ошибка: логин не может быть пустым. Завершение работы.")
            exit(1)
    elif authorized in ("нет", "no"):
        # Запрашиваем логин пользователя
        user_name = input("Введите ваш логин Mapillary для авторизации: ").strip()
        if not user_name:
            print("Ошибка: логин не может быть пустым. Завершение работы.")
            exit(1)
        # Выполняем авторизацию
        authenticate_user(user_name)
    else:
        print("Ошибка: ответ должен быть 'да' или 'нет'. Завершение работы.")
        exit(1)

    folder_dict = list_folders(base_directory)
    if not folder_dict:
        print("Папки не найдены или произошла ошибка.")
    else:
        print(f"Номера найденных папок: {sorted(folder_dict.keys())}")

        selected_input = input("Введите номера папок для обработки через запятую. Для выбора всех папок напишите 'all': ").strip().lower()
        try:
            if selected_input == "all":
                selected_numbers = list(folder_dict.keys())
            else:
                selected_numbers = list(map(int, selected_input.split(',')))
            
            process_selected_folders(base_directory, selected_numbers, folder_dict, user_name)
        except ValueError:
            print("Ошибка: введите корректные номера через запятую или напишите 'all'.")

Sure, it always is a user’s personal decision what they run or don’t run. Nobody wants to or can change this. But, please note that the vast majority of users are not computer geeks who a) are mostly aware of many security risks and b) use VMs. Virus scanners won’t protect geeks nor “your grandma” either, except for known viruses. It is about the general conduct and setting a bad example.
Regardless of this, I am all for improving mapillary_tools’ device support.

1 Like

I thought you were not a programmer? So, how comes you have written three scripts now, including GPU support (which I seriously doubt because you can’t expect any speedup by vectorizing this sort of data conversion on the GPU, hence there is also no need for this)? Anyhow, if you have anything to add, please make a pull request on GitHub - mapillary/mapillary_tools: Command line tools for processing and uploading Mapillary imagery and I am sure @tao is going to be happy to merge it.

I’m sincerely sorry that you are so negatively inclined and doubt everything. Instead of clarifying the details, you are only trying to catch me in a lie. It feels like ChatGPT is something distant for you, and you probably haven’t studied the Mapillary_tools documentation thoroughly and don’t fully understand which tools it uses in its work.

FFmpeg is used for video processing. If you run video processing through Mapillary_tools, it’s impossible to influence FFmpeg. The processing speed will depend on the processor. However, if you use FFmpeg separately, you can leverage the power of the graphics card using the Nvidia CUDA Toolkit. FFmpeg supports this feature.

So, my script, which you doubt, performs the following steps:

  1. Processes videos using FFmpeg (with GPU acceleration).

  2. Uploads them to the service using Mapillary_tools.

I am attaching the script code that uses the GPU below:

map.py


import subprocess
import sys
import os

def run_script(script_path):  
    """
    Функция для последовательного запуска Python-скриптов.
    :param script_path: Путь к исполняемому Python-скрипту.
    """
    try:
        print(f"Запуск скрипта: {script_path}")
        subprocess.run([sys.executable, script_path], check=True)
        print(f"Скрипт выполнен: {script_path}\n")
    except subprocess.CalledProcessError as e:
        print(f"Ошибка при выполнении {script_path}: {e}")
    except Exception as e:
        print(f"Непредвиденная ошибка: {e}")

def ask_to_upload():
    """
    Функция для запроса у пользователя о загрузке файлов в Mapillary.
    Если пользователь выбирает "да", запускает `third_script_script.py`.
    """
    third_script = os.path.join(os.path.dirname(__file__), "third_script_script.py")
    while True:
        response = input("Локальная обработка файлов завершена. Желаете загрузить файлы в Mapillary? (да/нет): ").strip().lower()
        if response in ["да", "д", "yes", "y"]:
            print("Запуск процесса загрузки в Mapillary...")
            run_script(third_script)
            break
        elif response in ["нет", "н", "no", "n"]:
            print("Загрузка в Mapillary отменена.")
            break
        else:
            print("Пожалуйста, введите 'да' или 'нет'.")

def main():
    # Пути к двум основным скриптам
    first_script = "first_script.py"
    second_script = "second_script.py"
    
    # Последовательный запуск
    run_script(first_script)
    run_script(second_script)
    
    # Вопрос о загрузке в Mapillary
    ask_to_upload()

if __name__ == "__main__":
    main()

first_script.py

import csv
import math
from datetime import datetime
import time
import xml.etree.ElementTree as ET
import os
from xml.dom.minidom import parseString

# Константы
R = 6371000  # Радиус Земли в метрах

# Функция для вычисления расстояния по формуле Хаверсина
def haversine(lat1, lon1, lat2, lon2):
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

# Функция для вычисления промежуточных точек через каждые step_size метра
def interpolate_points(start, end, step_size):
    distance = haversine(start[0], start[1], end[0], end[1])
    num_points = int(distance // step_size)
    points = [start]
    for i in range(1, num_points):
        lat = start[0] + (end[0] - start[0]) * (i / num_points)
        lon = start[1] + (end[1] - start[1]) * (i / num_points)
        if (lat, lon) != points[-1]:
            points.append((lat, lon))
    points.append(end)
    return points

# Функция для обработки данных с интерполяцией и увеличением времени
def process_and_split_gps_data(input_file, time_offset_seconds, step_size=1.0):
    video_times = {}
    video_points = {}

    with open(input_file, 'r') as infile:
        for line in infile:
            if "$V02" in line or "nan" in line:
                continue

            parts = line.strip().split(',')
            if len(parts) > 10:
                video_name = parts[9]
                original_timestamp_ms = int(float(parts[0]) * 1000)
                timestamp_ms = original_timestamp_ms + time_offset_seconds * 1000

                # Проверка корректности увеличения времени

                if video_name not in video_times:
                    video_times[video_name] = {"start": timestamp_ms, "end": timestamp_ms}
                else:
                    video_times[video_name]["end"] = timestamp_ms

                lat = float(parts[2])
                lon = float(parts[3])

                if video_name not in video_points:
                    video_points[video_name] = []

                video_points[video_name].append((lat, lon, timestamp_ms))

    all_lines = []
    for video_name, points in video_points.items():
        video_time_range = video_times[video_name]
        start_time = video_time_range["start"]
        end_time = video_time_range["end"]
        time_diff = end_time - start_time
        num_points = len(points)
        time_step = time_diff / (num_points - 1)

        for i in range(num_points - 1):
            start_point = points[i]
            end_point = points[i + 1]
            interpolated_points = interpolate_points(start_point[:2], end_point[:2], step_size)

            for j, (lat, lon) in enumerate(interpolated_points):
                timestamp_ms = start_time + int(i * time_step) + int(j * time_step / len(interpolated_points))
                if len(all_lines) == 0 or (all_lines[-1][1], all_lines[-1][2]) != (round(lat, 6), round(lon, 6)):
                    all_lines.append([timestamp_ms, round(lat, 6), round(lon, 6), video_name])

    video_files = {}
    for line in all_lines:
        video_name = line[3]
        video_name = video_name.replace(".MP4", "")
        if video_name not in video_files:
            video_files[video_name] = open(f"{video_name}.txt", 'w', newline='')
        writer = csv.writer(video_files[video_name])
        writer.writerow(line)

    for video_file in video_files.values():
        video_file.close()

# Функция для форматирования XML
def prettify_xml(elem):
    rough_string = ET.tostring(elem, encoding='utf-8')
    parsed = parseString(rough_string)
    return parsed.toprettyxml(indent="  ")

# Функция для преобразования txt в gpx
def convert_to_gpx(input_file):
    with open(input_file, 'r') as file:
        lines = file.readlines()

    if not lines:
        return

    video_tracks = {}

    for line in lines:
        if not line.strip():
            continue
        try:
            timestamp, latitude, longitude, filename = line.strip().split(',')
        except ValueError:
            continue

        timestamp = int(timestamp)
        time_seconds = timestamp // 1000
        time_millis = timestamp % 1000
        time_str = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(time_seconds))
        time_with_millis = f"{time_str[:-1]}.{time_millis:03d}Z"

        if filename not in video_tracks:
            video_tracks[filename] = []

        video_tracks[filename].append({
            'lat': latitude,
            'lon': longitude,
            'time': time_with_millis
        })

    for filename, track_points in video_tracks.items():
        output_file = os.path.splitext(filename)[0] + '.gpx'
        try:
            gpx = ET.Element('gpx', version='1.1', xmlns='http://www.topografix.com/GPX/1/1')
            metadata = ET.SubElement(gpx, 'metadata')
            metadata_name = ET.SubElement(metadata, 'name')
            metadata_name.text = f'Converted GPS Data for {filename}'

            trk = ET.SubElement(gpx, 'trk')
            trkseg = ET.SubElement(trk, 'trkseg')

            for point in track_points:
                trkpt = ET.SubElement(trkseg, 'trkpt', lat=point['lat'], lon=point['lon'])
                time_elem = ET.SubElement(trkpt, 'time')
                time_elem.text = point['time']

            pretty_xml_as_string = prettify_xml(gpx)
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(pretty_xml_as_string)

        except Exception as e:
            pass

def process_all_txt_files():
    files = [f for f in os.listdir() if f.endswith('.txt') and f != 'gps_to_gpx.py']
    for file in files:
        if file != 'GPSData000001.txt':
            convert_to_gpx(file)

def delete_txt_files():
    files = [f for f in os.listdir() if f.endswith('.txt') and f != 'gps_to_gpx.py' and f != 'GPSData000001.txt']
    for file in files:
        os.remove(file)

def process_and_convert(input_file, time_offset_hours):
    time_offset_seconds = time_offset_hours * 3600
    process_and_split_gps_data(input_file, time_offset_seconds, step_size=1.0)
    process_all_txt_files()
    delete_txt_files()

# Запуск обработки
if __name__ == "__main__":
    input_file = 'GPSData000001.txt'
    time_offset_hours = int(input("На сколько часов увеличить время? "))
    process_and_convert(input_file, time_offset_hours)

second_script.py

import os
import gpxpy
import json
import subprocess
import shutil

def extract_frames_with_ffmpeg(video_path, output_folder, sample_interval=0.1):
    command = [
        "ffmpeg", "-hide_banner", "-hwaccel", "cuda", "-hwaccel_output_format", "cuda",
        "-i", video_path, "-vf", f"fps=1/{sample_interval},hwdownload,format=nv12",
        "-qscale:v", "1", os.path.join(output_folder, "frame_%03d.jpg")
    ]
    subprocess.run(command, check=True)
    print(f"\n{'-'*60}")
    print(f"Кадры успешно извлечены для {video_path}.")
    print(f"{'-'*60}\n")

def process_gpx(gpx_path):
    with open(gpx_path, 'r') as gpx_file:
        gpx = gpxpy.parse(gpx_file)
    geo_data = [
        {'time': point.time, 'lat': point.latitude, 'lon': point.longitude}
        for track in gpx.tracks for segment in track.segments for point in segment.points
    ]
    return geo_data

def detect_stationary_motion(geo_data, movement_threshold=0.00001):
    """ Проверяет, есть ли заметные изменения в координатах. """
    stationary = True
    last_lat, last_lon = geo_data[0]['lat'], geo_data[0]['lon']
    
    for point in geo_data[1:]:
        lat, lon = point['lat'], point['lon']
        if abs(lat - last_lat) > movement_threshold or abs(lon - last_lon) > movement_threshold:
            stationary = False
            break
        last_lat, last_lon = lat, lon
    
    return stationary

def generate_json_data(geo_data, set_folder, start_idx=0, device_make="70mai", device_model="A810"):
    json_data = []
    frame_files = [f for f in os.listdir(set_folder) if f.endswith('.jpg')]
    frame_files.sort()  # Убедимся, что они идут в правильном порядке

    for idx, geo_point in enumerate(geo_data[start_idx:start_idx + 500]):
        frame_name = f"frame_{start_idx + idx + 1:03d}.jpg"
        frame_path = os.path.join(set_folder, frame_name)
        
        if os.path.exists(frame_path):
            capture_time = geo_point['time'].strftime("%Y_%m_%d_%H_%M_%S_%f")[:-3]

            json_data.append({
                "MAPDeviceMake": device_make,
                "MAPDeviceModel": device_model,
                "MAPLatitude": geo_point['lat'],
                "MAPLongitude": geo_point['lon'],
                "MAPCaptureTime": capture_time,
                "filename": frame_path.replace("\\", "/"),
                "filetype": "image"
            })
        # Убираем вывод сообщения, если кадр не найден

    return json_data

def save_json(json_data, set_folder, video_name, set_number):
    json_filename = os.path.join(set_folder, f"{video_name}_geotags_{set_number}.json")
    with open(json_filename, 'w') as json_file:
        json.dump(json_data, json_file, indent=4)
    print(f"JSON для набора {set_number} сохранен: {json_filename}")

def process_video(video_file, gpx_file, base_directory, failed_videos):
    video_name, _ = os.path.splitext(video_file)
    output_folder = os.path.join(base_directory, video_name)
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    video_path = os.path.join(base_directory, video_file)
    extract_frames_with_ffmpeg(video_path, output_folder)
    geo_data = process_gpx(gpx_file)

    # Разделяем изображения на группы по 500
    total_frames = len([f for f in os.listdir(output_folder) if f.endswith('.jpg')])
    set_number = 1
    for start_idx in range(0, total_frames, 500):
        print(f"\nОбрабатываем набор {set_number}...")
        # Создаем папку для набора set_X
        set_folder = os.path.join(output_folder, f"set_{set_number}")
        os.makedirs(set_folder, exist_ok=True)

        # Перемещаем кадры в папку set_X
        for frame_idx in range(start_idx, min(start_idx + 500, total_frames)):
            frame_name = f"frame_{frame_idx + 1:03d}.jpg"
            frame_path = os.path.join(output_folder, frame_name)
            if os.path.exists(frame_path):
                shutil.move(frame_path, os.path.join(set_folder, frame_name))

        # Генерируем JSON для текущего набора
        json_data = generate_json_data(geo_data, set_folder, start_idx=start_idx)
        if json_data:  # Только если есть данные
            save_json(json_data, set_folder, video_name, set_number)
        else:
            print(f"[WARNING] Для набора {set_number} не найдено данных GPS. Возможно, автомобиль стоял на месте.")
            failed_videos.add(video_file)  # Добавляем видео в список неудачных

        set_number += 1

def process_all_pairs(base_directory):
    video_files = [f for f in os.listdir(base_directory) if f.lower().endswith('.mp4')]
    failed_videos = set()  # Используем set для устранения дублирования

    for video_file in video_files:
        video_name, _ = os.path.splitext(video_file)
        gpx_file = os.path.join(base_directory, f"{video_name}.gpx")
        if os.path.exists(gpx_file):
            print(f"\n{'='*60}")
            print(f"Обрабатываем пару: {video_file} и {gpx_file}")
            print(f"{'='*60}\n")
            process_video(video_file, gpx_file, base_directory, failed_videos)
            
            print(f"\n{'-'*60}")
            print(f"Завершена обработка видео: {video_file}")
            print(f"{'-'*60}\n")
        else:
            print(f"GPX файл не найден для видео {video_file}. Пропускаем.")
    
    # Выводим информацию о видеороликах с ошибками в JSON
    if failed_videos:
        print("\n\nВидеоролики с ошибками в JSON:")
        for video in failed_videos:
            print(f"- {video}")
    else:
        print("\n\nНет видеороликов с ошибками в JSON.")

def main():
    base_directory = os.getcwd()  # Используется текущая директория
    process_all_pairs(base_directory)

if __name__ == "__main__":
    main()

third_script.py

import os
import subprocess

def get_script_directory():
    """Возвращает директорию, где находится сам скрипт."""
    return os.path.dirname(os.path.abspath(__file__))

def list_folders(directory):
    """Возвращает словарь {номер папки: имя папки} для главных папок."""
    try:
        folders = [
            item for item in os.listdir(directory)
            if os.path.isdir(os.path.join(directory, item))
        ]
        folder_dict = {i + 1: folder for i, folder in enumerate(folders)}
        return folder_dict
    except Exception as e:
        print(f"Ошибка при получении списка папок: {e}")
        return {}

def find_sets(folder_path):
    """Возвращает список путей к подкаталогам (set_1, set_2 и т. д.)."""
    try:
        return [
            os.path.join(folder_path, subfolder)
            for subfolder in os.listdir(folder_path)
            if os.path.isdir(os.path.join(folder_path, subfolder))
        ]
    except Exception as e:
        print(f"Ошибка при получении подкаталогов: {e}")
        return []

def process_selected_folders(directory, selected_numbers, folder_dict, user_name):
    """Обрабатывает выбранные папки и выполняет команды."""
    try:
        for number in selected_numbers:
            main_folder = folder_dict[number]
            main_folder_path = os.path.join(directory, main_folder)
            sets = find_sets(main_folder_path)

            for set_path in sets:
                # Поиск JSON-файлов в текущем наборе
                json_files = [
                    file for file in os.listdir(set_path) if file.endswith('.json')
                ]
                if not json_files:
                    print(f"Ошибка: JSON-файлы не найдены в {set_path}. Пропускаю.")
                    continue

                for json_file in json_files:
                    json_path = os.path.join(set_path, json_file)

                    # Выполняем команду загрузки
                    command = [
                        "mapillary_tools",
                        "upload",
                        set_path,
                        "--desc_path",
                        json_path,
                        "--user_name",
                        user_name
                    ]
                    print(f"Выполняю команду: {' '.join(command)}")

                    try:
                        subprocess.run(command, check=True)
                        print(f"Набор {set_path} с файлом {json_file} успешно обработан.")
                    except subprocess.CalledProcessError as e:
                        print(f"Ошибка при выполнении команды для набора {set_path}: {e}")
    except Exception as e:
        print(f"Ошибка при обработке папок: {e}")

if __name__ == "__main__":
    # Получаем директорию, где находится скрипт
    base_directory = get_script_directory()

    authorized = input("Вы авторизированы в Mapillary? (да/нет): ").strip().lower()
    user_name = ""
    if authorized in ("да", "yes"):
        user_name = input("Введите ваш логин Mapillary: ").strip()
        if not user_name:
            print("Ошибка: логин не может быть пустым. Завершение работы.")
            exit(1)
    elif authorized in ("нет", "no"):
        user_name = input("Введите ваш логин Mapillary для авторизации: ").strip()
        if not user_name:
            print("Ошибка: логин не может быть пустым. Завершение работы.")
            exit(1)
        authenticate_user(user_name)
    else:
        print("Ошибка: ответ должен быть 'да' или 'нет'. Завершение работы.")
        exit(1)

    folder_dict = list_folders(base_directory)
    if not folder_dict:
        print("Папки не найдены или произошла ошибка.")
    else:
        print(f"Номера найденных папок: {sorted(folder_dict.keys())}")

        selected_input = input("Введите номера папок для обработки через запятую. Для выбора всех папок напишите 'all': ").strip().lower()
        try:
            if selected_input == "all":
                selected_numbers = list(folder_dict.keys())
            else:
                selected_numbers = list(map(int, selected_input.split(',')))

            process_selected_folders(base_directory, selected_numbers, folder_dict, user_name)
        except ValueError:
            print("Ошибка: введите корректные номера через запятую или напишите 'all'.")

Here is an example of a sequence obtained using this script: Mapillary

I am not negative about everything. I am using critical thinking and voicing reasonable concerns, which both serve well in life and is far more than what any artificial stupidity has currently to offer.

Nobody is trying to this. Maybe this an artifact of artificial stupidty’s inept translation efforts?

Oh, I think I am well enough educated and experienced to know about artificial stupidity and mapillary_tools’ inner workings to make my own judgement and to make a qualified statement on both, so please do not try to outsmart me with it.

What I was referring to was the text file data conversion @boris was talking about. You can’t expect any speedup doing this conversion on a GPU because this is not what GPUs have been designed for. I am not saying that it is impossible to do but it is non‑sense to do. It is like taking a sledgehammer to crack a nut.

Anyhow, currently mapillary_tools does not explicitly use hardware coding with FFmpeg and indeed you could expect a speedup by doing so. However, the point is that it has nothing to do mapillary_tools in the first place because you can configure FFmpeg already to use hardware coding whenever FFmpeg is used. The core issue is that the level and quality of hardware coding support varies widely across platforms. Some platforms work well, others advertise hardware coding but do not work or work mixed at best, and yet others do not work at all. This is why mapillary_tools has to play it safe and take a more conservative approach. Though, it might be a good idea to add an option to make use of hardware coding if available. But, users will need to check the output when using hardware coding.

CUDA has nothing to do with hardware video and JPEG coding in this context.

I am happy it works for you. Can you turn off the figures overlay in the dashcam? Thanks.