115 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			115 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from fastapi import (
 | 
						|
    APIRouter,
 | 
						|
    Depends,
 | 
						|
    HTTPException,
 | 
						|
    status,
 | 
						|
)
 | 
						|
from orm.tables.account import KeyStatus
 | 
						|
from sqlalchemy.ext.asyncio import AsyncConnection
 | 
						|
 | 
						|
from api.db.connection.session import get_connection_dep
 | 
						|
from api.db.logic.keyring import create_key, get_key_by_id, update_key_by_id
 | 
						|
from api.schemas.account.account_keyring import AccountKeyring
 | 
						|
from api.schemas.base import bearer_schema
 | 
						|
from api.schemas.endpoints.account_keyring import AccountKeyringUpdate
 | 
						|
from api.services.auth import get_current_user
 | 
						|
from api.services.user_role_validation import db_user_role_validation
 | 
						|
 | 
						|
# Router collecting the keyring endpoints; every route below is mounted under "/keyring".
api_router = APIRouter(prefix="/keyring", tags=["User KeyringModel"])
 | 
						|
 | 
						|
 | 
						|
@api_router.get("/{user_id}/{key_id}", dependencies=[Depends(bearer_schema)], response_model=AccountKeyring)
async def get_keyring_endpoint(
    key_id: str, connection: AsyncConnection = Depends(get_connection_dep), current_user=Depends(get_current_user)
):
    """Return the keyring entry identified by ``key_id``.

    Args:
        key_id: Key identifier taken from the URL path.
        connection: Database connection resolved by ``get_connection_dep``.
        current_user: Value resolved by the ``get_current_user`` dependency.

    Returns:
        Whatever ``get_key_by_id`` returns for ``key_id``; the route serializes
        it with ``AccountKeyring``.

    Raises:
        HTTPException: 404 ("Key not found") when ``get_key_by_id`` returns ``None``.
    """
    # NOTE(review): the route path also contains ``{user_id}``, but this handler
    # does not declare it, so it plays no part in the lookup below.

    # Called only for its effect; its return value is not needed here.
    # Presumably it raises when the caller lacks the required role -- confirm
    # against the helper's implementation.
    await db_user_role_validation(connection, current_user)

    keyring = await get_key_by_id(connection, key_id)
    if keyring is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Key not found")

    return keyring
 | 
						|
 | 
						|
 | 
						|
@api_router.post("/{user_id}/{key_id}", dependencies=[Depends(bearer_schema)], response_model=AccountKeyring)
async def create_keyring_endpoint(
    user_id: int,
    key_id: str,
    key: AccountKeyringUpdate,
    connection: AsyncConnection = Depends(get_connection_dep),
    current_user=Depends(get_current_user),
):
    """Create a keyring entry under ``key_id`` unless one already exists.

    Args:
        user_id: Integer taken from the URL path; not referenced in the body.
        key_id: Key identifier taken from the URL path.
        key: Request payload passed through to ``create_key``.
        connection: Database connection resolved by ``get_connection_dep``.
        current_user: Value resolved by the ``get_current_user`` dependency.

    Returns:
        The value returned by ``create_key``; the route serializes it with
        ``AccountKeyring``.

    Raises:
        HTTPException: 400 when ``get_key_by_id`` already finds an entry for
            ``key_id``.
    """
    # Called only for its effect; its return value is not needed here.
    # Presumably it raises when the caller lacks the required role -- confirm
    # against the helper's implementation.
    await db_user_role_validation(connection, current_user)

    # Guard clause: refuse to create a second entry for the same key_id.
    if await get_key_by_id(connection, key_id) is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="An keyring with this information already exists."
        )

    return await create_key(connection, key, key_id)
 | 
						|
 | 
						|
 | 
						|
@api_router.put("/{user_id}/{key_id}", dependencies=[Depends(bearer_schema)], response_model=AccountKeyring)
async def update_keyring_endpoint(
    user_id: int,
    key_id: str,
    keyring_update: AccountKeyringUpdate,
    connection: AsyncConnection = Depends(get_connection_dep),
    current_user=Depends(get_current_user),
):
    """Apply the non-``None`` fields of ``keyring_update`` to an existing keyring entry.

    Args:
        user_id: Integer taken from the URL path; not referenced in the body.
        key_id: Key identifier taken from the URL path.
        keyring_update: Request payload; only fields that are not ``None`` are applied.
        connection: Database connection resolved by ``get_connection_dep``.
        current_user: Value resolved by the ``get_current_user`` dependency.

    Returns:
        The entry re-read through ``get_key_by_id`` after the update, or the
        entry as first fetched when the payload carries no values to apply.

    Raises:
        HTTPException: 404 ("keyring not found") when ``get_key_by_id`` returns ``None``.
    """
    # Called only for its effect; its return value is not needed here.
    # Presumably it raises when the caller lacks the required role -- confirm
    # against the helper's implementation.
    await db_user_role_validation(connection, current_user)

    keyring = await get_key_by_id(connection, key_id)
    if keyring is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="keyring not found")

    # exclude_none drops fields the client did not set, so an empty dict means
    # there is nothing to write.
    updated_values = keyring_update.model_dump(by_alias=True, exclude_none=True)
    if not updated_values:
        return keyring

    await update_key_by_id(connection, updated_values, keyring)

    # Re-read so the response reflects what is stored after the update.
    return await get_key_by_id(connection, key_id)
 | 
						|
 | 
						|
 | 
						|
@api_router.delete("/{user_id}/{key_id}", dependencies=[Depends(bearer_schema)], response_model=AccountKeyring)
async def delete_keyring_endpoint(
    user_id: int,
    key_id: str,
    connection: AsyncConnection = Depends(get_connection_dep),
    current_user=Depends(get_current_user),
):
    """Mark a keyring entry as deleted by setting its status to ``KeyStatus.DELETED``.

    The entry is updated through ``update_key_by_id`` rather than removed by
    this handler.

    Args:
        user_id: Integer taken from the URL path; not referenced in the body.
        key_id: Key identifier taken from the URL path.
        connection: Database connection resolved by ``get_connection_dep``.
        current_user: Value resolved by the ``get_current_user`` dependency.

    Returns:
        The entry re-read through ``get_key_by_id`` after the status update.

    Raises:
        HTTPException: 404 ("keyring not found") when ``get_key_by_id`` returns ``None``.
    """
    # Called only for its effect; its return value is not needed here.
    # Presumably it raises when the caller lacks the required role -- confirm
    # against the helper's implementation.
    await db_user_role_validation(connection, current_user)

    keyring = await get_key_by_id(connection, key_id)
    if keyring is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="keyring not found")

    # The payload is built here with ``status`` set, so the dump is never empty
    # and no "nothing to update" case needs handling (assumes
    # KeyStatus.DELETED.value is not None).
    deletion = AccountKeyringUpdate(status=KeyStatus.DELETED.value)
    updated_values = deletion.model_dump(by_alias=True, exclude_none=True)

    await update_key_by_id(connection, updated_values, keyring)

    # Re-read so the response reflects what is stored after the update.
    return await get_key_by_id(connection, key_id)
 |