]>
Commit | Line | Data |
---|---|---|
from __future__ import unicode_literals

# concurrent.futures is unavailable on very old Pythons; fall back to
# sequential fragment downloads when it cannot be imported.
try:
    import concurrent.futures
    can_threaded_download = True
except ImportError:
    can_threaded_download = False

from ..downloader import _get_real_downloader
from .fragment import FragmentFD

from ..compat import compat_urllib_error
from ..utils import (
    DownloadError,
    sanitize_open,
    urljoin,
)
18 | ||
19 | ||
class DashSegmentsFD(FragmentFD):
    """
    Download segments in a DASH manifest. External downloaders can take over
    the fragment downloads by supporting the 'dash_frag_urls' protocol
    """

    FD_NAME = 'dashsegments'

    def real_download(self, filename, info_dict):
        """Download every fragment listed in info_dict['fragments'] into
        *filename*, optionally delegating to an external downloader.

        Returns True on success, False when a fatal fragment failure occurs.
        """
        fragment_base_url = info_dict.get('fragment_base_url')
        # In --test mode only the first fragment is fetched.
        fragments = info_dict['fragments'][:1] if self.params.get(
            'test', False) else info_dict['fragments']

        real_downloader = _get_real_downloader(info_dict, 'dash_frag_urls', self.params, None)

        ctx = {
            'filename': filename,
            'total_frags': len(fragments),
        }

        if real_downloader:
            self._prepare_external_frag_download(ctx)
        else:
            self._prepare_and_start_frag_download(ctx)

        fragment_retries = self.params.get('fragment_retries', 0)
        skip_unavailable_fragments = self.params.get('skip_unavailable_fragments', True)

        # Build the work list: resolve relative fragment paths against the
        # base URL and skip fragments already fetched by a resumed download
        # (ctx['fragment_index'] is set by the _prepare_* helpers).
        fragments_to_download = []
        frag_index = 0
        for i, fragment in enumerate(fragments):
            frag_index += 1
            if frag_index <= ctx['fragment_index']:
                continue
            fragment_url = fragment.get('url')
            if not fragment_url:
                assert fragment_base_url
                fragment_url = urljoin(fragment_base_url, fragment['path'])

            fragments_to_download.append({
                'frag_index': frag_index,
                'index': i,
                'url': fragment_url,
            })

        if real_downloader:
            self.to_screen(
                '[%s] Fragment downloads will be delegated to %s' % (self.FD_NAME, real_downloader.get_basename()))
            info_copy = info_dict.copy()
            info_copy['fragments'] = fragments_to_download
            fd = real_downloader(self.ydl, self.params)
            # TODO: Make progress updates work without hooking twice
            # for ph in self._progress_hooks:
            #     fd.add_progress_hook(ph)
            success = fd.real_download(filename, info_copy)
            if not success:
                return False
        else:
            def download_fragment(fragment):
                """Fetch a single fragment with retries.

                Returns (content, frag_index) on success or
                (False, frag_index) on a non-fatal failure.
                """
                i = fragment['index']
                frag_index = fragment['frag_index']
                fragment_url = fragment['url']

                ctx['fragment_index'] = frag_index

                # In DASH, the first segment contains necessary headers to
                # generate a valid MP4 file, so always abort for the first segment
                fatal = i == 0 or not skip_unavailable_fragments
                count = 0
                while count <= fragment_retries:
                    try:
                        success, frag_content = self._download_fragment(ctx, fragment_url, info_dict)
                        if not success:
                            return False, frag_index
                        break
                    except compat_urllib_error.HTTPError as err:
                        # YouTube may often return 404 HTTP error for a fragment causing the
                        # whole download to fail. However if the same fragment is immediately
                        # retried with the same request data this usually succeeds (1-2 attempts
                        # is usually enough) thus allowing to download the whole file successfully.
                        # To be future-proof we will retry all fragments that fail with any
                        # HTTP error.
                        count += 1
                        if count <= fragment_retries:
                            self.report_retry_fragment(err, frag_index, count, fragment_retries)
                    except DownloadError:
                        # Don't retry fragment if error occurred during HTTP downloading
                        # itself since it has own retry settings
                        if not fatal:
                            break
                        raise

                if count > fragment_retries:
                    if not fatal:
                        return False, frag_index
                    self.report_error('Giving up after %s fragment retries' % fragment_retries)
                    return False, frag_index

                return frag_content, frag_index

            def append_fragment(frag_content, frag_index):
                """Append a downloaded fragment to the output file.

                Returns True when the fragment was appended (or deliberately
                skipped), False when the download must abort.
                """
                if frag_content:
                    fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index)
                    try:
                        file, frag_sanitized = sanitize_open(fragment_filename, 'rb')
                        ctx['fragment_filename_sanitized'] = frag_sanitized
                        file.close()
                        self._append_fragment(ctx, frag_content)
                        return True
                    except FileNotFoundError:
                        # Temp fragment file vanished; fall through to the
                        # shared skip-or-abort handling below.
                        pass
                # No content, or the fragment file is missing.
                if skip_unavailable_fragments:
                    self.report_skip_fragment(frag_index)
                    return True
                self.report_error(
                    'fragment %s not found, unable to continue' % frag_index)
                return False

            max_workers = self.params.get('concurrent_fragment_downloads', 1)
            if can_threaded_download and max_workers > 1:
                self.report_warning('The download speed shown is only of one thread. This is a known issue')
                with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
                    futures = [pool.submit(download_fragment, fragment) for fragment in fragments_to_download]
                    # timeout must be 0 to return instantly
                    done, not_done = concurrent.futures.wait(futures, timeout=0)
                    try:
                        while not_done:
                            # Check every 1 second for KeyboardInterrupt
                            freshly_done, not_done = concurrent.futures.wait(not_done, timeout=1)
                            done |= freshly_done
                    except KeyboardInterrupt:
                        for future in not_done:
                            future.cancel()
                        # timeout must be none to cancel
                        concurrent.futures.wait(not_done, timeout=None)
                        # Bare raise preserves the original traceback instead
                        # of constructing a fresh KeyboardInterrupt.
                        raise
                    results = [future.result() for future in futures]

                for frag_content, frag_index in results:
                    result = append_fragment(frag_content, frag_index)
                    if not result:
                        return False
            else:
                # Sequential fallback (no concurrent.futures, or a single worker).
                for fragment in fragments_to_download:
                    frag_content, frag_index = download_fragment(fragment)
                    result = append_fragment(frag_content, frag_index)
                    if not result:
                        return False

        self._finish_frag_download(ctx)
        return True