]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/extractor/instagram.py
[extractors] Use new framework for existing embeds (#4307)
[yt-dlp.git] / yt_dlp / extractor / instagram.py
index 970f2c8abf96452672d9c838c44e1276ce949dba..94db756403add923bcad616caf5bfb06795491be 100644 (file)
@@ -1,19 +1,17 @@
-# coding: utf-8
-
-import itertools
 import hashlib
+import itertools
 import json
 import re
 import time
+import urllib.error
 
 from .common import InfoExtractor
-from ..compat import (
-    compat_HTTPError,
-)
 from ..utils import (
     ExtractorError,
-    format_field,
+    decode_base_n,
+    encode_base_n,
     float_or_none,
+    format_field,
     get_element_by_attribute,
     int_or_none,
     lowercase_escape,
     urlencode_postdata,
 )
 
+_ENCODING_CHARS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
+
+
+def _pk_to_id(id):
+    """Source: https://stackoverflow.com/questions/24437823/getting-instagram-post-url-from-media-id"""
+    return encode_base_n(int(id.split('_')[0]), table=_ENCODING_CHARS)
+
+
+def _id_to_pk(shortcode):
+    """Covert a shortcode to a numeric value"""
+    return decode_base_n(shortcode[:11], table=_ENCODING_CHARS)
+
 
 class InstagramBaseIE(InfoExtractor):
     _NETRC_MACHINE = 'instagram'
@@ -158,6 +168,15 @@ def _extract_product(self, product_info):
         if isinstance(product_info, list):
             product_info = product_info[0]
 
+        comment_data = traverse_obj(product_info, ('edge_media_to_parent_comment', 'edges'))
+        comments = [{
+            'author': traverse_obj(comment_dict, ('node', 'owner', 'username')),
+            'author_id': traverse_obj(comment_dict, ('node', 'owner', 'id')),
+            'id': traverse_obj(comment_dict, ('node', 'id')),
+            'text': traverse_obj(comment_dict, ('node', 'text')),
+            'timestamp': traverse_obj(comment_dict, ('node', 'created_at'), expected_type=int_or_none),
+        } for comment_dict in comment_data] if comment_data else None
+
         user_info = product_info.get('user') or {}
         info_dict = {
             'id': product_info.get('code') or product_info.get('id'),
@@ -170,6 +189,7 @@ def _extract_product(self, product_info):
             'view_count': int_or_none(product_info.get('view_count')),
             'like_count': int_or_none(product_info.get('like_count')),
             'comment_count': int_or_none(product_info.get('comment_count')),
+            'comments': comments,
             'http_headers': {
                 'Referer': 'https://www.instagram.com/',
             }
@@ -216,27 +236,14 @@ class InstagramIOSIE(InfoExtractor):
         'add_ie': ['Instagram']
     }]
 
-    def _get_id(self, id):
-        """Source: https://stackoverflow.com/questions/24437823/getting-instagram-post-url-from-media-id"""
-        chrs = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
-        media_id = int(id.split('_')[0])
-        shortened_id = ''
-        while media_id > 0:
-            r = media_id % 64
-            media_id = (media_id - r) // 64
-            shortened_id = chrs[r] + shortened_id
-        return shortened_id
-
     def _real_extract(self, url):
-        return {
-            '_type': 'url_transparent',
-            'url': f'http://instagram.com/tv/{self._get_id(self._match_id(url))}/',
-            'ie_key': 'Instagram',
-        }
+        video_id = _pk_to_id(self._match_id(url))
+        return self.url_result(f'http://instagram.com/tv/{video_id}', InstagramIE, video_id)
 
 
 class InstagramIE(InstagramBaseIE):
     _VALID_URL = r'(?P<url>https?://(?:www\.)?instagram\.com(?:/[^/]+)?/(?:p|tv|reel)/(?P<id>[^/?#&]+))'
+    _EMBED_REGEX = [r'<iframe[^>]+src=(["\'])(?P<url>(?:https?:)?//(?:www\.)?instagram\.com/p/[^/]+/embed.*?)\1']
     _TESTS = [{
         'url': 'https://instagram.com/p/aye83DjauH/?foo=bar#abc',
         'md5': '0d2da106a9d2631273e192b372806516',
@@ -340,59 +347,62 @@ class InstagramIE(InstagramBaseIE):
         'only_matching': True,
     }]
 
