]> jfr.im git - yt-dlp.git/blob - yt_dlp/downloader/hls.py
cee3807ceb963b21f03a48cac87c9d3694532eb4
[yt-dlp.git] / yt_dlp / downloader / hls.py
1 from __future__ import unicode_literals
2
3 import errno
4 import re
5 import io
6 import binascii
7 try:
8 from Crypto.Cipher import AES
9 can_decrypt_frag = True
10 except ImportError:
11 can_decrypt_frag = False
12 try:
13 import concurrent.futures
14 can_threaded_download = True
15 except ImportError:
16 can_threaded_download = False
17
18 from ..downloader import _get_real_downloader
19 from .fragment import FragmentFD
20 from .external import FFmpegFD
21
22 from ..compat import (
23 compat_urllib_error,
24 compat_urlparse,
25 compat_struct_pack,
26 )
27 from ..utils import (
28 parse_m3u8_attributes,
29 sanitize_open,
30 update_url_query,
31 bug_reports_message,
32 )
33 from .. import webvtt
34
35
class HlsFD(FragmentFD):
    """
    Download segments in a m3u8 manifest. External downloaders can take over
    the fragment downloads by supporting the 'm3u8_frag_urls' protocol and
    re-defining the 'supports_manifest' function.
    """

    # Identifier used in screen/log messages for this downloader
    FD_NAME = 'hlsnative'
