#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Re-encode a film with ffmpeg (libx265 / libopus) and mux it with mkvmerge.

The input is probed with ffprobe, analysed (crop area, audio peak level),
encoded as fixed-length video segments plus one file per audio and subtitle
track, and finally assembled into ``<input>_FINAL.mkv``.

External tools invoked through the shell: ffprobe, ffmpeg, mkvmerge and
hdr10plus_parser.
"""

import json
import logging
import os.path
import re
import shlex
import subprocess
import tempfile
from os import listdir, remove

# Length, in seconds, of each independently encoded video segment.
_SEGMENT_SECONDS = 300

# Video stream properties copied from the ffprobe output (None when absent).
_VIDEO_PROPERTIES = (
    'index',
    'height',
    'width',
    'color_primaries',
    'color_space',
    'color_transfer',
    'pix_fmt',
    'display_aspect_ratio',
)

# Language token found in intermediate file names -> code given to mkvmerge.
_MKV_LANGUAGES = {
    'fra': 'fr',
    'eng': 'en',
    'deu': 'de',
    'nld': 'nl',
    'spa': 'es',
    'ita': 'it',
}

_IDET_RE = re.compile(r"Multi.*TFF:\s*(\d+)\s+BFF:\s*(\d+)\s+Progressive:\s*(\d+)")
_CROP_RE = re.compile(r"crop=\S+")
_MAX_VOLUME_RE = re.compile(r"max_volume:\s*(-?\d+(?:\.\d+)?)\s*dB")


def _probe(file, options):
    """Run ffprobe on *file* with *options* and return the decoded JSON."""
    command = f"ffprobe -v quiet -print_format json {options} {shlex.quote(file)}"
    return json.loads(subprocess.getoutput(command))


def get_infos(file):
    """Collect the film's properties with ffprobe.

    -> http://ffmpeg.org/ffprobe.html

    Args:
        file: path of the film to analyse.

    Returns:
        A dict with the keys ``duration`` (float, seconds), ``video`` (dict of
        the first video stream's properties, plus ``side_data_list`` and the
        HDR10+ keys when available), ``audio`` and ``subtitles`` (one dict
        per stream).

    Raises:
        ValueError: if ffprobe reports no video stream (this also covers
            undecodable ffprobe output, as ``json.JSONDecodeError`` is a
            ``ValueError``).
    """
    video_probe = _probe(
        file,
        '-show_format -show_streams -show_frames -read_intervals "%+#1" -select_streams v',
    )
    audio_probe = _probe(file, "-show_streams -select_streams a")
    subtitle_probe = _probe(file, "-show_streams -select_streams s")

    video_streams = video_probe.get('streams') or []
    if not video_streams:
        raise ValueError(f"ffprobe found no video stream in {file!r}")
    v_stream = video_streams[0]
    v_infos = {prop: v_stream.get(prop) for prop in _VIDEO_PROPERTIES}

    # The side data of the first decoded frame, when present, is what
    # convert_video() reads to build its HDR parameters.
    frames = video_probe.get('frames') or []
    if frames and 'side_data_list' in frames[0]:
        v_infos['side_data_list'] = frames[0]['side_data_list']

    a_infos = []
    for a_stream in audio_probe.get('streams', []):
        tags = a_stream.get('tags', {})
        a_infos.append({
            'index': a_stream['index'],
            'channels': a_stream['channels'],
            'channel_layout': a_stream.get('channel_layout'),
            # Streams are not guaranteed to carry tags: fall back to defaults
            # instead of failing on an untagged track.
            'language': tags.get('language', 'und'),
            'title': tags.get('title', ''),
        })

    s_infos = []
    for s_stream in subtitle_probe.get('streams', []):
        s_infos.append({
            'index': s_stream['index'],
            'language': s_stream.get('tags', {}).get('language', 'und'),
        })

    # -show_format was already requested by the video probe.
    duration = float(video_probe['format']['duration'])

    # -bsf:v replaces the older -vbsf spelling of the same option.
    hdr10_v_cmd = (
        f'ffmpeg -loglevel panic -i {shlex.quote(file)} -c:v copy '
        '-bsf:v hevc_mp4toannexb -f hevc - | hdr10plus_parser -o metadata.json --verify -'
    )
    hdr10_v_raw = subprocess.getoutput(hdr10_v_cmd)
    if 'metadata detected' in hdr10_v_raw:
        # NOTE(review): nothing in this file extracts the metadata to the
        # path recorded below, and nothing reads it back either. The key
        # spelling ('hdr10_metdata') is kept as-is for compatibility.
        v_infos.update({'hdr10': True, 'hdr10_metdata': f'/tmp/{file}_hdr10_metadata.json'})

    infos = {'duration': duration, 'video': v_infos, 'audio': a_infos, 'subtitles': s_infos}
    logging.debug("Informations du film : \n" + json.dumps(infos, indent=True))
    return infos


def is_interlaced(file, infos):
    """Tell whether the video is interlaced, using ffmpeg's idet filter.

    -> https://fr.wikipedia.org/wiki/Entrelacement_(vid%C3%A9o)

    The middle third of the film is analysed. The video is reported as
    interlaced when more than 10 % of the classified frames are TFF or BFF.

    Args:
        file: path of the film.
        infos: dict returned by get_infos() (only ``duration`` is read).

    Returns:
        True if the video is considered interlaced, False otherwise, including
        when ffmpeg produced no idet statistics at all.
    """
    duration_tier = int(infos['duration'] / 3)
    command = (
        f"ffmpeg -loglevel info -ss {duration_tier} -t {duration_tier} "
        f"-i {shlex.quote(file)} -an -filter:v idet -f null -y /dev/null"
    )
    result = subprocess.getoutput(command)

    counts = None
    for line in result.splitlines():
        match = _IDET_RE.search(line)
        if match:
            # Keep the last statistics line, like the original loop did.
            counts = [int(value) for value in match.groups()]

    if counts is None:
        logging.warning("Aucune statistique d'entrelacement disponible")
        return False

    tff, bff, progressive = counts
    try:
        pct = round(((tff + bff) / (tff + bff + progressive)) * 100)
    except ZeroDivisionError:
        # Statistics were printed but no frame was classified: keep the
        # historical fallback of treating the video as interlaced.
        pct = 100
    if pct > 10:
        logging.debug(f"Vidéo entrelacée à {pct}%")
        return True
    logging.debug("Vidéo non entrelacée")
    return False


def cropping(file, infos):
    """Detect the useless borders of the picture with ffmpeg's cropdetect.

    Args:
        file: path of the film.
        infos: dict returned by get_infos() (only ``duration`` is read).

    Returns:
        The last ``crop=...`` value printed by cropdetect, usable as a video
        filter, or None when ffmpeg printed none.
    """
    logging.info("Détection de la taille de l'image...")
    duration_tier = int(infos['duration'] / 3)
    command = (
        f"ffmpeg -loglevel info -i {shlex.quote(file)} -ss {duration_tier} "
        f"-t {duration_tier} -an -f null -vf cropdetect -y /dev/null"
    )
    logging.debug(command)
    found = _CROP_RE.findall(subprocess.getoutput(command))
    if not found:
        logging.warning("Aucun paramètre de découpe détecté")
        return None
    cropsize = found[-1]
    logging.debug(f"Paramètre de découpe : {cropsize}")
    return cropsize


def volume_audio(file, infos):
    """Compute, per audio track, the gain that brings the peak level to 0 dB.

    Args:
        file: path of the film.
        infos: dict returned by get_infos() (only ``audio`` is read).

    Returns:
        A dict mapping each audio stream index to a gain such as ``"5.2dB"``.
        A track whose peak could not be measured gets ``"0.0dB"``.
    """
    volumes = {}
    for piste_audio in infos['audio']:
        piste = piste_audio['index']
        command = (
            f"ffmpeg -loglevel info -i {shlex.quote(file)} -map 0:{piste} "
            "-af volumedetect -f null -y /dev/null"
        )
        logging.info(f"Détection du volume de la piste {piste}...")
        logging.debug(command)
        peaks = _MAX_VOLUME_RE.findall(subprocess.getoutput(command))
        if peaks:
            # The gain is the opposite of the measured peak.
            volume = f"{-float(peaks[-1])}dB"
        else:
            # Never reuse the previous track's value when detection fails.
            logging.warning(f"Volume de la piste {piste} non détecté")
            volume = "0.0dB"
        logging.debug(f"Ajustement du volume de la piste {piste} : {volume}")
        volumes[piste] = volume
    return volumes


def stabilization(file):
    """Run the vidstabdetect analysis pass (e.g. for smartphone footage).

    The analysis is written to /tmp/vidstab.trf.

    NOTE(review): nothing in this file consumes that result afterwards.
    """
    cmd_stab = (
        f'ffmpeg -i {shlex.quote(file)} '
        '-vf vidstabdetect=shakiness=10:accuracy=10:result="/tmp/vidstab.trf" -f null - '
    )
    subprocess.getoutput(cmd_stab)


def extract_subs(file, track, lang):
    """Copy subtitle stream *track* of *file* into its own Matroska file.

    The output is ``<file>_subtitle_<track>_<lang>.mkv``.
    """
    metadata = shlex.quote(f"language={lang}")
    output = shlex.quote(f"{file}_subtitle_{track}_{lang}.mkv")
    command = (
        f'ffmpeg -loglevel error -i {shlex.quote(file)} -map 0:{track} '
        f'-map_metadata -1 -vn -an -c:s copy -metadata {metadata} -y {output}'
    )
    logging.info(f"Extraction du sous-titre {track}, langue : {lang}...")
    logging.debug(command)
    result = subprocess.getoutput(command)
    if result != "":
        logging.info(result)


def convert_audio(file, track, volume_adj, channels, channel_layout, language, title):
    """Encode audio stream *track* to Opus at 64 kb/s per channel.

    Args:
        file: path of the film.
        track: index of the audio stream.
        volume_adj: gain for the volume filter, e.g. ``"5.2dB"``.
        channels: number of channels of the stream.
        channel_layout: layout name reported by ffprobe, or None to leave the
            layout untouched.
        language: language tag written to the output and used in its name.
        title: title written to the output.

    The output is ``<file>_audio_<track>_<language>.mka``.
    """
    bitrate = f'{64*channels}k'
    if channel_layout == "5.1(side)":
        channel_layout = "5.1"
    codec = 'libopus'

    filters = f"volume={volume_adj}"
    if channel_layout:
        filters += f",aformat=channel_layouts={channel_layout}"

    language_tag = shlex.quote(f"language={language}")
    title_tag = shlex.quote(f"title={title}")
    metadatas = f'-metadata {language_tag} -metadata {title_tag}'
    output = shlex.quote(f"{file}_audio_{track}_{language}.mka")
    command = (
        f'ffmpeg -loglevel error -i {shlex.quote(file)} -map 0:{track} '
        f'-map_metadata -1 -vn -sn -c:a {codec} -b:a {bitrate} -mapping_family 1 '
        f'-filter:a {shlex.quote(filters)} {metadatas} -y {output}'
    )
    logging.debug(command)
    result = subprocess.getoutput(command)
    logging.info(result)


def _scaled(fraction, scale):
    """Convert a ``"num/den"`` string to an integer count of 1/scale units."""
    numerator, denominator = (int(part) for part in fraction.split('/'))
    return round(numerator * scale / denominator)


def _hdr_options(video):
    """Build the ``-x265-params`` option from the video's HDR side data.

    Returns an empty string when the side data is absent or unusable.
    """
    if 'side_data_list' not in video:
        return ''
    try:
        side_data = video['side_data_list']
        # Pick the entries by the keys they carry, not by their position.
        mastering = next((item for item in side_data if 'red_x' in item), None)
        light = next((item for item in side_data if 'max_content' in item), None)
        if mastering is None:
            raise KeyError('red_x')

        def point(name):
            x = _scaled(mastering[f'{name}_x'], 50000)
            y = _scaled(mastering[f'{name}_y'], 50000)
            return f'({x},{y})'

        max_luminance = _scaled(mastering['max_luminance'], 10000)
        min_luminance = _scaled(mastering['min_luminance'], 10000)
        master_display = (
            f"G{point('green')}B{point('blue')}R{point('red')}"
            f"WP{point('white_point')}L({max_luminance},{min_luminance})"
        )

        params = ['hdr-opt=1', 'repeat-headers=1']
        for option, key in (('colorprim', 'color_primaries'),
                            ('transfer', 'color_transfer'),
                            ('colormatrix', 'color_space')):
            if video.get(key) is not None:
                params.append(f'{option}={video[key]}')
        params.append(f'master-display={master_display}')
        if light is not None:
            params.append(f"max-cll={light['max_content']},{light['max_average']}")
    except (KeyError, ValueError, TypeError, AttributeError, ZeroDivisionError):
        logging.debug("Aucune information HDR")
        return ''
    # Quoting the whole value protects the parentheses from the shell.
    return f"-x265-params {shlex.quote(':'.join(params))}"


def convert_video(file, infos, start, crop, crf):
    """Encode one video segment with libx265.

    Args:
        file: path of the film.
        infos: dict returned by get_infos() (only ``video`` is read).
        start: start of the segment, in whole seconds.
        crop: video filter returned by cropping(), or None for no filter.
        crf: constant rate factor passed to the encoder.

    The output is ``<file>_video_t<start on 5 digits>.mkv``.
    """
    video = infos['video']
    output = f'{file}_video_t{start:05d}.mkv'
    fmt = video['pix_fmt']
    if fmt == "yuv420p":
        fmt = "yuv420p10le"
    track = video['index']
    codec = 'libx265 -preset slow'
    hdr = _hdr_options(video)
    crop_filter = f'-filter:v {shlex.quote(crop)}' if crop else ''

    command = (
        f'ffmpeg -loglevel error -i {shlex.quote(file)} -map 0:{track} '
        f'-ss {start} -t {_SEGMENT_SECONDS} -an -sn -c:v {codec} {hdr} '
        f'-crf {crf} -pix_fmt {fmt} {crop_filter} -y {shlex.quote(output)}'
    )
    logging.debug(command)
    result = subprocess.getoutput(command)
    logging.info(result)


def _language_args(track_file):
    """Return the mkvmerge ``--language`` arguments for an intermediate file.

    The language is the last ``_``-separated token of the file name, before
    the extension. Unknown languages yield no argument.
    """
    token = os.path.splitext(track_file)[0].rsplit('_', 1)[-1]
    code = _MKV_LANGUAGES.get(token)
    return ["--language", f"0:{code}"] if code else []


def create_mkv(filename):
    """Mux every intermediate file of *filename* into ``<filename>_FINAL.mkv``.

    Video segments are appended in name order, then audio and subtitle tracks
    are added. The intermediate files are deleted only when mkvmerge exits
    with status 0 or 1; on any other status they are kept and an error is
    logged.
    """
    directory, base = os.path.split(filename)
    # listdir() gives no ordering guarantee; the zero-padded start time in
    # the segment names makes the sorted order the chronological one.
    entries = sorted(listdir(directory or '.'))
    common = ["--no-track-tags", "--no-global-tags", "--no-chapters"]

    json_data = ["--output", f"{filename}_FINAL.mkv"]
    segments = [name for name in entries if f"{base}_video_t" in name]
    for position, name in enumerate(segments):
        json_data += common
        if position:
            # Every segment but the first is appended to the previous one.
            json_data.append("+")
        json_data += ["(", os.path.join(directory, name), ")"]
    for kind in ("audio", "subtitle"):
        for name in entries:
            if f"{base}_{kind}" in name:
                json_data += common + _language_args(name)
                json_data += ["(", os.path.join(directory, name), ")"]

    with tempfile.NamedTemporaryFile(
            "w", suffix=".json", delete=False, encoding="utf-8") as mkvmerge_options:
        json.dump(json_data, mkvmerge_options)
    try:
        command = f"mkvmerge -v {shlex.quote('@' + mkvmerge_options.name)}"
        logging.debug(command)
        status, result = subprocess.getstatusoutput(command)
        logging.info(result)
    finally:
        remove(mkvmerge_options.name)

    if status not in (0, 1):
        # Do not throw away hours of encoding because the mux failed.
        logging.error(f"Échec de mkvmerge (code {status}) : fichiers intermédiaires conservés")
        return

    for name in entries:
        if any(f"{base}_{kind}" in name for kind in ("video", "audio", "subtitle")):
            remove(os.path.join(directory, name))


def main():
    """Parse the command line and run the whole conversion."""
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("f_input")
    parser.add_argument("-d", "--debug", dest="debug", action="store_true")
    parser.add_argument("-s", "--stabilise", dest="stab", action="store_true")
    parser.add_argument("-t", "--starttime", dest="starttime", type=int)
    args = parser.parse_args()

    logging.basicConfig(
        format='[%(asctime)s]\n%(message)s',
        level=logging.DEBUG if args.debug else logging.INFO,
        datefmt='%d/%m/%Y %H:%M:%S',
    )

    file = args.f_input
    infos = get_infos(file)
    # interlaced = is_interlaced(file, infos)
    cropsize = cropping(file, infos)
    volumes = volume_audio(file, infos)
    if args.stab:
        stabilization(file)

    # Audio and subtitles are only processed on a run from the beginning:
    # giving --starttime (even 0) resumes the video encoding alone.
    if args.starttime is None:
        for track in infos['subtitles']:
            extract_subs(file, track['index'], track['language'])
        for track in infos['audio']:
            convert_audio(file, track['index'], volumes[track['index']], track['channels'],
                          track['channel_layout'], track['language'], track['title'])

    vid_part_time = args.starttime if args.starttime is not None else 0
    crf = 19
    while vid_part_time < infos['duration']:
        convert_video(file, infos, vid_part_time, cropsize, crf)
        vid_part_time += _SEGMENT_SECONDS
    create_mkv(file)


if __name__ == '__main__':
    main()