second_script.py
import os
import gpxpy
import json
import subprocess
import shutil
def extract_frames_with_ffmpeg(video_path, output_folder, sample_interval=0.1):
command = [
"ffmpeg", "-hide_banner", "-hwaccel", "cuda", "-hwaccel_output_format", "cuda",
"-i", video_path, "-vf", f"fps=1/{sample_interval},hwdownload,format=nv12",
"-qscale:v", "1", os.path.join(output_folder, "frame_%03d.jpg")
]
subprocess.run(command, check=True)
print(f"\n{'-'*60}")
print(f"Кадры успешно извлечены для {video_path}.")
print(f"{'-'*60}\n")
def process_gpx(gpx_path):
with open(gpx_path, 'r') as gpx_file:
gpx = gpxpy.parse(gpx_file)
geo_data = [
{'time': point.time, 'lat': point.latitude, 'lon': point.longitude}
for track in gpx.tracks for segment in track.segments for point in segment.points
]
return geo_data
def detect_stationary_motion(geo_data, movement_threshold=0.00001):
""" Проверяет, есть ли заметные изменения в координатах. """
stationary = True
last_lat, last_lon = geo_data[0]['lat'], geo_data[0]['lon']
for point in geo_data[1:]:
lat, lon = point['lat'], point['lon']
if abs(lat - last_lat) > movement_threshold or abs(lon - last_lon) > movement_threshold:
stationary = False
break
last_lat, last_lon = lat, lon
return stationary
def generate_json_data(geo_data, set_folder, start_idx=0, device_make="70mai", device_model="A810"):
json_data = []
frame_files = [f for f in os.listdir(set_folder) if f.endswith('.jpg')]
frame_files.sort() # Убедимся, что они идут в правильном порядке
for idx, geo_point in enumerate(geo_data[start_idx:start_idx + 500]):
frame_name = f"frame_{start_idx + idx + 1:03d}.jpg"
frame_path = os.path.join(set_folder, frame_name)
if os.path.exists(frame_path):
capture_time = geo_point['time'].strftime("%Y_%m_%d_%H_%M_%S_%f")[:-3]
json_data.append({
"MAPDeviceMake": device_make,
"MAPDeviceModel": device_model,
"MAPLatitude": geo_point['lat'],
"MAPLongitude": geo_point['lon'],
"MAPCaptureTime": capture_time,
"filename": frame_path.replace("\\", "/"),
"filetype": "image"
})
# Убираем вывод сообщения, если кадр не найден
return json_data
def save_json(json_data, set_folder, video_name, set_number):
json_filename = os.path.join(set_folder, f"{video_name}_geotags_{set_number}.json")
with open(json_filename, 'w') as json_file:
json.dump(json_data, json_file, indent=4)
print(f"JSON для набора {set_number} сохранен: {json_filename}")
def process_video(video_file, gpx_file, base_directory, failed_videos):
video_name, _ = os.path.splitext(video_file)
output_folder = os.path.join(base_directory, video_name)
if not os.path.exists(output_folder):
os.makedirs(output_folder)
video_path = os.path.join(base_directory, video_file)
extract_frames_with_ffmpeg(video_path, output_folder)
geo_data = process_gpx(gpx_file)
# Разделяем изображения на группы по 500
total_frames = len([f for f in os.listdir(output_folder) if f.endswith('.jpg')])
set_number = 1
for start_idx in range(0, total_frames, 500):
print(f"\nОбрабатываем набор {set_number}...")
# Создаем папку для набора set_X
set_folder = os.path.join(output_folder, f"set_{set_number}")
os.makedirs(set_folder, exist_ok=True)
# Перемещаем кадры в папку set_X
for frame_idx in range(start_idx, min(start_idx + 500, total_frames)):
frame_name = f"frame_{frame_idx + 1:03d}.jpg"
frame_path = os.path.join(output_folder, frame_name)
if os.path.exists(frame_path):
shutil.move(frame_path, os.path.join(set_folder, frame_name))
# Генерируем JSON для текущего набора
json_data = generate_json_data(geo_data, set_folder, start_idx=start_idx)
if json_data: # Только если есть данные
save_json(json_data, set_folder, video_name, set_number)
else:
print(f"[WARNING] Для набора {set_number} не найдено данных GPS. Возможно, автомобиль стоял на месте.")
failed_videos.add(video_file) # Добавляем видео в список неудачных
set_number += 1
def process_all_pairs(base_directory):
video_files = [f for f in os.listdir(base_directory) if f.lower().endswith('.mp4')]
failed_videos = set() # Используем set для устранения дублирования
for video_file in video_files:
video_name, _ = os.path.splitext(video_file)
gpx_file = os.path.join(base_directory, f"{video_name}.gpx")
if os.path.exists(gpx_file):
print(f"\n{'='*60}")
print(f"Обрабатываем пару: {video_file} и {gpx_file}")
print(f"{'='*60}\n")
process_video(video_file, gpx_file, base_directory, failed_videos)
print(f"\n{'-'*60}")
print(f"Завершена обработка видео: {video_file}")
print(f"{'-'*60}\n")
else:
print(f"GPX файл не найден для видео {video_file}. Пропускаем.")
# Выводим информацию о видеороликах с ошибками в JSON
if failed_videos:
print("\n\nВидеоролики с ошибками в JSON:")
for video in failed_videos:
print(f"- {video}")
else:
print("\n\nНет видеороликов с ошибками в JSON.")
def main():
base_directory = os.getcwd() # Используется текущая директория
process_all_pairs(base_directory)
if __name__ == "__main__":
main()