1 from __future__
import unicode_literals
4 import concurrent
.futures
5 can_threaded_download
= True
7 can_threaded_download
= False
9 from ..downloader
import _get_real_downloader
10 from .fragment
import FragmentFD
12 from ..compat
import compat_urllib_error
class DashSegmentsFD(FragmentFD):
    """
    Download segments in a DASH manifest. External downloaders can take over
    the fragment downloads by supporting the 'dash_frag_urls' protocol
    """

    FD_NAME = 'dashsegments'

    def real_download(self, filename, info_dict):
        """Download all DASH fragments of *info_dict* into *filename*.

        Either delegates the fragment downloads to an external downloader
        (when one supporting the 'dash_frag_urls' protocol is configured) or
        downloads them itself, optionally with a thread pool, and appends
        each fragment to the output file. Returns True on success, False on
        a fatal fragment failure.
        """
        fragment_base_url = info_dict.get('fragment_base_url')
        # --test downloads only the first fragment
        fragments = info_dict['fragments'][:1] if self.params.get(
            'test', False) else info_dict['fragments']

        real_downloader = _get_real_downloader(info_dict, 'dash_frag_urls', self.params, None)

        ctx = {
            'filename': filename,
            'total_frags': len(fragments),
        }

        if real_downloader:
            self._prepare_external_frag_download(ctx)
        else:
            self._prepare_and_start_frag_download(ctx)

        fragment_retries = self.params.get('fragment_retries', 0)
        skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True)

        # Build the worklist first so both the external and the internal
        # paths see the same fragment dicts; fragments already present in a
        # resumed download (index <= ctx['fragment_index']) are skipped.
        fragments_to_download = []
        frag_index = 0
        for i, fragment in enumerate(fragments):
            frag_index += 1
            if frag_index <= ctx['fragment_index']:
                continue
            fragment_url = fragment.get('url')
            if not fragment_url:
                # Relative fragments require a base URL to resolve against
                assert fragment_base_url
                fragment_url = urljoin(fragment_base_url, fragment['path'])

            fragments_to_download.append({
                'frag_index': frag_index,
                'index': i,
                'url': fragment_url,
            })

        if real_downloader:
            self.to_screen(
                '[%s] Fragment downloads will be delegated to %s' % (self.FD_NAME, real_downloader.get_basename()))
            info_copy = info_dict.copy()
            info_copy['fragments'] = fragments_to_download
            fd = real_downloader(self.ydl, self.params)
            # TODO: Make progress updates work without hooking twice
            # for ph in self._progress_hooks:
            #     fd.add_progress_hook(ph)
            success = fd.real_download(filename, info_copy)
            if not success:
                return False
        else:
            def download_fragment(fragment):
                """Fetch one fragment with retries; return (content_or_False, frag_index)."""
                i = fragment['index']
                frag_index = fragment['frag_index']
                fragment_url = fragment['url']

                ctx['fragment_index'] = frag_index

                # In DASH, the first segment contains necessary headers to
                # generate a valid MP4 file, so always abort for the first segment
                fatal = i == 0 or not skip_unavailable_fragments
                count = 0
                while count <= fragment_retries:
                    try:
                        success, frag_content = self._download_fragment(ctx, fragment_url, info_dict)
                        if not success:
                            return False, frag_index
                        break
                    except compat_urllib_error.HTTPError as err:
                        # YouTube may often return 404 HTTP error for a fragment causing the
                        # whole download to fail. However if the same fragment is immediately
                        # retried with the same request data this usually succeeds (1-2 attempts
                        # is usually enough) thus allowing to download the whole file successfully.
                        # To be future-proof we will retry all fragments that fail with any
                        # HTTP error.
                        count += 1
                        if count <= fragment_retries:
                            self.report_retry_fragment(err, frag_index, count, fragment_retries)
                    except DownloadError:
                        # Don't retry fragment if error occurred during HTTP downloading
                        # itself since it has own retry settings
                        if not fatal:
                            break
                        raise

                if count > fragment_retries:
                    if not fatal:
                        return False, frag_index
                    self.report_error('Giving up after %s fragment retries' % fragment_retries)
                    return False, frag_index

                return frag_content, frag_index

            def append_fragment(frag_content, frag_index):
                """Append a downloaded fragment to the output; return False on fatal miss."""
                if frag_content:
                    fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index)
                    try:
                        file, frag_sanitized = sanitize_open(fragment_filename, 'rb')
                        ctx['fragment_filename_sanitized'] = frag_sanitized
                        file.close()
                        self._append_fragment(ctx, frag_content)
                        return True
                    except FileNotFoundError:
                        if skip_unavailable_fragments:
                            self.report_skip_fragment(frag_index)
                            return True
                        else:
                            ctx['dest_stream'].close()
                            self.report_error(
                                'fragment %s not found, unable to continue' % frag_index)
                            return False
                else:
                    if skip_unavailable_fragments:
                        self.report_skip_fragment(frag_index)
                        return True
                    else:
                        ctx['dest_stream'].close()
                        self.report_error(
                            'fragment %s not found, unable to continue' % frag_index)
                        return False

            max_workers = self.params.get('concurrent_fragment_downloads', 1)
            if can_threaded_download and max_workers > 1:
                self.report_warning('The download speed shown is only of one thread. This is a known issue')
                with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
                    futures = [pool.submit(download_fragment, fragment) for fragment in fragments_to_download]
                    # timeout must be 0 to return instantly
                    done, not_done = concurrent.futures.wait(futures, timeout=0)
                    try:
                        while not_done:
                            # Check every 1 second for KeyboardInterrupt
                            freshly_done, not_done = concurrent.futures.wait(not_done, timeout=1)
                            done |= freshly_done
                    except KeyboardInterrupt:
                        for future in not_done:
                            future.cancel()
                        # timeout must be none to cancel
                        concurrent.futures.wait(not_done, timeout=None)
                        raise KeyboardInterrupt
                    results = [future.result() for future in futures]

                # Fragments must be appended strictly in order, so appending
                # happens serially after all downloads completed
                for frag_content, frag_index in results:
                    result = append_fragment(frag_content, frag_index)
                    if not result:
                        return False
            else:
                # Serial fallback: download and append one fragment at a time
                for fragment in fragments_to_download:
                    frag_content, frag_index = download_fragment(fragment)
                    result = append_fragment(frag_content, frag_index)
                    if not result:
                        return False

            self._finish_frag_download(ctx)
        return True