from __future__ import unicode_literals

import errno

# concurrent.futures is unavailable on some old Python 2 installs; fall back
# to sequential fragment downloads when it cannot be imported.
try:
    import concurrent.futures
    can_threaded_download = True
except ImportError:
    can_threaded_download = False

from ..downloader import _get_real_downloader
from .fragment import FragmentFD

from ..compat import compat_urllib_error
from ..utils import (
    DownloadError,
    sanitize_open,
    urljoin,
)
class DashSegmentsFD(FragmentFD):
    """
    Download segments in a DASH manifest. External downloaders can take over
    the fragment downloads by supporting the 'dash_frag_urls' protocol
    """

    FD_NAME = 'dashsegments'

    def real_download(self, filename, info_dict):
        """Download every fragment of a DASH stream into *filename*.

        Delegates to an external downloader when one supports the
        'dash_frag_urls' protocol; otherwise downloads fragments itself,
        optionally with a thread pool. Returns True on success, False on a
        fatal fragment failure.
        """
        fragment_base_url = info_dict.get('fragment_base_url')
        # In test mode only the first fragment is fetched.
        fragments = info_dict['fragments'][:1] if self.params.get(
            'test', False) else info_dict['fragments']

        real_downloader = _get_real_downloader(info_dict, 'dash_frag_urls', self.params, None)

        ctx = {
            'filename': filename,
            'total_frags': len(fragments),
        }

        if real_downloader:
            self._prepare_external_frag_download(ctx)
        else:
            self._prepare_and_start_frag_download(ctx)

        fragment_retries = self.params.get('fragment_retries', 0)
        skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True)

        # Build the worklist, skipping fragments already present in a
        # partially-downloaded file (ctx['fragment_index'] marks the resume point).
        fragments_to_download = []
        frag_index = 0
        for i, fragment in enumerate(fragments):
            frag_index += 1
            if frag_index <= ctx['fragment_index']:
                continue
            fragment_url = fragment.get('url')
            if not fragment_url:
                # Relative fragments need a base URL to resolve against.
                assert fragment_base_url
                fragment_url = urljoin(fragment_base_url, fragment['path'])

            fragments_to_download.append({
                'frag_index': frag_index,
                'index': i,
                'url': fragment_url,
            })

        if real_downloader:
            self.to_screen(
                '[%s] Fragment downloads will be delegated to %s' % (self.FD_NAME, real_downloader.get_basename()))
            info_copy = info_dict.copy()
            info_copy['fragments'] = fragments_to_download
            fd = real_downloader(self.ydl, self.params)
            # TODO: Make progress updates work without hooking twice
            # for ph in self._progress_hooks:
            #     fd.add_progress_hook(ph)
            success = fd.real_download(filename, info_copy)
            if not success:
                return False
        else:
            def download_fragment(fragment):
                # Fetch one fragment with retries; returns
                # (content_or_False, frag_index).
                i = fragment['index']
                frag_index = fragment['frag_index']
                fragment_url = fragment['url']

                ctx['fragment_index'] = frag_index

                # In DASH, the first segment contains necessary headers to
                # generate a valid MP4 file, so always abort for the first segment
                fatal = i == 0 or not skip_unavailable_fragments
                count = 0
                while count <= fragment_retries:
                    try:
                        success, frag_content = self._download_fragment(ctx, fragment_url, info_dict)
                        if not success:
                            return False, frag_index
                        break
                    except compat_urllib_error.HTTPError as err:
                        # YouTube may often return 404 HTTP error for a fragment causing the
                        # whole download to fail. However if the same fragment is immediately
                        # retried with the same request data this usually succeeds (1-2 attempts
                        # is usually enough) thus allowing to download the whole file successfully.
                        # To be future-proof we will retry all fragments that fail with any
                        # HTTP error.
                        count += 1
                        if count <= fragment_retries:
                            self.report_retry_fragment(err, frag_index, count, fragment_retries)
                    except DownloadError:
                        # Don't retry fragment if error occurred during HTTP downloading
                        # itself since it has own retry settings
                        if not fatal:
                            break
                        raise

                if count > fragment_retries:
                    if not fatal:
                        return False, frag_index
                    ctx['dest_stream'].close()
                    self.report_error('Giving up after %s fragment retries' % fragment_retries)
                    return False, frag_index

                return frag_content, frag_index

            def append_fragment(frag_content, frag_index):
                # Append downloaded content to the destination stream.
                # Returns False only when the failure is fatal for the download.
                if frag_content:
                    fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index)
                    try:
                        file, frag_sanitized = sanitize_open(fragment_filename, 'rb')
                        ctx['fragment_filename_sanitized'] = frag_sanitized
                        # Only the sanitized name is needed; close the handle
                        # so it is not leaked.
                        file.close()
                        self._append_fragment(ctx, frag_content)
                        return True
                    except EnvironmentError as ose:
                        if ose.errno != errno.ENOENT:
                            raise
                        # FileNotFoundError: the fragment file vanished
                        if skip_unavailable_fragments:
                            self.report_skip_fragment(frag_index)
                            return True
                        else:
                            ctx['dest_stream'].close()
                            self.report_error(
                                'fragment %s not found, unable to continue' % frag_index)
                            return False
                else:
                    if skip_unavailable_fragments:
                        self.report_skip_fragment(frag_index)
                        return True
                    else:
                        ctx['dest_stream'].close()
                        self.report_error(
                            'fragment %s not found, unable to continue' % frag_index)
                        return False

            max_workers = self.params.get('concurrent_fragment_downloads', 1)
            if can_threaded_download and max_workers > 1:
                self.report_warning('The download speed shown is only of one thread. This is a known issue')
                with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
                    futures = [pool.submit(download_fragment, fragment) for fragment in fragments_to_download]
                    # timeout must be 0 to return instantly
                    done, not_done = concurrent.futures.wait(futures, timeout=0)
                    try:
                        while not_done:
                            # Check every 1 second for KeyboardInterrupt
                            freshly_done, not_done = concurrent.futures.wait(not_done, timeout=1)
                            done |= freshly_done
                    except KeyboardInterrupt:
                        for future in not_done:
                            future.cancel()
                        # timeout must be none to cancel
                        concurrent.futures.wait(not_done, timeout=None)
                        raise KeyboardInterrupt
                    results = [future.result() for future in futures]

                # Append in manifest order after all workers finished, so the
                # output file stays correctly ordered.
                for frag_content, frag_index in results:
                    result = append_fragment(frag_content, frag_index)
                    if not result:
                        return False
            else:
                # Sequential fallback: download and append one fragment at a time.
                for fragment in fragments_to_download:
                    frag_content, frag_index = download_fragment(fragment)
                    result = append_fragment(frag_content, frag_index)
                    if not result:
                        return False

            self._finish_frag_download(ctx)

        return True