import base64
import collections
+import functools
import getpass
import hashlib
import http.client
import urllib.request
import xml.etree.ElementTree
-from ..compat import functools # isort: split
from ..compat import (
compat_etree_fromstring,
compat_expanduser,
determine_ext,
dict_get,
encode_data_uri,
- error_to_compat_str,
extract_attributes,
filter_dict,
fix_xml_ampersands,
self._x_forwarded_for_ip = GeoUtils.random_ipv4(country_code)
if self._x_forwarded_for_ip:
self.report_warning(
- 'Video is geo restricted. Retrying extraction with fake IP %s (%s) as X-Forwarded-For.'
- % (self._x_forwarded_for_ip, country_code.upper()))
+ 'Video is geo restricted. Retrying extraction with fake IP '
+ f'{self._x_forwarded_for_ip} ({country_code.upper()}) as X-Forwarded-For.')
return True
return False
if not self._downloader._first_webpage_request:
sleep_interval = self.get_param('sleep_interval_requests') or 0
if sleep_interval > 0:
- self.to_screen('Sleeping %s seconds ...' % sleep_interval)
+ self.to_screen(f'Sleeping {sleep_interval} seconds ...')
time.sleep(sleep_interval)
else:
self._downloader._first_webpage_request = False
if errnote is None:
errnote = 'Unable to download webpage'
- errmsg = f'{errnote}: {error_to_compat_str(err)}'
+ errmsg = f'{errnote}: {err}'
if fatal:
raise ExtractorError(errmsg, cause=err)
else:
if urlh is False:
assert not fatal
return False
- content = self._webpage_read_content(urlh, url_or_request, video_id, note, errnote, fatal, encoding=encoding)
+ content = self._webpage_read_content(urlh, url_or_request, video_id, note, errnote, fatal,
+ encoding=encoding, data=data)
return (content, urlh)
@staticmethod
r'<iframe src="([^"]+)"', content,
'Websense information URL', default=None)
if blocked_iframe:
- msg += ' Visit %s for more details' % blocked_iframe
+ msg += f' Visit {blocked_iframe} for more details'
raise ExtractorError(msg, expected=True)
if '<title>The URL you requested has been blocked</title>' in first_block:
msg = (
r'</h1><p>(.*?)</p>',
content, 'block message', default=None)
if block_msg:
- msg += ' (Message: "%s")' % block_msg.replace('\n', ' ')
+ msg += ' (Message: "{}")'.format(block_msg.replace('\n', ' '))
raise ExtractorError(msg, expected=True)
if ('<title>TTK :: Доступ к ресурсу ограничен</title>' in content
and 'blocklist.rkn.gov.ru' in content):
'Visit http://blocklist.rkn.gov.ru/ for a block reason.',
expected=True)
- def _request_dump_filename(self, url, video_id):
- basen = f'{video_id}_{url}'
+ def _request_dump_filename(self, url, video_id, data=None):
+ if data is not None:
+ data = hashlib.md5(data).hexdigest()
+ basen = join_nonempty(video_id, data, url, delim='_')
trim_length = self.get_param('trim_file_name') or 240
if len(basen) > trim_length:
- h = '___' + hashlib.md5(basen.encode('utf-8')).hexdigest()
+ h = '___' + hashlib.md5(basen.encode()).hexdigest()
basen = basen[:trim_length - len(h)] + h
filename = sanitize_filename(f'{basen}.dump', restricted=True)
# Working around MAX_PATH limitation on Windows (see
except LookupError:
return webpage_bytes.decode('utf-8', 'replace')
- def _webpage_read_content(self, urlh, url_or_request, video_id, note=None, errnote=None, fatal=True, prefix=None, encoding=None):
+ def _webpage_read_content(self, urlh, url_or_request, video_id, note=None, errnote=None, fatal=True,
+ prefix=None, encoding=None, data=None):
webpage_bytes = urlh.read()
if prefix is not None:
webpage_bytes = prefix + webpage_bytes
dump = base64.b64encode(webpage_bytes).decode('ascii')
self._downloader.to_screen(dump)
if self.get_param('write_pages'):
- filename = self._request_dump_filename(urlh.url, video_id)
+ if isinstance(url_or_request, Request):
+ data = self._create_request(url_or_request, data).data
+ filename = self._request_dump_filename(urlh.url, video_id, data)
self.to_screen(f'Saving request to {filename}')
with open(filename, 'wb') as outf:
outf.write(webpage_bytes)
if transform_source:
xml_string = transform_source(xml_string)
try:
- return compat_etree_fromstring(xml_string.encode('utf-8'))
+ return compat_etree_fromstring(xml_string.encode())
except xml.etree.ElementTree.ParseError as ve:
self.__print_error('Failed to parse XML' if errnote is None else errnote, fatal, video_id, ve)
impersonate=None, require_impersonation=False):
if self.get_param('load_pages'):
url_or_request = self._create_request(url_or_request, data, headers, query)
- filename = self._request_dump_filename(url_or_request.url, video_id)
+ filename = self._request_dump_filename(url_or_request.url, video_id, url_or_request.data)
self.to_screen(f'Loading request from {filename}')
try:
with open(filename, 'rb') as dumpf:
def report_extraction(self, id_or_name):
    """Report information extraction.

    Emits ``'<id_or_name>: Extracting information'`` through
    ``self.to_screen``.

    @param id_or_name  Identifier (or name) interpolated into the message.
    """
    self.to_screen(f'{id_or_name}: Extracting information')
