[dash,youtube] Download live from start to end (#888)
author The Hatsune Daishi <redacted>
Mon, 20 Dec 2021 06:06:46 +0000 (15:06 +0900)
committer GitHub <redacted>
Mon, 20 Dec 2021 06:06:46 +0000 (11:36 +0530)
* Add option `--live-from-start` to enable downloading live videos from the start
* Add key `is_from_start` in formats to identify formats (of live videos) that download from the start
* [dash] Create protocol `http_dash_segments_generator` that allows a function to be passed instead of fragments
* [fragment] Allow multiple live dash formats to download simultaneously
* [youtube] Implement fragment re-fetching for the live dash formats
* [youtube] Re-extract dash manifest every 5 hours (the manifest expires in 6 hours)
* [postprocessor/ffmpeg] Add `FFmpegFixupDuplicateMoovPP` to fixup duplicated moov atoms

Known issue: Ctrl+C doesn't work on Windows when downloading multiple formats

Closes #1521
Authored by: nao20010128nao, pukkandan
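
The key mechanism behind `http_dash_segments_generator` is that a format's
`fragments` field may now be a callable (typically a generator taking the
downloader context) instead of a pre-built list, so fragments can keep being
produced while the stream is still live. A minimal sketch of the idea; the
names here are illustrative, not the exact yt-dlp internals:

    def resolve_fragments(fragments, ctx):
        # Accept either a static list of fragment dicts or a callable
        # that lazily yields them as the live stream progresses
        return fragments(ctx) if callable(fragments) else fragments

    def live_fragments(ctx):
        # Hypothetical generator: emit one fragment per sequence number
        for seq in range(ctx['last_seq'] + 1):
            yield {'frag_index': seq + 1, 'url': 'https://example.invalid/sq/%d' % seq}

    for fragment in resolve_fragments(live_fragments, {'last_seq': 2}):
        print(fragment['url'])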

15 files changed:
README.md
yt_dlp/YoutubeDL.py
yt_dlp/__init__.py
yt_dlp/downloader/__init__.py
yt_dlp/downloader/dash.py
yt_dlp/downloader/f4m.py
yt_dlp/downloader/fragment.py
yt_dlp/extractor/common.py
yt_dlp/extractor/youtube.py
yt_dlp/minicurses.py
yt_dlp/options.py
yt_dlp/postprocessor/__init__.py
yt_dlp/postprocessor/common.py
yt_dlp/postprocessor/ffmpeg.py
yt_dlp/utils.py

index ef83b8e3bcd92c913c74e2731e792c301340ddbc..6311157dfc7bd8f26fa0f11f71bad7313339450a 100644 (file)
--- a/README.md
+++ b/README.md
@@ -88,6 +88,7 @@ # NEW FEATURES
     * Redirect channel's home URL automatically to `/video` to preserve the old behaviour
     * `255kbps` audio is extracted (if available) from youtube music when premium cookies are given
     * Youtube music Albums, channels etc can be downloaded ([except self-uploaded music](https://github.com/yt-dlp/yt-dlp/issues/723))
+    * Download livestreams from the start using `--live-from-start`
 
 * **Cookies from browser**: Cookies can be automatically extracted from all major web browsers using `--cookies-from-browser BROWSER[:PROFILE]`
 
@@ -340,6 +341,10 @@ ## General Options:
     --flat-playlist                  Do not extract the videos of a playlist,
                                      only list them
     --no-flat-playlist               Extract the videos of a playlist
+    --live-from-start                Download livestreams from the start.
+                                     Currently only supported for YouTube
+    --no-live-from-start             Download livestreams from the current
+                                     time (default)
     --wait-for-video MIN[-MAX]       Wait for scheduled streams to become
                                      available. Pass the minimum number of
                                      seconds (or range) to wait between retries
@@ -1585,7 +1590,7 @@ #### youtube
 * `skip`: `hls` or `dash` (or both) to skip download of the respective manifests
 * `player_client`: Clients to extract video data from. The main clients are `web`, `android`, `ios`, `mweb`. These also have `_music`, `_embedded`, `_agegate`, and `_creator` variants (Eg: `web_embedded`) (`mweb` has only `_agegate`). By default, `android,web` is used, but the agegate and creator variants are added as required for age-gated videos. Similarly the music variants are added for `music.youtube.com` urls. You can also use `all` to use all the clients, and `default` for the default clients.
 * `player_skip`: Skip some network requests that are generally needed for robust extraction. One or more of `configs` (skip client configs), `webpage` (skip initial webpage), `js` (skip js player). While these options can help reduce the number of requests needed or avoid some rate-limiting, they could cause some issues. See [#860](https://github.com/yt-dlp/yt-dlp/pull/860) for more details
-* `include_live_dash`: Include live dash formats (These formats don't download properly)
+* `include_live_dash`: Include live dash formats even without `--live-from-start` (These formats don't download properly)
 * `comment_sort`: `top` or `new` (default) - choose comment sorting mode (on YouTube's side)
 * `max_comments`: Limit the amount of comments to gather. Comma-separated list of integers representing `max-comments,max-parents,max-replies,max-replies-per-thread`. Default is `all,all,all,all`.
     * E.g. `all,all,1000,10` will get a maximum of 1000 replies total, with up to 10 replies per thread. `1000,all,100` will get a maximum of 1000 comments, with a maximum of 100 replies total.
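
For embedders, the new flag maps to the `live_from_start` parameter wired up
in `yt_dlp/__init__.py` below. A minimal sketch of programmatic use, with a
placeholder URL; the option currently only takes effect on YouTube:

    import yt_dlp

    # 'live_from_start' corresponds to the new --live-from-start flag
    with yt_dlp.YoutubeDL({'live_from_start': True}) as ydl:
        ydl.download(['https://www.youtube.com/watch?v=PLACEHOLDER'])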
index 80d779beef71f59eb7ea6744459209aa12e24692..b5d4380963d4f5b115b4d0383fb18f1ac71dc99f 100644 (file)
@@ -5,7 +5,6 @@
 
 import collections
 import contextlib
-import copy
 import datetime
 import errno
 import fileinput
 from .postprocessor import (
     get_postprocessor,
     EmbedThumbnailPP,
+    FFmpegFixupDuplicateMoovPP,
     FFmpegFixupDurationPP,
     FFmpegFixupM3u8PP,
     FFmpegFixupM4aPP,
@@ -1107,7 +1107,7 @@ def get_value(mdict):
         def _dumpjson_default(obj):
             if isinstance(obj, (set, LazyList)):
                 return list(obj)
-            raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable')
+            return repr(obj)
 
         def create_key(outer_mobj):
             if not outer_mobj.group('has_key'):
@@ -2071,8 +2071,7 @@ def selector_function(ctx):
                 selector_1, selector_2 = map(_build_selector_function, selector.selector)
 
                 def selector_function(ctx):
-                    for pair in itertools.product(
-                            selector_1(copy.deepcopy(ctx)), selector_2(copy.deepcopy(ctx))):
+                    for pair in itertools.product(selector_1(ctx), selector_2(ctx)):
                         yield _merge(pair)
 
             elif selector.type == SINGLE:  # atom
@@ -2142,7 +2141,7 @@ def selector_function(ctx):
             filters = [self._build_format_filter(f) for f in selector.filters]
 
             def final_selector(ctx):
-                ctx_copy = copy.deepcopy(ctx)
+                ctx_copy = dict(ctx)
                 for _filter in filters:
                     ctx_copy['formats'] = list(filter(_filter, ctx_copy['formats']))
                 return selector_function(ctx_copy)
@@ -2354,6 +2353,10 @@ def sanitize_numeric_fields(info):
         if not self.params.get('allow_unplayable_formats'):
             formats = [f for f in formats if not f.get('has_drm')]
 
+        if info_dict.get('is_live'):
+            get_from_start = bool(self.params.get('live_from_start'))
+            formats = [f for f in formats if bool(f.get('is_from_start')) == get_from_start]
+
         if not formats:
             self.raise_no_formats(info_dict)
 
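
The hunk above keeps only the variants matching the requested mode: with
--live-from-start, only formats flagged `is_from_start` survive; without it,
only the regular live formats do. A toy illustration of that comprehension:

    formats = [
        {'format_id': '299', 'is_from_start': True},  # from-start DASH variant
        {'format_id': '95'},                          # regular live format
    ]
    get_from_start = True  # i.e. --live-from-start was given
    kept = [f for f in formats if bool(f.get('is_from_start')) == get_from_start]
    assert [f['format_id'] for f in kept] == ['299']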
@@ -2660,7 +2663,9 @@ def dl(self, name, info, subtitle=False, test=False):
             urls = '", "'.join([f['url'] for f in info.get('requested_formats', [])] or [info['url']])
             self.write_debug('Invoking downloader on "%s"' % urls)
 
-        new_info = copy.deepcopy(self._copy_infodict(info))
+        # Note: Ideally info should be a deep-copied so that hooks cannot modify it.
+        # But it may contain objects that are not deep-copyable
+        new_info = self._copy_infodict(info)
         if new_info.get('http_headers') is None:
             new_info['http_headers'] = self._calc_headers(new_info)
         return fd.download(name, new_info, subtitle)
@@ -2675,7 +2680,7 @@ def process_info(self, info_dict):
             if self._num_downloads >= int(max_downloads):
                 raise MaxDownloadsReached()
 
-        if info_dict.get('is_live'):
+        if info_dict.get('is_live') and not self.params.get('live_from_start'):
             info_dict['title'] += ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
 
         # TODO: backward compatibility, to be removed
@@ -2889,15 +2894,22 @@ def correct_ext(filename, ext=new_ext):
                     dl_filename = existing_file(full_filename, temp_filename)
                     info_dict['__real_download'] = False
 
+                    downloaded = []
+                    merger = FFmpegMergerPP(self)
+
+                    fd = get_suitable_downloader(info_dict, self.params, to_stdout=temp_filename == '-')
                     if dl_filename is not None:
                         self.report_file_already_downloaded(dl_filename)
-                    elif get_suitable_downloader(info_dict, self.params, to_stdout=temp_filename == '-'):
+                    elif fd:
+                        for f in requested_formats if fd != FFmpegFD else []:
+                            f['filepath'] = fname = prepend_extension(
+                                correct_ext(temp_filename, info_dict['ext']),
+                                'f%s' % f['format_id'], info_dict['ext'])
+                            downloaded.append(fname)
                         info_dict['url'] = '\n'.join(f['url'] for f in requested_formats)
                         success, real_download = self.dl(temp_filename, info_dict)
                         info_dict['__real_download'] = real_download
                     else:
-                        downloaded = []
-                        merger = FFmpegMergerPP(self)
                         if self.params.get('allow_unplayable_formats'):
                             self.report_warning(
                                 'You have requested merging of multiple formats '
@@ -2909,7 +2921,7 @@ def correct_ext(filename, ext=new_ext):
                                 'The formats won\'t be merged.')
 
                         if temp_filename == '-':
-                            reason = ('using a downloader other than ffmpeg' if FFmpegFD.can_merge_formats(info_dict)
+                            reason = ('using a downloader other than ffmpeg' if FFmpegFD.can_merge_formats(info_dict, self.params)
                                       else 'but the formats are incompatible for simultaneous download' if merger.available
                                       else 'but ffmpeg is not installed')
                             self.report_warning(
@@ -2931,14 +2943,15 @@ def correct_ext(filename, ext=new_ext):
                             partial_success, real_download = self.dl(fname, new_info)
                             info_dict['__real_download'] = info_dict['__real_download'] or real_download
                             success = success and partial_success
-                        if merger.available and not self.params.get('allow_unplayable_formats'):
-                            info_dict['__postprocessors'].append(merger)
-                            info_dict['__files_to_merge'] = downloaded
-                            # Even if there were no downloads, it is being merged only now
-                            info_dict['__real_download'] = True
-                        else:
-                            for file in downloaded:
-                                files_to_move[file] = None
+
+                    if downloaded and merger.available and not self.params.get('allow_unplayable_formats'):
+                        info_dict['__postprocessors'].append(merger)
+                        info_dict['__files_to_merge'] = downloaded
+                        # Even if there were no downloads, it is being merged only now
+                        info_dict['__real_download'] = True
+                    else:
+                        for file in downloaded:
+                            files_to_move[file] = None
                 else:
                     # Just a single file
                     dl_filename = existing_file(full_filename, temp_filename)
@@ -3005,9 +3018,14 @@ def ffmpeg_fixup(cndn, msg, cls):
 
                     downloader = get_suitable_downloader(info_dict, self.params) if 'protocol' in info_dict else None
                     downloader = downloader.__name__ if downloader else None
-                    ffmpeg_fixup(info_dict.get('requested_formats') is None and downloader == 'HlsFD',
-                                 'Possible MPEG-TS in MP4 container or malformed AAC timestamps',
-                                 FFmpegFixupM3u8PP)
+
+                    if info_dict.get('requested_formats') is None:  # Not necessary if doing merger
+                        ffmpeg_fixup(downloader == 'HlsFD',
+                                     'Possible MPEG-TS in MP4 container or malformed AAC timestamps',
+                                     FFmpegFixupM3u8PP)
+                        ffmpeg_fixup(info_dict.get('is_live') and downloader == 'DashSegmentsFD',
+                                     'Possible duplicate MOOV atoms', FFmpegFixupDuplicateMoovPP)
+
                     ffmpeg_fixup(downloader == 'WebSocketFragmentFD', 'Malformed timestamps detected', FFmpegFixupTimestampPP)
                     ffmpeg_fixup(downloader == 'WebSocketFragmentFD', 'Malformed duration detected', FFmpegFixupDurationPP)
 
@@ -3104,10 +3122,17 @@ def sanitize_info(info_dict, remove_private_keys=False):
                 k.startswith('_') or k in remove_keys or v in empty_values)
         else:
             reject = lambda k, v: k in remove_keys
-        filter_fn = lambda obj: (
-            list(map(filter_fn, obj)) if isinstance(obj, (LazyList, list, tuple, set))
-            else obj if not isinstance(obj, dict)
-            else dict((k, filter_fn(v)) for k, v in obj.items() if not reject(k, v)))
+
+        def filter_fn(obj):
+            if isinstance(obj, dict):
+                return {k: filter_fn(v) for k, v in obj.items() if not reject(k, v)}
+            elif isinstance(obj, (list, tuple, set, LazyList)):
+                return list(map(filter_fn, obj))
+            elif obj is None or isinstance(obj, (str, int, float, bool)):
+                return obj
+            else:
+                return repr(obj)
+
         return filter_fn(info_dict)
 
     @staticmethod
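
`filter_fn` is now an explicit recursive function: dicts are filtered and
recursed into, list-likes become plain lists, JSON-representable scalars pass
through, and anything else falls back to `repr()` (mirroring the
`_dumpjson_default` change earlier in this file). This matters because
live-from-start formats now carry a non-serializable callable in `fragments`.
A simplified sketch without the key-rejection logic:

    import json

    def filter_fn(obj):
        if isinstance(obj, dict):
            return {k: filter_fn(v) for k, v in obj.items()}
        elif isinstance(obj, (list, tuple, set)):
            return list(map(filter_fn, obj))
        elif obj is None or isinstance(obj, (str, int, float, bool)):
            return obj
        return repr(obj)  # e.g. a functools.partial fragment generator

    print(json.dumps(filter_fn({'fragments': print})))
    # {"fragments": "<built-in function print>"}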
index 3dccdb186f574fe7f103e4ff485206f7175c8615..ab68f26c006e2797cee683d3df50c50a5ea574b3 100644 (file)
@@ -745,6 +745,7 @@ def report_deprecation(val, old, new=None):
         'youtube_include_hls_manifest': opts.youtube_include_hls_manifest,
         'encoding': opts.encoding,
         'extract_flat': opts.extract_flat,
+        'live_from_start': opts.live_from_start,
         'wait_for_video': opts.wait_for_video,
         'mark_watched': opts.mark_watched,
         'merge_output_format': opts.merge_output_format,
index 5270e80812155d030fb4d87358701b54869ab680..acc19f43a64702a1eb0edfc3fe465b9fb7b90f1c 100644 (file)
@@ -12,10 +12,15 @@ def get_suitable_downloader(info_dict, params={}, default=NO_DEFAULT, protocol=N
     info_copy = info_dict.copy()
     info_copy['to_stdout'] = to_stdout
 
-    downloaders = [_get_suitable_downloader(info_copy, proto, params, default)
-                   for proto in (protocol or info_copy['protocol']).split('+')]
+    protocols = (protocol or info_copy['protocol']).split('+')
+    downloaders = [_get_suitable_downloader(info_copy, proto, params, default) for proto in protocols]
+
     if set(downloaders) == {FFmpegFD} and FFmpegFD.can_merge_formats(info_copy, params):
         return FFmpegFD
+    elif (set(downloaders) == {DashSegmentsFD}
+          and not (to_stdout and len(protocols) > 1)
+          and set(protocols) == {'http_dash_segments_generator'}):
+        return DashSegmentsFD
     elif len(downloaders) == 1:
         return downloaders[0]
     return None
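
With merged formats the protocol string is joined with `+`, so a from-start
video+audio pair arrives as
`http_dash_segments_generator+http_dash_segments_generator`. The new branch
lets a single DashSegmentsFD drive all the generator-based formats itself
(enabling the simultaneous download added in fragment.py), except when
streaming multiple formats to stdout. The decision, in isolation:

    protocols = 'http_dash_segments_generator+http_dash_segments_generator'.split('+')
    to_stdout = False
    use_single_fd = (set(protocols) == {'http_dash_segments_generator'}
                     and not (to_stdout and len(protocols) > 1))
    assert use_single_fd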
@@ -49,6 +54,7 @@ def get_suitable_downloader(info_dict, params={}, default=NO_DEFAULT, protocol=N
     'rtsp': RtspFD,
     'f4m': F4mFD,
     'http_dash_segments': DashSegmentsFD,
+    'http_dash_segments_generator': DashSegmentsFD,
     'ism': IsmFD,
     'mhtml': MhtmlFD,
     'niconico_dmc': NiconicoDmcFD,
@@ -63,6 +69,7 @@ def shorten_protocol_name(proto, simplify=False):
         'm3u8_native': 'm3u8_n',
         'rtmp_ffmpeg': 'rtmp_f',
         'http_dash_segments': 'dash',
+        'http_dash_segments_generator': 'dash_g',
         'niconico_dmc': 'dmc',
         'websocket_frag': 'WSfrag',
     }
@@ -71,6 +78,7 @@ def shorten_protocol_name(proto, simplify=False):
             'https': 'http',
             'ftps': 'ftp',
             'm3u8_native': 'm3u8',
+            'http_dash_segments_generator': 'dash',
             'rtmp_ffmpeg': 'rtmp',
             'm3u8_frag_urls': 'm3u8',
             'dash_frag_urls': 'dash',
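
With these entries the new protocol renders as `dash_g` in the format table
and collapses to plain `dash` when simplified. A toy re-implementation of the
two-stage lookup, assuming just the mappings shown above:

    def shorten(proto, simplify=False):
        short = {'http_dash_segments': 'dash',
                 'http_dash_segments_generator': 'dash_g'}
        if simplify:
            short['http_dash_segments_generator'] = 'dash'
        return short.get(proto, proto)

    assert shorten('http_dash_segments_generator') == 'dash_g'
    assert shorten('http_dash_segments_generator', simplify=True) == 'dash'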
index 6444ad692886660db2d92563d85e35608d8ab3ac..8dd43f4fa8039bfa864d7e6315d661dd90cf00fe 100644 (file)
@@ -1,4 +1,5 @@
 from __future__ import unicode_literals
+import time
 
 from ..downloader import get_suitable_downloader
 from .fragment import FragmentFD
@@ -15,27 +16,53 @@ class DashSegmentsFD(FragmentFD):
     FD_NAME = 'dashsegments'
 
     def real_download(self, filename, info_dict):
-        if info_dict.get('is_live'):
+        if info_dict.get('is_live') and set(info_dict['protocol'].split('+')) != {'http_dash_segments_generator'}:
             self.report_error('Live DASH videos are not supported')
 
-        fragment_base_url = info_dict.get('fragment_base_url')
-        fragments = info_dict['fragments'][:1] if self.params.get(
-            'test', False) else info_dict['fragments']
-
+        real_start = time.time()
         real_downloader = get_suitable_downloader(
             info_dict, self.params, None, protocol='dash_frag_urls', to_stdout=(filename == '-'))
 
-        ctx = {
-            'filename': filename,
-            'total_frags': len(fragments),
-        }
+        requested_formats = [{**info_dict, **fmt} for fmt in info_dict.get('requested_formats', [])]
+        args = []
+        for fmt in requested_formats or [info_dict]:
+            try:
+                fragment_count = 1 if self.params.get('test') else len(fmt['fragments'])
+            except TypeError:
+                fragment_count = None
+            ctx = {
+                'filename': fmt.get('filepath') or filename,
+                'live': 'is_from_start' if fmt.get('is_from_start') else fmt.get('is_live'),
+                'total_frags': fragment_count,
+            }
+
+            if real_downloader:
+                self._prepare_external_frag_download(ctx)
+            else:
+                self._prepare_and_start_frag_download(ctx, fmt)
+            ctx['start'] = real_start
+
+            fragments_to_download = self._get_fragments(fmt, ctx)
+
+            if real_downloader:
+                self.to_screen(
+                    '[%s] Fragment downloads will be delegated to %s' % (self.FD_NAME, real_downloader.get_basename()))
+                info_dict['fragments'] = fragments_to_download
+                fd = real_downloader(self.ydl, self.params)
+                return fd.real_download(filename, info_dict)
+
+            args.append([ctx, fragments_to_download, fmt])
 
-        if real_downloader:
-            self._prepare_external_frag_download(ctx)
-        else:
-            self._prepare_and_start_frag_download(ctx, info_dict)
+        return self.download_and_append_fragments_multiple(*args)
+
+    def _resolve_fragments(self, fragments, ctx):
+        fragments = fragments(ctx) if callable(fragments) else fragments
+        return [next(fragments)] if self.params.get('test') else fragments
+
+    def _get_fragments(self, fmt, ctx):
+        fragment_base_url = fmt.get('fragment_base_url')
+        fragments = self._resolve_fragments(fmt['fragments'], ctx)
 
-        fragments_to_download = []
         frag_index = 0
         for i, fragment in enumerate(fragments):
             frag_index += 1
@@ -46,17 +73,8 @@ def real_download(self, filename, info_dict):
                 assert fragment_base_url
                 fragment_url = urljoin(fragment_base_url, fragment['path'])
 
-            fragments_to_download.append({
+            yield {
                 'frag_index': frag_index,
                 'index': i,
                 'url': fragment_url,
-            })
-
-        if real_downloader:
-            self.to_screen(
-                '[%s] Fragment downloads will be delegated to %s' % (self.FD_NAME, real_downloader.get_basename()))
-            info_dict['fragments'] = fragments_to_download
-            fd = real_downloader(self.ydl, self.params)
-            return fd.real_download(filename, info_dict)
-
-        return self.download_and_append_fragments(ctx, fragments_to_download, info_dict)
+            }
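
`real_download` now builds one ctx per requested format and defers fragment
enumeration to `_get_fragments`, which yields plain fragment dicts as they are
produced. Note the `--test` handling in `_resolve_fragments`: a generator
cannot be sliced, so the first fragment has to be pulled with `next()` rather
than `[:1]`. A toy version (using `next(iter(...))` so it also accepts lists):

    def resolve_fragments(fragments, ctx, test=False):
        fragments = fragments(ctx) if callable(fragments) else fragments
        return [next(iter(fragments))] if test else fragments

    frags = resolve_fragments([{'path': 'sq/0'}, {'path': 'sq/1'}], {}, test=True)
    assert frags == [{'path': 'sq/0'}]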
index 9da2776d92c60d2e9564005c7135539cb023a91b..0008b7c286f171b8aacda173f04cd916014d7cc0 100644 (file)
@@ -366,7 +366,7 @@ def real_download(self, filename, info_dict):
         ctx = {
             'filename': filename,
             'total_frags': total_frags,
-            'live': live,
+            'live': bool(live),
         }
 
         self._prepare_frag_download(ctx)
index 04b0f68c0f221775e290246008bc0d5dacbdfc35..79c6561c7f3175d431529cab2cb2ba2bc2ee0e2f 100644 (file)
@@ -1,9 +1,10 @@
 from __future__ import division, unicode_literals
 
+import http.client
+import json
+import math
 import os
 import time
-import json
-from math import ceil
 
 try:
     import concurrent.futures
@@ -15,6 +16,7 @@
 from .http import HttpFD
 from ..aes import aes_cbc_decrypt_bytes
 from ..compat import (
+    compat_os_name,
     compat_urllib_error,
     compat_struct_pack,
 )
@@ -90,7 +92,7 @@ def _prepare_and_start_frag_download(self, ctx, info_dict):
         self._start_frag_download(ctx, info_dict)
 
     def __do_ytdl_file(self, ctx):
-        return not ctx['live'] and not ctx['tmpfilename'] == '-' and not self.params.get('_no_ytdl_file')
+        return ctx['live'] is not True and ctx['tmpfilename'] != '-' and not self.params.get('_no_ytdl_file')
 
     def _read_ytdl_file(self, ctx):
         assert 'ytdl_corrupt' not in ctx
@@ -375,17 +377,20 @@ def download_and_append_fragments_multiple(self, *args, pack_func=None, finish_f
         @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
                 all args must be either tuple or list
         '''
+        interrupt_trigger = [True]
         max_progress = len(args)
         if max_progress == 1:
             return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func)
-        max_workers = self.params.get('concurrent_fragment_downloads', max_progress)
+        max_workers = self.params.get('concurrent_fragment_downloads', 1)
         if max_progress > 1:
             self._prepare_multiline_status(max_progress)
 
         def thread_func(idx, ctx, fragments, info_dict, tpe):
             ctx['max_progress'] = max_progress
             ctx['progress_idx'] = idx
-            return self.download_and_append_fragments(ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func, tpe=tpe)
+            return self.download_and_append_fragments(
+                ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func,
+                tpe=tpe, interrupt_trigger=interrupt_trigger)
 
         class FTPE(concurrent.futures.ThreadPoolExecutor):
             # has to stop this or it's going to wait on the worker thread itself
@@ -393,8 +398,11 @@ def __exit__(self, exc_type, exc_val, exc_tb):
                 pass
 
         spins = []
+        if compat_os_name == 'nt':
+            self.report_warning('Ctrl+C does not work on Windows when used with parallel threads. '
+                                'This is a known issue and patches are welcome')
         for idx, (ctx, fragments, info_dict) in enumerate(args):
-            tpe = FTPE(ceil(max_workers / max_progress))
+            tpe = FTPE(math.ceil(max_workers / max_progress))
             job = tpe.submit(thread_func, idx, ctx, fragments, info_dict, tpe)
             spins.append((tpe, job))
 
@@ -402,18 +410,32 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         for tpe, job in spins:
             try:
                 result = result and job.result()
+            except KeyboardInterrupt:
+                interrupt_trigger[0] = False
             finally:
                 tpe.shutdown(wait=True)
+        if not interrupt_trigger[0]:
+            raise KeyboardInterrupt()
         return result
 
-    def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None, tpe=None):
+    def download_and_append_fragments(
+            self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None,
+            tpe=None, interrupt_trigger=None):
+        if not interrupt_trigger:
+            interrupt_trigger = (True, )
+
         fragment_retries = self.params.get('fragment_retries', 0)
-        is_fatal = (lambda idx: idx == 0) if self.params.get('skip_unavailable_fragments', True) else (lambda _: True)
+        is_fatal = (
+            ((lambda _: False) if info_dict.get('is_live') else (lambda idx: idx == 0))
+            if self.params.get('skip_unavailable_fragments', True) else (lambda _: True))
+
         if not pack_func:
             pack_func = lambda frag_content, _: frag_content
 
         def download_fragment(fragment, ctx):
             frag_index = ctx['fragment_index'] = fragment['frag_index']
+            if not interrupt_trigger[0]:
+                return False, frag_index
             headers = info_dict.get('http_headers', {}).copy()
             byte_range = fragment.get('byte_range')
             if byte_range:
@@ -428,7 +450,7 @@ def download_fragment(fragment, ctx):
                     if not success:
                         return False, frag_index
                     break
-                except compat_urllib_error.HTTPError as err:
+                except (compat_urllib_error.HTTPError, http.client.IncompleteRead) as err:
                     # Unavailable (possibly temporary) fragments may be served.
                     # First we try to retry then either skip or abort.
                     # See https://github.com/ytdl-org/youtube-dl/issues/10165,
@@ -466,7 +488,8 @@ def append_fragment(frag_content, frag_index, ctx):
 
         decrypt_fragment = self.decrypter(info_dict)
 
-        max_workers = self.params.get('concurrent_fragment_downloads', 1)
+        max_workers = math.ceil(
+            self.params.get('concurrent_fragment_downloads', 1) / ctx.get('max_progress', 1))
         if can_threaded_download and max_workers > 1:
 
             def _download_fragment(fragment):
@@ -477,6 +500,8 @@ def _download_fragment(fragment):
             self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
             with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
                 for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments):
+                    if not interrupt_trigger[0]:
+                        break
                     ctx['fragment_filename_sanitized'] = frag_filename
                     ctx['fragment_index'] = frag_index
                     result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
@@ -484,6 +509,8 @@ def _download_fragment(fragment):
                         return False
         else:
             for fragment in fragments:
+                if not interrupt_trigger[0]:
+                    break
                 frag_content, frag_index = download_fragment(fragment, ctx)
                 result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx)
                 if not result:
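
`interrupt_trigger` is a one-element list shared across the per-format worker
threads: a KeyboardInterrupt in the main thread flips it to False, every
download loop checks it before fetching the next fragment, and the exception
is re-raised once the executors have shut down. The same pattern in isolation,
with a hypothetical worker standing in for the fragment loop:

    import concurrent.futures
    import time

    interrupt_trigger = [True]  # mutable flag visible to all workers

    def worker(n):
        for _ in range(n):
            if not interrupt_trigger[0]:  # co-operative cancellation point
                return False
            time.sleep(0.01)  # stand-in for downloading one fragment
        return True

    with concurrent.futures.ThreadPoolExecutor(2) as tpe:
        jobs = [tpe.submit(worker, 100) for _ in range(2)]
        try:
            result = all(job.result() for job in jobs)
        except KeyboardInterrupt:
            interrupt_trigger[0] = False  # ask workers to wind down first
            raise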
index 52099b4b428a82e74b5ba807798eb4a1ea0c96e7..9abbaf04f50b45864d2f63ec6943469da32e83e0 100644 (file)
@@ -163,9 +163,8 @@ class InfoExtractor(object):
                     * filesize_approx  An estimate for the number of bytes
                     * player_url SWF Player URL (used for rtmpdump).
                     * protocol   The protocol that will be used for the actual
-                                 download, lower-case.
-                                 "http", "https", "rtsp", "rtmp", "rtmp_ffmpeg", "rtmpe",
-                                 "m3u8", "m3u8_native" or "http_dash_segments".
+                                 download, lower-case. One of "http", "https" or
+                                 one of the protocols defined in downloader.PROTOCOL_MAP
                     * fragment_base_url
                                  Base URL for fragments. Each fragment's path
                                  value (if present) will be relative to
@@ -181,6 +180,8 @@ class InfoExtractor(object):
                                             fragment_base_url
                                  * "duration" (optional, int or float)
                                  * "filesize" (optional, int)
+                    * is_from_start  Is a live format that can be downloaded
+                                from the start. Boolean
                     * preference Order number of this format. If this field is
                                  present and not None, the formats get sorted
                                  by this field, regardless of all other values.
index 5a3b98bb5ca8e23a1bd32d7a2de6044945b98824..1f500939908c3c14cfc51f1284385da4f884b910 100644 (file)
@@ -5,6 +5,7 @@
 import calendar
 import copy
 import datetime
+import functools
 import hashlib
 import itertools
 import json
@@ -15,6 +16,7 @@
 import sys
 import time
 import traceback
+import threading
 
 from .common import InfoExtractor, SearchInfoExtractor
 from ..compat import (
@@ -1747,6 +1749,142 @@ def __init__(self, *args, **kwargs):
         self._code_cache = {}
         self._player_cache = {}
 
+    def _prepare_live_from_start_formats(self, formats, video_id, live_start_time, url, webpage_url, smuggled_data):
+        EXPIRATION_DURATION = 18_000
+        lock = threading.Lock()
+
+        is_live = True
+        expiration_time = time.time() + EXPIRATION_DURATION
+        formats = [f for f in formats if f.get('is_from_start')]
+
+        def refetch_manifest(format_id):
+            nonlocal formats, expiration_time, is_live
+            if time.time() <= expiration_time:
+                return
+
+            _, _, prs, player_url = self._download_player_responses(url, smuggled_data, video_id, webpage_url)
+            video_details = traverse_obj(
+                prs, (..., 'videoDetails'), expected_type=dict, default=[])
+            microformats = traverse_obj(
+                prs, (..., 'microformat', 'playerMicroformatRenderer'),
+                expected_type=dict, default=[])
+            _, is_live, _, formats = self._list_formats(video_id, microformats, video_details, prs, player_url)
+            expiration_time = time.time() + EXPIRATION_DURATION
+
+        def mpd_feed(format_id):
+            """
+            @returns (manifest_url, manifest_stream_number, is_live) or None
+            """
+            with lock:
+                refetch_manifest(format_id)
+
+            f = next((f for f in formats if f['format_id'] == format_id), None)
+            if not f:
+                self.report_warning(
+                    f'Cannot find refreshed manifest for format {format_id}{bug_reports_message()}')
+                return None
+            return f['manifest_url'], f['manifest_stream_number'], is_live
+
+        for f in formats:
+            f['protocol'] = 'http_dash_segments_generator'
+            f['fragments'] = functools.partial(
+                self._live_dash_fragments, f['format_id'], live_start_time, mpd_feed)
+
+    def _live_dash_fragments(self, format_id, live_start_time, mpd_feed, ctx):
+        FETCH_SPAN, MAX_DURATION = 5, 432000
+
+        mpd_url, stream_number, is_live = None, None, True
+
+        begin_index = 0
+        download_start_time = ctx.get('start') or time.time()
+
+        lack_early_segments = download_start_time - (live_start_time or download_start_time) > MAX_DURATION
+        if lack_early_segments:
+            self.report_warning(bug_reports_message(
+                'Starting download from the last 120 hours of the live stream since '
+                'YouTube does not have data before that. If you think this is wrong,'), only_once=True)
+            lack_early_segments = True
+
+        known_idx, no_fragment_score, last_segment_url = begin_index, 0, None
+        fragments, fragment_base_url = None, None
+
+        def _extract_sequence_from_mpd(refresh_sequence):
+            nonlocal mpd_url, stream_number, is_live, no_fragment_score, fragments, fragment_base_url
+            # Obtain from MPD's maximum seq value
+            old_mpd_url = mpd_url
+            mpd_url, stream_number, is_live = mpd_feed(format_id) or (mpd_url, stream_number, False)
+            if old_mpd_url == mpd_url and not refresh_sequence:
+                return True, last_seq
+            try:
+                fmts, _ = self._extract_mpd_formats_and_subtitles(
+                    mpd_url, None, note=False, errnote=False, fatal=False)
+            except ExtractorError:
+                fmts = None
+            if not fmts:
+                no_fragment_score += 1
+                return False, last_seq
+            fmt_info = next(x for x in fmts if x['manifest_stream_number'] == stream_number)
+            fragments = fmt_info['fragments']
+            fragment_base_url = fmt_info['fragment_base_url']
+            assert fragment_base_url
+
+            _last_seq = int(re.search(r'(?:/|^)sq/(\d+)', fragments[-1]['path']).group(1))
+            return True, _last_seq
+
+        while is_live:
+            fetch_time = time.time()
+            if no_fragment_score > 30:
+                return
+            if last_segment_url:
+                # Obtain from "X-Head-Seqnum" header value from each segment
+                try:
+                    urlh = self._request_webpage(
+                        last_segment_url, None, note=False, errnote=False, fatal=False)
+                except ExtractorError:
+                    urlh = None
+                last_seq = try_get(urlh, lambda x: int_or_none(x.headers['X-Head-Seqnum']))
+                if last_seq is None:
+                    no_fragment_score += 1
+                    last_segment_url = None
+                    continue
+            else:
+                should_retry, last_seq = _extract_sequence_from_mpd(True)
+                if not should_retry:
+                    continue
+
+            if known_idx > last_seq:
+                last_segment_url = None
+                continue
+
+            last_seq += 1
+
+            if begin_index < 0 and known_idx < 0:
+                # skip from the start when it's negative value
+                known_idx = last_seq + begin_index
+            if lack_early_segments:
+                known_idx = max(known_idx, last_seq - int(MAX_DURATION // fragments[-1]['duration']))
+            try:
+                for idx in range(known_idx, last_seq):
+                    # do not update sequence here or you'll get skipped some part of it
+                    should_retry, _ = _extract_sequence_from_mpd(False)
+                    if not should_retry:
+                        # retry when it gets weird state
+                        known_idx = idx - 1
+                        raise ExtractorError('breaking out of outer loop')
+                    last_segment_url = urljoin(fragment_base_url, 'sq/%d' % idx)
+                    yield {
+                        'url': last_segment_url,
+                    }
+                if known_idx == last_seq:
+                    no_fragment_score += 5
+                else:
+                    no_fragment_score = 0
+                known_idx = last_seq
+            except ExtractorError:
+                continue
+
+            time.sleep(max(0, FETCH_SPAN + fetch_time - time.time()))
+
     def _extract_player_url(self, *ytcfgs, webpage=None):
         player_url = traverse_obj(
             ytcfgs, (..., 'PLAYER_JS_URL'), (..., 'WEB_PLAYER_CONTEXT_CONFIGS', ..., 'jsUrl'),
@@ -2548,11 +2686,13 @@ def _extract_formats(self, streaming_data, video_id, player_url, is_live):
                     dct['container'] = dct['ext'] + '_dash'
             yield dct
 
+        live_from_start = is_live and self.get_param('live_from_start')
         skip_manifests = self._configuration_arg('skip')
-        get_dash = (
-            (not is_live or self._configuration_arg('include_live_dash'))
-            and 'dash' not in skip_manifests and self.get_param('youtube_include_dash_manifest', True))
-        get_hls = 'hls' not in skip_manifests and self.get_param('youtube_include_hls_manifest', True)
+        if not self.get_param('youtube_include_hls_manifest', True):
+            skip_manifests.append('hls')
+        get_dash = 'dash' not in skip_manifests and (
+            not is_live or live_from_start or self._configuration_arg('include_live_dash'))
+        get_hls = not live_from_start and 'hls' not in skip_manifests
 
         def process_manifest_format(f, proto, itag):
             if itag in itags:
@@ -2583,6 +2723,9 @@ def process_manifest_format(f, proto, itag):
                     if process_manifest_format(f, 'dash', f['format_id']):
                         f['filesize'] = int_or_none(self._search_regex(
                             r'/clen/(\d+)', f.get('fragment_base_url') or f['url'], 'file size', default=None))
+                        if live_from_start:
+                            f['is_from_start'] = True
+
                         yield f
 
     def _extract_storyboard(self, player_responses, duration):
@@ -2620,12 +2763,7 @@ def _extract_storyboard(self, player_responses, duration):
                 } for j in range(math.ceil(fragment_count))],
             }
 
-    def _real_extract(self, url):
-        url, smuggled_data = unsmuggle_url(url, {})
-        video_id = self._match_id(url)
-
-        base_url = self.http_scheme() + '//www.youtube.com/'
-        webpage_url = base_url + 'watch?v=' + video_id
+    def _download_player_responses(self, url, smuggled_data, video_id, webpage_url):
         webpage = None
         if 'webpage' not in self._configuration_arg('player_skip'):
             webpage = self._download_webpage(
@@ -2637,6 +2775,28 @@ def _real_extract(self, url):
             self._get_requested_clients(url, smuggled_data),
             video_id, webpage, master_ytcfg)
 
+        return webpage, master_ytcfg, player_responses, player_url
+
+    def _list_formats(self, video_id, microformats, video_details, player_responses, player_url):
+        live_broadcast_details = traverse_obj(microformats, (..., 'liveBroadcastDetails'))
+        is_live = get_first(video_details, 'isLive')
+        if is_live is None:
+            is_live = get_first(live_broadcast_details, 'isLiveNow')
+
+        streaming_data = traverse_obj(player_responses, (..., 'streamingData'), default=[])
+        formats = list(self._extract_formats(streaming_data, video_id, player_url, is_live))
+
+        return live_broadcast_details, is_live, streaming_data, formats
+
+    def _real_extract(self, url):
+        url, smuggled_data = unsmuggle_url(url, {})
+        video_id = self._match_id(url)
+
+        base_url = self.http_scheme() + '//www.youtube.com/'
+        webpage_url = base_url + 'watch?v=' + video_id
+
+        webpage, master_ytcfg, player_responses, player_url = self._download_player_responses(url, smuggled_data, video_id, webpage_url)
+
         playability_statuses = traverse_obj(
             player_responses, (..., 'playabilityStatus'), expected_type=dict, default=[])
 
@@ -2705,13 +2865,7 @@ def feed_entry(name):
                 return self.playlist_result(
                     entries, video_id, video_title, video_description)
 
-        live_broadcast_details = traverse_obj(microformats, (..., 'liveBroadcastDetails'))
-        is_live = get_first(video_details, 'isLive')
-        if is_live is None:
-            is_live = get_first(live_broadcast_details, 'isLiveNow')
-
-        streaming_data = traverse_obj(player_responses, (..., 'streamingData'), default=[])
-        formats = list(self._extract_formats(streaming_data, video_id, player_url, is_live))
+        live_broadcast_details, is_live, streaming_data, formats = self._list_formats(video_id, microformats, video_details, player_responses, player_url)
 
         if not formats:
             if not self.get_param('allow_unplayable_formats') and traverse_obj(streaming_data, (..., 'licenseInfos')):
@@ -2814,10 +2968,13 @@ def feed_entry(name):
                 is_live = False
         if is_upcoming is None and (live_content or is_live):
             is_upcoming = False
-        live_starttime = parse_iso8601(get_first(live_broadcast_details, 'startTimestamp'))
-        live_endtime = parse_iso8601(get_first(live_broadcast_details, 'endTimestamp'))
-        if not duration and live_endtime and live_starttime:
-            duration = live_endtime - live_starttime
+        live_start_time = parse_iso8601(get_first(live_broadcast_details, 'startTimestamp'))
+        live_end_time = parse_iso8601(get_first(live_broadcast_details, 'endTimestamp'))
+        if not duration and live_end_time and live_start_time:
+            duration = live_end_time - live_start_time
+
+        if is_live and self.get_param('live_from_start'):
+            self._prepare_live_from_start_formats(formats, video_id, live_start_time, url, webpage_url, smuggled_data)
 
         formats.extend(self._extract_storyboard(player_responses, duration))
 
@@ -2860,7 +3017,7 @@ def feed_entry(name):
                          else None if is_live is None or is_upcoming is None
                          else live_content),
             'live_status': 'is_upcoming' if is_upcoming else None,  # rest will be set by YoutubeDL
-            'release_timestamp': live_starttime,
+            'release_timestamp': live_start_time,
         }
 
         pctr = traverse_obj(player_responses, (..., 'captions', 'playerCaptionsTracklistRenderer'), expected_type=dict)
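
The constants in `_live_dash_fragments` encode the numbers from the commit
message: `EXPIRATION_DURATION = 18_000` seconds is the 5-hour manifest refresh
interval (the manifest expires after 6 hours), `MAX_DURATION = 432000` seconds
is the 120-hour window for which YouTube keeps early segments, and
`FETCH_SPAN = 5` seconds paces the polling loop. A stripped-down sketch of the
sequence-number polling (no retries, no manifest refresh, no handling of the
120-hour cutoff), with stubbed-in feed callbacks:

    import time
    from urllib.parse import urljoin

    FETCH_SPAN = 5  # seconds between polls of the head sequence number

    def live_fragments(get_head_seq, still_live, base_url):
        known_idx = 0
        while True:
            last_seq = get_head_seq() + 1  # head sequence number is inclusive
            for idx in range(known_idx, last_seq):
                yield {'url': urljoin(base_url, 'sq/%d' % idx)}
            known_idx = last_seq
            if not still_live():
                return
            time.sleep(FETCH_SPAN)

    frags = live_fragments(lambda: 2, lambda: False, 'https://example.invalid/')
    print([f['url'] for f in frags])  # .../sq/0, .../sq/1, .../sq/2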
index c81153c1e0c21e1dc2dfd88d89c53063760abcb5..f9f99e390135ba104e8d2c2377188b2e81b149be 100644 (file)
@@ -147,6 +147,7 @@ def _move_cursor(self, dest):
     def print_at_line(self, text, pos):
         if self._HAVE_FULLCAP:
             self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
+            return
 
         text = self._add_line_number(text, pos)
         textlen = len(text)
index f4293e6884a4c2a6264117144b4fb6d559fa919a..e3d753adfbd023174925cea92561f464050583c0 100644 (file)
@@ -258,6 +258,14 @@ def _dict_from_options_callback(
         '--no-flat-playlist',
         action='store_false', dest='extract_flat',
         help='Extract the videos of a playlist')
+    general.add_option(
+        '--live-from-start',
+        action='store_true', dest='live_from_start',
+        help='Download livestreams from the start. Currently only supported for YouTube')
+    general.add_option(
+        '--no-live-from-start',
+        action='store_false', dest='live_from_start',
+        help='Download livestreams from the current time (default)')
     general.add_option(
         '--wait-for-video',
         dest='wait_for_video', metavar='MIN[-MAX]', default=None,
index 4ae230d2f2ef0c598ec85f74cd138428c860f179..7f8adb3686d854b528722cb8fdafa1d13d2297cd 100644 (file)
@@ -9,6 +9,7 @@
     FFmpegPostProcessor,
     FFmpegEmbedSubtitlePP,
     FFmpegExtractAudioPP,
+    FFmpegFixupDuplicateMoovPP,
     FFmpegFixupDurationPP,
     FFmpegFixupStretchedPP,
     FFmpegFixupTimestampPP,
index ab9eb6acfc404e8e0cf654bbe9d7d73126b08840..f2467c542303b70b4609251a81ede8963a284cc9 100644 (file)
@@ -1,6 +1,5 @@
 from __future__ import unicode_literals
 
-import copy
 import functools
 import os
 
@@ -18,7 +17,7 @@ class PostProcessorMetaClass(type):
     def run_wrapper(func):
         @functools.wraps(func)
         def run(self, info, *args, **kwargs):
-            info_copy = copy.deepcopy(self._copy_infodict(info))
+            info_copy = self._copy_infodict(info)
             self._hook_progress({'status': 'started'}, info_copy)
             ret = func(self, info, *args, **kwargs)
             if ret is not None:
index 26af55a9b3dd1a1e4228139925e5a1fb10e5a0bc..594762974de91a225b674d257ca3fa7cbebd4e35 100644 (file)
@@ -908,13 +908,23 @@ def run(self, info):
         return [], info
 
 
-class FFmpegFixupDurationPP(FFmpegFixupPostProcessor):
+class FFmpegCopyStreamPostProcessor(FFmpegFixupPostProcessor):
+    MESSAGE = 'Copying stream'
+
     @PostProcessor._restrict_to(images=False)
     def run(self, info):
-        self._fixup('Fixing video duration', info['filepath'], ['-c', 'copy', '-map', '0', '-dn'])
+        self._fixup(self.MESSAGE, info['filepath'], ['-c', 'copy', '-map', '0', '-dn'])
         return [], info
 
 
+class FFmpegFixupDurationPP(FFmpegCopyStreamPostProcessor):
+    MESSAGE = 'Fixing video duration'
+
+
+class FFmpegFixupDuplicateMoovPP(FFmpegCopyStreamPostProcessor):
+    MESSAGE = 'Fixing duplicate MOOV atoms'
+
+
 class FFmpegSubtitlesConvertorPP(FFmpegPostProcessor):
     SUPPORTED_EXTS = ('srt', 'vtt', 'ass', 'lrc')
 
index 81c95f3e971123466de66f772955df14da701bd9..2919324c660a558aa8778d4c31064bd41ea7a53f 100644 (file)
@@ -2631,12 +2631,6 @@ def __reversed__(self):
     def __copy__(self):
         return type(self)(self.__iterable, reverse=self.__reversed, _cache=self.__cache)
 
-    def __deepcopy__(self, memo):
-        # FIXME: This is actually just a shallow copy
-        id_ = id(self)
-        memo[id_] = self.__copy__()
-        return memo[id_]
-
     def __repr__(self):
         # repr and str should mimic a list. So we exhaust the iterable
         return repr(self.exhaust())
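
With the format-selector contexts above switched to shallow `dict(ctx)` copies,
nothing deep-copies info dicts any more, so the `__deepcopy__` that the FIXME
admitted was really a shallow copy can be dropped; `copy.copy()` keeps working
through `__copy__`. A stand-in illustration (not the real LazyList):

    import copy

    class LazyListSketch:
        def __init__(self, iterable):
            self._iterable = iterable

        def __copy__(self):
            return type(self)(self._iterable)

    ll = LazyListSketch(iter(range(3)))
    copy.copy(ll)  # still supported via __copy__
    # copy.deepcopy(ll) now uses the default deep-copy machinery instead of
    # silently returning a shallow copy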