D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imunify360
/
venv
/
lib64
/
python3.11
/
site-packages
/
restore_infected
/
Filename :
backup_backends_lib.py
back
Copy
import asyncio
import functools
import inspect
import os
import shutil
import tarfile
from abc import abstractmethod
from concurrent.futures import ThreadPoolExecutor
from contextlib import suppress
from typing import BinaryIO, Dict, Literal, Set, Tuple

from . import helpers


class BackendError(Exception):
    """Root of the exception hierarchy raised by backup backends."""


class BackendNotAuthorizedError(BackendError):
    """
    Backup backend wasn't initialized properly and can't be used without auth
    """


class BackendNonApplicableError(BackendError):
    """Backup couldn't be used on the systems with current configuration"""


class BackendClientRequiredError(BackendError):
    """Backup provider requires Client soft to be installed"""


class NoSuchUserError(BackendError):
    """Backup file owner is not present on the system"""


class UnsupportedBackupError(BackendError):
    """Backend can't recognize a backup file format"""


class BaseResource:
    """
    Base class for backup resource

    A resource is identified by ``resource`` (a path); its components are
    kept as a tuple in ``prefix`` so that other paths can be matched
    against it component-wise.
    """

    def __init__(self, path, resource):
        self.path = path
        self.resource = resource
        self.prefix = _split_path(resource)
        self.prefix_len = len(self.prefix)

    def __repr__(self):
        return '<{0} for {1}>'.format(
            self.__class__.__name__, repr(self.resource)
        )

    def is_related(self, path):
        # type: (str) -> bool
        """Return True when the leading components of *path* equal
        this resource's prefix."""
        return self.prefix == _split_path(path)[:self.prefix_len]


class FileData:
    """
    Class to manipulate of file from backup resource

    Holds the metadata tuple returned by ``resource.info(filename)``:
    size, mtime, uid, gid and mode.
    """

    def __init__(self, resource, filename):
        # type: (BaseResource, str) -> None
        self.resource = resource
        self.size, self.mtime, self.uid, self.gid, self.mode = resource.info(
            filename
        )
        self.filename = filename

    def __repr__(self):
        # The closing parenthesis after ``mode`` was missing originally,
        # which produced an unbalanced representation.
        return (
            "<{0}(resource={1}, filename={2}, size={3}, mtime={4}, "
            "uid={5}, gid={6}, mode={7})>".format(
                self.__class__.__name__,
                repr(self.resource),
                repr(self.filename),
                repr(self.size),
                repr(self.mtime),
                repr(self.uid),
                repr(self.gid),
                '0o%03o' % self.mode,
            )
        )


class BackupBase:
    """
    Base class for a single backup.

    Backups order by their ``created`` value. The methods below iterate
    ``self.resources``, which is not set in this class.
    NOTE(review): presumably subclasses provide ``resources`` -- confirm.
    """

    def __init__(self, path, created):
        # type: (str, helpers.DateTime) -> None
        self.path = path
        self.created = created

    def __lt__(self, other):
        return self.created < other.created

    def __repr__(self):
        return '<{0}({1})>'.format(self.__class__.__name__, str(self))

    def __str__(self):
        return self.path

    def close(self):
        # type: () -> None
        """Close every resource of this backup."""
        for resource in self.resources:
            resource.close()

    # NOTE: lru_cache on a method keys on ``self`` and keeps every instance
    # alive for as long as the cache exists. Kept as is to preserve the
    # existing caching behaviour and the ``cache_clear``/``cache_info`` API.
    @functools.lru_cache(maxsize=None)
    def file_data(self, path):
        # type: (str) -> FileData
        """Return the FileData of *path* from the first related resource.

        :raises FileNotFoundError: when no resource is related to *path*
        """
        for resource in self.resources:
            if resource.is_related(path):
                return FileData(resource, path)
        raise FileNotFoundError(repr(path))

    def restore(self, items: Set[FileData], destination='/') -> Dict[str, str]:
        """Write every item of *items* below *destination*.

        A relative *destination* is made absolute first. Each file is
        copied chunk by chunk, then its mtime, owner and mode are set from
        the item's metadata.

        :return: mapping of written target path to the item's filename
        """
        if not os.path.isabs(destination):
            destination = os.path.abspath(destination)
        result = {}
        for item in items:
            # Strip leading separators so that join() keeps `destination`.
            target_name = os.path.join(
                destination, item.filename.lstrip(os.sep)
            )
            target_dir = os.path.dirname(target_name)
            helpers.mkdir(target_dir)
            with item.resource.open(item.filename) as fileobj, open(
                target_name, 'wb'
            ) as target:
                for chunk in helpers.read(fileobj):
                    target.write(chunk)

            # Metadata is applied once the target file has been closed.
            atime = helpers.DateTime.now().timestamp()
            mtime = item.mtime.timestamp()
            os.utime(target_name, (atime, mtime))
            os.chown(target_name, item.uid, item.gid)
            os.chmod(target_name, item.mode)

            result[target_name] = item.filename
        return result