def report_download_webpage(self, video_id):
    """Report webpage download.

    Emits ``'<video_id>: Downloading webpage'`` through ``self.to_screen``.

    @param video_id  Identifier interpolated into the message.
    """
    self.to_screen(f'{video_id}: Downloading webpage')
def report_age_confirmation(self):
"""Report attempt to confirm age."""
elif default is not NO_DEFAULT:
return default
elif fatal:
- raise RegexNotFoundError('Unable to extract %s' % _name)
+ raise RegexNotFoundError(f'Unable to extract {_name}')
else:
- self.report_warning('unable to extract %s' % _name + bug_reports_message())
+ self.report_warning(f'unable to extract {_name}' + bug_reports_message())
return None
def _search_json(self, start_pattern, string, name, video_id, *, end_pattern='',
if tfa is not None:
return tfa
- return getpass.getpass('Type %s and press [Return]: ' % note)
+ return getpass.getpass(f'Type {note} and press [Return]: ')
# Helper functions for extracting OpenGraph info
@staticmethod
def _og_regexes(prop):
content_re = r'content=(?:"([^"]+?)"|\'([^\']+?)\'|\s*([^\s"\'=<>`]+?)(?=\s|/?>))'
- property_re = (r'(?:name|property)=(?:\'og%(sep)s%(prop)s\'|"og%(sep)s%(prop)s"|\s*og%(sep)s%(prop)s\b)'
- % {'prop': re.escape(prop), 'sep': '(?::|[:-])'})
+ property_re = r'(?:name|property)=(?:\'og{sep}{prop}\'|"og{sep}{prop}"|\s*og{sep}{prop}\b)'.format(
+ prop=re.escape(prop), sep='(?::|[:-])')
template = r'<meta[^>]+?%s[^>]+?%s'
return [
template % (property_re, content_re),
@staticmethod
def _meta_regex(prop):
    """Build the regex used to read the ``content`` of a ``<meta>`` tag.

    The pattern is case-insensitive, dot-all and verbose (``(?isx)``), so the
    line breaks and indentation inside the literal are not part of the match.
    A lookahead requires one of the ``itemprop``/``name``/``property``/``id``/
    ``http-equiv`` attributes to equal *prop* (optionally quoted, with the
    same quote on both sides); the value of ``content`` is captured in the
    named group ``content``.

    @param prop  Literal attribute value to look for; it is passed through
                 ``re.escape`` so regex metacharacters match themselves.
    @returns     The pattern as a string (not compiled).
    """
    return rf'''(?isx)<meta
                (?=[^>]+(?:itemprop|name|property|id|http-equiv)=(["\']?){re.escape(prop)}\1)
                [^>]+?content=(["\'])(?P<content>.*?)\2'''
def _og_search_property(self, prop, html, name=None, **kargs):
prop = variadic(prop)
if name is None:
- name = 'OpenGraph %s' % prop[0]
+ name = f'OpenGraph {prop[0]}'
og_regexes = []
for p in prop:
og_regexes.extend(self._og_regexes(p))
elif fatal:
raise RegexNotFoundError('Unable to extract JSON-LD')
else:
- self.report_warning('unable to extract JSON-LD %s' % bug_reports_message())
+ self.report_warning(f'unable to extract JSON-LD {bug_reports_message()}')
return {}
def _json_ld(self, json_ld, video_id, fatal=True, expected_type=None):
}
def is_type(e, *expected_types):
- type = variadic(traverse_obj(e, '@type'))
- return any(x in type for x in expected_types)
+ type_ = variadic(traverse_obj(e, '@type'))
+ return any(x in type_ for x in expected_types)
def extract_interaction_type(e):
interaction_type = e.get('interactionType')
count_kind = INTERACTION_TYPE_MAP.get(interaction_type.split('/')[-1])
if not count_kind:
continue
- count_key = '%s_count' % count_kind
+ count_key = f'{count_kind}_count'
if info.get(count_key) is not None:
continue
info[count_key] = interaction_count
'end_time': part.get('endOffset'),
} for part in variadic(e.get('hasPart') or []) if part.get('@type') == 'Clip']
for idx, (last_c, current_c, next_c) in enumerate(zip(
- [{'end_time': 0}] + chapters, chapters, chapters[1:])):
+ [{'end_time': 0}, *chapters], chapters, chapters[1:])):
current_c['end_time'] = current_c['end_time'] or next_c['start_time']
current_c['start_time'] = current_c['start_time'] or last_c['end_time']
if None in current_c.values():
def _hidden_inputs(html):
html = re.sub(r'<!--(?:(?!<!--).)*-->', '', html)
hidden_inputs = {}
- for input in re.findall(r'(?i)(<input[^>]+>)', html):
- attrs = extract_attributes(input)
- if not input:
+ for input_el in re.findall(r'(?i)(<input[^>]+>)', html):
+ attrs = extract_attributes(input_el)
+ if not input_el:
continue
if attrs.get('type') not in ('hidden', 'submit'):
continue
def _form_hidden_inputs(self, form_id, html):
    """Extract the hidden inputs of the ``<form>`` whose ``id`` is *form_id*.

    The body of the first matching ``<form ... id="form_id" ...>...</form>``
    element is located with ``self._search_regex`` and handed to
    ``self._hidden_inputs``, whose result is returned unchanged.

    @param form_id  Value of the form's ``id`` attribute.
                    NOTE(review): it is interpolated into the pattern without
                    ``re.escape``, so regex metacharacters in it are treated
                    as regex syntax - confirm no caller relies on that before
                    escaping it.
    @param html     Markup to search.
    """
    form = self._search_regex(
        rf'(?is)<form[^>]+?id=(["\']){form_id}\1[^>]*>(?P<form>.+?)</form>',
        html, f'{form_id} form', group='form')
    return self._hidden_inputs(form)
