jfr.im git - yt-dlp.git/commitdiff
[dependencies] Simplify `Cryptodome`
author: pukkandan <redacted>
Tue, 28 Feb 2023 17:40:54 +0000 (23:10 +0530)
committer: pukkandan <redacted>
Tue, 28 Feb 2023 17:45:13 +0000 (23:15 +0530)
Closes #6292, closes #6272, closes #6338

test/test_aes.py
yt_dlp/__pyinstaller/hook-yt_dlp.py
yt_dlp/aes.py
yt_dlp/compat/_legacy.py
yt_dlp/compat/compat_utils.py
yt_dlp/dependencies/Cryptodome.py
yt_dlp/dependencies/__init__.py
yt_dlp/downloader/hls.py
yt_dlp/extractor/bilibili.py
yt_dlp/extractor/ivi.py
yt_dlp/extractor/wrestleuniverse.py

index 18f15fecb6d5d19fc4ef3aa5229e1cb41de59ea8..a26abfd7d0e4e1d5fa60b0d55b84f2a5ec617540 100644 (file)
@@ -48,7 +48,7 @@ def test_cbc_decrypt(self):
         data = b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\x27\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd'
         decrypted = intlist_to_bytes(aes_cbc_decrypt(bytes_to_intlist(data), self.key, self.iv))
         self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
-        if Cryptodome:
+        if Cryptodome.AES:
             decrypted = aes_cbc_decrypt_bytes(data, intlist_to_bytes(self.key), intlist_to_bytes(self.iv))
             self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
 
@@ -78,7 +78,7 @@ def test_gcm_decrypt(self):
         decrypted = intlist_to_bytes(aes_gcm_decrypt_and_verify(
             bytes_to_intlist(data), self.key, bytes_to_intlist(authentication_tag), self.iv[:12]))
         self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
-        if Cryptodome:
+        if Cryptodome.AES:
             decrypted = aes_gcm_decrypt_and_verify_bytes(
                 data, intlist_to_bytes(self.key), authentication_tag, intlist_to_bytes(self.iv[:12]))
             self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
index 057cfef2f9ca0d3d85460d2289349e6d9a2231f1..63dcdffe026b122ff4ad1c2c6f2c092bc3742b12 100644 (file)
@@ -1,30 +1,8 @@
-import ast
-import os
 import sys
-from pathlib import Path
 
 from PyInstaller.utils.hooks import collect_submodules
 
 
-def find_attribute_accesses(node, name, path=()):
-    if isinstance(node, ast.Attribute):
-        path = [*path, node.attr]
-        if isinstance(node.value, ast.Name) and node.value.id == name:
-            yield path[::-1]
-    for child in ast.iter_child_nodes(node):
-        yield from find_attribute_accesses(child, name, path)
-
-
-def collect_used_submodules(name, level):
-    for dirpath, _, filenames in os.walk(Path(__file__).parent.parent):
-        for filename in filenames:
-            if not filename.endswith('.py'):
-                continue
-            with open(Path(dirpath) / filename, encoding='utf8') as f:
-                for submodule in find_attribute_accesses(ast.parse(f.read()), name):
-                    yield '.'.join(submodule[:level])
-
-
 def pycryptodome_module():
     try:
         import Cryptodome  # noqa: F401
@@ -41,12 +19,8 @@ def pycryptodome_module():
 
 def get_hidden_imports():
     yield 'yt_dlp.compat._legacy'
+    yield pycryptodome_module()
     yield from collect_submodules('websockets')
-
-    crypto = pycryptodome_module()
-    for sm in set(collect_used_submodules('Cryptodome', 2)):
-        yield f'{crypto}.{sm}'
-
     # These are auto-detected, but explicitly add them just in case
     yield from ('mutagen', 'brotli', 'certifi')
 
index deff0a2b3d4eec2ef2a749d5b289f777eb03e4e1..b3a383cd9c3f87bee4b327e703f10fb905ccf848 100644 (file)
@@ -5,14 +5,14 @@
 from .dependencies import Cryptodome
 from .utils import bytes_to_intlist, intlist_to_bytes
 
-if Cryptodome:
+if Cryptodome.AES:
     def aes_cbc_decrypt_bytes(data, key, iv):
         """ Decrypt bytes with AES-CBC using pycryptodome """
-        return Cryptodome.Cipher.AES.new(key, Cryptodome.Cipher.AES.MODE_CBC, iv).decrypt(data)
+        return Cryptodome.AES.new(key, Cryptodome.AES.MODE_CBC, iv).decrypt(data)
 
     def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce):
         """ Decrypt bytes with AES-GCM using pycryptodome """
