from datetime import datetime, timedelta, timezone from fastapi import ( APIRouter, Depends, HTTPException, Request, Response, status, ) from loguru import logger from fastapi_jwt_auth import AuthJWT from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncConnection from api.config import get_settings from api.db.connection.session import get_connection_dep from api.services.auth import authenticate_user from api.db.logic.auth import add_new_refresh_token, upgrade_old_refresh_token from api.schemas.endpoints.auth import Auth, Access api_router = APIRouter( prefix="/auth", tags=["User auth"], ) class Settings(BaseModel): authjwt_secret_key: str = get_settings().SECRET_KEY # Configure application to store and get JWT from cookies authjwt_token_location: set = {"headers", "cookies"} authjwt_cookie_domain: str = get_settings().DOMAIN # Only allow JWT cookies to be sent over https authjwt_cookie_secure: bool = get_settings().ENV == "prod" # Enable csrf double submit protection. default is True authjwt_cookie_csrf_protect: bool = False authjwt_cookie_samesite: str = "lax" @AuthJWT.load_config def get_config(): return Settings() @api_router.post("", response_model=Access) async def login_for_access_token( user: Auth, response: Response, connection: AsyncConnection = Depends(get_connection_dep), Authorize: AuthJWT = Depends(), ): """Авторизирует, выставляет токены в куки.""" user = await authenticate_user(connection, user.login, user.password) # print("login_for_access_token", user) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", # headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=get_settings().ACCESS_TOKEN_EXPIRE_MINUTES) refresh_token_expires = timedelta(days=get_settings().REFRESH_TOKEN_EXPIRE_DAYS) logger.debug(f"refresh_token_expires {refresh_token_expires}") access_token = Authorize.create_access_token(subject=user.login, expires_time=access_token_expires) refresh_token = Authorize.create_refresh_token(subject=user.login, expires_time=refresh_token_expires) refresh_token_expires_time = datetime.now(timezone.utc) + refresh_token_expires await add_new_refresh_token(connection, refresh_token, refresh_token_expires_time, user) Authorize.set_refresh_cookies(refresh_token) return Access(access_token=access_token) @api_router.post("/refresh",response_model=Access) async def refresh( request: Request, connection: AsyncConnection = Depends(get_connection_dep), Authorize: AuthJWT = Depends() ): refresh_token = request.cookies.get("refresh_token_cookie") # print("Refresh Token:", refresh_token) if not refresh_token: raise HTTPException(status_code=401, detail="Refresh token is missing") try: Authorize.jwt_refresh_token_required() current_user = Authorize.get_jwt_subject() except Exception as e: await upgrade_old_refresh_token(connection, current_user, refresh_token) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token", ) access_token_expires = timedelta(minutes=get_settings().ACCESS_TOKEN_EXPIRE_MINUTES) new_access_token = Authorize.create_access_token(subject=current_user, expires_time=access_token_expires) return Access(access_token=new_access_token)