jfr.im git - yt-dlp.git/blobdiff - yt_dlp/extractor/youtube.py
[youtube] Fix throttling by decrypting n-sig (#1437)
[yt-dlp.git] / yt_dlp / extractor / youtube.py
index 658b45fe14fe2697b96bb0c86b165f9c92605a7b..56cd2ed8d3a022992ace5b93c24bef400f9c2d69 100644 (file)
@@ -1720,7 +1720,7 @@ def _extract_player_info(cls, player_url):
             raise ExtractorError('Cannot identify player %r' % player_url)
         return id_m.group('id')
 
-    def _load_player(self, video_id, player_url, fatal=True) -> bool:
+    def _load_player(self, video_id, player_url, fatal=True):
         player_id = self._extract_player_info(player_url)
         if player_id not in self._code_cache:
             code = self._download_webpage(
@@ -1729,7 +1729,7 @@ def _load_player(self, video_id, player_url, fatal=True) -> bool:
                 errnote='Download of %s failed' % player_url)
             if code:
                 self._code_cache[player_id] = code
-        return player_id in self._code_cache
+        return self._code_cache.get(player_id)
 
     def _extract_signature_function(self, video_id, player_url, example_sig):
         player_id = self._extract_player_info(player_url)
@@ -1743,8 +1743,8 @@ def _extract_signature_function(self, video_id, player_url, example_sig):
         if cache_spec is not None:
             return lambda s: ''.join(s[i] for i in cache_spec)
 
-        if self._load_player(video_id, player_url):
-            code = self._code_cache[player_id]
+        code = self._load_player(video_id, player_url)
+        if code:
             res = self._parse_sig_js(code)
 
             test_string = ''.join(map(compat_chr, range(len(example_sig))))
@@ -1755,6 +1755,9 @@ def _extract_signature_function(self, video_id, player_url, example_sig):
             return res
 
     def _print_sig_code(self, func, example_sig):
+        if not self.get_param('youtube_print_sig_code'):
+            return
+
         def gen_sig_code(idxs):
             def _genslice(start, end, step):
                 starts = '' if start == 0 else str(start)
@@ -1831,13 +1834,58 @@ def _decrypt_signature(self, s, video_id, player_url):
                 )
                 self._player_cache[player_id] = func
             func = self._player_cache[player_id]
-            if self.get_param('youtube_print_sig_code'):
-                self._print_sig_code(func, s)
+            self._print_sig_code(func, s)
             return func(s)
         except Exception as e:
