Initial commit

This commit is contained in:
innotex
2026-01-16 18:40:39 +01:00
commit 9ec63a8aa2
76 changed files with 13235 additions and 0 deletions

View File

View File

@@ -0,0 +1,153 @@
import docker
from docker.errors import DockerException
from typing import List, Optional
from pydantic import BaseModel
class ContainerPort(BaseModel):
private_port: int
public_port: Optional[int]
type: str
class ContainerInfo(BaseModel):
id: str
name: str
image: str
status: str
state: str
cpu_percent: float
memory_usage: str
created: str
ports: List[ContainerPort]
class DockerService:
"""Service pour gérer Docker"""
def __init__(self):
try:
self.client = docker.from_env()
except DockerException as e:
print(f"Erreur de connexion Docker: {e}")
self.client = None
def is_connected(self) -> bool:
"""Vérifie si Docker est accessible"""
try:
if self.client:
self.client.ping()
return True
except:
pass
return False
def get_containers(self, all: bool = True) -> List[ContainerInfo]:
"""Récupère la liste des conteneurs"""
if not self.is_connected():
return []
containers = []
try:
for container in self.client.containers.list(all=all):
stats = container.stats(stream=False)
# Calcul de l'utilisation CPU
cpu_percent = 0.0
try:
cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - \
stats['precpu_stats'].get('cpu_usage', {}).get('total_usage', 0)
system_cpu_delta = stats['cpu_stats']['system_cpu_usage'] - \
stats['precpu_stats'].get('system_cpu_usage', 0)
cpu_count = len(stats['cpu_stats']['cpu_usage'].get('percpu_usage', []))
if system_cpu_delta > 0:
cpu_percent = (cpu_delta / system_cpu_delta) * cpu_count * 100.0
except:
cpu_percent = 0.0
# Mémoire utilisée
memory_usage = stats['memory_stats'].get('usage', 0) / (1024 * 1024) # MB
# Ports
ports = []
if container.ports:
for private_port, bindings in container.ports.items():
if bindings:
for binding in bindings:
try:
public_port = int(binding['HostPort'])
except:
public_port = None
ports.append(ContainerPort(
private_port=int(private_port.split('/')[0]),
public_port=public_port,
type=private_port.split('/')[1]
))
containers.append(ContainerInfo(
id=container.short_id,
name=container.name,
image=container.image.tags[0] if container.image.tags else container.image.id[:12],
status=container.status,
state=container.attrs['State']['Status'],
cpu_percent=round(cpu_percent, 2),
memory_usage=f"{memory_usage:.2f} MB",
created=container.attrs['Created'],
ports=ports
))
except Exception as e:
print(f"Erreur lors de la récupération des conteneurs: {e}")
return containers
def start_container(self, container_id: str) -> bool:
"""Démarre un conteneur"""
if not self.is_connected():
return False
try:
container = self.client.containers.get(container_id)
container.start()
return True
except Exception as e:
print(f"Erreur au démarrage du conteneur: {e}")
return False
def stop_container(self, container_id: str) -> bool:
"""Arrête un conteneur"""
if not self.is_connected():
return False
try:
container = self.client.containers.get(container_id)
container.stop(timeout=10)
return True
except Exception as e:
print(f"Erreur à l'arrêt du conteneur: {e}")
return False
def restart_container(self, container_id: str) -> bool:
"""Redémarre un conteneur"""
if not self.is_connected():
return False
try:
container = self.client.containers.get(container_id)
container.restart(timeout=10)
return True
except Exception as e:
print(f"Erreur au redémarrage du conteneur: {e}")
return False
def remove_container(self, container_id: str, force: bool = False) -> bool:
"""Supprime un conteneur"""
if not self.is_connected():
return False
try:
container = self.client.containers.get(container_id)
container.remove(force=force)
return True
except Exception as e:
print(f"Erreur à la suppression du conteneur: {e}")
return False

View File

