jfr.im git - yt-dlp.git/blobdiff - yt_dlp/extractor/instagram.py
[instagram] Add more formats when logged in (#1487)
[yt-dlp.git] / yt_dlp / extractor / instagram.py
index 12e10143cdc100877f3f0233243f80d224599863..8c935c251433e70c8fefd4f7c57570eeb135abba 100644 (file)
@@ -4,6 +4,7 @@
 import hashlib
 import json
 import re
+import time
 
 from .common import InfoExtractor
 from ..compat import (
     std_headers,
     try_get,
     url_or_none,
+    variadic,
+    urlencode_postdata,
 )
 
 
 class InstagramIE(InfoExtractor):
     _VALID_URL = r'(?P<url>https?://(?:www\.)?instagram\.com/(?:p|tv|reel)/(?P<id>[^/?#&]+))'
+    _NETRC_MACHINE = 'instagram'
     _TESTS = [{
         'url': 'https://instagram.com/p/aye83DjauH/?foo=bar#abc',
         'md5': '0d2da106a9d2631273e192b372806516',
@@ -139,12 +143,55 @@ def _extract_embed_url(webpage):
         if mobj:
             return mobj.group('link')
 
+    def _login(self):
+        """Log in to Instagram with the configured credentials, if any.
+
+        Returns immediately when `_get_login_info()` yields no username.
+        Otherwise downloads the login page, reads the CSRF token and rollout
+        hash from its `window._sharedData` JSON and posts the credentials to
+        the AJAX login endpoint.
+
+        Raises ExtractorError when the login tokens cannot be found on the
+        login page or when the response is not marked as authenticated.
+        """
+        username, password = self._get_login_info()
+        if username is None:
+            return
+
+        login_webpage = self._download_webpage(
+            'https://www.instagram.com/accounts/login/', None,
+            note='Downloading login webpage', errnote='Failed to download login webpage')
+
+        # default='{}' means a page without the blob parses to an empty dict,
+        # so the tokens have to be looked up defensively below.
+        shared_data = self._parse_json(
+            self._search_regex(
+                r'window\._sharedData\s*=\s*({.+?});',
+                login_webpage, 'shared data', default='{}'),
+            None)
+
+        csrf_token = try_get(shared_data, lambda x: x['config']['csrf_token'])
+        rollout_hash = try_get(shared_data, lambda x: x['rollout_hash'])
+        if not csrf_token or not rollout_hash:
+            # Fail with a readable error instead of a bare KeyError.
+            raise ExtractorError('Unable to login: login tokens not found in login webpage')
+
+        login = self._download_json('https://www.instagram.com/accounts/login/ajax/', None, note='Logging in', headers={
+            'Accept': '*/*',
+            'X-IG-App-ID': '936619743392459',
+            'X-ASBD-ID': '198387',
+            'X-IG-WWW-Claim': '0',
+            'X-Requested-With': 'XMLHttpRequest',
+            'X-CSRFToken': csrf_token,
+            'X-Instagram-AJAX': rollout_hash,
+            'Referer': 'https://www.instagram.com/',
+        }, data=urlencode_postdata({
+            # Password is sent in the "browser, version 0" envelope together
+            # with the current Unix timestamp.
+            'enc_password': f'#PWD_INSTAGRAM_BROWSER:0:{int(time.time())}:{password}',
+            'username': username,
+            'queryParams': '{}',
+            'optIntoOneTap': 'false',
+            'stopDeletionNonce': '',
+            'trustedDeviceRecords': '{}',
+        }))
+
+        if not login.get('authenticated'):
+            # Prefer the server-provided reason when one is present.
+            if login.get('message'):
+                raise ExtractorError(f'Unable to login: {login["message"]}')
+            raise ExtractorError('Unable to login')
+
+    def _real_initialize(self):
+        """Run the login step; `_login` returns early when no username is configured."""
+        self._login()
+
     def _real_extract(self, url):
-        mobj = re.match(self._VALID_URL, url)
+        mobj = self._match_valid_url(url)
         video_id = mobj.group('id')
         url = mobj.group('url')
 
-        webpage = self._download_webpage(url, video_id)
+        webpage, urlh = self._download_webpage_handle(url, video_id)
+        if 'www.instagram.com/accounts/login' in urlh.geturl().rstrip('/'):
+            self.raise_login_required('You need to log in to access this content')
 
         (media, video_url, description, thumbnail, timestamp, uploader,
          uploader_id, like_count, comment_count, comments, height,
@@ -175,8 +222,8 @@ def _real_extract(self, url):
                     dict)
         if media:
             video_url = media.get('video_url')
-            height = int_or_none(media.get('dimensions', {}).get('height'))
-            width = int_or_none(media.get('dimensions', {}).get('width'))
+            height = try_get(media, lambda x: x['dimensions']['height'])
+            width = try_get(media, lambda x: x['dimensions']['width'])
             description = try_get(
                 media, lambda x: x['edge_media_to_caption']['edges'][0]['node']['text'],
                 compat_str) or media.get('caption')
@@ -184,30 +231,33 @@ def _real_extract(self, url):
             thumbnail = media.get('display_src') or media.get('display_url')
             duration = float_or_none(media.get('video_duration'))
             timestamp = int_or_none(media.get('taken_at_timestamp') or media.get('date'))
-            uploader = media.get('owner', {}).get('full_name')
-            uploader_id = media.get('owner', {}).get('username')
+            uploader = try_get(media, lambda x: x['owner']['full_name'])
+            uploader_id = try_get(media, lambda x: x['owner']['username'])
 
             def get_count(keys, kind):
-                if not isinstance(keys, (list, tuple)):
-                    keys = [keys]
-                for key in keys:
+                for key in variadic(keys):
                     count = int_or_none(try_get(
                         media, (lambda x: x['edge_media_%s' % key]['count'],
                                 lambda x: x['%ss' % kind]['count'])))
                     if count is not None:
                         return count
+
             like_count = get_count('preview_like', 'like')
             comment_count = get_count(
                 ('preview_comment', 'to_comment', 'to_parent_comment'), 'comment')
 
-            comments = [{
-                'author': comment.get('user', {}).get('username'),
-                'author_id': comment.get('user', {}).get('id'),
-                'id': comment.get('id'),
-                'text': comment.get('text'),
-                'timestamp': int_or_none(comment.get('created_at')),
-            } for comment in media.get(
-                'comments', {}).get('nodes', []) if comment.get('text')]
+            comments = []
+            for comment in try_get(media, lambda x: x['edge_media_to_parent_comment']['edges']):
+                comment_dict = comment.get('node', {})
+                comment_text = comment_dict.get('text')
+                if comment_text:
+                    comments.append({
+                        'author': try_get(comment_dict, lambda x: x['owner']['username']),
+                        'author_id': try_get(comment_dict, lambda x: x['owner']['id']),
+                        'id': comment_dict.get('id'),
+                        'text': comment_text,
+                        'timestamp': int_or_none(comment_dict.get('created_at')),
+                    })
             if not video_url:
                 edges = try_get(
                     media, lambda x: x['edge_sidecar_to_children']['edges'],
@@ -244,6 +294,10 @@ def get_count(keys, kind):
             'width': width,
             'height': height,
         }]
+        dash = try_get(media, lambda x: x['dash_info']['video_dash_manifest'])
+        if dash:
+            formats.extend(self._parse_mpd_formats(self._parse_xml(dash, video_id), mpd_id='dash'))
+        self._sort_formats(formats)
 
         if not uploader_id:
             uploader_id = self._search_regex(
@@ -273,6 +327,9 @@ def get_count(keys, kind):
             'like_count': like_count,
             'comment_count': comment_count,
             'comments': comments,
+            'http_headers': {
+                'Referer': 'https://www.instagram.com/',
+            }
         }