]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/extractor/facebook.py
[cleanup] Add more ruff rules (#10149)
[yt-dlp.git] / yt_dlp / extractor / facebook.py
index 2fbdf1c37cad5b742a961e20374fa39bfef21b1d..1d1e0770a68ff1a68606498cb4251101380f0a7d 100644 (file)
@@ -3,18 +3,13 @@
 import urllib.parse
 
 from .common import InfoExtractor
-from ..compat import (
-    compat_etree_fromstring,
-    compat_str,
-    compat_urllib_parse_unquote,
-)
+from ..compat import compat_etree_fromstring
 from ..networking import Request
 from ..networking.exceptions import network_exceptions
 from ..utils import (
     ExtractorError,
     clean_html,
     determine_ext,
-    error_to_compat_str,
     float_or_none,
     format_field,
     get_element_by_id,
@@ -44,6 +39,7 @@ class FacebookIE(InfoExtractor):
                         (?:[^#]*?\#!/)?
                         (?:
                             (?:
+                                permalink\.php|
                                 video/video\.php|
                                 photo\.php|
                                 video\.php|
@@ -53,6 +49,7 @@ class FacebookIE(InfoExtractor):
                             )\?(?:.*?)(?:v|video_id|story_fbid)=|
                             [^/]+/videos/(?:[^/]+/)?|
                             [^/]+/posts/|
+                            events/(?:[^/]+/)?|
                             groups/[^/]+/(?:permalink|posts)/|
                             watchparty/
                         )|
@@ -207,7 +204,7 @@ class FacebookIE(InfoExtractor):
             'skip_download': True,
         },
     }, {
-        # FIXME
+        # FIXME: Cannot parse data error
         'url': 'https://www.facebook.com/LaGuiaDelVaron/posts/1072691702860471',
         'info_dict': {
             'id': '1072691702860471',
@@ -249,6 +246,7 @@ class FacebookIE(InfoExtractor):
             'duration': 148.435,
         },
     }, {
+        # data.node.comet_sections.content.story.attachments[].styles.attachment.media
         'url': 'https://www.facebook.com/attn/posts/pfbid0j1Czf2gGDVqeQ8KiMLFm3pWN8GxsQmeRrVhimWDzMuKQoR8r4b1knNsejELmUgyhl',
         'info_dict': {
             'id': '6968553779868435',
@@ -263,6 +261,22 @@ class FacebookIE(InfoExtractor):
             'thumbnail': r're:^https?://.*',
             'timestamp': 1701975646,
         },
+    }, {
+        # data.node.comet_sections.content.story.attachments[].styles.attachment.media
+        'url': 'https://www.facebook.com/permalink.php?story_fbid=pfbid0fqQuVEQyXRa9Dp4RcaTR14KHU3uULHV1EK7eckNXSH63JMuoALsAvVCJ97zAGitil&id=100068861234290',
+        'info_dict': {
+            'id': '270103405756416',
+            'ext': 'mp4',
+            'title': 'Lela Evans',
+            'description': 'Today Makkovik\'s own Pilot Mandy Smith made her inaugural landing on the airstrip in her hometown. What a proud moment as we all cheered and...',
+            'thumbnail': r're:^https?://.*',
+            'uploader': 'Lela Evans',
+            'uploader_id': 'pfbid0shZJipuigyy5mqrUJn9ub5LJFWNHvan5prtyi3LrDuuuJ4NwrURgnQHYR9fywBepl',
+            'upload_date': '20231228',
+            'timestamp': 1703804085,
+            'duration': 394.347,
+            'view_count': int,
+        },
     }, {
         'url': 'https://www.facebook.com/story.php?story_fbid=pfbid0Fnzhm8UuzjBYpPMNFzaSpFE9UmLdU4fJN8qTANi1Dmtj5q7DNrL5NERXfsAzDEV7l&id=100073071055552',
         'only_matching': True,
@@ -381,10 +395,22 @@ class FacebookIE(InfoExtractor):
         },
         'playlist_count': 1,
         'skip': 'Requires logging in',
+    }, {
+        # data.event.cover_media_renderer.cover_video
+        'url': 'https://m.facebook.com/events/1509582499515440',
+        'info_dict': {
+            'id': '637246984455045',
+            'ext': 'mp4',
+            'title': 'ANALISI IN CAMPO OSCURO " Coaguli nel sangue dei vaccinati"',
+            'description': 'Other event by Comitato Liberi Pensatori on Tuesday, October 18 2022',
+            'thumbnail': r're:^https?://.*',
+            'uploader': 'Comitato Liberi Pensatori',
+            'uploader_id': '100065709540881',
+        },
     }]
     _SUPPORTED_PAGLETS_REGEX = r'(?:pagelet_group_mall|permalink_video_pagelet|hyperfeed_story_id_[0-9a-f]+)'
     _api_config = {
-        'graphURI': '/api/graphql/'
+        'graphURI': '/api/graphql/',
     }
 
     def _perform_login(self, username, password):