@@ -0,0 +1,262 @@
import apt
import asyncio
import subprocess
from typing import List, Optional
from pydantic import BaseModel
from enum import Enum
class PackageStatus(str, Enum):
INSTALLED = "installed"
AVAILABLE = "available"
UPGRADABLE = "upgradable"
class PackageInfo(BaseModel):
name: str
version: str
installed_version: Optional[str] = None
status: PackageStatus
description: str
size: int # en bytes
maintainer: Optional[str] = None
class PackageListResponse(BaseModel):
total: int
installed: int
upgradable: int
packages: List[PackageInfo]
class PackageOperationResult(BaseModel):
success: bool
message: str
package: str
class PackageService:
"""Service pour gérer les paquets système avec apt"""
def __init__(self):
self.cache = None
@staticmethod
def _get_cache():
"""Obtenir le cache apt"""
try:
return apt.Cache()
except Exception as e:
raise Exception(f"Erreur lors de l'accès au cache apt: {str(e)}")
@staticmethod
def list_installed_packages(search: Optional[str] = None, limit: int = 50, offset: int = 0) -> PackageListResponse:
"""Liste les paquets installés"""
try:
cache = PackageService._get_cache()
packages = []
installed_count = 0
upgradable_count = 0
for pkg in cache:
# Filtrer si recherche
if search and search.lower() not in pkg.name.lower() and search.lower() not in (pkg.candidate.summary if pkg.candidate else ""):
continue
if pkg.is_installed:
installed_count += 1
# Vérifier si upgradable
is_upgradable = pkg.is_upgradable
if is_upgradable:
upgradable_count += 1
packages.append(PackageInfo(
name=pkg.name,
version=pkg.installed.version if pkg.installed else "unknown",
installed_version=pkg.installed.version if pkg.installed else None,
status=PackageStatus.UPGRADABLE if is_upgradable else PackageStatus.INSTALLED,
description=pkg.candidate.summary if pkg.candidate else "No description",
size=pkg.installed.size if pkg.installed else 0,
maintainer=pkg.candidate.record.get("Maintainer") if pkg.candidate and pkg.candidate.record else None
))
# Paginer
paginated = packages[offset:offset + limit]
return PackageListResponse(
total=len(packages),
installed=installed_count,
upgradable=upgradable_count,
packages=paginated
)
except Exception as e:
raise Exception(f"Erreur lors de la récupération des paquets: {str(e)}")
@staticmethod
def search_packages(query: str, limit: int = 20) -> List[PackageInfo]:
"""Recherche des paquets disponibles"""
try:
cache = PackageService._get_cache()
packages = []
for pkg in cache:
if query.lower() in pkg.name.lower() or (pkg.candidate and query.lower() in pkg.candidate.summary.lower()):
installed_version = None
if pkg.is_installed:
installed_version = pkg.installed.version
status = PackageStatus.INSTALLED if pkg.is_installed else PackageStatus.AVAILABLE
packages.append(PackageInfo(
name=pkg.name,
version=pkg.candidate.version if pkg.candidate else "unknown",
installed_version=installed_version,
status=status,
description=pkg.candidate.summary if pkg.candidate else "No description",
size=pkg.candidate.size if pkg.candidate else 0,
maintainer=pkg.candidate.record.get("Maintainer") if pkg.candidate and pkg.candidate.record else None
))
if len(packages) >= limit:
break
return packages
except Exception as e:
raise Exception(f"Erreur lors de la recherche: {str(e)}")
@staticmethod
async def install_package(package_name: str) -> PackageOperationResult:
"""Installer un paquet de manière asynchrone"""
try:
# Exécuter apt install en arrière-plan
process = await asyncio.create_subprocess_exec(
'sudo', 'apt-get', 'install', '-y', package_name,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300)
if process.returncode == 0:
return PackageOperationResult(
success=True,
message=f"Paquet '{package_name}' installé avec succès",
package=package_name
)
else:
error_msg = stderr.decode() if stderr else "Erreur inconnue"
return PackageOperationResult(
success=False,
message=f"Erreur lors de l'installation: {error_msg}",
package=package_name
)
except asyncio.TimeoutError:
return PackageOperationResult(
success=False,
message="L'installation a dépassé le délai d'attente",
package=package_name
)
except Exception as e:
return PackageOperationResult(
success=False,
message=f"Erreur: {str(e)}",
package=package_name
)
@staticmethod
async def remove_package(package_name: str) -> PackageOperationResult:
"""Désinstaller un paquet de manière asynchrone"""
try:
# Exécuter apt remove en arrière-plan
process = await asyncio.create_subprocess_exec(
'sudo', 'apt-get', 'remove', '-y', package_name,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300)
if process.returncode == 0:
return PackageOperationResult(
success=True,
message=f"Paquet '{package_name}' supprimé avec succès",
package=package_name
)
else:
error_msg = stderr.decode() if stderr else "Erreur inconnue"
return PackageOperationResult(
success=False,
message=f"Erreur lors de la suppression: {error_msg}",
package=package_name
)
except asyncio.TimeoutError:
return PackageOperationResult(
success=False,
message="La suppression a dépassé le délai d'attente",
package=package_name
)
except Exception as e:
return PackageOperationResult(
success=False,
message=f"Erreur: {str(e)}",
package=package_name
)
@staticmethod
async def upgrade_package(package_name: str) -> PackageOperationResult:
"""Mettre à jour un paquet de manière asynchrone"""
try:
# Exécuter apt install --only-upgrade
process = await asyncio.create_subprocess_exec(
'sudo', 'apt-get', 'install', '--only-upgrade', '-y', package_name,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300)
if process.returncode == 0:
return PackageOperationResult(
success=True,
message=f"Paquet '{package_name}' mis à jour avec succès",
package=package_name
)
else:
error_msg = stderr.decode() if stderr else "Erreur inconnue"
return PackageOperationResult(
success=False,
message=f"Erreur lors de la mise à jour: {error_msg}",
package=package_name
)
except asyncio.TimeoutError:
return PackageOperationResult(
success=False,
message="La mise à jour a dépassé le délai d'attente",
package=package_name
)
except Exception as e:
return PackageOperationResult(
success=False,
message=f"Erreur: {str(e)}",
package=package_name
)
@staticmethod
def get_system_info() -> dict:
"""Obtenir les informations système sur les paquets"""
try:
cache = PackageService._get_cache()
installed = sum(1 for pkg in cache if pkg.is_installed)
upgradable = sum(1 for pkg in cache if pkg.is_upgradable)
total = len(cache)
return {
"total_packages": total,
"installed": installed,
"upgradable": upgradable,
"available": total - installed
}
except Exception as e:
raise Exception(f"Erreur: {str(e)}")

