]> jfr.im git - yt-dlp.git/commitdiff
[dependencies] Standardize `Cryptodome` imports
authorpukkandan <redacted>
Mon, 6 Feb 2023 21:52:29 +0000 (03:22 +0530)
committerpukkandan <redacted>
Wed, 8 Feb 2023 01:58:46 +0000 (07:28 +0530)
test/test_aes.py
test/test_compat.py
yt_dlp/aes.py
yt_dlp/compat/compat_utils.py
yt_dlp/dependencies/Cryptodome.py [new file with mode: 0644]
yt_dlp/dependencies/__init__.py [moved from yt_dlp/dependencies.py with 75% similarity]
yt_dlp/downloader/hls.py
yt_dlp/extractor/bilibili.py
yt_dlp/extractor/ivi.py

index 8e8fc0b3e7bf696d80871975788529426cf9d49e..18f15fecb6d5d19fc4ef3aa5229e1cb41de59ea8 100644 (file)
@@ -26,7 +26,7 @@
     key_expansion,
     pad_block,
 )
     key_expansion,
     pad_block,
 )
-from yt_dlp.dependencies import Cryptodome_AES
+from yt_dlp.dependencies import Cryptodome
 from yt_dlp.utils import bytes_to_intlist, intlist_to_bytes
 
 # the encrypted data can be generate with 'devscripts/generate_aes_testdata.py'
 from yt_dlp.utils import bytes_to_intlist, intlist_to_bytes
 
 # the encrypted data can be generate with 'devscripts/generate_aes_testdata.py'
@@ -48,7 +48,7 @@ def test_cbc_decrypt(self):
         data = b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\x27\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd'
         decrypted = intlist_to_bytes(aes_cbc_decrypt(bytes_to_intlist(data), self.key, self.iv))
         self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
         data = b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\x27\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd'
         decrypted = intlist_to_bytes(aes_cbc_decrypt(bytes_to_intlist(data), self.key, self.iv))
         self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
-        if Cryptodome_AES:
+        if Cryptodome:
             decrypted = aes_cbc_decrypt_bytes(data, intlist_to_bytes(self.key), intlist_to_bytes(self.iv))
             self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
 
             decrypted = aes_cbc_decrypt_bytes(data, intlist_to_bytes(self.key), intlist_to_bytes(self.iv))
             self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
 
@@ -78,7 +78,7 @@ def test_gcm_decrypt(self):
         decrypted = intlist_to_bytes(aes_gcm_decrypt_and_verify(
             bytes_to_intlist(data), self.key, bytes_to_intlist(authentication_tag), self.iv[:12]))
         self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
         decrypted = intlist_to_bytes(aes_gcm_decrypt_and_verify(
             bytes_to_intlist(data), self.key, bytes_to_intlist(authentication_tag), self.iv[:12]))
         self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
-        if Cryptodome_AES:
+        if Cryptodome:
             decrypted = aes_gcm_decrypt_and_verify_bytes(
                 data, intlist_to_bytes(self.key), authentication_tag, intlist_to_bytes(self.iv[:12]))
             self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
             decrypted = aes_gcm_decrypt_and_verify_bytes(
                 data, intlist_to_bytes(self.key), authentication_tag, intlist_to_bytes(self.iv[:12]))
             self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
index e3d775bc18e3f8a013c861240e67efdd144fb5e4..003a97abf73f468af9c49874eefbfd60399d4954 100644 (file)
@@ -31,6 +31,9 @@ def test_compat_passthrough(self):
         # TODO: Test submodule
         # compat.asyncio.events  # Must not raise error
 
         # TODO: Test submodule
         # compat.asyncio.events  # Must not raise error
 
+        with self.assertWarns(DeprecationWarning):
+            compat.compat_pycrypto_AES  # Must not raise error
+
     def test_compat_expanduser(self):
         old_home = os.environ.get('HOME')
         test_str = R'C:\Documents and Settings\тест\Application Data'
     def test_compat_expanduser(self):
         old_home = os.environ.get('HOME')
         test_str = R'C:\Documents and Settings\тест\Application Data'