@@ -419,7 +445,7 @@ def _perform_login(self, username, password):
                     r'(?s)<div[^>]+class=(["\']).*?login_error_box.*?\1[^>]*><div[^>]*>.*?</div><div[^>]*>(?P<error>.+?)</div>',
                     login_results, 'login error', default=None, group='error')
                 if error:
-                    raise ExtractorError('Unable to login: %s' % error, expected=True)
+                    raise ExtractorError(f'Unable to login: {error}', expected=True)
                 self.report_warning('unable to log in: bad username/password, or exceeded login rate limit (~3/min). Check credentials or wait.')
                 return
 
@@ -443,7 +469,7 @@ def _perform_login(self, username, password):
             if re.search(r'id="checkpointSubmitButton"', check_response) is not None:
                 self.report_warning('Unable to confirm login, you have to login in your browser and authorize the login.')
         except network_exceptions as err:
-            self.report_warning('unable to log in: %s' % error_to_compat_str(err))
+            self.report_warning(f'unable to log in: {err}')
             return
 
     def _extract_from_url(self, url, video_id):
@@ -455,51 +481,28 @@ def extract_metadata(webpage):
                 r'data-sjs>({.*?ScheduledServerJS.*?})</script>', webpage)]
             post = traverse_obj(post_data, (
                 ..., 'require', ..., ..., ..., '__bbox', 'require', ..., ..., ..., '__bbox', 'result', 'data'), expected_type=dict) or []
-
-            automatic_captions, subtitles = {}, {}
-            subs_data = traverse_obj(post, (..., 'video', ..., 'attachments', ..., lambda k, v: (
-                k == 'media' and str(v['id']) == video_id and v['__typename'] == 'Video')))
-            is_video_broadcast = get_first(subs_data, 'is_video_broadcast', expected_type=bool)
-            captions = get_first(subs_data, 'video_available_captions_locales', 'captions_url')
-            if url_or_none(captions):  # if subs_data only had a 'captions_url'
-                locale = self._html_search_meta(['og:locale', 'twitter:locale'], webpage, 'locale', default='en_US')
-                subtitles[locale] = [{'url': captions}]
-            # or else subs_data had 'video_available_captions_locales', a list of dicts
-            for caption in traverse_obj(captions, (
-                {lambda x: sorted(x, key=lambda c: c['locale'])}, lambda _, v: v['captions_url'])
-            ):
-                lang = caption.get('localized_language') or ''
-                subs = {
-                    'url': caption['captions_url'],
-                    'name': format_field(caption, 'localized_country', f'{lang} (%s)', default=lang),
-                }
-                if caption.get('localized_creation_method') or is_video_broadcast:
-                    automatic_captions.setdefault(caption['locale'], []).append(subs)
-                else:
-                    subtitles.setdefault(caption['locale'], []).append(subs)
-
             media = traverse_obj(post, (..., 'attachments', ..., lambda k, v: (
                 k == 'media' and str(v['id']) == video_id and v['__typename'] == 'Video')), expected_type=dict)
             title = get_first(media, ('title', 'text'))
             description = get_first(media, ('creation_story', 'comet_sections', 'message', 'story', 'message', 'text'))
