Files
vid_convert/vid_convert.py
2026-03-22 23:26:04 +01:00

310 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import logging
import os
import shlex
import subprocess
from os import listdir, remove
def get_infos(file):
    '''
    Probe a media file and collect the stream details used by the other steps.

    ffprobe (http://ffmpeg.org/ffprobe.html) is run once per stream type
    (video, audio, subtitles) and once more for the container duration. The
    video stream is then piped through hdr10plus_parser to check for HDR10+
    dynamic metadata; when some is detected it is extracted to
    /tmp/<file>_hdr10_metadata.json.

    Args:
        file: Path of the media file to inspect.

    Returns:
        A dict with four keys: 'duration' (float taken from ffprobe's format
        section), 'video' (dict of properties of the first video stream),
        'audio' (list of dicts, one per audio stream) and 'subtitles' (list
        of dicts, one per subtitle stream).

    Raises:
        json.JSONDecodeError: If a probe command does not print valid JSON.
        KeyError, IndexError: If there is no video stream, or if a stream
            lacks 'index', 'channels' or 'channel_layout'.
    '''
    # Every command goes through a shell, so the path must be quoted to
    # survive spaces and shell metacharacters.
    quoted_file = shlex.quote(file)
    v_infos = {
        'index': None,
        'height': None,
        'width': None,
        'color_primaries': None,
        'color_space': None,
        'color_transfer': None,
        'pix_fmt': None,
        'display_aspect_ratio': None,
    }
    v_infos_cmd = f'ffprobe -v quiet -print_format json -show_format -show_streams -show_frames -read_intervals "%+#1" -select_streams v {quoted_file}'
    a_infos_cmd = f"ffprobe -v quiet -print_format json -show_format -show_streams -select_streams a {quoted_file}"
    s_infos_cmd = f"ffprobe -v quiet -print_format json -show_streams -select_streams s {quoted_file}"
    full_v_infos = json.loads(subprocess.getoutput(v_infos_cmd))
    full_a_infos = json.loads(subprocess.getoutput(a_infos_cmd))
    full_s_infos = json.loads(subprocess.getoutput(s_infos_cmd))

    # Only the first video stream is considered; properties that ffprobe
    # does not report keep their None placeholder.
    v_stream = full_v_infos['streams'][0]
    for prop in v_infos:
        if prop in v_stream:
            v_infos[prop] = v_stream[prop]
    try:
        v_infos['side_data_list'] = full_v_infos['frames'][0]['side_data_list']
    except (KeyError, IndexError):
        # No frame was returned, or the frame carries no side data.
        pass

    a_infos = []
    for a_stream in full_a_infos['streams']:
        # Tags are optional in ffprobe output: fall back to the "undetermined"
        # language code and a placeholder title instead of crashing.
        tags = a_stream.get('tags', {})
        a_infos.append({
            'index': a_stream['index'],
            'channels': a_stream['channels'],
            'channel_layout': a_stream['channel_layout'],
            'language': tags.get('language', 'und'),
            'title': tags.get('title', 'No title'),
        })

    s_infos = []
    for s_stream in full_s_infos['streams']:
        s_infos.append({
            'index': s_stream['index'],
            'language': s_stream.get('tags', {}).get('language', 'und'),
        })

    duration = subprocess.getoutput(f"ffprobe -v quiet -print_format json -show_format {quoted_file}")
    duration = float(json.loads(duration)['format']['duration'])

    # First pass only verifies whether HDR10+ metadata exists.
    # NOTE(review): "-vbsf" is the legacy spelling of "-bsf:v" - confirm the
    # installed ffmpeg still accepts it.
    hdr10_v_cmd = f'ffmpeg -loglevel panic -i {quoted_file} -c:v copy -vbsf hevc_mp4toannexb -f hevc - | hdr10plus_parser -o metadata.json --verify -'
    if 'metadata detected' in subprocess.getoutput(hdr10_v_cmd):
        hdr10_path = f'/tmp/{file}_hdr10_metadata.json'
        hdr10_cmd = f'ffmpeg -loglevel panic -i {quoted_file} -c:v copy -vbsf hevc_mp4toannexb -f hevc - | hdr10plus_parser -o {shlex.quote(hdr10_path)} -'
        logging.debug(subprocess.getoutput(hdr10_cmd))
        v_infos.update({'hdr10plus': True, 'hdr10plus_metadata': hdr10_path})

    infos = {'duration': duration, 'video': v_infos, 'audio': a_infos, 'subtitles': s_infos}
    logging.debug("Informations du film : \n" + json.dumps(infos, indent=True))
    return infos