index 60ce99cb1f10d5cfebe5eaed5c9782c3c9cc3750..deff0a2b3d4eec2ef2a749d5b289f777eb03e4e1 100644 (file)
@@ -2,17 +2,17 @@
 from math import ceil
 
 from .compat import compat_ord
 from math import ceil
 
 from .compat import compat_ord
-from .dependencies import Cryptodome_AES
+from .dependencies import Cryptodome
 from .utils import bytes_to_intlist, intlist_to_bytes
 
 from .utils import bytes_to_intlist, intlist_to_bytes
 
-if Cryptodome_AES:
+if Cryptodome:
     def aes_cbc_decrypt_bytes(data, key, iv):
         """ Decrypt bytes with AES-CBC using pycryptodome """
     def aes_cbc_decrypt_bytes(data, key, iv):
         """ Decrypt bytes with AES-CBC using pycryptodome """
-        return Cryptodome_AES.new(key, Cryptodome_AES.MODE_CBC, iv).decrypt(data)
+        return Cryptodome.Cipher.AES.new(key, Cryptodome.Cipher.AES.MODE_CBC, iv).decrypt(data)
 
     def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce):
         """ Decrypt bytes with AES-GCM using pycryptodome """
 
     def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce):
         """ Decrypt bytes with AES-GCM using pycryptodome """
-        return Cryptodome_AES.new(key, Cryptodome_AES.MODE_GCM, nonce).decrypt_and_verify(data, tag)
+        return Cryptodome.Cipher.AES.new(key, Cryptodome.Cipher.AES.MODE_GCM, nonce).decrypt_and_verify(data, tag)
 
 else:
     def aes_cbc_decrypt_bytes(data, key, iv):
 
 else:
     def aes_cbc_decrypt_bytes(data, key, iv):
index b67944e6bd601b81c0c86260a7fc0d37c47c0686..373389a4661b88391419e89435e4d05caa15e99a 100644 (file)
 
 
 def get_package_info(module):
 
 
 def get_package_info(module):
-    parent = module.__name__.split('.')[0]
-    parent_module = None
-    with contextlib.suppress(ImportError):
-        parent_module = importlib.import_module(parent)
-
-    for attr in ('__version__', 'version_string', 'version'):
-        version = getattr(parent_module, attr, None)
-        if version is not None:
-            break
-    return _Package(getattr(module, '_yt_dlp__identifier', parent), str(version))
+    return _Package(
+        name=getattr(module, '_yt_dlp__identifier', module.__name__),
+        version=str(next(filter(None, (
+            getattr(module, attr, None)
+            for attr in ('__version__', 'version_string', 'version')
+        )), None)))
 
 
 def _is_package(module):
 
 
 def _is_package(module):
diff --git a/yt_dlp/dependencies/Cryptodome.py b/yt_dlp/dependencies/Cryptodome.py
new file mode 100644 (file)
index 0000000..b95f45d
--- /dev/null
@@ -0,0 +1,38 @@
+import importlib
+
+from ..compat import functools
+from ..compat.compat_utils import EnhancedModule, passthrough_module
+
+EnhancedModule(__name__)
+
+try:
+    import Cryptodome as _parent
+except ImportError:
+    try:
+        import Crypto as _parent
+    except (ImportError, SyntaxError):  # Old Crypto gives SyntaxError in newer Python
+        _parent = EnhancedModule('Cryptodome')
+        __bool__ = lambda: False
+
+
+@functools.cache
+def __getattr__(name):
+    try:
+        submodule = importlib.import_module(f'.{name}', _parent.__name__)
+    except ImportError:
+        return getattr(_parent, name)
+    return passthrough_module(f'{__name__}.{name}', submodule)
+
+
+@property
+@functools.cache
+def _yt_dlp__identifier():
+    if _parent.__name__ == 'Crypto':
+        from Crypto.Cipher import AES
+        try:
+            # In pycrypto, mode defaults to ECB. See:
+            # https://www.pycryptodome.org/en/latest/src/vs_pycrypto.html#:~:text=not%20have%20ECB%20as%20default%20mode
+            AES.new(b'abcdefghijklmnop')
+        except TypeError:
+            return 'pycrypto'
+    return _parent.__name__
similarity index 75%
rename from yt_dlp/dependencies.py
rename to yt_dlp/dependencies/__init__.py
index 5a5363adb1eb29494b3e91a20f038af7a1a913d3..c2214e6dbadca4ca7d482f68259f042d86641242 100644 (file)
         certifi = None
 
 
         certifi = None
 
 