-    @staticmethod
-    def _extract_embed_url(webpage):
-        mobj = re.search(
-            r'<iframe[^>]+src=(["\'])(?P<url>(?:https?:)?//(?:www\.)?instagram\.com/p/[^/]+/embed.*?)\1',
-            webpage)
-        if mobj:
-            return mobj.group('url')
-
-        blockquote_el = get_element_by_attribute(
-            'class', 'instagram-media', webpage)
-        if blockquote_el is None:
-            return
+    @classmethod
+    def _extract_embed_urls(cls, url, webpage):
+        res = tuple(super()._extract_embed_urls(url, webpage))
+        if res:
+            return res
 
-        mobj = re.search(
-            r'<a[^>]+href=([\'"])(?P<link>[^\'"]+)\1', blockquote_el)
+        mobj = re.search(r'<a[^>]+href=([\'"])(?P<link>[^\'"]+)\1',
+                         get_element_by_attribute('class', 'instagram-media', webpage) or '')
         if mobj:
-            return mobj.group('link')
+            return [mobj.group('link')]
 
     def _real_extract(self, url):
         video_id, url = self._match_valid_url(url).group('id', 'url')
-        webpage, urlh = self._download_webpage_handle(url, video_id)
-        if 'www.instagram.com/accounts/login' in urlh.geturl():
-            self.report_warning('Main webpage is locked behind the login page. '
-                                'Retrying with embed webpage (Note that some metadata might be missing)')
-            webpage = self._download_webpage(
-                'https://www.instagram.com/p/%s/embed/' % video_id, video_id, note='Downloading embed webpage')
-
-        shared_data = self._parse_json(
-            self._search_regex(
-                r'window\._sharedData\s*=\s*({.+?});',
-                webpage, 'shared data', default='{}'),
-            video_id, fatal=False)
-        media = traverse_obj(
-            shared_data,
-            ('entry_data', 'PostPage', 0, 'graphql', 'shortcode_media'),
-            ('entry_data', 'PostPage', 0, 'media'),
-            expected_type=dict)
-
-        # _sharedData.entry_data.PostPage is empty when authenticated (see
-        # https://github.com/ytdl-org/youtube-dl/pull/22880)
+        general_info = self._download_json(
+            f'https://www.instagram.com/graphql/query/?query_hash=9f8827793ef34641b2fb195d4d41151c'
+            f'&variables=%7B"shortcode":"{video_id}",'
+            '"parent_comment_count":10,"has_threaded_comments":true}', video_id, fatal=False, errnote=False,
+            headers={
+                'Accept': '*',
+                'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
+                'Authority': 'www.instagram.com',
+                'Referer': 'https://www.instagram.com',
+                'x-ig-app-id': '936619743392459',
+            })
+        media = traverse_obj(general_info, ('data', 'shortcode_media')) or {}
         if not media:
-            additional_data = self._parse_json(
-                self._search_regex(
-                    r'window\.__additionalDataLoaded\s*\(\s*[^,]+,\s*({.+?})\s*\);',
-                    webpage, 'additional data', default='{}'),
-                video_id, fatal=False)
-            product_item = traverse_obj(additional_data, ('items', 0), expected_type=dict)
-            if product_item:
-                return self._extract_product(product_item)
-            media = traverse_obj(additional_data, ('graphql', 'shortcode_media'), 'shortcode_media', expected_type=dict) or {}
-
-        if not media and 'www.instagram.com/accounts/login' in urlh.geturl():
-            self.raise_login_required('You need to log in to access this content')
+            self.report_warning('General metadata extraction failed', video_id)
+
+        info = self._download_json(
+            f'https://i.instagram.com/api/v1/media/{_id_to_pk(video_id)}/info/', video_id,
+            fatal=False, note='Downloading video info', errnote=False, headers={
+                'Accept': '*',
+                'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
+                'Authority': 'www.instagram.com',
+                'Referer': 'https://www.instagram.com',
+                'x-ig-app-id': '936619743392459',
+            })
+        if info:
+            media.update(info['items'][0])
+            return self._extract_product(media)
+
+        webpage = self._download_webpage(
+            f'https://www.instagram.com/p/{video_id}/embed/', video_id,
+            note='Downloading embed webpage', fatal=False)
+        if not webpage:
+            self.raise_login_required('Requested content was not found, the content might be private')
+
+        additional_data = self._search_json(
+            r'window\.__additionalDataLoaded\s*\(\s*[^,]+,\s*', webpage, 'additional data', video_id, fatal=False)
+        product_item = traverse_obj(additional_data, ('items', 0), expected_type=dict)
+        if product_item:
+            media.update(product_item)
+            return self._extract_product(media)
+
+        media.update(traverse_obj(
+            additional_data, ('graphql', 'shortcode_media'), 'shortcode_media', expected_type=dict) or {})
 
         username = traverse_obj(media, ('owner', 'username')) or self._search_regex(
             r'"owner"\s*:\s*{\s*"username"\s*:\s*"(.+?)"', webpage, 'username', fatal=False)