-            uploader_data = (
-                get_first(media, ('owner', {dict}))
-                or get_first(post, (..., 'video', lambda k, v: k == 'owner' and v['name']))
-                or get_first(post, ('node', 'actors', ..., {dict})) or {})
-
             page_title = title or self._html_search_regex((
                 r'<h2\s+[^>]*class="uiHeaderTitle"[^>]*>(?P<content>[^<]*)</h2>',
                 r'(?s)<span class="fbPhotosPhotoCaption".*?id="fbPhotoPageCaption"><span class="hasCaption">(?P<content>.*?)</span>',
-                self._meta_regex('og:title'), self._meta_regex('twitter:title'), r'<title>(?P<content>.+?)</title>'
+                self._meta_regex('og:title'), self._meta_regex('twitter:title'), r'<title>(?P<content>.+?)</title>',
             ), webpage, 'title', default=None, group='content')
             description = description or self._html_search_meta(
                 ['description', 'og:description', 'twitter:description'],
                 webpage, 'description', default=None)
+            uploader_data = (
+                get_first(media, ('owner', {dict}))
+                or get_first(post, ('video', 'creation_story', 'attachments', ..., 'media', lambda k, v: k == 'owner' and v['name']))
+                or get_first(post, (..., 'video', lambda k, v: k == 'owner' and v['name']))
+                or get_first(post, ('node', 'actors', ..., {dict}))
+                or get_first(post, ('event', 'event_creator', {dict})) or {})
             uploader = uploader_data.get('name') or (
                 clean_html(get_element_by_id('fbPhotoPageAuthorName', webpage))
                 or self._search_regex(
                     (r'ownerName\s*:\s*"([^"]+)"', *self._og_regexes('title')), webpage, 'uploader', fatal=False))
-
             timestamp = int_or_none(self._search_regex(
                 r'<abbr[^>]+data-utime=["\'](\d+)', webpage,
                 'timestamp', default=None))
@@ -517,12 +520,10 @@ def extract_metadata(webpage):
                 'timestamp': timestamp,
                 'thumbnail': thumbnail,
                 'view_count': parse_count(self._search_regex(
-                    (r'\bviewCount\s*:\s*["\']([\d,.]+)', r'video_view_count["\']\s*:\s*(\d+)',),
+                    (r'\bviewCount\s*:\s*["\']([\d,.]+)', r'video_view_count["\']\s*:\s*(\d+)'),
                     webpage, 'view count', default=None)),
                 'concurrent_view_count': get_first(post, (
                     ('video', (..., ..., 'attachments', ..., 'media')), 'liveViewerCount', {int_or_none})),
-                'automatic_captions': automatic_captions,
-                'subtitles': subtitles,
             }
 
             info_json_ld = self._search_json_ld(webpage, video_id, default={})
@@ -554,7 +555,7 @@ def extract_from_jsmods_instances(js_data):
                     js_data, lambda x: x['jsmods']['instances'], list) or [])
 
         def extract_dash_manifest(video, formats):
