OwlCyberSecurity - MANAGER
Edit File: differ.py
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import asyncio import logging from functools import cached_property, partial from io import BytesIO from pathlib import Path from peewee import DoesNotExist from defence360agent.utils import safe_fileops from imav.malwarelib.config import MalwareHitStatus, MalwareScanResourceType from imav.malwarelib.cleanup.storage import CleanupStorage from imav.malwarelib.model import MalwareHit from imav.utils import get_files_diff logger = logging.getLogger(__name__) class DiffError(Exception): pass class MalwareHitDiff: """ Used to compare infected and cleaned versions of a malicious file. """ def __init__(self, id: int, user: str = None): self._id = id self._user = user @cached_property def hit(self): try: return MalwareHit.get( MalwareHit.id == self._id, MalwareHit.resource_type == MalwareScanResourceType.FILE.value, MalwareHit.malicious == True, # noqa: E712 *([MalwareHit.user == self._user] * bool(self._user)), ) except DoesNotExist: raise DiffError( f"No malware file hit found (id={self._id}," f" user={self._user})." ) async def unified_diff(self): diff = b"" if self.hit.status in MalwareHitStatus.CLEANED: backup_file_path = CleanupStorage.get_hit_store_path(self.hit) origin_file_path = Path(self.hit.orig_file) if self._user: open_origin_file_safely = partial( safe_fileops.safe_open_file, origin_file_path, "rb", self._user, respect_homedir=False, ) else: # read as root open_origin_file_safely = partial(open, origin_file_path, "rb") if not backup_file_path.exists(): # no backup, shouldn't happen raise FileNotFoundError( f"Backup file not found for hit(id={self.hit.id})." ) if not origin_file_path.exists(): # original file deleted # compare backup with empty file open_origin_file_safely = partial(BytesIO, b"") elif origin_file_path.stat().st_ctime > self.hit.cleaned_at: raise DiffError( "The file was modified after cleaning, diff is not valid." ) with open_origin_file_safely() as origin_file: diff = await self._get_diff(backup_file_path, origin_file) else: # infected logger.warning("Malicious file was not cleaned. There is no diff.") return diff async def _get_diff(self, backup_file, cleaned_file): # don't block the whole loop while reading files loop = asyncio.get_event_loop() return await loop.run_in_executor( None, get_files_diff, backup_file, cleaned_file )