fix: rename process schema

This commit is contained in:
TheNoxium
2025-08-02 22:05:29 +05:00
parent c66c21eb14
commit 3c300f7c7a
4 changed files with 61 additions and 66 deletions

View File

@@ -0,0 +1,176 @@
from fastapi import APIRouter, Depends, HTTPException, status, Query
from typing import Optional, List
from sqlalchemy.ext.asyncio import AsyncConnection
from api.db.connection.session import get_connection_dep
from api.db.logic.account import get_user_by_login
from api.db.logic.process_schema import (
get_process_schema_by_title,
create_process_schema,
get_process_schema_by_id,
update_process_schema_by_id,
get_process_schema_page_DTO,
)
from api.schemas.process.process_schema import ProcessSchema
from api.db.tables.process import ProcessStatus
from api.schemas.base import bearer_schema
from api.schemas.endpoints.process_schema import ProcessSchemaUpdate, AllProcessSchemaResponse, ProcessSchemaFilterDTO
from api.services.auth import get_current_user
from api.services.user_role_validation import (
db_user_role_validation_for_listevents_and_processschema_by_listevent_id,
db_user_role_validation_for_listevents_and_processschema,
)
api_router = APIRouter(
prefix="/process_schema",
tags=["process schema"],
)
@api_router.get("", dependencies=[Depends(bearer_schema)], response_model=AllProcessSchemaResponse)
async def get_all_process_schema(
page: int = Query(1, description="Page number", gt=0),
limit: int = Query(10, description="Number of items per page", gt=0),
search: Optional[str] = Query(None, description="Search term to filter by title or description"),
order_field: Optional[str] = Query("id", description="Field to sort by"),
order_direction: Optional[str] = Query("asc", description="Sort direction (asc/desc)"),
status_filter: Optional[List[str]] = Query(None, description="Filter by status"),
owner_id: Optional[List[str]] = Query(None, description="Filter by owner id"),
connection: AsyncConnection = Depends(get_connection_dep),
creator_id: Optional[int] = Query(None, description="Filter by creator id"),
current_user=Depends(get_current_user),
):
filters = {
**({"status": status_filter} if status_filter else {}),
**({"owner_id": owner_id} if owner_id else {}),
**({"creator_id": [str(creator_id)]} if creator_id else {}),
}
filter_dto = ProcessSchemaFilterDTO(
pagination={"page": page, "limit": limit},
search=search,
order={"field": order_field, "direction": order_direction},
filters=filters if filters else None,
)
authorize_user, page_flag = await db_user_role_validation_for_listevents_and_processschema(connection, current_user)
if not page_flag:
if filter_dto.filters is None:
filter_dto.filters = {}
filter_dto.filters["creator_id"] = [str(authorize_user.id)]
process_schema_page = await get_process_schema_page_DTO(connection, filter_dto)
if process_schema_page is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
return process_schema_page
@api_router.get("/{process_schema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema)
async def get_process_schema(
process_schema_id: int,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
process_schema_validation = await get_process_schema_by_id(connection, process_schema_id)
if process_schema_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
authorize_user = await db_user_role_validation_for_listevents_and_processschema_by_listevent_id(
connection, current_user, process_schema_validation.creator_id
)
if process_schema_id is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
return process_schema_validation
@api_router.post("", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema)
async def create_processschema(
process_schema: ProcessSchemaUpdate,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
user_validation = await get_user_by_login(connection, current_user)
process_schema_validation = await get_process_schema_by_title(connection, process_schema.title)
if process_schema_validation is None:
await create_process_schema(connection, process_schema, user_validation.id)
process_schema_new = await get_process_schema_by_title(connection, process_schema.title)
return process_schema_new
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="An process schema with this information already exists."
)
@api_router.put("/{process_schema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema)
async def update_process_schema(
process_schema_id: int,
process_schema_update: ProcessSchemaUpdate,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
process_schema_validation = await get_process_schema_by_id(connection, process_schema_id)
if process_schema_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
authorize_user = await db_user_role_validation_for_listevents_and_processschema_by_listevent_id(
connection, current_user, process_schema_validation.creator_id
)
updated_values = process_schema_update.model_dump(by_alias=True, exclude_none=True)
if not updated_values:
return process_schema_validation
await update_process_schema_by_id(connection, updated_values, process_schema_validation)
process_schema = await get_process_schema_by_id(connection, process_schema_id)
return process_schema
@api_router.delete("/{process_schema_id}", dependencies=[Depends(bearer_schema)], response_model=ProcessSchema)
async def delete_process_schema(
process_schema_id: int,
connection: AsyncConnection = Depends(get_connection_dep),
current_user=Depends(get_current_user),
):
process_schema_validation = await get_process_schema_by_id(connection, process_schema_id)
if process_schema_validation is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found")
authorize_user = await db_user_role_validation_for_listevents_and_processschema_by_listevent_id(
connection, current_user, process_schema_validation.creator_id
)
process_schema_update = ProcessSchemaUpdate(status=ProcessStatus.DELETED.value)
updated_values = process_schema_update.model_dump(by_alias=True, exclude_none=True)
if not updated_values:
return process_schema_validation
await update_process_schema_by_id(connection, updated_values, process_schema_validation)
process_schema = await get_process_schema_by_id(connection, process_schema_id)
return process_schema