]>
jfr.im git - yt-dlp.git/blob - yt_dlp/socks.py
1 # Public Domain SOCKS proxy protocol implementation
2 # Adapted from https://gist.github.com/bluec0re/cafd3764412967417fd3
4 # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
5 # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol
6 # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
7 # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929
12 from .compat
import compat_ord
, compat_struct_pack
, compat_struct_unpack
__author__ = 'Timo Schmid <coding@timoschmid.de>'

# Version byte expected at the start of a SOCKS4 reply.
SOCKS4_REPLY_VERSION = 0x00
# Excerpt from SOCKS4A protocol:
# if the client cannot resolve the destination host's domain name to find its
# IP address, it should set the first three bytes of DSTIP to NULL and the last
# byte to a non-zero value.
SOCKS4_DEFAULT_DSTIP = compat_struct_pack(
    '!BBBB',
    0, 0, 0, 0xFF,
)

# SOCKS5 username/password sub-negotiation: version byte and "success" status.
SOCKS5_USER_AUTH_VERSION = 0x01
SOCKS5_USER_AUTH_SUCCESS = 0x00
class Socks5Command(Socks4Command):
    """SOCKS5 command codes: those inherited from Socks4Command plus UDP ASSOCIATE."""

    CMD_UDP_ASSOCIATE = 0x03
42 AUTH_NO_ACCEPTABLE
= 0xFF # For server response
# Address-type (ATYP) codes placed in SOCKS5 requests and read back from replies.
# NOTE(review): only ATYP_DOMAINNAME is visible here, yet ATYP_IPV4 and ATYP_IPV6
# are referenced further down this file; the embedded numbering skips 46 and 48,
# so they are presumably defined on lines missing from this extract -- confirm.
45 class Socks5AddressType
:
47 ATYP_DOMAINNAME
= 0x03
51 class ProxyError(socket
.error
):
54 def __init__(self
, code
=None, msg
=None):
55 if code
is not None and msg
is None:
56 msg
= self
.CODES
.get(code
) or 'unknown error'
57 super().__init
__(code
, msg
)
class InvalidVersionError(ProxyError):
    """Raised when a proxy reply carries a version byte other than the expected one."""

    def __init__(self, expected_version, got_version):
        # Both versions are rendered as two-digit hex; the error code is always 0.
        template = ('Invalid response version from server. Expected {:02x} got '
                    '{:02x}')
        super().__init__(0, template.format(expected_version, got_version))
# SOCKS4 failure reply codes (91-93) with their human-readable descriptions.
# NOTE(review): the embedded numbering skips 68-70 and 74, and no assignment
# wraps the three entries below; `Socks4Error.ERR_SUCCESS` and a `CODES`
# mapping are used elsewhere in this file, so both are presumably defined on
# lines missing from this extract -- confirm against the full file.
67 class Socks4Error(ProxyError
):
71 91: 'request rejected or failed',
72 92: 'request rejected because SOCKS server cannot connect to identd on the client',
73 93: 'request rejected because the client program and identd report different user-ids'
# SOCKS5 error codes with their descriptions. ERR_GENERAL_FAILURE (0x01) is the
# code raised by _socks5_auth when username/password authentication fails.
# NOTE(review): the embedded numbering skips 79-80, 86 and 91, and no assignment
# wraps the entries below; the enclosing mapping (and possibly a 0x06 entry) is
# presumably on lines missing from this extract -- confirm against the full file.
77 class Socks5Error(ProxyError
):
78 ERR_GENERAL_FAILURE
= 0x01
81 0x01: 'general SOCKS server failure',
82 0x02: 'connection not allowed by ruleset',
83 0x03: 'Network unreachable',
84 0x04: 'Host unreachable',
85 0x05: 'Connection refused',
87 0x07: 'Command not supported',
88 0x08: 'Address type not supported',
89 0xFE: 'unknown username or invalid password',
90 0xFF: 'all offered authentication methods were rejected'
# Record of the proxy settings that sockssocket.setproxy() stores on the socket.
Proxy = collections.namedtuple(
    'Proxy',
    ('type', 'host', 'port', 'username', 'password', 'remote_dns'),
)
104 class sockssocket(socket
.socket
):
# Constructor: forwards every positional and keyword argument unchanged to the
# parent class initialiser.
# NOTE(review): the embedded numbering skips 106 between the signature and the
# super() call, so a statement may be missing from this extract -- confirm.
105 def __init__(self
, *args
, **kwargs
):
107 super().__init
__(*args
, **kwargs
)
def setproxy(self, proxytype, addr, port, rdns=True, username=None, password=None):
    """Record the SOCKS proxy this socket should tunnel through.

    Args:
        proxytype: One of ProxyType.SOCKS4, ProxyType.SOCKS4A or ProxyType.SOCKS5.
        addr: Proxy host.
        port: Proxy port.
        rdns: Stored as the proxy's ``remote_dns`` field.
        username: Optional proxy user name.
        password: Optional proxy password.
    """
    supported_types = (ProxyType.SOCKS4, ProxyType.SOCKS4A, ProxyType.SOCKS5)
    assert proxytype in supported_types

    self._proxy = Proxy(
        type=proxytype,
        host=addr,
        port=port,
        username=username,
        password=password,
        remote_dns=rdns,
    )