def cropping(file, infos):
    '''
    Detect the useless borders of the picture with ffmpeg's cropdetect filter.

    The analysis starts at one third of the duration and covers one tenth of
    that third.

    Args:
        file: Path of the media file to analyse.
        infos: Dict holding at least 'duration' (seconds), as built by
            get_infos().

    Returns:
        The last whitespace-separated token of the most recent output line
        whose last token contains ':' (the two final lines are ignored).
        This is presumably the "crop=w:h:x:y" value printed by cropdetect.

    Raises:
        IndexError: If no such line exists in the ffmpeg output.
    '''
    logging.info("Détection de la taille de l'image...")
    duration_tier = int(infos['duration'] / 3)
    command = f"ffmpeg -loglevel info -i {shlex.quote(file)} -ss {duration_tier} -t {int(duration_tier / 10)} -an -f null -vf cropdetect -y /dev/null"
    logging.debug(command)
    # Run the (slow) analysis once and scan its output, instead of running
    # ffmpeg again for every line that gets inspected.
    output_lines = subprocess.getoutput(command).splitlines()
    # The scan goes backwards and skips the last two lines of the output.
    for text in reversed(output_lines[:-2]):
        tokens = text.split()
        if tokens and ":" in tokens[-1]:
            cropsize = tokens[-1]
            logging.debug(f"Paramètre de découpe : {cropsize}")
            return cropsize
    raise IndexError("no crop parameter found in the ffmpeg cropdetect output")
def volume_audio(file, infos):
    '''
    Compute, for every audio track, the gain that brings its peak to 0 dB.

    Each track is analysed with ffmpeg's volumedetect filter; the gain is the
    negated "max_volume" value.

    Args:
        file: Path of the media file to analyse.
        infos: Dict whose 'audio' entry is a list of dicts with an 'index'
            key, as built by get_infos().

    Returns:
        A dict mapping each track index to a gain string such as "5.0dB".
        A track for which no max_volume line is found gets "0.0dB" (no
        adjustment).
    '''
    volumes = {}
    quoted_file = shlex.quote(file)
    for piste_audio in infos['audio']:
        piste = piste_audio['index']
        command = f"ffmpeg -loglevel info -i {quoted_file} -map 0:{piste} -af volumedetect -f null -y /dev/null"
        logging.info(f"Détection du volume de la piste {piste}...")
        logging.debug(command)
        # Reset per track so a failed detection can never reuse the value of
        # the previous track (or hit an undefined name on the first one).
        volume = None
        for line in subprocess.getoutput(command).splitlines():
            if "max_volume" in line:
                # The line ends with "<value> dB"; the gain is the opposite.
                volume = f"{-float(line.split()[-2])}dB"
        if volume is None:
            logging.warning(f"Volume maximal introuvable pour la piste {piste}, aucun ajustement appliqué")
            volume = "0.0dB"
        logging.debug(f"Ajustement du volume de la piste {piste} : {volume}")
        volumes[piste] = volume
    return volumes
# VMAF model passed to ab-av1 for HDR sources, when the file exists.
VMAF_HDR_MODEL = '/usr/share/vmaf/model/vmaf_4k_v0.6.1.json'


