]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/extractor/tiktok.py
[cleanup] Misc
[yt-dlp.git] / yt_dlp / extractor / tiktok.py
index 4926096c00316a528210b3aab882be82bd84b4f3..79a22386120dda2ff31430ed6e1f43621145acff 100644 (file)
@@ -1,7 +1,6 @@
 import itertools
 import json
 import random
-import re
 import string
 import time
 
@@ -12,6 +11,7 @@
     HEADRequest,
     LazyList,
     UnsupportedError,
+    get_element_by_id,
     get_first,
     int_or_none,
     join_nonempty,
@@ -25,7 +25,7 @@
 
 
 class TikTokBaseIE(InfoExtractor):
-    _APP_VERSIONS = [('20.9.3', '293'), ('20.4.3', '243'), ('20.2.1', '221'), ('20.1.2', '212'), ('20.0.4', '204')]
+    _APP_VERSIONS = [('26.1.3', '260103'), ('26.1.2', '260102'), ('26.1.1', '260101'), ('25.6.2', '250602')]
     _WORKING_APP_VERSION = None
     _APP_NAME = 'trill'
     _AID = 1180
@@ -38,6 +38,10 @@ class TikTokBaseIE(InfoExtractor):
     def _create_url(user_id, video_id):
         return f'https://www.tiktok.com/@{user_id or "_"}/video/{video_id}'
 
+    def _get_sigi_state(self, webpage, display_id):
+        return self._parse_json(get_element_by_id(
+            'SIGI_STATE|sigi-persisted-data', webpage, escape_value=False), display_id)
+
     def _call_api_impl(self, ep, query, manifest_app_version, video_id, fatal=True,
                        note='Downloading API JSON', errnote='Unable to download API page'):
         self._set_cookie(self._API_HOSTNAME, 'odin_tt', ''.join(random.choice('0123456789abcdef') for _ in range(160)))
@@ -263,6 +267,9 @@ def extract_addr(addr, add_meta={}):
 
         return {
             'id': aweme_id,
+            'extractor_key': TikTokIE.ie_key(),
+            'extractor': TikTokIE.IE_NAME,
+            'webpage_url': self._create_url(author_info.get('uid'), aweme_id),
             'title': aweme_detail.get('desc'),
             'description': aweme_detail.get('desc'),
             'view_count': int_or_none(stats_info.get('play_count')),
@@ -275,7 +282,7 @@ def extract_addr(addr, add_meta={}):
             'uploader_url': user_url,
             'track': music_track,
             'album': str_or_none(music_info.get('album')) or None,
-            'artist': music_author,
+            'artist': music_author or None,
             'timestamp': int_or_none(aweme_detail.get('create_time')),
             'formats': formats,
             'subtitles': self.extract_subtitles(aweme_detail, aweme_id),
@@ -364,6 +371,7 @@ def _parse_aweme_video_web(self, aweme_detail, webpage_url):
 
 class TikTokIE(TikTokBaseIE):
     _VALID_URL = r'https?://www\.tiktok\.com/(?:embed|@(?P<user_id>[\w\.-]+)/video)/(?P<id>\d+)'
+    _EMBED_REGEX = [rf'<(?:script|iframe)[^>]+\bsrc=(["\'])(?P<url>{_VALID_URL})']
 
     _TESTS = [{
         'url': 'https://www.tiktok.com/@leenabhushan/video/6748451240264420610',
@@ -461,7 +469,7 @@ class TikTokIE(TikTokBaseIE):
             'repost_count': int,
             'comment_count': int,
         },
-        'expected_warnings': ['Video not available']
+        'expected_warnings': ['trying with webpage', 'Unable to find video in feed']
     }, {
         # Video without title and description
         'url': 'https://www.tiktok.com/@pokemonlife22/video/7059698374567611694',
@@ -485,55 +493,58 @@ class TikTokIE(TikTokBaseIE):
             'repost_count': int,
             'comment_count': int,
         },
-        'expected_warnings': ['Video not available', 'Creating a generic title']
+    }, {
+        # hydration JSON is sent in a <script> element
+        'url': 'https://www.tiktok.com/@denidil6/video/7065799023130643713',
+        'info_dict': {
+            'id': '7065799023130643713',
+            'ext': 'mp4',
+            'title': '#denidil#денидил',
+            'description': '#denidil#денидил',
+            'uploader': 'denidil6',
+            'uploader_id': '7046664115636405250',
+            'uploader_url': 'https://www.tiktok.com/@MS4wLjABAAAAsvMSzFdQ4ikl3uR2TEJwMBbB2yZh2Zxwhx-WCo3rbDpAharE3GQCrFuJArI3C8QJ',
+            'artist': 'Holocron Music',
+            'album': 'Wolf Sounds (1 Hour) Enjoy the Company of the Animal That Is the Majestic King of the Night',
+            'track': 'Wolf Sounds (1 Hour) Enjoy the Company of the Animal That Is the Majestic King of the Night',
+            'timestamp': 1645134536,
+            'duration': 26,
+            'upload_date': '20220217',
+            'view_count': int,
+            'like_count': int,
+            'repost_count': int,
+            'comment_count': int,
+        },
+        'skip': 'This video is unavailable',
     }, {
         # Auto-captions available
         'url': 'https://www.tiktok.com/@hankgreen1/video/7047596209028074758',
         'only_matching': True
     }]
 