@classproperty(cache=True)
formats[:] = filter(
lambda f: self._is_valid_url(
f['url'], video_id,
- item='%s video format' % f.get('format_id') if f.get('format_id') else 'video'),
+ item='{} video format'.format(f.get('format_id')) if f.get('format_id') else 'video'),
formats)
@staticmethod
def _is_valid_url(self, url, video_id, item='video', headers=None):
    """Check whether *url* can be requested.

    @param url       URL to probe; it is first passed through
                     ``self._proto_relative_url(url, scheme='http:')``.
    @param video_id  Identifier used in the progress and error messages.
    @param item      Human-readable label of what is being checked.
    @param headers   Optional HTTP headers forwarded to the request;
                     ``None`` (the default) is forwarded as an empty dict.
    @returns         True for URLs that do not start with ``http://`` or
                     ``https://`` (they are assumed valid without a request)
                     and for URLs whose request does not raise
                     ``ExtractorError``; False otherwise, after reporting the
                     error cause via ``self.to_screen``.
    """
    # Avoid a shared mutable default; callers that omit headers still get {}
    if headers is None:
        headers = {}

    url = self._proto_relative_url(url, scheme='http:')
    # For now assume non HTTP(S) URLs always valid
    if not url.startswith(('http://', 'https://')):
        return True
    try:
        self._request_webpage(url, video_id, f'Checking {item} URL', headers=headers)
        return True
    except ExtractorError as e:
        self.to_screen(
            f'{video_id}: {item} URL is invalid, skipping: {e.cause!s}')
        return False
