]>
jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/pip/_vendor/urllib3/util/ssltransport.py
5 from ..exceptions
import ProxySchemeUnsupported
6 from ..packages
import six
13 The SSLTransport wraps an existing socket and establishes an SSL connection.
15 Contrary to Python's implementation of SSLSocket, it allows you to chain
16 multiple TLS connections together. It's particularly useful if you need to
17 implement TLS within TLS.
19 The class supports most of the socket API operations.
23 def _validate_ssl_context_for_tls_in_tls(ssl_context
):
25 Raises a ProxySchemeUnsupported if the provided ssl_context can't be used
28 The only requirement is that the ssl_context provides the 'wrap_bio'
32 if not hasattr(ssl_context
, "wrap_bio"):
34 raise ProxySchemeUnsupported(
35 "TLS in TLS requires SSLContext.wrap_bio() which isn't "
36 "supported on Python 2"
39 raise ProxySchemeUnsupported(
40 "TLS in TLS requires SSLContext.wrap_bio() which isn't "
41 "available on non-native SSLContext"
45 self
, socket
, ssl_context
, server_hostname
=None, suppress_ragged_eofs
=True
48 Create an SSLTransport around socket using the provided ssl_context.
50 self
.incoming
= ssl
.MemoryBIO()
51 self
.outgoing
= ssl
.MemoryBIO()
53 self
.suppress_ragged_eofs
= suppress_ragged_eofs
56 self
.sslobj
= ssl_context
.wrap_bio(
57 self
.incoming
, self
.outgoing
, server_hostname
=server_hostname
60 # Perform initial handshake.
61 self
._ssl
_io
_loop
(self
.sslobj
.do_handshake
)
66 def __exit__(self
, *_
):
70 return self
.socket
.fileno()
def read(self, len=1024, buffer=None):
    """Read from the TLS layer by delegating to ``self._wrap_ssl_read``.

    :param len: Passed through unchanged as the first argument
        (defaults to 1024).
    :param buffer: Passed through unchanged as the second argument
        (defaults to ``None``).
    :return: Whatever ``self._wrap_ssl_read(len, buffer)`` returns.
    """
    # NOTE: ``len`` shadows the builtin, but it is part of the public
    # signature and is kept as-is for callers.
    return self._wrap_ssl_read(len, buffer)
75 def recv(self
, len=1024, flags
=0):
77 raise ValueError("non-zero flags not allowed in calls to recv")
78 return self
._wrap
_ssl
_read
(len)
80 def recv_into(self
, buffer, nbytes
=None, flags
=0):
82 raise ValueError("non-zero flags not allowed in calls to recv_into")
83 if buffer and (nbytes
is None):
87 return self
.read(nbytes
, buffer)
89 def sendall(self
, data
, flags
=0):
91 raise ValueError("non-zero flags not allowed in calls to sendall")
93 with memoryview(data
) as view
, view
.cast("B") as byte_view
:
94 amount
= len(byte_view
)
96 v
= self
.send(byte_view
[count
:])
99 def send(self
, data
, flags
=0):
101 raise ValueError("non-zero flags not allowed in calls to send")
102 response
= self
._ssl
_io
_loop
(self
.sslobj
.write
, data
)
106 self
, mode
="r", buffering
=None, encoding
=None, errors
=None, newline
=None
109 Python's httpclient uses makefile and buffered io when reading HTTP
110 messages and we need to support it.
112 This is unfortunately a copy and paste of socket.py makefile with small
113 changes to point to the socket directly.
115 if not set(mode
) <= {"r", "w", "b"}
:
116 raise ValueError("invalid mode %r (only r, w, b allowed)" % (mode
,))
118 writing
= "w" in mode
119 reading
= "r" in mode
or not writing
120 assert reading
or writing
127 raw
= socket
.SocketIO(self
, rawmode
)
128 self
.socket
._io
_refs
+= 1
129 if buffering
is None:
132 buffering
= io
.DEFAULT_BUFFER_SIZE
135 raise ValueError("unbuffered streams must be binary")
137 if reading
and writing
:
138 buffer = io
.BufferedRWPair(raw
, raw
, buffering
)
140 buffer = io
.BufferedReader(raw
, buffering
)
143 buffer = io
.BufferedWriter(raw
, buffering
)
146 text
= io
.TextIOWrapper(buffer, encoding
, errors
, newline
)
151 self
._ssl
_io
_loop
(self
.sslobj
.unwrap
)
def getpeercert(self, binary_form=False):
    """Return ``self.sslobj.getpeercert(binary_form)``.

    :param binary_form: Forwarded positionally to the wrapped SSL object
        (defaults to ``False``).
    :return: The wrapped SSL object's result, unmodified.
    """
    return self.sslobj.getpeercert(binary_form)
160 return self
.sslobj
.version()
163 return self
.sslobj
.cipher()
def selected_alpn_protocol(self):
    """Return ``self.sslobj.selected_alpn_protocol()`` unmodified."""
    return self.sslobj.selected_alpn_protocol()
def selected_npn_protocol(self):
    """Return ``self.sslobj.selected_npn_protocol()`` unmodified."""
    return self.sslobj.selected_npn_protocol()
def shared_ciphers(self):
    """Return ``self.sslobj.shared_ciphers()`` unmodified."""
    return self.sslobj.shared_ciphers()
def compression(self):
    """Return ``self.sslobj.compression()`` unmodified."""
    return self.sslobj.compression()
def settimeout(self, value):
    """Forward ``value`` to ``self.socket.settimeout``.

    :param value: Passed through unchanged to the wrapped socket.
    :return: ``None``; the wrapped call's result is not returned.
    """
    self.socket.settimeout(value)
def gettimeout(self):
    """Return ``self.socket.gettimeout()`` unmodified."""
    return self.socket.gettimeout()
def _decref_socketios(self):
    """Call ``self.socket._decref_socketios()`` and return ``None``."""
    self.socket._decref_socketios()
186 def _wrap_ssl_read(self
, len, buffer=None):
188 return self
._ssl
_io
_loop
(self
.sslobj
.read
, len, buffer)
189 except ssl
.SSLError
as e
:
190 if e
.errno
== ssl
.SSL_ERROR_EOF
and self
.suppress_ragged_eofs
:
191 return 0 # eof, return 0.
195 def _ssl_io_loop(self
, func
, *args
):
196 """Performs an I/O loop between incoming/outgoing and the socket."""
204 except ssl
.SSLError
as e
:
205 if e
.errno
not in (ssl
.SSL_ERROR_WANT_READ
, ssl
.SSL_ERROR_WANT_WRITE
):
206 # WANT_READ, and WANT_WRITE are expected, others are not.
210 buf
= self
.outgoing
.read()
211 self
.socket
.sendall(buf
)
215 elif errno
== ssl
.SSL_ERROR_WANT_READ
:
216 buf
= self
.socket
.recv(SSL_BLOCKSIZE
)
218 self
.incoming
.write(buf
)
220 self
.incoming
.write_eof()