-# coding: utf-8
-
-import itertools
import hashlib
+import itertools
import json
import re
import time
+import urllib.error
from .common import InfoExtractor
-from ..compat import (
- compat_HTTPError,
-)
from ..utils import (
ExtractorError,
- format_field,
+ decode_base_n,
+ encode_base_n,
float_or_none,
+ format_field,
get_element_by_attribute,
int_or_none,
lowercase_escape,
urlencode_postdata,
)
+_ENCODING_CHARS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
+
+
+def _pk_to_id(id):
+ """Source: https://stackoverflow.com/questions/24437823/getting-instagram-post-url-from-media-id"""
+ return encode_base_n(int(id.split('_')[0]), table=_ENCODING_CHARS)
+
+
+def _id_to_pk(shortcode):
+ """Covert a shortcode to a numeric value"""
+ return decode_base_n(shortcode[:11], table=_ENCODING_CHARS)
+
class InstagramBaseIE(InfoExtractor):
_NETRC_MACHINE = 'instagram'
if isinstance(product_info, list):
product_info = product_info[0]
+ comment_data = traverse_obj(product_info, ('edge_media_to_parent_comment', 'edges'))
+ comments = [{
+ 'author': traverse_obj(comment_dict, ('node', 'owner', 'username')),
+ 'author_id': traverse_obj(comment_dict, ('node', 'owner', 'id')),
+ 'id': traverse_obj(comment_dict, ('node', 'id')),
+ 'text': traverse_obj(comment_dict, ('node', 'text')),
+ 'timestamp': traverse_obj(comment_dict, ('node', 'created_at'), expected_type=int_or_none),
+ } for comment_dict in comment_data] if comment_data else None
+
user_info = product_info.get('user') or {}
info_dict = {
'id': product_info.get('code') or product_info.get('id'),
'view_count': int_or_none(product_info.get('view_count')),
'like_count': int_or_none(product_info.get('like_count')),
'comment_count': int_or_none(product_info.get('comment_count')),
+ 'comments': comments,
'http_headers': {
'Referer': 'https://www.instagram.com/',
}
'add_ie': ['Instagram']
}]
- def _get_id(self, id):
- """Source: https://stackoverflow.com/questions/24437823/getting-instagram-post-url-from-media-id"""
- chrs = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
- media_id = int(id.split('_')[0])
- shortened_id = ''
- while media_id > 0:
- r = media_id % 64
- media_id = (media_id - r) // 64
- shortened_id = chrs[r] + shortened_id
- return shortened_id
-
def _real_extract(self, url):
- return {
- '_type': 'url_transparent',
- 'url': f'http://instagram.com/tv/{self._get_id(self._match_id(url))}/',
- 'ie_key': 'Instagram',
- }
+ video_id = _pk_to_id(self._match_id(url))
+ return self.url_result(f'http://instagram.com/tv/{video_id}', InstagramIE, video_id)
class InstagramIE(InstagramBaseIE):
_VALID_URL = r'(?P<url>https?://(?:www\.)?instagram\.com(?:/[^/]+)?/(?:p|tv|reel)/(?P<id>[^/?#&]+))'
+ _EMBED_REGEX = [r'<iframe[^>]+src=(["\'])(?P<url>(?:https?:)?//(?:www\.)?instagram\.com/p/[^/]+/embed.*?)\1']
_TESTS = [{
'url': 'https://instagram.com/p/aye83DjauH/?foo=bar#abc',
'md5': '0d2da106a9d2631273e192b372806516',
'only_matching': True,
}]
- @staticmethod
- def _extract_embed_url(webpage):
- mobj = re.search(
- r'<iframe[^>]+src=(["\'])(?P<url>(?:https?:)?//(?:www\.)?instagram\.com/p/[^/]+/embed.*?)\1',
- webpage)
- if mobj:
- return mobj.group('url')
-
- blockquote_el = get_element_by_attribute(
- 'class', 'instagram-media', webpage)
- if blockquote_el is None:
- return
+ @classmethod
+ def _extract_embed_urls(cls, url, webpage):
+ res = tuple(super()._extract_embed_urls(url, webpage))
+ if res:
+ return res
- mobj = re.search(
- r'<a[^>]+href=([\'"])(?P<link>[^\'"]+)\1', blockquote_el)
+ mobj = re.search(r'<a[^>]+href=([\'"])(?P<link>[^\'"]+)\1',
+ get_element_by_attribute('class', 'instagram-media', webpage) or '')
if mobj:
- return mobj.group('link')
+ return [mobj.group('link')]
def _real_extract(self, url):
video_id, url = self._match_valid_url(url).group('id', 'url')
- webpage, urlh = self._download_webpage_handle(url, video_id)
- if 'www.instagram.com/accounts/login' in urlh.geturl():
- self.report_warning('Main webpage is locked behind the login page. '
- 'Retrying with embed webpage (Note that some metadata might be missing)')
- webpage = self._download_webpage(
- 'https://www.instagram.com/p/%s/embed/' % video_id, video_id, note='Downloading embed webpage')
-
- shared_data = self._parse_json(
- self._search_regex(
- r'window\._sharedData\s*=\s*({.+?});',
- webpage, 'shared data', default='{}'),
- video_id, fatal=False)
- media = traverse_obj(
- shared_data,
- ('entry_data', 'PostPage', 0, 'graphql', 'shortcode_media'),
- ('entry_data', 'PostPage', 0, 'media'),
- expected_type=dict)
-
- # _sharedData.entry_data.PostPage is empty when authenticated (see
- # https://github.com/ytdl-org/youtube-dl/pull/22880)
+ general_info = self._download_json(
+ f'https://www.instagram.com/graphql/query/?query_hash=9f8827793ef34641b2fb195d4d41151c'
+ f'&variables=%7B"shortcode":"{video_id}",'
+ '"parent_comment_count":10,"has_threaded_comments":true}', video_id, fatal=False, errnote=False,
+ headers={
+ 'Accept': '*',
+ 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
+ 'Authority': 'www.instagram.com',
+ 'Referer': 'https://www.instagram.com',
+ 'x-ig-app-id': '936619743392459',
+ })
+ media = traverse_obj(general_info, ('data', 'shortcode_media')) or {}
if not media:
- additional_data = self._parse_json(
- self._search_regex(
- r'window\.__additionalDataLoaded\s*\(\s*[^,]+,\s*({.+?})\s*\);',
- webpage, 'additional data', default='{}'),
- video_id, fatal=False)
- product_item = traverse_obj(additional_data, ('items', 0), expected_type=dict)
- if product_item:
- return self._extract_product(product_item)
- media = traverse_obj(additional_data, ('graphql', 'shortcode_media'), 'shortcode_media', expected_type=dict) or {}
-
- if not media and 'www.instagram.com/accounts/login' in urlh.geturl():
- self.raise_login_required('You need to log in to access this content')
+ self.report_warning('General metadata extraction failed', video_id)
+
+ info = self._download_json(
+ f'https://i.instagram.com/api/v1/media/{_id_to_pk(video_id)}/info/', video_id,
+ fatal=False, note='Downloading video info', errnote=False, headers={
+ 'Accept': '*',
+ 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
+ 'Authority': 'www.instagram.com',
+ 'Referer': 'https://www.instagram.com',
+ 'x-ig-app-id': '936619743392459',
+ })
+ if info:
+ media.update(info['items'][0])
+ return self._extract_product(media)
+
+ webpage = self._download_webpage(
+ f'https://www.instagram.com/p/{video_id}/embed/', video_id,
+ note='Downloading embed webpage', fatal=False)
+ if not webpage:
+ self.raise_login_required('Requested content was not found, the content might be private')
+
+ additional_data = self._search_json(
+ r'window\.__additionalDataLoaded\s*\(\s*[^,]+,\s*', webpage, 'additional data', video_id, fatal=False)
+ product_item = traverse_obj(additional_data, ('items', 0), expected_type=dict)
+ if product_item:
+ media.update(product_item)
+ return self._extract_product(media)
+
+ media.update(traverse_obj(
+ additional_data, ('graphql', 'shortcode_media'), 'shortcode_media', expected_type=dict) or {})
username = traverse_obj(media, ('owner', 'username')) or self._search_regex(
r'"owner"\s*:\s*{\s*"username"\s*:\s*"(.+?)"', webpage, 'username', fatal=False)
if nodes:
return self.playlist_result(
self._extract_nodes(nodes, True), video_id,
- format_field(username, template='Post by %s'), description)
+ format_field(username, None, 'Post by %s'), description)
video_url = self._og_search_video_url(webpage, secure=False)
except ExtractorError as e:
# if it's an error caused by a bad query, and there are
# more GIS templates to try, ignore it and keep trying
- if isinstance(e.cause, compat_HTTPError) and e.cause.code == 403:
+ if isinstance(e.cause, urllib.error.HTTPError) and e.cause.code == 403:
if gis_tmpl != gis_tmpls[-1]:
continue
raise
def _real_extract(self, url):
username, story_id = self._match_valid_url(url).groups()
-
- story_info_url = f'{username}/{story_id}/?__a=1' if username == 'highlights' else f'{username}/?__a=1'
- story_info = self._download_json(f'https://www.instagram.com/stories/{story_info_url}', story_id, headers={
- 'X-IG-App-ID': 936619743392459,
- 'X-ASBD-ID': 198387,
- 'X-IG-WWW-Claim': 0,
- 'X-Requested-With': 'XMLHttpRequest',
- 'Referer': url,
- })
- user_id = story_info['user']['id']
- highlight_title = traverse_obj(story_info, ('highlight', 'title'))
+ story_info = self._download_webpage(url, story_id)
+ user_info = self._search_json(r'"user":', story_info, 'user info', story_id, fatal=False)
+ if not user_info:
+ self.raise_login_required('This content is unreachable')
+ user_id = user_info.get('id')
story_info_url = user_id if username != 'highlights' else f'highlight:{story_id}'
- videos = self._download_json(f'https://i.instagram.com/api/v1/feed/reels_media/?reel_ids={story_info_url}', story_id, headers={
- 'X-IG-App-ID': 936619743392459,
- 'X-ASBD-ID': 198387,
- 'X-IG-WWW-Claim': 0,
- })['reels']
-
- full_name = traverse_obj(videos, ('user', 'full_name'))
-
- user_info = {}
- if not (username and username != 'highlights' and full_name):
- user_info = self._download_json(
- f'https://i.instagram.com/api/v1/users/{user_id}/info/', story_id, headers={
- 'User-Agent': 'Mozilla/5.0 (Linux; Android 11; SM-A505F Build/RP1A.200720.012; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/96.0.4664.45 Mobile Safari/537.36 Instagram 214.1.0.29.120 Android (30/11; 450dpi; 1080x2122; samsung; SM-A505F; a50; exynos9610; en_US; 333717274)',
- }, note='Downloading user info')
+ videos = traverse_obj(self._download_json(
+ f'https://i.instagram.com/api/v1/feed/reels_media/?reel_ids={story_info_url}',
+ story_id, errnote=False, fatal=False, headers={
+ 'X-IG-App-ID': 936619743392459,
+ 'X-ASBD-ID': 198387,
+ 'X-IG-WWW-Claim': 0,
+ }), 'reels')
+ if not videos:
+ self.raise_login_required('You need to log in to access this content')
- username = traverse_obj(user_info, ('user', 'username')) or username
- full_name = traverse_obj(user_info, ('user', 'full_name')) or full_name
+ full_name = traverse_obj(videos, (f'highlight:{story_id}', 'user', 'full_name'), (str(user_id), 'user', 'full_name'))
+ story_title = traverse_obj(videos, (f'highlight:{story_id}', 'title'))
+ if not story_title:
+ story_title = f'Story by {username}'
highlights = traverse_obj(videos, (f'highlight:{story_id}', 'items'), (str(user_id), 'items'))
- return self.playlist_result([{
- **self._extract_product(highlight),
- 'title': f'Story by {username}',
- 'uploader': full_name,
- 'uploader_id': user_id,
- } for highlight in highlights], playlist_id=story_id, playlist_title=highlight_title)
+ info_data = []
+ for highlight in highlights:
+ highlight_data = self._extract_product(highlight)
+ if highlight_data.get('formats'):
+ info_data.append({
+ **highlight_data,
+ 'uploader': full_name,
+ 'uploader_id': user_id,
+ })
+ return self.playlist_result(info_data, playlist_id=story_id, playlist_title=story_title)