Files
connect/api/api/endpoints/keyring.py
2025-07-31 21:53:33 +05:00

126 lines
3.7 KiB
Python

from fastapi import (
APIRouter,
Body,
Depends,
Form,
HTTPException,
Response,
status,
)
from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.connection.session import get_connection_dep
from api.db.logic.keyring import get_key_by_id, create_key, update_key_by_id
from api.db.tables.account import KeyStatus
from api.schemas.base import bearer_schema
from api.schemas.endpoints.account_keyring import AccountKeyringUpdate
from api.schemas.account.account_keyring import AccountKeyring
from api.services.auth import get_current_user
from api.services.user_role_validation import db_user_role_validation
api_router = APIRouter(
prefix="/keyring",
tags=["User KeyringModel"],
)
@api_router.get("/{user_id}/{key_id}", dependencies=[Depends(bearer_schema)], response_model=AccountKeyring)
async def get_keyring(
key_id: str, connection: AsyncConnection = Depends(get_connection_dep), current_user=Depends(get_current_user)
):
authorize_user = await db_user_role_validation(connection, current_user)
keyring = await get_key_by_id(connection, key_id)
if keyring is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Key not found")
return keyring
@api_router.post("/{user_id}/{key_id}", dependencies=[Depends(bearer_schema)], response_model=AccountKeyring)
async def create_keyring(
user_id: int,
key_id: str,
key: AccountKeyringUpdate,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
authorize_user = await db_user_role_validation(connection, current_user)
keyring = await get_key_by_id(connection, key_id)
if keyring is None:
keyring_new = await create_key(
connection,
key,
key_id,
)
return keyring_new
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="An keyring with this information already exists."
)
@api_router.put("/{user_id}/{key_id}", dependencies=[Depends(bearer_schema)], response_model=AccountKeyring)
async def update_keyring(
user_id: int,
key_id: str,
keyring_update: AccountKeyringUpdate,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
authorize_user = await db_user_role_validation(connection, current_user)
keyring = await get_key_by_id(connection, key_id)
if keyring is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="keyring not found")
updated_values = keyring_update.model_dump(by_alias=True, exclude_none=True)
if not updated_values:
return keyring
await update_key_by_id(connection, updated_values, keyring)
keyring = await get_key_by_id(connection, key_id)
return keyring
@api_router.delete("/{user_id}/{key_id}", dependencies=[Depends(bearer_schema)], response_model=AccountKeyring)
async def delete_keyring(
user_id: int,
key_id: str,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
authorize_user = await db_user_role_validation(connection, current_user)
keyring = await get_key_by_id(connection, key_id)
if keyring is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="keyring not found")
keyring_update = AccountKeyringUpdate(status=KeyStatus.DELETED.value)
updated_values = keyring_update.model_dump(by_alias=True, exclude_none=True)
if not updated_values:
return keyring
await update_key_by_id(connection, updated_values, keyring)
keyring = await get_key_by_id(connection, key_id)
return keyring