import base64
import collections
+import getpass
import hashlib
+import http.client
+import http.cookiejar
+import http.cookies
import itertools
import json
import math
import random
import sys
import time
+import urllib.parse
+import urllib.request
import xml.etree.ElementTree
from ..compat import functools, re # isort: split
-from ..compat import (
- compat_cookiejar_Cookie,
- compat_cookies_SimpleCookie,
- compat_etree_fromstring,
- compat_expanduser,
- compat_getpass,
- compat_http_client,
- compat_os_name,
- compat_str,
- compat_urllib_error,
- compat_urllib_parse_unquote,
- compat_urllib_parse_urlencode,
- compat_urllib_request,
- compat_urlparse,
-)
+from ..compat import compat_etree_fromstring, compat_expanduser, compat_os_name
from ..downloader import FileDownloader
from ..downloader.f4m import get_base_url, remove_encrypted_media
from ..utils import (
if hasattr(e, 'countries'):
kwargs['countries'] = e.countries
raise type(e)(e.orig_msg, **kwargs)
- except compat_http_client.IncompleteRead as e:
+ except http.client.IncompleteRead as e:
raise ExtractorError('A network error has occurred.', cause=e, expected=True, video_id=self.get_temp_id(url))
except (KeyError, StopIteration) as e:
raise ExtractorError('An extractor error has occurred.', cause=e, video_id=self.get_temp_id(url))
@staticmethod
def __can_accept_status_code(err, expected_status):
- assert isinstance(err, compat_urllib_error.HTTPError)
+ assert isinstance(err, urllib.error.HTTPError)
if expected_status is None:
return False
elif callable(expected_status):
return err.code in variadic(expected_status)
def _create_request(self, url_or_request, data=None, headers={}, query={}):
- if isinstance(url_or_request, compat_urllib_request.Request):
+ if isinstance(url_or_request, urllib.request.Request):
return update_Request(url_or_request, data=data, headers=headers, query=query)
if query:
url_or_request = update_url_query(url_or_request, query)
try:
return self._downloader.urlopen(self._create_request(url_or_request, data, headers, query))
except network_exceptions as err:
- if isinstance(err, compat_urllib_error.HTTPError):
+ if isinstance(err, urllib.error.HTTPError):
if self.__can_accept_status_code(err, expected_status):
# Retain reference to error to prevent file object from
# being closed before it can be read. Works around the
Arguments:
url_or_request -- plain text URL as a string or
- a compat_urllib_request.Requestobject
+ a urllib.request.Request object
video_id -- Video/playlist/item identifier (string)
Keyword arguments:
"""
# Strip hashes from the URL (#1038)
- if isinstance(url_or_request, (compat_str, str)):
+ if isinstance(url_or_request, str):
url_or_request = url_or_request.partition('#')[0]
urlh = self._request_webpage(url_or_request, video_id, note, errnote, fatal, data=data, headers=headers, query=query, expected_status=expected_status)
while True:
try:
return self.__download_webpage(url_or_request, video_id, note, errnote, None, fatal, *args, **kwargs)
- except compat_http_client.IncompleteRead as e:
+ except http.client.IncompleteRead as e:
try_count += 1
if try_count >= tries:
raise e
if tfa is not None:
return tfa
- return compat_getpass('Type %s and press [Return]: ' % note)
+ return getpass.getpass('Type %s and press [Return]: ' % note)
# Helper functions for extracting OpenGraph info
@staticmethod
return {}
def _json_ld(self, json_ld, video_id, fatal=True, expected_type=None):
- if isinstance(json_ld, compat_str):
+ if isinstance(json_ld, str):
json_ld = self._parse_json(json_ld, video_id, fatal=fatal)
if not json_ld:
return {}
# both types can have 'name' property(inherited from 'Thing' type). [1]
# however some websites are using 'Text' type instead.
# 1. https://schema.org/VideoObject
- 'uploader': author.get('name') if isinstance(author, dict) else author if isinstance(author, compat_str) else None,
+ 'uploader': author.get('name') if isinstance(author, dict) else author if isinstance(author, str) else None,
'filesize': int_or_none(float_or_none(e.get('contentSize'))),
'tbr': int_or_none(e.get('bitrate')),
'width': int_or_none(e.get('width')),
]), m3u8_doc)
def format_url(url):
- return url if re.match(r'^https?://', url) else compat_urlparse.urljoin(m3u8_url, url)
+ return url if re.match(r'^https?://', url) else urllib.parse.urljoin(m3u8_url, url)
if self.get_param('hls_split_discontinuity', False):
def _extract_m3u8_playlist_indices(manifest_url=None, m3u8_doc=None):
})
continue
- src_url = src if src.startswith('http') else compat_urlparse.urljoin(base, src)
+ src_url = src if src.startswith('http') else urllib.parse.urljoin(base, src)
src_url = src_url.strip()
if proto == 'm3u8' or src_ext == 'm3u8':
'plugin': 'flowplayer-3.2.0.1',
}
f4m_url += '&' if '?' in f4m_url else '?'
- f4m_url += compat_urllib_parse_urlencode(f4m_params)
+ f4m_url += urllib.parse.urlencode(f4m_params)
formats.extend(self._extract_f4m_formats(f4m_url, video_id, f4m_id='hds', fatal=False))
elif src_ext == 'mpd':
formats.extend(self._extract_mpd_formats(
if re.match(r'^https?://', base_url):
break
if mpd_base_url and base_url.startswith('/'):
- base_url = compat_urlparse.urljoin(mpd_base_url, base_url)
+ base_url = urllib.parse.urljoin(mpd_base_url, base_url)
elif mpd_base_url and not re.match(r'^https?://', base_url):
if not mpd_base_url.endswith('/'):
mpd_base_url += '/'
sampling_rate = int_or_none(track.get('SamplingRate'))
track_url_pattern = re.sub(r'{[Bb]itrate}', track.attrib['Bitrate'], url_pattern)
- track_url_pattern = compat_urlparse.urljoin(ism_url, track_url_pattern)
+ track_url_pattern = urllib.parse.urljoin(ism_url, track_url_pattern)
fragments = []
fragment_ctx = {
fragment_ctx['duration'] = (next_fragment_time - fragment_ctx['time']) / fragment_repeat
for _ in range(fragment_repeat):
fragments.append({
- 'url': re.sub(r'{start[ _]time}', compat_str(fragment_ctx['time']), track_url_pattern),
+ 'url': re.sub(r'{start[ _]time}', str(fragment_ctx['time']), track_url_pattern),
'duration': fragment_ctx['duration'] / stream_timescale,
})
fragment_ctx['time'] += fragment_ctx['duration']
return formats, subtitles
def _extract_wowza_formats(self, url, video_id, m3u8_entry_protocol='m3u8_native', skip_protocols=[]):
- query = compat_urlparse.urlparse(url).query
+ query = urllib.parse.urlparse(url).query
url = re.sub(r'/(?:manifest|playlist|jwplayer)\.(?:m3u8|f4m|mpd|smil)', '', url)
mobj = re.search(
r'(?:(?:http|rtmp|rtsp)(?P<s>s)?:)?(?P<url>//[^?]+)', url)
if not isinstance(track, dict):
continue
track_kind = track.get('kind')
- if not track_kind or not isinstance(track_kind, compat_str):
+ if not track_kind or not isinstance(track_kind, str):
continue
if track_kind.lower() not in ('captions', 'subtitles'):
continue
# Often no height is provided but there is a label in
# format like "1080p", "720p SD", or 1080.
height = int_or_none(self._search_regex(
- r'^(\d{3,4})[pP]?(?:\b|$)', compat_str(source.get('label') or ''),
+ r'^(\d{3,4})[pP]?(?:\b|$)', str(source.get('label') or ''),
'height', default=None))
a_format = {
'url': source_url,
def _set_cookie(self, domain, name, value, expire_time=None, port=None,
path='/', secure=False, discard=False, rest={}, **kwargs):
- cookie = compat_cookiejar_Cookie(
+ cookie = http.cookiejar.Cookie(
0, name, value, port, port is not None, domain, True,
domain.startswith('.'), path, True, secure, expire_time,
discard, None, None, rest)
self.cookiejar.set_cookie(cookie)
def _get_cookies(self, url):
- """ Return a compat_cookies_SimpleCookie with the cookies for the url """
- return compat_cookies_SimpleCookie(self._downloader._calc_cookies(url))
+ """ Return a http.cookies.SimpleCookie with the cookies for the url """
+ return http.cookies.SimpleCookie(self._downloader._calc_cookies(url))
def _apply_first_set_cookie_header(self, url_handle, cookie):
"""
return headers
def _generic_id(self, url):
- return compat_urllib_parse_unquote(os.path.splitext(url.rstrip('/').split('/')[-1])[0])
+ return urllib.parse.unquote(os.path.splitext(url.rstrip('/').split('/')[-1])[0])
def _generic_title(self, url):
- return compat_urllib_parse_unquote(os.path.splitext(url_basename(url))[0])
+ return urllib.parse.unquote(os.path.splitext(url_basename(url))[0])
@staticmethod
def _availability(is_private=None, needs_premium=None, needs_subscription=None, needs_auth=None, is_unlisted=None):