#!/usr/bin/env python3 # -*- coding: utf-8 -*- import json import subprocess import logging from os import listdir, remove import os def get_infos(file): ''' Cette fonction extrait les informations du film à l'aide de ffprobe et les stocke dans un dictionnaire pour une utilisation ultérieure. -> http://ffmpeg.org/ffprobe.html ''' v_infos = { 'index': None, 'height': None, 'width': None, 'color_primaries': None, 'color_space': None, 'color_transfer': None, 'pix_fmt': None, 'display_aspect_ratio': None, } a_infos = {} v_infos_cmd = f'ffprobe -v quiet -print_format json -show_format -show_streams -show_frames -read_intervals "%+#1" -select_streams v {file}' v_infos_raw = subprocess.getoutput(v_infos_cmd) a_infos_cmd = f"ffprobe -v quiet -print_format json -show_format -show_streams -select_streams a {file}" a_infos_raw = subprocess.getoutput(a_infos_cmd) s_infos_cmd = f"ffprobe -v quiet -print_format json -show_streams -select_streams s {file}" s_infos_raw = subprocess.getoutput(s_infos_cmd) full_v_infos = json.loads(v_infos_raw) full_a_infos = json.loads(a_infos_raw) full_s_infos = json.loads(s_infos_raw) v_stream = full_v_infos['streams'][0] for prop in v_infos.keys(): try: v_infos.update({prop: v_stream[prop]}) except KeyError: pass try: v_infos.update({'side_data_list': full_v_infos['frames'][0]['side_data_list']}) except KeyError: pass a_infos = [] for a_stream in full_a_infos['streams']: a_stream_infos = { 'index': a_stream['index'], 'channels': a_stream['channels'], 'channel_layout': a_stream['channel_layout'], 'language': a_stream['tags']['language'] } try: a_stream_infos.update({'title': a_stream['tags']['title']}) except KeyError: a_stream_infos.update({'title': 'No title'}) a_infos.append(a_stream_infos) s_infos = [] for s_stream in full_s_infos['streams']: s_stream_infos = { 'index': s_stream['index'], 'language': s_stream['tags']['language']} s_infos.append(s_stream_infos) format_name = full_v_infos.get('format', {}).get('format_name', '') duration = float(full_v_infos['format']['duration']) is_mp4_container = 'mp4' in format_name or 'mov' in format_name bsf = '-bsf:v hevc_mp4toannexb' if is_mp4_container else '' hdr10_v_cmd = f'ffmpeg -loglevel panic -i {file} -c:v copy {bsf} -f hevc - | hdr10plus_parser --verify -' hdr10_v_raw = subprocess.getoutput(hdr10_v_cmd) if 'metadata detected' in hdr10_v_raw: hdr10_meta = f'/tmp/{os.path.basename(file)}_hdr10_metadata.json' hdr10_cmd = f'ffmpeg -loglevel panic -i {file} -c:v copy {bsf} -f hevc - | hdr10plus_parser -o {hdr10_meta} -' hdr10_cmd_res = subprocess.getoutput(hdr10_cmd) logging.debug(hdr10_cmd_res) v_infos.update({'hdr10plus': True, 'hdr10plus_metadata': hdr10_meta}) try: for side_data in full_v_infos['frames'][0].get('side_data_list', []): if side_data.get('side_data_type') == 'DOVI configuration record': dv_profile = side_data.get('dv_profile', '?') logging.warning( f"Dolby Vision détecté (Profile {dv_profile}). " f"Encodage sans couche DV — HDR10 utilisé si disponible." ) v_infos.update({'dolby_vision_profile': dv_profile}) break except (KeyError, IndexError): pass infos = {'duration': duration, 'video': v_infos, 'audio': a_infos, 'subtitles': s_infos} logging.debug("Informations du film : \n" + json.dumps(infos, indent=True)) return infos def cropping(file, infos): ''' Cette fonction detecte les bandes inutiles de la vidéo ''' logging.info("Détection de la taille de l'image...") duration_tier = int(infos['duration'] / 3) command = f"ffmpeg -loglevel info -i {file} -ss {duration_tier} -t {int(duration_tier / 10)} -an -f null -vf cropdetect -y /dev/null" logging.debug(command) line = -3 cropsize = "times" while cropsize == "times" or ":" not in cropsize: cropsize = subprocess.getoutput(command).splitlines()[line].split()[-1] line -= 1 logging.debug(f"Paramètre de découpe : {cropsize}") return cropsize def volume_audio(file, infos): ''' Cette fonction ajuste le volume vers 0dB ''' volumes = {} for piste_audio in infos['audio']: piste = piste_audio['index'] command = f"ffmpeg -loglevel info -i {file} -map 0:{piste} -af volumedetect -f null -y /dev/null" logging.info(f"Détection du volume de la piste {piste}...") logging.debug(command) volumedetect = subprocess.getoutput(command) for line in volumedetect.splitlines(): if "max_volume" in line: volume = line.split()[-2] volume = f"{str(-float(volume))}dB" logging.debug(f"Ajustement du volume de la piste {piste} : {volume}") volumes.update({piste: volume}) return volumes VMAF_HDR_MODEL = '/usr/share/vmaf/model/vmaf_4k_v0.6.1.json' def find_crf(file, enc_options, hdr=False): ''' Détermine le CRF optimal via ab-av1 crf-search pour atteindre VMAF 96. Pour les sources HDR, utilise un modèle VMAF calibré 4K HDR si disponible. ''' logging.info("Recherche du CRF optimal (VMAF 96)...") vmaf_model = '' if hdr and os.path.exists(VMAF_HDR_MODEL): vmaf_model = f'--vmaf-model path={VMAF_HDR_MODEL}' logging.debug(f"Modèle VMAF HDR utilisé : {VMAF_HDR_MODEL}") cmd = f'ab-av1 crf-search --input {file} --encoder libsvtav1 --vmaf 96 {vmaf_model} --enc {enc_options}' logging.debug(cmd) result = subprocess.getoutput(cmd) logging.debug(result) found_crf = None for line in result.splitlines(): # ab-av1 émet des lignes intermédiaires puis une ligne finale "crf 32 VMAF 96.21 ..." if 'crf' in line and 'VMAF' in line: try: found_crf = int(line.split('crf')[1].split()[0]) except (IndexError, ValueError): pass if found_crf is not None: logging.info(f"CRF optimal trouvé : {found_crf}") return found_crf logging.warning("ab-av1 crf-search a échoué, utilisation du CRF par défaut (32)") return 32 def stabilization(file): ''' Cette fonction permet de stabiliser l'image, par exemple quand filmé au smartphone. ''' cmd_stab = f'ffmpeg -i {file} -vf vidstabdetect=shakiness=10:accuracy=10:result="/tmp/vidstab.trf" -f null - ' subprocess.getoutput(cmd_stab) def extract_subs(file, track, lang): command = f'ffmpeg -loglevel error -i {file} -map 0:{track} -map_metadata -1 -vn -an -c:s copy -metadata language="{lang}" -y {file}_subtitle_{track}_{lang}.mkv' logging.info(f"Extraction du sous-titre {track}, langue : {lang}...") logging.debug(command) result = subprocess.getoutput(command) if result != "": logging.error(result) def convert_audio(file, track, volume_adj, channels, channel_layout, language, title): if channel_layout == "5.1(side)": channel_layout = "5.1" if channels <= 2: bitrate = "128k" elif channels == 6: bitrate = "320k" else: bitrate = "450k" codec = f'libopus -vbr on -b:a {bitrate}' metadatas = f'-metadata language="{language}" -metadata title="{title}"' command = f'ffmpeg -loglevel error -i {file} -map 0:{track} -map_metadata -1 -vn -sn -c:a {codec} -mapping_family 1 -filter:a volume={volume_adj},aformat=channel_layouts={channel_layout} {metadatas} -y {file}_audio_{track}_{language}.mka' logging.debug(command) result = subprocess.getoutput(command) if result != "": logging.error(result) def convert_video(file, infos, crf, crop, enc_options, interlaced, vhs): output = f'{file}_video.mkv' fmt = "yuv420p10le" track = infos['video']['index'] codec = 'libsvtav1' svtav1_params = enc_options if interlaced: crop = f"{crop},yadif" if vhs: crop = f"{crop},hqdn3d,unsharp=5:5:0.8:3:3:0.4" color_args = '' if 'side_data_list' in infos['video'].keys(): try: light_level = f"{infos['video']['side_data_list'][1]['max_content']},{infos['video']['side_data_list'][1]['max_average']}" color_primaries = infos['video']['color_primaries'] color_transfer = infos['video']['color_transfer'] color_space = infos['video']['color_space'] if all([color_primaries, color_transfer, color_space]): color_args = f'-color_primaries {color_primaries} -color_trc {color_transfer} -colorspace {color_space}' green_x = infos['video']['side_data_list'][0]['green_x'].split('/') green_x = int(int(green_x[0])*(int(green_x[1])/50000)) green_y = infos['video']['side_data_list'][0]['green_y'].split('/') green_y = int(int(green_y[0])*(int(green_y[1])/50000)) green = f'G({green_x},{green_y})' blue_x = infos['video']['side_data_list'][0]['blue_x'].split('/') blue_x = int(int(blue_x[0])*(int(blue_x[1])/50000)) blue_y = infos['video']['side_data_list'][0]['blue_y'].split('/') blue_y = int(int(blue_y[0])*(int(blue_y[1])/50000)) blue = f'B({blue_x},{blue_y})' red_x = infos['video']['side_data_list'][0]['red_x'].split('/') red_x = int(int(red_x[0])*(int(red_x[1])/50000)) red_y = infos['video']['side_data_list'][0]['red_y'].split('/') red_y = int(int(red_y[0])*(int(red_y[1])/50000)) red = f'R({red_x},{red_y})' white_point_x = infos['video']['side_data_list'][0]['white_point_x'].split('/') white_point_x = int(int(white_point_x[0])*(int(white_point_x[1])/50000)) white_point_y = infos['video']['side_data_list'][0]['white_point_y'].split('/') white_point_y = int(int(white_point_y[0])*(int(white_point_y[1])/50000)) white_point = f'WP({white_point_x},{white_point_y})' min_luminance = infos['video']['side_data_list'][0]['min_luminance'].split('/') min_luminance = int(int(min_luminance[0])*(int(min_luminance[1])/10000)) max_luminance = infos['video']['side_data_list'][0]['max_luminance'].split('/') max_luminance = int(int(max_luminance[0])*(int(max_luminance[1])/10000)) luminance = f'L({max_luminance},{min_luminance})' master_display = green + blue + red + white_point + luminance hdr = f'mastering-display={master_display}:content-light={light_level}' if svtav1_params: svtav1_params = f'{svtav1_params}:{hdr}' else: svtav1_params = hdr except Exception as err: logging.debug(f"Aucune information HDR statique : {err}") if 'hdr10plus' in infos['video']: hdr10plus_param = f"hdr10plus-json={infos['video']['hdr10plus_metadata']}" if svtav1_params: svtav1_params = f'{svtav1_params}:{hdr10plus_param}' else: svtav1_params = hdr10plus_param svtav1_args = f'-svtav1-params {svtav1_params}' if svtav1_params else '' command = f'ffmpeg -loglevel error -i {file} -map 0:{track} -an -sn -c:v {codec} {svtav1_args} -crf {crf} -pix_fmt {fmt} {color_args} -filter:v {crop} -y {output}' logging.info("Encodage vidéo en cours (AV1-SVT)...") logging.debug(command) result = subprocess.getoutput(command) if result != "": logging.error(result) def create_mkv(filename): json_data = [] json_data.append("--output") json_data.append(f"NEW_{filename}.mkv") video_file = f"{filename}_video.mkv" json_data += ["--no-track-tags", "--no-global-tags", "--no-chapters", "(", video_file, ")"] for file in listdir(): if f"{filename}_audio" in file: lang = file[-7:][:-4] json_data += ["--no-track-tags", "--no-global-tags", "--no-chapters", "--language", f"0:{lang}", "(", file, ")"] for file in listdir(): if f"{filename}_subtitle" in file: lang = file[-7:][:-4] json_data += ["--no-track-tags", "--no-global-tags", "--no-chapters", "--language", f"0:{lang}", "(", file, ")"] json_path = f"/tmp/{os.path.basename(filename)}.json" with open(json_path, "w") as mkvmerge_options: mkvmerge_options.write(json.dumps(json_data)) command = f"mkvmerge -v @{json_path}" logging.debug(command) result = subprocess.getoutput(command) logging.info(result) remove(json_path) for file in listdir(): if file == video_file: remove(file) if f"{filename}_audio" in file: remove(file) if f"{filename}_subtitle" in file: remove(file) if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() parser.add_argument("f_input") parser.add_argument("-d", "--debug", dest="debug", action="store_true") parser.add_argument("-s", "--stabilise", dest="stab", action="store_true") parser.add_argument("-a", "--animation", dest="animation", action="store_true") parser.add_argument("-c", "--vhs", dest="vhs", action="store_true") parser.add_argument("--interlaced", dest="interlaced", action="store_true") args = parser.parse_args() if args.debug: logging.basicConfig(format='[%(asctime)s]\n%(message)s', level=logging.DEBUG, datefmt='%d/%m/%Y %H:%M:%S') else: logging.basicConfig(format='[%(asctime)s]\n%(message)s', level=logging.INFO, datefmt='%d/%m/%Y %H:%M:%S') file = args.f_input infos = get_infos(file) if args.stab: stabilization(file) cropsize = cropping(file, infos) volumes = volume_audio(file, infos) base = 'preset=3:enable-qm=1:qm-min=0:scd=1' if args.animation: enc_options = f'{base}:tune=2:film-grain=4' elif args.vhs: enc_options = f'{base}:tune=0:irefresh-type=1:enable-tf=0' else: enc_options = f'{base}:tune=0:film-grain=8' is_hdr = 'side_data_list' in infos['video'] or 'hdr10plus' in infos['video'] crf = find_crf(file, enc_options, hdr=is_hdr) for track in infos['subtitles']: extract_subs(file, track['index'], track['language']) for track in infos['audio']: convert_audio(file, track['index'], volumes[track['index']], track['channels'], track['channel_layout'], track['language'], track['title']) convert_video(file, infos, crf, cropsize, enc_options, args.interlaced, args.vhs) create_mkv(file)