import docker
from docker.errors import DockerException, NotFound
from typing import Any, List, Optional, Dict, Tuple
from pydantic import BaseModel
from datetime import datetime
import logging
import re

logger = logging.getLogger(__name__)

# Tags carrying one of these suffix markers are treated as pre-releases.
_PRERELEASE_RE = re.compile(r'-(rc|alpha|beta|dev)')


class ImageUpdate(BaseModel):
    """Result of an update check for a single image reference."""

    image: str
    current_tag: str
    latest_tag: Optional[str]
    has_update: bool
    registry: str


class ImageInfo(BaseModel):
    """Description of one local image tag and the containers created from it."""

    image: str
    tag: str
    image_id: str
    created: str
    size: str
    containers_using: List[str]


class UpdateService:
    """Service that manages Docker image updates (inspired by TrueNAS Scale)."""

    def __init__(self):
        """Connect to the Docker daemon configured in the environment.

        On failure the error is logged and ``self.client`` is set to ``None``;
        every public method then reports "not connected" instead of raising.
        """
        try:
            self.client = docker.from_env()
        except DockerException as e:
            logger.error(f"Erreur de connexion Docker: {e}")
            self.client = None

    def is_connected(self) -> bool:
        """Return True when a client exists and the daemon answers a ping."""
        if not self.client:
            return False
        try:
            self.client.ping()
        except Exception:
            # Any failure to reach the daemon simply means "not connected".
            return False
        return True

    def parse_image_name(self, image_full_name: str) -> Dict[str, str]:
        """Split an image reference into registry, repository and tag.

        Accepted format: ``[registry/]repository[:tag][@digest]``,
        e.g. ``docker.io/library/nginx:latest``.

        Returns:
            A dict with the keys ``registry``, ``repository``, ``tag`` and
            ``full_name`` (the reference with any digest removed).
        """
        # Drop the digest if present.
        image_full_name = image_full_name.split('@')[0]

        registry = "docker.io"
        tag = "latest"
        image_repo = image_full_name

        # The first path component is a registry only when it looks like a
        # host name: it contains a dot or a port, or it is "localhost".
        if '/' in image_full_name:
            first, remainder = image_full_name.split('/', 1)
            if '.' in first or ':' in first or first == 'localhost':
                registry = first
                image_repo = remainder

        # The registry (and its port) is already stripped, so the last colon
        # can only introduce the tag.
        if ':' in image_repo:
            image_repo, tag = image_repo.rsplit(':', 1)

        # Single-component names are "official" images under library/, but
        # that namespace only exists on Docker Hub.
        if '/' not in image_repo and registry == "docker.io":
            image_repo = f"library/{image_repo}"

        return {
            "registry": registry,
            "repository": image_repo,
            "tag": tag,
            "full_name": image_full_name
        }

    @staticmethod
    def _split_repo_tag(reference: str) -> Tuple[str, str]:
        """Split a local ``repo:tag`` reference into ``(repo, tag)``.

        An empty reference (untagged image) yields ``("<none>", "<none>")``.
        A colon followed by a slash belongs to a registry port, not to a tag,
        so ``localhost:5000/app`` yields ``("localhost:5000/app", "latest")``.
        """
        if not reference:
            return "<none>", "<none>"
        repo, sep, tag = reference.rpartition(':')
        if sep and '/' not in tag:
            return repo, tag
        return reference, "latest"

    def get_all_images_info(self) -> List[ImageInfo]:
        """List every local image tag with the containers that use it.

        Returns:
            One ``ImageInfo`` per tag (untagged images produce a single entry
            marked ``<none>``). An empty list is returned when Docker is not
            reachable; on an unexpected error the entries collected so far
            are returned and the error is logged.
        """
        if not self.is_connected():
            return []

        images_info: List[ImageInfo] = []
        try:
            # Map image id -> names of the containers created from it, built
            # from a single listing so the two can never disagree.
            users_by_image: Dict[str, List[str]] = {}
            for container in self.client.containers.list(all=True):
                try:
                    image_id = container.image.id
                except Exception as e:
                    # Resolving the image can fail (e.g. it was deleted);
                    # skip that container instead of aborting the listing.
                    logger.warning(
                        f"Image introuvable pour le conteneur {container.name}: {e}"
                    )
                    continue
                users_by_image.setdefault(image_id, []).append(container.name)

            for image in self.client.images.list():
                size_mb = (image.attrs.get('Size') or 0) / (1024 * 1024)
                created = image.attrs.get('Created', '')
                for reference in (image.tags or [""]):
                    repo, tag = self._split_repo_tag(reference)
                    images_info.append(ImageInfo(
                        image=repo,
                        tag=tag,
                        image_id=image.short_id,
                        created=created,
                        size=f"{size_mb:.2f} MB",
                        containers_using=users_by_image.get(image.id, [])
                    ))
        except Exception as e:
            logger.error(f"Erreur lors de la récupération des images: {e}")

        return images_info

    def check_image_updates(self, image_name: str) -> Optional[ImageUpdate]:
        """Check whether a newer tag exists for ``image_name``.

        The tag list is fetched through the Docker Registry V2 API. Only
        images hosted on ``docker.io`` are supported.

        Returns:
            An ``ImageUpdate``, or ``None`` when Docker is not reachable, the
            registry is not supported, or the registry request fails.

        NOTE(review): ``has_update`` is a plain inequality between the
        current tag and the highest version-like tag, so a non-version tag
        such as ``latest`` reports an update whenever a version-like tag
        exists — confirm this is the intended behaviour.
        """
        if not self.is_connected():
            return None

        try:
            parsed = self.parse_image_name(image_name)
            registry = parsed['registry']
            repository = parsed['repository']
            current_tag = parsed['tag']

            if registry != "docker.io":
                logger.debug(f"Registry non pris en charge: {registry}")
                return None

            # Imported lazily, as in the original code, so the module still
            # loads when requests is not installed.
            import requests

            registry_url = "https://registry-1.docker.io/v2"
            auth_url = f"https://auth.docker.io/token?service=registry.docker.io&scope=repository:{repository}:pull"

            # Best effort: without a token the tag request is sent anonymously.
            try:
                auth_response = requests.get(auth_url, timeout=5)
                token = auth_response.json().get('token', '')
            except (requests.RequestException, ValueError, AttributeError):
                token = ""

            headers = {}
            if token:
                headers['Authorization'] = f'Bearer {token}'

            try:
                tags_response = requests.get(
                    f"{registry_url}/{repository}/tags/list",
                    headers=headers,
                    timeout=10
                )
                if tags_response.status_code != 200:
                    logger.warning(
                        f"Réponse inattendue du registry ({tags_response.status_code}) pour {repository}"
                    )
                    return None

                tags = tags_response.json().get('tags', [])
                latest_tag = self._find_latest_tag(tags, current_tag)
                return ImageUpdate(
                    image=image_name,
                    current_tag=current_tag,
                    latest_tag=latest_tag,
                    has_update=latest_tag != current_tag,
                    registry=registry
                )
            except Exception as e:
                logger.warning(f"Erreur lors de la vérification des tags: {e}")
                return None

        except Exception as e:
            logger.error(f"Erreur lors de la vérification des mises à jour: {e}")
            return None

    @staticmethod
    def _version_key(tag: Any) -> Tuple[int, ...]:
        """Return a sortable ``(major, minor, patch)`` tuple for ``tag``.

        Up to three dot-separated integer components are read; an optional
        leading ``v`` is ignored. Anything that does not parse this way
        (including non-string values) sorts as ``(0, 0, 0)``.
        """
        try:
            text = tag[1:] if tag[:1] in ('v', 'V') and tag[1:2].isdigit() else tag
            parts = [int(piece) for piece in text.split('.')[:3]]
        except (ValueError, AttributeError, TypeError):
            return (0, 0, 0)
        parts.extend([0] * (3 - len(parts)))
        return tuple(parts)

    def _find_latest_tag(self, tags: List[str], current_tag: str) -> str:
        """Return the highest stable tag in ``tags``.

        ``latest`` and pre-release tags (``-rc``, ``-alpha``, ``-beta``,
        ``-dev``) are ignored unless nothing else is available. When several
        tags share the highest version, the first one in ``tags`` wins.
        ``current_tag`` is returned when ``tags`` is empty.
        """
        if not tags:
            return current_tag

        stable_tags = [
            t for t in tags
            if t and t != 'latest' and not _PRERELEASE_RE.search(t)
        ]
        candidates = stable_tags or tags

        # max() returns the first maximal element, which matches taking the
        # head of a stable descending sort.
        return max(candidates, key=self._version_key)

    def pull_image(self, image_name: str, tag: str = "latest") -> bool:
        """Pull ``image_name:tag`` from its registry.

        Returns:
            True on success, False when Docker is not reachable or the pull
            fails (the error is logged).
        """
        if not self.is_connected():
            return False

        full_image = f"{image_name}:{tag}"
        try:
            logger.info(f"Téléchargement de l'image: {full_image}")
            self.client.images.pull(full_image)
            logger.info(f"Image téléchargée avec succès: {full_image}")
            return True
        except Exception as e:
            logger.error(f"Erreur lors du téléchargement de l'image: {e}")
            return False

    @staticmethod
    def _build_run_kwargs(container_attrs: Dict[str, Any],
                          image_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Translate container inspect data into ``containers.run`` kwargs.

        Values equal to those in ``image_config`` (the ``Config`` section of
        the image the container was created from) are left out, so the new
        image's own defaults apply instead of stale ones.

        Carried over: environment, labels, command, entrypoint, user, working
        directory, tty/stdin flags, bind mounts (``Binds``), port bindings,
        restart policy, network mode, privileged flag and capabilities.
        Other settings (e.g. ``Mounts`` entries, extra networks) are NOT
        carried over.
        """
        config = container_attrs.get('Config') or {}
        host_config = container_attrs.get('HostConfig') or {}
        image_config = image_config or {}
        kwargs: Dict[str, Any] = {}

        image_env = set(image_config.get('Env') or [])
        environment = [e for e in (config.get('Env') or []) if e not in image_env]
        if environment:
            kwargs['environment'] = environment

        image_labels = image_config.get('Labels') or {}
        labels = {
            key: value
            for key, value in (config.get('Labels') or {}).items()
            if image_labels.get(key) != value
        }
        if labels:
            kwargs['labels'] = labels

        for attr_key, kwarg in (('Cmd', 'command'), ('Entrypoint', 'entrypoint'),
                                ('User', 'user'), ('WorkingDir', 'working_dir')):
            value = config.get(attr_key)
            if value and value != image_config.get(attr_key):
                kwargs[kwarg] = value

        if config.get('Tty'):
            kwargs['tty'] = True
        if config.get('OpenStdin'):
            kwargs['stdin_open'] = True

        if host_config.get('Binds'):
            kwargs['volumes'] = list(host_config['Binds'])

        # Inspect format: {"80/tcp": [{"HostIp": "", "HostPort": "8080"}]}.
        # run() format:   {"80/tcp": "8080"}, ("ip", "8080"), a list, or None.
        ports: Dict[str, Any] = {}
        for container_port, bindings in (host_config.get('PortBindings') or {}).items():
            converted = []
            for binding in bindings or []:
                host_ip = binding.get('HostIp')
                host_port = binding.get('HostPort') or None
                converted.append((host_ip, host_port) if host_ip else host_port)
            if not converted:
                ports[container_port] = None
            elif len(converted) == 1:
                ports[container_port] = converted[0]
            else:
                ports[container_port] = converted
        if ports:
            kwargs['ports'] = ports

        restart = host_config.get('RestartPolicy') or {}
        restart_name = restart.get('Name')
        if restart_name and restart_name != 'no':
            policy: Dict[str, Any] = {'Name': restart_name}
            # A retry count is only sent for the on-failure policy.
            if restart_name == 'on-failure' and restart.get('MaximumRetryCount'):
                policy['MaximumRetryCount'] = restart['MaximumRetryCount']
            kwargs['restart_policy'] = policy

        network_mode = host_config.get('NetworkMode')
        if network_mode and network_mode != 'default':
            kwargs['network_mode'] = network_mode

        if host_config.get('Privileged'):
            kwargs['privileged'] = True
        if host_config.get('CapAdd'):
            kwargs['cap_add'] = list(host_config['CapAdd'])
        if host_config.get('CapDrop'):
            kwargs['cap_drop'] = list(host_config['CapDrop'])

        return kwargs

    def _restore_container(self, container, original_name: str, restart: bool) -> None:
        """Undo a failed recreation: put the old container back in place.

        Must only be called after ``container`` has been renamed away from
        ``original_name``; whatever currently holds that name is therefore a
        half-created replacement and is force-removed.
        """
        try:
            self.client.containers.get(original_name).remove(force=True)
        except NotFound:
            pass
        except Exception as e:
            logger.warning(f"Impossible de supprimer le conteneur incomplet {original_name}: {e}")

        try:
            container.rename(original_name)
            if restart:
                container.start()
            logger.info(f"Conteneur {original_name} restauré")
        except Exception as e:
            logger.error(f"Impossible de restaurer le conteneur {original_name}: {e}")

    def update_container_image(self, container_id: str, new_image: str, new_tag: str = "latest") -> Dict:
        """Recreate a container on top of a new image.

        Steps:
            1. Pull the new image. This happens first so that a failed pull
               leaves the running container untouched.
            2. Stop the container.
            3. Rename the old container to a backup name, start a new
               container under the original name with the settings produced
               by ``_build_run_kwargs``, then delete the backup. If the new
               container cannot be started, the old one is restored (and
               restarted if it was running).

        Returns:
            ``{"success": True, "message", "old_image", "new_image"}`` on
            success, ``{"success": False, "message"}`` otherwise.
        """
        if not self.is_connected():
            return {"success": False, "message": "Docker n'est pas accessible"}

        try:
            container = self.client.containers.get(container_id)
            container_name = container.name
            # container.image performs a lookup on each access; resolve once.
            current_image = container.image
            old_image = current_image.tags[0] if current_image.tags else current_image.id[:12]
            full_image = f"{new_image}:{new_tag}"

            logger.info(f"Mise à jour du conteneur {container_name}")

            # 1. Pull the new image before touching the container.
            if not self.pull_image(new_image, new_tag):
                return {
                    "success": False,
                    "message": f"Impossible de télécharger l'image {full_image}"
                }

            run_kwargs = self._build_run_kwargs(
                container.attrs, current_image.attrs.get('Config')
            )
            was_running = container.status == "running"

            # 2. Stop the container (best effort, as in the original flow).
            try:
                container.stop(timeout=10)
                logger.info(f"Conteneur {container_name} arrêté")
            except Exception as e:
                logger.warning(f"Erreur à l'arrêt du conteneur: {e}")

            # 3. Free the name but keep the old container until the
            #    replacement is up.
            backup_name = f"{container_name}_backup_{container.short_id}"
            try:
                container.rename(backup_name)
            except Exception as e:
                logger.error(f"Erreur lors de la mise à jour du conteneur: {e}")
                if was_running:
                    try:
                        container.start()
                    except Exception as start_error:
                        logger.error(
                            f"Impossible de redémarrer le conteneur {container_name}: {start_error}"
                        )
                return {
                    "success": False,
                    "message": f"Erreur lors de la mise à jour: {str(e)}"
                }

            try:
                self.client.containers.run(
                    full_image,
                    name=container_name,
                    detach=True,
                    **run_kwargs
                )
                logger.info(f"Nouveau conteneur créé avec l'image {full_image}")
            except Exception as e:
                logger.error(f"Erreur lors de la mise à jour du conteneur: {e}")
                self._restore_container(container, container_name, was_running)
                return {
                    "success": False,
                    "message": f"Erreur lors de la mise à jour: {str(e)}"
                }

            # The replacement is running; the backup is no longer needed.
            # force=True also covers the case where the stop above failed.
            try:
                container.remove(force=True)
                logger.info("Ancien conteneur supprimé")
            except Exception as e:
                logger.warning(f"Impossible de supprimer l'ancien conteneur {backup_name}: {e}")

            return {
                "success": True,
                "message": f"Conteneur {container_name} mis à jour avec succès",
                "old_image": old_image,
                "new_image": full_image
            }

        except Exception as e:
            logger.error(f"Erreur générale: {e}")
            return {
                "success": False,
                "message": f"Erreur: {str(e)}"
            }

    def get_image_history(self, image_name: str) -> Dict:
        """Return the layer history of a local image.

        Returns:
            ``{"image", "layers_count", "history"}``, or an empty dict when
            Docker is not reachable or the image cannot be read.
        """
        if not self.is_connected():
            return {}

        try:
            image = self.client.images.get(image_name)
            # The history comes from the dedicated history call; the inspect
            # data in image.attrs has no 'History' entry.
            history = image.history() or []

            return {
                "image": image_name,
                "layers_count": len(history),
                "history": history
            }
        except Exception as e:
            logger.error(f"Erreur lors de la récupération de l'historique: {e}")
            return {}

    def prune_unused_images(self, dangling_only: bool = False) -> Dict:
        """Delete unused images.

        Args:
            dangling_only: Value passed as the ``dangling`` prune filter.

        Returns:
            ``{"success": True, "deleted_images", "space_freed_mb",
            "details"}`` on success, ``{"success": False, "message"}``
            otherwise.
        """
        if not self.is_connected():
            return {"success": False, "message": "Docker n'est pas accessible"}

        try:
            result = self.client.images.prune(filters={"dangling": dangling_only})

            # The daemon reports null (not an empty list) when nothing was
            # deleted, and names the byte count 'SpaceReclaimed'.
            deleted = result.get('ImagesDeleted') or []
            space_freed = (result.get('SpaceReclaimed') or 0) / (1024 * 1024)  # MB

            return {
                "success": True,
                "deleted_images": len(deleted),
                "space_freed_mb": f"{space_freed:.2f}",
                "details": deleted
            }
        except Exception as e:
            logger.error(f"Erreur lors du nettoyage des images: {e}")
            return {
                "success": False,
                "message": f"Erreur: {str(e)}"
            }