%PDF-1.5 %���� ºaâÚÎΞ-ÌE1ÍØÄ÷{òò2ÿ ÛÖ^ÔÀá TÎ{¦?§®¥kuµùÕ5sLOšuY
Server IP : 188.40.95.74 / Your IP : 216.73.216.142 Web Server : Apache System : Linux cp01.striminghost.net 3.10.0-1160.119.1.el7.tuxcare.els13.x86_64 #1 SMP Fri Nov 22 06:29:45 UTC 2024 x86_64 User : vlasotin ( 1054) PHP Version : 5.6.40 Disable Function : NONE MySQL : ON | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/api/ |
Upload File : |
import random
import os
import secrets
import string
from datetime import datetime, timedelta
from pathlib import Path

from defence360agent.subsys.panels.base import InvalidTokenException
from defence360agent.contracts.config import UIRole, UserType
from defence360agent.utils import atomic_rewrite

# Maps the UI role stored in a token onto the user type returned to callers.
UIRoleToUserType = {
    UIRole.ADMIN: UserType.ROOT,
    UIRole.CLIENT: UserType.NON_ROOT,
}


class JWTIssuer:
    """Issue and validate JWT tokens signed with a rotating on-disk secret.

    The current secret lives in ``JWT_SECRET_FILE``; when it is rotated the
    previous one is kept in ``JWT_SECRET_FILE_PREV`` so that tokens signed
    shortly before a rotation can still be validated.
    """

    JWT_SECRET_FILE = Path("/var/imunify360/.api-secret.key")
    JWT_SECRET_FILE_PREV = Path("/var/imunify360/.api-secret-prev.key")

    # Both TTLs are overridable through the environment; os.getenv yields a
    # string when the variable is set, hence the int() conversion below.
    JWT_TOKEN_EXPIRATION_TTL_HOURS = os.getenv(
        "I360_JWT_TOKEN_EXPIRATION_TTL_HOURS", 6
    )
    JWT_SECRET_EXPIRATION_TTL_HOURS = os.getenv(
        "I360_JWT_SECRET_EXPIRATION_TTL_HOURS", 24
    )
    TOKEN_EXPIRATION_TTL = timedelta(hours=int(JWT_TOKEN_EXPIRATION_TTL_HOURS))
    SECRET_EXPIRATION_TTL = timedelta(
        hours=int(JWT_SECRET_EXPIRATION_TTL_HOURS)
    )

    # Alphabet and length of a generated secret.
    _SECRET_ALPHABET = string.ascii_uppercase + string.digits
    _SECRET_LENGTH = 64

    @classmethod
    def is_secret_expired(cls) -> bool:
        """Return True when the secret file is missing or older than the TTL.

        A missing file is treated as having mtime 0, i.e. always expired.
        """
        try:
            st_mtime = os.stat(cls.JWT_SECRET_FILE).st_mtime
        except FileNotFoundError:
            st_mtime = 0.0
        age = datetime.now().timestamp() - st_mtime
        # total_seconds(), not .seconds: the latter is only the sub-day
        # remainder, so timedelta(hours=24).seconds == 0 and the secret
        # would be considered expired on every call.
        return age > cls.SECRET_EXPIRATION_TTL.total_seconds()

    @classmethod
    def _get_secret(cls) -> str:
        """Return the current signing secret, rotating it first if expired.

        On rotation a new random secret is written to ``JWT_SECRET_FILE``
        through ``atomic_rewrite`` with ``JWT_SECRET_FILE_PREV`` passed as
        the backup path.
        """
        if not cls.is_secret_expired():
            return cls.JWT_SECRET_FILE.read_text()

        # The secret signs authentication tokens, so it must come from a
        # cryptographically secure source rather than the `random` PRNG.
        new_secret = "".join(
            secrets.choice(cls._SECRET_ALPHABET)
            for _ in range(cls._SECRET_LENGTH)
        )
        if not cls.JWT_SECRET_FILE.exists():
            # Create the placeholder owner-only from the start.
            cls.JWT_SECRET_FILE.touch(mode=0o600)
        atomic_rewrite(
            str(cls.JWT_SECRET_FILE),
            new_secret,
            backup=str(cls.JWT_SECRET_FILE_PREV),
            uid=-1,
            permissions=0o600,
        )
        return new_secret

    @classmethod
    def get_token(cls, user_name: str, user_type: UIRole) -> str:
        """
        Generates a token with several encoded fields:
        user name, user type, expiration timestamp
        """
        # Imported lazily: see the note in _parse_token.
        import jwt

        # Add the TTL to the epoch timestamp (not to a naive local datetime)
        # so the lifetime is exact even across a DST change.
        expires_at = (
            datetime.now().timestamp()
            + cls.TOKEN_EXPIRATION_TTL.total_seconds()
        )
        return jwt.encode(
            {
                "user_type": user_type,
                "username": user_name,
                "exp": expires_at,
            },
            cls._get_secret(),
            # Explicit, to match the algorithm accepted by _parse_token.
            algorithm="HS256",
        )

    @classmethod
    def _parse_token(cls, token: str, secret: str) -> dict | None:
        """Decode *token* with *secret*; return the payload or None.

        None is returned for any PyJWT error (bad signature, expired
        token, malformed input, ...).
        """
        # jwt is imported here rather than at module level: handling its
        # exceptions globally would require a module-level import, which
        # increases memory consumption.
        import jwt

        try:
            return jwt.decode(token, secret, algorithms=["HS256"])
        except jwt.PyJWTError:
            return None

    @classmethod
    def parse_token(cls, token: str) -> dict:
        """Validate *token* against the current and the previous secret.

        Returns a dict with ``user_name`` and ``user_type`` (mapped through
        ``UIRoleToUserType``) for the first secret that verifies the token.

        Raises:
            InvalidTokenException: if no available secret verifies the token.
        """
        for secret_pth in (cls.JWT_SECRET_FILE, cls.JWT_SECRET_FILE_PREV):
            # EAFP: the file may disappear between a check and the read.
            try:
                secret = secret_pth.read_text()
            except FileNotFoundError:
                continue
            if not secret:
                # Never validate against an empty key: a token signed with
                # "" would otherwise be accepted.
                continue
            decoded = cls._parse_token(token, secret)
            if decoded:
                return {
                    "user_name": decoded["username"],
                    "user_type": UIRoleToUserType[decoded["user_type"]],
                }
        raise InvalidTokenException("INVALID_TOKEN")