Files
connect/api/api/endpoints/keyring.py
2025-11-06 23:34:47 +05:00

123 lines
3.8 KiB
Python

from fastapi import (
APIRouter,
Depends,
status,
)
from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.connection.session import get_connection_dep
from api.schemas.account.account_keyring import AccountKeyring
from api.schemas.base import bearer_schema
from api.schemas.endpoints.account_keyring import AccountKeyringUpdate
from api.services.auth import get_current_user
from api.services.user_role_validation import UserRoleValidator
from api.error import create_operation_error, create_validation_error
from api.services.endpoints.keyring import KeyringService
# Router collecting every keyring endpoint in this module; all routes
# registered on it are served under the "/keyring" prefix.
api_router = APIRouter(prefix="/keyring", tags=["User KeyringModel"])
# GET /keyring/{user_id}/{key_id}
#
# Looks up a keyring by ``key_id`` through ``KeyringService.get`` and returns
# it, serialized with the ``AccountKeyring`` response model. When the service
# returns ``None`` the error built by ``create_operation_error`` is raised
# with HTTP 404.
#
# ``UserRoleValidator.validate_admin`` runs on the current user before the
# lookup (presumably it raises for non-admin callers -- confirm in
# ``UserRoleValidator``).
#
# NOTE(review): the route contains ``{user_id}`` but this handler does not
# declare it, so the value is never read here. The sibling POST/PUT/DELETE
# handlers declare ``user_id: int``; confirm whether the omission is intended.
@api_router.get(
    "/{user_id}/{key_id}",
    dependencies=[Depends(bearer_schema)],
    response_model=AccountKeyring,
)
async def get_keyring_endpoint(
    key_id: str,
    connection: AsyncConnection = Depends(get_connection_dep),
    current_user=Depends(get_current_user),
):
    await UserRoleValidator(connection).validate_admin(current_user)

    found = await KeyringService(connection).get(key_id)
    if found is not None:
        return found

    raise create_operation_error(
        message="Key not found",
        status_code=status.HTTP_404_NOT_FOUND,
        details={"key_id": key_id},
    )
# POST /keyring/{user_id}/{key_id}
#
# Creates a keyring under ``key_id`` from the request body ``key`` via
# ``KeyringService.create`` and returns the service's result. If
# ``KeyringService.get`` already finds an entry for ``key_id``, the error
# built by ``create_validation_error`` is raised with HTTP 400 instead.
#
# ``UserRoleValidator.validate_admin`` runs on the current user first
# (presumably it raises for non-admin callers -- confirm in
# ``UserRoleValidator``).
#
# ``user_id`` is declared as an ``int`` path parameter but is not otherwise
# used by this handler; neither the lookup nor the creation receives it.
@api_router.post(
    "/{user_id}/{key_id}",
    dependencies=[Depends(bearer_schema)],
    response_model=AccountKeyring,
)
async def create_keyring_endpoint(
    user_id: int,
    key_id: str,
    key: AccountKeyringUpdate,
    connection: AsyncConnection = Depends(get_connection_dep),
    current_user=Depends(get_current_user),
):
    await UserRoleValidator(connection).validate_admin(current_user)

    keyring_service = KeyringService(connection)
    existing = await keyring_service.get(key_id)

    # Guard clause: refuse to overwrite an entry that is already stored.
    if existing is not None:
        raise create_validation_error(
            message="A keyring with this information already exists.",
            status_code=status.HTTP_400_BAD_REQUEST,
            details={"key_id": key_id},
        )

    return await keyring_service.create(key, key_id)
# PUT /keyring/{user_id}/{key_id}
#
# Applies the non-``None`` fields of ``keyring_update`` to the keyring stored
# under ``key_id`` and returns the result of ``KeyringService.update``.
#   * If ``KeyringService.get`` returns ``None``, the error built by
#     ``create_operation_error`` is raised with HTTP 404.
#   * If the body carries no non-``None`` fields, the keyring is returned as
#     fetched and ``update`` is not called.
#
# ``UserRoleValidator.validate_admin`` runs on the current user first
# (presumably it raises for non-admin callers -- confirm in
# ``UserRoleValidator``).
#
# ``user_id`` is declared as an ``int`` path parameter but is not otherwise
# used by this handler.
@api_router.put(
    "/{user_id}/{key_id}",
    dependencies=[Depends(bearer_schema)],
    response_model=AccountKeyring,
)
async def update_keyring_endpoint(
    user_id: int,
    key_id: str,
    keyring_update: AccountKeyringUpdate,
    connection: AsyncConnection = Depends(get_connection_dep),
    current_user=Depends(get_current_user),
):
    await UserRoleValidator(connection).validate_admin(current_user)

    keyring_service = KeyringService(connection)
    current = await keyring_service.get(key_id)
    if current is None:
        raise create_operation_error(
            message="keyring not found",
            status_code=status.HTTP_404_NOT_FOUND,
            details={"key_id": key_id},
        )

    # Only fields that were actually supplied (non-None) are forwarded,
    # keyed by their aliases.
    changes = keyring_update.model_dump(by_alias=True, exclude_none=True)
    if changes:
        return await keyring_service.update(key_id, changes)

    # Nothing to change: hand back the record exactly as it was fetched.
    return current
# DELETE /keyring/{user_id}/{key_id}
#
# Deletes the keyring stored under ``key_id`` and returns whatever
# ``KeyringService.delete`` returns, serialized with the ``AccountKeyring``
# response model. If ``KeyringService.get`` returns ``None``, the error built
# by ``create_operation_error`` is raised with HTTP 404 and ``delete`` is not
# called.
#
# ``UserRoleValidator.validate_admin`` runs on the current user first
# (presumably it raises for non-admin callers -- confirm in
# ``UserRoleValidator``).
#
# ``user_id`` is declared as an ``int`` path parameter but is not otherwise
# used by this handler.
@api_router.delete(
    "/{user_id}/{key_id}",
    dependencies=[Depends(bearer_schema)],
    response_model=AccountKeyring,
)
async def delete_keyring_endpoint(
    user_id: int,
    key_id: str,
    connection: AsyncConnection = Depends(get_connection_dep),
    current_user=Depends(get_current_user),
):
    await UserRoleValidator(connection).validate_admin(current_user)

    keyring_service = KeyringService(connection)

    # Existence check first so a missing key yields a 404 rather than
    # reaching the delete call.
    if await keyring_service.get(key_id) is None:
        raise create_operation_error(
            message="keyring not found",
            status_code=status.HTTP_404_NOT_FOUND,
            details={"key_id": key_id},
        )

    return await keyring_service.delete(key_id)