This commit is contained in:
Bedir Tuğra Karaabalı 2025-05-05 13:18:29 +03:00
parent 836dd1d27a
commit 5f32db24bf
9 changed files with 300 additions and 14 deletions

0
__init__.py Normal file
View File

0
auth/__init__.py Normal file
View File

173
auth/models.py Normal file
View File

@ -0,0 +1,173 @@
from enum import Enum
from backend.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
from backend.config import pwd_context
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel
from fastapi import Depends, HTTPException
from typing import Annotated
from fastapi.security import OAuth2PasswordBearer
import jwt
class Token(BaseModel):
access_token : str
token_type : str
class TokenData(BaseModel):
username : str | None = None
role : str | None = None
status : str | None = None
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
#### ENUMS ####
class Role(str, Enum):
user = "user"
admin = "admin"
guest = "guest"
mod = "mod"
class Status(str, Enum):
active = "active"
banned = "banned"
suspended = "suspended"
class User(BaseModel):
username : str | None = None
user_id : int | None = None
role : Role | None = None
status : Status | None = None
class UserInDb(User):
hashed_password : str | None = None
fake_db = {
"bedir": {
"username": "bedir",
"user_id": 1,
"hashed_password": "$2a$12$mYGWGo9c3Di3SJyYjYf3XOAsu5nP8jekf3KTItO9pbUBEm5BcapRO", # Bcrypt örneği
"role": Role.user,
"status": Status.active,
},
"alice": {
"username": "alice",
"user_id": 2,
"hashed_password": "$2b$12$Alic3FakeHashedPasSw0rdxxxxxxxyyyyyyzzzzzz",
"role": Role.user,
"status": Status.suspended,
},
"adminuser": {
"username": "adminuser",
"user_id": 3,
"hashed_password": "$2b$12$AdminFakeHashedPasSw0rdxxxxxxxyyyyyyzzzzzz",
"role": Role.admin,
"status": Status.active,
}
}
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def authenticate_user(fake_db, username: str, password: str) -> UserInDb | bool:
print("username", username)
user = fake_db.get(username)
if not user:
return False
if not verify_password(password, user["hashed_password"]):
return False
return user
def create_access_token(
data : dict,
expires_delta : Annotated[timedelta, None] = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + expires_delta
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def get_user(db, username: str) -> UserInDb | None:
if username in db:
user_dict = db[username]
return UserInDb(**user_dict)
return None
def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> UserInDb | None:
credentials_exception = HTTPException(
status_code=401,
detail="Burda bir hata var",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
token_data = TokenData(**payload)
username : str = token_data.username
if username is None:
raise credentials_exception
except jwt.PyJWTError:
raise credentials_exception
user = get_user(fake_db, username=username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user : Annotated[UserInDb, Depends(get_current_user)]
) -> UserInDb | None:
if current_user.status == Status.banned:
raise HTTPException(status_code=400, detail="Inactive user")
print("current_user", current_user)
return current_user
"""
class User(BaseModel):
username : str
name : str | None = None
surname : str | None = None
email : EmailStr | None = None
role : Role | None = None
status : Status | None = None
bio : str | None = None
created_date : datetime | None = None
collections : list[str] | None = None
items = list[str] | None = None
class UserInDB(User):
hashed_password : str | None = None
class UserSelfProfile(BaseModel):
username : str
name : str | None = None
surname : str | None = None
email : EmailStr | None = None
role : Role | None = None
status : Status | None = None
bio : str | None = None
created_date : datetime | None = None
collections : list[str] | None = None
items = list[str] | None = None
class UserPublicProfile(BaseModel):
username : str
role : Role | None = None
bio : str | None = None
created_date : datetime | None = None
collections : list[str] | None = None
items = list[str] | None = None
"""

50
auth/router.py Normal file
View File

@ -0,0 +1,50 @@
from fastapi import APIRouter, Depends, HTTPException
from .models import UserInDb, User, Role, Token
from .models import get_current_active_user, authenticate_user, create_access_token , fake_db
from datetime import timedelta, datetime, timezone
from ..config import ACCESS_TOKEN_EXPIRE_MINUTES
from typing import Annotated, Optional
from fastapi.security import OAuth2PasswordRequestForm
router = APIRouter(
prefix="/auth",
tags=["auth"],
responses={404: {"description": "Not found"}},
dependencies=[],
)
@router.get("/me")
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)],
):
return current_user
def ADMIN(current_user: Annotated[UserInDb, Depends(get_current_active_user)]):
if current_user.role != Role.admin:
raise HTTPException(status_code=400, detail="You are not admin")
return current_user
@router.get('/home')
async def home(current_user : Annotated[User, Depends(ADMIN)]):
return {"message" : f"Welcome to home page {current_user.username}"}
@router.post('/login')
async def login_for_access_token(
form_data : Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
user = authenticate_user(fake_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=400,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user['username'], "role": user['role'], 'status': user['status']}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}

0
auth/schemas.py Normal file
View File

0
auth/services.py Normal file
View File

44
config.py Normal file
View File

@ -0,0 +1,44 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from passlib.context import CryptContext
from dotenv import load_dotenv
import os
load_dotenv()
Base = declarative_base() #basic class for declarative models
DATABASE_URL = f"postgresql://{os.getenv('USERNAME_DB')}:{os.getenv('PASSWORD_DB')}@{os.getenv('HOST_DB')}:{os.getenv('PORT_DB')}/{os.getenv('NAME_DB')}"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
Base.metadata.create_all(bind=engine)
### SECRET KEY ###
SECRET_KEY = os.getenv("SECRET_KEY")
ALGORITHM = os.getenv("ALGORITHM")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES"))
pwd_context = CryptContext(schemes=[f"{os.getenv('CRYPTO_TYPE')}"], deprecated="auto")
origins = [
"http://localhost",
"http://localhost:8080",
"http://localhost:3000",
"http://localhost:8000",
]
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

18
docker-compose.yml Normal file
View File

@ -0,0 +1,18 @@
services:
postgres:
image: postgres:latest
container_name: postgres
restart: always
environment:
POSTGRES_USER: postgres_user
POSTGRES_PASSWORD: postgres_password
POSTGRES_DB: postgres_db
ports:
- "5434:5432"
volumes:
- postgres_data:/db
volumes:
postgres_data:
driver: local

29
main.py
View File

@ -1,17 +1,23 @@
from .config import app
from .auth.router import router as auth_router
app.include_router(auth_router)
'''
from fastapi import FastAPI
from pydantic import BaseModel, Field, EmailStr
from sqlalchemy import (
Column, Integer, String, DateTime, Enum, ForeignKey, Text, Float, Boolean, create_engine
)
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from sqlalchemy.dialects.postgresql import JSONB
from enum import Enum as PyEnum
import datetime
from sqlalchemy import Column, Integer, String, DateTime, Float, Text, Boolean, ForeignKey, Enum
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
app = FastAPI()
Base = declarative_base()
from .config import Base #databaese connection
from .config import app #base app
# Enums
# Enums database
class Role(str, PyEnum):
admin = "admin"
user = "user"
@ -33,7 +39,6 @@ class VoteType(str, PyEnum):
# SQLAlchemy Models
class User(Base):
__tablename__ = "users"
user_id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, nullable=False)
name = Column(String)
@ -113,9 +118,5 @@ class VoteCreate(BaseModel):
item_id: int
vote_type: VoteType
# DB Setup (edit with your DB credentials)
DATABASE_URL = "postgresql://username:password@localhost:5432/mydatabase"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
Base.metadata.create_all(bind=engine)
'''