-            tb = traceback.format_exc()
-            raise ExtractorError(
-                'Signature extraction failed: ' + tb, cause=e)
+            raise ExtractorError('Signature extraction failed: ' + traceback.format_exc(), cause=e)
+
+    def _decrypt_nsig(self, s, video_id, player_url):
+        """Decrypt the ``n`` query value ``s`` using the player at player_url.
+
+        Both the per-player decryption function (key ``('nsig', player_url)``)
+        and each decrypted value (key ``('nsig_value', s)``) are memoized in
+        ``self._player_cache``.
+
+        Raises ExtractorError when player_url is None, or (carrying the
+        formatted traceback) when extracting or running the function fails.
+        """
+        if player_url is None:
+            raise ExtractorError('Cannot decrypt nsig without player_url')
+
+        # Make protocol-relative and site-relative player URLs absolute
+        if player_url.startswith('//'):
+            player_url = 'https:' + player_url
+        elif re.match(r'https?://', player_url) is None:
+            player_url = compat_urlparse.urljoin('https://www.youtube.com', player_url)
+
+        cache = self._player_cache
+        value_key = ('nsig_value', s)
+        if value_key in cache:
+            return cache[value_key]
+
+        try:
+            func_key = ('nsig', player_url)
+            if func_key not in cache:
+                cache[func_key] = self._extract_n_function(video_id, player_url)
+            decrypted = cache[value_key] = cache[func_key](s)
+            self.write_debug(f'Decrypted nsig {s} => {decrypted}')
+            return decrypted
+        except Exception as e:
+            raise ExtractorError(traceback.format_exc(), cause=e)
+
+    def _extract_n_function_name(self, jscode):
+        """Return the name of the function applied to the ``n`` parameter in jscode.
+
+        The name is read from a call site of the form
+        ``.get("n"))&&(b=NAME(x)`` via ``self._search_regex``.
+        """
+        # Accept an identifier of any length: the call-site prefix is already
+        # specific, and requiring exactly three characters would reject any
+        # player whose minified name is shorter or longer.
+        return self._search_regex(
+            (r'\.get\("n"\)\)&&\(b=(?P<nfunc>[a-zA-Z0-9$]+)\([a-zA-Z0-9]\)',),
+            jscode, 'Initial JS player n function name', group='nfunc')
+
+    def _extract_n_function(self, video_id, player_url):
+        """Return a callable that decrypts an ``n`` value with the given player.
+
+        The function code is looked up in the downloader cache under
+        ``'youtube-nsig'`` / player id; on a miss it is extracted from the
+        player JS and stored there for later runs.
+        """
+        player_id = self._extract_player_info(player_url)
+        func_code = self._downloader.cache.load('youtube-nsig', player_id)
+
+        if not func_code:
+            # Cache miss: find the function in the player JS and remember its code
+            player_js = self._load_player(video_id, player_url)
+            n_func_name = self._extract_n_function_name(player_js)
+            jsi = JSInterpreter(player_js)
+            func_code = jsi.extract_function_code(n_func_name)
+            self._downloader.cache.store('youtube-nsig', player_id, func_code)
+        else:
+            jsi = JSInterpreter(func_code)
+
+        if self.get_param('youtube_print_sig_code'):
+            self.to_screen(f'Extracted nsig function from {player_id}:\n{func_code[1]}\n')
+
+        def decrypt(n_value):
+            # The interpreted function receives its arguments as a list
+            return jsi.extract_function_from_code(*func_code)([n_value])
+
+        return decrypt
 
     def _extract_signature_timestamp(self, video_id, player_url, ytcfg=None, fatal=False):
         """
@@ -1856,9 +1904,8 @@ def _extract_signature_timestamp(self, video_id, player_url, ytcfg=None, fatal=F
                     raise ExtractorError(error_msg)
                 self.report_warning(error_msg)
                 return
-            if self._load_player(video_id, player_url, fatal=fatal):
-                player_id = self._extract_player_info(player_url)
-                code = self._code_cache[player_id]
+            code = self._load_player(video_id, player_url, fatal=fatal)
+            if code:
                 sts = int_or_none(self._search_regex(
                     r'(?:signatureTimestamp|sts)\s*:\s*(?P<sts>[0-9]{5})', code,
                     'JS player signature timestamp', group='sts', fatal=fatal))
@@ -2440,6 +2487,16 @@ def _extract_formats(self, streaming_data, video_id, player_url, is_live):
                 sp = try_get(sc, lambda x: x['sp'][0]) or 'signature'
                 fmt_url += '&' + sp + '=' + signature
 
+            query = parse_qs(fmt_url)
+            throttled = False
+            if query.get('ratebypass') != ['yes'] and query.get('n'):
+                try:
+                    fmt_url = update_url_query(fmt_url, {
+                        'n': self._decrypt_nsig(query['n'][0], video_id, player_url)})
+                except ExtractorError as e:
+                    self.report_warning(f'nsig extraction failed: You may experience throttling for some formats\n{e}', only_once=True)
+                    throttled = True
+
             if itag:
                 itags.append(itag)
                 stream_ids.append(stream_id)
@@ -2453,7 +2510,9 @@ def _extract_formats(self, streaming_data, video_id, player_url, is_live):
                 'format_note': ', '.join(filter(None, (
                     '%s%s' % (audio_track.get('displayName') or '',
                               ' (default)' if audio_track.get('audioIsDefault') else ''),
-                    fmt.get('qualityLabel') or quality.replace('audio_quality_', '')))),
+                    fmt.get('qualityLabel') or quality.replace('audio_quality_', ''),
+                    throttled and 'THROTTLED'))),
+                'source_preference': -10 if throttled else -1,  # throttled formats must sort below working ones
                 'fps': int_or_none(fmt.get('fps')),
                 'height': height,
                 'quality': q(quality),
@@ -2645,12 +2704,6 @@ def feed_entry(name):
             if reason:
                 self.raise_no_formats(reason, expected=True)
 
-        for f in formats:
-            if '&c=WEB&' in f['url'] and '&ratebypass=yes&' not in f['url']:  # throttled
-                f['source_preference'] = -10
-                # TODO: this method is not reliable
-                f['format_note'] = format_field(f, 'format_note', '%s ') + '(maybe throttled)'
-
         # Source is given priority since formats that throttle are given lower source_preference
         # When throttling issue is fully fixed, remove this
         self._sort_formats(formats, ('quality', 'res', 'fps', 'hdr:12', 'source', 'codec:vp9.2', 'lang'))