-    @classmethod
-    def _extract_urls(cls, webpage):
-        return [mobj.group('url') for mobj in re.finditer(
-            rf'<(?:script|iframe)[^>]+\bsrc=(["\'])(?P<url>{cls._VALID_URL})', webpage)]
-
     def _extract_aweme_app(self, aweme_id):
-        try:
-            aweme_detail = self._call_api('aweme/detail', {'aweme_id': aweme_id}, aweme_id,
-                                          note='Downloading video details', errnote='Unable to download video details').get('aweme_detail')
-            if not aweme_detail:
-                raise ExtractorError('Video not available', video_id=aweme_id)
-        except ExtractorError as e:
-            self.report_warning(f'{e}; Retrying with feed workaround')
-            feed_list = self._call_api('feed', {'aweme_id': aweme_id}, aweme_id,
-                                       note='Downloading video feed', errnote='Unable to download video feed').get('aweme_list') or []
-            aweme_detail = next((aweme for aweme in feed_list if str(aweme.get('aweme_id')) == aweme_id), None)
-            if not aweme_detail:
-                raise ExtractorError('Unable to find video in feed', video_id=aweme_id)
+        feed_list = self._call_api('feed', {'aweme_id': aweme_id}, aweme_id,
+                                   note='Downloading video feed', errnote='Unable to download video feed').get('aweme_list') or []
+        aweme_detail = next((aweme for aweme in feed_list if str(aweme.get('aweme_id')) == aweme_id), None)
+        if not aweme_detail:
+            raise ExtractorError('Unable to find video in feed', video_id=aweme_id)
         return self._parse_aweme_video_app(aweme_detail)
 
     def _real_extract(self, url):
         video_id, user_id = self._match_valid_url(url).group('id', 'user_id')
-        url = self._create_url(user_id, video_id)
-
         try:
             return self._extract_aweme_app(video_id)
         except ExtractorError as e:
-            self.report_warning(f'{e}; Retrying with webpage')
+            self.report_warning(f'{e}; trying with webpage')
 
-        # If we only call once, we get a 403 when downlaoding the video.
-        self._download_webpage(url, video_id)
-        webpage = self._download_webpage(url, video_id, note='Downloading video webpage')
+        url = self._create_url(user_id, video_id)
+        webpage = self._download_webpage(url, video_id, headers={'User-Agent': 'User-Agent:Mozilla/5.0'})
         next_data = self._search_nextjs_data(webpage, video_id, default='{}')
-
         if next_data:
             status = traverse_obj(next_data, ('props', 'pageProps', 'statusCode'), expected_type=int) or 0
             video_data = traverse_obj(next_data, ('props', 'pageProps', 'itemInfo', 'itemStruct'), expected_type=dict)
         else:
-            sigi_json = self._search_regex(
-                r'>\s*window\[[\'"]SIGI_STATE[\'"]\]\s*=\s*(?P<sigi_state>{.+});',
-                webpage, 'sigi data', group='sigi_state')
-            sigi_data = self._parse_json(sigi_json, video_id)
+            sigi_data = self._get_sigi_state(webpage, video_id)
             status = traverse_obj(sigi_data, ('VideoPage', 'statusCode'), expected_type=int) or 0
             video_data = traverse_obj(sigi_data, ('ItemModule', video_id), expected_type=dict)
 
@@ -547,6 +558,7 @@ def _real_extract(self, url):
 class TikTokUserIE(TikTokBaseIE):
     IE_NAME = 'tiktok:user'
     _VALID_URL = r'https?://(?:www\.)?tiktok\.com/@(?P<id>[\w\.-]+)/?(?:$|[#?])'
