Files
innotexBoard/backend/app/core/security.py

211 lines
6.7 KiB
Python

from datetime import datetime, timedelta
from typing import Optional, Dict
from jose import JWTError, jwt
from pydantic import BaseModel
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from passlib.context import CryptContext
import pam
import logging
import re
from app.core.config import settings
# Dedicated logger for security events (authentication, rate limiting, token checks).
logger = logging.getLogger("innotexboard.security")
# Password-hashing context restricted to the bcrypt scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# In-memory store of failed login attempts: maps "<username>:<ip_address>" to the
# list of UTC datetimes at which a failure was recorded. Temporary storage only;
# the original author recommends Redis for production.
login_attempts: Dict[str, list] = {}
class TokenData(BaseModel):
    """Claims taken from a decoded JWT access token."""

    # Value of the token's "sub" claim.
    username: str
    # Value of the token's "exp" claim.
    exp: datetime
class Token(BaseModel):
    """Model describing an issued token together with its owner."""

    # Encoded token string.
    access_token: str
    # Token scheme label.
    token_type: str
    # Name of the user the token belongs to.
    username: str
class User(BaseModel):
    """Identity returned by the authentication helpers of this module."""

    # Account name of the user.
    username: str
    # Defaults to True; this module only builds User objects after a
    # successful PAM login or a successful token validation.
    is_authenticated: bool = True
# Compiled once at import time; matched with fullmatch() so the whole string
# must consist of allowed characters.
_USERNAME_PATTERN = re.compile(r'[a-zA-Z0-9_-]+')


def validate_username(username: str) -> bool:
    """
    Check that a username has an acceptable format (injection prevention).

    Accepted: 2 to 32 characters, each one an ASCII letter, a digit, a hyphen
    or an underscore.

    Args:
        username: Candidate username; anything that is not a ``str`` is rejected.

    Returns:
        True when the username is acceptable, False otherwise.
    """
    if not isinstance(username, str):
        return False
    if not 2 <= len(username) <= 32:
        return False
    # fullmatch() instead of match() with '^...$': '$' also matches just
    # before a trailing newline, which let values such as "admin\n" through.
    return _USERNAME_PATTERN.fullmatch(username) is not None
def check_rate_limit(username: str, ip_address: str) -> bool:
    """
    Report whether another login attempt is allowed for this user/IP pair.

    Brute-force protection: the failures stored in ``login_attempts`` under
    the key ``"<username>:<ip_address>"`` are pruned to those younger than
    ``settings.LOGIN_ATTEMPT_WINDOW`` seconds, then counted against
    ``settings.MAX_LOGIN_ATTEMPTS``.

    Args:
        username: Account name being tried.
        ip_address: Address the attempt comes from.

    Returns:
        True when the attempt may proceed, False when the limit is reached.
    """
    key = f"{username}:{ip_address}"
    attempts = login_attempts.get(key)
    if attempts is None:
        return True

    current_time = datetime.utcnow()
    window = settings.LOGIN_ATTEMPT_WINDOW

    # total_seconds() measures the full elapsed interval. timedelta.seconds is
    # only the sub-day component, so an attempt made 24h + 5s ago would have
    # been treated as 5 seconds old and never expired from the window.
    recent = [
        attempt_time for attempt_time in attempts
        if (current_time - attempt_time).total_seconds() < window
    ]

    if not recent:
        # Entries are otherwise removed only after a successful login; drop
        # emptied ones so stale keys do not pile up in the in-memory store.
        del login_attempts[key]
        return True

    login_attempts[key] = recent

    if len(recent) >= settings.MAX_LOGIN_ATTEMPTS:
        logger.warning(
            f"Rate limit exceeded for {username} from {ip_address}. "
            f"{len(recent)} attempts in last {window}s"
        )
        return False
    return True
def record_failed_attempt(username: str, ip_address: str):
    """Store the current UTC time as a failed login for this user/IP pair and log it."""
    attempt_key = f"{username}:{ip_address}"
    # setdefault creates the per-key list on first failure.
    login_attempts.setdefault(attempt_key, []).append(datetime.utcnow())
    logger.warning(f"Failed login attempt for {username} from {ip_address}")
def clear_login_attempts(username: str, ip_address: str):
    """Forget every recorded failure for this user/IP pair (used after a successful login)."""
    # pop() with a default is a no-op when nothing was recorded for the pair.
    login_attempts.pop(f"{username}:{ip_address}", None)