-            dash_manifest = video.get('dash_manifest')
+            dash_manifest = traverse_obj(video, 'dash_manifest', 'playlist', expected_type=str)
             if dash_manifest:
                 formats.extend(self._parse_mpd_formats(
                     compat_etree_fromstring(urllib.parse.unquote_plus(dash_manifest)),
@@ -572,19 +573,19 @@ def process_formats(info):
 
         def extract_relay_data(_filter):
             return self._parse_json(self._search_regex(
-                r'data-sjs>({.*?%s.*?})</script>' % _filter,
+                rf'data-sjs>({{.*?{_filter}.*?}})</script>',
                 webpage, 'replay data', default='{}'), video_id, fatal=False) or {}
 
         def extract_relay_prefetched_data(_filter):
             return traverse_obj(extract_relay_data(_filter), (
                 'require', (None, (..., ..., ..., '__bbox', 'require')),
-                lambda _, v: 'RelayPrefetchedStreamCache' in v, ..., ...,
-                '__bbox', 'result', 'data', {dict}), get_all=False) or {}
+                lambda _, v: any(key.startswith('RelayPrefetchedStreamCache') for key in v),
+                ..., ..., '__bbox', 'result', 'data', {dict}), get_all=False) or {}
 
         if not video_data:
             server_js_data = self._parse_json(self._search_regex([
                 r'bigPipe\.onPageletArrive\(({.+?})\)\s*;\s*}\s*\)\s*,\s*["\']onPageletArrive\s+' + self._SUPPORTED_PAGLETS_REGEX,
-                r'bigPipe\.onPageletArrive\(({.*?id\s*:\s*"%s".*?})\);' % self._SUPPORTED_PAGLETS_REGEX
+                rf'bigPipe\.onPageletArrive\(({{.*?id\s*:\s*"{self._SUPPORTED_PAGLETS_REGEX}".*?}})\);',
             ], webpage, 'js data', default='{}'), video_id, js_to_json, False)
             video_data = extract_from_jsmods_instances(server_js_data)
 
@@ -620,6 +621,29 @@ def parse_graphql_video(video):
                                 'url': playable_url,
                             })
                     extract_dash_manifest(video, formats)
+
+                    automatic_captions, subtitles = {}, {}
+                    is_broadcast = traverse_obj(video, ('is_video_broadcast', {bool}))
+                    for caption in traverse_obj(video, (
+                        'video_available_captions_locales',
+                        {lambda x: sorted(x, key=lambda c: c['locale'])},
+                        lambda _, v: url_or_none(v['captions_url']),
+                    )):
+                        lang = caption.get('localized_language') or 'und'
+                        subs = {
+                            'url': caption['captions_url'],
+                            'name': format_field(caption, 'localized_country', f'{lang} (%s)', default=lang),
+                        }
+                        if caption.get('localized_creation_method') or is_broadcast:
+                            automatic_captions.setdefault(caption['locale'], []).append(subs)
+                        else:
+                            subtitles.setdefault(caption['locale'], []).append(subs)
+                    captions_url = traverse_obj(video, ('captions_url', {url_or_none}))
+                    if captions_url and not automatic_captions and not subtitles:
+                        locale = self._html_search_meta(
+                            ['og:locale', 'twitter:locale'], webpage, 'locale', default='en_US')
+                        (automatic_captions if is_broadcast else subtitles)[locale] = [{'url': captions_url}]
+
                     info = {
                         'id': v_id,
                         'formats': formats,
@@ -629,6 +653,8 @@ def parse_graphql_video(video):
                         'timestamp': traverse_obj(video, 'publish_time', 'creation_time', expected_type=int_or_none),
                         'duration': (float_or_none(video.get('playable_duration_in_ms'), 1000)
                                      or float_or_none(video.get('length_in_second'))),
+                        'automatic_captions': automatic_captions,
+                        'subtitles': subtitles,
                     }
                     process_formats(info)
                     description = try_get(video, lambda x: x['savable_description']['text'])
@@ -639,7 +665,7 @@ def parse_graphql_video(video):
                             'description': description,
                         })
                     else:
-                        info['title'] = description or 'Facebook video #%s' % v_id
+                        info['title'] = description or f'Facebook video #{v_id}'
                     entries.append(info)
 
                 def parse_attachment(attachment, key='media'):
@@ -663,11 +689,12 @@ def parse_attachment(attachment, key='media'):
                 for edge in edges:
                     parse_attachment(edge, key='node')
 
-                video = data.get('video') or {}
+                video = traverse_obj(data, (
+                    'event', 'cover_media_renderer', 'cover_video'), 'video', expected_type=dict) or {}
                 if video:
                     attachments = try_get(video, [
                         lambda x: x['story']['attachments'],
-                        lambda x: x['creation_story']['attachments']
+                        lambda x: x['creation_story']['attachments'],
                     ], list) or []
                     for attachment in attachments:
                         parse_attachment(attachment)