@@ -412,7 +422,7 @@ def _real_extract(self, url):
             if nodes:
                 return self.playlist_result(
                     self._extract_nodes(nodes, True), video_id,
-                    format_field(username, template='Post by %s'), description)
+                    format_field(username, None, 'Post by %s'), description)
 
             video_url = self._og_search_video_url(webpage, secure=False)
 
@@ -521,7 +531,7 @@ def _extract_graphql(self, data, url):
                 except ExtractorError as e:
                     # if it's an error caused by a bad query, and there are
                     # more GIS templates to try, ignore it and keep trying
-                    if isinstance(e.cause, compat_HTTPError) and e.cause.code == 403:
+                    if isinstance(e.cause, urllib.error.HTTPError) and e.cause.code == 403:
                         if gis_tmpl != gis_tmpls[-1]:
                             continue
                     raise
@@ -631,41 +641,36 @@ class InstagramStoryIE(InstagramBaseIE):
 
     def _real_extract(self, url):
         username, story_id = self._match_valid_url(url).groups()
-
-        story_info_url = f'{username}/{story_id}/?__a=1' if username == 'highlights' else f'{username}/?__a=1'
-        story_info = self._download_json(f'https://www.instagram.com/stories/{story_info_url}', story_id, headers={
-            'X-IG-App-ID': 936619743392459,
-            'X-ASBD-ID': 198387,
-            'X-IG-WWW-Claim': 0,
-            'X-Requested-With': 'XMLHttpRequest',
-            'Referer': url,
-        })
-        user_id = story_info['user']['id']
-        highlight_title = traverse_obj(story_info, ('highlight', 'title'))
+        story_info = self._download_webpage(url, story_id)
+        user_info = self._search_json(r'"user":', story_info, 'user info', story_id, fatal=False)
+        if not user_info:
+            self.raise_login_required('This content is unreachable')
+        user_id = user_info.get('id')
 
         story_info_url = user_id if username != 'highlights' else f'highlight:{story_id}'
-        videos = self._download_json(f'https://i.instagram.com/api/v1/feed/reels_media/?reel_ids={story_info_url}', story_id, headers={
-            'X-IG-App-ID': 936619743392459,
-            'X-ASBD-ID': 198387,
-            'X-IG-WWW-Claim': 0,
-        })['reels']
-
-        full_name = traverse_obj(videos, ('user', 'full_name'))
-
-        user_info = {}
-        if not (username and username != 'highlights' and full_name):
-            user_info = self._download_json(
-                f'https://i.instagram.com/api/v1/users/{user_id}/info/', story_id, headers={
-                    'User-Agent': 'Mozilla/5.0 (Linux; Android 11; SM-A505F Build/RP1A.200720.012; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/96.0.4664.45 Mobile Safari/537.36 Instagram 214.1.0.29.120 Android (30/11; 450dpi; 1080x2122; samsung; SM-A505F; a50; exynos9610; en_US; 333717274)',
-                }, note='Downloading user info')
+        videos = traverse_obj(self._download_json(
+            f'https://i.instagram.com/api/v1/feed/reels_media/?reel_ids={story_info_url}',
+            story_id, errnote=False, fatal=False, headers={
+                'X-IG-App-ID': 936619743392459,
+                'X-ASBD-ID': 198387,
+                'X-IG-WWW-Claim': 0,
+            }), 'reels')
+        if not videos:
+            self.raise_login_required('You need to log in to access this content')
 
-        username = traverse_obj(user_info, ('user', 'username')) or username
-        full_name = traverse_obj(user_info, ('user', 'full_name')) or full_name
+        full_name = traverse_obj(videos, (f'highlight:{story_id}', 'user', 'full_name'), (str(user_id), 'user', 'full_name'))
+        story_title = traverse_obj(videos, (f'highlight:{story_id}', 'title'))
+        if not story_title:
+            story_title = f'Story by {username}'
 
         highlights = traverse_obj(videos, (f'highlight:{story_id}', 'items'), (str(user_id), 'items'))
-        return self.playlist_result([{
-            **self._extract_product(highlight),
-            'title': f'Story by {username}',
-            'uploader': full_name,
-            'uploader_id': user_id,
-        } for highlight in highlights], playlist_id=story_id, playlist_title=highlight_title)
+        info_data = []
+        for highlight in highlights:
+            highlight_data = self._extract_product(highlight)
+            if highlight_data.get('formats'):
+                info_data.append({
+                    **highlight_data,
+                    'uploader': full_name,
+                    'uploader_id': user_id,
+                })
+        return self.playlist_result(info_data, playlist_id=story_id, playlist_title=story_title)