58 lines
2.0 KiB
Python
58 lines
2.0 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from .models import UserInDb, Role, Token, UserPublic, UserBase
|
|
from .models import get_current_active_user, authenticate_user, create_access_token,get_current_user
|
|
from datetime import timedelta
|
|
from ..auth.models import get_password_hash, verify_password
|
|
from typing import Annotated
|
|
from sqlmodel import Session
|
|
from ..config import get_session_db
|
|
from fastapi import Depends
|
|
from fastapi.security import OAuth2PasswordRequestForm
|
|
from .models import UserCreate, DBUser
|
|
|
|
|
|
router = APIRouter(
|
|
prefix="/auth",
|
|
tags=["auth"],
|
|
responses={404: {"description": "Not found"}},
|
|
dependencies=[],
|
|
)
|
|
|
|
@router.post('/login')
|
|
async def login_for_access_token(
|
|
form_data : Annotated[OAuth2PasswordRequestForm, Depends()],
|
|
session : Annotated[Session, Depends(get_session_db)],
|
|
) -> Token:
|
|
|
|
user = authenticate_user(session, form_data.username, form_data.password)
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Incorrect username or password",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
access_token_expires = timedelta(minutes=30)
|
|
access_token = create_access_token(
|
|
data={"sub": user.username, "role": user.role, 'status': user.status}, expires_delta=access_token_expires
|
|
)
|
|
return Token(access_token=access_token, token_type="bearer")
|
|
|
|
|
|
@router.post('/register', response_model=UserPublic)
|
|
async def create_user(
|
|
session : Annotated[Session, Depends(get_session_db)],
|
|
user : Annotated[UserCreate, Depends()]
|
|
):
|
|
user_dict = user.dict()
|
|
print(user.password)
|
|
user_dict['hashed_password'] = get_password_hash(user.password)
|
|
print (user_dict['hashed_password'])
|
|
|
|
if not verify_password(user.password, user_dict['hashed_password']):
|
|
raise HTTPException(status_code=400, detail="Password hashing failed")
|
|
|
|
db_user = DBUser.model_validate(user_dict)
|
|
session.add(db_user)
|
|
session.commit()
|
|
session.refresh(db_user)
|
|
return db_user |