# recvall(cnt): loops while `data` holds fewer than `cnt` bytes, asking
# self.recv() for the outstanding remainder each time; the EOFError message
# reports how many bytes are still missing.
# NOTE(review): the embedded numbering skips 115, 118 and 120-122, so the
# initialisation of `data`, the condition guarding the raise, and the
# accumulate/return steps are presumably on lines missing from this extract --
# confirm against the full file before relying on this block.
114 def recvall(self
, cnt
):
116 while len(data
) < cnt
:
117 cur
= self
.recv(cnt
- len(data
))
119 raise EOFError(f
'{cnt - len(data)} bytes missing')
def _recv_bytes(self, cnt):
    """Fetch ``cnt`` bytes via recvall() and unpack them as ``cnt`` unsigned bytes."""
    raw = self.recvall(cnt)
    # '!<cnt>B': network byte order, one unsigned char per received byte.
    byte_format = f'!{cnt}B'
    return compat_struct_unpack(byte_format, raw)
def _len_and_data(data):
    """Return ``data`` prefixed with its length packed as one unsigned byte."""
    # NOTE(review): callers invoke this as self._len_and_data(x) with a single
    # argument, so a decorator such as @staticmethod is presumably applied on a
    # line not visible in this extract -- confirm.
    length_prefix = compat_struct_pack('!B', len(data))
    return length_prefix + data