def http_scheme(self):
# currently yt-dlp cannot decode the playerVerificationChallenge as Akamai uses Adobe Alchemy
akamai_pv = manifest.find('{http://ns.adobe.com/f4m/1.0}pv-2.0')
if akamai_pv is not None and ';' in akamai_pv.text:
- playerVerificationChallenge = akamai_pv.text.split(';')[0]
- if playerVerificationChallenge.strip() != '':
+ player_verification_challenge = akamai_pv.text.split(';')[0]
+ if player_verification_challenge.strip() != '':
return []
formats = []
if not media_url:
continue
manifest_url = (
- media_url if media_url.startswith('http://') or media_url.startswith('https://')
+ media_url if media_url.startswith(('http://', 'https://'))
else ((manifest_base_url or '/'.join(manifest_url.split('/')[:-1])) + '/' + media_url))
# If media_url is itself a f4m manifest do the recursive extraction
# since bitrates in parent manifest (this one) and media_url manifest
def _report_ignoring_subs(self, name):
    """Warn (once) that subtitle tracks in a manifest are being ignored.

    The message is wrapped with ``bug_reports_message`` and emitted through
    ``self.report_warning`` with ``only_once=True``.

    @param name  Manifest kind interpolated into the warning text.
    """
    self.report_warning(bug_reports_message(
        f'Ignoring subtitle tracks found in the {name} manifest; '
        'if any subtitle tracks are missing,',
    ), only_once=True)