def find_crf(file, enc_options, hdr=False):
    '''
    Find the CRF reaching VMAF 96 with "ab-av1 crf-search" (libsvtav1).

    Args:
        file: Path of the media file to analyse.
        enc_options: Text inserted verbatim after "--enc" on the command
            line.
        hdr: When true and VMAF_HDR_MODEL exists on disk, that model is
            passed to ab-av1 through "--vmaf-model".

    Returns:
        The CRF as an int, or 32 when no result line can be parsed.
    '''
    logging.info("Recherche du CRF optimal (VMAF 96)...")
    vmaf_model = ''
    if hdr and os.path.exists(VMAF_HDR_MODEL):
        vmaf_model = f'--vmaf-model path={VMAF_HDR_MODEL}'
        logging.debug(f"Modèle VMAF HDR utilisé : {VMAF_HDR_MODEL}")
    cmd = f'ab-av1 crf-search --input {shlex.quote(file)} --encoder libsvtav1 --vmaf 96 {vmaf_model} --enc {enc_options}'
    logging.debug(cmd)
    result = subprocess.getoutput(cmd)
    logging.debug(result)
    crf = None
    for line in result.splitlines():
        # ab-av1 prints a line such as: "crf 32 VMAF 96.21 ..."
        if 'crf' in line and 'VMAF' in line:
            try:
                # Keep scanning: the last parsable line wins, since the final
                # result is presumably reported after any intermediate attempt.
                crf = int(line.split('crf')[1].split()[0])
            except (IndexError, ValueError):
                continue
    if crf is None:
        logging.warning("ab-av1 crf-search a échoué, utilisation du CRF par défaut (32)")
        return 32
    logging.info(f"CRF optimal trouvé : {crf}")
    return crf
def stabilization(file):
    '''
    Run ffmpeg's vidstabdetect analysis on the file, e.g. for smartphone footage.

    The filter is asked to write its result to /tmp/vidstab.trf; the ffmpeg
    output itself is discarded.

    Args:
        file: Path of the media file to analyse.
    '''
    # The path is quoted because the command is interpreted by a shell.
    cmd_stab = f'ffmpeg -i {shlex.quote(file)} -vf vidstabdetect=shakiness=10:accuracy=10:result="/tmp/vidstab.trf" -f null - '
    logging.debug(cmd_stab)
    subprocess.getoutput(cmd_stab)
def extract_subs(file, track, lang):
    '''
    Copy one subtitle stream into its own Matroska file.

    The stream is written, without re-encoding, to
    "<file>_subtitle_<track>_<lang>.mkv". Anything ffmpeg prints (log level
    "error") is reported through logging.error.

    Args:
        file: Path of the source media file.
        track: Index of the subtitle stream in the source file.
        lang: Language code, stored as metadata and used in the output name.
    '''
    output = f'{file}_subtitle_{track}_{lang}.mkv'
    # Paths and metadata are quoted because the command goes through a shell.
    language_tag = shlex.quote(f'language={lang}')
    command = f'ffmpeg -loglevel error -i {shlex.quote(file)} -map 0:{track} -map_metadata -1 -vn -an -c:s copy -metadata {language_tag} -y {shlex.quote(output)}'
    logging.info(f"Extraction du sous-titre {track}, langue : {lang}...")
    logging.debug(command)
    result = subprocess.getoutput(command)
    if result != "":
        logging.error(result)
