import apt import asyncio import subprocess from typing import List, Optional from pydantic import BaseModel from enum import Enum class PackageStatus(str, Enum): INSTALLED = "installed" AVAILABLE = "available" UPGRADABLE = "upgradable" class PackageInfo(BaseModel): name: str version: str installed_version: Optional[str] = None status: PackageStatus description: str size: int # en bytes maintainer: Optional[str] = None class PackageListResponse(BaseModel): total: int installed: int upgradable: int packages: List[PackageInfo] class PackageOperationResult(BaseModel): success: bool message: str package: str class PackageService: """Service pour gérer les paquets système avec apt""" def __init__(self): self.cache = None @staticmethod def _get_cache(): """Obtenir le cache apt""" try: return apt.Cache() except Exception as e: raise Exception(f"Erreur lors de l'accès au cache apt: {str(e)}") @staticmethod def list_installed_packages(search: Optional[str] = None, limit: int = 50, offset: int = 0) -> PackageListResponse: """Liste les paquets installés""" try: cache = PackageService._get_cache() packages = [] installed_count = 0 upgradable_count = 0 for pkg in cache: # Filtrer si recherche if search and search.lower() not in pkg.name.lower() and search.lower() not in (pkg.candidate.summary if pkg.candidate else ""): continue if pkg.is_installed: installed_count += 1 # Vérifier si upgradable is_upgradable = pkg.is_upgradable if is_upgradable: upgradable_count += 1 packages.append(PackageInfo( name=pkg.name, version=pkg.installed.version if pkg.installed else "unknown", installed_version=pkg.installed.version if pkg.installed else None, status=PackageStatus.UPGRADABLE if is_upgradable else PackageStatus.INSTALLED, description=pkg.candidate.summary if pkg.candidate else "No description", size=pkg.installed.size if pkg.installed else 0, maintainer=pkg.candidate.record.get("Maintainer") if pkg.candidate and pkg.candidate.record else None )) # Paginer paginated = packages[offset:offset + limit] return PackageListResponse( total=len(packages), installed=installed_count, upgradable=upgradable_count, packages=paginated ) except Exception as e: raise Exception(f"Erreur lors de la récupération des paquets: {str(e)}") @staticmethod def search_packages(query: str, limit: int = 20) -> List[PackageInfo]: """Recherche des paquets disponibles""" try: cache = PackageService._get_cache() packages = [] for pkg in cache: if query.lower() in pkg.name.lower() or (pkg.candidate and query.lower() in pkg.candidate.summary.lower()): installed_version = None if pkg.is_installed: installed_version = pkg.installed.version status = PackageStatus.INSTALLED if pkg.is_installed else PackageStatus.AVAILABLE packages.append(PackageInfo( name=pkg.name, version=pkg.candidate.version if pkg.candidate else "unknown", installed_version=installed_version, status=status, description=pkg.candidate.summary if pkg.candidate else "No description", size=pkg.candidate.size if pkg.candidate else 0, maintainer=pkg.candidate.record.get("Maintainer") if pkg.candidate and pkg.candidate.record else None )) if len(packages) >= limit: break return packages except Exception as e: raise Exception(f"Erreur lors de la recherche: {str(e)}") @staticmethod async def install_package(package_name: str) -> PackageOperationResult: """Installer un paquet de manière asynchrone""" try: # Exécuter apt install en arrière-plan process = await asyncio.create_subprocess_exec( 'sudo', 'apt-get', 'install', '-y', package_name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300) if process.returncode == 0: return PackageOperationResult( success=True, message=f"Paquet '{package_name}' installé avec succès", package=package_name ) else: error_msg = stderr.decode() if stderr else "Erreur inconnue" return PackageOperationResult( success=False, message=f"Erreur lors de l'installation: {error_msg}", package=package_name ) except asyncio.TimeoutError: return PackageOperationResult( success=False, message="L'installation a dépassé le délai d'attente", package=package_name ) except Exception as e: return PackageOperationResult( success=False, message=f"Erreur: {str(e)}", package=package_name ) @staticmethod async def remove_package(package_name: str) -> PackageOperationResult: """Désinstaller un paquet de manière asynchrone""" try: # Exécuter apt remove en arrière-plan process = await asyncio.create_subprocess_exec( 'sudo', 'apt-get', 'remove', '-y', package_name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300) if process.returncode == 0: return PackageOperationResult( success=True, message=f"Paquet '{package_name}' supprimé avec succès", package=package_name ) else: error_msg = stderr.decode() if stderr else "Erreur inconnue" return PackageOperationResult( success=False, message=f"Erreur lors de la suppression: {error_msg}", package=package_name ) except asyncio.TimeoutError: return PackageOperationResult( success=False, message="La suppression a dépassé le délai d'attente", package=package_name ) except Exception as e: return PackageOperationResult( success=False, message=f"Erreur: {str(e)}", package=package_name ) @staticmethod async def upgrade_package(package_name: str) -> PackageOperationResult: """Mettre à jour un paquet de manière asynchrone""" try: # Exécuter apt install --only-upgrade process = await asyncio.create_subprocess_exec( 'sudo', 'apt-get', 'install', '--only-upgrade', '-y', package_name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300) if process.returncode == 0: return PackageOperationResult( success=True, message=f"Paquet '{package_name}' mis à jour avec succès", package=package_name ) else: error_msg = stderr.decode() if stderr else "Erreur inconnue" return PackageOperationResult( success=False, message=f"Erreur lors de la mise à jour: {error_msg}", package=package_name ) except asyncio.TimeoutError: return PackageOperationResult( success=False, message="La mise à jour a dépassé le délai d'attente", package=package_name ) except Exception as e: return PackageOperationResult( success=False, message=f"Erreur: {str(e)}", package=package_name ) @staticmethod def get_system_info() -> dict: """Obtenir les informations système sur les paquets""" try: cache = PackageService._get_cache() installed = sum(1 for pkg in cache if pkg.is_installed) upgradable = sum(1 for pkg in cache if pkg.is_upgradable) total = len(cache) return { "total_packages": total, "installed": installed, "upgradable": upgradable, "available": total - installed } except Exception as e: raise Exception(f"Erreur: {str(e)}")