"""Authentication models and services.

Contains the Pydantic schemas, the SQLAlchemy user table, password
hashing helpers, JWT creation/validation and user registration.
"""
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Annotated

import jwt
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from pydantic.networks import EmailStr
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import Session

from backend.config import (
    ACCESS_TOKEN_EXPIRE_MINUTES,
    ALGORITHM,
    SECRET_KEY,
    Base,
    get_session_db,
    pwd_context,
)


def _utc_timestamp() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


class Token(BaseModel):
    """Response body carrying an issued access token."""

    access_token: str
    token_type: str


### ENUMS ###
class Role(str, Enum):
    """Roles a user account can hold."""

    user = "user"
    admin = "admin"
    guest = "guest"
    mod = "mod"


class Status(str, Enum):
    """Account states."""

    active = "active"
    banned = "banned"
    suspended = "suspended"


### USER MODELS ###
# SQLAlchemy models and Pydantic models are different things.
class UserBase(BaseModel):
    """Fields shared by user schemas. This is NOT a database model."""

    username: str | None = None
    role: Role | None = None
    status: Status | None = None


class UserInDb(UserBase):
    """User schema including identifiers and the stored password hash."""

    user_id: int | None = None
    email: EmailStr | None = None
    hashed_password: str | None = None


class UserPublic(BaseModel):
    """User schema without e-mail or password hash fields."""

    username: str | None = None
    role: Role | None = None
    status: Status | None = None


class UserCreate(BaseModel):
    """Registration payload; carries the plain-text password."""

    username: str | None = None
    role: Role | None = None
    email: EmailStr | None = None
    status: Status | None = None
    password: str | None = None


### DATABASE MODEL ###
class DBUser(Base):
    """SQLAlchemy mapping for the ``users_table`` table."""

    __tablename__ = "users_table"

    user_id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True, nullable=False)
    email = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    role = Column(String, default="user")
    status = Column(String, default="active")
    # Pass the callable itself so the timestamp is computed per INSERT,
    # not once when this module is imported.
    created_date = Column(String, default=_utc_timestamp)


### AUTH ###
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")


### SERVICES ###
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Return the hash of ``password`` produced by ``pwd_context``."""
    return pwd_context.hash(password)


def authenticate_user(
    session: Annotated[Session, Depends(get_session_db)],
    username: str,
    password: str,
) -> UserInDb | None:
    """Look up ``username`` and check ``password`` against its stored hash.

    Returns the matching ``DBUser`` row, or ``None`` when the user does not
    exist or the password does not verify.
    """
    # NOTE(review): the value returned is a DBUser ORM instance even though
    # the annotation names the Pydantic schema.
    user = session.query(DBUser).filter(DBUser.username == username).first()
    if user is None or not verify_password(password, user.hashed_password):
        return None
    return user


def create_access_token(
    data: dict,
    expires_delta: timedelta | None = None,
) -> str:
    """Encode ``data`` as a JWT with an ``exp`` claim.

    Args:
        data: Claims to embed; the dict is copied, not mutated.
        expires_delta: Token lifetime. When ``None``, defaults to
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode = data.copy()
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_user(
    session: Annotated[Session, Depends(get_session_db)],
    username: str,
) -> UserInDb | None:
    """Return the ``DBUser`` row for ``username``, or ``None`` if absent."""
    return session.query(DBUser).filter(DBUser.username == username).first()


async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    session: Annotated[Session, Depends(get_session_db)],
) -> UserPublic:
    """Resolve the bearer ``token`` to the user named in its ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or names a user that does not exist.
    """
    credentials_exception = HTTPException(
        status_code=401,
        detail="Invalid credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.PyJWTError:
        raise credentials_exception from None

    # Only the subject claim is needed; the user record itself is loaded
    # from the database below rather than trusted from the token payload.
    username: str | None = payload.get("sub")
    if username is None:
        raise credentials_exception

    user = await get_user(session, username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: Annotated[UserInDb, Depends(get_current_user)],
) -> UserPublic:
    """Return ``current_user`` unless the account is banned.

    Raises:
        HTTPException: 400 when the user's status is ``banned``.
    """
    # NOTE(review): only ``banned`` is rejected; ``suspended`` users pass
    # this check. Confirm whether that is intended.
    if current_user.status == Status.banned:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


### User registration
def register_user(
    session: Annotated[Session, Depends(get_session_db)],
    user: Annotated[UserCreate, Depends()],
) -> UserPublic:
    """Create a new user row from ``user`` and return it.

    Raises:
        HTTPException: 400 when username, e-mail or password is missing,
            when the freshly created hash does not verify, or when the
            username or e-mail is already registered.
    """
    # UserCreate declares every field optional, so reject incomplete input
    # here instead of failing inside the hasher or on the NOT NULL columns.
    if user.username is None or user.email is None or user.password is None:
        raise HTTPException(
            status_code=400,
            detail="Username, email and password are required",
        )

    # Work on a plain dict because the password is swapped for its hash.
    # NOTE(review): ``role`` and ``status`` come straight from the caller and
    # are persisted as given; confirm callers restrict who may set them.
    user_dict = user.model_dump()
    user_dict["hashed_password"] = get_password_hash(user.password)
    if not verify_password(user.password, user_dict["hashed_password"]):
        raise HTTPException(status_code=400, detail="Password hashing failed")

    # Username and e-mail must both be unique.
    existing_user = session.query(DBUser).filter(
        (DBUser.username == user.username) | (DBUser.email == user.email)
    ).first()
    if existing_user:
        raise HTTPException(
            status_code=400,
            detail="Username or email already registered",
        )

    user_dict["created_date"] = _utc_timestamp()
    # DBUser has no ``password`` attribute; the plain text is never stored.
    user_dict.pop("password")

    # Pydantic and SQLAlchemy models are distinct: build the ORM row from
    # the prepared dict.
    db_user = DBUser(**user_dict)
    session.add(db_user)
    session.commit()
    session.refresh(db_user)
    return db_user