def authenticate_user(username: str, password: str, ip_address: str) -> Optional[User]:
    """
    Authenticate a user through PAM, with brute-force protection.

    Steps: validate the username format, check the rate limit for the
    user/IP pair, then ask PAM to verify the password. Failures (including
    PAM errors) are recorded against the pair; a success clears them.

    Args:
        username: Account name supplied by the client.
        password: Password supplied by the client.
        ip_address: Client address, used for rate limiting and logging.

    Returns:
        A ``User`` on success, ``None`` when the username is malformed, the
        password is rejected, or PAM raised an error.

    Raises:
        HTTPException: 429 when the rate limit for the pair is exceeded.
    """
    # Format validation first (injection protection).
    if not validate_username(username):
        # The value just failed validation, so it may contain CR/LF or other
        # control characters; %r escapes them so a client cannot forge extra
        # lines in the security log.
        logger.warning(
            "Invalid username format attempted: %r from %s", username, ip_address
        )
        return None

    if not check_rate_limit(username, ip_address):
        # Round up: telling the client to wait less than the real window
        # (e.g. "0 minutes" for a 30 s window) would be misleading.
        retry_minutes = -(-settings.LOGIN_ATTEMPT_WINDOW // 60)
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f"Trop de tentatives de connexion. Réessayez dans {retry_minutes} minutes."
        )

    try:
        authenticated = pam.pam().authenticate(username, password)
    except Exception as e:
        # A PAM failure is treated like a bad password: logged and counted.
        logger.error(f"PAM authentication error for {username} from {ip_address}: {e}")
        record_failed_attempt(username, ip_address)
        return None

    if not authenticated:
        record_failed_attempt(username, ip_address)
        return None

    clear_login_attempts(username, ip_address)
    logger.info(f"Successful authentication for {username} from {ip_address}")
    return User(username=username)
def create_access_token(username: str, expires_delta: Optional[timedelta] = None) -> str:
    """
    Build a signed JWT access token for ``username``.

    The token carries the claims ``sub`` (username), ``exp`` (expiry),
    ``iat`` (issue time) and ``type`` (always ``"access"``), and is signed
    with ``settings.SECRET_KEY`` using ``settings.ALGORITHM``.

    Args:
        username: Value stored in the ``sub`` claim.
        expires_delta: Token lifetime. When ``None``, the lifetime is
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token string.
    """
    # Compare against None rather than truthiness: timedelta(0) is falsy, so
    # an explicit zero lifetime used to be silently replaced by the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    # Single clock read so that exp - iat equals the requested lifetime exactly.
    issued_at = datetime.utcnow()
    claims = {
        "sub": username,
        "exp": issued_at + expires_delta,
        "iat": issued_at,  # Issued at
        "type": "access",
    }
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
# HTTP Bearer scheme instance; get_current_user receives its credentials via Depends(security).
security = HTTPBearer()
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> User:
    """
    Validate the bearer JWT and return the user it identifies.

    The token is decoded with ``settings.SECRET_KEY`` / ``settings.ALGORITHM``
    and then checked for: a ``sub`` claim, ``type == "access"``, a present and
    unexpired ``exp`` claim, and a well-formed username.

    Args:
        credentials: Bearer credentials extracted by the ``security`` dependency.

    Returns:
        The ``User`` named by the token's ``sub`` claim.

    Raises:
        HTTPException: 401 for any decoding or validation failure.
    """
    # Function-scope import keeps this block self-contained.
    from datetime import timezone

    credential_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Impossible de valider les credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        token = credentials.credentials
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
        username = payload.get("sub")
        exp = payload.get("exp")
        token_type = payload.get("type")

        if username is None or token_type != "access":
            logger.warning("Invalid token structure")
            raise credential_exception

        # A token without an expiry used to be rejected only indirectly, when
        # TokenData validation failed; reject it explicitly instead.
        if exp is None:
            logger.warning(f"Token without expiration for {username}")
            raise credential_exception

        # Compare timezone-aware UTC values. The naive fromtimestamp() result
        # is in local time, so comparing it with utcnow() rejected every token
        # on hosts west of UTC and extended lifetimes on hosts east of it.
        # NOTE(review): jwt.decode is expected to enforce `exp` already -
        # confirm against python-jose defaults; this is a defensive second check.
        expires_at = datetime.fromtimestamp(exp, tz=timezone.utc)
        if expires_at < datetime.now(timezone.utc):
            logger.warning(f"Expired token for {username}")
            raise credential_exception

        if not validate_username(username):
            logger.warning(f"Invalid username in token: {username}")
            raise credential_exception

        token_data = TokenData(username=username, exp=exp)
    except HTTPException:
        # Deliberate 401s raised above must not be reported as unexpected errors
        # by the generic handler below.
        raise
    except JWTError as e:
        logger.warning(f"JWT decode error: {e}")
        raise credential_exception
    except Exception as e:
        logger.error(f"Unexpected error in token validation: {e}")
        raise credential_exception
    return User(username=token_data.username)
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's bcrypt context."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Tell whether ``plain_password`` matches ``hashed_password`` according to the module's bcrypt context."""
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match