def convert_audio(file, track, volume_adj, channels, channel_layout, language, title):
    '''
    Encode one audio stream to Opus in its own Matroska audio file.

    The output is "<file>_audio_<track>_<language>.mka". The bitrate depends
    on the channel count: 128k up to 2 channels, 320k for 6 channels, 450k
    otherwise. Anything ffmpeg prints (log level "error") is reported through
    logging.error.

    Args:
        file: Path of the source media file.
        track: Index of the audio stream in the source file.
        volume_adj: Gain handed to ffmpeg's volume filter (e.g. "5.0dB").
        channels: Number of channels of the stream.
        channel_layout: Layout name of the stream; "5.1(side)" is replaced
            by "5.1" before being given to the aformat filter.
        language: Language code, stored as metadata and used in the output name.
        title: Track title stored as metadata.
    '''
    if channel_layout == "5.1(side)":
        channel_layout = "5.1"
    if channels <= 2:
        bitrate = "128k"
    elif channels == 6:
        bitrate = "320k"
    else:
        bitrate = "450k"
    codec = f'libopus -vbr on -b:a {bitrate}'
    output = f'{file}_audio_{track}_{language}.mka'
    # Every free-form value is quoted because the command goes through a
    # shell: paths may contain spaces, titles may contain quotes, and layout
    # names may contain parentheses.
    audio_filter = shlex.quote(f'volume={volume_adj},aformat=channel_layouts={channel_layout}')
    language_tag = shlex.quote(f'language={language}')
    title_tag = shlex.quote(f'title={title}')
    metadatas = f'-metadata {language_tag} -metadata {title_tag}'
    command = f'ffmpeg -loglevel error -i {shlex.quote(file)} -map 0:{track} -map_metadata -1 -vn -sn -c:a {codec} -mapping_family 1 -filter:a {audio_filter} {metadatas} -y {shlex.quote(output)}'
    logging.debug(command)
    result = subprocess.getoutput(command)
    if result != "":
        logging.error(result)
def _scaled_ratio(ratio, unit):
    '''Convert a "num/den" rational (or plain integer) text into a rounded count of 1/unit steps.'''
    num, _, den = str(ratio).partition('/')
    return round(int(num) * unit / (int(den) if den else 1))


def _side_data_entry(side_data, key):
    '''Return the first side-data entry that contains key, or raise KeyError.'''
    for entry in side_data:
        if key in entry:
            return entry
    raise KeyError(key)


def _hdr_x265_params(video):
    '''Build the "-x265-params ..." HDR option from the video infos, or '' when unavailable.'''
    side_data = video.get('side_data_list')
    if side_data is None:
        return ''
    try:
        # Entries are located by the keys they hold rather than by position,
        # so the order of the side-data list does not matter.
        light = _side_data_entry(side_data, 'max_content')
        mastering = _side_data_entry(side_data, 'red_x')
        light_level = f"{light['max_content']},{light['max_average']}"
        color_primaries = video['color_primaries']
        color_transfer = video['color_transfer']
        color_space = video['color_space']
        parts = []
        for label, key in (('G', 'green'), ('B', 'blue'), ('R', 'red'), ('WP', 'white_point')):
            # Chromaticity coordinates are expressed in steps of 1/50000.
            coord_x = _scaled_ratio(mastering[f'{key}_x'], 50000)
            coord_y = _scaled_ratio(mastering[f'{key}_y'], 50000)
            # Parentheses are backslash-escaped for the shell.
            parts.append(rf'{label}\({coord_x},{coord_y}\)')
        # Luminance values are expressed in steps of 1/10000.
        max_luminance = _scaled_ratio(mastering['max_luminance'], 10000)
        min_luminance = _scaled_ratio(mastering['min_luminance'], 10000)
        parts.append(rf'L\({max_luminance},{min_luminance}\)')
        master_display = ''.join(parts)
        return f'-x265-params hdr-opt=1:repeat-headers=1:colorprim={color_primaries}:transfer={color_transfer}:colormatrix={color_space}:master-display={master_display}:max-cll={light_level}'
    except Exception as err:
        # Deliberately best effort: any missing or malformed field simply
        # means the segment is encoded without HDR signalling.
        logging.debug(f"Aucune information HDR : {err}")
        return ''


