import errno
import functools
import gzip
+import imp
import io
import itertools
import json
compat_html_entities_html5,
compat_http_client,
compat_integer_types,
+ compat_numeric_types,
compat_kwargs,
compat_os_name,
compat_parse_qs,
if ytdl_is_updateable():
update_cmd = 'type youtube-dlc -U to update'
else:
- update_cmd = 'see https://github.com/pukkandan/yt-dlc on how to update'
- msg = '; please report this issue on https://github.com/pukkandan/yt-dlc .'
+ update_cmd = 'see https://github.com/pukkandan/yt-dlp on how to update'
+ msg = '; please report this issue on https://github.com/pukkandan/yt-dlp .'
msg += ' Make sure you are using the latest version; %s.' % update_cmd
msg += ' Be sure to call youtube-dlc with the --verbose flag and include its complete output.'
return msg
self.msg = msg
class ExistingVideoReached(YoutubeDLError):
    """ An already existing video has been reached. """
    # NOTE(review): the previous docstring was a verbatim copy of the one on
    # MaxDownloadsReached and described the wrong condition. The raising site
    # is not visible here; presumably this is tied to a "break on existing"
    # style option - confirm against the caller.
    pass
+
+
class RejectedVideoReached(YoutubeDLError):
    """ A rejected video has been reached. """
    # NOTE(review): the previous docstring was a verbatim copy of the one on
    # MaxDownloadsReached and described the wrong condition. The raising site
    # is not visible here; presumably this is tied to a "break on reject"
    # style option (video not matching the active filters) - confirm.
    pass
+
+
class MaxDownloadsReached(YoutubeDLError):
    """ The limit given with --max-downloads has been reached. """
return url if re.match(r'^(?:(?:https?|rt(?:m(?:pt?[es]?|fp)|sp[su]?)|mms|ftps?):)?//', url) else None
def strftime_or_none(timestamp, date_format, default=None):
    """
    Format `timestamp` with strftime, returning `default` on any failure.

    timestamp   -- either a number, taken as a unix timestamp and converted
                   with utcfromtimestamp(), or a string parsed as YYYYMMDD
    date_format -- strftime format string applied to the resulting datetime
    default     -- value returned when the timestamp has an unsupported type
                   or cannot be converted or formatted
    """
    datetime_object = None
    try:
        if isinstance(timestamp, compat_numeric_types):  # unix timestamp
            datetime_object = datetime.datetime.utcfromtimestamp(timestamp)
        elif isinstance(timestamp, compat_str):  # assume YYYYMMDD
            datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
        # For any other type datetime_object is still None here, so the
        # attribute access raises AttributeError and `default` is returned
        return datetime_object.strftime(date_format)
    except (ValueError, TypeError, AttributeError, OverflowError, OSError):
        # OverflowError/OSError: utcfromtimestamp() raises these (instead of
        # ValueError) for timestamps outside the platform's supported range
        return default
