import hashlib
import json
import re
+import time
from .common import InfoExtractor
from ..compat import (
std_headers,
try_get,
url_or_none,
+ variadic,
+ urlencode_postdata,
)
class InstagramIE(InfoExtractor):
_VALID_URL = r'(?P<url>https?://(?:www\.)?instagram\.com/(?:p|tv|reel)/(?P<id>[^/?#&]+))'
+ _NETRC_MACHINE = 'instagram'
_TESTS = [{
'url': 'https://instagram.com/p/aye83DjauH/?foo=bar#abc',
'md5': '0d2da106a9d2631273e192b372806516',
if mobj:
return mobj.group('link')
+ def _login(self):
+     """Log in to Instagram with the configured credentials, if any.
+
+     Returns immediately when ``_get_login_info`` yields no username.
+     Otherwise downloads the login page, reads the CSRF token and rollout
+     hash from its ``window._sharedData`` blob, and posts the credentials
+     to the AJAX login endpoint.
+
+     Raises:
+         ExtractorError: if the login page carries no CSRF token or rollout
+             hash, or if the login response is not flagged ``authenticated``.
+     """
+     username, password = self._get_login_info()
+     if username is None:
+         return
+
+     login_webpage = self._download_webpage(
+         'https://www.instagram.com/accounts/login/', None,
+         note='Downloading login webpage', errnote='Failed to download login webpage')
+
+     # The regex falls back to '{}' so that a page without the blob still
+     # parses; the missing values are reported explicitly just below.
+     shared_data = self._parse_json(
+         self._search_regex(
+             r'window\._sharedData\s*=\s*({.+?});',
+             login_webpage, 'shared data', default='{}'),
+         None)
+
+     csrf_token = try_get(shared_data, lambda x: x['config']['csrf_token'])
+     rollout_hash = try_get(shared_data, lambda x: x['rollout_hash'])
+     if csrf_token is None or rollout_hash is None:
+         # Previously this surfaced as a bare KeyError from the header dict.
+         raise ExtractorError(
+             'Unable to login: could not extract CSRF token or rollout hash from the login page')
+
+     login = self._download_json('https://www.instagram.com/accounts/login/ajax/', None, note='Logging in', headers={
+         'Accept': '*/*',
+         'X-IG-App-ID': '936619743392459',
+         'X-ASBD-ID': '198387',
+         'X-IG-WWW-Claim': '0',
+         'X-Requested-With': 'XMLHttpRequest',
+         'X-CSRFToken': csrf_token,
+         'X-Instagram-AJAX': rollout_hash,
+         'Referer': 'https://www.instagram.com/',
+     }, data=urlencode_postdata({
+         # The password travels inside this tagged string together with the
+         # current Unix time.
+         'enc_password': f'#PWD_INSTAGRAM_BROWSER:0:{int(time.time())}:{password}',
+         'username': username,
+         'queryParams': '{}',
+         'optIntoOneTap': 'false',
+         'stopDeletionNonce': '',
+         'trustedDeviceRecords': '{}',
+     }))
+
+     if login.get('authenticated'):
+         return
+
+     # Prefer the server-supplied reason when the response carries one.
+     message = login.get('message')
+     if message:
+         raise ExtractorError(f'Unable to login: {message}')
+     raise ExtractorError('Unable to login')
+
+ def _real_initialize(self):
+ """Run the login step; ``_login`` returns early when no username is configured."""
+ # NOTE(review): presumably invoked by the InfoExtractor base class before extraction -- confirm.
+ self._login()
+
def _real_extract(self, url):
- mobj = re.match(self._VALID_URL, url)
+ mobj = self._match_valid_url(url)
video_id = mobj.group('id')
url = mobj.group('url')
- webpage = self._download_webpage(url, video_id)
+ webpage, urlh = self._download_webpage_handle(url, video_id)
+ if 'www.instagram.com/accounts/login' in urlh.geturl().rstrip('/'):
+ self.raise_login_required('You need to log in to access this content')
(media, video_url, description, thumbnail, timestamp, uploader,
uploader_id, like_count, comment_count, comments, height,
dict)
if media:
video_url = media.get('video_url')
- height = int_or_none(media.get('dimensions', {}).get('height'))
- width = int_or_none(media.get('dimensions', {}).get('width'))
+ height = try_get(media, lambda x: x['dimensions']['height'])
+ width = try_get(media, lambda x: x['dimensions']['width'])
description = try_get(
media, lambda x: x['edge_media_to_caption']['edges'][0]['node']['text'],
compat_str) or media.get('caption')
thumbnail = media.get('display_src') or media.get('display_url')
duration = float_or_none(media.get('video_duration'))
timestamp = int_or_none(media.get('taken_at_timestamp') or media.get('date'))
- uploader = media.get('owner', {}).get('full_name')
- uploader_id = media.get('owner', {}).get('username')
+ uploader = try_get(media, lambda x: x['owner']['full_name'])
+ uploader_id = try_get(media, lambda x: x['owner']['username'])
def get_count(keys, kind):
+ """Return the first non-None integer count found in ``media`` for ``keys``.
+
+ ``keys`` may be a single key or several; it is passed through ``variadic``,
+ which presumably wraps a lone key the way the removed isinstance check did.
+ For each key two getters are handed to ``try_get``: ``edge_media_<key>`` ->
+ ``count`` and ``<kind>s`` -> ``count`` (presumably tried in that order).
+ Falls through, returning None implicitly, when no key yields a count.
+ """
- if not isinstance(keys, (list, tuple)):
- keys = [keys]
- for key in keys:
+ for key in variadic(keys):
count = int_or_none(try_get(
media, (lambda x: x['edge_media_%s' % key]['count'],
lambda x: x['%ss' % kind]['count'])))
if count is not None:
return count
+
like_count = get_count('preview_like', 'like')
comment_count = get_count(
('preview_comment', 'to_comment', 'to_parent_comment'), 'comment')
- comments = [{
- 'author': comment.get('user', {}).get('username'),
- 'author_id': comment.get('user', {}).get('id'),
- 'id': comment.get('id'),
- 'text': comment.get('text'),
- 'timestamp': int_or_none(comment.get('created_at')),
- } for comment in media.get(
- 'comments', {}).get('nodes', []) if comment.get('text')]
+ comments = []
+ # try_get yields None when the media has no 'edge_media_to_parent_comment'
+ # edges (or they are not a list); fall back to an empty list so the loop is
+ # skipped instead of raising TypeError on iterating None.
+ comment_edges = try_get(
+     media, lambda x: x['edge_media_to_parent_comment']['edges'], list) or []
+ for comment in comment_edges:
+     comment_dict = comment.get('node', {})
+     comment_text = comment_dict.get('text')
+     # Entries without text are dropped.
+     if comment_text:
+         comments.append({
+             'author': try_get(comment_dict, lambda x: x['owner']['username']),
+             'author_id': try_get(comment_dict, lambda x: x['owner']['id']),
+             'id': comment_dict.get('id'),
+             'text': comment_text,
+             'timestamp': int_or_none(comment_dict.get('created_at')),
+         })
if not video_url:
edges = try_get(
media, lambda x: x['edge_sidecar_to_children']['edges'],
'width': width,
'height': height,
}]
+ dash = try_get(media, lambda x: x['dash_info']['video_dash_manifest'])
+ if dash:
+ formats.extend(self._parse_mpd_formats(self._parse_xml(dash, video_id), mpd_id='dash'))
+ self._sort_formats(formats)
if not uploader_id:
uploader_id = self._search_regex(
'like_count': like_count,
'comment_count': comment_count,
'comments': comments,
+ 'http_headers': {
+ 'Referer': 'https://www.instagram.com/',
+ }
}