]> jfr.im git - dlqueue.git/blame - venv/lib/python3.11/site-packages/pip/_vendor/urllib3/contrib/ntlmpool.py
init: venv aand flask
[dlqueue.git] / venv / lib / python3.11 / site-packages / pip / _vendor / urllib3 / contrib / ntlmpool.py
CommitLineData
e0df8241
JR
1"""
2NTLM authenticating pool, contributed by erikcederstrand
3
4Issue #10, see: http://code.google.com/p/urllib3/issues/detail?id=10
5"""
6from __future__ import absolute_import
7
8import warnings
9from logging import getLogger
10
11from ntlm import ntlm
12
13from .. import HTTPSConnectionPool
14from ..packages.six.moves.http_client import HTTPSConnection
15
# Importing this module is enough to trigger the deprecation notice: the
# warning below is issued at import time, not when the pool is first used.
warnings.warn(
    "The 'urllib3.contrib.ntlmpool' module is deprecated and will be removed "
    "in urllib3 v2.0 release, urllib3 is not able to support it properly due "
    "to reasons listed in issue: https://github.com/urllib3/urllib3/issues/2282. "
    "If you are a user of this module please comment in the mentioned issue.",
    category=DeprecationWarning,
)

# Module-level logger, named after this module.
log = getLogger(name=__name__)
25
26
class NTLMConnectionPool(HTTPSConnectionPool):
    """
    Implements an NTLM authentication version of an urllib3 connection pool.

    Each connection created by the pool performs the NTLM handshake
    (negotiate, challenge, authenticate) against ``authurl`` before it is
    handed out, and is kept alive afterwards.
    """

    scheme = "https"

    def __init__(self, user, pw, authurl, *args, **kwargs):
        """
        authurl is a random URL on the server that is protected by NTLM.
        user is the Windows user, probably in the DOMAIN\\username format.
        pw is the password for the user.

        Any further positional and keyword arguments are forwarded unchanged
        to ``HTTPSConnectionPool``.

        Raises ``IndexError`` if ``user`` contains no backslash, because the
        part after the domain separator is then missing.
        """
        super(NTLMConnectionPool, self).__init__(*args, **kwargs)
        self.authurl = authurl
        # The unsplit user name is what the negotiate message is built from.
        self.rawuser = user
        user_parts = user.split("\\", 1)
        self.domain = user_parts[0].upper()
        self.user = user_parts[1]
        self.pw = pw

    def _new_conn(self):
        """
        Open a new HTTPS connection and run the NTLM handshake on it.

        Returns the connection once the server has answered the
        authentication message with status 200.

        Raises ``Exception`` when the server offers no NTLM challenge, when it
        rejects the credentials (401) or when it answers with any other
        non-200 status. The connection is closed before any error from the
        handshake propagates, so a failed attempt does not leak a socket.
        """
        # Performs the NTLM handshake that secures the connection. The socket
        # must be kept open while requests are performed.
        self.num_connections += 1
        log.debug(
            "Starting NTLM HTTPS connection no. %d: https://%s%s",
            self.num_connections,
            self.host,
            self.authurl,
        )

        headers = {"Connection": "Keep-Alive"}
        req_header = "Authorization"
        resp_header = "www-authenticate"

        conn = HTTPSConnection(host=self.host, port=self.port)
        try:
            # Send negotiation message
            headers[req_header] = "NTLM %s" % ntlm.create_NTLM_NEGOTIATE_MESSAGE(
                self.rawuser
            )
            log.debug("Request headers: %s", headers)
            conn.request("GET", self.authurl, None, headers)
            res = conn.getresponse()
            log.debug("Response status: %s %s", res.status, res.reason)
            log.debug("Response headers: %s", dict(res.headers))
            log.debug("Response data: %s [...]", res.read(100))

            # Look the challenge up through the response object rather than a
            # plain dict of the headers: the lookup is case-insensitive and
            # repeated headers are joined with ", ", so "WWW-Authenticate"
            # and multi-scheme offers are both handled.
            challenge_header = res.getheader(resp_header)

            # Remove the reference to the socket, so that it can not be closed
            # by the response object (we want to keep the socket open)
            res.fp = None

            # Server should respond with a challenge message. If several
            # NTLM entries are present the last one wins.
            auth_header_value = None
            for offered in (challenge_header or "").split(", "):
                if offered.startswith("NTLM "):
                    auth_header_value = offered[5:]
            if auth_header_value is None:
                raise Exception(
                    "Unexpected %s response header: %s"
                    % (resp_header, challenge_header)
                )

            # Send authentication message
            server_challenge, negotiate_flags = ntlm.parse_NTLM_CHALLENGE_MESSAGE(
                auth_header_value
            )
            auth_msg = ntlm.create_NTLM_AUTHENTICATE_MESSAGE(
                server_challenge, self.user, self.domain, self.pw, negotiate_flags
            )
            headers[req_header] = "NTLM %s" % auth_msg
            log.debug("Request headers: %s", headers)
            conn.request("GET", self.authurl, None, headers)
            res = conn.getresponse()
            log.debug("Response status: %s %s", res.status, res.reason)
            log.debug("Response headers: %s", dict(res.headers))
            log.debug("Response data: %s [...]", res.read()[:100])
            if res.status != 200:
                if res.status == 401:
                    raise Exception(
                        "Server rejected request: wrong username or password"
                    )
                raise Exception(
                    "Wrong server response: %s %s" % (res.status, res.reason)
                )

            # As above: detach the socket from the response object.
            res.fp = None
        except Exception:
            # The connection is never returned on failure, so release it here.
            conn.close()
            raise

        log.debug("Connection established")
        return conn

    def urlopen(
        self,
        method,
        url,
        body=None,
        headers=None,
        retries=3,
        redirect=True,
        assert_same_host=True,
    ):
        """
        Issue a request through the pool, forcing ``Connection: Keep-Alive``.

        The arguments are passed on positionally to
        ``HTTPSConnectionPool.urlopen`` and its result is returned. The
        caller's ``headers`` mapping is copied before the ``Connection``
        header is set, so it is not modified.
        """
        headers = {} if headers is None else headers.copy()
        headers["Connection"] = "Keep-Alive"
        return super(NTLMConnectionPool, self).urlopen(
            method, url, body, headers, retries, redirect, assert_same_host
        )