View File

@@ -0,0 +1,171 @@
import json
import os
from typing import List, Optional
from pydantic import BaseModel
from pathlib import Path
class ServiceShortcut(BaseModel):
id: str
name: str
url: str
icon: str # Base64 or emoji or URL
description: Optional[str] = None
category: str = "other"
color: str = "#3B82F6" # Couleur personnalisée
order: int = 0
class ShortcutsConfig(BaseModel):
version: str = "1.0"
shortcuts: List[ServiceShortcut] = []
class ShortcutsService:
"""Service pour gérer les raccourcis vers les services self-hosted"""
CONFIG_FILE = "/home/innotex/Documents/Projet/innotexboard/backend/config/shortcuts.json"
@staticmethod
def _ensure_config_dir():
"""S'assurer que le répertoire de config existe"""
config_dir = os.path.dirname(ShortcutsService.CONFIG_FILE)
Path(config_dir).mkdir(parents=True, exist_ok=True)
@staticmethod
def _load_config() -> ShortcutsConfig:
"""Charger la configuration des raccourcis"""
ShortcutsService._ensure_config_dir()
if not os.path.exists(ShortcutsService.CONFIG_FILE):
return ShortcutsConfig()
try:
with open(ShortcutsService.CONFIG_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
return ShortcutsConfig(**data)
except Exception as e:
print(f"Erreur lors du chargement de la config: {e}")
return ShortcutsConfig()
@staticmethod
def _save_config(config: ShortcutsConfig):
"""Sauvegarder la configuration des raccourcis"""
ShortcutsService._ensure_config_dir()
try:
with open(ShortcutsService.CONFIG_FILE, 'w', encoding='utf-8') as f:
json.dump(config.model_dump(), f, indent=2, ensure_ascii=False)
except Exception as e:
raise Exception(f"Erreur lors de la sauvegarde: {str(e)}")
@staticmethod
def get_all_shortcuts() -> List[ServiceShortcut]:
"""Récupère tous les raccourcis"""
config = ShortcutsService._load_config()
# Trier par ordre
return sorted(config.shortcuts, key=lambda x: x.order)
@staticmethod
def get_shortcuts_by_category(category: str) -> List[ServiceShortcut]:
"""Récupère les raccourcis d'une catégorie"""
shortcuts = ShortcutsService.get_all_shortcuts()
return [s for s in shortcuts if s.category == category]
@staticmethod
def add_shortcut(shortcut: ServiceShortcut) -> ServiceShortcut:
"""Ajoute un nouveau raccourci"""
config = ShortcutsService._load_config()
# Générer un ID unique si nécessaire
if not shortcut.id:
shortcut.id = f"shortcut_{len(config.shortcuts) + 1}"
# S'assurer qu'il n'existe pas déjà
if any(s.id == shortcut.id for s in config.shortcuts):
raise Exception(f"Un raccourci avec l'ID '{shortcut.id}' existe déjà")
# Définir l'ordre
if shortcut.order == 0:
shortcut.order = len(config.shortcuts)
config.shortcuts.append(shortcut)
ShortcutsService._save_config(config)
return shortcut
@staticmethod
def update_shortcut(shortcut_id: str, shortcut: ServiceShortcut) -> ServiceShortcut:
"""Met à jour un raccourci existant"""
config = ShortcutsService._load_config()
for i, s in enumerate(config.shortcuts):
if s.id == shortcut_id:
shortcut.id = shortcut_id # Garder l'ID
config.shortcuts[i] = shortcut
ShortcutsService._save_config(config)
return shortcut
raise Exception(f"Raccourci avec l'ID '{shortcut_id}' non trouvé")
@staticmethod
def delete_shortcut(shortcut_id: str) -> dict:
"""Supprime un raccourci"""
config = ShortcutsService._load_config()
initial_count = len(config.shortcuts)
config.shortcuts = [s for s in config.shortcuts if s.id != shortcut_id]
if len(config.shortcuts) == initial_count:
raise Exception(f"Raccourci avec l'ID '{shortcut_id}' non trouvé")
# Réorganiser les ordres
for i, s in enumerate(config.shortcuts):
s.order = i
ShortcutsService._save_config(config)
return {"message": "Raccourci supprimé", "id": shortcut_id}
@staticmethod
def reorder_shortcuts(shortcut_ids: List[str]) -> List[ServiceShortcut]:
"""Réordonne les raccourcis"""
config = ShortcutsService._load_config()
# Créer un dict pour accès rapide
shortcuts_dict = {s.id: s for s in config.shortcuts}
# Réorganiser selon l'ordre donné
reordered = []
for i, shortcut_id in enumerate(shortcut_ids):
if shortcut_id in shortcuts_dict:
s = shortcuts_dict[shortcut_id]
s.order = i
reordered.append(s)
config.shortcuts = reordered
ShortcutsService._save_config(config)
return reordered
@staticmethod
def export_shortcuts() -> dict:
"""Exporte les raccourcis en JSON"""
config = ShortcutsService._load_config()
return config.model_dump()
@staticmethod
def import_shortcuts(shortcuts_data: List[dict]) -> List[ServiceShortcut]:
"""Importe des raccourcis depuis JSON"""
config = ShortcutsConfig()
for i, data in enumerate(shortcuts_data):
try:
shortcut = ServiceShortcut(**data)
shortcut.order = i
config.shortcuts.append(shortcut)
except Exception as e:
print(f"Erreur lors de l'import du raccourci {i}: {e}")
ShortcutsService._save_config(config)
return config.shortcuts

View File

@@ -0,0 +1,310 @@
import psutil
import json
import subprocess
from typing import List, Optional, Dict, Any
from pydantic import BaseModel
class CPUUsage(BaseModel):
percent: float
average: float
cores: int
per_cpu: List[float]
freq: float # GHz
class MemoryUsage(BaseModel):
percent: float
used: int
total: int
available: int
cached: int
class DiskUsage(BaseModel):
device: str
total: int
used: int
free: int
percent: float
class NetworkUsage(BaseModel):
bytes_sent: int
bytes_recv: int
packets_sent: int
packets_recv: int
class ProcessInfo(BaseModel):
pid: int
name: str
status: str
cpu_percent: float
memory_percent: float
memory_mb: float
username: str
class BlockDevicePartition(BaseModel):
"""Représente une partition d'un disque"""
name: str
type: str
size: str
used: str
available: str
percent_used: float
mountpoint: Optional[str] = None
class BlockDevice(BaseModel):
"""Représente un disque ou un périphérique bloc"""
name: str
type: str
size: str
used: str
available: str
percent_used: float
mountpoint: Optional[str] = None
partitions: List[BlockDevicePartition] = []
class BlockDevicesInfo(BaseModel):
"""Informations sur tous les disques et partitions"""
devices: List[BlockDevice]
total_size: str
total_used: str
total_available: str
class SystemStats(BaseModel):
cpu: CPUUsage
memory: MemoryUsage
disk: List[DiskUsage]
network: NetworkUsage
processes: List[ProcessInfo]
class SystemService:
"""Service pour récupérer les informations système"""
@staticmethod
def get_cpu_usage() -> CPUUsage:
"""Récupère l'utilisation CPU"""
per_cpu = psutil.cpu_percent(interval=0, percpu=True)
avg_cpu = sum(per_cpu) / len(per_cpu) if per_cpu else 0
try:
freq = psutil.cpu_freq().current / 1000 # GHz
except:
freq = 0
return CPUUsage(
percent=avg_cpu,
average=avg_cpu,
cores=psutil.cpu_count(),
per_cpu=per_cpu,
freq=freq
)
@staticmethod
def get_memory_usage() -> MemoryUsage:
"""Récupère l'utilisation mémoire"""
mem = psutil.virtual_memory()
return MemoryUsage(
percent=mem.percent,
used=mem.used,
total=mem.total,
available=mem.available,
cached=mem.cached or 0
)
@staticmethod
def get_disk_usage() -> List[DiskUsage]:
"""Récupère l'utilisation disque"""
disks = []
try:
for partition in psutil.disk_partitions(all=False):
try:
usage = psutil.disk_usage(partition.mountpoint)
disks.append(DiskUsage(
device=partition.device,
total=usage.total,
used=usage.used,
free=usage.free,
percent=usage.percent
))
except (PermissionError, OSError):
continue
except:
pass
return disks
@staticmethod
def get_network_usage() -> NetworkUsage:
"""Récupère les stats réseau"""
net = psutil.net_io_counters()
return NetworkUsage(
bytes_sent=net.bytes_sent,
bytes_recv=net.bytes_recv,
packets_sent=net.packets_sent,
packets_recv=net.packets_recv
)
@staticmethod
def get_top_processes(limit: int = 10) -> List[ProcessInfo]:
"""Récupère les processus les plus actifs"""
processes = []
try:
for proc in psutil.process_iter(['pid', 'name', 'status', 'username']):
try:
info = proc.info
cpu_percent = proc.cpu_percent(interval=0)
mem_percent = proc.memory_percent()
mem_mb = proc.memory_info().rss / 1024 / 1024
# Ne garder que les processus avec activité
if cpu_percent > 0.5 or mem_percent > 0.5:
processes.append(ProcessInfo(
pid=info['pid'],
name=info['name'],
status=info['status'],
cpu_percent=cpu_percent,
memory_percent=mem_percent,
memory_mb=mem_mb,
username=info['username'] or 'N/A'
))
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
# Trier par CPU + mémoire
processes.sort(key=lambda x: (x.cpu_percent + x.memory_percent), reverse=True)
return processes[:limit]
except Exception:
return []
@staticmethod
def get_system_stats() -> SystemStats:
"""Récupère toutes les stats système"""
return SystemStats(
cpu=SystemService.get_cpu_usage(),
memory=SystemService.get_memory_usage(),
disk=SystemService.get_disk_usage(),
network=SystemService.get_network_usage(),
processes=SystemService.get_top_processes()
)
@staticmethod
def format_bytes(bytes_val: int) -> str:
"""Convertit les bytes en format lisible"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_val < 1024:
return f"{bytes_val:.2f} {unit}"
bytes_val /= 1024
return f"{bytes_val:.2f} PB"
@staticmethod
def get_block_devices() -> BlockDevicesInfo:
"""Récupère les disques et partitions avec lsblk"""
try:
# Exécuter lsblk avec sortie JSON
result = subprocess.run(
['lsblk', '--json', '--bytes', '--output', 'NAME,TYPE,SIZE,FSTYPE,MOUNTPOINTS'],
capture_output=True,
text=True,
timeout=10
)
if result.returncode != 0:
raise Exception(f"lsblk failed: {result.stderr}")
lsblk_data = json.loads(result.stdout)
devices = []
total_size = 0
total_used = 0
total_available = 0
for block_device in lsblk_data.get('blockdevices', []):
# Obtenir les informations d'utilisation pour ce disque
device_name = f"/dev/{block_device['name']}"
size = block_device.get('size', 0)
used = 0
available = 0
percent_used = 0
mountpoint = None
partitions = []
# Essayer d'obtenir les stats d'utilisation
try:
# Chercher le premier mountpoint
if block_device.get('mountpoints'):
mountpoint = block_device['mountpoints'][0]
if mountpoint:
disk_usage = psutil.disk_usage(mountpoint)
used = disk_usage.used
available = disk_usage.free
percent_used = disk_usage.percent
total_used += used
total_available += available
except:
pass
# Traiter les partitions
if 'children' in block_device:
for child in block_device['children']:
child_device = f"/dev/{child['name']}"
child_size = child.get('size', 0)
child_used = 0
child_available = 0
child_percent = 0
child_mountpoint = None
try:
if child.get('mountpoints'):
child_mountpoint = child['mountpoints'][0]
if child_mountpoint:
disk_usage = psutil.disk_usage(child_mountpoint)
child_used = disk_usage.used
child_available = disk_usage.free
child_percent = disk_usage.percent
except:
pass
partitions.append(BlockDevicePartition(
name=child['name'],
type=child.get('type', 'unknown'),
size=SystemService.format_bytes(child_size),
used=SystemService.format_bytes(child_used),
available=SystemService.format_bytes(child_available),
percent_used=child_percent,
mountpoint=child_mountpoint
))
total_size += size
devices.append(BlockDevice(
name=block_device['name'],
type=block_device.get('type', 'unknown'),
size=SystemService.format_bytes(size),
used=SystemService.format_bytes(used),
available=SystemService.format_bytes(available),
percent_used=percent_used,
mountpoint=mountpoint,
partitions=partitions
))
return BlockDevicesInfo(
devices=devices,
total_size=SystemService.format_bytes(total_size),
total_used=SystemService.format_bytes(total_used),
total_available=SystemService.format_bytes(total_available)
)
except Exception as e:
# Si lsblk échoue, retourner une liste vide
return BlockDevicesInfo(
devices=[],
total_size="0 B",
total_used="0 B",
total_available="0 B"
)

View File

@@ -0,0 +1,101 @@
import psutil
from typing import List, Dict
from pydantic import BaseModel
class CPUUsage(BaseModel):
percent: float
average: float
cores: int
per_cpu: List[float]
class MemoryUsage(BaseModel):
percent: float
used: int # en bytes
total: int # en bytes
available: int # en bytes
class ProcessInfo(BaseModel):
pid: int
name: str
status: str
cpu_percent: float
memory_percent: float
username: str
class SystemStats(BaseModel):
cpu: CPUUsage
memory: MemoryUsage
processes: List[ProcessInfo]
class SystemService:
"""Service pour récupérer les informations système"""
@staticmethod
def get_cpu_usage() -> CPUUsage:
"""Récupère l'utilisation CPU (non-bloquant, utilise le cache)"""
per_cpu = psutil.cpu_percent(interval=0, percpu=True)
avg_cpu = sum(per_cpu) / len(per_cpu) if per_cpu else 0
return CPUUsage(
percent=avg_cpu,
average=avg_cpu,
cores=psutil.cpu_count(),
per_cpu=per_cpu
)
@staticmethod
def get_memory_usage() -> MemoryUsage:
"""Récupère l'utilisation mémoire"""
mem = psutil.virtual_memory()
return MemoryUsage(
percent=mem.percent,
used=mem.used,
total=mem.total,
available=mem.available
)
@staticmethod
def get_top_processes(limit: int = 5) -> List[ProcessInfo]:
"""Récupère les processus les plus actifs (optimisé)"""
processes = []
try:
for proc in psutil.process_iter(['pid', 'name', 'status', 'username']):
try:
info = proc.info
cpu_percent = proc.cpu_percent(interval=0)
# Ne garder que les processus avec une activité CPU > 0
if cpu_percent > 0.1:
mem_percent = proc.memory_percent()
processes.append(ProcessInfo(
pid=info['pid'],
name=info['name'],
status=info['status'],
cpu_percent=cpu_percent,
memory_percent=mem_percent,
username=info['username'] or 'N/A'
))
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
# Trier par utilisation CPU
processes.sort(key=lambda x: x.cpu_percent, reverse=True)
return processes[:limit]
except Exception as e:
return []
return []
print(f"Erreur lors de la récupération des processus: {e}")
return []
@staticmethod
def get_system_stats() -> SystemStats:
"""Récupère les statistiques système complètes"""
return SystemStats(
cpu=SystemService.get_cpu_usage(),
memory=SystemService.get_memory_usage(),
processes=SystemService.get_top_processes(15)
)