+import contextlib
import errno
import os
import random
)
from ..utils import (
LockingUnsupportedError,
+ Namespace,
decodeArgument,
encodeFilename,
error_to_compat_str,
def __init__(self, ydl, params):
"""Create a FileDownloader object with the given options."""
- self.ydl = ydl
+ self._set_ydl(ydl)
self._progress_hooks = []
self.params = params
self._prepare_multiline_status()
self.add_progress_hook(self.report_progress)
+ def _set_ydl(self, ydl):
+ self.ydl = ydl
+
+ for func in (
+ 'deprecation_warning',
+ 'report_error',
+ 'report_file_already_downloaded',
+ 'report_warning',
+ 'to_console_title',
+ 'to_stderr',
+ 'trouble',
+ 'write_debug',
+ ):
+ setattr(self, func, getattr(ydl, func))
+
+ def to_screen(self, *args, **kargs):
+ self.ydl.to_screen(*args, quiet=self.params.get('quiet'), **kargs)
+
@staticmethod
def format_seconds(seconds):
time = timetuple_from_msec(seconds * 1000)
multiplier = 1024.0 ** 'bkmgtpezy'.index(matchobj.group(2).lower())
return int(round(number * multiplier))
- def to_screen(self, *args, **kargs):
- self.ydl.to_screen(*args, quiet=self.params.get('quiet'), **kargs)
-
- def to_stderr(self, message):
- self.ydl.to_stderr(message)
-
- def to_console_title(self, message):
- self.ydl.to_console_title(message)
-
- def trouble(self, *args, **kargs):
- self.ydl.trouble(*args, **kargs)
-
- def report_warning(self, *args, **kargs):
- self.ydl.report_warning(*args, **kargs)
-
- def report_error(self, *args, **kargs):
- self.ydl.report_error(*args, **kargs)
-
- def write_debug(self, *args, **kargs):
- self.ydl.write_debug(*args, **kargs)
-
def slow_down(self, start_time, now, byte_counter):
"""Sleep if the download speed is over the rate limit."""
rate_limit = self.params.get('ratelimit')
# Ignore obviously invalid dates
if filetime == 0:
return
- try:
+ with contextlib.suppress(Exception):
os.utime(filename, (time.time(), filetime))
- except Exception:
- pass
return filetime
def report_destination(self, filename):
def _finish_multiline_status(self):
self._multiline.end()
- _progress_styles = {
- 'downloaded_bytes': 'light blue',
- 'percent': 'light blue',
- 'eta': 'yellow',
- 'speed': 'green',
- 'elapsed': 'bold white',
- 'total_bytes': '',
- 'total_bytes_estimate': '',
- }
+ ProgressStyles = Namespace(
+ downloaded_bytes='light blue',
+ percent='light blue',
+ eta='yellow',
+ speed='green',
+ elapsed='bold white',
+ total_bytes='',
+ total_bytes_estimate='',
+ )
def _report_progress_status(self, s, default_template):
- for name, style in self._progress_styles.items():
+ for name, style in self.ProgressStyles._asdict().items():
name = f'_{name}_str'
if name not in s:
continue
'[download] Got server HTTP error: %s. Retrying (attempt %d of %s) ...'
% (error_to_compat_str(err), count, self.format_retries(retries)))
- def report_file_already_downloaded(self, *args, **kwargs):
- """Report file has already been fully downloaded."""
- return self.ydl.report_file_already_downloaded(*args, **kwargs)
-
def report_unable_to_resume(self):
"""Report it was impossible to resume download."""
self.to_screen('[download] Unable to resume')
self._finish_multiline_status()
return True, False
- if subtitle is False:
- min_sleep_interval = self.params.get('sleep_interval')
- if min_sleep_interval:
- max_sleep_interval = self.params.get('max_sleep_interval', min_sleep_interval)
- sleep_interval = random.uniform(min_sleep_interval, max_sleep_interval)
- self.to_screen(
- '[download] Sleeping %s seconds ...' % (
- int(sleep_interval) if sleep_interval.is_integer()
- else '%.2f' % sleep_interval))
- time.sleep(sleep_interval)
+ if subtitle:
+ sleep_interval = self.params.get('sleep_interval_subtitles') or 0
else:
- sleep_interval_sub = 0
- if type(self.params.get('sleep_interval_subtitles')) is int:
- sleep_interval_sub = self.params.get('sleep_interval_subtitles')
- if sleep_interval_sub > 0:
- self.to_screen(
- '[download] Sleeping %s seconds ...' % (
- sleep_interval_sub))
- time.sleep(sleep_interval_sub)
+ min_sleep_interval = self.params.get('sleep_interval') or 0
+ sleep_interval = random.uniform(
+ min_sleep_interval, self.params.get('max_sleep_interval', min_sleep_interval))
+ if sleep_interval > 0:
+ self.to_screen(f'[download] Sleeping {sleep_interval:.2f} seconds ...')
+ time.sleep(sleep_interval)
+
ret = self.real_download(filename, info_dict)
self._finish_multiline_status()
return ret, True