X-Git-Url: https://jfr.im/git/yt-dlp.git/blobdiff_plain/1d3586d0d513783e313428a6c57e18691a51f1fe..f859ed3ba1e8b129ae6a467592c65687e73fbca1:/yt_dlp/aes.py diff --git a/yt_dlp/aes.py b/yt_dlp/aes.py index b37f0dd39..b3a383cd9 100644 --- a/yt_dlp/aes.py +++ b/yt_dlp/aes.py @@ -1,26 +1,18 @@ -from __future__ import unicode_literals - +import base64 from math import ceil -from .compat import ( - compat_b64decode, - compat_ord, - compat_pycrypto_AES, -) -from .utils import ( - bytes_to_intlist, - intlist_to_bytes, -) - +from .compat import compat_ord +from .dependencies import Cryptodome +from .utils import bytes_to_intlist, intlist_to_bytes -if compat_pycrypto_AES: +if Cryptodome.AES: def aes_cbc_decrypt_bytes(data, key, iv): """ Decrypt bytes with AES-CBC using pycryptodome """ - return compat_pycrypto_AES.new(key, compat_pycrypto_AES.MODE_CBC, iv).decrypt(data) + return Cryptodome.AES.new(key, Cryptodome.AES.MODE_CBC, iv).decrypt(data) def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce): """ Decrypt bytes with AES-GCM using pycryptodome """ - return compat_pycrypto_AES.new(key, compat_pycrypto_AES.MODE_GCM, nonce).decrypt_and_verify(data, tag) + return Cryptodome.AES.new(key, Cryptodome.AES.MODE_GCM, nonce).decrypt_and_verify(data, tag) else: def aes_cbc_decrypt_bytes(data, key, iv): @@ -32,16 +24,59 @@ def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce): return intlist_to_bytes(aes_gcm_decrypt_and_verify(*map(bytes_to_intlist, (data, key, tag, nonce)))) +def aes_cbc_encrypt_bytes(data, key, iv, **kwargs): + return intlist_to_bytes(aes_cbc_encrypt(*map(bytes_to_intlist, (data, key, iv)), **kwargs)) + + +BLOCK_SIZE_BYTES = 16 + + def unpad_pkcs7(data): return data[:-compat_ord(data[-1])] -BLOCK_SIZE_BYTES = 16 +def pkcs7_padding(data): + """ + PKCS#7 padding + + @param {int[]} data cleartext + @returns {int[]} padding data + """ + + remaining_length = BLOCK_SIZE_BYTES - len(data) % BLOCK_SIZE_BYTES + return data + [remaining_length] * remaining_length + + +def pad_block(block, padding_mode): + """ + Pad a block with the given padding mode + @param {int[]} block block to pad + @param padding_mode padding mode + """ + padding_size = BLOCK_SIZE_BYTES - len(block) + + PADDING_BYTE = { + 'pkcs7': padding_size, + 'iso7816': 0x0, + 'whitespace': 0x20, + 'zero': 0x0, + } + + if padding_size < 0: + raise ValueError('Block size exceeded') + elif padding_mode not in PADDING_BYTE: + raise NotImplementedError(f'Padding mode {padding_mode} is not implemented') + + if padding_mode == 'iso7816' and padding_size: + block = block + [0x80] # NB: += mutates list + padding_size -= 1 + + return block + [PADDING_BYTE[padding_mode]] * padding_size def aes_ecb_encrypt(data, key, iv=None): """ - Encrypt with aes in ECB mode + Encrypt with aes in ECB mode. Using PKCS#7 padding @param {int[]} data cleartext @param {int[]} key 16/24/32-Byte cipher key @@ -54,8 +89,7 @@ def aes_ecb_encrypt(data, key, iv=None): encrypted_data = [] for i in range(block_count): block = data[i * BLOCK_SIZE_BYTES: (i + 1) * BLOCK_SIZE_BYTES] - encrypted_data += aes_encrypt(block, expanded_key) - encrypted_data = encrypted_data[:len(data)] + encrypted_data += aes_encrypt(pkcs7_padding(block), expanded_key) return encrypted_data @@ -145,13 +179,14 @@ def aes_cbc_decrypt(data, key, iv): return decrypted_data -def aes_cbc_encrypt(data, key, iv): +def aes_cbc_encrypt(data, key, iv, *, padding_mode='pkcs7'): """ - Encrypt with aes in CBC mode. Using PKCS#7 padding + Encrypt with aes in CBC mode @param {int[]} data cleartext @param {int[]} key 16/24/32-Byte cipher key @param {int[]} iv 16-Byte IV + @param padding_mode Padding mode to use @returns {int[]} encrypted data """ expanded_key = key_expansion(key) @@ -161,8 +196,8 @@ def aes_cbc_encrypt(data, key, iv): previous_cipher_block = iv for i in range(block_count): block = data[i * BLOCK_SIZE_BYTES: (i + 1) * BLOCK_SIZE_BYTES] - remaining_length = BLOCK_SIZE_BYTES - len(block) - block += [remaining_length] * remaining_length + block = pad_block(block, padding_mode) + mixed_block = xor(block, previous_cipher_block) encrypted_block = aes_encrypt(mixed_block, expanded_key) @@ -273,8 +308,8 @@ def aes_decrypt_text(data, password, key_size_bytes): """ NONCE_LENGTH_BYTES = 8 - data = bytes_to_intlist(compat_b64decode(data)) - password = bytes_to_intlist(password.encode('utf-8')) + data = bytes_to_intlist(base64.b64decode(data)) + password = bytes_to_intlist(password.encode()) key = password[:key_size_bytes] + [0] * (key_size_bytes - len(password)) key = aes_encrypt(key[:BLOCK_SIZE_BYTES], key_expansion(key)) * (key_size_bytes // BLOCK_SIZE_BYTES) @@ -503,20 +538,30 @@ def ghash(subkey, data): last_y = [0] * BLOCK_SIZE_BYTES for i in range(0, len(data), BLOCK_SIZE_BYTES): - block = data[i : i + BLOCK_SIZE_BYTES] # noqa: E203 + block = data[i: i + BLOCK_SIZE_BYTES] last_y = block_product(xor(last_y, block), subkey) return last_y __all__ = [ - 'aes_ctr_decrypt', 'aes_cbc_decrypt', 'aes_cbc_decrypt_bytes', + 'aes_ctr_decrypt', 'aes_decrypt_text', - 'aes_encrypt', + 'aes_decrypt', + 'aes_ecb_decrypt', 'aes_gcm_decrypt_and_verify', 'aes_gcm_decrypt_and_verify_bytes', + + 'aes_cbc_encrypt', + 'aes_cbc_encrypt_bytes', + 'aes_ctr_encrypt', + 'aes_ecb_encrypt', + 'aes_encrypt', + 'key_expansion', + 'pad_block', + 'pkcs7_padding', 'unpad_pkcs7', ]