%PDF-1.5 %���� ºaâÚÎΞ-ÌE1ÍØÄ÷{òò2ÿ ÛÖ^ÔÀá TÎ{¦?§®¥kuµùÕ5sLOšuY
Server IP : 188.40.95.74 / Your IP : 216.73.216.205 Web Server : Apache System : Linux cp01.striminghost.net 3.10.0-1160.119.1.el7.tuxcare.els13.x86_64 #1 SMP Fri Nov 22 06:29:45 UTC 2024 x86_64 User : vlasotin ( 1054) PHP Version : 5.6.40 Disable Function : NONE MySQL : ON | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/lib/python2.7/site-packages/leapp/utils/ |
Upload File : |
import os import socket import requests import requests.adapters import requests.exceptions try: import requests.packages.urllib3 as urllib3 # noqa # pylint: disable=consider-using-from-import except ImportError: import urllib3 RequestException = requests.exceptions.RequestException """ Exceptions that are raised through the actor API, which is retrieved from :py:func:`get_actor_api` """ _session = None class _LeappAPIConnection(urllib3.connection.HTTPConnection): def __init__(self, timeout=60): self.sock = None super(_LeappAPIConnection, self).__init__('localhost', timeout=timeout) self.timeout = timeout def connect(self): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.settimeout(self.timeout) socket_path = os.environ.get('LEAPP_ACTOR_API', '/tmp/actors.sock') sock.connect(socket_path) self.sock = sock class _LeappAPIConnectionPool(urllib3.connectionpool.HTTPConnectionPool): def __init__(self, timeout=60): super(_LeappAPIConnectionPool, self).__init__('localhost') self.timeout = timeout def _new_conn(self): return _LeappAPIConnection(self.timeout) class _LeappAPIAdapter(requests.adapters.HTTPAdapter): def __init__(self, timeout=60): super(_LeappAPIAdapter, self).__init__() self.timeout = timeout def get_connection(self, url, proxies=None): return _LeappAPIConnectionPool(timeout=self.timeout) def request_url(self, request, proxies): return request.path_url def get_actor_api(): """ :return: An instance of the Leapp actor API session that is using :py:class:`requests.Session` over a UNIX domain socket """ global _session if _session: return _session _session = requests.session() _session.adapters.clear() _session.mount('leapp://', _LeappAPIAdapter()) return _session