controlpanel-old/backend/auth.py

75 lines
1.8 KiB
Python

import os
import json
from fastapi import APIRouter, Depends, HTTPException, status, Header
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from passlib.context import CryptContext as PasswordContext
from cryptography.fernet import Fernet, InvalidToken, InvalidSignature
from models import User
AUTH_ERROR = HTTPException(status.HTTP_400_BAD_REQUEST)
password_context = PasswordContext(schemes=["bcrypt"], deprecated="auto")
SECRET_KEY = os.environ["SECRET_KEY"]
crypto_context = Fernet(SECRET_KEY)
users_db = {
"kai": {
"username": "kai",
"hashed_password": "$2b$12$mh5hSniE.1SqxK3IDDTIO.1jDKgU0KX2eZev3yFu4Z1ZUPXWMw2Xa",
}
}
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
class UserInDb(User):
hashed_password: str
def get_current_user(token: str = Depends(oauth2_scheme)):
try:
token_data = json.loads(crypto_context.decrypt(token.encode()))
except (InvalidToken, InvalidSignature):
raise AUTH_ERROR
return User(username=token_data["username"])
auth_router = APIRouter()
class TokenLoginResponse(BaseModel):
access_token: str
token_type: str
@auth_router.post(
"/login",
tags=["auth"],
responses={
status.HTTP_400_BAD_REQUEST: {},
status.HTTP_200_OK: {
"model": TokenLoginResponse,
},
},
)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
if form_data.username not in users_db:
raise AUTH_ERROR
user = UserInDb(**users_db[form_data.username])
if not password_context.verify(form_data.password, user.hashed_password):
raise AUTH_ERROR
token = crypto_context.encrypt(json.dumps({"username": user.username}).encode())
return {
"access_token": token,
"token_type": "bearer",
}