jfr.im git - yt-dlp.git/blobdiff - yt_dlp/socks.py
[cleanup] Remove questionable extractors (#9911)
[yt-dlp.git] / yt_dlp / socks.py
index ffa960e03ca0ba8ee0153a1d228f2b6b86ad6e52..b4957ac2ed281dedc1bbc725cd7aa72765c44925 100644 (file)
@@ -8,12 +8,9 @@
 
 import collections
 import socket
+import struct
 
-from .compat import (
-    compat_ord,
-    compat_struct_pack,
-    compat_struct_unpack,
-)
+from .compat import compat_ord
 
 __author__ = 'Timo Schmid <coding@timoschmid.de>'
 
@@ -23,7 +20,7 @@
 # if the client cannot resolve the destination host's domain name to find its
 # IP address, it should set the first three bytes of DSTIP to NULL and the last
 # byte to a non-zero value.
-SOCKS4_DEFAULT_DSTIP = compat_struct_pack('!BBBB', 0, 0, 0, 0xFF)
+SOCKS4_DEFAULT_DSTIP = struct.pack('!BBBB', 0, 0, 0, 0xFF)
 
 SOCKS5_VERSION = 5
 SOCKS5_USER_AUTH_VERSION = 0x01
@@ -52,7 +49,7 @@ class Socks5AddressType:
     ATYP_IPV6 = 0x04
 
 
-class ProxyError(socket.error):
+class ProxyError(OSError):
     ERR_SUCCESS = 0x00
 
     def __init__(self, code=None, msg=None):
@@ -126,42 +123,47 @@ def recvall(self, cnt):
 
     def _recv_bytes(self, cnt):
         data = self.recvall(cnt)
-        return compat_struct_unpack(f'!{cnt}B', data)
+        return struct.unpack(f'!{cnt}B', data)
 
     @staticmethod
     def _len_and_data(data):
-        return compat_struct_pack('!B', len(data)) + data
+        return struct.pack('!B', len(data)) + data
 
     def _check_response_version(self, expected_version, got_version):
         if got_version != expected_version:
             self.close()
             raise InvalidVersionError(expected_version, got_version)
 
-    def _resolve_address(self, destaddr, default, use_remote_dns):
-        try:
-            return socket.inet_aton(destaddr)
-        except OSError:
-            if use_remote_dns and self._proxy.remote_dns:
-                return default
-            else:
-                return socket.inet_aton(socket.gethostbyname(destaddr))
+    def _resolve_address(self, destaddr, default, use_remote_dns, family=None):
+        for f in (family,) if family else (socket.AF_INET, socket.AF_INET6):
+            try:
+                return f, socket.inet_pton(f, destaddr)
+            except OSError:
+                continue
+
+        if use_remote_dns and self._proxy.remote_dns:
+            return 0, default
+        else:
+            res = socket.getaddrinfo(destaddr, None, family=family or 0)
+            f, _, _, _, ipaddr = res[0]
+            return f, socket.inet_pton(f, ipaddr[0])
 
     def _setup_socks4(self, address, is_4a=False):
         destaddr, port = address
 
-        ipaddr = self._resolve_address(destaddr, SOCKS4_DEFAULT_DSTIP, use_remote_dns=is_4a)
+        _, ipaddr = self._resolve_address(destaddr, SOCKS4_DEFAULT_DSTIP, use_remote_dns=is_4a, family=socket.AF_INET)
 
-        packet = compat_struct_pack('!BBH', SOCKS4_VERSION, Socks4Command.CMD_CONNECT, port) + ipaddr
+        packet = struct.pack('!BBH', SOCKS4_VERSION, Socks4Command.CMD_CONNECT, port) + ipaddr
 
-        username = (self._proxy.username or '').encode('utf-8')
+        username = (self._proxy.username or '').encode()
         packet += username + b'\x00'
 
-        if is_4a and self._proxy.remote_dns:
-            packet += destaddr.encode('utf-8') + b'\x00'
+        if is_4a and self._proxy.remote_dns and ipaddr == SOCKS4_DEFAULT_DSTIP:
+            packet += destaddr.encode() + b'\x00'
 
         self.sendall(packet)
 
-        version, resp_code, dstport, dsthost = compat_struct_unpack('!BBHI', self.recvall(8))
+        version, resp_code, dstport, dsthost = struct.unpack('!BBHI', self.recvall(8))
 
         self._check_response_version(SOCKS4_REPLY_VERSION, version)
 
@@ -175,14 +177,14 @@ def _setup_socks4a(self, address):
         self._setup_socks4(address, is_4a=True)
 
     def _socks5_auth(self):
-        packet = compat_struct_pack('!B', SOCKS5_VERSION)
+        packet = struct.pack('!B', SOCKS5_VERSION)
 
         auth_methods = [Socks5Auth.AUTH_NONE]
         if self._proxy.username and self._proxy.password:
             auth_methods.append(Socks5Auth.AUTH_USER_PASS)
 
-        packet += compat_struct_pack('!B', len(auth_methods))
-        packet += compat_struct_pack(f'!{len(auth_methods)}B', *auth_methods)
+        packet += struct.pack('!B', len(auth_methods))
+        packet += struct.pack(f'!{len(auth_methods)}B', *auth_methods)
 
         self.sendall(packet)
 
@@ -196,9 +198,9 @@ def _socks5_auth(self):
             raise Socks5Error(Socks5Auth.AUTH_NO_ACCEPTABLE)
 
         if method == Socks5Auth.AUTH_USER_PASS:
-            username = self._proxy.username.encode('utf-8')
-            password = self._proxy.password.encode('utf-8')
-            packet = compat_struct_pack('!B', SOCKS5_USER_AUTH_VERSION)
+            username = self._proxy.username.encode()
+            password = self._proxy.password.encode()
+            packet = struct.pack('!B', SOCKS5_USER_AUTH_VERSION)
             packet += self._len_and_data(username) + self._len_and_data(password)
             self.sendall(packet)
 
@@ -213,19 +215,21 @@ def _socks5_auth(self):
     def _setup_socks5(self, address):
         destaddr, port = address
 
-        ipaddr = self._resolve_address(destaddr, None, use_remote_dns=True)
+        family, ipaddr = self._resolve_address(destaddr, None, use_remote_dns=True)
 
         self._socks5_auth()
 
         reserved = 0
-        packet = compat_struct_pack('!BBB', SOCKS5_VERSION, Socks5Command.CMD_CONNECT, reserved)
+        packet = struct.pack('!BBB', SOCKS5_VERSION, Socks5Command.CMD_CONNECT, reserved)
         if ipaddr is None:
-            destaddr = destaddr.encode('utf-8')
-            packet += compat_struct_pack('!B', Socks5AddressType.ATYP_DOMAINNAME)
+            destaddr = destaddr.encode()
+            packet += struct.pack('!B', Socks5AddressType.ATYP_DOMAINNAME)
             packet += self._len_and_data(destaddr)
-        else:
-            packet += compat_struct_pack('!B', Socks5AddressType.ATYP_IPV4) + ipaddr
-        packet += compat_struct_pack('!H', port)
+        elif family == socket.AF_INET:
+            packet += struct.pack('!B', Socks5AddressType.ATYP_IPV4) + ipaddr
+        elif family == socket.AF_INET6:
+            packet += struct.pack('!B', Socks5AddressType.ATYP_IPV6) + ipaddr
+        packet += struct.pack('!H', port)
 
         self.sendall(packet)
 
@@ -244,7 +248,7 @@ def _setup_socks5(self, address):
             destaddr = self.recvall(alen)
         elif atype == Socks5AddressType.ATYP_IPV6:
             destaddr = self.recvall(16)
-        destport = compat_struct_unpack('!H', self.recvall(2))[0]
+        destport = struct.unpack('!H', self.recvall(2))[0]
 
         return (destaddr, destport)