class FileResourceMixin:
    """
    Mixin class for file-based resources (tar, tar.gz, gz)
    """

    fileobj = None  # type: BinaryIO

    def close(self):
        # type: () -> None
        """Close the underlying file object, if one is open."""
        if self.fileobj:
            self.fileobj.close()
            self.fileobj = None


class TarResourceMixin(FileResourceMixin):
    """
    Mixin class for tar resources (tar, tar.gz)

    Relies on ``self.path`` and ``self._normalize_path``, neither of which
    is defined in this mixin.
    """

    def _prep(self, path):
        """Open the archive on first use and return the normalized path."""
        if not self.fileobj:
            self.fileobj = tarfile_open(self.path)  # type: TarFile
        return self._normalize_path(path)

    # NOTE: see BackupBase.file_data -- the cache holds a reference to
    # ``self`` for every cached call.
    @functools.lru_cache(maxsize=None)
    def info(self, path):
        # type: (str) -> Tuple[int, helpers.DateTime, int, int, int]
        """Return (size, mtime, uid, gid, mode) of the archive member.

        :raises FileNotFoundError: when the archive has no such member
        """
        tar_path = self._prep(path)
        try:
            tar_info = self.fileobj.getmember(tar_path)
        except KeyError:
            raise FileNotFoundError(repr(path))
        return (
            tar_info.size,
            helpers.DateTime.fromtimestamp(tar_info.mtime),
            tar_info.uid,
            tar_info.gid,
            tar_info.mode,
        )

    def open(self, path):
        # type: (str) -> BinaryIO
        """Return a file object for reading the archive member.

        :raises FileNotFoundError: when the archive has no such member
        :raises EOFError: when the archive ends unexpectedly
        :raises tarfile.ReadError: on any other archive read failure
        """
        tar_path = self._prep(path)
        try:
            return self.fileobj.extractfile(tar_path)
        except KeyError:
            raise FileNotFoundError(repr(path))
        except tarfile.ReadError as e:
            if 'unexpected end of data' in e.args:
                raise EOFError(*e.args) from e
            # Any other read error used to be swallowed here, making the
            # method return None; propagate it to the caller instead.
            raise


class FtpBackupBase(BackupBase):
    """
    Base class for backup on FTP server
    """

    # NOTE(review): no ABCMeta metaclass is visible on this hierarchy, so
    # ``abstractmethod`` is not enforced here; presumably subclasses
    # override FTP_DIR_NAME with a plain string -- confirm.
    @property
    @classmethod
    @abstractmethod
    def FTP_DIR_NAME(cls):
        pass

    def __init__(self, ftp, path, created, tmp_dir=None):
        # type: (helpers.Ftp, str, helpers.DateTime, str) -> None
        super().__init__(path, created)
        self.ftp = ftp
        # Fall back to the user's home directory when no tmp_dir is given.
        self.tmp_dir = tmp_dir or os.path.expanduser('~/')
        self.ftp_dir = os.path.join(self.tmp_dir, self.FTP_DIR_NAME)

    def __str__(self):
        return os.path.join(str(self.ftp), self.path.lstrip(os.path.sep))

    def _retrieve(self):
        """
        Retrieve ``self.path`` from the FTP server into ``self.ftp_dir``.

        Returns the result of ``ftp.retrieve``, or None (after emitting a
        warning) when it raises ``helpers.FtpError``.

        :raises helpers.IsNotDirError:
        :raises helpers.DirNotEmptyError:
        """
        helpers.mkdir(self.ftp_dir)
        try:
            return self.ftp.retrieve(self.path, self.ftp_dir)
        except helpers.FtpError:
            helpers.warning("Error retrieving data from %s" % self.ftp)
            return None

    def close(self):
        """Close the resources and remove the local download directory."""
        super().close()
        with suppress(FileNotFoundError):
            shutil.rmtree(self.ftp_dir)


def _split_path(path):
    # type: (str) -> Tuple[str, ...]
    """Split *path* on ``os.sep`` after dropping trailing separators."""
    path = path.rstrip(os.sep)
    path_list = path.split(os.sep)
    return tuple(path_list)


