*.mkv
*.swf
*.part
+*.part-*
*.ytdl
*.dump
*.frag
--no-include-ads Do not download advertisements (default)
## Download Options:
+ -N, --concurrent-fragments N Number of fragments to download
+ concurrently (default is 1)
-r, --limit-rate RATE Maximum download rate in bytes per second
(e.g. 50K or 4.2M)
-R, --retries RETRIES Number of retries (default is 10), or
if opts.overwrites:
# --yes-overwrites implies --no-continue
opts.continue_dl = False
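+ # Reject non-positive values for the new --concurrent-fragments option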
+ if opts.concurrent_fragment_downloads <= 0:
+ raise ValueError('Concurrent fragments must be positive')
def parse_retries(retries, name=''):
if retries in ('inf', 'infinite'):
'extractor_retries': opts.extractor_retries,
'skip_unavailable_fragments': opts.skip_unavailable_fragments,
'keep_fragments': opts.keep_fragments,
+ 'concurrent_fragment_downloads': opts.concurrent_fragment_downloads,
'buffersize': opts.buffersize,
'noresizebuffer': opts.noresizebuffer,
'http_chunk_size': opts.http_chunk_size,
from __future__ import unicode_literals
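+# concurrent.futures is needed for the threaded fragment download path;
+# fall back to sequential downloads when it cannot be imported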
+try:
+ import concurrent.futures
+ can_threaded_download = True
+except ImportError:
+ can_threaded_download = False
+
from ..downloader import _get_real_downloader
from .fragment import FragmentFD
from ..compat import compat_urllib_error
from ..utils import (
DownloadError,
+ sanitize_open,
urljoin,
)
assert fragment_base_url
fragment_url = urljoin(fragment_base_url, fragment['path'])
- if real_downloader:
- fragments_to_download.append({
- 'url': fragment_url,
- })
- continue
-
- # In DASH, the first segment contains necessary headers to
- # generate a valid MP4 file, so always abort for the first segment
- fatal = i == 0 or not skip_unavailable_fragments
- count = 0
- while count <= fragment_retries:
- try:
- success, frag_content = self._download_fragment(ctx, fragment_url, info_dict)
- if not success:
- return False
- self._append_fragment(ctx, frag_content)
- break
- except compat_urllib_error.HTTPError as err:
- # YouTube may often return 404 HTTP error for a fragment causing the
- # whole download to fail. However if the same fragment is immediately
- # retried with the same request data this usually succeeds (1-2 attempts
- # is usually enough) thus allowing to download the whole file successfully.
- # To be future-proof we will retry all fragments that fail with any
- # HTTP error.
- count += 1
- if count <= fragment_retries:
- self.report_retry_fragment(err, frag_index, count, fragment_retries)
- except DownloadError:
- # Don't retry fragment if error occurred during HTTP downloading
- # itself since it has own retry settings
- if not fatal:
- self.report_skip_fragment(frag_index)
- break
- raise
-
- if count > fragment_retries:
- if not fatal:
- self.report_skip_fragment(frag_index)
- continue
- self.report_error('giving up after %s fragment retries' % fragment_retries)
- return False
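+ # First collect every fragment URL; downloading happens below, either via a
+ # real external downloader or via the threaded/sequential loop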
+ fragments_to_download.append({
+ 'frag_index': frag_index,
+ 'index': i,
+ 'url': fragment_url,
+ })
if real_downloader:
info_copy = info_dict.copy()
if not success:
return False
else:
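+ # Download a single fragment; returns (content, frag_index) on success
+ # or (False, frag_index) when the fragment could not be fetched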
+ def download_fragment(fragment):
+ i = fragment['index']
+ frag_index = fragment['frag_index']
+ fragment_url = fragment['url']
+
+ ctx['fragment_index'] = frag_index
+
+ # In DASH, the first segment contains necessary headers to
+ # generate a valid MP4 file, so always abort for the first segment
+ fatal = i == 0 or not skip_unavailable_fragments
+ count = 0
+ while count <= fragment_retries:
+ try:
+ success, frag_content = self._download_fragment(ctx, fragment_url, info_dict)
+ if not success:
+ return False, frag_index
+ break
+ except compat_urllib_error.HTTPError as err:
+ # YouTube may often return 404 HTTP error for a fragment causing the
+ # whole download to fail. However, if the same fragment is immediately
+ # retried with the same request data this usually succeeds (1-2 attempts
+ # are usually enough), allowing the whole file to be downloaded successfully.
+ # To be future-proof we will retry all fragments that fail with any
+ # HTTP error.
+ count += 1
+ if count <= fragment_retries:
+ self.report_retry_fragment(err, frag_index, count, fragment_retries)
+ except DownloadError:
+ # Don't retry fragment if error occurred during HTTP downloading
+ # itself since it has its own retry settings
+ if not fatal:
+ # return False so append_fragment() skips this fragment instead
+ # of falling through to the (possibly unassigned) frag_content
+ return False, frag_index
+ raise
+
+ if count > fragment_retries:
+ if not fatal:
+ return False, frag_index
+ self.report_error('giving up after %s fragment retries' % fragment_retries)
+ return False, frag_index
+
+ return frag_content, frag_index
+
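+ # Append a downloaded fragment to the output file; a missing fragment is
+ # skipped or aborts the download depending on skip_unavailable_fragments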
+ def append_fragment(frag_content, frag_index):
+ if frag_content:
+ fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index)
+ try:
+ file, frag_sanitized = sanitize_open(fragment_filename, 'rb')
+ ctx['fragment_filename_sanitized'] = frag_sanitized
+ file.close()
+ self._append_fragment(ctx, frag_content)
+ return True
+ except FileNotFoundError:
+ if skip_unavailable_fragments:
+ self.report_skip_fragment(frag_index)
+ return True
+ else:
+ self.report_error(
+ 'fragment %s not found, unable to continue' % frag_index)
+ return False
+ else:
+ if skip_unavailable_fragments:
+ self.report_skip_fragment(frag_index)
+ return True
+ else:
+ self.report_error(
+ 'fragment %s not found, unable to continue' % frag_index)
+ return False
+
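+ # Use a thread pool when more than one concurrent fragment is requested and
+ # concurrent.futures is available; otherwise fall back to the sequential loop below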
+ max_workers = self.params.get('concurrent_fragment_downloads', 1)
+ if can_threaded_download and max_workers > 1:
+ self.report_warning('The download speed shown is only of one thread. This is a known issue')
+ with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
+ futures = [pool.submit(download_fragment, fragment) for fragment in fragments_to_download]
+ # timeout must be 0 to return instantly
+ done, not_done = concurrent.futures.wait(futures, timeout=0)
+ try:
+ while not_done:
+ # Check every 1 second for KeyboardInterrupt
+ freshly_done, not_done = concurrent.futures.wait(not_done, timeout=1)
+ done |= freshly_done
+ except KeyboardInterrupt:
+ for future in not_done:
+ future.cancel()
+ # timeout must be None to wait until the remaining futures are cancelled or finished
+ concurrent.futures.wait(not_done, timeout=None)
+ raise KeyboardInterrupt
+ results = [future.result() for future in futures]
+
+ for frag_content, frag_index in results:
+ result = append_fragment(frag_content, frag_index)
+ if not result:
+ return False
+ else:
+ for fragment in fragments_to_download:
+ frag_content, frag_index = download_fragment(fragment)
+ result = append_fragment(frag_content, frag_index)
+ if not result:
+ return False
+
self._finish_frag_download(ctx)
return True
file_list = []
dest, _ = sanitize_open(tmpfilename, 'wb')
for i, fragment in enumerate(info_dict['fragments']):
- file = '%s_%s.frag' % (tmpfilename, i)
+ file = '%s-Frag%d' % (tmpfilename, i)
decrypt_info = fragment.get('decrypt_info')
src, _ = sanitize_open(file, 'rb')
if decrypt_info:
url_list_file = '%s.frag.urls' % tmpfilename
url_list = []
for i, fragment in enumerate(info_dict['fragments']):
- tmpsegmentname = '%s_%s.frag' % (os.path.basename(tmpfilename), i)
+ tmpsegmentname = '%s-Frag%d' % (os.path.basename(tmpfilename), i)
url_list.append('%s\n\tout=%s' % (fragment['url'], tmpsegmentname))
stream, _ = sanitize_open(url_list_file, 'wb')
stream.write('\n'.join(url_list).encode('utf-8'))
can_decrypt_frag = True
except ImportError:
can_decrypt_frag = False
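+# concurrent.futures is needed for the threaded fragment download path;
+# fall back to sequential downloads when it cannot be imported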
+try:
+ import concurrent.futures
+ can_threaded_download = True
+except ImportError:
+ can_threaded_download = False
from ..downloader import _get_real_downloader
from .fragment import FragmentFD
)
from ..utils import (
parse_m3u8_attributes,
+ sanitize_open,
update_url_query,
)
ad_frag_next = False
for line in s.splitlines():
line = line.strip()
- download_frag = False
if line:
if not line.startswith('#'):
if format_index and discontinuity_count != format_index:
if extra_query:
frag_url = update_url_query(frag_url, extra_query)
- if real_downloader:
- fragments.append({
- 'url': frag_url,
- 'decrypt_info': decrypt_info,
- })
- continue
- download_frag = True
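+ # Record the media fragment; downloading happens later (threaded,
+ # sequential, or via a real external downloader)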
+ fragments.append({
+ 'frag_index': frag_index,
+ 'url': frag_url,
+ 'decrypt_info': decrypt_info,
+ 'byte_range': byte_range,
+ 'media_sequence': media_sequence,
+ })
elif line.startswith('#EXT-X-MAP'):
if format_index and discontinuity_count != format_index:
else compat_urlparse.urljoin(man_url, map_info.get('URI')))
if extra_query:
frag_url = update_url_query(frag_url, extra_query)
- if real_downloader:
- fragments.append({
- 'url': frag_url,
- 'decrypt_info': decrypt_info,
- })
- continue
+
+ fragments.append({
+ 'frag_index': frag_index,
+ 'url': frag_url,
+ 'decrypt_info': decrypt_info,
+ 'byte_range': byte_range,
+ 'media_sequence': media_sequence
+ })
if map_info.get('BYTERANGE'):
splitted_byte_range = map_info.get('BYTERANGE').split('@')
'start': sub_range_start,
'end': sub_range_start + int(splitted_byte_range[0]),
}
- download_frag = True
elif line.startswith('#EXT-X-KEY'):
decrypt_url = decrypt_info.get('URI')
ad_frag_next = False
elif line.startswith('#EXT-X-DISCONTINUITY'):
discontinuity_count += 1
+ i += 1
+ media_sequence += 1
- if download_frag:
- count = 0
- headers = info_dict.get('http_headers', {})
- if byte_range:
- headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)
- while count <= fragment_retries:
- try:
- success, frag_content = self._download_fragment(
- ctx, frag_url, info_dict, headers)
- if not success:
- return False
- break
- except compat_urllib_error.HTTPError as err:
- # Unavailable (possibly temporary) fragments may be served.
- # First we try to retry then either skip or abort.
- # See https://github.com/ytdl-org/youtube-dl/issues/10165,
- # https://github.com/ytdl-org/youtube-dl/issues/10448).
- count += 1
- if count <= fragment_retries:
- self.report_retry_fragment(err, frag_index, count, fragment_retries)
- if count > fragment_retries:
- if skip_unavailable_fragments:
- i += 1
- media_sequence += 1
- self.report_skip_fragment(frag_index)
- continue
- self.report_error(
- 'giving up after %s fragment retries' % fragment_retries)
- return False
-
- if decrypt_info['METHOD'] == 'AES-128':
- iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', media_sequence)
- decrypt_info['KEY'] = decrypt_info.get('KEY') or self.ydl.urlopen(
- self._prepare_url(info_dict, info_dict.get('_decryption_key_url') or decrypt_info['URI'])).read()
- # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
- # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
- # not what it decrypts to.
- if not test:
- frag_content = AES.new(
- decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content)
- self._append_fragment(ctx, frag_content)
- # We only download the first fragment during the test
- if test:
- break
- i += 1
- media_sequence += 1
+ # We only download the first fragment during the test
+ if test:
+ fragments = [fragments[0]] if fragments else []
if real_downloader:
info_copy = info_dict.copy()
if not success:
return False
else:
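+ # Download (and, for AES-128 streams, decrypt) a single fragment; returns
+ # (content, frag_index) on success or (False, frag_index) on failure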
+ def download_fragment(fragment):
+ frag_index = fragment['frag_index']
+ frag_url = fragment['url']
+ decrypt_info = fragment['decrypt_info']
+ byte_range = fragment['byte_range']
+ media_sequence = fragment['media_sequence']
+
+ ctx['fragment_index'] = frag_index
+
+ count = 0
+ headers = info_dict.get('http_headers', {})
+ if byte_range:
+ headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)
+ while count <= fragment_retries:
+ try:
+ success, frag_content = self._download_fragment(
+ ctx, frag_url, info_dict, headers)
+ if not success:
+ return False, frag_index
+ break
+ except compat_urllib_error.HTTPError as err:
+ # Unavailable (possibly temporary) fragments may be served.
+ # First we retry, then either skip or abort.
+ # See https://github.com/ytdl-org/youtube-dl/issues/10165 and
+ # https://github.com/ytdl-org/youtube-dl/issues/10448.
+ count += 1
+ if count <= fragment_retries:
+ self.report_retry_fragment(err, frag_index, count, fragment_retries)
+ if count > fragment_retries:
+ return False, frag_index
+
+ if decrypt_info['METHOD'] == 'AES-128':
+ iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', media_sequence)
+ decrypt_info['KEY'] = decrypt_info.get('KEY') or self.ydl.urlopen(
+ self._prepare_url(info_dict, info_dict.get('_decryption_key_url') or decrypt_info['URI'])).read()
+ # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
+ # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
+ # not what it decrypts to.
+ if not test:
+ frag_content = AES.new(
+ decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content)
+
+ return frag_content, frag_index
+
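+ # Append a downloaded fragment to the output file; a missing fragment is
+ # skipped or aborts the download depending on skip_unavailable_fragments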
+ def append_fragment(frag_content, frag_index):
+ if frag_content:
+ fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index)
+ try:
+ file, frag_sanitized = sanitize_open(fragment_filename, 'rb')
+ ctx['fragment_filename_sanitized'] = frag_sanitized
+ file.close()
+ self._append_fragment(ctx, frag_content)
+ return True
+ except FileNotFoundError:
+ if skip_unavailable_fragments:
+ self.report_skip_fragment(frag_index)
+ return True
+ else:
+ self.report_error(
+ 'fragment %s not found, unable to continue' % frag_index)
+ return False
+ else:
+ if skip_unavailable_fragments:
+ self.report_skip_fragment(frag_index)
+ return True
+ else:
+ self.report_error(
+ 'fragment %s not found, unable to continue' % frag_index)
+ return False
+
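+ # Use a thread pool when more than one concurrent fragment is requested and
+ # concurrent.futures is available; otherwise fall back to the sequential loop below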
+ max_workers = self.params.get('concurrent_fragment_downloads', 1)
+ if can_threaded_download and max_workers > 1:
+ self.report_warning('The download speed shown is only of one thread. This is a known issue')
+ with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
+ futures = [pool.submit(download_fragment, fragment) for fragment in fragments]
+ # timeout must be 0 to return instantly
+ done, not_done = concurrent.futures.wait(futures, timeout=0)
+ try:
+ while not_done:
+ # Check every 1 second for KeyboardInterrupt
+ freshly_done, not_done = concurrent.futures.wait(not_done, timeout=1)
+ done |= freshly_done
+ except KeyboardInterrupt:
+ for future in not_done:
+ future.cancel()
+ # timeout must be None to wait until the remaining futures are cancelled or finished
+ concurrent.futures.wait(not_done, timeout=None)
+ raise KeyboardInterrupt
+ results = [future.result() for future in futures]
+
+ for frag_content, frag_index in results:
+ result = append_fragment(frag_content, frag_index)
+ if not result:
+ return False
+ else:
+ for fragment in fragments:
+ frag_content, frag_index = download_fragment(fragment)
+ result = append_fragment(frag_content, frag_index)
+ if not result:
+ return False
+
self._finish_frag_download(ctx)
return True
help='Languages of the subtitles to download (optional) separated by commas, use --list-subs for available language tags')
downloader = optparse.OptionGroup(parser, 'Download Options')
+ downloader.add_option(
+ '-N', '--concurrent-fragments',
+ dest='concurrent_fragment_downloads', metavar='N', default=1, type=int,
+ help='Number of fragments to download concurrently (default is %default)')
downloader.add_option(
'-r', '--limit-rate', '--rate-limit',
dest='ratelimit', metavar='RATE',