import collections
+import contextvars
import itertools
import json
import os
from .common import PostProcessor
from ..compat import functools, imghdr
from ..utils import (
+ MEDIA_EXTENSIONS,
ISO639Utils,
Popen,
PostProcessingError,
_get_exe_version_output,
+ deprecation_warning,
detect_exe_version,
determine_ext,
dfxp2srt,
traverse_obj,
variadic,
write_json_file,
- write_string,
)
EXT_TO_OUT_FORMATS = {
'ts': 'mpegts',
'wma': 'asf',
'wmv': 'asf',
+ 'weba': 'webm',
'vtt': 'webvtt',
}
ACODECS = {
class FFmpegPostProcessor(PostProcessor):
+ _ffmpeg_location = contextvars.ContextVar('ffmpeg_location', default=None)
+
def __init__(self, downloader=None):
PostProcessor.__init__(self, downloader)
self._prefer_ffmpeg = self.get_param('prefer_ffmpeg', True)
def _determine_executables(self):
    """Map every known program name to the path it should be invoked with.

    The programs are the keys and values of ``self._ffmpeg_to_avconv``.
    The location is read from the ``ffmpeg_location`` param, falling back
    to the ``_ffmpeg_location`` context variable. It may be unset, a
    directory, or the path of one specific executable.

    Returns a dict ``{program: path}``. It is empty when the configured
    location does not exist (a warning is reported with ``only_once=True``).

    Side effect: ``self._prefer_ffmpeg`` is set to True when the location
    is a file whose name matches one of the ``_ffmpeg_to_avconv`` keys
    (or matches no program at all, in which case 'ffmpeg' is assumed).
    """
    programs = [*self._ffmpeg_to_avconv.keys(), *self._ffmpeg_to_avconv.values()]
    location = self.get_param('ffmpeg_location', self._ffmpeg_location.get())
    if location is None:
        # Nothing configured: every program is referred to by its bare name
        return {p: p for p in programs}

    if not os.path.exists(location):
        self.report_warning(
            f'ffmpeg-location {location} does not exist! Continuing without ffmpeg', only_once=True)
        return {}

    if os.path.isdir(location):
        dirname, basename, filename = location, None, None
    else:
        filename = os.path.basename(location)
        # Substring match so that decorated names such as "ffmpeg-6.exe" are recognised
        basename = next((p for p in programs if p in filename), 'ffmpeg')
        dirname = os.path.dirname(os.path.abspath(location))
        if basename in self._ffmpeg_to_avconv.keys():
            self._prefer_ffmpeg = True

    paths = {p: os.path.join(dirname, p) for p in programs}
    if basename and basename in filename:
        # Prefer siblings that follow the same naming scheme as the given file,
        # e.g. "ffprobe-6.exe" next to "ffmpeg-6.exe", when they actually exist
        for p in programs:
            path = os.path.join(dirname, filename.replace(basename, p))
            if os.path.exists(path):
                paths[p] = path
    if basename:
        # The explicitly given file always wins for the program it represents
        paths[basename] = location
    return paths
path = self._paths.get(prog)
if path in self._version_cache:
return self._version_cache[path], self._features_cache.get(path, {})
- out = _get_exe_version_output(path, ['-bsfs'], to_screen=self.write_debug)
+ out = _get_exe_version_output(path, ['-bsfs'])
ver = detect_exe_version(out) if out else False
if ver:
regexs = [
return self.probe_basename
def _get_version(self, kind):
- executables = (kind, self._ffmpeg_to_avconv[kind])
+ executables = (kind, )
if not self._prefer_ffmpeg:
- executables = reversed(executables)
+ executables = (kind, self._ffmpeg_to_avconv[kind])
basename, version, features = next(filter(
lambda x: x[1], ((p, *self._get_ffmpeg_version(p)) for p in executables)), (None, None, {}))
if kind == 'ffmpeg':
else:
self.probe_basename = basename
if basename == self._ffmpeg_to_avconv[kind]:
- self.deprecation_warning(
- f'Support for {self._ffmpeg_to_avconv[kind]} is deprecated and may be removed in a future version. Use {kind} instead')
+ self.deprecated_feature(f'Support for {self._ffmpeg_to_avconv[kind]} is deprecated and '
+ f'may be removed in a future version. Use {kind} instead')
return version
@functools.cached_property
None)
return num, len(streams)
def _fixup_chapters(self, info):
    """Fill in a missing 'end_time' on the last chapter of `info`, in place.

    When the last chapter exists but its 'end_time' is absent or falsy, it
    is set to the value returned by `_get_real_video_duration` for
    `info['filepath']`. Nothing happens when no last chapter is found.
    """
    last_chapter = traverse_obj(info, ('chapters', -1))
    if last_chapter and not last_chapter.get('end_time'):
        last_chapter['end_time'] = self._get_real_video_duration(info['filepath'])
+
def _get_real_video_duration(self, filepath, fatal=True):
try:
duration = float_or_none(
_, stderr, returncode = Popen.run(
cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
if returncode not in variadic(expected_retcodes):
+ self.write_debug(stderr)
raise FFmpegPostProcessorError(stderr.strip().splitlines()[-1])
for out_path, _ in output_path_opts:
if out_path:
"""
concat_file = f'{out_file}.concat'
self.write_debug(f'Writing concat spec to {concat_file}')
- with open(concat_file, 'wt', encoding='utf-8') as f:
+ with open(concat_file, 'w', encoding='utf-8') as f:
f.writelines(self._concat_spec(in_files, concat_opts))
out_flags = list(self.stream_copy_opts(ext=determine_ext(out_file)))
class FFmpegExtractAudioPP(FFmpegPostProcessor):
- COMMON_AUDIO_EXTS = ('wav', 'flac', 'm4a', 'aiff', 'mp3', 'ogg', 'mka', 'opus', 'wma')
+ COMMON_AUDIO_EXTS = MEDIA_EXTENSIONS.common_audio + ('wma', )
SUPPORTED_EXTS = tuple(ACODECS.keys())
FORMAT_RE = create_mapping_re(('best', *SUPPORTED_EXTS))
if acodec != 'copy':
more_opts = self._quality_args(acodec)
- # not os.path.splitext, since the latter does not work on unicode in all setups
- temp_path = new_path = f'{path.rpartition(".")[0]}.{extension}'
+ temp_path = new_path = replace_extension(path, extension, information['ext'])
if new_path == path:
if acodec == 'copy':
class FFmpegVideoConvertorPP(FFmpegPostProcessor):
- SUPPORTED_EXTS = ('mp4', 'mkv', 'flv', 'webm', 'mov', 'avi', 'mka', 'ogg', *FFmpegExtractAudioPP.SUPPORTED_EXTS)
+ SUPPORTED_EXTS = (
+ *sorted((*MEDIA_EXTENSIONS.common_video, 'gif')),
+ *sorted((*MEDIA_EXTENSIONS.common_audio, 'aac', 'vorbis')),
+ )
FORMAT_RE = create_mapping_re(SUPPORTED_EXTS)
_ACTION = 'converting'
filename = info['filepath']
- # Disabled temporarily. There needs to be a way to overide this
+ # Disabled temporarily. There needs to be a way to override this
# in case of duration actually mismatching in extractor
# See: https://github.com/yt-dlp/yt-dlp/issues/1870, https://github.com/yt-dlp/yt-dlp/issues/1385
'''
@PostProcessor._restrict_to(images=False)
def run(self, info):
+ self._fixup_chapters(info)
filename, metadata_filename = info['filepath'], None
files_to_delete, options = [], []
if self._add_chapters and info.get('chapters'):
@staticmethod
def _get_chapter_opts(chapters, metadata_filename):
- with open(metadata_filename, 'wt', encoding='utf-8') as f:
+ with open(metadata_filename, 'w', encoding='utf-8') as f:
def ffmpeg_escape(text):
return re.sub(r'([\\=;#\n])', r'\\\1', text)
def add(meta_list, info_list=None):
value = next((
- str(info[key]) for key in [f'{meta_prefix}_'] + list(variadic(info_list or meta_list))
+ info[key] for key in [f'{meta_prefix}_'] + list(variadic(info_list or meta_list))
if info.get(key) is not None), None)
if value not in ('', None):
+ value = ', '.join(map(str, variadic(value)))
value = value.replace('\0', '') # nul character cannot be passed in command line
metadata['common'].update({meta_f: value for meta_f in variadic(meta_list)})
- # See [1-4] for some info on media metadata/metadata supported
- # by ffmpeg.
- # 1. https://kdenlive.org/en/project/adding-meta-data-to-mp4-video/
- # 2. https://wiki.multimedia.cx/index.php/FFmpeg_Metadata
- # 3. https://kodi.wiki/view/Video_file_tagging
+ # Info on media metadata/metadata supported by ffmpeg:
+ # https://wiki.multimedia.cx/index.php/FFmpeg_Metadata
+ # https://kdenlive.org/en/project/adding-meta-data-to-mp4-video/
+ # https://kodi.wiki/view/Video_file_tagging
add('title', ('track', 'title'))
add('date', 'upload_date')
add(('description', 'synopsis'), 'description')
add(('purl', 'comment'), 'webpage_url')
add('track', 'track_number')
- add('artist', ('artist', 'creator', 'uploader', 'uploader_id'))
- add('genre')
+ add('artist', ('artist', 'artists', 'creator', 'creators', 'uploader', 'uploader_id'))
+ add('composer', ('composer', 'composers'))
+ add('genre', ('genre', 'genres'))
add('album')
- add('album_artist')
+ add('album_artist', ('album_artist', 'album_artists'))
add('disc', 'disc_number')
add('show', 'series')
add('season_number')
yield ('-metadata', f'{name}={value}')
stream_idx = 0
- for fmt in info.get('requested_formats') or []:
+ for fmt in info.get('requested_formats') or [info]:
stream_count = 2 if 'none' not in (fmt.get('vcodec'), fmt.get('acodec')) else 1
lang = ISO639Utils.short2long(fmt.get('language') or '') or fmt.get('language')
for i in range(stream_idx, stream_idx + stream_count):
new_stream -= 1
yield (
- '-attach', infofn,
+ '-attach', self._ffmpeg_filename_argument(infofn),
f'-metadata:s:{new_stream}', 'mimetype=application/json',
f'-metadata:s:{new_stream}', 'filename=info.json',
)
class FFmpegMergerPP(FFmpegPostProcessor):
+ SUPPORTED_EXTS = MEDIA_EXTENSIONS.common_video
+
@PostProcessor._restrict_to(images=False)
def run(self, info):
filename = info['filepath']
@PostProcessor._restrict_to(images=False)
def run(self, info):
if all(self._needs_fixup(info)):
+ args = ['-f', 'mp4']
+ if self.get_audio_codec(info['filepath']) == 'aac':
+ args.extend(['-bsf:a', 'aac_adtstoasc'])
self._fixup('Fixing MPEG-TS in MP4 container', info['filepath'], [
- *self.stream_copy_opts(), '-f', 'mp4', '-bsf:a', 'aac_adtstoasc'])
+ *self.stream_copy_opts(), *args])
return [], info
class FFmpegSubtitlesConvertorPP(FFmpegPostProcessor):
- SUPPORTED_EXTS = ('srt', 'vtt', 'ass', 'lrc')
+ SUPPORTED_EXTS = MEDIA_EXTENSIONS.subtitles
def __init__(self, downloader=None, format=None):
super().__init__(downloader)
with open(dfxp_file, 'rb') as f:
srt_data = dfxp2srt(f.read())
- with open(srt_file, 'wt', encoding='utf-8') as f:
+ with open(srt_file, 'w', encoding='utf-8') as f:
f.write(srt_data)
old_file = srt_file
@PostProcessor._restrict_to(images=False)
def run(self, info):
+ self._fixup_chapters(info)
chapters = info.get('chapters') or []
if not chapters:
self.to_screen('Chapter information is unavailable')
class FFmpegThumbnailsConvertorPP(FFmpegPostProcessor):
- SUPPORTED_EXTS = ('jpg', 'png', 'webp')
+ SUPPORTED_EXTS = MEDIA_EXTENSIONS.thumbnails
FORMAT_RE = create_mapping_re(SUPPORTED_EXTS)
def __init__(self, downloader=None, format=None):
@classmethod
def is_webp(cls, path):
    """Deprecated helper: tell whether `imghdr` identifies the file at `path` as WebP.

    A deprecation notice naming this method is issued through
    `deprecation_warning` before the check is made.
    """
    deprecation_warning(f'{cls.__module__}.{cls.__name__}.is_webp is deprecated')
    return imghdr.what(path) == 'webp'
def fixup_webp(self, info, idx=-1):
@staticmethod
def _options(target_ext):
    """Yield the ffmpeg output options for converting a thumbnail to `target_ext`.

    The result is a generator of option strings, not a list.
    """
    # With "-update 1" the image2 muxer treats the output name as a plain
    # filename rather than a sequence pattern, so a "%" in it needs no escaping
    yield from ('-update', '1')
    if target_ext == 'jpg':
        yield from ('-bsf:v', 'mjpeg2jpeg')
def convert_thumbnail(self, thumbnail_filename, target_ext):
    """Convert the thumbnail at `thumbnail_filename` to `target_ext` using ffmpeg.

    Returns the output path, as computed by `replace_extension` from
    `thumbnail_filename` and `target_ext`.
    """
    thumbnail_conv_filename = replace_extension(thumbnail_filename, target_ext)

    self.to_screen(f'Converting thumbnail "{thumbnail_filename}" to {target_ext}')
    # Compare case-insensitively (run() also lower-cases the extension),
    # so that e.g. "thumb.GIF" is recognised as a GIF as well
    source_ext = os.path.splitext(thumbnail_filename)[1].lower()
    # No input format is forced for GIF; anything else is read as a single image
    input_opts = [] if source_ext == '.gif' else ['-f', 'image2', '-pattern_type', 'none']
    self.real_run_ffmpeg(
        [(thumbnail_filename, input_opts)],
        [(thumbnail_conv_filename, self._options(target_ext))])
    return thumbnail_conv_filename
def run(self, info):
continue
has_thumbnail = True
self.fixup_webp(info, idx)
+ original_thumbnail = thumbnail_dict['filepath'] # Path can change during fixup
thumbnail_ext = os.path.splitext(original_thumbnail)[1][1:].lower()
if thumbnail_ext == 'jpeg':
thumbnail_ext = 'jpg'
if len(in_files) < len(entries):
raise PostProcessingError('Aborting concatenation because some downloads failed')
- ie_copy = self._downloader._playlist_infodict(info)
exts = traverse_obj(entries, (..., 'requested_downloads', 0, 'ext'), (..., 'ext'))
- ie_copy['ext'] = exts[0] if len(set(exts)) == 1 else 'mkv'
+ ie_copy = collections.ChainMap({'ext': exts[0] if len(set(exts)) == 1 else 'mkv'},
+ info, self._downloader._playlist_infodict(info))
out_file = self._downloader.prepare_filename(ie_copy, 'pl_video')
files_to_delete = self.concat_files(in_files, out_file)