# Keep this list in sync with MANIFEST.in
# intended use: when building a source distribution,
# make pypi-files && python setup.py sdist
-pypi-files: AUTHORS Changelog.md LICENSE README.md README.txt supportedsites completions yt-dlp.1 devscripts/* test/*
+pypi-files:
+ AUTHORS Changelog.md LICENSE README.md README.txt supportedsites \
+ completions yt-dlp.1 requirements.txt devscripts/* test/*
.PHONY: all clean install test tar pypi-files completions ot offlinetest codetest supportedsites
rm yt-dlp.zip
chmod a+x yt-dlp
-README.md: yt_dlp/*.py yt_dlp/*/*.py
+README.md: yt_dlp/*.py yt_dlp/*/*.py devscripts/make_readme.py
COLUMNS=80 $(PYTHON) yt_dlp/__main__.py --ignore-config --help | $(PYTHON) devscripts/make_readme.py
-CONTRIBUTING.md: README.md
+CONTRIBUTING.md: README.md devscripts/make_contributing.py
$(PYTHON) devscripts/make_contributing.py README.md CONTRIBUTING.md
issuetemplates: devscripts/make_issue_template.py .github/ISSUE_TEMPLATE_tmpl/1_broken_site.yml .github/ISSUE_TEMPLATE_tmpl/2_site_support_request.yml .github/ISSUE_TEMPLATE_tmpl/3_site_feature_request.yml .github/ISSUE_TEMPLATE_tmpl/4_bug_report.yml .github/ISSUE_TEMPLATE_tmpl/5_feature_request.yml yt_dlp/version.py
README.txt: README.md
pandoc -f $(MARKDOWN) -t plain README.md -o README.txt
-yt-dlp.1: README.md
+yt-dlp.1: README.md devscripts/prepare_manpage.py
$(PYTHON) devscripts/prepare_manpage.py yt-dlp.1.temp.md
pandoc -s -f $(MARKDOWN) -t man yt-dlp.1.temp.md -o yt-dlp.1
rm -f yt-dlp.1.temp.md
CONTRIBUTING.md Collaborators.md CONTRIBUTORS AUTHORS \
Makefile MANIFEST.in yt-dlp.1 README.txt completions \
setup.py setup.cfg yt-dlp yt_dlp requirements.txt \
- devscripts test tox.ini pytest.ini
+ devscripts test
AUTHORS: .mailmap
git shortlog -s -n | cut -f2 | sort > AUTHORS
#!/usr/bin/env python3
-import io
import optparse
# yt-dlp --help | make_readme.py
# This must be run in a console of correct width
+import functools
import re
import sys
EPILOG_START = 'See full documentation'
-helptext = sys.stdin.read()
-if isinstance(helptext, bytes):
- helptext = helptext.decode()
+def take_section(text, start=None, end=None, *, shift=0):
+ return text[
+ text.index(start) + shift if start else None:
+ text.index(end) + shift if end else None
+ ]
-start, end = helptext.index(f'\n {OPTIONS_START}'), helptext.index(f'\n{EPILOG_START}')
-options = re.sub(r'(?m)^ (\w.+)$', r'## \1', helptext[start + 1: end + 1])
+
+def apply_patch(text, patch):
+ return re.sub(*patch, text)
+
+
+options = take_section(sys.stdin.read(), f'\n {OPTIONS_START}', f'\n{EPILOG_START}', shift=1)
+
+switch_col_width = len(re.search(r'(?m)^\s{5,}', options).group())
+delim = f'\n{" " * switch_col_width}'
+
+PATCHES = (
+ ( # Headings
+ r'(?m)^ (\w.+\n)( (?=\w))?',
+ r'## \1'
+ ),
+ ( # Do not split URLs
+ rf'({delim[:-1]})? (?P<label>\[\S+\] )?(?P<url>https?({delim})?:({delim})?/({delim})?/(({delim})?\S+)+)\s',
+ lambda mobj: ''.join((delim, mobj.group('label') or '', re.sub(r'\s+', '', mobj.group('url')), '\n'))
+ ),
+ # This creates issues with prepare_manpage
+ # ( # Avoid newline when a space is available b/w switch and description
+ # r'(?m)^(\s{4}-.{%d})(%s)' % (switch_col_width - 6, delim),
+ # r'\1 '
+ # ),
+)
with open(README_FILE, encoding='utf-8') as f:
readme = f.read()
-header = readme[:readme.index(f'## {OPTIONS_START}')]
-footer = readme[readme.index(f'# {OPTIONS_END}'):]
-
with open(README_FILE, 'w', encoding='utf-8') as f:
- for part in (header, options, footer):
- f.write(part)
+ f.write(''.join((
+ take_section(readme, end=f'## {OPTIONS_START}'),
+ functools.reduce(apply_patch, PATCHES, options),
+ take_section(readme, f'# {OPTIONS_END}'),
+ )))
+++ /dev/null
-[pytest]
-addopts = -ra -v --strict-markers
-markers =
- download
[wheel]
-universal = True
+universal = true
[flake8]
-exclude = devscripts/lazy_load_template.py,devscripts/make_issue_template.py,setup.py,build,.git,venv
+exclude = build,venv,.tox,.git
ignore = E402,E501,E731,E741,W503
+per_file_ignores =
+ ./devscripts/lazy_load_template.py: F401
+
+[tool:pytest]
+addopts = -ra -v --strict-markers
+markers =
+ download
+
+[tox:tox]
+skipsdist = true
+envlist = py{36,37,38,39,310},pypy{36,37,38,39}
+skip_missing_interpreters = true
+
+[testenv] # tox
+deps =
+ pytest
+commands = pytest {posargs:"-m not download"}
+passenv = HOME # For test_compat_expanduser
+setenv =
+ # PYTHONWARNINGS = error # Catches PIP's warnings too
+
+[isort]
+py_version = 36
+multi_line_output = VERTICAL_HANGING_INDENT
+line_length = 80
+reverse_relative = true
+ensure_newline_before_comments = true
+include_trailing_comma = true
if sys.argv[1:2] == ['py2exe']:
- import py2exe
+ import py2exe # noqa: F401
warnings.warn(
'py2exe builds do not support pycryptodomex and needs VC++14 to run. '
'The recommended way is to use "pyinst.py" to build using pyinstaller')
+++ /dev/null
-[tox]
-envlist = py26,py27,py33,py34,py35
-
-# Needed?
-[testenv]
-deps =
- nose
- coverage
-# We need a valid $HOME for test_compat_expanduser
-passenv = HOME
-defaultargs = test --exclude test_download.py --exclude test_age_restriction.py
- --exclude test_subtitles.py --exclude test_write_annotations.py
- --exclude test_youtube_lists.py --exclude test_iqiyi_sdk_interpreter.py
- --exclude test_socks.py
-commands = nosetests --verbose {posargs:{[testenv]defaultargs}} # --with-coverage --cover-package=yt_dlp --cover-html
- # test.test_download:TestDownload.test_NowVideo
def _calc_headers(self, info_dict):
res = merge_headers(self.params['http_headers'], info_dict.get('http_headers') or {})
- cookies = self._calc_cookies(info_dict)
+ cookies = self._calc_cookies(info_dict['url'])
if cookies:
res['Cookie'] = cookies
return res
- def _calc_cookies(self, info_dict):
- pr = sanitized_Request(info_dict['url'])
+ def _calc_cookies(self, url):
+ pr = sanitized_Request(url)
self.cookiejar.add_cookie_header(pr)
return pr.get_header('Cookie')
if list_only:
# Without this printing, -F --print-json will not work
self.__forced_printings(info_dict, self.prepare_filename(info_dict), incomplete=True)
- return
+ return info_dict
format_selector = self.format_selector
if format_selector is None:
and info_dict.get('thumbnails')
# check with type instead of pp_key, __name__, or isinstance
# since we dont want any custom PPs to trigger this
- and any(type(pp) == EmbedThumbnailPP for pp in self._pps['post_process'])):
+ and any(type(pp) == EmbedThumbnailPP for pp in self._pps['post_process'])): # noqa: E721
info_dict['ext'] = 'mkv'
self.report_warning(
'webm doesn\'t support embedding a thumbnail, mkv will be used')
return
info_dict['__write_download_archive'] = True
+ assert info_dict is original_infodict # Make sure the info_dict was modified in-place
if self.params.get('force_write_download_archive'):
info_dict['__write_download_archive'] = True
-
- # Make sure the info_dict was modified in-place
- assert info_dict is original_infodict
check_max_downloads()
def __download_wrapper(self, func):
'You must provide at least one URL.\n'
'Type yt-dlp --help to see a list of all options.')
+ parser.destroy()
try:
if opts.load_info_filename is not None:
return ydl.download_with_info_file(expand_path(opts.load_info_filename))
verbose: Print additional info to stdout.
quiet: Do not print messages to stdout.
ratelimit: Download speed limit, in bytes/sec.
+ continuedl: Attempt to continue downloads if possible
throttledratelimit: Assume the download is being throttled below this speed (bytes/sec)
retries: Number of times to retry for HTTP error 5xx
file_access_retries: Number of times to retry on file access error
import time
+from . import get_suitable_downloader
from .fragment import FragmentFD
-from ..downloader import get_suitable_downloader
from ..utils import urljoin
+import enum
import os.path
import re
import subprocess
import time
from .fragment import FragmentFD
-from ..compat import functools
-from ..compat import compat_setenv, compat_str
+from ..compat import functools # isort: split
+from ..compat import compat_setenv
from ..postprocessor.ffmpeg import EXT_TO_OUT_FORMATS, FFmpegPostProcessor
from ..utils import (
Popen,
)
+class Features(enum.Enum):
+ TO_STDOUT = enum.auto()
+ MULTIPLE_FORMATS = enum.auto()
+
+
class ExternalFD(FragmentFD):
SUPPORTED_PROTOCOLS = ('http', 'https', 'ftp', 'ftps')
- can_download_to_stdout = False
+ SUPPORTED_FEATURES = ()
def real_download(self, filename, info_dict):
self.report_destination(filename)
@classmethod
def supports(cls, info_dict):
- return (
- (cls.can_download_to_stdout or not info_dict.get('to_stdout'))
- and info_dict['protocol'] in cls.SUPPORTED_PROTOCOLS)
+ return all((
+ not info_dict.get('to_stdout') or Features.TO_STDOUT in cls.SUPPORTED_FEATURES,
+ '+' not in info_dict['protocol'] or Features.MULTIPLE_FORMATS in cls.SUPPORTED_FEATURES,
+ all(proto in cls.SUPPORTED_PROTOCOLS for proto in info_dict['protocol'].split('+')),
+ ))
@classmethod
def can_download(cls, info_dict, path=None):
class FFmpegFD(ExternalFD):
SUPPORTED_PROTOCOLS = ('http', 'https', 'ftp', 'ftps', 'm3u8', 'm3u8_native', 'rtsp', 'rtmp', 'rtmp_ffmpeg', 'mms', 'http_dash_segments')
- can_download_to_stdout = True
+ SUPPORTED_FEATURES = (Features.TO_STDOUT, Features.MULTIPLE_FORMATS)
@classmethod
def available(cls, path=None):
# Fixme: This may be wrong when --ffmpeg-location is used
return FFmpegPostProcessor().available
- @classmethod
- def supports(cls, info_dict):
- return all(proto in cls.SUPPORTED_PROTOCOLS for proto in info_dict['protocol'].split('+'))
-
def on_process_started(self, proc, stdin):
""" Override this in subclasses """
pass
# start_time = info_dict.get('start_time') or 0
# if start_time:
- # args += ['-ss', compat_str(start_time)]
+ # args += ['-ss', str(start_time)]
# end_time = info_dict.get('end_time')
# if end_time:
- # args += ['-t', compat_str(end_time - start_time)]
+ # args += ['-t', str(end_time - start_time)]
http_headers = None
if info_dict.get('http_headers'):
if isinstance(conn, list):
for entry in conn:
args += ['-rtmp_conn', entry]
- elif isinstance(conn, compat_str):
+ elif isinstance(conn, str):
args += ['-rtmp_conn', conn]
for i, url in enumerate(urls):
args.extend(['-map', f'{i}:{stream_number}'])
if self.params.get('test', False):
- args += ['-fs', compat_str(self._TEST_FILE_SIZE)]
+ args += ['-fs', str(self._TEST_FILE_SIZE)]
ext = info_dict['ext']
if protocol in ('m3u8', 'm3u8_native'):
import io
import re
+from . import get_suitable_downloader
from .external import FFmpegFD
from .fragment import FragmentFD
from .. import webvtt
from ..compat import compat_urlparse
from ..dependencies import Cryptodome_AES
-from ..downloader import get_suitable_downloader
from ..utils import bug_reports_message, parse_m3u8_attributes, update_url_query
if has_range:
content_range = ctx.data.headers.get('Content-Range')
content_range_start, content_range_end, content_len = parse_http_range(content_range)
- if content_range_start is not None and range_start == content_range_start:
- # Content-Range is present and matches requested Range, resume is possible
- accept_content_len = (
+ # Content-Range is present and matches requested Range, resume is possible
+ if range_start == content_range_start and (
# Non-chunked download
not ctx.chunk_size
# Chunked download and requested piece or
# its part is promised to be served
or content_range_end == range_end
- or content_len < range_end)
- if accept_content_len:
- ctx.content_len = content_len
- if content_len or req_end:
- ctx.data_len = min(content_len or req_end, req_end or content_len) - (req_start or 0)
- return
+ or content_len < range_end):
+ ctx.content_len = content_len
+ if content_len or req_end:
+ ctx.data_len = min(content_len or req_end, req_end or content_len) - (req_start or 0)
+ return
# Content-Range is either not present or invalid. Assuming remote webserver is
# trying to send the whole file, resume is not possible, so wiping the local file
# and performing entire redownload
import threading
+from . import get_suitable_downloader
from .common import FileDownloader
-from ..downloader import get_suitable_downloader
-from ..extractor.niconico import NiconicoIE
from ..utils import sanitized_Request
""" Downloading niconico douga from DMC with heartbeat """
def real_download(self, filename, info_dict):
- self.to_screen('[%s] Downloading from DMC' % self.FD_NAME)
+ from ..extractor.niconico import NiconicoIE
+ self.to_screen('[%s] Downloading from DMC' % self.FD_NAME)
ie = NiconicoIE(self.ydl)
info_dict, heartbeat_info_dict = ie._get_heartbeat_info(info_dict)
from .fragment import FragmentFD
from ..compat import compat_urllib_error
-from ..extractor.youtube import YoutubeBaseInfoExtractor as YT_BaseIE
from ..utils import RegexNotFoundError, dict_get, int_or_none, try_get
'total_frags': None,
}
- ie = YT_BaseIE(self.ydl)
+ from ..extractor.youtube import YoutubeBaseInfoExtractor
+
+ ie = YoutubeBaseInfoExtractor(self.ydl)
start_time = int(time.time() * 1000)
import time
import xml.etree.ElementTree
-from ..compat import functools, re
+from ..compat import functools, re # isort: split
from ..compat import (
compat_cookiejar_Cookie,
compat_cookies_SimpleCookie,
def _get_cookies(self, url):
""" Return a compat_cookies_SimpleCookie with the cookies for the url """
- req = sanitized_Request(url)
- self._downloader.cookiejar.add_cookie_header(req)
- return compat_cookies_SimpleCookie(req.get_header('Cookie'))
+ return compat_cookies_SimpleCookie(self._downloader._calc_cookies(url))
def _apply_first_set_cookie_header(self, url_handle, cookie):
"""
_VALID_URL = r'test(?:url)?:(?P<extractor>.+?)(?:_(?P<num>[0-9]+))?$'
def _real_extract(self, url):
- from ..extractor import gen_extractor_classes
+ from . import gen_extractor_classes
extractor_id, num = self._match_valid_url(url).group('extractor', 'num')
@functools.cache
-def detect_variant():
+def get_variant_and_executable_path():
+ """@returns (variant, executable_path)"""
if hasattr(sys, 'frozen'):
+ path = sys.executable
prefix = 'mac' if sys.platform == 'darwin' else 'win'
if getattr(sys, '_MEIPASS', None):
if sys._MEIPASS == os.path.dirname(sys.executable):
- return f'{prefix}_dir'
- return f'{prefix}_exe'
- return 'py2exe'
- elif isinstance(__loader__, zipimporter):
- return 'zip'
+ return f'{prefix}_dir', path
+ return f'{prefix}_exe', path
+ return 'py2exe', path
+
+ path = os.path.join(os.path.dirname(__file__), '..')
+ if isinstance(__loader__, zipimporter):
+ return 'zip', os.path.join(path, '..')
elif os.path.basename(sys.argv[0]) == '__main__.py':
- return 'source'
- return 'unknown'
+ return 'source', path
+ return 'unknown', path
+
+
+def detect_variant():
+ return get_variant_and_executable_path()[0]
_NON_UPDATEABLE_REASONS = {
JSON_URL = 'https://api.github.com/repos/yt-dlp/yt-dlp/releases/latest'
def report_error(msg, expected=False):
- ydl.report_error(msg, tb='' if expected else None)
+ ydl.report_error(msg, tb=False if expected else None)
def report_unable(action, expected=False):
report_error(f'Unable to {action}', expected)
if err:
return report_error(err, True)
- # sys.executable is set to the full pathname of the exe-file for py2exe
- # though symlinks are not followed so that we need to do this manually
- # with help of realpath
- filename = compat_realpath(sys.executable if hasattr(sys, 'frozen') else sys.argv[0])
+ variant, filename = get_variant_and_executable_path()
+ filename = compat_realpath(filename) # Absolute path, following symlinks
+
ydl.to_screen(f'Current Build Hash {calc_sha256sum(filename)}')
ydl.to_screen(f'Updating to version {version_id} ...')
if not os.access(filename, os.W_OK):
return report_permission_error(filename)
- # PyInstaller
- variant = detect_variant()
if variant in ('win_exe', 'py2exe'):
directory = os.path.dirname(filename)
if not os.access(directory, os.W_OK):
import xml.etree.ElementTree
import zlib
-from .compat import asyncio, functools # Modules
+from .compat import asyncio, functools # isort: split
from .compat import (
compat_chr,
compat_cookiejar,
return n.attrib[key]
-def get_element_by_id(id, html):
+def get_element_by_id(id, html, **kwargs):
"""Return the content of the tag with the specified ID in the passed HTML document"""
- return get_element_by_attribute('id', id, html)
+ return get_element_by_attribute('id', id, html, **kwargs)
-def get_element_html_by_id(id, html):
+def get_element_html_by_id(id, html, **kwargs):
"""Return the html of the tag with the specified ID in the passed HTML document"""
- return get_element_html_by_attribute('id', id, html)
+ return get_element_html_by_attribute('id', id, html, **kwargs)
def get_element_by_class(class_name, html):
return retval[0] if retval else None
-def get_element_by_attribute(attribute, value, html, escape_value=True):
- retval = get_elements_by_attribute(attribute, value, html, escape_value)
+def get_element_by_attribute(attribute, value, html, **kwargs):
+ retval = get_elements_by_attribute(attribute, value, html, **kwargs)
return retval[0] if retval else None
-def get_element_html_by_attribute(attribute, value, html, escape_value=True):
- retval = get_elements_html_by_attribute(attribute, value, html, escape_value)
+def get_element_html_by_attribute(attribute, value, html, **kargs):
+ retval = get_elements_html_by_attribute(attribute, value, html, **kargs)
return retval[0] if retval else None
-def get_elements_by_class(class_name, html):
+def get_elements_by_class(class_name, html, **kargs):
"""Return the content of all tags with the specified class in the passed HTML document as a list"""
return get_elements_by_attribute(
'class', r'[^\'"]*\b%s\b[^\'"]*' % re.escape(class_name),
if compat_os_name == 'nt' and supports_terminal_sequences(out):
s = re.sub(r'([\r\n]+)', r' \1', s)
+ enc = None
if 'b' in getattr(out, 'mode', ''):
- byt = s.encode(encoding or preferredencoding(), 'ignore')
- out.write(byt)
+ enc = encoding or preferredencoding()
elif hasattr(out, 'buffer'):
+ out = out.buffer
enc = encoding or getattr(out, 'encoding', None) or preferredencoding()
- byt = s.encode(enc, 'ignore')
- out.buffer.write(byt)
- else:
- out.write(s)
+
+ out.write(s.encode(enc, 'ignore') if enc else s)
out.flush()
def parse_age_limit(s):
# isinstance(False, int) is True. So type() must be used instead
- if type(s) is int:
+ if type(s) is int: # noqa: E721
return s if 0 <= s <= 21 else None
elif not isinstance(s, str):
return None
return ''.join(out)
-def cli_option(params, command_option, param):
+def cli_option(params, command_option, param, separator=None):
param = params.get(param)
- if param:
- param = compat_str(param)
- return [command_option, param] if param is not None else []
+ return ([] if param is None
+ else [command_option, str(param)] if separator is None
+ else [f'{command_option}{separator}{param}'])
def cli_bool_option(params, command_option, param, true_value='true', false_value='false', separator=None):
param = params.get(param)
- if param is None:
- return []
- assert isinstance(param, bool)
- if separator:
- return [command_option + separator + (true_value if param else false_value)]
- return [command_option, true_value if param else false_value]
+ assert param in (True, False, None)
+ return cli_option({True: true_value, False: false_value}, command_option, param, separator)
def cli_valueless_option(params, command_option, param, expected_value=True):
- param = params.get(param)
- return [command_option] if param == expected_value else []
+ return [command_option] if params.get(param) == expected_value else []
def cli_configuration_args(argdict, keys, default=[], use_compat=True):
def get_executable_path():
- from zipimport import zipimporter
- if hasattr(sys, 'frozen'): # Running from PyInstaller
- path = os.path.dirname(sys.executable)
- elif isinstance(__loader__, zipimporter): # Running from ZIP
- path = os.path.join(os.path.dirname(__file__), '../..')
- else:
- path = os.path.join(os.path.dirname(__file__), '..')
- return os.path.abspath(path)
+ from .update import get_variant_and_executable_path
+
+ return os.path.abspath(get_variant_and_executable_path()[1])
def load_plugins(name, suffix, namespace):
class classproperty:
- def __init__(self, f):
- functools.update_wrapper(self, f)
- self.f = f
+ """classmethod(property(func)) that works in py < 3.9"""
+
+ def __init__(self, func):
+ functools.update_wrapper(self, func)
+ self.func = func
def __get__(self, _, cls):
- return self.f(cls)
+ return self.func(cls)
class Namespace: