Files
innotexBoard/backend/app/services/docker_service.py
2026-01-16 18:40:39 +01:00

154 lines
5.3 KiB
Python

import docker
from docker.errors import DockerException
from typing import List, Optional
from pydantic import BaseModel
class ContainerPort(BaseModel):
    """One published port mapping of a container."""

    # Port number inside the container (the part before "/" in Docker's
    # "<port>/<protocol>" port key).
    private_port: int

    # Host port the container port is bound to; None when the binding's
    # host port could not be read as an integer.
    public_port: Optional[int]

    # Protocol part of Docker's port key (the text after "/"), e.g. "tcp".
    type: str
class ContainerInfo(BaseModel):
    """Summary of a single Docker container, as built by DockerService."""

    # Short container id.
    id: str

    # Container name.
    name: str

    # First tag of the container's image, or the first 12 characters of the
    # image id when the image has no tag.
    image: str

    # Status reported by the Docker SDK container object.
    status: str

    # "State.Status" value taken from the container's attributes.
    state: str

    # CPU usage in percent, rounded to two decimals.
    cpu_percent: float

    # Memory usage formatted as text, e.g. "12.34 MB".
    memory_usage: str

    # "Created" value taken from the container's attributes.
    created: str

    # Port mappings that have at least one host binding.
    ports: List[ContainerPort]
class DockerService:
    """Service for inspecting and controlling Docker containers.

    Every public method is best-effort: failures are printed and reported
    through the return value (``False`` or an empty list) instead of being
    raised to the caller.
    """

    def __init__(self):
        """Create the Docker client from the environment.

        If the client cannot be created, the error is printed and
        ``self.client`` is set to ``None``; the other methods then behave
        as "not connected".
        """
        try:
            self.client = docker.from_env()
        except DockerException as e:
            print(f"Erreur de connexion Docker: {e}")
            self.client = None

    def is_connected(self) -> bool:
        """Return True when a client exists and the daemon answers a ping."""
        client = getattr(self, "client", None)
        if not client:
            return False
        try:
            client.ping()
        except Exception:
            # Any failure to reach the daemon simply means "not connected".
            # `Exception` (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            return False
        return True

    def get_containers(self, all: bool = True) -> List[ContainerInfo]:
        """Return a summary of the containers known to the daemon.

        Args:
            all: When True, include containers that are not running. The
                name shadows the builtin but is kept because callers may
                pass it by keyword.

        Returns:
            One ``ContainerInfo`` per container that could be described.
            An empty list when Docker is unreachable or listing fails.
            A container whose details cannot be read is reported and
            skipped; it no longer hides the containers listed after it.
        """
        if not self.is_connected():
            return []

        try:
            listed = self.client.containers.list(all=all)
        except Exception as e:
            print(f"Erreur lors de la récupération des conteneurs: {e}")
            return []

        containers = []
        for container in listed:
            # Isolate failures per container (e.g. one removed between the
            # listing and the stats call) so the rest are still returned.
            try:
                containers.append(self._describe_container(container))
            except Exception as e:
                print(f"Erreur lors de la récupération des conteneurs: {e}")
        return containers

    def _describe_container(self, container) -> ContainerInfo:
        """Build the ContainerInfo summary for one SDK container object."""
        stats = container.stats(stream=False)

        # Memory in MB; stats without memory data count as 0.
        memory_stats = stats.get("memory_stats") or {}
        memory_mb = memory_stats.get("usage", 0) / (1024 * 1024)

        # Read the image once and reuse it (in docker-py this property
        # appears to perform a lookup on every access).
        image = container.image

        return ContainerInfo(
            id=container.short_id,
            name=container.name,
            image=image.tags[0] if image.tags else image.id[:12],
            status=container.status,
            state=container.attrs["State"]["Status"],
            cpu_percent=round(self._cpu_percent(stats), 2),
            memory_usage=f"{memory_mb:.2f} MB",
            created=container.attrs["Created"],
            ports=self._parse_ports(container.ports),
        )

    @staticmethod
    def _cpu_percent(stats: dict) -> float:
        """Compute CPU usage in percent from one stats sample.

        Uses the delta between ``cpu_stats`` and ``precpu_stats``. The CPU
        count is taken from ``online_cpus`` and falls back to the length of
        ``percpu_usage``, which is missing on some hosts (e.g. cgroup v2).
        Returns 0.0 when the sample lacks the required fields.
        """
        try:
            cpu_stats = stats["cpu_stats"]
            precpu_stats = stats["precpu_stats"]
            cpu_delta = (
                cpu_stats["cpu_usage"]["total_usage"]
                - precpu_stats.get("cpu_usage", {}).get("total_usage", 0)
            )
            system_delta = (
                cpu_stats["system_cpu_usage"]
                - precpu_stats.get("system_cpu_usage", 0)
            )
            cpu_count = cpu_stats.get("online_cpus") or len(
                cpu_stats["cpu_usage"].get("percpu_usage") or []
            )
        except (KeyError, TypeError, AttributeError):
            # Incomplete sample, e.g. from a container that is not running.
            return 0.0

        if system_delta <= 0:
            return 0.0
        return (cpu_delta / system_delta) * cpu_count * 100.0

    @staticmethod
    def _parse_ports(raw_ports) -> List[ContainerPort]:
        """Convert the SDK port mapping into a list of ContainerPort.

        ``raw_ports`` maps "<port>/<protocol>" keys to a list of host
        bindings (or an empty value when the port is not published).
        Ports without bindings are left out.
        """
        ports = []
        for spec, bindings in (raw_ports or {}).items():
            if not bindings:
                continue
            number, _, protocol = spec.partition("/")
            for binding in bindings:
                try:
                    public_port = int(binding["HostPort"])
                except (KeyError, TypeError, ValueError):
                    public_port = None
                ports.append(ContainerPort(
                    private_port=int(number),
                    public_port=public_port,
                    # Default to "tcp" if the key carries no protocol part.
                    type=protocol or "tcp",
                ))
        return ports

    def _run_action(self, container_id: str, action, error_label: str) -> bool:
        """Look up a container and apply ``action`` to it.

        Returns True on success. Returns False when Docker is unreachable
        or when the lookup or the action raises; in the latter case the
        error is printed, prefixed with ``error_label``.
        """
        if not self.is_connected():
            return False
        try:
            action(self.client.containers.get(container_id))
        except Exception as e:
            print(f"{error_label}: {e}")
            return False
        return True

    def start_container(self, container_id: str) -> bool:
        """Start a container; return True on success, False otherwise."""
        return self._run_action(
            container_id,
            lambda container: container.start(),
            "Erreur au démarrage du conteneur",
        )

    def stop_container(self, container_id: str) -> bool:
        """Stop a container (10 s timeout); return True on success."""
        return self._run_action(
            container_id,
            lambda container: container.stop(timeout=10),
            "Erreur à l'arrêt du conteneur",
        )

    def restart_container(self, container_id: str) -> bool:
        """Restart a container (10 s timeout); return True on success."""
        return self._run_action(
            container_id,
            lambda container: container.restart(timeout=10),
            "Erreur au redémarrage du conteneur",
        )

    def remove_container(self, container_id: str, force: bool = False) -> bool:
        """Remove a container; return True on success.

        Args:
            container_id: Identifier passed to the client's container lookup.
            force: Forwarded to the SDK ``remove`` call.
        """
        return self._run_action(
            container_id,
            lambda container: container.remove(force=force),
            "Erreur à la suppression du conteneur",
        )