]> jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/pip/_internal/network/xmlrpc.py
init: venv and flask
[dlqueue.git] / venv / lib / python3.11 / site-packages / pip / _internal / network / xmlrpc.py
1 """xmlrpclib.Transport implementation
2 """
3
4 import logging
5 import urllib.parse
6 import xmlrpc.client
7 from typing import TYPE_CHECKING, Tuple
8
9 from pip._internal.exceptions import NetworkConnectionError
10 from pip._internal.network.session import PipSession
11 from pip._internal.network.utils import raise_for_status
12
13 if TYPE_CHECKING:
14 from xmlrpc.client import _HostType, _Marshallable
15
16 logger = logging.getLogger(__name__)
17
18
class PipXmlrpcTransport(xmlrpc.client.Transport):
    """Provide a `xmlrpclib.Transport` implementation via a `PipSession`
    object.

    Each XML-RPC call is sent as an HTTP POST through the session passed to
    the constructor, and the streamed reply is handed to the base class's
    ``parse_response``.
    """

    def __init__(
        self, index_url: str, session: PipSession, use_datetime: bool = False
    ) -> None:
        """Remember the URL scheme of *index_url* and the session to use.

        :param index_url: URL of the index; only its scheme is retained.
        :param session: session used to POST every XML-RPC request.
        :param use_datetime: forwarded unchanged to
            ``xmlrpc.client.Transport.__init__``.
        """
        super().__init__(use_datetime)
        index_parts = urllib.parse.urlparse(index_url)
        # Host and handler are supplied per call to request(); the scheme is
        # the only part of the index URL needed to rebuild the target URL.
        self._scheme = index_parts.scheme
        self._session = session

    def request(
        self,
        host: "_HostType",
        handler: str,
        request_body: bytes,
        verbose: bool = False,
    ) -> Tuple["_Marshallable", ...]:
        """POST *request_body* to ``<scheme>://<host><handler>`` and parse
        the reply.

        :param host: host part of the target URL; must be a plain string.
        :param handler: path part of the target URL.
        :param request_body: marshalled XML-RPC request payload.
        :param verbose: stored on ``self.verbose`` before the response is
            parsed.
        :returns: whatever ``parse_response`` produces for the raw response
            stream.
        :raises NetworkConnectionError: re-raised unchanged when it is raised
            while posting or checking the response status; a critical log
            record is emitted first if a response object is attached.
        """
        assert isinstance(host, str)
        parts = (self._scheme, host, handler, None, None, None)
        url = urllib.parse.urlunparse(parts)
        try:
            headers = {"Content-Type": "text/xml"}
            response = self._session.post(
                url,
                data=request_body,
                headers=headers,
                stream=True,
            )
            raise_for_status(response)
            self.verbose = verbose
            return self.parse_response(response.raw)
        except NetworkConnectionError as exc:
            # Test for presence, not truthiness: Requests-style Response
            # objects (which the session presumably returns) evaluate as
            # false for 4xx/5xx statuses, so a bare ``assert exc.response``
            # would raise AssertionError for exactly the errors handled here
            # and hide the original exception.
            if exc.response is not None:
                logger.critical(
                    "HTTP error %s while getting %s",
                    exc.response.status_code,
                    url,
                )
            raise