from starlette.middleware.base import BaseHTTPMiddleware
from fastapi import (
Request,
status,
)
from fastapi.responses import JSONResponse
from api.config import get_settings
import re
from re import escape
from fastapi_jwt_auth import AuthJWT
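class MiddlewareAccessTokenValidadtion(BaseHTTPMiddleware):
    """Reject requests that do not carry a valid JWT access token.

    Every request whose path is not one of the excluded authentication
    routes must present a token that AuthJWT accepts. On success the token
    subject is stored on ``request.state.current_user``; on failure a 401
    JSON response with a ``WWW-Authenticate: Bearer`` header is returned.
    """

    def __init__(self, app):
        """Wrap *app* and pre-compile the routes that skip token validation."""
        super().__init__(app)

        # PATH_PREFIX is escaped exactly once, here. Escaping it a second
        # time while building the patterns makes a prefix that contains a
        # regex-special character (for example "-" or ".") never match, so
        # the login and refresh routes would themselves be answered with 401.
        self.prefix = escape(get_settings().PATH_PREFIX)
        self.excluded_routes = [
            re.compile(r'^' + self.prefix + r'/auth/refresh/?$'),
            re.compile(r'^' + self.prefix + r'/auth/?$')
        ]

    @staticmethod
    def _unauthorized(detail):
        """Build the 401 response shared by every rejection path."""
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content={"detail": detail},
            headers={"WWW-Authenticate": "Bearer"}
        )

    async def dispatch(self,
                       request: Request,
                       call_next):
        """Authenticate the request, then pass it to the next handler.

        Args:
            request: Incoming request; its path and ``Authorization`` header
                are inspected.
            call_next: Callable that forwards the request down the stack.

        Returns:
            The downstream response, or a 401 ``JSONResponse`` when the
            header is missing or the token is rejected.
        """
        # Default deny: every HTTP method is authenticated. A method
        # allow-list (GET/POST/PUT/DELETE) would leave PATCH, HEAD and
        # OPTIONS without any response from this middleware. If browsers
        # send CORS preflight requests, CORSMiddleware has to be installed
        # outside this middleware so that it answers them first.

        # fullmatch(): "$" on its own also matches just before a trailing
        # newline, which would let "<excluded path>\n" skip validation.
        if any(pattern.fullmatch(request.url.path) for pattern in self.excluded_routes):
            return await call_next(request)

        if not request.headers.get("Authorization"):
            return self._unauthorized("Missing authorization header.")

        try:
            # AuthJWT(request) parses the Authorization header itself, so it
            # is constructed inside the try: a malformed header then yields
            # a 401 instead of an unhandled exception.
            authorize = AuthJWT(request)
            # NOTE(review): get_jwt_subject() only decodes the token;
            # jwt_required() is the call that enforces the "access" token
            # type and the denylist, so a refresh or revoked token is not
            # accepted here. Confirm against the pinned fastapi_jwt_auth
            # version.
            authorize.jwt_required()
            current_user = authorize.get_jwt_subject()
        except Exception:
            # Fail closed on any validation error. Only token handling is
            # guarded, so errors raised by downstream handlers are no
            # longer reported to the client as an invalid token.
            return self._unauthorized("The access token is invalid or expired.")

        if current_user is None:
            # No subject means no authenticated identity; never forward the
            # request with an empty current_user.
            return self._unauthorized("The access token is invalid or expired.")

        request.state.current_user = current_user
        return await call_next(request)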
# async with get_connection() as connection:
# authorize_user = await get_user_login(connection, current_user)
# print(authorize_user)
# if authorize_user is None :
# return JSONResponse(
# status_code=status.HTTP_404_NOT_FOUND ,
# detail="User not found.")