def convert_video(file, infos, start, crop, crf, animation, interlaced, vhs):
    '''
    Encode a 300-second video segment with libx265 (10-bit, preset slower).

    The segment is written to "<file>_video_t<start on 5 digits>.mkv".
    Anything ffmpeg prints (log level "error") is reported through
    logging.error.

    Args:
        file: Path of the source media file.
        infos: Dict whose 'video' entry holds the stream properties built by
            get_infos() ('index', colour properties, optional
            'side_data_list').
        start: Start of the segment in seconds (integer).
        crop: Video filter string, typically the value returned by cropping().
        crf: Constant rate factor given to the encoder.
        animation: When true, "-tune animation" is added.
        interlaced: When true, the yadif filter is appended to the filter chain.
        vhs: When true, hqdn3d and unsharp filters are appended to the
            filter chain.
    '''
    output = f'{file}_video_t{start:05d}.mkv'
    fmt = "yuv420p10le"
    track = infos['video']['index']
    codec = 'libx265 -preset slower'
    tune = "-tune animation" if animation else ""
    if interlaced:
        crop = f"{crop},yadif"
    if vhs:
        crop = f"{crop},hqdn3d,unsharp=5:5:0.8:3:3:0.4"
    hdr = _hdr_x265_params(infos['video'])
    # Paths are quoted because the command goes through a shell.
    command = f'ffmpeg -loglevel error -i {shlex.quote(file)} -map 0:{track} -ss {start} -t 300 -an -sn -c:v {codec} {tune} {hdr} -crf {crf} -pix_fmt {fmt} -filter:v {crop} -y {shlex.quote(output)}'
    logging.debug(command)
    result = subprocess.getoutput(command)
    if result != "":
        logging.error(result)
def create_mkv(filename):
json_data = []
json_data.append("--output")
json_data.append(f"{filename}_FINAL.mkv")
for file in listdir():
if f"{filename}_video_t" in file:
json_data.append("--no-track-tags")
json_data.append("--no-global-tags")
json_data.append("--no-chapters")
if "t00000" not in file:
json_data.append("+")
json_data.append("(")
json_data.append(file)
json_data.append(")")
for file in listdir():
if f"{filename}_audio" in file:
lang = file[-7:][:-4]
json_data.append("--no-track-tags")
json_data.append("--no-global-tags")
json_data.append("--no-chapters")
json_data.append("--language")
json_data.append(f"0:{lang}")
json_data.append("(")
json_data.append(file)
json_data.append(")")
for file in listdir():
if f"{filename}_subtitle" in file:
json_data.append("--no-track-tags")
json_data.append("--no-global-tags")
json_data.append("--no-chapters")
lang = file[-7:][:-4]
json_data.append("--language")
json_data.append(f"0:{lang}")
json_data.append("(")
json_data.append(file)
json_data.append(")")
with open(f"/tmp/{filename}.json", "w") as mkvmerge_options:
mkvmerge_options.write(json.dumps(json_data))
command = f"mkvmerge -v @/tmp/{filename}.json"
logging.debug(command)
result = subprocess.getoutput(command)
logging.info(result)
remove(f"/tmp/{filename}.json")
for file in listdir():
if f"{filename}_video" in file:
remove(file)
if f"{filename}_audio" in file:
remove(file)
if f"{filename}_subtitle" in file:
remove(file)
if __name__ == '__main__':
    import argparse

    # Command line: one positional input file plus boolean switches.
    parser = argparse.ArgumentParser()
    parser.add_argument("f_input")
    switches = (
        (("-d", "--debug"), "debug"),
        (("-s", "--stabilise"), "stab"),
        (("-a", "--animation"), "animation"),
        (("-c", "--vhs"), "vhs"),
        (("--interlaced",), "interlaced"),
    )
    for flags, destination in switches:
        parser.add_argument(*flags, dest=destination, action="store_true")
    args = parser.parse_args()

    # Same log layout in both modes; only the verbosity depends on --debug.
    logging.basicConfig(
        format='[%(asctime)s]\n%(message)s',
        level=logging.DEBUG if args.debug else logging.INFO,
        datefmt='%d/%m/%Y %H:%M:%S',
    )
    # NOTE(review): the visible source does nothing after the logging setup.