def wraps(wrapped):
    """Like ``functools.wraps`` but also copies ``__signature__``.

    The signature is taken from ``wrapped.__signature__`` when present,
    otherwise from ``inspect.signature(wrapped)``.
    """

    def decorator(wrapper):
        wrapper = functools.update_wrapper(wrapper, wrapped)
        wrapper.__signature__ = getattr(
            wrapped, '__signature__', inspect.signature(wrapped)
        )
        return wrapper

    return decorator


def extra(f):
    """Mark *f* by returning a wrapper whose ``extra`` attribute is True."""

    @wraps(f)
    def wrapper(*args, **kwargs):
        # Keyword arguments are forwarded as well; previously only
        # positional arguments were accepted.
        return f(*args, **kwargs)

    wrapper.extra = True
    return wrapper


def asyncable(func):
    """Make *func* callable either synchronously or as a coroutine.

    The wrapper accepts an extra ``async_`` keyword argument (default taken
    from ``wrapper.async_``, initially False). When it is true, a coroutine
    is returned that runs *func* in a thread pool shared by all decorated
    functions; otherwise *func* is called directly.
    """

    async def coroutine_function(*args, **kwargs):
        # Runs only while a loop is running, so get_running_loop() is safe.
        return await asyncio.get_running_loop().run_in_executor(
            asyncable.executor,
            functools.partial(func, *args, **kwargs),
        )

    @wraps(func)
    def wrapper(*args, **kwargs):
        if kwargs.pop('async_', wrapper.async_):
            # The executor is created lazily on first asynchronous use.
            if not hasattr(asyncable, 'executor'):
                asyncable.executor = ThreadPoolExecutor(max_workers=2)
            coroutine = coroutine_function(*args, **kwargs)
            coroutine.__qualname__ = func.__qualname__
            return coroutine
        return func(*args, **kwargs)

    wrapper.async_ = False
    return wrapper


def _backend_checker_decorator(path, exc):
    """Build a decorator that raises *exc* unless *path* exists.

    The path is stored on the wrapper as ``wrapper.token`` and is read
    from there on every call.
    """

    def real_decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            if not os.path.exists(wrapper.token):
                raise exc
            return f(*args, **kwargs)

        # this makes unit testing easier
        wrapper.token = path
        return wrapper

    return real_decorator


def backend_auth_required(token_path, error_msg):
    """Decorator: raise BackendNotAuthorizedError(error_msg) unless
    *token_path* exists."""
    return _backend_checker_decorator(
        token_path, BackendNotAuthorizedError(error_msg)
    )


def backup_client_required(client_path, error_msg):
    """Decorator: raise BackendClientRequiredError(error_msg) unless
    *client_path* exists."""
    return _backend_checker_decorator(
        client_path, BackendClientRequiredError(error_msg)
    )


class TarFile(tarfile.TarFile):
    """``tarfile.TarFile`` extended with a zstd open method."""

    OPEN_METH = {
        **tarfile.TarFile.OPEN_METH,
        "zstd": "zstdopen",  # zstd compressed tar
    }

    @classmethod
    def zstdopen(
        cls,
        name,
        mode: Literal["r", "w", "x"] = "r",
        fileobj=None,
        level_or_option=None,
        zstd_dict=None,
        **kwargs
    ):
        """Open zstd compressed tar archive name for reading or writing.
        Appending is not allowed.

        :raises ValueError: when *mode* is not 'r', 'w' or 'x'
        :raises tarfile.CompressionError: when pyzstd is not importable
        :raises tarfile.ReadError: when reading and the data is not zstd
        """
        if mode not in ("r", "w", "x"):
            raise ValueError("mode must be 'r', 'w' or 'x'")

        try:
            from pyzstd import ZstdError, ZstdFile
        except ImportError:
            raise tarfile.CompressionError(
                "pyzstd module is not available"
            ) from None

        fileobj = ZstdFile(
            fileobj or name,
            mode,
            level_or_option=level_or_option,
            zstd_dict=zstd_dict,
        )

        try:
            t = cls.taropen(name, mode, fileobj, **kwargs)
        except (ZstdError, OSError, EOFError) as e:
            fileobj.close()
            if mode == "r":
                raise tarfile.ReadError("not a zstd file") from e
            raise
        except Exception:
            fileobj.close()
            raise
        # The archive owns the ZstdFile created above, so it must close it.
        t._extfileobj = False
        return t


tarfile_open = TarFile.open