%PDF-1.5 %���� ºaâÚÎΞ-ÌE1ÍØÄ÷{òò2ÿ ÛÖ^ÔÀá TÎ{¦?§®¥kuµùÕ5sLOšuY
Server IP : 188.40.95.74 / Your IP : 216.73.216.205 Web Server : Apache System : Linux cp01.striminghost.net 3.10.0-1160.119.1.el7.tuxcare.els13.x86_64 #1 SMP Fri Nov 22 06:29:45 UTC 2024 x86_64 User : vlasotin ( 1054) PHP Version : 5.6.40 Disable Function : NONE MySQL : ON | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /lib/python2.7/site-packages/leapp/messaging/ |
Upload File : |
import multiprocessing import six from six.moves import configparser try: from six.moves.configparser import SafeConfigParser as ConfigParser except ImportError: from six.moves.configparser import ConfigParser from leapp.exceptions import CommandError from leapp.utils.audit import create_audit_entry def _comment_out(key, text): """Returns a commented-out string. If any newlines are present it properly deals with them""" text = str(text) split_by_newline = [s for s in text.split('\n') if s.strip()] res = '# {key:<20}{line1}\n{linesN}'.format(key='{}:'.format(key) if key else '', line1=split_by_newline[0], linesN=''.join([_comment_out('', line) for line in split_by_newline[1:]])) return res class AnswerStore(object): """ AnswerStore handles storing and loading answer files for user questions. """ def __init__(self, manager=None): """ Initialize the answer store. :param manager: Passes through a given instance of multiprocessing.Manager() to use, by default it creates an own instance. """ self._manager = manager or multiprocessing.Manager() self._storage = self._manager.dict() def answer(self, scope, key, value): dialog_scope = self._storage.get(scope, {}) dialog_scope[key] = value # As _storage is a dict proxy, we have to explicitly update the object via __setitem__ # Otherwise we won't get the update propagated to the parent process # So don't even bother updating this to some more 'pythonic' coding style self._storage[scope] = dialog_scope @classmethod def _load_ini(cls, inifile): """ Loads an ini config file from the given location. :param inifile: Path to the answer file to load. :return: configparser.ConfigParser object :raises CommandError if any of the values are not in key=value format """ conf = ConfigParser(allow_no_value=False) try: conf.read(inifile) return conf except configparser.ParsingError as exc: # Some of the sections were not in key = value format raise CommandError('Failed to load answer file {inifile} with the following errors: {errors}'.format( inifile=inifile, errors=exc.message)) def update(self, answer_file, allow_missing=False): """ Update answerfile with all answers from answerstore that have correspondent sections in the file. Returns a list of sections that were not found in original answerfile and thus were not updated. """ # NOTE(ivasilev): py2 configparser doesn't have any means to save comments in ini file. Switch to cfgparse? conf = AnswerStore._load_ini(answer_file) not_updated = [] for section, answerdict in self._storage.items(): for opt, val in answerdict.items(): if section not in conf.sections() and allow_missing: conf.add_section(section) try: conf.set(section, opt, str(val)) except configparser.NoSectionError: not_updated.append("{sec}.{opt}={val}".format(sec=section, opt=opt, val=val)) with open(answer_file, 'w') as afile: conf.write(afile) return not_updated def load(self, answer_file): """ Loads an answer file from the given location and updates the loaded data with it. :param answer_file: Path to the answer file to load. :return: None :raises CommandError if any of the values are not in key=value format """ conf = AnswerStore._load_ini(answer_file) for section in conf.sections(): self._storage[section] = self._manager.dict(conf.items(section=section, raw=True)) def load_and_translate_for_workflow(self, answer_file, workflow): """ Loads an answer file from the given location and updates the loaded data with it and translates the data to the correct value types based on the dialogs in the given workflow. :param answer_file: Path to the answer file to load. :param workflow: :return: None :raises CommandError if any of the values are not in key=value format """ conf = AnswerStore._load_ini(answer_file) for section in conf.sections(): self._storage[section] = dict(conf.items(section=section, raw=True)) self.translate_for_workflow(workflow) def get(self, scope, fallback=None): """ Dict compatible interface to get a sub dictionary by dialog scope. :param scope: Scope of the data to retrieve. :param fallback: Fallback value to return if not found. :return: A shallow copy of data stored in _storage by scope key """ # NOTE(ivasilev) self.storage.get() will return a DictProxy. To avoid TypeError during later # JSON serialization a copy() should be invoked to get a shallow copy of data answer = self._storage.get(scope, fallback).copy() # NOTE(dkubek): It is possible that we do not need to save the 'answer' # here as it is being stored with dialog question right after query create_audit_entry('dialog-answer', {'scope': scope, 'fallback': fallback, 'answer': answer}) return answer def translate_for_workflow(self, workflow): """ Translates the data for all dialogs in the current workflow. :param workflow: Instance of a workflow to translate all dialogs from. :type workflow: :py:class:`leapp.workflows.Workflow` :return: None """ for dialog in workflow.dialogs: self.translate(dialog) def translate(self, dialog): """ Translates configuration values from their string form to the corresponding value format based on the given dialog. :param dialog: A dialog instance to translate the data for. :type dialog: :py:class:`leapp.dialogs.dialog.Dialog` :return: None """ entry = self._storage.get(dialog.scope) if entry: for component in dialog.components: if component.key in entry and isinstance(entry[component.key], six.string_types): if component.value_type is bool: entry[component.key] = entry[component.key].lower() == 'true' elif component.value_type is int: entry[component.key] = int(entry[component.key]) elif component.value_type is tuple: elements = entry[component.key].split(';') entry[component.key] = tuple(set(elements).intersection(component.choices)) elif hasattr(component, 'choices'): value = entry[component.key] entry[component.key] = value if value in component.choices else None component.value = entry.get(component.key) self._storage.update({dialog.scope: entry}) def generate(self, dialogs, answer_file_path): """ Generates an answer file for the given dialogs and stores it to `answer_file_path`. :param dialogs: :param answer_file_path: :return: """ with open(answer_file_path, 'w') as f: for dialog in dialogs: if not dialog.components: # Skip dialogs without questions continue f.writelines([ '[{}]\n'.format(dialog.scope), _comment_out('Title', dialog.title), _comment_out('Reason', dialog.reason) ]) for component in dialog.components: answer = self._storage.get(dialog.scope, {}).get(component.key) default = component.default choices = '' answer_entry = '' if hasattr(component, 'choices'): choices += '# Available choices: {}\n'.format('/'.join(component.choices)) if component.value_type is tuple: default = ';'.join(component.default) answer = ';'.join(answer) if answer else answer answer_entry = '#\n# Values are separated by semi-colon ";"\n' if answer is not None: answer_entry += '{key} = {value}\n'.format(key=component.key, value=answer) else: answer_entry += '# Unanswered question. Uncomment the following line with your answer\n' answer_entry += '# {key} = {value}\n'.format(key=component.key, value=default or '') f.writelines([ '# {}\n'.format(' {}.{} '.format(dialog.scope, component.key).center(77, '=')), _comment_out('Label', component.label), _comment_out('Description', component.description), _comment_out('Reason', component.reason), _comment_out('Type', component.value_type.__name__), _comment_out('Default', default), choices, answer_entry, '\n' ])