+    _WORKING = False
     _TESTS = [{
         'url': 'https://tiktok.com/@corgibobaa?lang=en',
         'playlist_mincount': 45,
@@ -605,19 +617,17 @@ def _video_entries_api(self, webpage, user_id, username):
             'device_id': ''.join(random.choice(string.digits) for _ in range(19)),  # Some endpoints don't like randomized device_id, so it isn't directly set in _call_api.
         }
 
-        max_retries = self.get_param('extractor_retries', 3)
         for page in itertools.count(1):
-            for retries in itertools.count():
+            for retry in self.RetryManager():
                 try:
-                    post_list = self._call_api('aweme/post', query, username,
-                                               note='Downloading user video list page %d%s' % (page, f' (attempt {retries})' if retries != 0 else ''),
-                                               errnote='Unable to download user video list')
+                    post_list = self._call_api(
+                        'aweme/post', query, username, note=f'Downloading user video list page {page}',
+                        errnote='Unable to download user video list')
                 except ExtractorError as e:
-                    if isinstance(e.cause, json.JSONDecodeError) and e.cause.pos == 0 and retries != max_retries:
-                        self.report_warning('%s. Retrying...' % str(e.cause or e.msg))
+                    if isinstance(e.cause, json.JSONDecodeError) and e.cause.pos == 0:
+                        retry.error = e
                         continue
                     raise
-                break
             yield from post_list.get('aweme_list', [])
             if not post_list.get('has_more'):
                 break
@@ -645,7 +655,7 @@ def _real_extract(self, url):
         return self.playlist_result(self._entries_api(user_id, videos), user_id, user_name, thumbnail=thumbnail)
 
 
-class TikTokBaseListIE(TikTokBaseIE):
+class TikTokBaseListIE(TikTokBaseIE):  # XXX: Conventionally, base classes should end with BaseIE/InfoExtractor
     def _entries(self, list_id, display_id):
         query = {
             self._QUERY_NAME: list_id,
@@ -655,19 +665,17 @@ def _entries(self, list_id, display_id):
             'device_id': ''.join(random.choice(string.digits) for i in range(19))
         }
 
-        max_retries = self.get_param('extractor_retries', 3)
         for page in itertools.count(1):
-            for retries in itertools.count():
+            for retry in self.RetryManager():
                 try:
-                    post_list = self._call_api(self._API_ENDPOINT, query, display_id,
-                                               note='Downloading video list page %d%s' % (page, f' (attempt {retries})' if retries != 0 else ''),
-                                               errnote='Unable to download video list')
+                    post_list = self._call_api(
+                        self._API_ENDPOINT, query, display_id, note=f'Downloading video list page {page}',
+                        errnote='Unable to download video list')
                 except ExtractorError as e:
-                    if isinstance(e.cause, json.JSONDecodeError) and e.cause.pos == 0 and retries != max_retries:
-                        self.report_warning('%s. Retrying...' % str(e.cause or e.msg))
+                    if isinstance(e.cause, json.JSONDecodeError) and e.cause.pos == 0:
+                        retry.error = e
                         continue
                     raise
-                break
             for video in post_list.get('aweme_list', []):
                 yield {
                     **self._parse_aweme_video_app(video),
@@ -687,6 +695,7 @@ def _real_extract(self, url):
 class TikTokSoundIE(TikTokBaseListIE):
     IE_NAME = 'tiktok:sound'
     _VALID_URL = r'https?://(?:www\.)?tiktok\.com/music/[\w\.-]+-(?P<id>[\d]+)[/?#&]?'
+    _WORKING = False
     _QUERY_NAME = 'music_id'
     _API_ENDPOINT = 'music/aweme'
     _TESTS = [{
@@ -710,6 +719,7 @@ class TikTokSoundIE(TikTokBaseListIE):
 class TikTokEffectIE(TikTokBaseListIE):
     IE_NAME = 'tiktok:effect'
     _VALID_URL = r'https?://(?:www\.)?tiktok\.com/sticker/[\w\.-]+-(?P<id>[\d]+)[/?#&]?'
+    _WORKING = False
     _QUERY_NAME = 'sticker_id'
     _API_ENDPOINT = 'sticker/aweme'
     _TESTS = [{
@@ -729,6 +739,7 @@ class TikTokEffectIE(TikTokBaseListIE):
 class TikTokTagIE(TikTokBaseListIE):
     IE_NAME = 'tiktok:tag'
     _VALID_URL = r'https?://(?:www\.)?tiktok\.com/tag/(?P<id>[^/?#&]+)'
+    _WORKING = False
     _QUERY_NAME = 'ch_id'
     _API_ENDPOINT = 'challenge/aweme'
     _TESTS = [{
@@ -753,7 +764,7 @@ def _real_extract(self, url):
         return self.playlist_result(self._entries(tag_id, display_id), tag_id, display_id)
 
 
-class DouyinIE(TikTokIE):
+class DouyinIE(TikTokIE):  # XXX: Do not subclass from concrete IE
     _VALID_URL = r'https?://(?:www\.)?douyin\.com/video/(?P<id>[0-9]+)'
     _TESTS = [{
         'url': 'https://www.douyin.com/video/6961737553342991651',
@@ -849,7 +860,7 @@ def _real_extract(self, url):
         try:
             return self._extract_aweme_app(video_id)
         except ExtractorError as e:
-            self.report_warning(f'{e}; Retrying with webpage')
+            self.report_warning(f'{e}; trying with webpage')
 
         webpage = self._download_webpage(url, video_id)
         render_data_json = self._search_regex(