OwlCyberSecurity - MANAGER
Edit File: plugin.py
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import asyncio import json import logging import os import pwd import shutil import time from collections import defaultdict from pathlib import Path import sentry_sdk from peewee import SqliteDatabase from defence360agent.api import inactivity from defence360agent.contracts.config import ( MalwareScanSchedule, MalwareScanScheduleInterval as Interval, ) from defence360agent.model import instance from defence360agent.utils import atomic_rewrite, check_run from imav.model.wordpress import WordpressSite, WPSite from imav.wordpress import cli, PLUGIN_SLUG, telemetry from imav.wordpress.utils import ( build_command_for_user, calculate_next_scan_timestamp, clear_get_cagefs_enabled_users_cache, get_last_scan, get_malware_history, ) logger = logging.getLogger(__name__) COMPONENTS_DB_PATH = Path( "/var/lib/cloudlinux-app-version-detector/components_versions.sqlite3" ) def get_data_dir(site: WPSite): return Path(site.docroot) / "wp-content" / "imunify-security" async def _get_scan_data_for_user(sink, username: str, uid: int): # Get the last scan data last_scan = await get_last_scan(sink, username) # Extract the last scan date last_scan_time = last_scan.get("scan_date", None) next_scan_time = None if MalwareScanSchedule.INTERVAL != Interval.NONE: next_scan_time = calculate_next_scan_timestamp() # Get all WordPress sites for the user (the main site is always last) all_users_sites = get_sites_for_user(uid) # Get the malware history for the user malware_history = get_malware_history(username) # Split malware history by site. This part relies on the main site being the last one in the list. # Without this all malware could be attributed to the main site. malware_by_site = defaultdict(list) for item in malware_history: if item["resource_type"] == "file": for site_path in all_users_sites: if item["file"].startswith(site_path): malware_by_site[site_path].append(item) break return last_scan_time, next_scan_time, malware_by_site async def _send_telemetry_task(coro, semaphore: asyncio.Semaphore): async with semaphore: try: await coro except Exception as e: logger.error(f"Telemetry task failed: {e}") async def process_telemetry_tasks(coroutines: list, concurrency=10): semaphore = asyncio.Semaphore(concurrency) tasks = [ asyncio.create_task(_send_telemetry_task(coro, semaphore)) for coro in coroutines ] try: await asyncio.gather(*tasks) except Exception as e: logger.error(f"Some telemetry tasks failed: {e}") async def install_for_users(users: set[str], sink): """Install the imunify-security plugin for all sites where it is not installed.""" logger.info("Installing imunify-security wp plugin") # Keep track of the installed sites installed = set() telemetry_coros = [] with inactivity.track.task("wp-plugin-installation"): try: clear_get_cagefs_enabled_users_cache() with instance.db.atomic(): to_install = _get_sites_without_plugin() - set( WPSite(r.docroot, r.domain, r.uid) for r in WordpressSite.select() ) if not to_install: return # Group sites by user id sites_by_user = defaultdict(list) for site in to_install: sites_by_user[site.uid].append(site) # Now iterate over the grouped sites for uid, sites in sites_by_user.items(): try: username = pwd.getpwuid(uid).pw_name except Exception as error: sentry_sdk.capture_message( "Skipping installation of WordPress plugin on %s" " site(s) because they belong to user %s and it is" " not possible to retrieve username for this user." " Reason: %s", len(sites), uid, error, ) continue if username not in users: # Skip the user if it's not in the list of users to install the plugin for continue ( last_scan_time, next_scan_time, malware_by_site, ) = await _get_scan_data_for_user(sink, username, uid) for site in sites: try: # Check if site is correctly installed and accessible using WP CLI is_wordpress_installed = ( await cli.is_wordpress_installed(site) ) if not is_wordpress_installed: sentry_sdk.capture_message( "WordPress site is not accessible using WP" " CLI. site=%s", site, ) continue # Prepare the JSON data json_data = { "lastScanTimestamp": last_scan_time, "nextScanTimestamp": next_scan_time, "malware": malware_by_site.get( site.docroot, [] ), } # Create the scan data file await update_scan_data_file(site, json_data) # Install the plugin await cli.plugin_install(site) installed.add(site) # Prepare telemetry telemetry_coros.append( telemetry.send_event( sink=sink, event="installed_by_imunify", site=site, ) ) except Exception as error: logger.error( "Failed to install plugin to site=%s error=%s", site, error, ) logger.info( "Installed imunify-security wp plugin on %d sites", len(installed), ) except asyncio.CancelledError: logger.info( "Installation imunify-security wp plugin was cancelled. Plugin" " was installed for %d sites", len(installed), ) except Exception as error: logger.error( "Error occurred during plugin installation. error=%s", error ) raise finally: WordpressSite.insert_many( [ { "domain": site.domain, "docroot": site.docroot, "uid": site.uid, "manually_deleted_at": None, } for site in installed ] ).execute() # Send telemetry await process_telemetry_tasks(telemetry_coros) async def delete_plugin_files(site: WPSite): data_dir = get_data_dir(site) if data_dir.exists(): await asyncio.to_thread(shutil.rmtree, data_dir) async def remove_all_installed(sink): """Remove the imunify-security plugin from all sites where it is installed.""" logger.info("Deleting imunify-security wp plugin") telemetry_coros = [] affected = 0 with inactivity.track.task("wp-plugin-removal"): try: to_remove = WordpressSite.select().where( WordpressSite.manually_deleted_at.is_null(True) ) for site in to_remove: try: # Uninstall the plugin from WordPress site. await cli.plugin_deactivate(site) # Delete the data files from the site. await delete_plugin_files(site) # Delete the site from database. affected += ( WordpressSite.delete() .where(WordpressSite.docroot == site.docroot) .execute() ) # Send telemetry telemetry_coros.append( telemetry.send_event( sink=sink, event="uninstalled_by_imunify", site=site, ) ) except Exception as error: logger.error( "Failed to remove plugin from %s %s", site, error ) except asyncio.CancelledError: logger.info( "Deleting imunify-security wp plugin was cancelled. Plugin was" " deleted from %d sites", len(to_remove), ) except Exception as error: logger.error("Error occurred during plugin deleting. %s", error) raise finally: logger.info( "Removed imunify-security wp plugin from %s sites", affected, ) if affected > 0: # send telemetry await process_telemetry_tasks(telemetry_coros) async def mark_site_as_manually_deleted(site, now): logger.info( "Mark site %s as manually deleted at %s (WP-Plugin removed)", site, now ) ( WordpressSite.update(manually_deleted_at=now) .where(WordpressSite.docroot == site.docroot) .execute() ) async def tidy_up_manually_deleted(sink): telemetry_coros = [] try: to_mark_as_manually_removed = _get_sites_without_plugin() & set( WPSite(r.docroot, r.domain, r.uid) for r in WordpressSite.select().where( WordpressSite.manually_deleted_at.is_null() ) ) if to_mark_as_manually_removed: now = time.time() for site in to_mark_as_manually_removed: await mark_site_as_manually_deleted(site, now) # Prepare telemetry telemetry_coros.append( telemetry.send_event( sink=sink, event="removed_by_user", site=site, ) ) except Exception as error: logger.error("Error occurred during site tidy up. %s", error) finally: if telemetry_coros: await process_telemetry_tasks(telemetry_coros) async def update_data_on_sites(sink, sites: list[WPSite]): if not sites: return # Group sites by user id sites_by_user = defaultdict(list) for site in sites: sites_by_user[site.uid].append(site) # Now iterate over the grouped sites for uid, sites in sites_by_user.items(): try: username = pwd.getpwuid(uid).pw_name except Exception as error: logger.error( "Failed to get username for uid=%d. error=%s", uid, error, ) continue ( last_scan_time, next_scan_time, malware_by_site, ) = await _get_scan_data_for_user(sink, username, uid) for site in sites: try: # Prepare the JSON data json_data = { "lastScanTimestamp": last_scan_time, "nextScanTimestamp": next_scan_time, "malware": malware_by_site.get(site.docroot, []), } # Update the scan data file await update_scan_data_file(site, json_data) except Exception as error: logger.error( "Failed to update scan data on site=%s error=%s", site, error, ) async def update_scan_data_file(site: WPSite, json_data: dict): # Get the gid for the given user user_info = pwd.getpwuid(site.uid) gid = user_info.pw_gid # Create data directory data_dir = get_data_dir(site) if os.path.islink(data_dir): # If the data directory is a symlink, interrupt the process. raise Exception( "Data directory %s is a symlink, skipping.", str(data_dir) ) if not data_dir.exists(): command = build_command_for_user( user_info.pw_name, [ "mkdir", "-p", str(data_dir), ], ) await check_run(command) if not data_dir.exists(): # Directory creation failed. Interrupt the process. raise Exception( "Failed to create directory %s for user %s", str(data_dir), user_info.pw_name, ) scan_data_path = data_dir / "scan_data.php" # Format the PHP file content php_content = ( "<?php\n" "if ( ! defined( 'WPINC' ) ) {\n" "\texit;\n" "}\n" "return json_decode( '" + json.dumps(json_data).replace("'", "\\'") + "', true );" ) # Check if the file exists, create an empty file if it doesn't if not scan_data_path.exists(): scan_data_path.touch() # Write the formatted PHP file atomic_rewrite( scan_data_path, php_content, backup=False, uid=site.uid, gid=gid, permissions=0o400, ) def _get_sites_without_plugin() -> set[WPSite]: """ Get a set of wp sites where imunify-security plugin is not installed. The data is pulled from the app-version-detector database. """ if not COMPONENTS_DB_PATH.exists(): logger.error( "App detector database '%s' couldn't be found.", str(COMPONENTS_DB_PATH), ) return set() cursor = SqliteDatabase(COMPONENTS_DB_PATH).execute_sql( f""" WITH latest_reports AS ( SELECT id, uid, domain FROM report WHERE id IN ( SELECT MAX(id) FROM report GROUP BY uid ) ) SELECT wp.real_path, lr.domain, lr.uid FROM apps AS wp INNER JOIN latest_reports AS lr ON wp.report_id = lr.id WHERE wp.title = 'wp_core' AND wp.parent_id IS NULL AND lr.domain IS NOT NULL AND lr.domain != '' AND NOT EXISTS ( SELECT 1 FROM apps AS plugin WHERE plugin.parent_id = wp.id AND plugin.title = 'wp_plugin_{PLUGIN_SLUG.replace("-", "_")}' ) """ ) return { WPSite(docroot=row[0], domain=row[1], uid=int(row[2])) for row in cursor.fetchall() } def get_sites_for_user(uid: int) -> list[str]: """ Get a set of paths to WordPress sites belonging to a particular user. Paths are sorted by their length to make sure that the main site is the last one in the list. The data is pulled from the app-version-detector database. """ if not COMPONENTS_DB_PATH.exists(): logger.error( "App detector database '%s' couldn't be found.", str(COMPONENTS_DB_PATH), ) return list() cursor = SqliteDatabase(COMPONENTS_DB_PATH).execute_sql( f""" WITH latest_reports AS ( SELECT MAX(id) as id FROM report WHERE uid = {uid} ) SELECT wp.real_path FROM apps AS wp INNER JOIN latest_reports AS lr ON wp.report_id = lr.id WHERE wp.title = 'wp_core' AND wp.parent_id IS NULL GROUP BY wp.real_path ORDER BY length(wp.real_path) DESC """ ) return [row[0] for row in cursor.fetchall()] def get_sites_by_path(path: str) -> list[WPSite]: """ Get a set of wp sites by given path. The data is pulled from the app-version-detector database. """ if not COMPONENTS_DB_PATH.exists(): logger.error( "App detector database '%s' couldn't be found.", str(COMPONENTS_DB_PATH), ) return list() # Append * to the path to get all sites that start with the given path. Only if the path doesn't already end with *. if not path.endswith("*"): path += "/*" cursor = SqliteDatabase(COMPONENTS_DB_PATH).execute_sql( f""" WITH latest_reports AS ( SELECT id, uid, domain FROM report WHERE id IN ( SELECT MAX(id) FROM report GROUP BY uid ) ) SELECT wp.real_path, lr.domain, lr.uid FROM apps AS wp INNER JOIN latest_reports AS lr ON wp.report_id = lr.id WHERE wp.title = 'wp_core' AND wp.parent_id IS NULL AND lr.domain IS NOT NULL AND lr.domain != '' AND wp.real_path GLOB '{path}' """ ) return [ WPSite(docroot=row[0], domain=row[1], uid=int(row[2])) for row in cursor.fetchall() ]