@@ -691,7 +718,7 @@ def parse_attachment(attachment, key='media'):
             m_msg = re.search(r'class="[^"]*uiInterstitialContent[^"]*"><div>(.*?)</div>', webpage)
             if m_msg is not None:
                 raise ExtractorError(
-                    'The video is not available, Facebook said: "%s"' % m_msg.group(1),
+                    f'The video is not available, Facebook said: "{m_msg.group(1)}"',
                     expected=True)
             elif any(p in webpage for p in (
                     '>You must log in to continue',
@@ -728,7 +755,7 @@ def parse_attachment(attachment, key='media'):
                 v_id = video.get('id')
                 if not v_id:
                     continue
-                v_id = compat_str(v_id)
+                v_id = str(v_id)
                 entries.append(self.url_result(
                     self._VIDEO_PAGE_TEMPLATE % v_id,
                     self.ie_key(), v_id, video.get('name')))
@@ -786,7 +813,7 @@ def parse_attachment(attachment, key='media'):
                 continue
             for quality in ('sd', 'hd'):
                 for src_type in ('src', 'src_no_ratelimit'):
-                    src = f[0].get('%s_%s' % (quality, src_type))
+                    src = f[0].get(f'{quality}_{src_type}')
                     if src:
                         # sd, hd formats w/o resolution info should be deprioritized below DASH
                         # TODO: investigate if progressive or src formats still exist
@@ -794,10 +821,10 @@ def parse_attachment(attachment, key='media'):
                         if quality == 'hd':
                             preference += 1
                         formats.append({
-                            'format_id': '%s_%s_%s' % (format_id, quality, src_type),
+                            'format_id': f'{format_id}_{quality}_{src_type}',
                             'url': src,
                             'quality': preference,
-                            'height': 720 if quality == 'hd' else None
+                            'height': 720 if quality == 'hd' else None,
                         })
             extract_dash_manifest(f[0], formats)
             subtitles_src = f[0].get('subtitles_src')
@@ -847,7 +874,7 @@ class FacebookPluginsVideoIE(InfoExtractor):
 
     def _real_extract(self, url):
         return self.url_result(
-            compat_urllib_parse_unquote(self._match_id(url)),
+            urllib.parse.unquote(self._match_id(url)),
             FacebookIE.ie_key())
 
 
@@ -908,7 +935,7 @@ class FacebookReelIE(InfoExtractor):
             'timestamp': 1637502609,
             'upload_date': '20211121',
             'thumbnail': r're:^https?://.*',
-        }
+        },
     }]
 
     def _real_extract(self, url):
@@ -934,7 +961,7 @@ class FacebookAdsIE(InfoExtractor):
             'thumbnail': r're:^https?://.*',
             'upload_date': '20231214',
             'like_count': int,
-        }
+        },
     }, {
         'url': 'https://www.facebook.com/ads/library/?id=893637265423481',
         'info_dict': {
@@ -966,7 +993,7 @@ class FacebookAdsIE(InfoExtractor):
     def _extract_formats(self, video_dict):
         formats = []
         for format_key, format_url in traverse_obj(video_dict, (
-            {dict.items}, lambda _, v: v[0] in self._FORMATS_MAP and url_or_none(v[1])
+            {dict.items}, lambda _, v: v[0] in self._FORMATS_MAP and url_or_none(v[1]),
         )):
             formats.append({
                 'format_id': self._FORMATS_MAP[format_key][0],
@@ -1003,7 +1030,7 @@ def _real_extract(self, url):
 
         entries = []
         for idx, entry in enumerate(traverse_obj(
-            data, (('videos', 'cards'), lambda _, v: any([url_or_none(v[f]) for f in self._FORMATS_MAP]))), 1
+            data, (('videos', 'cards'), lambda _, v: any(url_or_none(v[f]) for f in self._FORMATS_MAP))), 1,
         ):
             entries.append({
                 'id': f'{video_id}_{idx}',