import json
import re
import time
-import urllib.error
from .common import InfoExtractor
+from ..networking.exceptions import HTTPError
from ..utils import (
ExtractorError,
decode_base_n,
encode_base_n,
+ filter_dict,
float_or_none,
format_field,
get_element_by_attribute,
_ENCODING_CHARS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
-def _pk_to_id(id):
+def _pk_to_id(media_id):
"""Source: https://stackoverflow.com/questions/24437823/getting-instagram-post-url-from-media-id"""
- return encode_base_n(int(id.split('_')[0]), table=_ENCODING_CHARS)
+ return encode_base_n(int(media_id.split('_')[0]), table=_ENCODING_CHARS)
def _id_to_pk(shortcode):
'height': self._get_dimension('height', node),
'http_headers': {
'Referer': 'https://www.instagram.com/',
- }
+ },
}
elif not video_id:
continue
return {}
formats = [{
- 'format_id': format.get('id'),
- 'url': format.get('url'),
- 'width': format.get('width'),
- 'height': format.get('height'),
+ 'format_id': fmt.get('id'),
+ 'url': fmt.get('url'),
+ 'width': fmt.get('width'),
+ 'height': fmt.get('height'),
'vcodec': vcodec,
- } for format in videos_list or []]
+ } for fmt in videos_list or []]
if dash_manifest_raw:
formats.extend(self._parse_mpd_formats(self._parse_xml(dash_manifest_raw, media_id), mpd_id='dash'))
- self._sort_formats(formats)
thumbnails = [{
'url': thumbnail.get('url'),
'width': thumbnail.get('width'),
- 'height': thumbnail.get('height')
+ 'height': thumbnail.get('height'),
} for thumbnail in traverse_obj(product_media, ('image_versions2', 'candidates')) or []]
return {
'id': media_id,
'duration': float_or_none(product_media.get('video_duration')),
'formats': formats,
- 'thumbnails': thumbnails
+ 'thumbnails': thumbnails,
}
def _extract_product(self, product_info):
if isinstance(product_info, list):
product_info = product_info[0]
- comment_data = traverse_obj(product_info, ('edge_media_to_parent_comment', 'edges'))
- comments = [{
- 'author': traverse_obj(comment_dict, ('node', 'owner', 'username')),
- 'author_id': traverse_obj(comment_dict, ('node', 'owner', 'id')),
- 'id': traverse_obj(comment_dict, ('node', 'id')),
- 'text': traverse_obj(comment_dict, ('node', 'text')),
- 'timestamp': traverse_obj(comment_dict, ('node', 'created_at'), expected_type=int_or_none),
- } for comment_dict in comment_data] if comment_data else None
-
user_info = product_info.get('user') or {}
info_dict = {
- 'id': product_info.get('code') or _pk_to_id(product_info.get('pk')),
+ 'id': _pk_to_id(traverse_obj(product_info, 'pk', 'id', expected_type=str_or_none)[:19]),
'title': product_info.get('title') or f'Video by {user_info.get("username")}',
'description': traverse_obj(product_info, ('caption', 'text'), expected_type=str_or_none),
'timestamp': int_or_none(product_info.get('taken_at')),
'view_count': int_or_none(product_info.get('view_count')),
'like_count': int_or_none(product_info.get('like_count')),
'comment_count': int_or_none(product_info.get('comment_count')),
- 'comments': comments,
+ '__post_extractor': self.extract_comments(_pk_to_id(product_info.get('pk'))),
'http_headers': {
'Referer': 'https://www.instagram.com/',
- }
+ },
}
carousel_media = product_info.get('carousel_media')
if carousel_media:
return {
**info_dict,
- **self._extract_product_media(product_info)
+ **self._extract_product_media(product_info),
}
+ def _get_comments(self, video_id):
+ comments_info = self._download_json(
+ f'{self._API_BASE_URL}/media/{_id_to_pk(video_id)}/comments/?can_support_threading=true&permalink_enabled=false', video_id,
+ fatal=False, errnote='Comments extraction failed', note='Downloading comments info', headers=self._API_HEADERS) or {}
+
+ comment_data = traverse_obj(comments_info, ('edge_media_to_parent_comment', 'edges'), 'comments')
+ for comment_dict in comment_data or []:
+ yield {
+ 'author': traverse_obj(comment_dict, ('node', 'owner', 'username'), ('user', 'username')),
+ 'author_id': traverse_obj(comment_dict, ('node', 'owner', 'id'), ('user', 'pk')),
+ 'author_thumbnail': traverse_obj(comment_dict, ('node', 'owner', 'profile_pic_url'), ('user', 'profile_pic_url'), expected_type=url_or_none),
+ 'id': traverse_obj(comment_dict, ('node', 'id'), 'pk'),
+ 'text': traverse_obj(comment_dict, ('node', 'text'), 'text'),
+ 'like_count': traverse_obj(comment_dict, ('node', 'edge_liked_by', 'count'), 'comment_like_count', expected_type=int_or_none),
+ 'timestamp': traverse_obj(comment_dict, ('node', 'created_at'), 'created_at', expected_type=int_or_none),
+ }
+
class InstagramIOSIE(InfoExtractor):
IE_DESC = 'IOS instagram:// URL'
'comment_count': int,
'comments': list,
},
- 'add_ie': ['Instagram']
+ 'add_ie': ['Instagram'],
}]
def _real_extract(self, url):
class InstagramIE(InstagramBaseIE):
- _VALID_URL = r'(?P<url>https?://(?:www\.)?instagram\.com(?:/[^/]+)?/(?:p|tv|reel)/(?P<id>[^/?#&]+))'
+ _VALID_URL = r'(?P<url>https?://(?:www\.)?instagram\.com(?:/[^/]+)?/(?:p|tv|reels?(?!/audio/))/(?P<id>[^/?#&]+))'
_EMBED_REGEX = [r'<iframe[^>]+src=(["\'])(?P<url>(?:https?:)?//(?:www\.)?instagram\.com/p/[^/]+/embed.*?)\1']
_TESTS = [{
'url': 'https://instagram.com/p/aye83DjauH/?foo=bar#abc',
'title': 'Video by naomipq',
'description': 'md5:1f17f0ab29bd6fe2bfad705f58de3cb8',
'thumbnail': r're:^https?://.*\.jpg',
- 'duration': 0,
+ 'duration': 8.747,
'timestamp': 1371748545,
'upload_date': '20130620',
'uploader_id': '2815873',
'comment_count': int,
'comments': list,
},
+ 'expected_warnings': [
+ 'General metadata extraction failed',
+ 'Main webpage is locked behind the login page',
+ ],
}, {
- # missing description
- 'url': 'https://www.instagram.com/p/BA-pQFBG8HZ/?taken-by=britneyspears',
+ # reel
+ 'url': 'https://www.instagram.com/reel/Chunk8-jurw/',
+ 'md5': 'f6d8277f74515fa3ff9f5791426e42b1',
'info_dict': {
- 'id': 'BA-pQFBG8HZ',
+ 'id': 'Chunk8-jurw',
'ext': 'mp4',
- 'title': 'Video by britneyspears',
+ 'title': 'Video by instagram',
+ 'description': 'md5:c9cde483606ed6f80fbe9283a6a2b290',
'thumbnail': r're:^https?://.*\.jpg',
- 'duration': 0,
- 'timestamp': 1453760977,
- 'upload_date': '20160125',
- 'uploader_id': '12246775',
- 'uploader': 'Britney Spears',
- 'channel': 'britneyspears',
+ 'duration': 5.016,
+ 'timestamp': 1661529231,
+ 'upload_date': '20220826',
+ 'uploader_id': '25025320',
+ 'uploader': 'Instagram',
+ 'channel': 'instagram',
'like_count': int,
'comment_count': int,
'comments': list,
},
- 'params': {
- 'skip_download': True,
- },
+ 'expected_warnings': [
+ 'General metadata extraction failed',
+ 'Main webpage is locked behind the login page',
+ ],
}, {
# multi video post
'url': 'https://www.instagram.com/p/BQ0eAlwhDrw/',
'id': 'BQ0dSaohpPW',
'ext': 'mp4',
'title': 'Video 1',
+ 'thumbnail': r're:^https?://.*\.jpg',
+ 'view_count': int,
},
}, {
'info_dict': {
'id': 'BQ0dTpOhuHT',
'ext': 'mp4',
'title': 'Video 2',
+ 'thumbnail': r're:^https?://.*\.jpg',
+ 'view_count': int,
},
}, {
'info_dict': {
'id': 'BQ0dT7RBFeF',
'ext': 'mp4',
'title': 'Video 3',
+ 'thumbnail': r're:^https?://.*\.jpg',
+ 'view_count': int,
},
}],
'info_dict': {
'title': 'Post by instagram',
'description': 'md5:0f9203fc6a2ce4d228da5754bcf54957',
},
+ 'expected_warnings': [
+ 'General metadata extraction failed',
+ 'Main webpage is locked behind the login page',
+ ],
}, {
# IGTV
'url': 'https://www.instagram.com/tv/BkfuX9UB-eK/',
'comment_count': int,
'comments': list,
'description': 'Meet Cass Hirst (@cass.fb), a fingerboarding pro who can perform tiny ollies and kickflips while blindfolded.',
- }
+ },
+ 'expected_warnings': [
+ 'General metadata extraction failed',
+ 'Main webpage is locked behind the login page',
+ ],
}, {
'url': 'https://instagram.com/p/-Cmh1cukG2/',
'only_matching': True,
}, {
'url': 'https://www.instagram.com/marvelskies.fc/reel/CWqAgUZgCku/',
'only_matching': True,
+ }, {
+ 'url': 'https://www.instagram.com/reels/Cop84x6u7CP/',
+ 'only_matching': True,
}]
@classmethod
video_id, url = self._match_valid_url(url).group('id', 'url')
media, webpage = {}, ''
+ if self._get_cookies(url).get('sessionid'):
+ info = traverse_obj(self._download_json(
+ f'{self._API_BASE_URL}/media/{_id_to_pk(video_id)}/info/', video_id,
+ fatal=False, errnote='Video info extraction failed',
+ note='Downloading video info', headers=self._API_HEADERS), ('items', 0))
+ if info:
+ media.update(info)
+ return self._extract_product(media)
+
api_check = self._download_json(
f'{self._API_BASE_URL}/web/get_ruling_for_content/?content_type=MEDIA&target_id={_id_to_pk(video_id)}',
video_id, headers=self._API_HEADERS, fatal=False, note='Setting up session', errnote=False) or {}
if not csrf_token:
self.report_warning('No csrf token set by Instagram API', video_id)
- elif api_check.get('status') != 'ok':
- self.report_warning('Instagram API is not granting access', video_id)
else:
- if self._get_cookies(url).get('sessionid'):
- media = traverse_obj(self._download_json(
- f'{self._API_BASE_URL}/media/{_id_to_pk(video_id)}/info/', video_id,
- fatal=False, note='Downloading video info', headers={
- **self._API_HEADERS,
- 'X-CSRFToken': csrf_token.value,
- }), ('items', 0))
- if media:
- return self._extract_product(media)
-
- variables = {
- 'shortcode': video_id,
- 'child_comment_count': 3,
- 'fetch_comment_count': 40,
- 'parent_comment_count': 24,
- 'has_threaded_comments': True,
- }
- general_info = self._download_json(
- 'https://www.instagram.com/graphql/query/', video_id, fatal=False,
- headers={
- **self._API_HEADERS,
- 'X-CSRFToken': csrf_token.value,
- 'X-Requested-With': 'XMLHttpRequest',
- 'Referer': url,
- }, query={
- 'query_hash': '9f8827793ef34641b2fb195d4d41151c',
- 'variables': json.dumps(variables, separators=(',', ':')),
- })
- media = traverse_obj(general_info, ('data', 'shortcode_media'))
-
- if not media:
+ csrf_token = csrf_token.value if api_check.get('status') == 'ok' else None
+ if not csrf_token:
+ self.report_warning('Instagram API is not granting access', video_id)
+
+ variables = {
+ 'shortcode': video_id,
+ 'child_comment_count': 3,
+ 'fetch_comment_count': 40,
+ 'parent_comment_count': 24,
+ 'has_threaded_comments': True,
+ }
+ general_info = self._download_json(
+ 'https://www.instagram.com/graphql/query/', video_id, fatal=False, errnote=False,
+ headers={
+ **self._API_HEADERS,
+ 'X-CSRFToken': csrf_token or '',
+ 'X-Requested-With': 'XMLHttpRequest',
+ 'Referer': url,
+ }, query={
+ 'query_hash': '9f8827793ef34641b2fb195d4d41151c',
+ 'variables': json.dumps(variables, separators=(',', ':')),
+ })
+ media.update(traverse_obj(general_info, ('data', 'shortcode_media')) or {})
+
+ if not general_info:
self.report_warning('General metadata extraction failed (some metadata might be missing).', video_id)
webpage, urlh = self._download_webpage_handle(url, video_id)
shared_data = self._search_json(
- r'window\._sharedData\s*=', webpage, 'shared data', video_id, fatal=False)
+ r'window\._sharedData\s*=', webpage, 'shared data', video_id, fatal=False) or {}
- if self._LOGIN_URL not in urlh.geturl():
+ if shared_data and self._LOGIN_URL not in urlh.url:
media.update(traverse_obj(
shared_data, ('entry_data', 'PostPage', 0, 'graphql', 'shortcode_media'),
('entry_data', 'PostPage', 0, 'media'), expected_type=dict) or {})
else:
- self.report_warning('Main webpage is locked behind the login page. Retrying with embed webpage')
+ self.report_warning('Main webpage is locked behind the login page. Retrying with embed webpage (some metadata might be missing).')
webpage = self._download_webpage(
f'{url}/embed/', video_id, note='Downloading embed webpage', fatal=False)
additional_data = self._search_json(
- r'window\.__additionalDataLoaded\s*\(\s*[^,]+,\s*', webpage, 'additional data', video_id, fatal=False)
- if not additional_data:
- self.raise_login_required('Requested content was not found, the content might be private')
+ r'window\.__additionalDataLoaded\s*\(\s*[^,]+,', webpage, 'additional data', video_id, fatal=False)
+ if not additional_data and not media:
+ self.raise_login_required('Requested content is not available, rate-limit reached or login required')
product_item = traverse_obj(additional_data, ('items', 0), expected_type=dict)
if product_item:
dash = traverse_obj(media, ('dash_info', 'video_dash_manifest'))
if dash:
formats.extend(self._parse_mpd_formats(self._parse_xml(dash, video_id), mpd_id='dash'))
- self._sort_formats(formats)
comment_data = traverse_obj(media, ('edge_media_to_parent_comment', 'edges'))
comments = [{
return {
'id': video_id,
'formats': formats,
- 'title': media.get('title') or 'Video by %s' % username,
+ 'title': media.get('title') or f'Video by {username}',
'description': description,
'duration': float_or_none(media.get('video_duration')),
'timestamp': traverse_obj(media, 'taken_at_timestamp', 'date', expected_type=int_or_none),
'thumbnails': thumbnails,
'http_headers': {
'Referer': 'https://www.instagram.com/',
- }
+ },
}
gis_tmpls = [self._gis_tmpl]
else:
gis_tmpls = [
- '%s' % rhx_gis,
+ f'{rhx_gis}',
'',
- '%s:%s' % (rhx_gis, csrf_token),
- '%s:%s:%s' % (rhx_gis, csrf_token, self.get_param('http_headers')['User-Agent']),
+ f'{rhx_gis}:{csrf_token}',
+ '{}:{}:{}'.format(rhx_gis, csrf_token, self.get_param('http_headers')['User-Agent']),
]
# try all of the ways to generate a GIS query, and not only use the
try:
json_data = self._download_json(
'https://www.instagram.com/graphql/query/', uploader_id,
- 'Downloading JSON page %d' % page_num, headers={
+ f'Downloading JSON page {page_num}', headers={
'X-Requested-With': 'XMLHttpRequest',
'X-Instagram-GIS': hashlib.md5(
- ('%s:%s' % (gis_tmpl, variables)).encode('utf-8')).hexdigest(),
+ (f'{gis_tmpl}:{variables}').encode()).hexdigest(),
}, query={
'query_hash': self._QUERY_HASH,
'variables': variables,
except ExtractorError as e:
# if it's an error caused by a bad query, and there are
# more GIS templates to try, ignore it and keep trying
- if isinstance(e.cause, urllib.error.HTTPError) and e.cause.code == 403:
+ if isinstance(e.cause, HTTPError) and e.cause.status == 403:
if gis_tmpl != gis_tmpls[-1]:
continue
raise
class InstagramUserIE(InstagramPlaylistBaseIE):
+ _WORKING = False
_VALID_URL = r'https?://(?:www\.)?instagram\.com/(?P<id>[^/]{2,})/?(?:$|[?#])'
IE_DESC = 'Instagram user profile'
IE_NAME = 'instagram:user'
'extract_flat': True,
'skip_download': True,
'playlistend': 5,
- }
+ },
}]
- _QUERY_HASH = '42323d64886122307be10013ad2dcc44',
+ _QUERY_HASH = ('42323d64886122307be10013ad2dcc44',)
@staticmethod
def _parse_timeline_from(data):
# returns a dictionary of variables to add to the timeline query based
# on the GraphQL of the original page
return {
- 'id': data['entry_data']['ProfilePage'][0]['graphql']['user']['id']
+ 'id': data['entry_data']['ProfilePage'][0]['graphql']['user']['id'],
}
'extract_flat': True,
'skip_download': True,
'playlistend': 50,
- }
+ },
}]
- _QUERY_HASH = 'f92f56d47dc7a55b606908374b43a314',
+ _QUERY_HASH = ('f92f56d47dc7a55b606908374b43a314',)
@staticmethod
def _parse_timeline_from(data):
# on the GraphQL of the original page
return {
'tag_name':
- data['entry_data']['TagPage'][0]['graphql']['hashtag']['name']
+ data['entry_data']['TagPage'][0]['graphql']['hashtag']['name'],
}
'id': '18090946048123978',
'title': 'Rare',
},
- 'playlist_mincount': 50
+ 'playlist_mincount': 50,
}]
def _real_extract(self, url):
user_info = self._search_json(r'"user":', story_info, 'user info', story_id, fatal=False)
if not user_info:
self.raise_login_required('This content is unreachable')
- user_id = user_info.get('id')
+ user_id = traverse_obj(user_info, 'pk', 'id', expected_type=str)
story_info_url = user_id if username != 'highlights' else f'highlight:{story_id}'
+ if not story_info_url: # user id is only mandatory for non-highlights
+ raise ExtractorError('Unable to extract user id')
+
videos = traverse_obj(self._download_json(
f'{self._API_BASE_URL}/feed/reels_media/?reel_ids={story_info_url}',
story_id, errnote=False, fatal=False, headers=self._API_HEADERS), 'reels')
if not videos:
self.raise_login_required('You need to log in to access this content')
- full_name = traverse_obj(videos, (f'highlight:{story_id}', 'user', 'full_name'), (str(user_id), 'user', 'full_name'))
+ full_name = traverse_obj(videos, (f'highlight:{story_id}', 'user', 'full_name'), (user_id, 'user', 'full_name'))
story_title = traverse_obj(videos, (f'highlight:{story_id}', 'title'))
if not story_title:
story_title = f'Story by {username}'
- highlights = traverse_obj(videos, (f'highlight:{story_id}', 'items'), (str(user_id), 'items'))
+ highlights = traverse_obj(videos, (f'highlight:{story_id}', 'items'), (user_id, 'items'))
info_data = []
for highlight in highlights:
highlight_data = self._extract_product(highlight)
if highlight_data.get('formats'):
info_data.append({
- **highlight_data,
'uploader': full_name,
'uploader_id': user_id,
+ **filter_dict(highlight_data),
})
return self.playlist_result(info_data, playlist_id=story_id, playlist_title=story_title)