import errno
import functools
import gzip
+import imp
import io
import itertools
import json
compat_html_entities_html5,
compat_http_client,
compat_integer_types,
+ compat_numeric_types,
compat_kwargs,
compat_os_name,
compat_parse_qs,
r'&([^&;]+;)', lambda m: _htmlentity_transform(m.group(1)), s)
+def process_communicate_or_kill(p, *args, **kwargs):
+ try:
+ return p.communicate(*args, **kwargs)
+ except BaseException: # Including KeyboardInterrupt
+ p.kill()
+ p.wait()
+ raise
+
+
def get_subprocess_encoding():
if sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5:
# For subprocess calls, encode with locale encoding
return optval
-def formatSeconds(secs):
+def formatSeconds(secs, delim=':'):
if secs > 3600:
- return '%d:%02d:%02d' % (secs // 3600, (secs % 3600) // 60, secs % 60)
+ return '%d%s%02d%s%02d' % (secs // 3600, delim, (secs % 3600) // 60, delim, secs % 60)
elif secs > 60:
- return '%d:%02d' % (secs // 60, secs % 60)
+ return '%d%s%02d' % (secs // 60, delim, secs % 60)
else:
return '%d' % secs
if ytdl_is_updateable():
update_cmd = 'type youtube-dlc -U to update'
else:
- update_cmd = 'see https://github.com/blackjack4494/yt-dlc on how to update'
- msg = '; please report this issue on https://github.com/blackjack4494/yt-dlc .'
+ update_cmd = 'see https://github.com/pukkandan/yt-dlp on how to update'
+ msg = '; please report this issue on https://github.com/pukkandan/yt-dlp .'
msg += ' Make sure you are using the latest version; %s.' % update_cmd
msg += ' Be sure to call youtube-dlc with the --verbose flag and include its complete output.'
return msg
self.msg = msg
+class ExistingVideoReached(YoutubeDLError):
+ """ --max-downloads limit has been reached. """
+ pass
+
+
+class RejectedVideoReached(YoutubeDLError):
+ """ --max-downloads limit has been reached. """
+ pass
+
+
class MaxDownloadsReached(YoutubeDLError):
""" --max-downloads limit has been reached. """
pass
if not url or not isinstance(url, compat_str):
return None
url = url.strip()
- return url if re.match(r'^(?:[a-zA-Z][\da-zA-Z.+-]*:)?//', url) else None
+ return url if re.match(r'^(?:(?:https?|rt(?:m(?:pt?[es]?|fp)|sp[su]?)|mms|ftps?):)?//', url) else None
+
+
+def strftime_or_none(timestamp, date_format, default=None):
+ datetime_object = None
+ try:
+ if isinstance(timestamp, compat_numeric_types): # unix timestamp
+ datetime_object = datetime.datetime.utcfromtimestamp(timestamp)
+ elif isinstance(timestamp, compat_str): # assume YYYYMMDD
+ datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
+ return datetime_object.strftime(date_format)
+ except (ValueError, TypeError, AttributeError):
+ return default
def parse_duration(s):
""" Checks if the given binary is installed somewhere in PATH, and returns its name.
args can be a list of arguments for a short output (like -version) """
try:
- subprocess.Popen([exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
+ process_communicate_or_kill(subprocess.Popen(
+ [exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE))
except OSError:
return False
return exe
# STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
# SIGTTOU if youtube-dlc is run in the background.
# See https://github.com/ytdl-org/youtube-dl/issues/955#issuecomment-209789656
- out, _ = subprocess.Popen(
+ out, _ = process_communicate_or_kill(subprocess.Popen(
[encodeArgument(exe)] + args,
stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate()
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
except OSError:
return False
if isinstance(out, bytes): # Python 2.x
def fixup(url):
if not isinstance(url, compat_str):
url = url.decode('utf-8', 'replace')
- BOM_UTF8 = '\xef\xbb\xbf'
- if url.startswith(BOM_UTF8):
- url = url[len(BOM_UTF8):]
- url = url.strip()
- if url.startswith(('#', ';', ']')):
+ BOM_UTF8 = ('\xef\xbb\xbf', '\ufeff')
+ for bom in BOM_UTF8:
+ if url.startswith(bom):
+ url = url[len(bom):]
+ url = url.lstrip()
+ if not url or url.startswith(('#', ';', ']')):
return False
- return url
+ # "#" cannot be stripped out since it is part of the URI
+ # However, it can be safely stipped out if follwing a whitespace
+ return re.split(r'\s#', url, 1)[0].rstrip()
with contextlib.closing(batch_fd) as fd:
return [url for url in map(fixup, fd) if url]
r'\g<callback_data>', code)
-def js_to_json(code):
+def js_to_json(code, vars={}):
+ # vars is a dict of var, val pairs to substitute
COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*'
SKIP_RE = r'\s*(?:{comment})?\s*'.format(comment=COMMENT_RE)
INTEGER_TABLE = (
i = int(im.group(1), base)
return '"%d":' % i if v.endswith(':') else '%d' % i
+ if v in vars:
+ return vars[v]
+
return '"%s"' % v
return re.sub(r'''(?sx)
return q
-DEFAULT_OUTTMPL = '%(title)s-%(id)s.%(ext)s'
+DEFAULT_OUTTMPL = {
+ 'default': '%(title)s [%(id)s].%(ext)s',
+}
+OUTTMPL_TYPES = {
+ 'subtitle': None,
+ 'thumbnail': None,
+ 'description': 'description',
+ 'annotation': 'annotations.xml',
+ 'infojson': 'info.json',
+ 'pl_description': 'description',
+ 'pl_infojson': 'info.json',
+}
def limit_length(s, length):
def ytdl_is_updateable():
""" Returns if youtube-dlc can be updated with -U """
+ return False
+
from zipimport import zipimporter
return isinstance(globals().get('__loader__'), zipimporter) or hasattr(sys, 'frozen')
return [command_option] if param == expected_value else []
-def cli_configuration_args(params, param, default=[]):
- ex_args = params.get(param)
- if ex_args is None:
- return default
- assert isinstance(ex_args, list)
- return ex_args
+def cli_configuration_args(params, arg_name, key, default=[], exe=None): # returns arg, for_compat
+ argdict = params.get(arg_name, {})
+ if isinstance(argdict, (list, tuple)): # for backward compatibility
+ return argdict, True
+
+ if argdict is None:
+ return default, False
+ assert isinstance(argdict, dict)
+
+ assert isinstance(key, compat_str)
+ key = key.lower()
+
+ args = exe_args = None
+ if exe is not None:
+ assert isinstance(exe, compat_str)
+ exe = exe.lower()
+ args = argdict.get('%s+%s' % (key, exe))
+ if args is None:
+ exe_args = argdict.get(exe)
+
+ if args is None:
+ args = argdict.get(key) if key != exe else None
+ if args is None and exe_args is None:
+ args = argdict.get('default', default)
+
+ args, exe_args = args or [], exe_args or []
+ assert isinstance(args, (list, tuple))
+ assert isinstance(exe_args, (list, tuple))
+ return args + exe_args, False
class ISO639Utils(object):
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
except EnvironmentError as e:
raise XAttrMetadataError(e.errno, e.strerror)
- stdout, stderr = p.communicate()
+ stdout, stderr = process_communicate_or_kill(p)
stderr = stderr.decode('utf-8', 'replace')
if p.returncode != 0:
raise XAttrMetadataError(p.returncode, stderr)
day_field: str(random_date.day),
}
+
# Templates for internet shortcut files, which are plain text files.
DOT_URL_LINK_TEMPLATE = '''
[InternetShortcut]
return path
+
def format_field(obj, field, template='%s', ignore=(None, ''), default='', func=None):
val = obj.get(field, default)
if func and val not in ignore:
val = func(val)
return template % val if val not in ignore else default
+
+
+def clean_podcast_url(url):
+ return re.sub(r'''(?x)
+ (?:
+ (?:
+ chtbl\.com/track|
+ media\.blubrry\.com| # https://create.blubrry.com/resources/podcast-media-download-statistics/getting-started/
+ play\.podtrac\.com
+ )/[^/]+|
+ (?:dts|www)\.podtrac\.com/(?:pts/)?redirect\.[0-9a-z]{3,4}| # http://analytics.podtrac.com/how-to-measure
+ flex\.acast\.com|
+ pd(?:
+ cn\.co| # https://podcorn.com/analytics-prefix/
+ st\.fm # https://podsights.com/docs/
+ )/e
+ )/''', '', url)
+
+
+_HEX_TABLE = '0123456789abcdef'
+
+
+def random_uuidv4():
+ return re.sub(r'[xy]', lambda x: _HEX_TABLE[random.randint(0, 15)], 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx')
+
+
+def make_dir(path, to_screen=None):
+ try:
+ dn = os.path.dirname(path)
+ if dn and not os.path.exists(dn):
+ os.makedirs(dn)
+ return True
+ except (OSError, IOError) as err:
+ if callable(to_screen) is not None:
+ to_screen('unable to create directory ' + error_to_compat_str(err))
+ return False
+
+
+def get_executable_path():
+ path = os.path.dirname(sys.argv[0])
+ if os.path.abspath(sys.argv[0]) != os.path.abspath(sys.executable): # Not packaged
+ path = os.path.join(path, '..')
+ return os.path.abspath(path)
+
+
+def load_plugins(name, type, namespace):
+ plugin_info = [None]
+ classes = []
+ try:
+ plugin_info = imp.find_module(
+ name, [os.path.join(get_executable_path(), 'ytdlp_plugins')])
+ plugins = imp.load_module(name, *plugin_info)
+ for name in dir(plugins):
+ if not name.endswith(type):
+ continue
+ klass = getattr(plugins, name)
+ classes.append(klass)
+ namespace[name] = klass
+ except ImportError:
+ pass
+ finally:
+ if plugin_info[0] is not None:
+ plugin_info[0].close()
+ return classes
+
+
+def traverse_dict(dictn, keys, casesense=True):
+ if not isinstance(dictn, dict):
+ return None
+ first_key = keys[0]
+ if not casesense:
+ dictn = {key.lower(): val for key, val in dictn.items()}
+ first_key = first_key.lower()
+ value = dictn.get(first_key, None)
+ return value if len(keys) < 2 else traverse_dict(value, keys[1:], casesense)