-try:
-    from Cryptodome.Cipher import AES as Cryptodome_AES
-except ImportError:
-    try:
-        from Crypto.Cipher import AES as Cryptodome_AES
-    except (ImportError, SyntaxError):  # Old Crypto gives SyntaxError in newer Python
-        Cryptodome_AES = None
-    else:
-        try:
-            # In pycrypto, mode defaults to ECB. See:
-            # https://www.pycryptodome.org/en/latest/src/vs_pycrypto.html#:~:text=not%20have%20ECB%20as%20default%20mode
-            Cryptodome_AES.new(b'abcdefghijklmnop')
-        except TypeError:
-            pass
-        else:
-            Cryptodome_AES._yt_dlp__identifier = 'pycrypto'
-
-
 try:
     import mutagen
 except ImportError:
 try:
     import mutagen
 except ImportError:
         xattr._yt_dlp__identifier = 'pyxattr'
 
 
         xattr._yt_dlp__identifier = 'pyxattr'
 
 
+from . import Cryptodome
+
 all_dependencies = {k: v for k, v in globals().items() if not k.startswith('_')}
 all_dependencies = {k: v for k, v in globals().items() if not k.startswith('_')}
+available_dependencies = {k: v for k, v in all_dependencies.items() if v}
 
 
 
 
-available_dependencies = {k: v for k, v in all_dependencies.items() if v}
+# Deprecated
+Cryptodome_AES = Cryptodome.Cipher.AES if Cryptodome else None
 
 
 __all__ = [
 
 
 __all__ = [
index 2010f3dc9e24ca2988889e927ead749beaaa2d2b..ae18ac419a58480b7c33a50dc521bb369c402c70 100644 (file)
@@ -7,7 +7,7 @@
 from .external import FFmpegFD
 from .fragment import FragmentFD
 from .. import webvtt
 from .external import FFmpegFD
 from .fragment import FragmentFD
 from .. import webvtt
-from ..dependencies import Cryptodome_AES
+from ..dependencies import Cryptodome
 from ..utils import bug_reports_message, parse_m3u8_attributes, update_url_query
 
 
 from ..utils import bug_reports_message, parse_m3u8_attributes, update_url_query
 
 
@@ -63,7 +63,7 @@ def real_download(self, filename, info_dict):
         can_download, message = self.can_download(s, info_dict, self.params.get('allow_unplayable_formats')), None
         if can_download:
             has_ffmpeg = FFmpegFD.available()
         can_download, message = self.can_download(s, info_dict, self.params.get('allow_unplayable_formats')), None
         if can_download:
             has_ffmpeg = FFmpegFD.available()
-            no_crypto = not Cryptodome_AES and '#EXT-X-KEY:METHOD=AES-128' in s
+            no_crypto = not Cryptodome and '#EXT-X-KEY:METHOD=AES-128' in s
             if no_crypto and has_ffmpeg:
                 can_download, message = False, 'The stream has AES-128 encryption and pycryptodomex is not available'
             elif no_crypto:
             if no_crypto and has_ffmpeg:
                 can_download, message = False, 'The stream has AES-128 encryption and pycryptodomex is not available'
             elif no_crypto:
index d4b05248f37f1259aa9b520d094cbe5d55bd83f8..266d57871e305b06340a16721346cca11a8cf39c 100644 (file)
@@ -6,6 +6,7 @@
 import urllib.parse
 
 from .common import InfoExtractor, SearchInfoExtractor
 import urllib.parse
 
 from .common import InfoExtractor, SearchInfoExtractor
+from ..dependencies import Cryptodome
 from ..utils import (
     ExtractorError,
     GeoRestrictedError,
 from ..utils import (
     ExtractorError,
     GeoRestrictedError,
@@ -893,22 +894,15 @@ def _parse_video_metadata(self, video_data):
         }
 
     def _perform_login(self, username, password):
         }
 
     def _perform_login(self, username, password):
-        try:
-            from Cryptodome.PublicKey import RSA
-            from Cryptodome.Cipher import PKCS1_v1_5
-        except ImportError:
-            try:
-                from Crypto.PublicKey import RSA
-                from Crypto.Cipher import PKCS1_v1_5
-            except ImportError:
-                raise ExtractorError('pycryptodomex not found. Please install', expected=True)
+        if not Cryptodome:
+            raise ExtractorError('pycryptodomex not found. Please install', expected=True)
 
         key_data = self._download_json(
             'https://passport.bilibili.tv/x/intl/passport-login/web/key?lang=en-US', None,
             note='Downloading login key', errnote='Unable to download login key')['data']
 
 
         key_data = self._download_json(
             'https://passport.bilibili.tv/x/intl/passport-login/web/key?lang=en-US', None,
             note='Downloading login key', errnote='Unable to download login key')['data']
 
-        public_key = RSA.importKey(key_data['key'])
-        password_hash = PKCS1_v1_5.new(public_key).encrypt((key_data['hash'] + password).encode('utf-8'))
+        public_key = Cryptodome.PublicKey.RSA.importKey(key_data['key'])
+        password_hash = Cryptodome.Cipher.PKCS1_v1_5.new(public_key).encrypt((key_data['hash'] + password).encode('utf-8'))
         login_post = self._download_json(
             'https://passport.bilibili.tv/x/intl/passport-login/web/login/password?lang=en-US', None, data=urlencode_postdata({
                 'username': username,
         login_post = self._download_json(
             'https://passport.bilibili.tv/x/intl/passport-login/web/login/password?lang=en-US', None, data=urlencode_postdata({
                 'username': username,
index dc6a48196df14352f85ae2f46a466bb078745814..96220bea9c97d66c998710454eea2aaffdaae839 100644 (file)
@@ -2,11 +2,8 @@
 import re
 
 from .common import InfoExtractor
 import re
 
 from .common import InfoExtractor
-from ..utils import (
-    ExtractorError,
-    int_or_none,
-    qualities,
-)
+from ..dependencies import Cryptodome
+from ..utils import ExtractorError, int_or_none, qualities
 
 
 class IviIE(InfoExtractor):
 
 
 class IviIE(InfoExtractor):
@@ -94,18 +91,8 @@ def _real_extract(self, url):
         for site in (353, 183):
             content_data = (data % site).encode()
             if site == 353:
         for site in (353, 183):
             content_data = (data % site).encode()
             if site == 353:
-                try:
-                    from Cryptodome.Cipher import Blowfish
-                    from Cryptodome.Hash import CMAC
-                    pycryptodome_found = True
-                except ImportError:
-                    try:
-                        from Crypto.Cipher import Blowfish
-                        from Crypto.Hash import CMAC
-                        pycryptodome_found = True
-                    except ImportError:
-                        pycryptodome_found = False
-                        continue
+                if not Cryptodome:
+                    continue
 
                 timestamp = (self._download_json(
                     self._LIGHT_URL, video_id,
 
                 timestamp = (self._download_json(
                     self._LIGHT_URL, video_id,
@@ -118,7 +105,8 @@ def _real_extract(self, url):
 
                 query = {
                     'ts': timestamp,
 
                 query = {
                     'ts': timestamp,
-                    'sign': CMAC.new(self._LIGHT_KEY, timestamp.encode() + content_data, Blowfish).hexdigest(),
+                    'sign': Cryptodome.Hash.CMAC.new(self._LIGHT_KEY, timestamp.encode() + content_data,
+                                                     Cryptodome.Cipher.Blowfish).hexdigest(),
                 }
             else:
                 query = {}
                 }
             else:
                 query = {}
@@ -138,7 +126,7 @@ def _real_extract(self, url):
                     extractor_msg = 'Video %s does not exist'
                 elif site == 353:
                     continue
                     extractor_msg = 'Video %s does not exist'
                 elif site == 353:
                     continue
-                elif not pycryptodome_found:
+                elif not Cryptodome:
                     raise ExtractorError('pycryptodomex not found. Please install', expected=True)
                 elif message:
                     extractor_msg += ': ' + message
                     raise ExtractorError('pycryptodomex not found. Please install', expected=True)
                 elif message:
                     extractor_msg += ': ' + message