# Compares the version byte received from the proxy with the expected one and
# raises InvalidVersionError(expected_version, got_version) when they differ.
# NOTE(review): the embedded numbering skips 133 between the `if` header and the
# raise, so a statement may be missing from this extract -- confirm.
131 def _check_response_version(self
, expected_version
, got_version
):
132 if got_version
!= expected_version
:
134 raise InvalidVersionError(expected_version
, got_version
)
# Turns `destaddr` into packed IPv4 bytes. Visible steps: socket.inet_aton() on
# the address as given; a test of `use_remote_dns` together with the proxy's
# `remote_dns` setting; and a local socket.gethostbyname() lookup whose result
# is packed with inet_aton().
# NOTE(review): the embedded numbering skips 137, 139 and 141-142, and `default`
# is never used in the visible lines; the error handling around the first
# inet_aton() and the branch bodies are presumably on lines missing from this
# extract -- confirm against the full file.
136 def _resolve_address(self
, destaddr
, default
, use_remote_dns
):
138 return socket
.inet_aton(destaddr
)
140 if use_remote_dns
and self
._proxy
.remote_dns
:
143 return socket
.inet_aton(socket
.gethostbyname(destaddr
))
# SOCKS4 / SOCKS4A CONNECT negotiation for `address`, a (destaddr, port) pair.
# Visible steps:
#   * resolve destaddr via _resolve_address(), passing SOCKS4_DEFAULT_DSTIP as
#     the default and allowing remote DNS only when is_4a is true;
#   * build the request: '!BBH' of SOCKS4_VERSION, CMD_CONNECT and port, then
#     the packed IP, then the encoded user name (empty when unset) plus NUL;
#   * for 4A with the proxy's remote_dns set, append the host name plus NUL;
#   * read an 8-byte reply and unpack it ('!BBHI') into version, resp_code,
#     dstport and dsthost;
#   * check the version against SOCKS4_REPLY_VERSION and raise
#     Socks4Error(resp_code) unless resp_code equals Socks4Error.ERR_SUCCESS;
#   * return (dsthost, dstport).
# NOTE(review): no statement transmitting `packet` is visible and the embedded
# numbering skips 158 and 165, so the send (and possibly cleanup before the
# raise) is presumably on lines missing from this extract -- confirm.
145 def _setup_socks4(self
, address
, is_4a
=False):
146 destaddr
, port
= address
148 ipaddr
= self
._resolve
_address
(destaddr
, SOCKS4_DEFAULT_DSTIP
, use_remote_dns
=is_4a
)
150 packet
= compat_struct_pack('!BBH', SOCKS4_VERSION
, Socks4Command
.CMD_CONNECT
, port
) + ipaddr
152 username
= (self
._proxy
.username
or '').encode()
153 packet
+= username
+ b
'\x00'
155 if is_4a
and self
._proxy
.remote_dns
:
156 packet
+= destaddr
.encode() + b
'\x00'
160 version
, resp_code
, dstport
, dsthost
= compat_struct_unpack('!BBHI', self
.recvall(8))
162 self
._check
_response
_version
(SOCKS4_REPLY_VERSION
, version
)
164 if resp_code
!= Socks4Error
.ERR_SUCCESS
:
166 raise Socks4Error(resp_code
)
168 return (dsthost
, dstport
)
170 def _setup_socks4a(self
, address
):
171 self
._setup
_socks
4(address
, is_4a
=True)
# SOCKS5 method negotiation plus optional username/password authentication.
# Visible steps:
#   * build the greeting: SOCKS5_VERSION, the number of offered methods, then
#     the methods themselves -- AUTH_NONE always, AUTH_USER_PASS only when both
#     a user name and a password are configured on the proxy;
#   * read two bytes (version, method) and check the version;
#   * raise Socks5Error(AUTH_NO_ACCEPTABLE) when the server rejected every
#     method, or picked user/pass although credentials are incomplete;
#   * for AUTH_USER_PASS, build SOCKS5_USER_AUTH_VERSION followed by the
#     length-prefixed user name and password, read two bytes (version, status),
#     check the version against SOCKS5_USER_AUTH_VERSION and raise
#     Socks5Error(ERR_GENERAL_FAILURE) unless status is SOCKS5_USER_AUTH_SUCCESS.
# NOTE(review): no statement transmitting either packet is visible and the
# embedded numbering skips 183, 191, 199 and 206, so the sends (and possibly
# cleanup before each raise) are presumably on lines missing from this
# extract -- confirm against the full file.
173 def _socks5_auth(self
):
174 packet
= compat_struct_pack('!B', SOCKS5_VERSION
)
176 auth_methods
= [Socks5Auth
.AUTH_NONE
]
177 if self
._proxy
.username
and self
._proxy
.password
:
178 auth_methods
.append(Socks5Auth
.AUTH_USER_PASS
)
180 packet
+= compat_struct_pack('!B', len(auth_methods
))
181 packet
+= compat_struct_pack(f
'!{len(auth_methods)}B', *auth_methods
)
185 version
, method
= self
._recv
_bytes
(2)
187 self
._check
_response
_version
(SOCKS5_VERSION
, version
)
189 if method
== Socks5Auth
.AUTH_NO_ACCEPTABLE
or (
190 method
== Socks5Auth
.AUTH_USER_PASS
and (not self
._proxy
.username
or not self
._proxy
.password
)):
192 raise Socks5Error(Socks5Auth
.AUTH_NO_ACCEPTABLE
)
194 if method
== Socks5Auth
.AUTH_USER_PASS
:
195 username
= self
._proxy
.username
.encode()
196 password
= self
._proxy
.password
.encode()
197 packet
= compat_struct_pack('!B', SOCKS5_USER_AUTH_VERSION
)
198 packet
+= self
._len
_and
_data
(username
) + self
._len
_and
_data
(password
)
201 version
, status
= self
._recv
_bytes
(2)
203 self
._check
_response
_version
(SOCKS5_USER_AUTH_VERSION
, version
)
205 if status
!= SOCKS5_USER_AUTH_SUCCESS
:
207 raise Socks5Error(Socks5Error
.ERR_GENERAL_FAILURE
)
# SOCKS5 CONNECT negotiation for `address`, a (destaddr, port) pair.
# Visible steps:
#   * resolve destaddr via _resolve_address() with None as the default and
#     remote DNS allowed;
#   * start the request with '!BBB' of SOCKS5_VERSION, CMD_CONNECT and `reserved`;
#   * append the destination either as ATYP_DOMAINNAME plus the length-prefixed
#     encoded host name, or as ATYP_IPV4 plus the packed IP, then the port ('!H');
#   * read four reply bytes (version, status, reserved, atype), check the
#     version and raise Socks5Error(status) unless status equals
#     Socks5Error.ERR_SUCCESS;
#   * read the bound address according to atype (4 bytes for IPv4, a
#     length-prefixed name for a domain, 16 bytes for IPv6) and a 2-byte port;
#   * return (destaddr, destport).
# NOTE(review): the embedded numbering skips 213-216, 218, 222, 226 and 233.
# The definition of `reserved`, the condition choosing between the domain-name
# and IPv4 encodings, the transmission of `packet`, and any authentication or
# cleanup calls are not visible and are presumably on lines missing from this
# extract -- confirm against the full file.
# NOTE(review): the domain-name length byte is read with self.recv(1) rather
# than recvall(1) -- verify an empty read is handled as intended.
209 def _setup_socks5(self
, address
):
210 destaddr
, port
= address
212 ipaddr
= self
._resolve
_address
(destaddr
, None, use_remote_dns
=True)
217 packet
= compat_struct_pack('!BBB', SOCKS5_VERSION
, Socks5Command
.CMD_CONNECT
, reserved
)
219 destaddr
= destaddr
.encode()
220 packet
+= compat_struct_pack('!B', Socks5AddressType
.ATYP_DOMAINNAME
)
221 packet
+= self
._len
_and
_data
(destaddr
)
223 packet
+= compat_struct_pack('!B', Socks5AddressType
.ATYP_IPV4
) + ipaddr
224 packet
+= compat_struct_pack('!H', port
)
228 version
, status
, reserved
, atype
= self
._recv
_bytes
(4)
230 self
._check
_response
_version
(SOCKS5_VERSION
, version
)
232 if status
!= Socks5Error
.ERR_SUCCESS
:
234 raise Socks5Error(status
)
236 if atype
== Socks5AddressType
.ATYP_IPV4
:
237 destaddr
= self
.recvall(4)
238 elif atype
== Socks5AddressType
.ATYP_DOMAINNAME
:
239 alen
= compat_ord(self
.recv(1))
240 destaddr
= self
.recvall(alen
)
241 elif atype
== Socks5AddressType
.ATYP_IPV6
:
242 destaddr
= self
.recvall(16)
243 destport
= compat_struct_unpack('!H', self
.recvall(2))[0]
245 return (destaddr
, destport
)
# Shared implementation behind connect() and connect_ex(): `connect_func` is the
# unbound socket.socket method to use and `address` the final destination.
# Visible steps:
#   * a direct `connect_func(self, address)` call whose result is returned;
#   * a call of connect_func against the proxy's own (host, port), with the
#     result tested for being neither 0 nor None;
#   * a ProxyType -> handler table (SOCKS4, SOCKS4A, SOCKS5) and a call of the
#     handler selected by self._proxy.type with `address`.
# NOTE(review): the embedded numbering skips 248, 253-254, 258 and 260-261. The
# condition guarding the direct connection, the body of the result test, the
# `setup_funcs = {...}` wrapper around the table entries, and the final return
# are not visible and are presumably on lines missing from this extract --
# confirm against the full file.
247 def _make_proxy(self
, connect_func
, address
):
249 return connect_func(self
, address
)
251 result
= connect_func(self
, (self
._proxy
.host
, self
._proxy
.port
))
252 if result
!= 0 and result
is not None:
255 ProxyType
.SOCKS4
: self
._setup
_socks
4,
256 ProxyType
.SOCKS4A
: self
._setup
_socks
4a
,
257 ProxyType
.SOCKS5
: self
._setup
_socks
5,
259 setup_funcs
[self
._proxy
.type](address
)
def connect(self, address):
    """Connect to ``address`` by handing socket.socket.connect to _make_proxy().

    The value produced by _make_proxy() is discarded, so this returns None.
    """
    plain_connect = socket.socket.connect
    self._make_proxy(plain_connect, address)
def connect_ex(self, address):
    """Like connect(), but uses socket.socket.connect_ex and returns the result.

    Returns:
        Whatever _make_proxy() returns for the connect_ex call.
    """
    plain_connect_ex = socket.socket.connect_ex
    outcome = self._make_proxy(plain_connect_ex, address)
    return outcome