%PDF-1.5 %���� ºaâÚÎΞ-ÌE1ÍØÄ÷{òò2ÿ ÛÖ^ÔÀá TÎ{¦?§®¥kuµùÕ5sLOšuY
Server IP : 188.40.95.74 / Your IP : 216.73.216.142 Web Server : Apache System : Linux cp01.striminghost.net 3.10.0-1160.119.1.el7.tuxcare.els13.x86_64 #1 SMP Fri Nov 22 06:29:45 UTC 2024 x86_64 User : vlasotin ( 1054) PHP Version : 5.6.40 Disable Function : NONE MySQL : ON | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/ |
Upload File : |
import asyncio import logging import os import sys from pathlib import Path import defence360agent.internals.logger from defence360agent.contracts.config import Core as Config from defence360agent.rpc_tools.exceptions import ResponseError from defence360agent.simple_rpc import SUCCESS, SocketError from defence360agent.utils import is_root_user from defence360agent.utils.cli import ( EXIT_CODES, EXITCODE_GENERAL_ERROR, print_error, print_response, print_warnings, ) from defence360agent.utils.parsers import EnvParser, create_cli_parser from defence360agent.sentry import flush_sentry logger = logging.getLogger(__name__) RPM_TRANSACTION_LOCK = Path( "/var/lib/rpm-state/imunify360-transaction-in-progress" ) def main(rpc_handlers_init, cli_args): # get ready to start: set conservative umask os.umask(Config.FILE_UMASK) defence360agent.internals.logger.reconfigure() rpc_handlers_init() parser = create_cli_parser() args = parser.parse_args(args=cli_args) if args.log_config or os.environ.get("IMUNIFY360_LOGGING_CONFIG_FILE"): defence360agent.internals.logger.update_logging_config_from_file( args.log_config or os.environ.get("IMUNIFY360_LOGGING_CONFIG_FILE") ) if args.console_log_level: defence360agent.internals.logger.setConsoleLogLevel( args.console_log_level ) if hasattr(args, "endpoint") and hasattr(args, "generate_endpoint_params"): try: cli_kwargs = args.generate_endpoint_params(args) envvar_kwargs = EnvParser.parse( os.environ, args.command, args.envvar_parameter_options, exclude=cli_kwargs, ) result, data = args.endpoint(**envvar_kwargs, **cli_kwargs) print_warnings(data) flush_sentry() if result == SUCCESS: print_response(args.command, data, args.json, args.verbose) else: print_error(result, data, args.json, args.verbose) sys.exit(EXIT_CODES[result]) except SocketError as e: print_response( None, {"items": "ERROR: {}".format(e)}, args.json, args.verbose ) sys.exit(EXITCODE_GENERAL_ERROR) else: print(parser.format_help()) def entrypoint(rpc_handlers_init): if not is_root_user(): logger.info("%s could be used by the root user only!", Config.NAME) print( "Imunify360 CLI is unavailable for non-root user", file=sys.stderr ) sys.exit(EXITCODE_GENERAL_ERROR) try: main(rpc_handlers_init, sys.argv[1:]) except KeyboardInterrupt: logger.warning("User pressed Ctrl+C, exiting...") sys.exit(EXITCODE_GENERAL_ERROR) except ResponseError as e: logger.error("Response error: %s", e) sys.exit(EXITCODE_GENERAL_ERROR) except ImportError as e: if RPM_TRANSACTION_LOCK.exists(): logger.error("RPM transaction is in progress. %s", e) print( "RPM transaction is in progress. Please, wait until it is " "finished and try again.", file=sys.stderr, ) sys.exit(EXITCODE_GENERAL_ERROR) else: logger.exception( "Unknown error happened. See logs for more information" ) sys.exit(EXITCODE_GENERAL_ERROR) except Exception: logger.exception( "Unknown error happened. See logs for more information" ) sys.exit(EXITCODE_GENERAL_ERROR) finally: # ensure loop is closed to prevent asyncio warning # (https://bugs.python.org/issue23548) asyncio.get_event_loop().close()