45 @staticmethod
46 def can_download(manifest, info_dict, allow_unplayable_formats=False, with_crypto=can_decrypt_frag):
47 UNSUPPORTED_FEATURES = [
48 # r'#EXT-X-BYTERANGE', # playlists composed of byte ranges of media files [2]
49
50 # Live streams heuristic does not always work (e.g. geo restricted to Germany
51 # http://hls-geo.daserste.de/i/videoportal/Film/c_620000/622873/format,716451,716457,716450,716458,716459,.mp4.csmil/index_4_av.m3u8?null=0)
52 # r'#EXT-X-MEDIA-SEQUENCE:(?!0$)', # live streams [3]
53
54 # This heuristic also is not correct since segments may not be appended as well.
55 # Twitch vods of finished streams have EXT-X-PLAYLIST-TYPE:EVENT despite
56 # no segments will definitely be appended to the end of the playlist.
57 # r'#EXT-X-PLAYLIST-TYPE:EVENT', # media segments may be appended to the end of
58 # # event media playlists [4]
59 # r'#EXT-X-MAP:', # media initialization [5]
60 # 1. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.2.4
61 # 2. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.2.2
62 # 3. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.3.2
63 # 4. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.3.5
64 # 5. https://tools.ietf.org/html/draft-pantos-http-live-streaming-17#section-4.3.2.5
65 ]
66 if not allow_unplayable_formats:
67 UNSUPPORTED_FEATURES += [
68 r'#EXT-X-KEY:METHOD=(?!NONE|AES-128)', # encrypted streams [1]
69 ]
70
71 def check_results():
72 yield not info_dict.get('is_live')
73 is_aes128_enc = '#EXT-X-KEY:METHOD=AES-128' in manifest
74 yield with_crypto or not is_aes128_enc
75 yield not (is_aes128_enc and r'#EXT-X-BYTERANGE' in manifest)
76 for feature in UNSUPPORTED_FEATURES:
77 yield not re.search(feature, manifest)
78 return all(check_results())
79
    def real_download(self, filename, info_dict):
        """Download an HLS stream natively.

        Fetches the m3u8 manifest, parses it into a fragment list (tracking
        the encryption key, byte range and media sequence that apply to each
        segment), downloads every fragment -- optionally decrypting AES-128
        content and re-timing WebVTT cues -- and appends them to the output
        file. Delegates to FFmpegFD when the manifest uses unsupported
        features, or hands the fragment list to an external downloader that
        supports the 'm3u8_frag_urls' protocol. Returns True on success.
        """
        man_url = info_dict['url']
        self.to_screen('[%s] Downloading m3u8 manifest' % self.FD_NAME)

        is_webvtt = info_dict['ext'] == 'vtt'

        # Fetch the manifest; the post-redirect URL becomes the base for
        # resolving relative fragment/key URIs below.
        urlh = self.ydl.urlopen(self._prepare_url(info_dict, man_url))
        man_url = urlh.geturl()
        s = urlh.read().decode('utf-8', 'ignore')

        if not self.can_download(s, info_dict, self.params.get('allow_unplayable_formats')):
            # Segment-URL/key overrides only work natively, so delegation to
            # ffmpeg is not an option for them; report and bail out.
            if info_dict.get('extra_param_to_segment_url') or info_dict.get('_decryption_key_url'):
                self.report_error('pycryptodome not found. Please install')
                return False
            # If the only blocker was crypto support, say so explicitly.
            if self.can_download(s, info_dict, with_crypto=True):
                self.report_warning('pycryptodome is needed to download this file natively')
            fd = FFmpegFD(self.ydl, self.params)
            self.report_warning(
                '%s detected unsupported features; extraction will be delegated to %s' % (self.FD_NAME, fd.get_basename()))
            # TODO: Make progress updates work without hooking twice
            # for ph in self._progress_hooks:
            #     fd.add_progress_hook(ph)
            return fd.real_download(filename, info_dict)

        # External downloader (if any) that accepts a fragment-URL list.
        real_downloader = _get_real_downloader(info_dict, 'm3u8_frag_urls', self.params, None)
        if real_downloader and not real_downloader.supports_manifest(s):
            real_downloader = None
        if real_downloader:
            self.to_screen(
                '[%s] Fragment downloads will be delegated to %s' % (self.FD_NAME, real_downloader.get_basename()))

        def is_ad_fragment_start(s):
            # Ad-break start markers emitted by Anvato and Uplynk playlists
            return (s.startswith('#ANVATO-SEGMENT-INFO') and 'type=ad' in s
                    or s.startswith('#UPLYNK-SEGMENT') and s.endswith(',ad'))

        def is_ad_fragment_end(s):
            # Matching ad-break end markers for the same two providers
            return (s.startswith('#ANVATO-SEGMENT-INFO') and 'type=master' in s
                    or s.startswith('#UPLYNK-SEGMENT') and s.endswith(',segment'))

        fragments = []

        # First pass: count media vs. ad fragments for progress reporting.
        media_frags = 0
        ad_frags = 0
        ad_frag_next = False
        for line in s.splitlines():
            line = line.strip()
            if not line:
                continue
            if line.startswith('#'):
                if is_ad_fragment_start(line):
                    ad_frag_next = True
                elif is_ad_fragment_end(line):
                    ad_frag_next = False
                continue
            if ad_frag_next:
                ad_frags += 1
                continue
            media_frags += 1

        ctx = {
            'filename': filename,
            'total_frags': media_frags,
            'ad_frags': ad_frags,
        }

        if real_downloader:
            self._prepare_external_frag_download(ctx)
        else:
            self._prepare_and_start_frag_download(ctx)

        # Cross-fragment scratch space (used by the WebVTT packer below).
        extra_state = ctx.setdefault('extra_state', {})

        fragment_retries = self.params.get('fragment_retries', 0)
        skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True)
        test = self.params.get('test', False)

        format_index = info_dict.get('format_index')
        extra_query = None
        extra_param_to_segment_url = info_dict.get('extra_param_to_segment_url')
        if extra_param_to_segment_url:
            extra_query = compat_urlparse.parse_qs(extra_param_to_segment_url)
        i = 0  # NOTE(review): incremented below but never read -- looks vestigial
        media_sequence = 0
        decrypt_info = {'METHOD': 'NONE'}
        byte_range = {}
        discontinuity_count = 0
        frag_index = 0
        ad_frag_next = False
        # Second pass: build the fragment list. decrypt_info/byte_range/
        # media_sequence carry the state that applies to each segment line.
        for line in s.splitlines():
            line = line.strip()
            if line:
                if not line.startswith('#'):
                    # A media segment URL (absolute, or relative to man_url)
                    if format_index and discontinuity_count != format_index:
                        # Several formats can share one playlist; keep only
                        # the requested discontinuity group
                        continue
                    if ad_frag_next:
                        continue
                    frag_index += 1
                    if frag_index <= ctx['fragment_index']:
                        # Already downloaded by a previous (resumed) run
                        continue
                    frag_url = (
                        line
                        if re.match(r'^https?://', line)
                        else compat_urlparse.urljoin(man_url, line))
                    if extra_query:
                        frag_url = update_url_query(frag_url, extra_query)

                    fragments.append({
                        'frag_index': frag_index,
                        'url': frag_url,
                        'decrypt_info': decrypt_info,
                        'byte_range': byte_range,
                        'media_sequence': media_sequence,
                    })

                elif line.startswith('#EXT-X-MAP'):
                    # Initialization fragment; must precede all media segments
                    if format_index and discontinuity_count != format_index:
                        continue
                    if frag_index > 0:
                        self.report_error(
                            'Initialization fragment found after media fragments, unable to download')
                        return False
                    frag_index += 1
                    map_info = parse_m3u8_attributes(line[11:])
                    frag_url = (
                        map_info.get('URI')
                        if re.match(r'^https?://', map_info.get('URI'))
                        else compat_urlparse.urljoin(man_url, map_info.get('URI')))
                    if extra_query:
                        frag_url = update_url_query(frag_url, extra_query)

                    fragments.append({
                        'frag_index': frag_index,
                        'url': frag_url,
                        'decrypt_info': decrypt_info,
                        'byte_range': byte_range,
                        'media_sequence': media_sequence
                    })

                    # NOTE(review): this BYTERANGE is parsed *after* the init
                    # fragment was appended, so it takes effect for the next
                    # fragment rather than the EXT-X-MAP itself -- confirm
                    # whether that is intended
                    if map_info.get('BYTERANGE'):
                        splitted_byte_range = map_info.get('BYTERANGE').split('@')
                        # 'length[@offset]'; without an offset, continue from
                        # the previous range's end
                        sub_range_start = int(splitted_byte_range[1]) if len(splitted_byte_range) == 2 else byte_range['end']
                        byte_range = {
                            'start': sub_range_start,
                            'end': sub_range_start + int(splitted_byte_range[0]),
                        }

                elif line.startswith('#EXT-X-KEY'):
                    # New encryption context for the segments that follow
                    decrypt_url = decrypt_info.get('URI')
                    decrypt_info = parse_m3u8_attributes(line[11:])
                    if decrypt_info['METHOD'] == 'AES-128':
                        if 'IV' in decrypt_info:
                            # IV attribute is a hex string ('0x...'); strip the
                            # prefix and left-pad to 32 nibbles (16 bytes)
                            decrypt_info['IV'] = binascii.unhexlify(decrypt_info['IV'][2:].zfill(32))
                        if not re.match(r'^https?://', decrypt_info['URI']):
                            decrypt_info['URI'] = compat_urlparse.urljoin(
                                man_url, decrypt_info['URI'])
                        if extra_query:
                            decrypt_info['URI'] = update_url_query(decrypt_info['URI'], extra_query)
                        if decrypt_url != decrypt_info['URI']:
                            # Key URL changed: drop any cached key so it is
                            # re-fetched lazily on first use
                            decrypt_info['KEY'] = None

                elif line.startswith('#EXT-X-MEDIA-SEQUENCE'):
                    # Sequence number of the first segment in the playlist
                    media_sequence = int(line[22:])
                elif line.startswith('#EXT-X-BYTERANGE'):
                    # 'length[@offset]'; without an offset, continue from the
                    # previous range's end
                    splitted_byte_range = line[17:].split('@')
                    sub_range_start = int(splitted_byte_range[1]) if len(splitted_byte_range) == 2 else byte_range['end']
                    byte_range = {
                        'start': sub_range_start,
                        'end': sub_range_start + int(splitted_byte_range[0]),
                    }
                elif is_ad_fragment_start(line):
                    ad_frag_next = True
                elif is_ad_fragment_end(line):
                    ad_frag_next = False
                elif line.startswith('#EXT-X-DISCONTINUITY'):
                    discontinuity_count += 1
                i += 1
                # NOTE(review): this increments for every non-blank manifest
                # line, not only for media segments -- verify the stored
                # per-fragment values against HLS media-sequence semantics,
                # since they feed the default AES IV below
                media_sequence += 1

        # We only download the first fragment during the test
        if test:
            fragments = [fragments[0] if fragments else None]
            # NOTE(review): an empty playlist yields [None] here, which the
            # download loop below cannot handle -- confirm unreachable

        if real_downloader:
            # Hand the parsed fragment list to the external downloader.
            info_copy = info_dict.copy()
            info_copy['fragments'] = fragments
            fd = real_downloader(self.ydl, self.params)
            # TODO: Make progress updates work without hooking twice
            # for ph in self._progress_hooks:
            #     fd.add_progress_hook(ph)
            success = fd.real_download(filename, info_copy)
            if not success:
                return False
        else:
            def download_fragment(fragment):
                # Fetch one fragment (with retries) and decrypt it if needed.
                # Returns (content, frag_index) on success or
                # (False, frag_index) on failure.
                frag_index = fragment['frag_index']
                frag_url = fragment['url']
                decrypt_info = fragment['decrypt_info']
                byte_range = fragment['byte_range']
                media_sequence = fragment['media_sequence']

                ctx['fragment_index'] = frag_index

                count = 0
                # NOTE(review): this dict is shared (it is info_dict's own
                # http_headers, not a copy) and is mutated below -- also
                # shared across worker threads in the threaded path; confirm
                headers = info_dict.get('http_headers', {})
                if byte_range:
                    # HTTP Range is inclusive at both ends, hence end - 1
                    headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)
                while count <= fragment_retries:
                    try:
                        success, frag_content = self._download_fragment(
                            ctx, frag_url, info_dict, headers)
                        if not success:
                            return False, frag_index
                        break
                    except compat_urllib_error.HTTPError as err:
                        # Unavailable (possibly temporary) fragments may be served.
                        # First we try to retry then either skip or abort.
                        # See https://github.com/ytdl-org/youtube-dl/issues/10165,
                        # https://github.com/ytdl-org/youtube-dl/issues/10448).
                        count += 1
                        if count <= fragment_retries:
                            self.report_retry_fragment(err, frag_index, count, fragment_retries)
                if count > fragment_retries:
                    # Retries exhausted without a successful download
                    self.report_error('Giving up after %s fragment retries' % fragment_retries)
                    return False, frag_index

                if decrypt_info['METHOD'] == 'AES-128':
                    # Default IV: the media sequence number as a 16-byte
                    # big-endian integer (8 zero bytes + signed 64-bit value)
                    iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', media_sequence)
                    # Fetch the key on first use and cache it in decrypt_info
                    # (shared by all fragments under the same EXT-X-KEY)
                    decrypt_info['KEY'] = decrypt_info.get('KEY') or self.ydl.urlopen(
                        self._prepare_url(info_dict, info_dict.get('_decryption_key_url') or decrypt_info['URI'])).read()
                    # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
                    # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
                    # not what it decrypts to.
                    if not test:
                        frag_content = AES.new(
                            decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content)

                return frag_content, frag_index

            # Default packer: pass fragment data through unchanged
            pack_fragment = lambda frag_content, _: frag_content

            if is_webvtt:
                def pack_fragment(frag_content, frag_index):
                    # Re-emit a WebVTT fragment with cue timestamps shifted so
                    # all fragments share the timeline established by the
                    # first fragment's X-TIMESTAMP-MAP (Magic) header.
                    output = io.StringIO()
                    adjust = 0
                    for block in webvtt.parse_fragment(frag_content):
                        if isinstance(block, webvtt.CueBlock):
                            block.start += adjust
                            block.end += adjust
                        elif isinstance(block, webvtt.Magic):
                            # XXX: we do not handle MPEGTS overflow
                            if frag_index == 1:
                                # Remember the first fragment's timing anchors
                                extra_state['webvtt_mpegts'] = block.mpegts or 0
                                extra_state['webvtt_local'] = block.local or 0
                                # XXX: block.local = block.mpegts = None ?
                            else:
                                if block.mpegts is not None and block.local is not None:
                                    # Offset = MPEGTS drift minus local drift,
                                    # both relative to the first fragment
                                    adjust = (
                                        (block.mpegts - extra_state.get('webvtt_mpegts', 0))
                                        - (block.local - extra_state.get('webvtt_local', 0))
                                    )
                                # Only the first fragment's header is written
                                continue
                        elif isinstance(block, webvtt.HeaderBlock):
                            if frag_index != 1:
                                # XXX: this should probably be silent as well
                                # or verify that all segments contain the same data
                                self.report_warning(bug_reports_message(
                                    'Discarding a %s block found in the middle of the stream; '
                                    'if the subtitles display incorrectly,'
                                    % (type(block).__name__)))
                                continue
                        block.write_into(output)

                    return output.getvalue().encode('utf-8')

            def append_fragment(frag_content, frag_index):
                # Append one downloaded fragment to the output file; honors
                # --skip-unavailable-fragments for missing/failed fragments.
                # Returns False only when the download must be aborted.
                if frag_content:
                    fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index)
                    try:
                        # Probe that the temp fragment file exists on disk
                        # before packing/appending its content
                        file, frag_sanitized = sanitize_open(fragment_filename, 'rb')
                        ctx['fragment_filename_sanitized'] = frag_sanitized
                        file.close()
                        frag_content = pack_fragment(frag_content, frag_index)
                        self._append_fragment(ctx, frag_content)
                        return True
                    except EnvironmentError as ose:
                        if ose.errno != errno.ENOENT:
                            raise
                        # FileNotFoundError
                        if skip_unavailable_fragments:
                            self.report_skip_fragment(frag_index)
                            return True
                        else:
                            self.report_error(
                                'fragment %s not found, unable to continue' % frag_index)
                            return False
                else:
                    # Download returned no content (failed after retries)
                    if skip_unavailable_fragments:
                        self.report_skip_fragment(frag_index)
                        return True
                    else:
                        self.report_error(
                            'fragment %s not found, unable to continue' % frag_index)
                        return False

            max_workers = self.params.get('concurrent_fragment_downloads', 1)
            if can_threaded_download and max_workers > 1:
                # Threaded path: download fragments concurrently, then append
                # them in order afterwards.
                self.report_warning('The download speed shown is only of one thread. This is a known issue')
                with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
                    futures = [pool.submit(download_fragment, fragment) for fragment in fragments]
                    # timeout must be 0 to return instantly
                    done, not_done = concurrent.futures.wait(futures, timeout=0)
                    try:
                        while not_done:
                            # Check every 1 second for KeyboardInterrupt
                            freshly_done, not_done = concurrent.futures.wait(not_done, timeout=1)
                            done |= freshly_done
                    except KeyboardInterrupt:
                        # Cancel pending fragments; already-running ones must
                        # be waited for before the executor can shut down
                        for future in not_done:
                            future.cancel()
                        # timeout must be none to cancel
                        concurrent.futures.wait(not_done, timeout=None)
                        raise KeyboardInterrupt
                results = [future.result() for future in futures]

                # Append in submission order so the output stays in sequence
                for frag_content, frag_index in results:
                    result = append_fragment(frag_content, frag_index)
                    if not result:
                        return False
            else:
                # Sequential path: download and append one fragment at a time
                for fragment in fragments:
                    frag_content, frag_index = download_fragment(fragment)
                    result = append_fragment(frag_content, frag_index)
                    if not result:
                        return False

        self._finish_frag_download(ctx)
        return True