def _extract_m3u8_formats(self, *args, **kwargs):
formats = [{
'format_id': join_nonempty(m3u8_id, idx),
'format_index': idx,
- 'url': m3u8_url or encode_data_uri(m3u8_doc.encode('utf-8'), 'application/x-mpegurl'),
+ 'url': m3u8_url or encode_data_uri(m3u8_doc.encode(), 'application/x-mpegurl'),
'ext': ext,
'protocol': entry_protocol,
'preference': preference,
if not c or c == '.':
out.append(c)
else:
- out.append('{%s}%s' % (namespace, c))
+ out.append(f'{{{namespace}}}{c}')
return '/'.join(out)
def _extract_smil_formats_and_subtitles(self, smil_url, video_id, fatal=True, f4m_params=None, transform_source=None):
})
continue
- src_url = src if src.startswith('http') else urllib.parse.urljoin(base, src)
+ src_url = src if src.startswith('http') else urllib.parse.urljoin(f'{base}/', src)
src_url = src_url.strip()
if proto == 'm3u8' or src_ext == 'm3u8':
imgs_count += 1
formats.append({
- 'format_id': 'imagestream-%d' % (imgs_count),
+ 'format_id': f'imagestream-{imgs_count}',
'url': src,
'ext': mimetype2ext(medium.get('type')),
'acodec': 'none',
def _parse_smil_subtitles(self, smil, namespace=None, subtitles_lang='en'):
urls = []
subtitles = {}
- for num, textstream in enumerate(smil.findall(self._xpath_ns('.//textstream', namespace))):
+ for textstream in smil.findall(self._xpath_ns('.//textstream', namespace)):
src = textstream.get('src')
if not src or src in urls:
continue
if subtitles and period['subtitles']:
self.report_warning(bug_reports_message(
'Found subtitles in multiple periods in the DASH manifest; '
- 'if part of the subtitles are missing,'
+ 'if part of the subtitles are missing,',
), only_once=True)
for sub_lang, sub_info in period['subtitles'].items():
elif mimetype2ext(mime_type) in ('tt', 'dfxp', 'ttml', 'xml', 'json'):
content_type = 'text'
else:
- self.report_warning('Unknown MIME type %s in DASH manifest' % mime_type)
+ self.report_warning(f'Unknown MIME type {mime_type} in DASH manifest')
continue
base_url = ''
'asr': int_or_none(representation_attrib.get('audioSamplingRate')),
'fps': int_or_none(representation_attrib.get('frameRate')),
'language': lang if lang not in ('mul', 'und', 'zxx', 'mis') else None,
- 'format_note': 'DASH %s' % content_type,
+ 'format_note': f'DASH {content_type}',
'filesize': filesize,
'container': mimetype2ext(mime_type) + '_dash',
- **codecs
+ **codecs,
}
elif content_type == 'text':
f = {
t += c
# Next, $...$ templates are translated to their
# %(...) counterparts to be used with % operator
- t = re.sub(r'\$(%s)\$' % '|'.join(identifiers), r'%(\1)d', t)
- t = re.sub(r'\$(%s)%%([^$]+)\$' % '|'.join(identifiers), r'%(\1)\2', t)
+ t = re.sub(r'\$({})\$'.format('|'.join(identifiers)), r'%(\1)d', t)
+ t = re.sub(r'\$({})%([^$]+)\$'.format('|'.join(identifiers)), r'%(\1)\2', t)
t.replace('$$', '$')
return t
'duration': float_or_none(segment_d, representation_ms_info['timescale']),
})
- for num, s in enumerate(representation_ms_info['s']):
+ for s in representation_ms_info['s']:
segment_time = s.get('t') or segment_time
segment_d = s['d']
add_segment_url()
segment_number += 1
- for r in range(s.get('r', 0)):
+ for _ in range(s.get('r', 0)):
segment_time += segment_d
add_segment_url()
segment_number += 1
timescale = representation_ms_info['timescale']
for s in representation_ms_info['s']:
duration = float_or_none(s['d'], timescale)
- for r in range(s.get('r', 0) + 1):
+ for _ in range(s.get('r', 0) + 1):
segment_uri = representation_ms_info['segment_urls'][segment_index]
fragments.append({
location_key(segment_uri): segment_uri,
fourcc = track.get('FourCC') or KNOWN_TAGS.get(track.get('AudioTag'))
# TODO: add support for WVC1 and WMAP
if fourcc not in ('H264', 'AVC1', 'AACL', 'TTML', 'EC-3'):
- self.report_warning('%s is not a supported codec' % fourcc)
+ self.report_warning(f'{fourcc} is not a supported codec')
continue
tbr = int(track.attrib['Bitrate']) // 1000
# [1] does not mention Width and Height attributes. However,
'fourcc': fourcc,
'language': stream_language,
'codec_private_data': track.get('CodecPrivateData'),
- }
+ },
})
elif stream_type in ('video', 'audio'):
formats.append({
_MEDIA_TAG_NAME_RE = r'(?:(?:amp|dl8(?:-live)?)-)?(video|audio)'
media_tags = [(media_tag, media_tag_name, media_type, '')
for media_tag, media_tag_name, media_type
- in re.findall(r'(?s)(<(%s)[^>]*/>)' % _MEDIA_TAG_NAME_RE, webpage)]
+ in re.findall(rf'(?s)(<({_MEDIA_TAG_NAME_RE})[^>]*/>)', webpage)]
media_tags.extend(re.findall(
# We only allow video|audio followed by a whitespace or '>'.
# Allowing more characters may end up in significant slow down (see
# https://github.com/ytdl-org/youtube-dl/issues/11979,
# e.g. http://www.porntrex.com/maps/videositemap.xml).
- r'(?s)(<(?P<tag>%s)(?:\s+[^>]*)?>)(.*?)</(?P=tag)>' % _MEDIA_TAG_NAME_RE, webpage))
+ rf'(?s)(<(?P<tag>{_MEDIA_TAG_NAME_RE})(?:\s+[^>]*)?>)(.*?)</(?P=tag)>', webpage))
for media_tag, _, media_type, media_content in media_tags:
media_info = {
'formats': [],
mobj = re.search(
r'(?:(?:http|rtmp|rtsp)(?P<s>s)?:)?(?P<url>//[^?]+)', url)
url_base = mobj.group('url')
- http_base_url = '%s%s:%s' % ('http', mobj.group('s') or '', url_base)
+ http_base_url = '{}{}:{}'.format('http', mobj.group('s') or '', url_base)
formats = []
def manifest_url(manifest):
m_url = f'{http_base_url}/{manifest}'
if query:
- m_url += '?%s' % query
+ m_url += f'?{query}'
return m_url
if 'm3u8' not in skip_protocols:
video_id, fatal=False)
for rtmp_format in rtmp_formats:
rtsp_format = rtmp_format.copy()
- rtsp_format['url'] = '%s/%s' % (rtmp_format['url'], rtmp_format['play_path'])
+ rtsp_format['url'] = '{}/{}'.format(rtmp_format['url'], rtmp_format['play_path'])
del rtsp_format['play_path']
del rtsp_format['ext']
rtsp_format.update({
return formats
def _find_jwplayer_data(self, webpage, video_id=None, transform_source=js_to_json):
    """Locate the options passed to a JWPlayer ``setup(...)`` or ``load([...])`` call.

    The search is delegated to ``self._search_json``: the start pattern
    matches ``jwplayer('<id>')`` (not preceded by ``-``) followed, before any
    ``</script>``, by ``.setup(`` or ``.load([``; the named group ``load``
    records which of the two was found so that the conditional
    ``contains_pattern``/``end_pattern`` can accept a comma-separated sequence
    of objects closed by ``]`` for ``load`` and a single object closed by
    ``)`` for ``setup``.

    @param webpage           Page source to search.
    @param video_id          Identifier forwarded to ``_search_json``.
    @param transform_source  Callable forwarded to ``_search_json``.
    @returns                 Whatever ``_search_json`` returns; ``default=None``
                             is passed, so presumably ``None`` when nothing
                             is found - confirm against ``_search_json``.
    """
    return self._search_json(
        r'''(?<!-)\bjwplayer\s*\(\s*(?P<q>'|")(?!(?P=q)).+(?P=q)\s*\)(?:(?!</script>).)*?\.\s*(?:setup\s*\(|(?P<load>load)\s*\(\s*\[)''',
        webpage, 'JWPlayer data', video_id,
        # must be a {...} or sequence, ending
        contains_pattern=r'\{(?s:.*)}(?(load)(?:\s*,\s*\{(?s:.*)})*)', end_pattern=r'(?(load)\]|\))',
        transform_source=transform_source, default=None)
def _extract_jwplayer_data(self, webpage, video_id, *args, transform_source=js_to_json, **kwargs):
    """Find JWPlayer data in *webpage* and parse it.

    @param webpage           Page source handed to ``_find_jwplayer_data``.
    @param video_id          Identifier forwarded to both helpers.
    @param transform_source  Keyword-only callable forwarded to
                             ``_find_jwplayer_data``.
    @param args, kwargs      Forwarded unchanged to ``_parse_jwplayer_data``.
    @returns                 The result of ``self._parse_jwplayer_data``.
    """
    jwplayer_data = self._find_jwplayer_data(
        webpage, video_id, transform_source=transform_source)
    return self._parse_jwplayer_data(
        jwplayer_data, video_id, *args, **kwargs)
mpd_id=mpd_id, rtmp_params=rtmp_params, base_url=base_url)
subtitles = {}
- tracks = video_data.get('tracks')
- if tracks and isinstance(tracks, list):
- for track in tracks:
- if not isinstance(track, dict):
- continue
- track_kind = track.get('kind')
- if not track_kind or not isinstance(track_kind, str):
- continue
- if track_kind.lower() not in ('captions', 'subtitles'):
- continue
- track_url = urljoin(base_url, track.get('file'))
- if not track_url:
- continue
- subtitles.setdefault(track.get('label') or 'en', []).append({
- 'url': self._proto_relative_url(track_url)
- })
+ for track in traverse_obj(video_data, (
+ 'tracks', lambda _, v: v['kind'].lower() in ('captions', 'subtitles'))):
+ track_url = urljoin(base_url, track.get('file'))
+ if not track_url:
+ continue
+ subtitles.setdefault(track.get('label') or 'en', []).append({
+ 'url': self._proto_relative_url(track_url),
+ })
entry = {
'id': this_video_id,
'tbr': int_or_none(source.get('bitrate'), scale=1000),
'filesize': int_or_none(source.get('filesize')),
'ext': ext,
- 'format_id': format_id
+ 'format_id': format_id,
}
if source_url.startswith('rtmp'):
a_format['ext'] = 'flv'
# See com/longtailvideo/jwplayer/media/RTMPMediaProvider.as
# of jwplayer.flash.swf
rtmp_url_parts = re.split(
- r'((?:mp4|mp3|flv):)', source_url, 1)
+ r'((?:mp4|mp3|flv):)', source_url, maxsplit=1)
if len(rtmp_url_parts) == 3:
rtmp_url, prefix, play_path = rtmp_url_parts
a_format.update({
continue
cookies = cookies.encode('iso-8859-1').decode('utf-8')
cookie_value = re.search(
- r'%s=(.+?);.*?\b[Dd]omain=(.+?)(?:[,;]|$)' % cookie, cookies)
+ rf'{cookie}=(.+?);.*?\b[Dd]omain=(.+?)(?:[,;]|$)', cookies)
if cookie_value:
value, domain = cookie_value.groups()
self._set_cookie(domain, cookie, value)
desc += ' (**Currently broken**)' if markdown else ' (Currently broken)'
# Escape emojis. Ref: https://github.com/github/markup/issues/1153
- name = (' - **%s**' % re.sub(r':(\w+:)', ':\u200B\\g<1>', cls.IE_NAME)) if markdown else cls.IE_NAME
+ name = (' - **{}**'.format(re.sub(r':(\w+:)', ':\u200B\\g<1>', cls.IE_NAME))) if markdown else cls.IE_NAME
return f'{name}:{desc}' if desc else name
def extract_subtitles(self, *args, **kwargs):
self.to_screen(f'Extracted {comment_count} comments')
return {
'comments': comments,
- 'comment_count': None if interrupted else comment_count
+ 'comment_count': None if interrupted else comment_count,
}
return extractor
@staticmethod
def _availability(is_private=None, needs_premium=None, needs_subscription=None, needs_auth=None, is_unlisted=None):
- all_known = all(map(
- lambda x: x is not None,
- (is_private, needs_premium, needs_subscription, needs_auth, is_unlisted)))
+ all_known = all(
+ x is not None for x in
+ (is_private, needs_premium, needs_subscription, needs_auth, is_unlisted))
return (
'private' if is_private
else 'premium_only' if needs_premium
@classproperty
def _VALID_URL(cls):
    """Build the search-query pattern from ``cls._SEARCH_KEY``.

    The pattern is ``<_SEARCH_KEY><prefix>:<query>`` where the named group
    ``prefix`` is empty, a positive integer without leading zeros, or
    ``all``, and the named group ``query`` is everything after the colon
    (newlines included).
    NOTE(review): ``_SEARCH_KEY`` is interpolated without ``re.escape``.
    """
    return rf'{cls._SEARCH_KEY}(?P<prefix>|[1-9][0-9]*|all):(?P<query>[\s\S]+)'
def _real_extract(self, query):
prefix, query = self._match_valid_url(query).group('prefix', 'query')