211 lines
6.7 KiB
Python
211 lines
6.7 KiB
Python
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict
|
|
from jose import JWTError, jwt
|
|
from pydantic import BaseModel
|
|
from fastapi import Depends, HTTPException, status, Request
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from passlib.context import CryptContext
|
|
import pam
|
|
import logging
|
|
import re
|
|
from app.core.config import settings
|
|
|
|
# Configuration du logging de sécurité
|
|
logger = logging.getLogger("innotexboard.security")
|
|
|
|
# Contexte de hashage avec bcrypt (plus sécurisé que MD5/SHA)
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
# Stockage temporaire des tentatives de connexion (en production, utiliser Redis)
|
|
login_attempts: Dict[str, list] = {}
|
|
|
|
|
|
class TokenData(BaseModel):
|
|
username: str
|
|
exp: datetime
|
|
|
|
|
|
class Token(BaseModel):
|
|
access_token: str
|
|
token_type: str
|
|
username: str
|
|
|
|
|
|
class User(BaseModel):
|
|
username: str
|
|
is_authenticated: bool = True
|
|
|
|
|
|
def validate_username(username: str) -> bool:
|
|
"""
|
|
Valide le format du nom d'utilisateur pour prévenir les injections
|
|
Accepte uniquement: lettres, chiffres, tirets, underscores
|
|
"""
|
|
if not username or len(username) < 2 or len(username) > 32:
|
|
return False
|
|
pattern = re.compile(r'^[a-zA-Z0-9_-]+$')
|
|
return bool(pattern.match(username))
|
|
|
|
|
|
def check_rate_limit(username: str, ip_address: str) -> bool:
|
|
"""
|
|
Vérifie si l'utilisateur/IP a dépassé le nombre de tentatives autorisées
|
|
Protection contre les attaques brute force
|
|
"""
|
|
key = f"{username}:{ip_address}"
|
|
current_time = datetime.utcnow()
|
|
|
|
# Nettoyer les anciennes tentatives
|
|
if key in login_attempts:
|
|
login_attempts[key] = [
|
|
attempt_time for attempt_time in login_attempts[key]
|
|
if (current_time - attempt_time).seconds < settings.LOGIN_ATTEMPT_WINDOW
|
|
]
|
|
|
|
# Vérifier le nombre de tentatives
|
|
if key in login_attempts and len(login_attempts[key]) >= settings.MAX_LOGIN_ATTEMPTS:
|
|
logger.warning(
|
|
f"Rate limit exceeded for {username} from {ip_address}. "
|
|
f"{len(login_attempts[key])} attempts in last {settings.LOGIN_ATTEMPT_WINDOW}s"
|
|
)
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def record_failed_attempt(username: str, ip_address: str):
|
|
"""Enregistre une tentative de connexion échouée"""
|
|
key = f"{username}:{ip_address}"
|
|
if key not in login_attempts:
|
|
login_attempts[key] = []
|
|
login_attempts[key].append(datetime.utcnow())
|
|
|
|
logger.warning(f"Failed login attempt for {username} from {ip_address}")
|
|
|
|
|
|
def clear_login_attempts(username: str, ip_address: str):
|
|
"""Efface les tentatives après une connexion réussie"""
|
|
key = f"{username}:{ip_address}"
|
|
if key in login_attempts:
|
|
del login_attempts[key]
|
|
|
|
|
|
def authenticate_user(username: str, password: str, ip_address: str) -> Optional[User]:
|
|
"""
|
|
Authentifie un utilisateur via PAM (Pluggable Authentication Module)
|
|
Validé contre le système Debian/Linux
|
|
Inclut protection brute force
|
|
"""
|
|
# Validation du format du username (protection injection)
|
|
if not validate_username(username):
|
|
logger.warning(f"Invalid username format attempted: {username} from {ip_address}")
|
|
return None
|
|
|
|
# Vérification du rate limiting
|
|
if not check_rate_limit(username, ip_address):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
|
|
detail=f"Trop de tentatives de connexion. Réessayez dans {settings.LOGIN_ATTEMPT_WINDOW // 60} minutes."
|
|
)
|
|
|
|
try:
|
|
pam_auth = pam.pam()
|
|
if pam_auth.authenticate(username, password):
|
|
clear_login_attempts(username, ip_address)
|
|
logger.info(f"Successful authentication for {username} from {ip_address}")
|
|
return User(username=username)
|
|
else:
|
|
record_failed_attempt(username, ip_address)
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"PAM authentication error for {username} from {ip_address}: {e}")
|
|
record_failed_attempt(username, ip_address)
|
|
return None
|
|
|
|
|
|
def create_access_token(username: str, expires_delta: Optional[timedelta] = None) -> str:
|
|
"""Crée un token JWT sécurisé"""
|
|
if expires_delta:
|
|
expire = datetime.utcnow() + expires_delta
|
|
else:
|
|
expire = datetime.utcnow() + timedelta(
|
|
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
|
|
)
|
|
|
|
to_encode = {
|
|
"sub": username,
|
|
"exp": expire,
|
|
"iat": datetime.utcnow(), # Issued at
|
|
"type": "access"
|
|
}
|
|
|
|
encoded_jwt = jwt.encode(
|
|
to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM
|
|
)
|
|
return encoded_jwt
|
|
|
|
|
|
security = HTTPBearer()
|
|
|
|
|
|
async def get_current_user(
|
|
credentials: HTTPAuthorizationCredentials = Depends(security)
|
|
) -> User:
|
|
"""
|
|
Valide le token JWT et retourne l'utilisateur actuel
|
|
Inclut validation supplémentaire et logging
|
|
"""
|
|
credential_exception = HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Impossible de valider les credentials",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
try:
|
|
token = credentials.credentials
|
|
|
|
# Décodage et validation du token
|
|
payload = jwt.decode(
|
|
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
|
|
)
|
|
|
|
username: str = payload.get("sub")
|
|
exp: int = payload.get("exp")
|
|
token_type: str = payload.get("type")
|
|
|
|
# Validations supplémentaires
|
|
if username is None or token_type != "access":
|
|
logger.warning(f"Invalid token structure")
|
|
raise credential_exception
|
|
|
|
# Vérification de l'expiration
|
|
if exp and datetime.fromtimestamp(exp) < datetime.utcnow():
|
|
logger.warning(f"Expired token for {username}")
|
|
raise credential_exception
|
|
|
|
# Validation du format username
|
|
if not validate_username(username):
|
|
logger.warning(f"Invalid username in token: {username}")
|
|
raise credential_exception
|
|
|
|
token_data = TokenData(username=username, exp=exp)
|
|
|
|
except JWTError as e:
|
|
logger.warning(f"JWT decode error: {e}")
|
|
raise credential_exception
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error in token validation: {e}")
|
|
raise credential_exception
|
|
|
|
return User(username=token_data.username)
|
|
|
|
|
|
def hash_password(password: str) -> str:
|
|
"""Hash un mot de passe avec bcrypt"""
|
|
return pwd_context.hash(password)
|
|
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
|
"""Vérifie un mot de passe contre son hash"""
|
|
return pwd_context.verify(plain_password, hashed_password)
|