+
+
def parse_duration(s):
if not isinstance(s, compat_basestring):
return None
r'\g<callback_data>', code)
-def js_to_json(code):
+def js_to_json(code, vars={}):
+ # vars is a dict of var, val pairs to substitute
COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*'
SKIP_RE = r'\s*(?:{comment})?\s*'.format(comment=COMMENT_RE)
INTEGER_TABLE = (
i = int(im.group(1), base)
return '"%d":' % i if v.endswith(':') else '%d' % i
+ if v in vars:
+ return vars[v]
+
return '"%s"' % v
return re.sub(r'''(?sx)
return [command_option] if param == expected_value else []
-def cli_configuration_args(params, param, default=[]):
- ex_args = params.get(param)
- if ex_args is None:
- return default
- assert isinstance(ex_args, list)
- return ex_args
def cli_configuration_args(params, arg_name, key, default=[], exe=None):  # returns arg, for_compat
    """
    Look up the extra command-line arguments configured in params[arg_name].

    Returns a tuple (args, for_compat):

    * params[arg_name] is a list/tuple (legacy format): it is returned
      unchanged and for_compat is True.
    * params[arg_name] is None: `default` is returned unchanged.
    * otherwise params[arg_name] must be a dict (a missing entry counts as an
      empty dict). `key` and `exe` are lower-cased, then:
        - the '<key>+<exe>' entry, if present, is used on its own;
        - else the `key` entry (skipped when key == exe) followed by the
          `exe` entry;
        - if neither of those exists, the 'default' entry, falling back to
          the `default` parameter.
      The result of this lookup is always a new list.

    Type expectations are enforced with assert, as before.
    """
    argdict = params.get(arg_name, {})
    if isinstance(argdict, (list, tuple)):  # for backward compatibility
        return argdict, True

    if argdict is None:
        return default, False
    assert isinstance(argdict, dict)

    assert isinstance(key, compat_str)
    key = key.lower()

    args = exe_args = None
    if exe is not None:
        assert isinstance(exe, compat_str)
        exe = exe.lower()
        args = argdict.get('%s+%s' % (key, exe))
        if args is None:
            exe_args = argdict.get(exe)

    if args is None:
        args = argdict.get(key) if key != exe else None
        if args is None and exe_args is None:
            args = argdict.get('default', default)

    args, exe_args = args or [], exe_args or []
    assert isinstance(args, (list, tuple))
    assert isinstance(exe_args, (list, tuple))
    # Convert both sides before concatenating: the asserts above accept tuples,
    # but tuple + list raises TypeError. Copying also keeps callers from
    # mutating the shared `default` list through the return value.
    return list(args) + list(exe_args), False
class ISO639Utils(object):
st\.fm # https://podsights.com/docs/
)/e
)/''', '', url)
+
+
_HEX_TABLE = '0123456789abcdef'


def random_uuidv4():
    """
    Return a random version 4 UUID as a lower-case hyphenated hex string.

    In the template every 'x' becomes a random hex digit. The 'y' position
    carries the variant bits, which must be binary 10xx for an RFC 4122 UUID,
    so it is restricted to 8, 9, a or b.
    """
    def _random_digit(mobj):
        if mobj.group(0) == 'y':
            # indices 8..11 of _HEX_TABLE are '8', '9', 'a', 'b'
            return _HEX_TABLE[random.randint(8, 11)]
        return _HEX_TABLE[random.randint(0, 15)]

    return re.sub(r'[xy]', _random_digit, 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx')
+
+
def make_dir(path, to_screen=None):
    """
    Make sure the directory that would contain `path` exists.

    path      -- path of a file; only its directory part is created
    to_screen -- optional callable taking one message string; it is called
                 with an error description when the directory cannot be
                 created. Anything that is not callable (e.g. None) is ignored

    Returns True if the directory exists or was created (or `path` has no
    directory part), False if creating it failed.
    """
    try:
        dn = os.path.dirname(path)
        if dn and not os.path.exists(dn):
            os.makedirs(dn)
        return True
    except (OSError, IOError) as err:
        # callable() returns a bool, so the former "callable(...) is not None"
        # check was always true and called to_screen even when it was None
        if callable(to_screen):
            to_screen('unable to create directory ' + error_to_compat_str(err))
        return False
+
+
def get_executable_path():
    """
    Return an absolute directory derived from sys.argv[0].

    When sys.argv[0] and sys.executable resolve to the same absolute path
    (treated as the packaged case), this is the directory of sys.argv[0];
    otherwise it is the parent of that directory.
    """
    script = sys.argv[0]
    script_dir = os.path.dirname(script)
    packaged = os.path.abspath(script) == os.path.abspath(sys.executable)
    if packaged:
        return os.path.abspath(script_dir)
    # Not packaged
    return os.path.abspath(os.path.join(script_dir, '..'))
+
+
def load_plugins(name, type, namespace):
    """
    Load the module `name` from the 'ytdlp_plugins' directory located under
    get_executable_path() and collect its attributes whose names end with
    `type`.

    Each matching attribute is appended to the returned list and also stored
    in `namespace` under its attribute name. An ImportError while locating or
    loading the module is ignored; whatever was collected so far (normally an
    empty list) is returned.
    """
    collected = []
    module_info = [None]
    try:
        module_info = imp.find_module(
            name, [os.path.join(get_executable_path(), 'ytdlp_plugins')])
        module = imp.load_module(name, *module_info)
        for attr_name in dir(module):
            if attr_name.endswith(type):
                member = getattr(module, attr_name)
                collected.append(member)
                namespace[attr_name] = member
    except ImportError:
        pass
    finally:
        # First element of find_module()'s result is the open file, if any
        handle = module_info[0]
        if handle is not None:
            handle.close()
    return collected
+
+
def traverse_dict(dictn, keys, casesense=True):
    """
    Follow the sequence `keys` through nested dicts starting at `dictn`.

    Returns the value found under the last key, or None as soon as a level
    is not a dict or lacks the requested key. With casesense=False, both the
    dict keys and the requested key are lower-cased before each lookup.
    """
    node = dictn
    depth = 0
    while True:
        if not isinstance(node, dict):
            return None
        wanted = keys[depth]
        if not casesense:
            node = {k.lower(): v for k, v in node.items()}
            wanted = wanted.lower()
        node = node.get(wanted, None)
        depth += 1
        if depth >= len(keys):
            return node