diff --git a/api/api/db/alembic/versions/80840e78631e_add_cascade_delete_to_node_link_foreign_.py b/api/api/db/alembic/versions/80840e78631e_add_cascade_delete_to_node_link_foreign_.py new file mode 100644 index 0000000..935900f --- /dev/null +++ b/api/api/db/alembic/versions/80840e78631e_add_cascade_delete_to_node_link_foreign_.py @@ -0,0 +1,52 @@ +"""add_cascade_delete_to_node_link_foreign_keys + +Revision ID: 80840e78631e +Revises: cc3b95f1f99d +Create Date: 2025-10-26 18:47:24.004327 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '80840e78631e' +down_revision: Union[str, None] = 'cc3b95f1f99d' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # Drop existing foreign key constraints + # Note: These constraint names are MySQL auto-generated names + # If they differ, check with: SHOW CREATE TABLE node_link; + op.drop_constraint('node_link_ibfk_2', 'node_link', type_='foreignkey') # next_node_id + op.drop_constraint('node_link_ibfk_3', 'node_link', type_='foreignkey') # node_id + + # Add new foreign key constraints with CASCADE + op.create_foreign_key( + 'fk_node_link_next_node_id_cascade', + 'node_link', 'ps_node', + ['next_node_id'], ['id'], + ondelete='CASCADE' + ) + op.create_foreign_key( + 'fk_node_link_node_id_cascade', + 'node_link', 'ps_node', + ['node_id'], ['id'], + ondelete='CASCADE' + ) + + +def downgrade() -> None: + """Downgrade schema.""" + # Drop CASCADE foreign key constraints + op.drop_constraint('fk_node_link_next_node_id_cascade', 'node_link', type_='foreignkey') + op.drop_constraint('fk_node_link_node_id_cascade', 'node_link', type_='foreignkey') + + # Restore original foreign key constraints without CASCADE + op.create_foreign_key('node_link_ibfk_2', 'node_link', 'ps_node', ['next_node_id'], ['id']) + op.create_foreign_key('node_link_ibfk_3', 'node_link', 'ps_node', ['node_id'], ['id']) diff --git a/api/api/db/logic/process_schema.py b/api/api/db/logic/process_schema.py index 1bc6f51..c953eb8 100644 --- a/api/api/db/logic/process_schema.py +++ b/api/api/db/logic/process_schema.py @@ -114,6 +114,7 @@ async def get_process_schema_page_DTO( limit=limit, ) + async def get_process_schema_by_id(connection: AsyncConnection, id: int) -> Optional[ProcessSchema]: """ Получает process_schema по id. diff --git a/api/api/db/logic/ps_node.py b/api/api/db/logic/ps_node.py index 2aafa75..ad7389c 100644 --- a/api/api/db/logic/ps_node.py +++ b/api/api/db/logic/ps_node.py @@ -1,15 +1,15 @@ -from typing import Optional +from typing import Optional, List from datetime import datetime, timezone -from sqlalchemy import insert, select, desc +from sqlalchemy import insert, select, desc, and_, delete, update from sqlalchemy.ext.asyncio import AsyncConnection -from orm.tables.process import ps_node_table +from orm.tables.process import ps_node_table, node_link_table, process_schema_table from api.schemas.process.ps_node import Ps_Node from model_nodes.node_listen_models import ListenNodeCoreSchema -from orm.tables.process import NodeStatus, NodeType +from orm.tables.process import NodeStatus async def get_ps_node_by_id(connection: AsyncConnection, id: int) -> Optional[Ps_Node]: @@ -27,21 +27,6 @@ async def get_ps_node_by_id(connection: AsyncConnection, id: int) -> Optional[Ps return Ps_Node.model_validate(ps_node_data) -async def get_ps_node_by_type_and_ps_id(connection: AsyncConnection, node_type: str, ps_id: int) -> Optional[Ps_Node]: - """ - Получает ps_node по node_type и ps_id. - """ - query = select(ps_node_table).where(ps_node_table.c.node_type == node_type, ps_node_table.c.ps_id == ps_id) - - ps_node_db_cursor = await connection.execute(query) - - ps_node_data = ps_node_db_cursor.mappings().one_or_none() - if not ps_node_data: - return None - - return Ps_Node.model_validate(ps_node_data) - - async def get_last_ps_node_by_creator_and_ps_id( connection: AsyncConnection, creator_id: int, ps_id: int ) -> Optional[Ps_Node]: @@ -86,3 +71,121 @@ async def create_ps_node_schema( await connection.commit() return await get_last_ps_node_by_creator_and_ps_id(connection, creator_id, validated_schema.ps_id) + + +async def check_node_connection(connection: AsyncConnection, node_id: int, next_node_id: int, port: int) -> bool: + """ + Проверяет, подключен ли next_node_id к node_id через указанный порт. + """ + query = select(node_link_table).where( + and_( + node_link_table.c.node_id == node_id, + node_link_table.c.next_node_id == next_node_id, + node_link_table.c.link_point_id == port, + ) + ) + + result = await connection.execute(query) + return result.mappings().first() is not None + + +async def get_nodes_for_deletion_ordered(connection: AsyncConnection, node_id: int) -> List[int]: + """ + Рекурсивно находит ВСЕ дочерние узлы и возвращает их ID в правильном порядке: + от самых глубоких к корневым. + """ + all_child_nodes = [] + visited_nodes = set() + + # тут надо будет поработать с зацикливанием на вышестояющую ноду, сейчас вышестоящая нода если она уже была учтена, не будет занесена в спиок на удаление. + async def find_children_with_depth(current_node_id: int, current_depth: int): + if current_node_id in visited_nodes: + return + + visited_nodes.add(current_node_id) + + query = ( + select(ps_node_table) + .join(node_link_table, ps_node_table.c.id == node_link_table.c.next_node_id) + .where(node_link_table.c.node_id == current_node_id) + ) + + result = await connection.execute(query) + child_nodes = result.mappings().all() + + for node_data in child_nodes: + node = Ps_Node.model_validate(node_data) + all_child_nodes.append((node, current_depth + 1)) + await find_children_with_depth(node.id, current_depth + 1) + + await find_children_with_depth(node_id, 0) + + all_child_nodes.sort(key=lambda x: x[1], reverse=True) + + ordered_node_ids = [node.id for node, depth in all_child_nodes] + + ordered_node_ids.append(node_id) + + return ordered_node_ids + + +async def delete_ps_nodes_delete_handler(connection: AsyncConnection, node_ids: List[int]) -> List[int]: + """ + Очищает settings и удаляет ноды для каждого ps_id. + Возвращает список успешно удаленных ID нод. + """ + if not node_ids: + return [] + + ps_id_rows = await connection.execute( + select(ps_node_table.c.id, ps_node_table.c.ps_id).where(ps_node_table.c.id.in_(node_ids)) + ) + rows = ps_id_rows.mappings().all() + if not rows: + return [] + + ps_to_node_ids = {} + for r in rows: + ps_to_node_ids.setdefault(r["ps_id"], []).append(r["id"]) + + deleted_all = [] + + for ps_id, ids_in_ps in ps_to_node_ids.items(): + await remove_nodes_from_process_schema_settings(connection, ps_id, ids_in_ps) + result = await connection.execute(delete(ps_node_table).where(ps_node_table.c.id.in_(ids_in_ps))) + if result.rowcount and result.rowcount > 0: + deleted_all.extend(ids_in_ps) + else: + raise Exception(f"Failed to delete nodes for ps_id={ps_id}") + + await connection.commit() + return deleted_all + + +async def remove_nodes_from_process_schema_settings(connection: AsyncConnection, ps_id: int, node_ids: List[int]): + """ + Удаляет ноды из поля settings в таблице process_schema по списку node_ids. + """ + from api.db.logic.process_schema import get_process_schema_by_id + + process_schema = await get_process_schema_by_id(connection, ps_id) + if not process_schema or not process_schema.settings: + return + + settings = process_schema.settings + if "nodes" in settings and isinstance(settings["nodes"], list): + node_ids_set = set(node_ids) + settings["nodes"] = [ + node_item + for node_item in settings["nodes"] + if not ( + isinstance(node_item, dict) + and "node" in node_item + and isinstance(node_item["node"], dict) + and node_item["node"].get("id") in node_ids_set + ) + ] + + await connection.execute( + update(process_schema_table).where(process_schema_table.c.id == ps_id).values(settings=settings) + ) diff --git a/api/api/endpoints/account.py b/api/api/endpoints/account.py index da1f0d2..a743c55 100644 --- a/api/api/endpoints/account.py +++ b/api/api/endpoints/account.py @@ -17,7 +17,7 @@ from api.schemas.account.account import User from api.schemas.base import bearer_schema from api.schemas.endpoints.account import AllUserResponse, UserCreate, UserFilterDTO, UserUpdate from api.services.auth import get_current_user -from api.services.user_role_validation import db_user_role_validation +from api.services.user_role_validation import UserRoleValidator api_router = APIRouter( prefix="/account", @@ -38,7 +38,8 @@ async def get_all_account_endpoint( connection: AsyncConnection = Depends(get_connection_dep), current_user=Depends(get_current_user), ): - authorize_user = await db_user_role_validation(connection, current_user) + validator = UserRoleValidator(connection) + authorize_user = await validator.validate_admin(current_user) filters = { **({"status": status_filter} if status_filter else {}), @@ -67,7 +68,8 @@ async def get_account_endpoint( connection: AsyncConnection = Depends(get_connection_dep), current_user=Depends(get_current_user), ): - authorize_user = await db_user_role_validation(connection, current_user) + validator = UserRoleValidator(connection) + authorize_user = await validator.validate_admin(current_user) user = await get_user_by_id(connection, user_id) @@ -83,7 +85,8 @@ async def create_account_endpoint( connection: AsyncConnection = Depends(get_connection_dep), current_user=Depends(get_current_user), ): - authorize_user = await db_user_role_validation(connection, current_user) + validator = UserRoleValidator(connection) + authorize_user = await validator.validate_admin(current_user) user_validation = await get_user_by_login(connection, user.login) @@ -104,7 +107,8 @@ async def update_account_endpoint( connection: AsyncConnection = Depends(get_connection_dep), current_user=Depends(get_current_user), ): - authorize_user = await db_user_role_validation(connection, current_user) + validator = UserRoleValidator(connection) + authorize_user = await validator.validate_admin(current_user) user = await get_user_by_id(connection, user_id) if user is None: @@ -131,7 +135,8 @@ async def delete_account_endpoint( connection: AsyncConnection = Depends(get_connection_dep), current_user=Depends(get_current_user), ): - authorize_user = await db_user_role_validation(connection, current_user) + validator = UserRoleValidator(connection) + authorize_user = await validator.validate_admin(current_user) user = await get_user_by_id(connection, user_id) if user is None: diff --git a/api/api/endpoints/keyring.py b/api/api/endpoints/keyring.py index 6c12f50..d919a78 100644 --- a/api/api/endpoints/keyring.py +++ b/api/api/endpoints/keyring.py @@ -13,7 +13,7 @@ from api.schemas.account.account_keyring import AccountKeyring from api.schemas.base import bearer_schema from api.schemas.endpoints.account_keyring import AccountKeyringUpdate from api.services.auth import get_current_user -from api.services.user_role_validation import db_user_role_validation +from api.services.user_role_validation import UserRoleValidator api_router = APIRouter( prefix="/keyring", @@ -25,7 +25,8 @@ api_router = APIRouter( async def get_keyring_endpoint( key_id: str, connection: AsyncConnection = Depends(get_connection_dep), current_user=Depends(get_current_user) ): - authorize_user = await db_user_role_validation(connection, current_user) + validator = UserRoleValidator(connection) + authorize_user = await validator.validate_admin(current_user) keyring = await get_key_by_id(connection, key_id) @@ -43,7 +44,8 @@ async def create_keyring_endpoint( connection: AsyncConnection = Depends(get_connection_dep), current_user=Depends(get_current_user), ): - authorize_user = await db_user_role_validation(connection, current_user) + validator = UserRoleValidator(connection) + authorize_user = await validator.validate_admin(current_user) keyring = await get_key_by_id(connection, key_id) @@ -69,7 +71,8 @@ async def update_keyring_endpoint( connection: AsyncConnection = Depends(get_connection_dep), current_user=Depends(get_current_user), ): - authorize_user = await db_user_role_validation(connection, current_user) + validator = UserRoleValidator(connection) + authorize_user = await validator.validate_admin(current_user) keyring = await get_key_by_id(connection, key_id) if keyring is None: @@ -94,7 +97,8 @@ async def delete_keyring_endpoint( connection: AsyncConnection = Depends(get_connection_dep), current_user=Depends(get_current_user), ): - authorize_user = await db_user_role_validation(connection, current_user) + validator = UserRoleValidator(connection) + authorize_user = await validator.validate_admin(current_user) keyring = await get_key_by_id(connection, key_id) if keyring is None: diff --git a/api/api/endpoints/list_events.py b/api/api/endpoints/list_events.py index 36b04c4..5874620 100644 --- a/api/api/endpoints/list_events.py +++ b/api/api/endpoints/list_events.py @@ -17,10 +17,7 @@ from api.schemas.base import bearer_schema from api.schemas.endpoints.list_events import AllListEventResponse, ListEventFilterDTO, ListEventUpdate from api.schemas.events.list_events import ListEvent from api.services.auth import get_current_user -from api.services.user_role_validation import ( - db_user_role_validation_for_list_events_and_process_schema, - db_user_role_validation_for_list_events_and_process_schema_by_list_event_id, -) +from api.services.user_role_validation import UserRoleValidator api_router = APIRouter( prefix="/list_events", @@ -54,9 +51,8 @@ async def get_all_list_events_endpoint( filters=filters if filters else None, ) - authorize_user, page_flag = await db_user_role_validation_for_list_events_and_process_schema( - connection, current_user - ) + validator = UserRoleValidator(connection) + authorize_user, page_flag = await validator.get_user(current_user) if not page_flag: if filter_dto.filters is None: @@ -82,9 +78,8 @@ async def get_list_events_endpoint( if list_events_validation is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") - authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id( - connection, current_user, list_events_validation.creator_id - ) + validator = UserRoleValidator(connection) + authorize_user = await validator.validate_ownership(current_user, list_events_validation.creator_id) if list_events_id is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") @@ -124,9 +119,8 @@ async def update_list_events( if list_events_validation is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") - authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id( - connection, current_user, list_events_validation.creator_id - ) + validator = UserRoleValidator(connection) + authorize_user = await validator.validate_ownership(current_user, list_events_validation.creator_id) updated_values = list_events_update.model_dump(by_alias=True, exclude_none=True) @@ -151,9 +145,8 @@ async def delete_list_events_endpoint( if list_events_validation is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List events not found") - authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id( - connection, current_user, list_events_validation.creator_id - ) + validator = UserRoleValidator(connection) + authorize_user = await validator.validate_ownership(current_user, list_events_validation.creator_id) list_events_update = ListEventUpdate(status=EventStatus.DELETED.value) diff --git a/api/api/endpoints/process_schema.py b/api/api/endpoints/process_schema.py index dffbbba..7257e0c 100644 --- a/api/api/endpoints/process_schema.py +++ b/api/api/endpoints/process_schema.py @@ -18,10 +18,7 @@ from api.schemas.process.process_schema import ProcessSchema, ProcessSchemaSetti from api.schemas.process.ps_node import Ps_NodeFrontResponseNode from api.schemas.process.ps_node import Ps_NodeFrontResponse from api.services.auth import get_current_user -from api.services.user_role_validation import ( - db_user_role_validation_for_list_events_and_process_schema, - db_user_role_validation_for_list_events_and_process_schema_by_list_event_id, -) +from api.services.user_role_validation import UserRoleValidator from api.db.logic.ps_node import create_ps_node_schema @@ -78,9 +75,8 @@ async def get_all_process_schema_endpoint( filters=filters if filters else None, ) - authorize_user, page_flag = await db_user_role_validation_for_list_events_and_process_schema( - connection, current_user - ) + validator = UserRoleValidator(connection) + authorize_user, page_flag = await validator.get_user(current_user) if not page_flag: if filter_dto.filters is None: @@ -106,9 +102,8 @@ async def get_process_schema_endpoint( if process_schema_validation is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found") - authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id( - connection, current_user, process_schema_validation.creator_id - ) + validator = UserRoleValidator(connection) + authorize_user = await validator.validate_ownership(current_user, process_schema_validation.creator_id) if process_schema_id is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found") @@ -146,8 +141,6 @@ async def create_processschema_endpoint( validated_start_schema = start_node.validate() - print(validated_start_schema) - db_start_schema = await create_ps_node_schema(connection, validated_start_schema, user_validation.id) node = ProcessSchemaSettingsNode( @@ -192,9 +185,8 @@ async def update_process_schema_endpoint( if process_schema_validation is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found") - authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id( - connection, current_user, process_schema_validation.creator_id - ) + validator = UserRoleValidator(connection) + authorize_user = await validator.validate_ownership(current_user, process_schema_validation.creator_id) updated_values = process_schema_update.model_dump(by_alias=True, exclude_none=True) @@ -219,9 +211,8 @@ async def delete_process_schema_endpoint( if process_schema_validation is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Process schema not found") - authorize_user = await db_user_role_validation_for_list_events_and_process_schema_by_list_event_id( - connection, current_user, process_schema_validation.creator_id - ) + validator = UserRoleValidator(connection) + authorize_user = await validator.validate_ownership(current_user, process_schema_validation.creator_id) process_schema_update = ProcessSchemaUpdate(status=ProcessStatus.DELETED.value) diff --git a/api/api/endpoints/ps_node.py b/api/api/endpoints/ps_node.py index f4c750a..744e2aa 100644 --- a/api/api/endpoints/ps_node.py +++ b/api/api/endpoints/ps_node.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, Depends, HTTPException, status +from fastapi import APIRouter, Depends, status from sqlalchemy.ext.asyncio import AsyncConnection @@ -7,20 +7,28 @@ from api.db.logic.account import get_user_by_login from api.schemas.base import bearer_schema from api.schemas.process.process_schema import ProcessSchemaSettingsNodeLink, ProcessSchemaSettingsNode -from api.schemas.process.ps_node import Ps_NodeFrontResponseNode, Ps_NodeRequest +from api.schemas.process.ps_node import Ps_NodeFrontResponseNode, Ps_NodeRequest, Ps_NodeDeleteRequest from api.schemas.process.ps_node import Ps_NodeFrontResponse from api.services.auth import get_current_user -from api.db.logic.ps_node import create_ps_node_schema +from api.db.logic.ps_node import ( + create_ps_node_schema, + get_ps_node_by_id, + check_node_connection, + get_nodes_for_deletion_ordered, + delete_ps_nodes_delete_handler, +) from api.db.logic.node_link import get_last_link_name_by_node_id, create_node_link_schema -from api.db.logic.process_schema import update_process_schema_settings_by_id +from api.db.logic.process_schema import update_process_schema_settings_by_id, get_process_schema_by_id +from api.services.user_role_validation import UserRoleValidator from core import VorkNodeRegistry, VorkNodeLink from model_nodes import VorkNodeLinkData from api.utils.to_camel_dict import to_camel_dict +from api.error import create_operation_error, create_access_error, create_validation_error, create_server_error api_router = APIRouter( @@ -29,6 +37,77 @@ api_router = APIRouter( ) +@api_router.delete("", dependencies=[Depends(bearer_schema)], status_code=status.HTTP_200_OK) +async def delete_ps_node_endpoint( + ps_node_delete_data: Ps_NodeDeleteRequest, + connection: AsyncConnection = Depends(get_connection_dep), + current_user=Depends(get_current_user), +): + process_schema = await get_process_schema_by_id(connection, ps_node_delete_data.schema_id) + if process_schema is None: + raise create_operation_error( + message="Process schema not found", + status_code=status.HTTP_404_NOT_FOUND, + details={"schema_id": ps_node_delete_data.schema_id}, + ) + + validator = UserRoleValidator(connection) + try: + await validator.validate_ownership(current_user, process_schema.creator_id) + except Exception as e: + raise create_access_error( + message="Access denied", + status_code=status.HTTP_403_FORBIDDEN, + details={"user_id": current_user, "schema_creator_id": process_schema.creator_id, "reason": str(e)}, + ) + + ps_node = await get_ps_node_by_id(connection, ps_node_delete_data.node_id) + if ps_node is None: + raise create_operation_error( + message="PS node not found", + status_code=status.HTTP_404_NOT_FOUND, + details={"node_id": ps_node_delete_data.node_id}, + ) + + next_ps_node = await get_ps_node_by_id(connection, ps_node_delete_data.next_node_id) + if next_ps_node is None: + raise create_operation_error( + message="Next PS node not found", + status_code=status.HTTP_400_BAD_REQUEST, + details={"next_node_id": ps_node_delete_data.next_node_id}, + ) + + is_connected = await check_node_connection( + connection, ps_node_delete_data.node_id, ps_node_delete_data.next_node_id, int(ps_node_delete_data.port) + ) + + if not is_connected: + raise create_validation_error( + message="Node connection validation failed", + status_code=status.HTTP_400_BAD_REQUEST, + details={ + "node_id": ps_node_delete_data.node_id, + "next_node_id": ps_node_delete_data.next_node_id, + "port": ps_node_delete_data.port, + }, + ) + + ordered_node_ids = await get_nodes_for_deletion_ordered(connection, ps_node_delete_data.next_node_id) + + try: + deleted_node_ids = await delete_ps_nodes_delete_handler(connection, ordered_node_ids) + except Exception as e: + raise create_server_error( + message="Failed to delete nodes", + status_code=500, + details={"error": str(e), "ordered_node_ids": ordered_node_ids}, + ) + + return { + "deleted_node_ids": deleted_node_ids, + } + + @api_router.post("", dependencies=[Depends(bearer_schema)], response_model=Ps_NodeFrontResponse) async def create_ps_node_endpoint( ps_node: Ps_NodeRequest, @@ -37,12 +116,34 @@ async def create_ps_node_endpoint( ): user_validation = await get_user_by_login(connection, current_user) + process_schema = await get_process_schema_by_id(connection, ps_node.data["ps_id"]) + if process_schema is None: + raise create_operation_error( + message="Process schema not found", + status_code=status.HTTP_404_NOT_FOUND, + details={"schema_id": ps_node.data["ps_id"]}, + ) + + validator = UserRoleValidator(connection) + try: + await validator.validate_ownership(current_user, process_schema.creator_id) + except Exception as e: + raise create_access_error( + message="Access denied", + status_code=status.HTTP_403_FORBIDDEN, + details={"user_id": current_user, "schema_creator_id": process_schema.creator_id, "reason": str(e)}, + ) + registery = VorkNodeRegistry() vork_node = registery.get(ps_node.data["node_type"]) if vork_node is None: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Node not found") + raise create_operation_error( + message="Node type not found", + status_code=status.HTTP_404_NOT_FOUND, + details={"node_type": ps_node.data["node_type"]}, + ) node_descriptor = vork_node.form() try: @@ -51,43 +152,93 @@ async def create_ps_node_endpoint( node_instance_validated = node_instance.validate() except Exception as e: - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) + raise create_validation_error( + message="Node validation failed", + status_code=status.HTTP_400_BAD_REQUEST, + details={"error": str(e)}, + ) - db_ps_node = await create_ps_node_schema(connection, node_instance_validated, user_validation.id) - link_name = await get_last_link_name_by_node_id(connection, db_ps_node.ps_id) + # Проверка: parent_id принадлежит тому же ps_id + parent_id = node_instance_validated.parent_id + target_ps_id = ps_node.data["ps_id"] - link_data = VorkNodeLinkData( - parent_port_number=node_instance_validated.parent_port_number, - to_id=db_ps_node.id, - from_id=node_instance_validated.parent_id, - last_link_name=link_name, - ) + parent_node = await get_ps_node_by_id(connection, parent_id) + if parent_node is None: + raise create_operation_error( + message="Parent PS node not found", + status_code=status.HTTP_404_NOT_FOUND, + details={"parent_id": parent_id}, + ) + if parent_node.ps_id != target_ps_id: + raise create_validation_error( + message="Parent PS node belongs to another process schema", + status_code=status.HTTP_400_BAD_REQUEST, + details={"parent_id": parent_id, "expected_ps_id": target_ps_id, "actual_ps_id": parent_node.ps_id}, + ) - link = VorkNodeLink(data=link_data.model_dump()) + # Проверка: у родительской ноды есть указанный порт + parent_port_number = node_instance_validated.parent_port_number - validated_link = link.validate() + # Извлекаем номера портов из settings родительской ноды + parent_settings = parent_node.settings or {} + available_port_numbers = [] - db_node_link = await create_node_link_schema(connection, validated_link, user_validation.id) + # Ищем все ключи, содержащие "port" в названии + for key, value in parent_settings.items(): + if "port" in key.lower() and isinstance(value, int): + available_port_numbers.append(value) - links_settings = ProcessSchemaSettingsNodeLink( - id=db_node_link.id, - link_name=db_node_link.link_name, - parent_port_number=db_node_link.link_point_id, - from_id=db_node_link.node_id, - to_id=db_node_link.next_node_id, - ) + # Проверяем, что указанный порт существует в settings + if parent_port_number not in available_port_numbers: + raise create_validation_error( + message="Parent port number is invalid", + status_code=status.HTTP_400_BAD_REQUEST, + details={"parent_id": parent_id, "parent_settings": parent_settings}, + ) - node_settings = ProcessSchemaSettingsNode( - id=db_ps_node.id, - node_type=db_ps_node.node_type, - data=node_instance_validated.data.model_dump(), - from_node=None, - links=[{"links": links_settings.model_dump()}], - ) + try: + db_ps_node = await create_ps_node_schema(connection, node_instance_validated, user_validation.id) + link_name = await get_last_link_name_by_node_id(connection, db_ps_node.ps_id) - settings_dict = {"node": node_settings.model_dump(mode="json")} + link_data = VorkNodeLinkData( + parent_port_number=node_instance_validated.parent_port_number, + to_id=db_ps_node.id, + from_id=node_instance_validated.parent_id, + last_link_name=link_name, + ) - await update_process_schema_settings_by_id(connection, db_ps_node.ps_id, settings_dict) + link = VorkNodeLink(data=link_data.model_dump()) + + validated_link = link.validate() + + db_node_link = await create_node_link_schema(connection, validated_link, user_validation.id) + + links_settings = ProcessSchemaSettingsNodeLink( + id=db_node_link.id, + link_name=db_node_link.link_name, + parent_port_number=db_node_link.link_point_id, + from_id=db_node_link.node_id, + to_id=db_node_link.next_node_id, + ) + + node_settings = ProcessSchemaSettingsNode( + id=db_ps_node.id, + node_type=db_ps_node.node_type, + data=node_instance_validated.data.model_dump(), + from_node=None, + links=[{"links": links_settings.model_dump()}], + ) + + settings_dict = {"node": node_settings.model_dump(mode="json")} + + await update_process_schema_settings_by_id(connection, db_ps_node.ps_id, settings_dict) + + except Exception as e: + raise create_server_error( + message="Failed to create node", + status_code=500, + details={"error": str(e)}, + ) ps_node_front_response = Ps_NodeFrontResponse( description=node_descriptor.model_dump(), diff --git a/api/api/error/__init__.py b/api/api/error/__init__.py new file mode 100644 index 0000000..c19a55f --- /dev/null +++ b/api/api/error/__init__.py @@ -0,0 +1,26 @@ +""" +Модуль для обработки ошибок API. +""" + +from .error_model.error_types import ServerError, AccessError, OperationError, ValidationError, ErrorType + +from .error_handlers import ( + handle_api_error, + create_server_error, + create_access_error, + create_operation_error, + create_validation_error, +) + +__all__ = [ + "ServerError", + "AccessError", + "OperationError", + "ValidationError", + "ErrorType", + "handle_api_error", + "create_server_error", + "create_access_error", + "create_operation_error", + "create_validation_error", +] diff --git a/api/api/error/error_handlers.py b/api/api/error/error_handlers.py new file mode 100644 index 0000000..8960f52 --- /dev/null +++ b/api/api/error/error_handlers.py @@ -0,0 +1,54 @@ +""" +Обработчики ошибок для API. +""" + +from typing import Optional, Dict, Any +from fastapi import HTTPException + +from .error_model.error_types import ServerError, AccessError, OperationError, ValidationError, ErrorType + + +def handle_api_error( + error_type: ErrorType, message: str, status_code: int, details: Optional[Dict[str, Any]] = None +) -> HTTPException: + """ + Функция для создания HTTPException с правильной структурой ошибки. + + """ + match error_type: + case ErrorType.SERVER: + error = ServerError(message=message, details=details) + case ErrorType.ACCESS: + error = AccessError(message=message, details=details) + case ErrorType.OPERATION: + error = OperationError(message=message, details=details) + case ErrorType.VALIDATION: + error = ValidationError(message=message, details=details) + case _: + error = ServerError(message=message, details=details) + + return HTTPException(status_code=status_code, detail=error.model_dump(mode="json")) + + +def create_server_error( + message: str, status_code: int = 500, details: Optional[Dict[str, Any]] = None +) -> HTTPException: + return handle_api_error(error_type=ErrorType.SERVER, message=message, status_code=status_code, details=details) + + +def create_access_error( + message: str, status_code: int = 403, details: Optional[Dict[str, Any]] = None +) -> HTTPException: + return handle_api_error(error_type=ErrorType.ACCESS, message=message, status_code=status_code, details=details) + + +def create_operation_error( + message: str, status_code: int = 400, details: Optional[Dict[str, Any]] = None +) -> HTTPException: + return handle_api_error(error_type=ErrorType.OPERATION, message=message, status_code=status_code, details=details) + + +def create_validation_error( + message: str, status_code: int = 422, details: Optional[Dict[str, Any]] = None +) -> HTTPException: + return handle_api_error(error_type=ErrorType.VALIDATION, message=message, status_code=status_code, details=details) diff --git a/api/api/error/error_model/__init__.py b/api/api/error/error_model/__init__.py new file mode 100644 index 0000000..b9070d1 --- /dev/null +++ b/api/api/error/error_model/__init__.py @@ -0,0 +1,7 @@ +""" +Модели ошибок для API. +""" + +from .error_types import ServerError, AccessError, OperationError, ValidationError, ErrorType + +__all__ = ["ServerError", "AccessError", "OperationError", "ValidationError", "ErrorType"] diff --git a/api/api/error/error_model/error_types.py b/api/api/error/error_model/error_types.py new file mode 100644 index 0000000..ee32422 --- /dev/null +++ b/api/api/error/error_model/error_types.py @@ -0,0 +1,61 @@ +""" +Типизированные модели ошибок для API. +""" + +from enum import Enum +from typing import Optional, Dict, Any +from pydantic import BaseModel + + +class ErrorType(str, Enum): + """ + Типы ошибок API. + """ + + SERVER = "SERVER" + ACCESS = "ACCESS" + OPERATION = "OPERATION" + VALIDATION = "VALIDATION" + + +class BaseError(BaseModel): + """ + Базовая модель ошибки. + """ + + error_type: ErrorType + message: str + details: Optional[Dict[str, Any]] = None + + +class ServerError(BaseError): + """ + Критические серверные ошибки (БД, соединения и прочие неприятности). + """ + + error_type: ErrorType = ErrorType.SERVER + + +class AccessError(BaseError): + """ + Ошибки доступа (несоответствие тенантности, ролям доступа). + """ + + error_type: ErrorType = ErrorType.ACCESS + + +class OperationError(BaseError): + """ + Ошибки операции (несоответствие прохождению верификации, ошибки в датасете). + """ + + error_type: ErrorType = ErrorType.OPERATION + + +class ValidationError(BaseError): + """ + Ошибки валидации (несоответствие первичной валидации). + """ + + error_type: ErrorType = ErrorType.VALIDATION + field_errors: Optional[Dict[str, str]] = None diff --git a/api/api/schemas/process/ps_node.py b/api/api/schemas/process/ps_node.py index 5d75a1a..49d7bb3 100644 --- a/api/api/schemas/process/ps_node.py +++ b/api/api/schemas/process/ps_node.py @@ -6,6 +6,13 @@ from orm.tables.process import NodeStatus, NodeType from api.schemas.base import Base +class Ps_NodeDeleteRequest(Base): + schema_id: int + node_id: int + port: str + next_node_id: int + + class Ps_NodeRequest(Base): data: Dict[str, Any] links: Dict[str, Any] diff --git a/api/api/services/user_role_validation.py b/api/api/services/user_role_validation.py index bb1f86a..0e7b458 100644 --- a/api/api/services/user_role_validation.py +++ b/api/api/services/user_role_validation.py @@ -1,32 +1,75 @@ -from fastapi import ( - HTTPException, - status, -) +from fastapi import status from orm.tables.account import AccountRole - from api.db.logic.account import get_user_by_login +from api.error import create_operation_error, create_access_error -async def db_user_role_validation(connection, current_user): - authorize_user = await get_user_by_login(connection, current_user) - if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You do not have enough permissions") - return authorize_user +class UserRoleValidator: + """Валидатор ролей пользователей""" + def __init__(self, connection): + self.connection = connection -async def db_user_role_validation_for_list_events_and_process_schema_by_list_event_id( - connection, current_user, current_listevents_creator_id -): - authorize_user = await get_user_by_login(connection, current_user) - if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}: - if authorize_user.id != current_listevents_creator_id: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You do not have enough permissions") - return authorize_user + async def validate_admin(self, current_user: int): + """Проверяет права администратора или владельца""" + try: + authorize_user = await get_user_by_login(self.connection, current_user) + except Exception as e: + raise create_operation_error( + message="User not found", + status_code=status.HTTP_404_NOT_FOUND, + details={"user_id": current_user, "error": str(e)}, + ) + if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}: + raise create_access_error( + message="Insufficient permissions", + status_code=status.HTTP_403_FORBIDDEN, + details={ + "user_id": current_user, + "user_role": authorize_user.role.value, + "required_roles": [AccountRole.OWNER.value, AccountRole.ADMIN.value], + }, + ) -async def db_user_role_validation_for_list_events_and_process_schema(connection, current_user): - authorize_user = await get_user_by_login(connection, current_user) - if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}: - return authorize_user, False - else: - return authorize_user, True + return authorize_user + + async def validate_ownership(self, current_user: int, resource_owner_id: int): + """Проверяет владение ресурсом или права администратора""" + try: + authorize_user = await get_user_by_login(self.connection, current_user) + except Exception as e: + raise create_operation_error( + message="User not found", + status_code=status.HTTP_404_NOT_FOUND, + details={"user_id": current_user, "error": str(e)}, + ) + + if authorize_user.role not in {AccountRole.OWNER, AccountRole.ADMIN}: + if authorize_user.id != resource_owner_id: + raise create_access_error( + message="Access denied", + status_code=status.HTTP_403_FORBIDDEN, + details={ + "user_id": current_user, + "resource_owner_id": resource_owner_id, + "user_role": authorize_user.role.value, + "reason": "User is not the owner and does not have admin privileges", + }, + ) + + return authorize_user + + async def get_user(self, current_user: int): + """Получает пользователя с админ-статусом""" + try: + authorize_user = await get_user_by_login(self.connection, current_user) + except Exception as e: + raise create_operation_error( + message="User not found", + status_code=status.HTTP_404_NOT_FOUND, + details={"user_id": current_user, "error": str(e)}, + ) + + is_admin = authorize_user.role in {AccountRole.OWNER, AccountRole.ADMIN} + return authorize_user, is_admin