-        return Cryptodome.Cipher.AES.new(key, Cryptodome.Cipher.AES.MODE_GCM, nonce).decrypt_and_verify(data, tag)
+        return Cryptodome.AES.new(key, Cryptodome.AES.MODE_GCM, nonce).decrypt_and_verify(data, tag)
 
 else:
     def aes_cbc_decrypt_bytes(data, key, iv):
index 84d749209e4b8a849fb2db0932d90f2f22028474..83bf869a8000e1207a9e4a3830afe753e46bb048 100644 (file)
@@ -32,9 +32,9 @@
 
 from . import compat_expanduser, compat_HTMLParseError, compat_realpath
 from .compat_utils import passthrough_module
-from ..dependencies import Cryptodome_AES as compat_pycrypto_AES  # noqa: F401
 from ..dependencies import brotli as compat_brotli  # noqa: F401
 from ..dependencies import websockets as compat_websockets  # noqa: F401
+from ..dependencies.Cryptodome import AES as compat_pycrypto_AES  # noqa: F401
 
 passthrough_module(__name__, '...utils', ('WINDOWS_VT_MODE', 'windows_enable_vt_mode'))
 
index 8956b3bf1f7a64b25cd97be5e0f06a0c506e077e..3ca46d270c9cca6d4f83eac176b0edfe9ceed1b7 100644 (file)
@@ -48,7 +48,7 @@ def passthrough_module(parent, child, allowed_attributes=(..., ), *, callback=la
     """Passthrough parent module into a child module, creating the parent if necessary"""
     def __getattr__(attr):
         if _is_package(parent):
-            with contextlib.suppress(ImportError):
+            with contextlib.suppress(ModuleNotFoundError):
                 return importlib.import_module(f'.{attr}', parent.__name__)
 
         ret = from_child(attr)
index 2adc513740af195891a90b5fbcf10ba8ad7af284..a50bce4d4f4e948adcb4808ba764ea02cbf0d681 100644 (file)
@@ -1,8 +1,5 @@
 import types
 
-from ..compat import functools
-from ..compat.compat_utils import passthrough_module
-
 try:
     import Cryptodome as _parent
 except ImportError:
         _parent = types.ModuleType('no_Cryptodome')
         __bool__ = lambda: False
 
-passthrough_module(__name__, _parent, (..., '__version__'))
-del passthrough_module
+__version__ = ''
+AES = PKCS1_v1_5 = Blowfish = PKCS1_OAEP = SHA1 = CMAC = RSA = None
+try:
+    if _parent.__name__ == 'Cryptodome':
+        from Cryptodome import __version__
+        from Cryptodome.Cipher import AES
+        from Cryptodome.Cipher import PKCS1_v1_5
+        from Cryptodome.Cipher import Blowfish
+        from Cryptodome.Cipher import PKCS1_OAEP
+        from Cryptodome.Hash import SHA1
+        from Cryptodome.Hash import CMAC
+        from Cryptodome.PublicKey import RSA
+    elif _parent.__name__ == 'Crypto':
+        from Crypto import __version__
+        from Crypto.Cipher import AES
+        from Crypto.Cipher import PKCS1_v1_5
+        from Crypto.Cipher import Blowfish
+        from Crypto.Cipher import PKCS1_OAEP
+        from Crypto.Hash import SHA1
+        from Crypto.Hash import CMAC
+        from Crypto.PublicKey import RSA
+except ImportError:
+    __version__ = f'broken {__version__}'.strip()
 
 
-@property
-@functools.cache
-def _yt_dlp__identifier():
-    if _parent.__name__ == 'Crypto':
-        from Crypto.Cipher import AES
-        try:
-            # In pycrypto, mode defaults to ECB. See:
-            # https://www.pycryptodome.org/en/latest/src/vs_pycrypto.html#:~:text=not%20have%20ECB%20as%20default%20mode
-            AES.new(b'abcdefghijklmnop')
-        except TypeError:
-            return 'pycrypto'
-    return _parent.__name__
+_yt_dlp__identifier = _parent.__name__
+if AES and _yt_dlp__identifier == 'Crypto':
+    try:
+        # In pycrypto, mode defaults to ECB. See:
+        # https://www.pycryptodome.org/en/latest/src/vs_pycrypto.html#:~:text=not%20have%20ECB%20as%20default%20mode
+        AES.new(b'abcdefghijklmnop')
+    except TypeError:
+        _yt_dlp__identifier = 'pycrypto'
index c2214e6dbadca4ca7d482f68259f042d86641242..6e7d29c5cac19e20728598a245aaa595e182c764 100644 (file)
@@ -73,7 +73,7 @@
 
 
 # Deprecated
-Cryptodome_AES = Cryptodome.Cipher.AES if Cryptodome else None
+Cryptodome_AES = Cryptodome.AES
 
 
 __all__ = [
index 29d6f6241111a786ec66b792db1d29cef99342de..f2868dc52bb10c6b5ad64a3fbf50266a8e00a89e 100644 (file)
@@ -70,7 +70,7 @@ def real_download(self, filename, info_dict):
         can_download, message = self.can_download(s, info_dict, self.params.get('allow_unplayable_formats')), None
         if can_download:
             has_ffmpeg = FFmpegFD.available()
-            no_crypto = not Cryptodome and '#EXT-X-KEY:METHOD=AES-128' in s
+            no_crypto = not Cryptodome.AES and '#EXT-X-KEY:METHOD=AES-128' in s
             if no_crypto and has_ffmpeg:
                 can_download, message = False, 'The stream has AES-128 encryption and pycryptodomex is not available'
             elif no_crypto:
index f4180633ab6aacc04acf051ba4298f762fa5ba14..2252840b3aabdf9ab56d9249d529162bf965f084 100644 (file)
@@ -894,15 +894,15 @@ def _parse_video_metadata(self, video_data):
         }
 
     def _perform_login(self, username, password):
-        if not Cryptodome:
+        if not Cryptodome.RSA:
             raise ExtractorError('pycryptodomex not found. Please install', expected=True)
 
         key_data = self._download_json(
             'https://passport.bilibili.tv/x/intl/passport-login/web/key?lang=en-US', None,
             note='Downloading login key', errnote='Unable to download login key')['data']
 
-        public_key = Cryptodome.PublicKey.RSA.importKey(key_data['key'])
-        password_hash = Cryptodome.Cipher.PKCS1_v1_5.new(public_key).encrypt((key_data['hash'] + password).encode('utf-8'))
+        public_key = Cryptodome.RSA.importKey(key_data['key'])
+        password_hash = Cryptodome.PKCS1_v1_5.new(public_key).encrypt((key_data['hash'] + password).encode('utf-8'))
         login_post = self._download_json(
             'https://passport.bilibili.tv/x/intl/passport-login/web/login/password?lang=en-US', None, data=urlencode_postdata({
                 'username': username,
index 96220bea9c97d66c998710454eea2aaffdaae839..fa5ceec95bbb508f66004e18ffde12c87cc505cb 100644 (file)
@@ -91,7 +91,7 @@ def _real_extract(self, url):
         for site in (353, 183):
             content_data = (data % site).encode()
             if site == 353:
-                if not Cryptodome:
+                if not Cryptodome.CMAC:
                     continue
 
                 timestamp = (self._download_json(
@@ -105,8 +105,8 @@ def _real_extract(self, url):
 
                 query = {
                     'ts': timestamp,
-                    'sign': Cryptodome.Hash.CMAC.new(self._LIGHT_KEY, timestamp.encode() + content_data,
-                                                     Cryptodome.Cipher.Blowfish).hexdigest(),
+                    'sign': Cryptodome.CMAC.new(self._LIGHT_KEY, timestamp.encode() + content_data,
+                                                Cryptodome.Blowfish).hexdigest(),
                 }
             else:
                 query = {}
@@ -126,7 +126,7 @@ def _real_extract(self, url):
                     extractor_msg = 'Video %s does not exist'
                 elif site == 353:
                     continue
-                elif not Cryptodome:
+                elif not Cryptodome.CMAC:
                     raise ExtractorError('pycryptodomex not found. Please install', expected=True)
                 elif message:
                     extractor_msg += ': ' + message
index 78e7c83abc3c5ef566e1d9b89fa73483bf42bf88..5c6dec2c40a7a0833768ee31392ab653d81539e4 100644 (file)
@@ -50,10 +50,10 @@ def _call_api(self, video_id, param='', msg='API', auth=True, data=None, query={
             data=data, headers=headers, query=query, fatal=fatal)
 
     def _call_encrypted_api(self, video_id, param='', msg='API', data={}, query={}, fatal=True):
-        if not Cryptodome:
+        if not Cryptodome.RSA:
             raise ExtractorError('pycryptodomex not found. Please install', expected=True)
-        private_key = Cryptodome.PublicKey.RSA.generate(2048)
-        cipher = Cryptodome.Cipher.PKCS1_OAEP.new(private_key, hashAlgo=Cryptodome.Hash.SHA1)
+        private_key = Cryptodome.RSA.generate(2048)
+        cipher = Cryptodome.PKCS1_OAEP.new(private_key, hashAlgo=Cryptodome.SHA1)
 
         def decrypt(data):
             if not data: